Snowflake Archives - Tiger Analytics Fri, 24 Jan 2025 13:29:13 +0000 en-US hourly 1 https://wordpress.org/?v=6.8.1 https://www.tigeranalytics.com/wp-content/uploads/2023/09/favicon-Tiger-Analytics_-150x150.png Snowflake Archives - Tiger Analytics 32 32 Navigating the Digital Seas: How Snowflake’s External Access Integration Streamlines Maritime Data Management https://www.tigeranalytics.com/perspectives/blog/navigating-the-digital-seas-how-snowflakes-external-access-integration-streamlines-maritime-data-management/ Fri, 24 Jan 2025 13:10:32 +0000 https://www.tigeranalytics.com/?post_type=blog&p=24209 The maritime industry is increasingly adopting digital transformation to manage vast amounts of data from ships, sensors, weather, and third-party APIs. Snowflake’s External Access Integration simplifies this process by allowing seamless integration of real-time data without duplication. Read on to know how this feature works in practice and how it supports better, data-driven outcomes in the maritime sector.

The post Navigating the Digital Seas: How Snowflake’s External Access Integration Streamlines Maritime Data Management appeared first on Tiger Analytics.

]]>
As the maritime industry navigates tremendous volumes of data, the call for accelerated digitalization is stronger than ever. The maritime sector is a vast and intricate ecosystem where data flows continuously across interconnected sectors—from vessel management and maintenance to fuel optimization and emissions control. As the United Nations Conference on Trade and Development highlighted in its 2024 report, digital transformation through technologies like blockchain, artificial intelligence, and automation is crucial for improving port operations. Ports that have embraced these innovations report reduced waiting times, enhanced cargo tracking, and greater efficiency in transshipment processes.

In this data-intensive environment, operational data from ship-installed software is just the beginning. Third-party sources such as AIS data, weather information, and other cloud applications play a vital role in many maritime use cases. Traditionally, integrating this diverse data—often accessed via REST APIs—required external platforms like AWS Lambda or Databricks.

With Snowflake’s introduction of the External Access Integration feature, maritime organizations can now consolidate API data integration and data engineering workflows within a single, powerful platform. This breakthrough not only simplifies operations but also improves flexibility and efficiency.

Let’s discuss a use case

Suppose we need to retrieve crew rest and work hours data from a third-party regulatory service to generate near real-time, period-specific compliance reports for all vessels managed by a ship manager. These details are made available to the business through REST APIs.

Landscape Dissection and Data Enablement

Let’s assume Snowflake is the chosen cloud data warehouse platform, with Azure serving as the primary solution for data lake requirements. Operational data for vessels from various legacy systems and other sources is integrated into Snowflake. Data pipelines and models are then built on this integrated data to meet business needs. The operational data is ingested into Snowflake through a combination of Snowflake’s native data loading options and the replication tool Fivetran.

Challenges Explained

Outbound REST API calls must be made to retrieve crew rest and work hours data. The semi-structured data from the API response will need to undergo several transformations before it can be integrated with the existing vessel operational data in Snowflake. Additionally, the solution must support the near real-time requirements of the compliance report. The new pipeline should seamlessly align with the current data pipelines for ingestion and transformation, ensuring no disruptions to existing processes.

We now explore Snowflake’s external access integration to address these challenges.

What is Snowflake’s External Access Integration?

Snowflake’s External Access Integration empowers businesses to integrate the data seamlessly from diverse external sources and networks, helping them bridge data gaps and providing a holistic view for better decisions. The feature gives users the flexibility to read external data and integrate only which is necessary for the use case while the majority of the data resides at the source. Key benefits of this feature include:

  • Enabling real time access to complex third-party data providers
  • Eliminating data duplication
  • Enriching data with selective data integration that benefits your use case
  • Enhanced data-driven decision making

Leveraging Snowflake’s External Access Integration: A Step-by-Step Guide

Here is a complete walkthrough of the procedures to solve our use case:

Step 1: Creating Network Rule

  • Snowflake enables its accounts to selectively and securely access databases or services via its network rules. This enhances the security by limiting the list of IPs that can connect to Snowflake.
  • CREATE NETWORK RULE command helps us to add the list of APIs that Snowflake account should connect to.
CREATE [OR REPLACE] NETWORK RULE <nw_rule_name>
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = (<api_url_link>)

Step 2: Creating Secret

  • Securely save the credentials to be used while authenticating to APIs via secrets in Snowflake.
  • CREATE SECRET command is used to represent the credentials such as username and password, which are used to authenticate the API we have added to the network rule in step 1.
Basic Authentication
CREATE [ OR REPLACE ] SECRET <secret_name>
TYPE = PASSWORD
USERNAME = '<username>'
PASSWORD = '<password>'

Step 3: Creating External Access Integration

  • Specify the network rule and secrets used to connect to the APIs via external access integration.
  • CREATE EXTERNAL ACCESS INTEGRATION command aggregates the allowed network rule and secrets to securely use in UDFs or procedures.
CREATE [ OR REPLACE ] EXTERNAL ACCESS INTEGRATION <ext_integration_name>
ALLOWED NETWORK RULES = <nw_rule_name>
ENABLED = TRUE

Step 4: External Call

External Call

There are multiple methods to call external APIs – UDFs or procedures or direct calls from Snowflake Notebooks (Preview Feature as of now). Let’s explore Snowflake Notebooks to make external calls via Python. Snowflake Notebooks offer an interactive environment to code your logics in SQL or Python.

  • To make API calls from a particular notebook, enable the created external access integration in step 3 in your notebook. This can be done from the ‘Notebook settings’ options available for the Snowflake notebooks.
  • After importing required libraries, call the required APIs and save the response object.
  • Leverage Snowflake Snowpark framework to operate on the data frames and save your results to Snowflake tables.
  • Use Snowflake’s native functions to flatten and format the semi structured data that is mostly received as a response from the API calls.
  • The transformed data via API can be further combined with the operational or modeled data in Snowflake.

Configuration: Creating a network rule and external access integration.

create OR replace network RULE NW_RULE_PUBLIC_API
mode = egress
type = host_port
value_list = ('geocoding-api.open-meteo.com')

create or replace external access integration EAI_PUBLIC_API
allowed_network_rules = (NW_RULE_PUBLIC_API)
enabled = true

Get API Request: Get requests for a public marine REST API

import requests
def get_data_from_marine_api():
    url = f'https://geocoding-api.open-meteo.com/v1/search?name=Singapore&count=10&language=en&format=json'
    headers = {"content-type": "application/json"}
    response = requests.get(url,headers = headers)
    return response
response = get_data_from_marine_api()
data = response.json()
data_frame = pd.json_normalize(data)

Using Snowpark: To save the RAW response to the Landing Zone table.

from snowflake.snowpark.context import get_active_session
session = get_active_session()
df1 = session.create_dataframe(data_frame) 
df1.write.mode ("overwrite").save_as_table("RAW_GEO_LOCATIONS")    

Using Snowpark: To flatten the JSON for further transformations and combine with operational data for further business rules and logics. This notebook can be orchestrated in Snowflake to synchronize with the existing data pipelines.

import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col
from snowflake.snowpark.functions import *

session = get_active_session()
flatten_function = table_function("flatten")

geo_locations_raw = session.table("RAW_GEO_LOCATIONS")
geo_locations_tr = geo_locations_raw.join_table_function(flatten_function(geo_locations_raw["RESULTS"])).drop(["SEQ","PATH","RESULTS" "THIS","GENERATIONTIME_MS"])
geo_locations_trf = geo_locations_tr.select(col("index").alias("index"),col("VALUE")["country"].alias("country"),col("VALUE")["country_code"].alias("country_code"),col("VALUE")["longitude"].alias("long"),col("VALUE")["latitude"].alias("lat"),col("VALUE")["name"].alias("name"),col("VALUE")["population"].alias("population"),col("VALUE")["timezone"].alias("timezone"),col("VALUE")["elevation"].alias("elevation"))

geo_locations_trf.write.mode("overwrite").save_as_table("TR_GEO_LOCATIONS")    

The Snowflake External Access Integration advantage

  • Native feature of Snowflake which eliminates the need for moving data from one environment to another.
  • Can be integrated into the existing data pipelines in Snowflake promptly and hence, allows for easy maintenance.
  • Can use Snowflake’s Snowpark features and native functions for any data transformations.
  • Snowflake’s unified compute environment decreases the cost and enhances the efficiency of data pipelines by reducing the latency.
  • Users can not only call the REST APIs via Snowflake external access integration but also web services that are defined by SOAP protocols.

Below is sample code for calling SOAP-based services:

