Data Pipeline Archives - Tiger Analytics Thu, 16 Jan 2025 09:42:20 +0000 en-US hourly 1 https://wordpress.org/?v=6.8.1 https://www.tigeranalytics.com/wp-content/uploads/2023/09/favicon-Tiger-Analytics_-150x150.png Data Pipeline Archives - Tiger Analytics 32 32 Invisible Threats, Visible Solutions: Integrating AWS Macie and Tiger Data Fabric for Ultimate Security https://www.tigeranalytics.com/perspectives/blog/invisible-threats-visible-solutions-integrating-aws-macie-and-tiger-data-fabric-for-ultimate-security/ Thu, 07 Mar 2024 07:03:07 +0000 https://www.tigeranalytics.com/?post_type=blog&p=20726 Data defenses are now fortified against potential breaches with the Tiger Data Fabric-AWS Macie integration, automating sensitive data discovery, evaluation, and protection in the data pipeline for enhanced security. Explore how to integrate AWS Macie into a data fabric.

The post Invisible Threats, Visible Solutions: Integrating AWS Macie and Tiger Data Fabric for Ultimate Security appeared first on Tiger Analytics.

]]>
Discovering and handling sensitive data in the data lake or analytics environment can be challenging. It involves overcoming technical complexities in data processing and dealing with the associated costs of resources and computing.  Identifying sensitive information at the entry point of the data pipeline, probably during data ingestion, can help overcome these challenges to some extent. This proactive approach allows organizations to fortify their defenses against potential breaches and unauthorized access.

According to AWS, Amazon Macie is “a data security service that uses machine learning (ML) and pattern matching to discover and help protect sensitive data”, such as personally identifiable information (PII), payment card data, and Amazon Web Services . At Tiger Analytics we’ve integrated these features into our pipelines within our proprietary Data Fabric solution called Tiger Data Fabric.

The Tiger Data Fabric is a self-service, low/no-code data management platform that facilitates seamless data integration, efficient data ingestion, robust data quality checks, data standardization, and effective data provisioning. Its user-centric, UI-driven approach demystifies data handling, enabling professionals with diverse technical proficiencies to interact with and manage their data resources effortlessly.

Leveraging Salient Features for Enhanced Security

The Tiger Data Fabric-AWS Macie integration offers a robust solution to enhance data security measures, including:

  • Data Discovery: The solution, with the help of Macie, discovers and locates sensitive data within the active data pipeline.
  • Data Protection: The design pattern isolates the sensitive data in a secure location with restricted access.
  • Customized Actions: The solution gives flexibility to design (customize) the actions to be taken when sensitive data is identified. For instance, the discovered sensitive data can be encrypted, redacted, pseudonymized, or even dropped from the pipeline with necessary approvals from the data owners.
  • Alerts and Notification: Data owners receive alerts when any sensitive data is detected, allowing them to take the required actions in response.

Tiger Data Fabric has many data engineering capabilities and has been enhanced recently to include sensitive data scans at the data ingestion step of the pipeline. Source data present on the S3 landing zone path is scanned for sensitive information and results are captured and stored at another path in the S3 bucket.

By integrating AWS Macie with the Tiger Data Fabric, we’re able to:

  • Automate the discovery of sensitive data.
  • Discover a variety of sensitive data types.
  • Evaluate and monitor data for security and access control.
  • Review and analyze findings.