import requests
def get_data_from_web_service():
    url = f'https://www.w3schools.com/xml/tempconvert.asmx'
    headers = {"content-type": "application/soap+xml"}
    xml ="""
<soap12:Envelope xmlns:xsi="https://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="https://www.w3.org/2001/XMLSchema" xmlns:soap12="https://www.w3.org/2003/05/soap-envelope">
    <soap12:Body>
    <CelsiusToFahrenheit xmlns="https://www.w3schools.com/xml/">
        <Celsius>20</Celsius>
    </CelsiusToFahrenheit>
    </soap12:Body>
</soap12:Envelope>"""
    response = requests.post(url,headers = headers,data=xml)
    return response

response = get_data_from_web_service()
print(response.content)

Summary

The maritime industry, like many others, is embracing digital transformation, driven by the increasing volume and variety of data from complex systems, sensors, agencies, and regulatory bodies. This shift opens new opportunities for leveraging data from diverse sources to drive advanced analytics and machine learning. Snowflake provides a robust platform to support these efforts, offering efficient integration capabilities and external access features that make it easy to handle data from REST APIs. Its flexibility and scalability make Snowflake a valuable tool in helping the maritime industry harness the power of data for improved decision-making and operational efficiency.

The post Navigating the Digital Seas: How Snowflake’s External Access Integration Streamlines Maritime Data Management appeared first on Tiger Analytics.

]]>
Building Trusted Data: A Comprehensive Guide to Tiger Analytics’ Snowflake Native Data Quality Framework https://www.tigeranalytics.com/perspectives/blog/building-trusted-data-a-comprehensive-guide-to-tiger-analytics-snowflake-native-data-quality-framework/ Fri, 24 Jan 2025 13:06:13 +0000 https://www.tigeranalytics.com/?post_type=blog&p=24215 Challenges in data quality are increasingly hindering organizations, with issues like poor integration, operational inefficiencies, and lost revenue opportunities. A 2024 report reveals that 67% of professionals don’t fully trust their data for decision-making. To tackle these problems, Tiger Analytics developed a Snowflake native Data Quality Framework, combining Snowpark, Great Expectations, and Streamlit. Explore how the framework ensures scalable, high-quality data for informed decision-making.

The post Building Trusted Data: A Comprehensive Guide to Tiger Analytics’ Snowflake Native Data Quality Framework appeared first on Tiger Analytics.

]]>
A 2024 report on data integrity trends and insights found that 50% of the 550 leading data and analytics professionals surveyed believed data quality is the number one issue impacting their organization’s data integration projects. And that’s not all. Poor data quality was also negatively affecting other initiatives meant to improve data integrity with 67% saying they don’t completely trust the data used for decision-making. As expected, data quality is projected to be a top priority investment for 2025.

Trusted, high-quality data is essential to make informed decisions, deliver exceptional customer experiences, and stay competitive. However, maintaining quality is not quite so simple, especially as data volume grows. Data arrives from diverse sources, is processed through multiple systems, and serves a wide range of stakeholders, increasing the risk of errors and inconsistencies. Poor data quality can lead to significant challenges, including:

  • Operational Inefficiencies: Incorrect or incomplete data can disrupt workflows and increase costs.
  • Lost Revenue Opportunities: Decisions based on inaccurate data can result in missed business opportunities.
  • Compliance Risks: Regulatory requirements demand accurate and reliable data; failure to comply can result in penalties.
  • Eroded Trust: Poor data quality undermines confidence in data-driven insights, impacting decision-making and stakeholder trust.

Manual approaches to data quality are no longer sustainable in modern data environments. Organizations need a solution that operates at scale without compromising performance, integrates seamlessly into existing workflows and platforms, and provides actionable insights for continuous improvement.

This is where Tiger Analytics’ Snowflake Native Data Quality Framework comes into play, leveraging Snowflake’s unique capabilities to address these challenges effectively.

Tiger Analytics’ Snowflake Native Data Quality Framework – An Automated and Scalable Solution

At Tiger Analytics, we created a custom solution leveraging Snowpark, Great Expectations (GE), Snowflake Data Metric Functions, and Streamlit to redefine data quality processes. By designing this framework as Snowflake-native, we capitalize on the platform’s capabilities for seamless integration, scalability, and performance.

Snowflake-Native-Data-Quality-Framework

Snowflake’s native features offer significant advantages when building a Data Quality (DQ) framework, addressing the evolving needs of data management and governance. These built-in tools streamline processes, ensuring efficient monitoring, validation, and enhancement of data quality throughout the entire data lifecycle:

  • Efficient Processing with Snowpark:
    Snowpark lets users run complex validations and transformations directly within Snowflake. Its ability to execute Python, Java, or Scala workloads ensures that data remains in place, eliminating unnecessary movement and boosting performance.
  • Flexible and Predefined DQ Checks:
    The inclusion of Great Expectations and Snowflake Data Metric Functions enables a hybrid approach, combining open-source flexibility with Snowflake-native precision. This ensures that our framework can cater to both standard and custom business requirements.
  • Streamlined Front-End with Streamlit:
    Streamlit provides an interactive interface for configuring rules, schedules, and monitoring results, making it accessible to users of all skill levels.
  • Cost and Latency Benefits:
    By eliminating the need for external tools, containers, or additional compute resources, our framework minimizes latency and reduces costs. Every process is optimized to leverage Snowflake’s compute clusters for maximum efficiency.
  • Integration and Automation:
    Snowflake’s task scheduling, streams, and pipelines ensure seamless integration into existing workflows. This makes monitoring and rule execution effortless and highly automated.

Tiger Analytics’ Snowflake Native Data Quality Framework leverages Snowflake’s ecosystem to provide a scalable and reliable data quality solution that can adapt to the changing needs of modern businesses.

Breaking Down the Tiger Analytics’ Snowflake Native Data Quality Framework

  1. Streamlit App: A Unified Interface for Data Quality

    Serves as a centralized front-end, integrating multiple components of the data quality framework. It allows users to configure rules and provides access to the profiler, recommendation engine, scheduling, and monitoring functionalities – all within one cohesive interface.

    This unified approach simplifies the management and execution of data quality processes, ensuring seamless operation and improved user experience

  2. Data Profiler

    Data profiler automatically inspects and analyzes datasets to identify anomalies, missing values, duplicates, and other data quality issues directly within Snowflake. It helps generate insights into the structure and health of the data, without requiring external tools.

    It also provides metrics on data distribution, uniqueness, and other characteristics to help identify potential data quality problems

  3. DQ Rules Recommendation Engine

    The DQ Rules Recommendation Engine analyzes data patterns and profiles to suggest potential data quality rules based on profiling results, metadata, or historical data behavior. These recommendations can be automatically generated and adjusted for more accurate rule creation.

  4. DQ Engine

    The DQ Engine is the core of Tiger Analytics’ Snowflake Native Data Quality Framework. Built using Snowpark, Great Expectations, and Snowflake Data Metric Functions, it ensures efficient and scalable data quality checks directly within the Snowflake ecosystem. Key functionalities include:

    • Automated Expectation Suites:
      The engine automatically generates Great Expectations expectation suites based on the configured rules, minimizing manual effort in setting up data quality checks.
    • Snowpark Compute Execution:
      These expectation suites are executed using Snowpark’s compute capabilities, ensuring performance and scalability for even the largest datasets.
    • Results Storage and Accessibility:
      All validation results are stored in Snowflake tables, making them readily available for monitoring, dashboards, and further processing.
    • On-Demand Metric Execution:
      In addition to GE rules, the engine can execute Snowflake Data Metric Functions on demand, providing flexibility for ad hoc or predefined data quality assessments. This combination of automation, scalability, and seamless integration ensures that the DQ Engine is adaptable to diverse data quality needs.
  5. Scheduling Engine

    The Scheduling Engine automates the execution of DQ rules at specified intervals, such as on-demand, daily, or in sync with other data pipelines. By leveraging Snowflake tasks & streams, it ensures real-time or scheduled rule execution within the Snowflake ecosystem, enabling continuous data quality monitoring.

  6. Alerts and Notifications

    The framework integrates with Slack and Outlook to send real-time alerts and notifications about DQ issues. When a threshold is breached or an issue is detected, stakeholders are notified immediately, enabling swift resolution.

  7. NLP-Based DQ Insights

    Leveraging Snowflake Cortex, the NLP-powered app enables users to query DQ results using natural language, providing non-technical users with straightforward access to valuable data quality insights. Users can ask questions such as below and receive clear, actionable insights directly from the data.

    • What are the current data quality issues?
    • Which rules are failing the most?
    • How has data quality improved over time?
  8. DQ Dashboards

    These dashboards offer a comprehensive view of DQ metrics, trends, and rule performance. Users can track data quality across datasets and monitor improvements over time. It also provides interactive visualizations to track data health. Drill-down capabilities provide in-depth insight into specific issues, allowing for more detailed analysis and understanding.

  9. Data Pipeline Integration

    The framework can be integrated with existing data pipelines, ensuring that DQ checks are part of the ETL/ELT process. These checks are automatically triggered as part of the data pipeline workflow, verifying data quality before downstream usage.

How the Framework Adds Value

As organizations rely more on data to guide strategies, ensuring the accuracy, consistency, and integrity of that data becomes a top priority. Tiger Analytics’ Snowflake Native Data Quality Framework addresses this need by providing a comprehensive, end-to-end solution that integrates seamlessly into your existing Snowflake environment. With customizable features and actionable insights, it empowers teams to act quickly and efficiently. Here are the key benefits explained:

  • End-to-End Solution: Everything from profiling to monitoring is integrated in one place.
  • Customizable: Flexibly configure rules, thresholds, and schedules to meet your specific business requirements.
  • Real-Time DQ Enforcement: Maintain data quality throughout the entire data lifecycle with real-time checks.
  • Seamless Integration: Fully native to Snowflake, integrates easily with existing data pipelines and workflows.
  • Actionable Insights: Provide clear, actionable insights to help users take corrective actions quickly.
  • Scalability: Leverages Snowflake’s compute power, allowing for easy scaling as data volume grows.
  • Minimal Latency: Ensures efficient processing and reduced delays by executing DQ checks directly within Snowflake.
  • User-Friendly: Intuitive interface for both technical and non-technical users, enabling broad organizational adoption.
  • Proactive Monitoring: Identify data quality issues before they affect downstream processes.
  • Cost-Efficiency: Reduces the need for external tools, minimizing costs and eliminating data movement overhead.

Next Steps

While the framework offers a wide range of features to address data quality needs, we are continuously looking for opportunities to enhance its functionality. We at Tiger Analytics are exploring additional improvements that will further streamline processes, and increase flexibility. Some of the enhancements we are currently working on include:

  • AI-Driven Recommendations: Use machine learning to improve and refine DQ rule suggestions.
  • Anomaly Detection: Leverage AI to detect unusual patterns and data quality issues that may not be captured by traditional rules.
  • Advanced Visualizations: Enhance dashboards with predictive analytics and deeper trend insights.
  • Expanded Integration: Explore broader support for hybrid cloud and multi-database environments.

A streamlined data quality framework redefines how organizations ensure and monitor data quality. By leveraging Snowflake’s capabilities and tools like SnowPark, our Snowflake Native Data Quality Framework simplifies complex processes and delivers measurable value.

The post Building Trusted Data: A Comprehensive Guide to Tiger Analytics’ Snowflake Native Data Quality Framework appeared first on Tiger Analytics.

]]>
Building Dynamic Data Pipelines with Snowpark: Our Framework to Drive Modern Data Transformation https://www.tigeranalytics.com/perspectives/blog/building-dynamic-data-pipelines-with-snowpark-our-framework-to-drive-modern-data-transformation/ Mon, 21 Oct 2024 14:30:41 +0000 https://www.tigeranalytics.com/?post_type=blog&p=23878 Learn about the challenges of traditional data transformation methods and how a dynamic approach using metadata configuration can help address these issues. By defining transformation rules and specifications, enterprises can create flexible pipelines that adapt to their evolving data processing needs, ultimately accelerating the process of extracting insights from data.

The post Building Dynamic Data Pipelines with Snowpark: Our Framework to Drive Modern Data Transformation appeared first on Tiger Analytics.

]]>
Why is Data Transformation important?

E owns a retail business and makes their daily sales through their online and brick-and-mortar stores. Every day, they receive a huge wave of data in their systems—products viewed, orders placed, returns received, new customers, repeat customers, etc. Data ingestion pipelines help them ingest data from disparate sources. The ingested raw data pool is large and can generate valuable insights, but how do they harmonize it and turn it into actionable insights that boost the business?

That’s where Data transformation comes in.

Customers may have diverse data formats and reporting requirements, and that’s why data transformation forms a crucial aspect of any analytics product. If we examine traditional methods of manually building data transformation pipelines, we can see that these can be time-consuming and often delay the life cycle of deriving insights from data. One solution to this challenge is implementing dynamic transformation using metadata configuration. A flexible pipeline capable of adapting to data processing needs can be created by defining transformation rules and specifications.

Modernizing Data Platforms with Snowflake: Our Snowpark & Streamlit Solution

At Tiger Analytics, we created a custom solution with Snowpark and Streamlit to build dynamic data transformation pipelines based on a metadata-driven framework to accelerate data platform modernization into Snowflake. Once data is available in the raw layer, the current framework can be leveraged to build a data pipeline to transform and harmonize data in subsequent layers. The high-level architecture of this Snowpark framework is depicted below in Figure 1.


Figure 1: High-level architecture

Tiger’s Snowpark Data Transformation Framework Provides Five Key Functionalities:

  • Metadata Setup
  • Once the data ingestion is complete, this Snowpark-based framework is leveraged by users to feed in Source to Target Mapping (STTM) and additional driver (technical) metadata (Incremental/Full Load, Schedule timing, SCD types, etc.), which drives the entire process. The input details are parsed, validated with Snowflake raw layer, and merged into metadata tables.

  • Dynamic Transformation Pipeline Generation
  • After the metadata setup is complete, the next Snowpark code reads the STTM metadata table and creates data transformation SQL dynamically. Based on additional driver metadata information, the Snowpark code implements change data capture (CDC) based on watermark/audit columns on top of raw layer ingested data sets, Truncate and Load, Dynamic merge/insert SQL for SCD types, etc. It is purely driven by user-provided metadata input, and any changes will be refactored in the pipeline during its generation. For this Snowpark framework, it is just a matter of a few clicks to accommodate any changes like

    • New data source, logic to be included
    • Logic changes to existing pipeline
  • Schedule and Execution
  • Snowflake Task is used to implement orchestration for data transformation pipelines. It is created dynamically using the Snowpark framework. Users can provide preferred cadence information as input, which can be referenced to create tasks dynamically with Cron schedule. Once the Tasks are created and resumed, they will automatically trigger in the scheduled cadence. As this is a metadata-driven framework, the good part is that any changes will be accommodated in the pipeline quickly.

  • Alert & Notification
  • A Snowpark-based email notification framework helps trigger emails in Outlook and Slack notifications in case of any failures observed during data transformation pipeline execution.

  • Detailed Logging
  • This Snowpark framework captures detailed logging of each step for ease of debugging. It also provides information about the pipeline execution status, which can be a key metric for other downstream modules like Observability.

This Snowpark metadata driven framework can be leveraged in any data transformation program in Snowflake to set up dynamic transformation pipelines. It can help to accelerate the overall journey and will implement the business logic efficiently, with reduced code quality issues which often arise because of human errors.

Comparing our Snowpark Framework to Other Data Transformation Pipelines

What makes our Snowpark framework unique compared to other traditional data transformation pipelines?

  • Agility: Post finalization of the data model, this framework helps in faster implementation of transformation pipelines dynamically in Snowflake.
  • Flexibility: Allows users to make easy modifications in the logic to adapt to the specific needs of each customer without manually changing the underlying code.
  • Efficiency: Less efforts are needed to implement changes; when input metadata is changed, the pipeline gets updated.
  • Auto alerts: An email is triggered based on Snowflake’s native email integration feature to alert users of any pipeline execution failures.
  • Multi-feature support: Provides flexibility to enable SCD-type implementations, change data capture, and e2e detailed orchestration leveraging Snowflake Tasks.
  • Pipeline Monitoring: Proper logs are maintained at each level to trace back easily in case of any issues. Proven highly impactful, reducing the overall time for the support team to debug.
  • Scalability: Data volumes keep increasing. Dynamic transformation pipelines in cloud data warehouses like Snowflake can scale with the increasing demand.
  • Ease of Maintenance: Simple to maintain and update as metadata changes are made directly from the Streamlit app without altering the pipeline manually.

Final thoughts

At Tiger Analytics, we created a flexible metadata-driven solution to generate dynamic data transformation pipelines. With the help of the Snowpark framework, business teams can now rapidly convert their Source-to-Target Mapping (STTM) logic into executable pipelines, significantly reducing the time to market.

The ability to quickly adapt transformation rules and handle changes through a simple, metadata-driven process ensures that businesses can harmonize raw data from multiple silos without delays, delivering faster insights and value.

With its scalability, adaptability, and ease of maintenance, this framework allows organizations to manage complex transformations effectively. In short, dynamic data transformation ensures that the data is always optimized for decision-making, empowering businesses to stay agile and competitive.

The post Building Dynamic Data Pipelines with Snowpark: Our Framework to Drive Modern Data Transformation appeared first on Tiger Analytics.

]]>
How to Simplify Data Profiling and Management with Snowpark and Streamlit https://www.tigeranalytics.com/perspectives/blog/how-to-simplify-data-profiling-and-management-with-snowpark-and-streamlit/ Thu, 10 Oct 2024 12:41:48 +0000 https://www.tigeranalytics.com/?post_type=blog&p=23845 Learn why data quality is one of the most overlooked aspects of data management. While all models need good quality data to generate useful insights and patterns, data quality is especially important. In this blog, we explore how data profiling can help you understand your data quality. Discover how Tiger Analytics leverages Snowpark and Streamlit to simplify data profiling and management.