For data engineers looking to integrate “sensitive data management” into their data pipelines , here’s a walkthrough of how we, at Tiger Analytics, implement these components for maximum value:

  • S3 Buckets store data in various stages of processing. A raw databucket for uploading objects for the data pipeline, a scanning bucket where objects are scanned for sensitive data, a manual review bucket which harbors objects where sensitive data was discovered, and a scanned data bucket for starting the next ingestion step of the data pipeline.
  • Lambda and Step Functions execute the critical tasks of running sensitive data scans and managing workflows. Step Functions coordinate Lambda functions to manage business logic and execute the steps mentioned below:
    • triggerMacieJob: This Lambda function creates a Macie-sensitive data discovery job on the designated S3 bucket during the scan stage..
    • pollWait: This Step Function waits for a specific state to be reached, ensuring the job runs smoothly.
    • checkJobStatus: This Lambda function checks the status of the Macie scan job.
    • isJobComplete: This Step function uses a Choice state to determine if the job has finished. If it has, it triggers additional steps to be executed.
    • waitForJobToComplete: This Step function employs a Choice state to wait for the job to complete and prevent the next action from running before the scan is finished.
    • UpdateCatalog: This Lambda function updates the catalog table in the backend Data Fabric database, and ensures that all job statuses are accurately reflected in the database.
  • A Macie scan job scans the specified S3 bucket for sensitive data. The process of creating the Macie job involves multiple steps, allowing us to choose data identifiers, either through custom configurations or standard options:
    • We create a one-time Macie job through the triggerMacieJob Lambda function.
    • We provide the complete S3 bucket path for sensitive data buckets to filter out the scan and avoid unnecessary scanning on other buckets.
    • While creating the job, Macie provides a provision to select data identifiers for sensitive data. In the AWS Data Fabric, we have automated the selection of custom identifiers for the scan, including CREDIT_CARD_NUMBER, DRIVERS_LICENSE, PHONE_NUMBER, USA_PASSPORT_NUMBER, and USA_SOCIAL_SECURITY_NUMBER.

      The findings can be seen on the AWS console and filtered based on S3 Buckets.  We employed Glue jobs to parse the results and route the data to the manual review bucket and raw buckets. The Macie job execution time is around 4-5 minutes. After scanning, if there are less than 1000 sensitive records, they are moved to the quarantine bucket.

  • The parsing of Macie results is handled by a Glue job, implemented as a Python script. This script is responsible for extracting and organizing information from the Macie scanned results bucket.
    • In the parser job, we retrieve the severity level (High, Medium, or Low) assigned by AWS Macie during the one-time job scan.
    • In the Macie scanning bucket, we created separate folders for each source system and data asset, registered through Tiger Data Fabric UI.

      For example: zdf-fmwrk-macie-scan-zn-us-east-2/data/src_sys_id=100/data_asset_id=100000/20231026115848

      The parser job checks for severity and the report in the specified path. If sensitive data is detected, it is moved to the quarantine bucket. We format this data into parquet and process it using Spark data frames.

    • If we peruse the parquet file, found below, sensitive data can be clearly seen as SSN and phone number columns.
    • In the quarantine bucket, the same file is being moved after finding the sensitive data.

      If there are no sensitive records, move the data to the raw zone from where data is further sent to the data lake.
  • Airflow operators come in handy for orchestrating the entire pipeline, whether we integrate native AWS security services with Amazon MWAA or implement custom airflow on EC2 or EKS.
    • GlueJobOperator: Executes all the Glue jobs pre and post-Macie scan.
    • StepFunctionStartExecutionOperator: Starts the execution of the Step Function.
    • StepFunctionExecutionSensor: Waits for the Step Function execution to be completed.
    • StepFunctionGetExecutionOutput Operator: Gets the output from the Step function.
  • IAM Policies grant the necessary permissions for the AWS Lambda functions to access AWS resources that are part of the application. Also, access to the Macie review bucket is managed using standard IAM policies and best practices.

Things to Keep in Mind for Effective Implementation

  • Based on our experience integrating AWS Macie with the Tiger Data Fabric, here are some points to keep in mind for an effective integration of AWS Macie. Macie’s primary objective is sensitive data discovery. It acts as a background process that keeps scanning the S3 buckets/objects. It generates reports that can be consumed by various users and accordingly, actions can be taken. But if the requirement is to string it with a pipeline and automate the action, based on the reports, then a custom process must be created.
  • Macie stops reporting the location of sensitive data after 1000 occurrences of the same detection type. However, this quota can be increased by requesting AWS. It is important to keep in mind that in our use case, where Macie scans are integrated into the pipeline, each job is dynamically created to scan the dataset. If the sensitive data occurrences per detection type exceed 1000, we move the entire file to the quarantine zone.
  • For certain data elements that Macie doesn’t consider sensitive data, custom data identifiers help a lot. It can be defined via regular expressions and its sensitivity can also be customized. Organizations with data that are deemed sensitive internally by their data governance team can use this feature.
  • Macie also provides an allow list—this helps in ignoring some of the data elements which by default Macie tag as sensitive data.’

The AWS Macie – Tiger Data Fabric integration seamlessly enhances automated data pipelines, addressing the challenges associated with unintended exposure of sensitive information in data lakes. By incorporating customizations such as employing regular expressions for data sensitivity and establishing suppression rules within the data fabrics they are working on, data engineers gain enhanced control and capabilities over managing and safeguarding sensitive data.

Armed with the provided insights, they can easily adapt the use cases and explanations to align with their unique workflows and specific requirements.

The post Invisible Threats, Visible Solutions: Integrating AWS Macie and Tiger Data Fabric for Ultimate Security appeared first on Tiger Analytics.

]]>
Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake https://www.tigeranalytics.com/perspectives/blog/building-efficient-near-real-time-data-pipelines-debezium-kafka-and-snowflake/ Thu, 26 Nov 2020 21:38:14 +0000 https://www.tigeranalytics.com/blog/building-efficient-near-real-time-data-pipelines-debezium-kafka-and-snowflake/ Learn how Debezium, Kafka, and Snowflake combine to advance near-real-time data pipelines. Gain insights into the process of efficient data syncing, processing, and storage, crucial for informed decision-making in real estate investment.

The post Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake appeared first on Tiger Analytics.

]]>
Institutional investors in real estate usually require several discussions to finalize their investment strategies and goals. They need to acquire properties on a large scale and at a fast pace. To facilitate this, the data pipeline must be refreshed in near-real-time with properties that have recently come onto the market.

With this business use case, we worked to get home listing data to the operational data store (ODS), PostgreSQL, and sync them to the cloud data warehouse, Snowflake.

We solve the first part of the challenge —collecting data about new property listings— by using a real estate data aggregator called Xome to exchange data and load them into the ODS.

Next, we feed the properties in the ODS in near-real-time (NRT) to the Snowflake data warehouse. An analytics engine filters and selects homes based on the buy-box criteria set by investors, enriched by supporting data such as nearby schools and their ratings; neighborhood crime levels; proximity to healthcare facilities and public transportation, etc. The analytics engine then ranks the properties based on the cap rate, discount, and yield.

The property ranks are sent back into the ODS, giving underwriters a prioritized list based on their ranking. Underwriters can adjudicate risks, calculate financials like the target offer price, renovation cost, and estimated returns, and store their results in the same ODS.

Here is how we built the NRT data pipeline from the Amazon Web Services (AWS) Postgres data source to Snowflake. Our solution:

  • Uses the database log as the seed for transferring data, as it is minimally invasive to production systems;
  • Employs Debezium, an open-source connector that listens for changes in log files and records them as consumable events;
  • Transfers events reliably using Kafka, the distributed messaging system;
  • Connects Kafka to Snowflake directly and writes to Snowflake using Snowpipe, stages, files, and tables; and
  • Schedules Snowflake tasks to merge the final data sets to the target table.

data pipeline architecture

Solution architecture demonstrating the high-level flow and relationship between components

Here, step by step is how to do it:

A. Configure PostgreSQL in AWS RDS

1. To capture DML changes that persist in the database, set the Write-Ahead-Log (WAL) level to logical

Create a new parameter group and set the value of rds.logical_replication to 1.

Modify the database instance to associate to this customized parameter group.

2. Log into PostgreSQL and check the WAL level

SHOW wal_level

It should be set to logical.

3. Create a replication slot to stream the sequence of changes from the database

The built-in logical decoding process extracts DML changes from the database log into a row format that is easy to understand.

SELECT * FROM pg_create_logical_replication_slot(, ‘wal2json’);

B. Configure the Debezium and Kafka cluster

We use Debezium and Kafka in a cluster as the event messaging system that reads data from the database logs and loads them into Snowflake.

To demonstrate this use case, we have selected the minimum hardware requirements to execute this data pipeline for a sample of records. To extend this to cluster size requirements for production data, please refer to the product documentation.

Node vs. Size table

1. Prepare the hardware

For connector nodes, we use memory-optimized machines; for Kafka brokers, CPU-optimised machines with high storage capacity.

Install Java and open-source Kafka on all the machines and set up a $KAFKA_HOME directory.

2. Set up the Kafka-PostgreSQL Connector

This node connects to PostgreSQL and decodes the database log using the Debezium connector, returning events in JSON format. The node requires several JAR files to be downloaded from the Maven repository.

There are four configuration steps involved:

Config 1

 cd $KAFKA_HOME
 mkdir source_jars
 cd source_jars
 wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.9.2/avro-1.9.2.jar
 wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar
 wget https://repo1.maven.org/maven2/io/debezium/debezium-core/0.9.5.Final/debezium-core-0.9.5.Final.jar
 wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/0.9.5.Final/debezium-connector-postgres-0.9.5.Final.jar
 wget https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
 wget https://www.java2s.com/Code/JarDownload/jackson-all/jackson-all-1.7.4.jar.zip
 wget https://www.java2s.com/Code/JarDownload/jdbc/jdbc-2.0-sources.jar.zip
 wget https://maven.repository.redhat.com/earlyaccess/all/io/confluent/kafka-avro-serializer/5.3.0/kafka-avro-serializer-5.3.0.jar
 wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.7/postgresql-42.2.7.jar

Config 2

In the $KAFKA_HOME/config directory, create a new file called postgres-kafka-connector.properties and establish a connection to Postgres to capture changed data.

There are multiple options to control the Debezium connector. Please consult the product documentation for more details.

For example:

 name=postgres-debezium-connector
 connector.class=io.debezium.connector.postgresql.PostgresConnector
 database.hostname= database.port=5432
 database.user=postgres
 database.password= ********
 database.dbname=postgres
 database.server.name= suppyTopic
 #This appears as prefix Kafka
 topicschema.whitelist = supply
 #Provide schema to sync data
 fromplugin.name=wal2jsonslot.name=
 #Provide Postgres replication slot name
 snapshot.fetch.size = 1000