The post How to Simplify Data Profiling and Management with Snowpark and Streamlit appeared first on Tiger Analytics.

]]>
The accuracy of the data-to-insights journey is underpinned by one of the most foundational yet often overlooked aspects of data management – Data Quality. While all models need good quality data to generate useful insights and patterns, data quality is especially important across industries like retail, healthcare, and finance. Inconsistent, missing, or duplicate data can impact critical operations, from customer segmentation to and even affect regulatory compliance, resulting in potential financial or reputational losses.

Let’s look at an example:

A large retail company relies on customer data from various sources, such as online orders, in-store purchases, and loyalty program interactions. Over time, inconsistencies and errors in the customer database, such as duplicate records, incorrect addresses, and missing contact details, impacted the company’s ability to deliver personalized marketing campaigns, segment customers accurately, and forecast demand.

Data Profiling Matters – Third-party or Native app? Understanding the options

Data profiling helps the organization understand the nature of the data to build the data models, and ensures data quality and consistency, enabling faster decision-making and more accurate insights.

  • Improves Data Accuracy: Identifies inconsistencies, errors, and missing values.
  • Supports Better Decision-Making: Ensures reliable data for predictive analytics.
  • Enhances Efficiency: Helps detect and remove redundant data, optimizing resources and storage.

For clients using Snowflake for data management purposes, traditional data profiling tools often require moving data outside of Snowflake, creating complexity, higher costs, and security risks.

  • Data Transfer Overhead: External tools may require data to be moved out of Snowflake, increasing latency and security risks.
  • Scalability Limitations: Third-party tools may struggle with large Snowflake datasets.
  • Cost and Performance: Increased egress costs and underutilization of Snowflake’s native capabilities.
  • Integration Complexity: Complex setup and potential incompatibility with Snowflake’s governance and security features.

At Tiger Analytics, our clients faced a similar problem statement. To address these issues, we developed a Snowflake Native App utilizing Snowpark and Streamlit to perform advanced data profiling and analysis within the Snowflake ecosystem. This solution leverages Snowflake’s virtual warehouses for scalable, serverless computational power, enabling efficient profiling without external infrastructure.

How Snowpark Makes Data Profiling Simple and Effective

Snowpark efficiently manages large datasets by chunking data into smaller pieces, ensuring smooth profiling tasks. We execute YData Profiler and custom Python functions directly within Snowflake, storing results like outlier detection and statistical analysis for historical tracking.

We also created stored procedures and UDFs with Snowpark to automate daily or incremental profiling jobs. The app tracks newly ingested data, using Snowflake’s Task Scheduler to run operations automatically. Additionally, profiling outputs integrate seamlessly into data pipelines, with alerts triggered when anomalies are detected, ensuring continuous data quality monitoring.

By keeping operations within Snowflake, Snowpark reduces data transfer, lowering latency and enhancing performance. Its native integration ensures efficient, secure, and scalable data profiling.

Let’s look at the key features of the app, built leveraging Snowpark’s capabilities.

Building a Native Data Profiling App in Snowflake – Lessons learnt:

1. Comprehensive Data Profiling

At the core of the app’s profiling capabilities are the YData Profiler or custom-built profilers – Python libraries, integrated using Snowpark. These libraries allow users to profile data directly within Snowflake by leveraging its built-in compute resources.

Key features include:

  • Column Summary Statistics: The Quickly review important statistics for columns with all the datatypes like string, number, and date to understand the data at a glance.
  • Data Completeness Checks: Identify missing values and assess the completeness of your datasets to ensure no critical information is overlooked.
  • Data Consistency Checks: Detect anomalies or inconsistent data points to ensure that your data is uniform and accurate across the board.
  • Pattern Recognition and Value Distribution: Analyze data patterns and value distributions to identify trends or detect unusual values that might indicate data quality issues.
  • Overall Data Quality Checks: Review the health of your dataset by identifying potential outliers, duplicates, or incomplete data points.

2. Snowflake Compute Efficiency

The app runs entirely within Snowflake’s virtual warehouse environment. No external servers or machines are needed, as the app fully utilizes Snowflake’s built-in computing power. This reduces infrastructure complexity while ensuring top-tier performance, allowing users to profile and manage even large datasets efficiently.

3. Flexible Profiling Options

The app allows users to conduct profiling in two distinct ways—either by examining entire tables or by focusing on specific columns. This flexibility ensures that users can tailor the profiling process to their exact needs, from broad overviews to highly targeted analyses.

4. Full Data Management Capabilities

In addition to profiling, the app supports essential data management tasks. Users can insert, update, and delete records within Snowflake directly from the app, providing an all-in-one tool for both profiling and managing data.

5. Streamlit-Powered UI for Seamless Interaction

The app is built using Streamlit, which provides a clean, easy-to-use user interface. The UI allows users to interact with the app’s profiling and data management features without needing deep technical expertise. HTML-based reports generated by the app can be easily shared with stakeholders, offering clear and comprehensive data insights.

6. Ease in Generating and Sharing Profiling Reports

Once the data profiling is complete, the app generates a pre-signed URL that allows users to save and share the profiling reports. Here’s how it works:

  • Generating Pre-Signed URLs: The app creates a pre-signed URL to a file on a Snowflake stage using the stage name and relative file path. This URL provides access to the generated reports without requiring direct interaction with Snowflake’s internal storage.
  • Accessing Files: Users can access the files in the stage through several methods:
    • Navigate directly to the pre-signed URL in a web browser.
    • Retrieve the pre-signed URL within Snowsight by clicking on it in the results table.
    • Send the pre-signed URL in a request to the REST API for file support.
  • Handling External Stages: For files in external stages that reference Microsoft Azure cloud storage, the function requires Azure Active Directory authentication. This is because querying the function fails if the container is accessed using a shared access signature (SAS) token. The GET_PRESIGNED_URL function requires Azure Active Directory authentication to create a user delegation SAS token, utilizing a storage integration object that stores a generated service principal.

7. Different roles within an organization can utilize this app in various scenarios:

  • Data Analysts: Data analysts can use the app to profile datasets, identify inconsistencies, and understand data quality issues. They will analyze the patterns and relationships in the data and point out the necessary fixes to resolve any errors, such as missing values or outliers.
  • Data Stewards/Data Owners: After receiving insights from data analysts, data stewards or data owners can apply the suggested fixes to cleanse the data, ensuring it meets quality standards. They can make adjustments directly through the app by inserting, updating, or deleting records, ensuring the data is clean and accurate for downstream processes.

This collaborative approach between analysts and data stewards ensures that the data is high quality and reliable, supporting effective decision-making across the organization.

Final notes

Snowpark offers a novel approach to data profiling by bringing it into Snowflake’s native environment. This approach reduces complexity, enhances performance, and ensures security. Whether improving customer segmentation in retail, ensuring compliance in healthcare, or detecting fraud in finance, Snowflake Native Apps with Snowpark provides a timely solution for maintaining high data quality across industries.

For data engineers looking to address client pain points this translates to:

  • Seamless Deployment: Easily deployable across teams or accounts, streamlining collaboration.
  • Dynamic UI: The Streamlit-powered UI provides an interactive dashboard, allowing users to profile data without extensive technical knowledge.
  • Flexibility: Supports profiling of both Snowflake tables and external files (e.g., CSV, JSON) in external stages like S3 or Azure Blob.

With upcoming features like AI-driven insights, anomaly detection, and hierarchical data modeling, Snowpark provides a powerful and flexible platform for maintaining data quality across industries, helping businesses make smarter decisions and drive better outcomes.

The post How to Simplify Data Profiling and Management with Snowpark and Streamlit appeared first on Tiger Analytics.

]]>
Tiger’s Snowpark-Based Framework for Snowflake: Illuminating the Path to Efficient Data Ingestion https://www.tigeranalytics.com/perspectives/blog/tigers-snowpark-based-framework-for-snowflake-illuminating-the-path-to-efficient-data-ingestion/ Thu, 25 Apr 2024 07:05:45 +0000 https://www.tigeranalytics.com/?post_type=blog&p=21444 In the era of AI and machine learning, efficient data ingestion is crucial for organizations to harness the full potential of their data assets. Tiger's Snowpark-based framework addresses the limitations of Snowflake's native data ingestion methods, offering a highly customizable and metadata-driven approach that ensures data quality, observability, and seamless transformation.

The post Tiger’s Snowpark-Based Framework for Snowflake: Illuminating the Path to Efficient Data Ingestion appeared first on Tiger Analytics.

]]>
In the fast-paced world of E-commerce, inventory data is a goldmine of insights waiting to be unearthed. Imagine an online retailer with thousands of products, each with their own unique attributes, stock levels, and sales history. By efficiently ingesting and analyzing this inventory data, the retailer can optimize stock levels, predict demand, and make informed decisions to drive growth and profitability. As data volumes continue to grow and the complexity of data sources increases, the importance of efficient data ingestion becomes even more critical.