Config 3

Set Classpath and execute

 export CLASSPATH=$KAFKA_HOME/source_jars/
 #Execute below from $KAFKA_HOME
 bin/connect-standalone.sh config/postgres-supply.properties config/postgres-kafka-connect-json.properties

3. Turn on Zookeeper

By default, Zookeeper runs on its localhost with Port ID 2181.

Start the Zookeeper process from $KAFKA_HOME.

 bin/zookeeper-server-start.sh config/zookeeper.properties

4. Set up the Kafka brokers

In contrast to related technologies like Apache Spark, Hadoop, etc., Kafka does not use a master/slave concept for its brokers: all the brokers transparently work out how to coordinate amongst themselves.

Within the $KAFKA_HOME/config directory, find the template file called server.properties and edit the two Kafka brokers as follows:

 Config 1: KAFKA broker 1, Port no and ZooKeeper address
 server1.propertieslisteners=PLAINTEXT:<Public IP address of this Kafka broker 1>:9093
 zookeeper.connect=<Private IP address of Zookeeper>:2181
 zookeeper.connection.timeout.ms=6000 Config 2: KAFKA broker 2, Port no and ZooKeeper address
 server2.propertieslisteners=PLAINTEXT:<Public IP address of this Kafka broker 2>:9094
 zookeeper.connect=<Private IP address of Zookeeper>:2181
 zookeeper.connection.timeout.ms=6000

Start the Kafka brokers by running the following commands from KAFKA_HOME.

 #Run this from Kafka broker 1
 bin/kafka-server-start.sh config/server1.properties
 
 #Run this from Kafka broker 2
 bin/kafka-server-start.sh config/server2.properties

With this setup, the two Kafka brokers will transfer data from Postgres in topics, with one topic for each source table.

C. Set up the Snowflake connector

This node reads data from each Kafka topic and writes them to Snowflake. Internally, it uses Snowflake stages and Snowpipe to sync the data to the Snowflake tables.

There are four configuration steps involved:

Config 1

Download all the dependent JARs, including the Snowflake-Kafka connector, from the Maven repository and save them under a new directory, $KAFKA_HOME/sink_jars.

 cd $KAFKA_HOME
 mkdir sink_jars
 wgethttps://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
 wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
 wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.1.0/snowflake-kafka-connector-1.1.0.jar
 wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar

Config 2

In $KAFKA_HOME/config/connect-standalone.properties, provide the details of the Kafka broker and its port.

 Config 2: connect-standalone.properties
 #Provide Kafka server details under this propertybootstrap.servers=ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9093,
 ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9092

Config 3

In $KAFKA_HOME/config, create a new file called kafka-snowflake-connect-json.properties. In this file, we tag each Kafka topic to its corresponding table in Snowflake, like this:

 snowflake.private.key.passphrase=<Password>#to connect Snowflake#Database and schema configuration
 snowflake.database.name=<Target database in Snowflake>
 snowflake.schema.name=<Target Schema in Snowflake> #Data format configuration
 key.converter = org.apache.kafka.connect.storage.StringConverter
 value.converter = com.snowflake.kafka.connector.records.SnowflakeJsonConverter #Provide a map from Kafka topic to table in Snowflake
 #We have two tables here. 1. Supply 2. BuyboxtopicsBuyboxtopics=postgresRDS.supply.supply,postgresRDS.supply.buybox
 snowflake.topic2table.map=postgresRDS.supply.supply:dbz_supply,postgresRDS.supply.buybox:dbz_buybox

Config 4

Set Classpath and execute.

 export CLASSPATH=$KAFKA_HOME/sink_jars/
 
 #Execute below from $KAFKA_HOME
 
 bin/connect-standalone.sh config/connect-standalone.properties
 config/kafka-snowflake-connect-json.properties

With this setup, data from the Kafka topics get loaded to the Snowflake target tables.

For example, the SUPPLY table that contains the list of homes in PostgreSQL will look like this in Snowflake:

Postgres SQL table

The table has only two JSON columns:

  • Record_Metadata: JSON-formatted primary key column
  • Record_Content: JSON-formatted actual row values

Real-time refresh