With advancements in artificial intelligence (AI) and machine learning (ML), the demand for real-time and accurate data ingestion has reached new heights. AI and ML models, require a constant feed of high-quality data to train, adapt, and deliver accurate insights and predictions. Consequently, organizations must prioritize robust data ingestion strategies to harness the full potential of their data assets and stay competitive in the AI-driven era.

Challenges with Existing Data Ingestion Mechanisms

While platforms like Snowflake offer powerful data warehousing capabilities, the native data ingestion methods provided by Snowflake, such as Snowpipe and the COPY command, often face limitations that hinder scalability, flexibility, and efficiency.

Limitations of the COPY Method

  • Data Transformation Overhead: Extensive transformation during the COPY process can introduce overhead, which is better performed post-loading.
  • Limited Horizontal Scalability: COPY struggles to scale efficiently with large data volumes, underutilizing warehouse resources.
  • File Format Compatibility: Complex formats like Excel require preprocessing for compatibility with Snowflake’s COPY INTO operation.
  • Data Validation and Error Handling: Snowflake’s validation during COPY is limited; additional checks can burden performance.
  • Manual Optimization: Achieving optimal performance with COPY demands meticulous file size and concurrency management, adding complexity.

Limitations of Snowpipe

  • Lack of Upsert Support: Snowpipe lacks direct upsert functionality, necessitating complex workarounds.
  • Limited Real-Time Capabilities: While near-real-time, Snowpipe may not meet the needs for instant data availability or complex streaming transformations.
  • Scheduling Flexibility: Continuous operation limits precise control over data loading times.
  • Data Quality and Consistency: Snowpipe offers limited support for data validation and transformation, requiring additional checks.
  • Limited Flexibility: Snowpipe is optimized for streaming data into Snowflake, limiting custom processing and external integrations.
  • Support for Specific Data Formats: Snowpipe supports delimited text, JSON, Avro, Parquet, ORC, and XML (using Snowflake XML format), necessitating conversion for unsupported formats.

Tiger’s Snowpark-Based Framework – Transforming Data Ingestion

To address these challenges and unlock the full potential of data ingestion, organizations are turning to innovative solutions that leverage advanced technologies and frameworks. One such solution we’ve built, is Tiger’s Snowpark-based framework for Snowflake.

Our solution transforms data ingestion by offering a highly customizable framework driven by metadata tables. Users can efficiently tailor ingestion processes to various data sources and business rules. Advanced auditing and reconciliation ensure thorough tracking and resolution of data integrity issues. Additionally, built-in data quality checks and observability features enable real-time monitoring and proactive alerting. Overall, the Tiger framework provides a robust, adaptable, and efficient solution for managing data ingestion challenges within the Snowflake ecosystem.

Snowpark based framework

Key features of Tiger’s Snowpark-based framework include:

Configurability and Metadata-Driven Approach:

  • Flexible Configuration: Users can tailor the framework to their needs, accommodating diverse data sources, formats, and business rules.
  • Metadata-Driven Processes: The framework utilizes metadata tables and configuration files to drive every aspect of the ingestion process, promoting consistency and ease of management.

Advanced Auditing and Reconciliation:

  • Detailed Logging: The framework provides comprehensive auditing and logging capabilities, ensuring traceability, compliance, and data lineage visibility.
  • Automated Reconciliation: Built-in reconciliation mechanisms identify and resolve discrepancies, minimizing errors and ensuring data integrity.

Enhanced Data Quality and Observability:

  • Real-Time Monitoring: The framework offers real-time data quality checks and observability features, enabling users to detect anomalies and deviations promptly.
  • Custom Alerts and Notifications: Users can set up custom thresholds and receive alerts for data quality issues, facilitating proactive monitoring and intervention.

Seamless Transformation and Schema Evolution:

  • Sophisticated Transformations: Leveraging Snowpark’s capabilities, users can perform complex data transformations and manage schema evolution seamlessly.
  • Adaptability to Changes: The framework automatically adapts to schema changes, ensuring compatibility with downstream systems and minimizing disruption.

Data continues to be the seminal building block that determines the accuracy of the output. As businesses race through this data-driven era, investing in robust and future-proof data ingestion frameworks will be key to translating data into real-world insights.

The post Tiger’s Snowpark-Based Framework for Snowflake: Illuminating the Path to Efficient Data Ingestion appeared first on Tiger Analytics.

]]>
Migrating from Legacy Systems to Snowflake: Simplifying Excel Data Migration with Snowpark Python https://www.tigeranalytics.com/perspectives/blog/migrating-from-legacy-systems-to-snowflake-simplifying-excel-data-migration-with-snowpark-python/ Thu, 18 Apr 2024 05:29:21 +0000 https://www.tigeranalytics.com/?post_type=blog&p=21382 Discover how Snowpark Python streamlines the process of migrating complex Excel data to Snowflake, eliminating the need for external ETL tools and ensuring data accuracy.

The post Migrating from Legacy Systems to Snowflake: Simplifying Excel Data Migration with Snowpark Python appeared first on Tiger Analytics.

]]>
A global manufacturing company is embarking on a digital transformation journey, migrating from legacy systems, including Oracle databases and QlikView for visualization, to Snowflake Data Platform and Power BI for advanced analytics and reporting. What does a day in the life of their data analyst look like?

Their workday is consumed by the arduous task of migrating complex Excel data from legacy systems to Snowflake. They spend hours grappling with detailed Excel files, trying to navigate through multiple headers, footers, subtotals, formulas, macros, and custom formatting. The manual process is time-consuming, and error-prone, and hinders their ability to focus on deriving valuable insights from the data.

To streamline their workday, the data analyst can leverage Snowpark Python’s capabilities to streamline the process. They can effortlessly access and process Excel files directly within Snowflake, eliminating the need for external ETL tools or complex migration scripts. With just a few lines of code, they can automate the extraction of data from Excel files, regardless of their complexity. Formulas, conditional formatting, and macros are handled seamlessly, ensuring data accuracy and consistency.

Many businesses today grapple with the complexities of Excel data migration. Traditional ETL scripts may suffice for straightforward data migration, but heavily customized processes pose significant challenges. That’s where Snowpark Python comes into the picture.

Snowpark Python: Simplifying Excel Data Migration

Snowpark Python presents itself as a versatile tool that simplifies the process of migrating Excel data to Snowflake. By leveraging Snowpark’s file access capabilities, users can directly access and process Excel files within Snowflake, eliminating the need for external ETL tools or complex migration scripts. This approach not only streamlines the migration process but also ensures data accuracy and consistency.

With Snowpark Python, businesses can efficiently extract data from Excel files, regardless of their complexity. Python’s rich ecosystem of libraries enables users to handle formulas, conditional formatting, and macros in Excel files. By integrating Python scripts seamlessly into Snowflake pipelines, the migration process can be automated, maintaining data quality throughout. This approach not only simplifies the migration process but also enhances scalability and performance.

Snowpark-image

Tiger Analytics’ Approach to Excel Data Migration using Snowpark Python

At Tiger Analytics, we‘ve worked with several Fortune 500 clients on data migration projects. In doing so, we’ve found a robust solution: using Snowpark Python to tackle this problem head-on. Here’s how it works.

We crafted Snowpark code that seamlessly integrates Excel libraries to facilitate data loading into Snowflake. Our approach involves configuring a metadata table within Snowflake to store essential details such as Excel file names, sheet names, and cell information. By utilizing Snowpark Python and standard stored procedures, we have implemented a streamlined process that extracts configurations from the metadata table and dynamically loads Excel files into Snowflake based on these parameters. This approach ensures data integrity and accuracy throughout the migration process, empowering businesses to unlock the full potential of their data analytics workflows within Snowflake. So we’re able to not only accelerate the migration process but also future-proof data operations, enabling organizations to focus on deriving valuable insights from their data.

The advantage of using Snowpark Python is that it enables new use cases for Snowflake customers, allowing them to ingest data from specialized file formats without the need to build and maintain external file ingestion processes. This results in faster development lifecycles, reduced time spent managing various cloud provider services, lower costs, and more time spent adding business value.

For organizations looking to modernize data operations and migrate Excel data from legacy systems into Snowflake, Snowpark Python offers a useful solution. With the right partners and supporting tech, a seamless data migration will pave the way for enhanced data-driven decision-making.

The post Migrating from Legacy Systems to Snowflake: Simplifying Excel Data Migration with Snowpark Python appeared first on Tiger Analytics.

]]>
Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake https://www.tigeranalytics.com/perspectives/blog/building-efficient-near-real-time-data-pipelines-debezium-kafka-and-snowflake/ Thu, 26 Nov 2020 21:38:14 +0000 https://www.tigeranalytics.com/blog/building-efficient-near-real-time-data-pipelines-debezium-kafka-and-snowflake/ Learn how Debezium, Kafka, and Snowflake combine to advance near-real-time data pipelines. Gain insights into the process of efficient data syncing, processing, and storage, crucial for informed decision-making in real estate investment.

The post Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake appeared first on Tiger Analytics.

]]>
Institutional investors in real estate usually require several discussions to finalize their investment strategies and goals. They need to acquire properties on a large scale and at a fast pace. To facilitate this, the data pipeline must be refreshed in near-real-time with properties that have recently come onto the market.

With this business use case, we worked to get home listing data to the operational data store (ODS), PostgreSQL, and sync them to the cloud data warehouse, Snowflake.

We solve the first part of the challenge —collecting data about new property listings— by using a real estate data aggregator called Xome to exchange data and load them into the ODS.

Next, we feed the properties in the ODS in near-real-time (NRT) to the Snowflake data warehouse. An analytics engine filters and selects homes based on the buy-box criteria set by investors, enriched by supporting data such as nearby schools and their ratings; neighborhood crime levels; proximity to healthcare facilities and public transportation, etc. The analytics engine then ranks the properties based on the cap rate, discount, and yield.

The property ranks are sent back into the ODS, giving underwriters a prioritized list based on their ranking. Underwriters can adjudicate risks, calculate financials like the target offer price, renovation cost, and estimated returns, and store their results in the same ODS.

Here is how we built the NRT data pipeline from the Amazon Web Services (AWS) Postgres data source to Snowflake. Our solution:

  • Uses the database log as the seed for transferring data, as it is minimally invasive to production systems;
  • Employs Debezium, an open-source connector that listens for changes in log files and records them as consumable events;
  • Transfers events reliably using Kafka, the distributed messaging system;
  • Connects Kafka to Snowflake directly and writes to Snowflake using Snowpipe, stages, files, and tables; and
  • Schedules Snowflake tasks to merge the final data sets to the target table.

data pipeline architecture

Solution architecture demonstrating the high-level flow and relationship between components

Here, step by step is how to do it:

A. Configure PostgreSQL in AWS RDS

1. To capture DML changes that persist in the database, set the Write-Ahead-Log (WAL) level to logical

Create a new parameter group and set the value of rds.logical_replication to 1.

Modify the database instance to associate to this customized parameter group.

2. Log into PostgreSQL and check the WAL level

SHOW wal_level

It should be set to logical.

3. Create a replication slot to stream the sequence of changes from the database

The built-in logical decoding process extracts DML changes from the database log into a row format that is easy to understand.

SELECT * FROM pg_create_logical_replication_slot(, ‘wal2json’);

B. Configure the Debezium and Kafka cluster

We use Debezium and Kafka in a cluster as the event messaging system that reads data from the database logs and loads them into Snowflake.

To demonstrate this use case, we have selected the minimum hardware requirements to execute this data pipeline for a sample of records. To extend this to cluster size requirements for production data, please refer to the product documentation.

Node vs. Size table

1. Prepare the hardware

For connector nodes, we use memory-optimized machines; for Kafka brokers, CPU-optimised machines with high storage capacity.

Install Java and open-source Kafka on all the machines and set up a $KAFKA_HOME directory.

2. Set up the Kafka-PostgreSQL Connector

This node connects to PostgreSQL and decodes the database log using the Debezium connector, returning events in JSON format. The node requires several JAR files to be downloaded from the Maven repository.

There are four configuration steps involved:

Config 1

 cd $KAFKA_HOME
 mkdir source_jars
 cd source_jars
 wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.9.2/avro-1.9.2.jar
 wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar
 wget https://repo1.maven.org/maven2/io/debezium/debezium-core/0.9.5.Final/debezium-core-0.9.5.Final.jar
 wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/0.9.5.Final/debezium-connector-postgres-0.9.5.Final.jar
 wget https://repo1.maven.org/maven2/org/codehaus/jackson/jackson-core-asl/1.9.13/jackson-core-asl-1.9.13.jar
 wget https://www.java2s.com/Code/JarDownload/jackson-all/jackson-all-1.7.4.jar.zip
 wget https://www.java2s.com/Code/JarDownload/jdbc/jdbc-2.0-sources.jar.zip
 wget https://maven.repository.redhat.com/earlyaccess/all/io/confluent/kafka-avro-serializer/5.3.0/kafka-avro-serializer-5.3.0.jar
 wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.7/postgresql-42.2.7.jar

Config 2

In the $KAFKA_HOME/config directory, create a new file called postgres-kafka-connector.properties and establish a connection to Postgres to capture changed data.

There are multiple options to control the Debezium connector. Please consult the product documentation for more details.

For example:

 name=postgres-debezium-connector
 connector.class=io.debezium.connector.postgresql.PostgresConnector
 database.hostname= database.port=5432
 database.user=postgres
 database.password= ********
 database.dbname=postgres
 database.server.name= suppyTopic
 #This appears as prefix Kafka
 topicschema.whitelist = supply
 #Provide schema to sync data
 fromplugin.name=wal2jsonslot.name=
 #Provide Postgres replication slot name
 snapshot.fetch.size = 1000

Config 3

Set Classpath and execute

 export CLASSPATH=$KAFKA_HOME/source_jars/
 #Execute below from $KAFKA_HOME
 bin/connect-standalone.sh config/postgres-supply.properties config/postgres-kafka-connect-json.properties

3. Turn on Zookeeper

By default, Zookeeper runs on its localhost with Port ID 2181.

Start the Zookeeper process from $KAFKA_HOME.

 bin/zookeeper-server-start.sh config/zookeeper.properties

4. Set up the Kafka brokers

In contrast to related technologies like Apache Spark, Hadoop, etc., Kafka does not use a master/slave concept for its brokers: all the brokers transparently work out how to coordinate amongst themselves.

Within the $KAFKA_HOME/config directory, find the template file called server.properties and edit the two Kafka brokers as follows:

 Config 1: KAFKA broker 1, Port no and ZooKeeper address
 server1.propertieslisteners=PLAINTEXT:<Public IP address of this Kafka broker 1>:9093
 zookeeper.connect=<Private IP address of Zookeeper>:2181
 zookeeper.connection.timeout.ms=6000 Config 2: KAFKA broker 2, Port no and ZooKeeper address
 server2.propertieslisteners=PLAINTEXT:<Public IP address of this Kafka broker 2>:9094
 zookeeper.connect=<Private IP address of Zookeeper>:2181
 zookeeper.connection.timeout.ms=6000

Start the Kafka brokers by running the following commands from KAFKA_HOME.

 #Run this from Kafka broker 1
 bin/kafka-server-start.sh config/server1.properties
 
 #Run this from Kafka broker 2
 bin/kafka-server-start.sh config/server2.properties

With this setup, the two Kafka brokers will transfer data from Postgres in topics, with one topic for each source table.

C. Set up the Snowflake connector

This node reads data from each Kafka topic and writes them to Snowflake. Internally, it uses Snowflake stages and Snowpipe to sync the data to the Snowflake tables.

There are four configuration steps involved:

Config 1

Download all the dependent JARs, including the Snowflake-Kafka connector, from the Maven repository and save them under a new directory, $KAFKA_HOME/sink_jars.

 cd $KAFKA_HOME
 mkdir sink_jars
 wgethttps://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
 wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
 wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.1.0/snowflake-kafka-connector-1.1.0.jar
 wget https://packages.confluent.io/maven/io/confluent/common-utils/5.4.0/common-utils-5.4.0.jar

Config 2

In $KAFKA_HOME/config/connect-standalone.properties, provide the details of the Kafka broker and its port.

 Config 2: connect-standalone.properties
 #Provide Kafka server details under this propertybootstrap.servers=ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9093,
 ec2-X-YY-ZZZ-XXX.us-east-2.compute.amazonaws.com:9092

Config 3

In $KAFKA_HOME/config, create a new file called kafka-snowflake-connect-json.properties. In this file, we tag each Kafka topic to its corresponding table in Snowflake, like this:

 snowflake.private.key.passphrase=<Password>#to connect Snowflake#Database and schema configuration
 snowflake.database.name=<Target database in Snowflake>
 snowflake.schema.name=<Target Schema in Snowflake> #Data format configuration
 key.converter = org.apache.kafka.connect.storage.StringConverter
 value.converter = com.snowflake.kafka.connector.records.SnowflakeJsonConverter #Provide a map from Kafka topic to table in Snowflake
 #We have two tables here. 1. Supply 2. BuyboxtopicsBuyboxtopics=postgresRDS.supply.supply,postgresRDS.supply.buybox
 snowflake.topic2table.map=postgresRDS.supply.supply:dbz_supply,postgresRDS.supply.buybox:dbz_buybox

Config 4

Set Classpath and execute.

 export CLASSPATH=$KAFKA_HOME/sink_jars/
 
 #Execute below from $KAFKA_HOME
 
 bin/connect-standalone.sh config/connect-standalone.properties
 config/kafka-snowflake-connect-json.properties