We want additions, deletions, and changes to the original data to flow down to Snowflake in real-time, or near-real-time. To achieve this, and to track the changes from original to updated data set, we use the following payload code:

 "payload": {
 "after": {
 "actual_photos_count": null,
 "additional_home_details": null,
 "address1": "8146 Lesourdsville West Chester",
 "bathrooms": 1,
 "bedrooms": 2,
 "census_tract_id": "390170111262013",
 "city": "West Chester",
 "close_date": null,
 "close_price": null,
 "country": "US",
 "created_on_utc": 1581089444809073,
 "latitude": 39.3491554260254,
 "laundry_features": null,
 ******DELETED ROWS to have few columns
 },
 "before": null,
 "op": "r",
 "source": {
 "connector": "postgresql",
 "db": "postgres",
 "last_snapshot_record": false,
 "lsn": 309438972864,
 "name": "postgresRDS",
 "schema": "supply",
 "snapshot": true,
 "table": "supply",
 "ts_usec": 1582796538603000,
 "txId": 5834,
 "version": "0.9.5.Final",
 "xmin": null
 },
 "ts_ms": 1582796538603
 }

The payload data structure has four event types:

  • R: initial data extract
  • C: inserts
  • U: updates
  • D: deletes

It holds the actual data in JSON nodes before and after.

Debezium Postgres payload data and event types

D. Create views in Snowflake

Snowflake natively supports JSON structured data. We can parse and normalize data from the table into columns simply by using database views in Snowflake.

Create a view to parse inserts, updates, and snapshots

 CREATE OR REPLACE VIEW DBZ_SUPPLY_I_U_VIEW as
 SELECT --Get contents from After JSON node for snapshot, inserts and updates ID is the primary keyrecord_content:"payload"."after"."id"::FLOAT as id
 ,record_content:"payload"."after"."actual_photos_count"::FLOAT as actual_photos_count
 ,record_content:"payload"."after"."additional_home_details"::STRING as additional_home_details
 ,record_content:"payload"."after"."address1"::STRING as address1
 ,record_content:"payload"."after"."bathrooms"::VARIANT as bathrooms
 ,record_content:"payload"."after"."bedrooms"::VARIANT as bedrooms
 ,record_content:"payload"."after"."census_tract_id"::STRING as census_tract_id
 ,record_content:"payload"."after"."city"::STRING as city
 ,record_content:"payload"."after"."close_date"::STRING::DATE as close_date
 ,record_content:"payload"."after"."close_price"::VARIANT as close_price
 ,record_content:"payload"."after"."country"::STRING as country
 ,record_content:"payload"."after"."created_on_utc"::STRING::TIMESTAMP_NTZ as created_on_utc
 ,record_content:"payload"."after"."latitude"::STRING as latitude
 ,record_content:"payload"."after"."longitude"::STRING as longitude
 ,record_content:"payload"."after"."laundry_features"::STRING as laundry_features--Get additional fields, about timestamp when debezium captured data, when postgres applied that transaction,REGEXP_REPLACE(record_content:"payload"."op", '') as dml_operator ,
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."ts_ms", '')) as debezium_processed_ts
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."source"."ts_usec", '')) as source_processed_ts
 , REGEXP_REPLACE(record_content:"payload"."source"."name", '') as source_server
 , REGEXP_REPLACE(record_content:"payload"."source"."db", '') as source_db
 , REGEXP_REPLACE(record_content:"payload"."source"."table", '') as source_table
 , REGEXP_REPLACE(record_content:"payload"."source"."schema", '') as source_schemaFROM <Database>.<Schema>.DBZ_SUPPLY
 WHERE lower(DML_OPERATOR) in ('r','c','u');

Create a view to parse deletes

 CREATE OR REPLACE VIEW DBZ_SUPPLY_D_VIEW as
 SELECT --Get contents from before JSON node for snapshot, inserts and updates. ID is the primary keyrecord_content:"payload"."before"."id"::FLOAT as id
 ,record_content:"payload"."before"."actual_photos_count"::FLOAT as actual_photos_count
 ,record_content:"payload"."before"."additional_home_details"::STRING as additional_home_details
 ,record_content:"payload"."before"."address1"::STRING as address1
 ,record_content:"payload"."before"."bathrooms"::VARIANT as bathrooms
 ,record_content:"payload"."before"."bedrooms"::VARIANT as bedrooms
 ,record_content:"payload"."before"."census_tract_id"::STRING as census_tract_id
 ,record_content:"payload"."before"."city"::STRING as city
 ,record_content:"payload"."before"."close_date"::STRING::DATE as close_date
 ,record_content:"payload"."before"."close_price"::VARIANT as close_price
 ,record_content:"payload"."before"."country"::STRING as country
 ,record_content:"payload"."before"."created_on_utc"::STRING::TIMESTAMP_NTZ as created_on_utc
 ,record_content:"payload"."before"."latitude"::STRING as latitude
 ,record_content:"payload"."before"."longitude"::STRING as longitude
 ,record_content:"payload"."before"."laundry_features"::STRING as laundry_features--Get additional fields, about timestamp when debezium captured data, when postgres applied that transaction
 ,REGEXP_REPLACE(record_content:"payload"."op", '') as dml_operator, TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."ts_ms", '')) as debezium_processed_ts
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."source"."ts_usec", '')) as source_processed_ts
 , REGEXP_REPLACE(record_content:"payload"."source"."name", '') as source_server
 , REGEXP_REPLACE(record_content:"payload"."source"."db", '') as source_db
 , REGEXP_REPLACE(record_content:"payload"."source"."table", '') as source_table
 , REGEXP_REPLACE(record_content:"payload"."source"."schema", '') as source_schemaFROM <Database>.<Schema>.DBZ_SUPPLY
 WHERE lower(DML_OPERATOR) in ('d');