With this setup, data from the Kafka topics get loaded to the Snowflake target tables.

For example, the SUPPLY table that contains the list of homes in PostgreSQL will look like this in Snowflake:

Postgres SQL table

The table has only two JSON columns:

  • Record_Metadata: JSON-formatted primary key column
  • Record_Content: JSON-formatted actual row values

Real-time refresh

We want additions, deletions, and changes to the original data to flow down to Snowflake in real-time, or near-real-time. To achieve this, and to track the changes from original to updated data set, we use the following payload code:

 "payload": {
 "after": {
 "actual_photos_count": null,
 "additional_home_details": null,
 "address1": "8146 Lesourdsville West Chester",
 "bathrooms": 1,
 "bedrooms": 2,
 "census_tract_id": "390170111262013",
 "city": "West Chester",
 "close_date": null,
 "close_price": null,
 "country": "US",
 "created_on_utc": 1581089444809073,
 "latitude": 39.3491554260254,
 "laundry_features": null,
 ******DELETED ROWS to have few columns
 },
 "before": null,
 "op": "r",
 "source": {
 "connector": "postgresql",
 "db": "postgres",
 "last_snapshot_record": false,
 "lsn": 309438972864,
 "name": "postgresRDS",
 "schema": "supply",
 "snapshot": true,
 "table": "supply",
 "ts_usec": 1582796538603000,
 "txId": 5834,
 "version": "0.9.5.Final",
 "xmin": null
 },
 "ts_ms": 1582796538603
 }

The payload data structure has four event types:

  • R: initial data extract
  • C: inserts
  • U: updates
  • D: deletes

It holds the actual data in JSON nodes before and after.

Debezium Postgres payload data and event types

D. Create views in Snowflake

Snowflake natively supports JSON structured data. We can parse and normalize data from the table into columns simply by using database views in Snowflake.

Create a view to parse inserts, updates, and snapshots

 CREATE OR REPLACE VIEW DBZ_SUPPLY_I_U_VIEW as
 SELECT --Get contents from After JSON node for snapshot, inserts and updates ID is the primary keyrecord_content:"payload"."after"."id"::FLOAT as id
 ,record_content:"payload"."after"."actual_photos_count"::FLOAT as actual_photos_count
 ,record_content:"payload"."after"."additional_home_details"::STRING as additional_home_details
 ,record_content:"payload"."after"."address1"::STRING as address1
 ,record_content:"payload"."after"."bathrooms"::VARIANT as bathrooms
 ,record_content:"payload"."after"."bedrooms"::VARIANT as bedrooms
 ,record_content:"payload"."after"."census_tract_id"::STRING as census_tract_id
 ,record_content:"payload"."after"."city"::STRING as city
 ,record_content:"payload"."after"."close_date"::STRING::DATE as close_date
 ,record_content:"payload"."after"."close_price"::VARIANT as close_price
 ,record_content:"payload"."after"."country"::STRING as country
 ,record_content:"payload"."after"."created_on_utc"::STRING::TIMESTAMP_NTZ as created_on_utc
 ,record_content:"payload"."after"."latitude"::STRING as latitude
 ,record_content:"payload"."after"."longitude"::STRING as longitude
 ,record_content:"payload"."after"."laundry_features"::STRING as laundry_features--Get additional fields, about timestamp when debezium captured data, when postgres applied that transaction,REGEXP_REPLACE(record_content:"payload"."op", '') as dml_operator ,
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."ts_ms", '')) as debezium_processed_ts
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."source"."ts_usec", '')) as source_processed_ts
 , REGEXP_REPLACE(record_content:"payload"."source"."name", '') as source_server
 , REGEXP_REPLACE(record_content:"payload"."source"."db", '') as source_db
 , REGEXP_REPLACE(record_content:"payload"."source"."table", '') as source_table
 , REGEXP_REPLACE(record_content:"payload"."source"."schema", '') as source_schemaFROM <Database>.<Schema>.DBZ_SUPPLY
 WHERE lower(DML_OPERATOR) in ('r','c','u');

Create a view to parse deletes

 CREATE OR REPLACE VIEW DBZ_SUPPLY_D_VIEW as
 SELECT --Get contents from before JSON node for snapshot, inserts and updates. ID is the primary keyrecord_content:"payload"."before"."id"::FLOAT as id
 ,record_content:"payload"."before"."actual_photos_count"::FLOAT as actual_photos_count
 ,record_content:"payload"."before"."additional_home_details"::STRING as additional_home_details
 ,record_content:"payload"."before"."address1"::STRING as address1
 ,record_content:"payload"."before"."bathrooms"::VARIANT as bathrooms
 ,record_content:"payload"."before"."bedrooms"::VARIANT as bedrooms
 ,record_content:"payload"."before"."census_tract_id"::STRING as census_tract_id
 ,record_content:"payload"."before"."city"::STRING as city
 ,record_content:"payload"."before"."close_date"::STRING::DATE as close_date
 ,record_content:"payload"."before"."close_price"::VARIANT as close_price
 ,record_content:"payload"."before"."country"::STRING as country
 ,record_content:"payload"."before"."created_on_utc"::STRING::TIMESTAMP_NTZ as created_on_utc
 ,record_content:"payload"."before"."latitude"::STRING as latitude
 ,record_content:"payload"."before"."longitude"::STRING as longitude
 ,record_content:"payload"."before"."laundry_features"::STRING as laundry_features--Get additional fields, about timestamp when debezium captured data, when postgres applied that transaction
 ,REGEXP_REPLACE(record_content:"payload"."op", '') as dml_operator, TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."ts_ms", '')) as debezium_processed_ts
 , TO_TIMESTAMP_NTZ ( REGEXP_REPLACE(record_content:"payload"."source"."ts_usec", '')) as source_processed_ts
 , REGEXP_REPLACE(record_content:"payload"."source"."name", '') as source_server
 , REGEXP_REPLACE(record_content:"payload"."source"."db", '') as source_db
 , REGEXP_REPLACE(record_content:"payload"."source"."table", '') as source_table
 , REGEXP_REPLACE(record_content:"payload"."source"."schema", '') as source_schemaFROM <Database>.<Schema>.DBZ_SUPPLY
 WHERE lower(DML_OPERATOR) in ('d');

You can automate the creation of these views using the Information Schema tables in Snowflake. To create stored procedures to automatically create these views for all tables involved in the data pipeline, please refer to the product documentation.

E. Merge the data into the target table

Using the two views, DBZ_SUPPLY_I_U_VIEW and DBZ_SUPPLY_D_VIEW, as the source, you can merge data to the final target table, SUPPLY, using the SQL merge command.

To automate this using Snowflake Tasks:

 CREATE TASK SUPPLY_MERGE
 WAREHOUSE <WAREHOUSE_NAME>
 SCHEDULE 5 MINUTE
 ASmerge into <Database>.<Tgt_Schema>.DBZ_SUPPLY as tgt
 using <Database>.<Src_Schema>.DBZ_SUPPLY_I_U_VIEW as src
 on tgt.id =src.id --Deletes
 when matched AND src.dml_operator='d' THEN DELETE --Updates
 when matched AND src.dml_operator='u' then
 update set tgt.ACTUAL_PHOTOS_COUNT =src.ACTUAL_PHOTOS_COUNT
 ,tgt.ADDRESS1 =src.ADDRESS1
 ,tgt.BATHROOMS =src.BATHROOMS
 ,tgt.BEDROOMS =src.BEDROOMS
 ,tgt.CENSUS_TRACT_ID =src.CENSUS_TRACT_ID
 ,tgt.CITY =src.CITY
 ,tgt.CLOSE_DATE =src.CLOSE_DATE
 ,tgt.CLOSE_PRICE =src.CLOSE_PRICE
 ,tgt.COUNTRY =src.COUNTRY
 ,tgt.CREATED_ON_UTC =src.CREATED_ON_UTC
 ,tgt.LATITUDE =src.LATITUDE
 ,tgt.LONGITUDE =src.LONGITUDE
 ,tgt.LAUNDRY_FEATURES =src.LAUNDRY_FEATURES--Inserts
 when not matched and src.dml_operator in ('c','r') then
 insert (ID, ACTUAL_PHOTOS_COUNT ,ADDRESS1 ,BATHROOMS ,BEDROOMS ,
 CENSUS_TRACT_ID ,CITY ,CLOSE_DATE ,CLOSE_PRICE ,COUNTRY ,
 CREATED_ON_UTC ,LATITUDE , LONGITUDE,LAUNDRY_FEATURES )
 values (src.ID ,src.ACTUAL_PHOTOS_COUNT ,src.ADDRESS1 ,
 src.BATHROOMS ,src.BEDROOMS ,src.CENSUS_TRACT_ID ,src.CITY ,
 src.CLOSE_DATE ,src.CLOSE_PRICE ,src.COUNTRY ,src.CREATED_ON_UTC ,
 src.LATITUDE ,src.LAUNDRY_FEATURES ,src.LONGITUDE )

This task is configured to execute every five minutes.

You can monitor it using the task history:

 select *
 from table(information_schema.task_history())
 order by scheduled_time;

The NRT data pipeline is complete!

F. Things to keep in mind3

When attempting to set up an NRT to respond to your own use case, here are a few caveats:

– All sources tables must contain the primary key to propagate DML changes. If you create tables without a primary key, be sure to request the source database administrator or application team to set one up for you using the data elements in that table.

If you are still unable to include a primary key, write a separate data pipeline to perform a full data load for your tables.

– PostgreSQL wal2Json database logs don’t track DDL changes (for example, new column additions and deletions).

However, the payload data available in JSON will contain values for recently added columns. To identify DDL changes within a given timeframe, you will need to code a separate process to use database metadata tables and/or query history to scan and capture DDL changes.

These events must be pre-processed before merging the usual DML changes on to the Snowflake data warehouse.

G. Success!

This CDC-based solution reduced the waiting time for new listings posted from a daily batch window to under 30 minutes, after which the analytics engine ranked the listings and pushed them to the queue using the investors’ criteria.

Underwriters could review the listings, estimate values, and successfully meet their target of completing 1,000 single-family home acquisitions for a large investor in a very short time.

Setting up the NRT data pipeline involves configuring multiple systems to talk to each other. If set up correctly, these components will work well together to handle this and many other use cases.

Gathering and compiling data from multiple sources and making them usable in a short time is often the greatest challenge to be overcome to get value from business analytics. Write to info@tigeranalytics.com so that we can help.

The post Building Efficient Near-Real Time Data Pipelines: Debezium, Kafka, and Snowflake appeared first on Tiger Analytics.

]]>
Spark-Snowflake Connector: In-Depth Analysis of Internal Mechanisms https://www.tigeranalytics.com/perspectives/blog/spark-snowflake-connector-in-depth-analysis-of-internal-mechanisms/ Thu, 25 Jun 2020 10:50:56 +0000 https://www.tigeranalytics.com/blog/spark-snowflake-connector-in-depth-analysis-of-internal-mechanisms/ Examine the internal workings of the Spark-Snowflake Connector with a clear breakdown of how the connector integrates Apache Spark with Snowflake for enhanced data processing capabilities. Gain insights into its architecture, key components, and techniques for seamlessly optimizing performance during large-scale data operations.

The post Spark-Snowflake Connector: In-Depth Analysis of Internal Mechanisms appeared first on Tiger Analytics.

]]>
Introduction

The Snowflake Connector for Spark enables the use of Snowflake as a Spark data source – similar to other data sources like PostgreSQL, HDFS, S3, etc. Though most data engineers use Snowflake, what happens internally is a mystery to many. But only if one understands the underlying architecture and its functioning, can they figure out how to fine-tune their performance and troubleshoot issues that might arise. This blog thus aims to explaining in detail the internal architecture and functioning of the Snowflake Connector.

Before getting into the details, let us understand what happens when one does not use the Spark-Snowflake Connector.

Data Loading to Snowflake without Spark- Snowflake Connector

Create Staging Area -> Load local files -> Staging area in cloud -> Create file format -> Load to Snowflake from staging area using the respective file format

Loading Data from Local to Snowflake

Data Loading using Spark-Snowflake Connector

When we use the Spark Snowflake connector to load the data into Snowflake, it does a lot of things that are abstract to us. The connector takes care of all the heavy lifting tasks.

Spark Snowflake Connector

Spark Snowflake Connector (Source:https://docs.snowflake.net/manuals/user-guide/spark-connector-overview.html#interaction-between-snowflake-and-spark)

This blog illustrates one such example where the Spark-Snowflake Connector is used to read and write data in Databricks. Databricks has integrated the Snowflake Connector for Spark into the Databricks Unified Analytics Platform to provide native connectivity between Spark and Snowflake.

The Snowflake Spark Connector supports Internal (temp location managed by Snowflake automatically) and External (temp location for data transfer managed by user) transfer modes. Here is a brief description of the two modes of transfer-

Internal Data Transfer

This type of data transfer is facilitated through a Snowflake internal stage that is automatically created and managed by the Snowflake Connector.

External Data Transfer

External data transfer, on the other hand, is facilitated through a storage location that the user specifies. The storage location must be created and configured as part of the Spark connector installation/configuration.

Further, the files created by the connector during external transfer are intended to be temporary, but the connector does not automatically delete these files from the storage location. This type of data transfer is facilitated through a Snowflake internal stage that is automatically created and managed by the Snowflake Connector.

Use Cases

Below are the use cases we are going to run on Spark and see how the Spark Snowflake connector works internally-

1. Initial Loading from Spark to Snowflake

2. Loading the same Snowflake table in Overwrite mode

3. Loading the same Snowflake table in Append mode

4. Read the Snowflake table

Snowflake Connection Parameters

Snowflake Connection Parameters

1. Initial Loading from Spark to Snowflake

When a new table is loaded for the very first time from Spark to Snowflake, the following command will be running on Spark. This command, in turn, starts to execute a set of SQL queries in Snowflake using the connector.

spark snowflake overwrite mode

The single Spark command above triggers the following 9 SQL queries in Snowflake

Snowflake Initial Load Query History

Snowflake Initial Load Query History

i) Spark, by default, uses the local time zone. This can be changed by using the sfTimezone option in the connector

ii) The below query creates a temporary internal stage in Snowflake. We can use other cloud providers that we can configure in Spark.

iii) The GET command downloads data files from one of the following Snowflake stages to a local directory/folder on a client machine. We have metadata checks at this stage.

iv) The PUT command uploads (i.e., stages) data files from a local directory/folder onto a client machine to one of the Snowflake strategies

v) The DESC command failed as the table did not exist previously, but this is now taken care of by the Snowflake connector internally. It won’t throw any error in the Spark Job

vi) The IDENTIFIER keyword is used to identify objects by name, using string literals, session variables, or bind variables.

vii) The command below loads data into Snowflake’s temporary table to maintain consistency. By doing so, Spark follows Write All or Write Nothing architecture.

viii) The DESC command below failed as the table did not exist previously, but this is now taken care of by the Snowflake connector internally. It didn’t stop the process. The metadata check at this point is to see whether to use RENAME TO or SWAP WITH

ix) The RENAME TO command renames the specified table with a new identifier that is not currently used by any other table in the schema. If the table is already present, we have to use SWAP WITH and then drop the identifier

2. Loading the same Snowflake table in Overwrite mode

The above Spark command triggers the following 10 SQL queries in Snowflake. This time there is no failure when we ran the overwrite command a second time because this time, the table already exists.

i) We have metadata checks at the internal stages

ii) The SWAP WITH command swaps all content and metadata between two specified tables, including any integrity constraints defined for the tables. It also swaps all access control privilege grants. The two tables are essentially renamed in a single transaction.

The RENAME TO command is used when the table is not present because it is faster than renaming and dropping the intermediate table. But this can only be used when the table does not exist in Snowflake. This means that RENAME TO is only performed during the Initial Load.

iii) The DROP command drops the intermediate staging table

3. Loading the same Spark table in Append mode

The above Spark command triggers the following 7 SQL queries in Snowflake.

Note: When we use OVERWRITE mode, the data is copied into the intermediate staged table, but during APPEND, the data is loaded into the actual table in Snowflake.

i) In order to maintain the ACID compliance, this mode uses all the transactions inside the BEGIN and COMMIT. If anything goes wrong, it uses ROLLBACK so that the previous state of the table is untouched.

4. Reading the Snowflake Table

The above Spark command triggers the following SQL query in Snowflake. The reason for this is that Spark follows the Lazy Execution pattern. So until an action is performed, it will not read the actual data. Spark internally maintains lineage to process it effectively. The following query is to check whether the table is present or not and to retrieve only the schema of the table.

The Spark action below triggers 5 SQL queries in Snowflake

i) First, it creates a temporary internal stage to load the read data from Snowflake.

ii) Next, it downloads data files from the Snowflake internal stage to a local directory/folder on a client machine.

iii) The default timestamp data type mapping is TIMESTAMP_NTZ (no time zone), so you must explicitly set the TIMESTAMP_TYPE_MAPPING parameter to use TIMESTAMP_LTZ.

iv) The data is then copied from Snowflake to the internal stage.

v) Finally, it downloads data files from the Snowflake internal stage to a local directory/folder on a client machine.

Wrapping Up

Spark Snowflake connector comes with lots of benefits like query pushdown, column mapping, etc. This acts as an abstract layer and does a lot of groundwork in the back end.

Happy Learning!!

The post Spark-Snowflake Connector: In-Depth Analysis of Internal Mechanisms appeared first on Tiger Analytics.

]]>