You can automate the creation of these views using the Information Schema tables in Snowflake. To create stored procedures to automatically create these views for all tables involved in the data pipeline, please refer to the product documentation.

E. Merge the data into the target table

Using the two views, DBZ_SUPPLY_I_U_VIEW and DBZ_SUPPLY_D_VIEW, as the source, you can merge data to the final target table, SUPPLY, using the SQL merge command.

To automate this using Snowflake Tasks:

 CREATE TASK SUPPLY_MERGE
 WAREHOUSE <WAREHOUSE_NAME>
 SCHEDULE 5 MINUTE
 ASmerge into <Database>.<Tgt_Schema>.DBZ_SUPPLY as tgt
 using <Database>.<Src_Schema>.DBZ_SUPPLY_I_U_VIEW as src
 on tgt.id =src.id --Deletes
 when matched AND src.dml_operator='d' THEN DELETE --Updates
 when matched AND src.dml_operator='u' then
 update set tgt.ACTUAL_PHOTOS_COUNT =src.ACTUAL_PHOTOS_COUNT
 ,tgt.ADDRESS1 =src.ADDRESS1
 ,tgt.BATHROOMS =src.BATHROOMS
 ,tgt.BEDROOMS =src.BEDROOMS
 ,tgt.CENSUS_TRACT_ID =src.CENSUS_TRACT_ID
 ,tgt.CITY =src.CITY
 ,tgt.CLOSE_DATE =src.CLOSE_DATE
 ,tgt.CLOSE_PRICE =src.CLOSE_PRICE
 ,tgt.COUNTRY =src.COUNTRY
 ,tgt.CREATED_ON_UTC =src.CREATED_ON_UTC
 ,tgt.LATITUDE =src.LATITUDE
 ,tgt.LONGITUDE =src.LONGITUDE
 ,tgt.LAUNDRY_FEATURES =src.LAUNDRY_FEATURES--Inserts
 when not matched and src.dml_operator in ('c','r') then
 insert (ID, ACTUAL_PHOTOS_COUNT ,ADDRESS1 ,BATHROOMS ,BEDROOMS ,
 CENSUS_TRACT_ID ,CITY ,CLOSE_DATE ,CLOSE_PRICE ,COUNTRY ,
 CREATED_ON_UTC ,LATITUDE , LONGITUDE,LAUNDRY_FEATURES )
 values (src.ID ,src.ACTUAL_PHOTOS_COUNT ,src.ADDRESS1 ,
 src.BATHROOMS ,src.BEDROOMS ,src.CENSUS_TRACT_ID ,src.CITY ,
 src.CLOSE_DATE ,src.CLOSE_PRICE ,src.COUNTRY ,src.CREATED_ON_UTC ,
 src.LATITUDE ,src.LAUNDRY_FEATURES ,src.LONGITUDE )

This task is configured to execute every five minutes.

You can monitor it using the task history:

 select *
 from table(information_schema.task_history())
 order by scheduled_time;

The NRT data pipeline is complete!

F. Things to keep in mind3

When attempting to set up an NRT to respond to your own use case, here are a few caveats:

– All sources tables must contain the primary key to propagate DML changes. If you create tables without a primary key, be sure to request the source database administrator or application team to set one up for you using the data elements in that table.

If you are still unable to include a primary key, write a separate data pipeline to perform a full data load for your tables.

– PostgreSQL wal2Json database logs don’t track DDL changes (for example, new column additions and deletions).

However, the payload data available in JSON will contain values for recently added columns. To identify DDL changes within a given timeframe, you will need to code a separate process to use database metadata tables and/or query history to scan and capture DDL changes.

These events must be pre-processed before merging the usual DML changes on to the Snowflake data warehouse.

G. Success!

This CDC-based solution reduced the waiting time for new listings posted from a daily batch window to under 30 minutes, after which the analytics engine ranked the listings and pushed them to the queue using the investors’ criteria.

Underwriters could review the listings, estimate values, and successfully meet their target of completing 1,000 single-family home acquisitions for a large investor in a very short time.

Setting up the NRT data pipeline involves configuring multiple systems to talk to each other. If set up correctly, these components will work well together to handle this and many other use cases.

Gathering and compiling data from multiple sources and making them usable in a short time is often the greatest challenge to be overcome to get value from business analytics. Write to info@tigeranalytics.com so that we can help.

The post Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake appeared first on Tiger Analytics.

]]>
Building Data Engineering Solutions: A Step-by-Step Guide with AWS https://www.tigeranalytics.com/perspectives/blog/data-engineering-implementation-using-aws/ Thu, 14 Feb 2019 18:10:43 +0000 https://www.tigeranalytics.com/?p=7076 In this article, delve into the intricacies of an AWS-based Analytics pipeline. Learn to apply this design thinking to tackle similar challenges you might encounter and in order to streamline data workflows.

The post Building Data Engineering Solutions: A Step-by-Step Guide with AWS appeared first on Tiger Analytics.

]]>
Introduction:

Lots of small to midsize companies use Analytics to understand business activity, lower their costs and increase their reach. Some of these companies may intend to build and maintain an Analytics pipeline but change their mind when they see how much money and tech know-how it takes. For any enterprise, data is an asset. And they are unwilling to share this asset with external players: they might end up risking their market advantage. To extract maximum value from intelligence harvesting, enterprises need to build and maintain their own data warehouses and surrounding infrastructure.

The Analytics field is buzzing with talks on applications related to Machine Learning, which have complex requirements like storing and processing unstructured streaming data. Instead of pushing themselves towards advanced analytics, companies can extract a lot of value simply by using good reporting infrastructure. This is because currently a lot of SME activity is still at the batch data level. From an infrastructure POV, cloud players like Amazon Web Services (AWS) and Microsoft Azure have taken away a lot of complexity. This has enabled companies to implement an accurate, robust reporting infrastructure (more or less) independently and economically. This article is about a specific lightweight implementation of Data Engineering using AWS, which would be perfect for an SME. By the time you finish reading this, you will:

1) Understand the basics of a simple Data Engineering pipeline
2) Know the details of a specific kind of AWS-based Analytics pipeline
3) Apply this design thinking to a similar problem you may come across

Analytics Data Pipeline:

SMEs have their business activity data stored in different places. Getting it all together so that a broad picture of the business’s health emerges is one of the big challenges in analytics. Gathering data from sources, storing it in a structured and accurate manner, then using that data to create reports and visualizations can give SMEs relatively large gains. From a process standpoint, this is what it might look like:

Figure 1: Simple Data Pipeline

But from a business activity effort standpoint, it’s more like:

Figure 2: Business Activity involved in a Data Pipeline

Here’s what’s interesting: although the first two components of the process consume most time and effort, when you look at it from a value chain standpoint, value is realized in the Analyze component.

Figure 3: Analytics Value Chain

The curiously inverse relationship between effort and value keeps SMEs wondering if they will realize the returns they expect on their investment and minimize costs. Analytics today might seem to be all about Machine Learning and cutting-edge technology, but SMEs can realize a lot of value by using relatively simple analytics like:

1) Time series graph on business activity for leadership
2) Bar graph visualization for sales growth over the years
3) For the Sales team: a refreshed, filterable dashboard showing the top ten clients over a chosen time period
4) For the Operations team: an email blast every morning at eight depicting business activity expense over a chosen time period

Many strategic challenges that SMEs face, like business reorganization, controlling operating costs, crisis management, require accurate data to solve. Having an Analytics data pipeline in the cloud allows enterprises to take cost-optimized, data-driven decisions. These can include both strategic decision-making for C-Suite and business-as-usual metrics for the Operations and Sales teams, allowing executives to track their progress. In a nutshell, an Analytics data pipeline makes company information accessible to executives. This is valuable in itself because it enables metrics monitoring (including the derived benefits like forecasting predictions). There you have it, folks: a convincing case for SMEs to experiment with building an in-house Analytics pipeline.

Mechanics of the pipeline:

Before we get into vendors and the value they bring, here’s something for you to think about: there are as many ways to build an Analytics pipeline as there are stars in the sky. The challenge here is to create a data pipeline that is hosted on a secure cloud infrastructure. It’s important to use cloud-native compute and storage components so that the infrastructure is easy to build and operate for an SME.

Usually, source data for SMEs are in the following formats:

1) Payment information stored in Excel
2) Business activity information coming in as API
3) Third-party interaction exported as a .CSV to a location like S3

Using AWS as a platform enables SMEs to leverage the serverless compute feature of AWS Lambda when ingesting the source data into an Aurora Postgres RDBMS. Lambda allows many programming interfaces including Python, a widely used language. Back in 2016-17, the total runtime for Lambda was at five minutes, which was not nearly enough for ETL. Two years later, the limit was increased to 15 minutes. This is still too little time to execute most ETL jobs, but enough for the batch data ingestion requirements of SMEs.

Lambda is usually hosted within a private subnet in the enterprise Virtual Private Cloud (VPC), but it can communicate with third-party source systems through a Network Address Translator (NAT) and Internet Gateways (IG). Python’s libraries (like Pandas) make tabular data quick and easy to process. Once processed, the output dataframe from Lambda is stored onto a table in the Aurora Postgres Database. Aurora prefix is for the AWS flavor of the Postgres Database offering. It makes sense to choose a vanilla relational database because most data is in Excel-type rows and columns format anyway, and reporting engines like Tableau and other BI tools work well with RDBMS engines.

Mapping the components to the process outlined in Figure 1, we get:

Figure 4: Revisiting Analytics pipeline

AWS Architecture:

Let’s take a deeper look into AWS architecture.

Figure 5: AWS-based batch data processing architecture using Serverless Lambda function and RDS database

Figure 5 adds more details to the AWS aspects of a Data Engineering pipeline. Operating on AWS requires companies to share security responsibilities such as:

1) Hosting AWS components with a VPC
2) Identifying public and private subnets
3) Ensuring IG and NAT Gateways can allow components hosted within private subnets to communicate with the internet
4) Provisioning the Database as publicly not accessible
5) Setting aside a dedicated EC2 to route web traffic to this publicly inaccessible database
6) Provisioning security groups for EC2’s public subnet (Lambda in private subnet and Database in DB subnet)
7) Provisioning subnets for app and DB tier in two different Availability Zones (AZ) to ensure (a) DB tier provisioning requirements are met, and (b) Lambda doesn’t run out of IPs when triggered

Running the pipeline:

New data is ingested by timed invocation of Lambda using CloudWatch rules. CloudWatch monitors AWS resources and invokes services at set times using Chron expression. CloudWatch can also be effectively used as a SQL Server Job agent to trigger Lambda events. This accommodates activities with different frequencies like:

1) Refreshing sales activity (daily)
2) Operating Costs information (weekly)
3) Payment activity (biweekly)
4) Tax information (monthly)

CloudWatch can deploy a specific Python script (that takes data from the source, does necessary transformations, and loads it onto a table with known structure) to Lambda once the respective source file or data refresh frequency is known.

Moving on to Postgres, its unique Materialized View and SQL Stored procedure feature (that allows further processing) can also be invoked using a combination of Lambda and CloudWatch. This workflow is helpful to propagate base data after refresh into denormalized, wide tables which can store company-wide sales and operations information.

Figure 6: An example of data flow for building aggregate metrics

Once respective views are refreshed with the latest data, we can connect to the Database using a BI tool for reporting and analysis. It’s important to remember that because we are operating on the AWS ecosystem, the Database must be provisioned as publicly inaccessible and be hosted within a private subnet. Users should only be able to reach it through a web proxy, like nginx or httpd, that is set up on an EC2 on the public subnet to route traffic within the VPC.

Figure 7: BI Connection flow to DB

Access to data can be controlled at the Database level (by granting or denying access to a specific schema) and at the connection level (by whitelisting specific IPs to allow connections and denying connect access by default).

Accuracy is the name of the game:

So you have a really secure and robust AWS architecture, a well-tested Python code for Lambda executions, and a not-so-cheap BI tool subscription. Are you all set? Not really. You might just miss the bus if inaccuracy creeps into the tables during data refresh. A dashboard is only as good as the accuracy of the numbers it displays. Take extra care to ensure that the schema tables you have designed include metadata columns required to identify inaccurate and duplicate data.

Conclusion:

In this article, we took a narrow-angle approach to a specific Data Engineering example. We saw the Effort vs Return spectrum in the Analytics value chain and the value that can be harvested by taking advantage of the available Cloud options. We noted the value in empowering C-suite leaders and company executives with descriptive interactive dashboards.

We looked at building a specific AWS cloud-based Data Engineering pipeline that is relatively uncomplicated and can be implemented by SMEs. We went over the architecture and its different components and briefly touched on the elements of running a pipeline and finally, on the importance of accuracy in reporting and analysis.

Although we saw one specific implementation in this article, the attempt here is to convey the idea that getting value out of an in-house Analytics pipeline is easier than what it used to be say a decade ago. With open source and cloud tools here to make the journey easy, it doesn’t take long to explore and exploit the value hidden in data.

[References:
Disruptive Analytics, Apress, 2016]

 

The post Building Data Engineering Solutions: A Step-by-Step Guide with AWS appeared first on Tiger Analytics.

]]>