Week 7 - Data Quality

Introduction to Data Quality

Data quality refers to the condition of a dataset, determined by factors such as accuracy, completeness, consistency, reliability, and relevance. High-quality data is essential for effective decision-making, operational efficiency, and compliance with regulations. Poor data quality can lead to incorrect analyses, misguided decisions, and operational inefficiencies.

Key Dimensions of Data Quality

  • Accuracy: Data accurately reflects the real-world entities or events it represents.
  • Completeness: All necessary data is present without omissions or missing values.
  • Consistency: Data is consistent across different datasets or systems, without conflicting information.
  • Timeliness: Data is up-to-date and available when needed, ensuring that decisions are based on the most current information.
  • Validity: Data adheres to the defined formats, rules, and constraints, ensuring that it fits within the expected range of values.
  • Integrity: Data is correctly structured, and relationships between entities are maintained, ensuring that data dependencies and references are accurate.
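
Several of these dimensions can be measured directly on a dataset. The following is a minimal Python sketch, using pandas and a made-up transactions table (the column names and values are purely illustrative), that computes simple indicators for completeness, validity, consistency, and timeliness.

import pandas as pd

# Made-up transactions table; columns and values are illustrative only
df = pd.DataFrame({
    "transaction_id": ["t1", "t2", "t2", None],
    "amount": [100.0, -50.0, 200.0, 50.0],
    "updated_at": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-02", "2023-01-01"]),
})

# Completeness: share of non-null keys
completeness = df["transaction_id"].notna().mean()

# Validity: share of amounts within the expected range (here: strictly positive)
validity = (df["amount"] > 0).mean()

# Consistency/integrity: duplicated keys hint at conflicting records
duplicate_keys = df["transaction_id"].dropna().duplicated().sum()

# Timeliness: share of rows updated within the last 365 days
cutoff = pd.Timestamp.now() - pd.Timedelta(days=365)
timeliness = (df["updated_at"] >= cutoff).mean()

print(f"completeness={completeness:.0%}, validity={validity:.0%}, "
      f"duplicate_keys={duplicate_keys}, timeliness={timeliness:.0%}")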

Data Quality Practices

Data Validation

Data validation involves checking data against predefined rules or criteria before it is processed or stored. This ensures that only valid data enters the system.

Example: Python Code for Data Validation

import re

def validate_email(email):
    # Regex pattern for validating an email; non-string values (e.g., None) are treated as invalid
    pattern = r"[^@]+@[^@]+\.[^@]+"
    return isinstance(email, str) and re.match(pattern, email) is not None

def validate_transaction(transaction):
    if not transaction.get("transaction_id"):
        raise ValueError("Missing transaction_id")
    if not transaction.get("amount") or transaction["amount"] <= 0:
        raise ValueError("Invalid amount")
    if not validate_email(transaction.get("customer_email")):
        raise ValueError("Invalid customer_email")
    return True

# Sample transaction data
transaction_data = {
    "transaction_id": "12345",
    "amount": 100,
    "customer_email": "customer@example.com"
}

# Validate the transaction data
try:
    is_valid = validate_transaction(transaction_data)
    print("Transaction is valid:", is_valid)
except ValueError as e:
    print("Validation error:", e)

Data Cleaning

Data cleaning involves detecting and correcting errors or inconsistencies in data to improve its quality.

Example: Python Code for Data Cleaning

import pandas as pd

# Sample data with missing and inconsistent values
data = {
    "transaction_id": ["12345", "12346", None, "12348"],
    "amount": [100, -50, 200, None],
    "customer_email": ["customer@example.com", "invalid_email", "another@example.com", None]
}

df = pd.DataFrame(data)

# Drop rows with missing transaction_id
df.dropna(subset=["transaction_id"], inplace=True)

# Replace negative amounts with NaN
df["amount"] = df["amount"].apply(lambda x: x if x >= 0 else None)

# Fill missing values in the 'amount' column with the median
df["amount"] = df["amount"].fillna(df["amount"].median())

# Apply the email validation function to clean the 'customer_email' column
df['customer_email'] = df['customer_email'].apply(lambda x: x if isinstance(x, str) and validate_email(x) else None)

print(df)

Data Quality Monitoring

Continuous monitoring of data quality is crucial to ensure that data remains reliable and accurate over time. This can involve automated checks and alerts when data quality issues arise.

Example: Monitoring Data Completeness

# %% Cell 1
def monitor_completeness(df, column_name):
    total_records = len(df)
    missing_records = df[column_name].isnull().sum()
    completeness_percentage = ((total_records - missing_records) / total_records) * 100
    return completeness_percentage

# Create a sample DataFrame
df = pd.DataFrame({
    "customer_email": ["a@example.com", None, "c@example.com", "d@example.com", None]
})

completeness = monitor_completeness(df, "customer_email")
print(f"Completeness of 'customer_email': {completeness}%")

Schema Design for Data Quality

To design a schema for storing data quality checks, including quality metrics, failed records, and audit IDs, you can create a set of tables that help you manage and verify data quality over time. Below is a schema design that includes tables for quality checks, metrics, failed records, and audit logs.

erDiagram
    QualityChecks {
        INT check_id PK "Primary Key"
        VARCHAR check_name
        TEXT check_description
        TIMESTAMP created_at
        TIMESTAMP updated_at
    }

    QualityMetrics {
        INT metric_id PK "Primary Key"
        INT check_id FK "Foreign Key"
        VARCHAR metric_name
        FLOAT metric_value
        TIMESTAMP created_at
    }

    FailedRecords {
        INT failed_record_id PK "Primary Key"
        INT check_id FK "Foreign Key"
        VARCHAR record_id
        TEXT failure_reason
        TIMESTAMP failed_at
    }

    AuditLog {
        INT audit_id PK "Primary Key"
        INT check_id FK "Foreign Key"
        TIMESTAMP executed_at
        VARCHAR status
        TEXT details
    }

    QualityChecks ||--o{ QualityMetrics : has
    QualityChecks ||--o{ FailedRecords : has
    QualityChecks ||--o{ AuditLog : has

Figure: Schema Design for Data Quality

This diagram shows the minimum set of attributes needed for a quality audit:

  • QualityChecks Table: Stores metadata about each quality check.
  • QualityMetrics Table: Stores the results of the quality metrics for each check.
  • FailedRecords Table: Logs records that did not pass the quality checks.
  • AuditLog Table: Tracks audit records for quality checks, including execution status and details.

This schema design will help you efficiently manage and verify data quality across various checks, storing relevant metrics, failed records, and audit information.
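
To illustrate how this schema could be exercised from Python, the sketch below creates two of the tables in an in-memory SQLite database and records one audit entry. Column types are simplified to SQLite affinities and the inserted values are placeholders; in practice you would create these tables in your warehouse.

import sqlite3

conn = sqlite3.connect(":memory:")  # illustration only; use your warehouse in practice
cur = conn.cursor()

cur.execute("""
    CREATE TABLE QualityChecks (
        check_id INTEGER PRIMARY KEY,
        check_name TEXT,
        check_description TEXT,
        created_at TIMESTAMP,
        updated_at TIMESTAMP
    )
""")
cur.execute("""
    CREATE TABLE AuditLog (
        audit_id INTEGER PRIMARY KEY,
        check_id INTEGER REFERENCES QualityChecks(check_id),
        executed_at TIMESTAMP,
        status TEXT,
        details TEXT
    )
""")

# Register a check and log one execution (placeholder values)
cur.execute(
    "INSERT INTO QualityChecks (check_name, check_description, created_at, updated_at) "
    "VALUES (?, ?, datetime('now'), datetime('now'))",
    ("Completeness Check", "customer_email must be at least 98% non-null"),
)
check_id = cur.lastrowid
cur.execute(
    "INSERT INTO AuditLog (check_id, executed_at, status, details) "
    "VALUES (?, datetime('now'), ?, ?)",
    (check_id, "PASS", "completeness above threshold"),
)
conn.commit()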

Visualizing Data Quality

Visualizing data quality can help in identifying patterns, trends, and anomalies. A common approach is to create dashboards or charts that track data quality metrics over time.

Data Quality Monitoring Flowchart

flowchart TD
%% Add a transparent text node as a watermark
style Watermark fill:none,stroke:none
Watermark[Created by: LongBui]
    A[Start] --> B[Data Ingestion]
    B --> C{Validation Rules}
    C -->|Valid| D[Store Data]
    C -->|Invalid| E[Log Error]
    E --> F[Notify Team]
    F --> B
    D --> G[Monitor Quality Metrics]
    G --> H{Issue Detected?}
    H -->|Yes| I[Alert Team]
    H -->|No| J[Continue Processing]

Figure: Data Quality Workflow

An example dashboard for monitoring data quality might include the following charts:

  • Completeness: Percentage of non-null values across key columns.
  • Accuracy: Number of records that pass validation rules.
  • Consistency: Number of data conflicts across different datasets.
  • Timeliness: Data freshness, e.g., how recent the data is.
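
Such a dashboard can be prototyped with a simple chart. The sketch below assumes matplotlib is available and uses made-up daily completeness values purely for illustration.

import matplotlib.pyplot as plt
import pandas as pd

# Made-up daily completeness values, for illustration only
metrics = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=7, freq="D"),
    "completeness_pct": [99.1, 98.7, 97.5, 96.2, 98.9, 99.4, 99.0],
})

plt.figure(figsize=(8, 3))
plt.plot(metrics["date"], metrics["completeness_pct"], marker="o")
plt.axhline(98, linestyle="--", label="threshold (98%)")
plt.ylabel("Completeness (%)")
plt.title("Completeness of customer_email over time")
plt.legend()
plt.tight_layout()
plt.show()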

Implementing robust data quality practices is essential for any data-driven organization. By focusing on key dimensions such as accuracy, completeness, and consistency, and by using tools and techniques for validation, cleaning, and monitoring, we can ensure that the data we rely on is of the highest quality.

Metrics and Rules Definition

Defining clear metrics and rules is essential for evaluating and ensuring data quality. Metrics help quantify the quality of data, while rules provide the criteria against which data is validated.

Example: Metrics and Rules Definition

  1. Completeness Metric:

    • Metric Definition: Percentage of non-null values in a critical column.
    • Rule: A column must have at least 98% non-null values for the data to be considered complete.
    • Calculation:
    SELECT
        COUNT(column_name) * 100.0 / COUNT(*) AS completeness_percentage
    FROM
        table_name;
    
  2. Accuracy Metric:

    • Metric Definition: Percentage of data that matches reference or expected values.
    • Rule: At least 95% of the records in the dataset must match the expected values or reference data.
    • Calculation:
    SELECT
        COUNT(*) AS total_records,
        SUM(CASE WHEN column_name = expected_value THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS accuracy_percentage
    FROM
        table_name;
    
  3. Timeliness Metric:

    • Metric Definition: The average time delay between data collection and availability.
    • Rule: Data should be available within 24 hours of its collection.
    • Calculation:
    SELECT
        AVG(TIMESTAMPDIFF(HOUR, collection_time, available_time)) AS avg_timeliness
    FROM
        table_name;
    

In practice:

  • Define data quality metrics based on business requirements and data usage.
  • Establish thresholds for these metrics that align with acceptable quality standards.
  • Regularly monitor these metrics to ensure ongoing data quality.
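
The threshold rules above can also be evaluated programmatically once the metrics have been computed. The following Python sketch is illustrative; the metric names and sample values are assumptions, while the thresholds mirror the rules defined above.

# Thresholds matching the rules above (98% completeness, 95% accuracy, 24-hour timeliness)
THRESHOLDS = {
    "completeness_pct": 98.0,
    "accuracy_pct": 95.0,
    "avg_timeliness_hours": 24.0,  # upper bound rather than lower bound
}

def evaluate_metrics(metrics):
    """Compare computed metrics against their thresholds and return PASS/FAIL per metric."""
    results = {}
    for name, value in metrics.items():
        threshold = THRESHOLDS[name]
        if name == "avg_timeliness_hours":
            passed = value <= threshold
        else:
            passed = value >= threshold
        results[name] = "PASS" if passed else "FAIL"
    return results

# Sample metric values, for illustration only
print(evaluate_metrics({
    "completeness_pct": 99.2,
    "accuracy_pct": 93.0,
    "avg_timeliness_hours": 12.5,
}))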

Data Quality with Snowflake

Snowflake provides robust features to implement data quality checks directly within your data pipelines. By leveraging SQL and stored procedures, you can automate data quality validation processes.

Example: Implementing Data Quality Checks in Snowflake

  1. Create a Data Quality Table:

    • Store the results of data quality checks.
    CREATE TABLE data_quality_results (
        check_name STRING,
        check_date TIMESTAMP,
        check_status STRING,
        failed_records INT,
        total_records INT,
        error_message STRING
    );
    
  2. Completeness Check:

    • Ensure no critical columns contain NULL values.
    INSERT INTO data_quality_results
    SELECT
        'Completeness Check' AS check_name,
        CURRENT_TIMESTAMP() AS check_date,
        CASE WHEN COUNT(*) - COUNT(column_name) = 0 THEN 'PASS' ELSE 'FAIL' END AS check_status,
        COUNT(*) - COUNT(column_name) AS failed_records,
        COUNT(*) AS total_records,
        NULL AS error_message
    FROM
        table_name;
    
  3. Accuracy Check:

    • Verify that data matches reference values.
    INSERT INTO data_quality_results
    SELECT
        'Accuracy Check' AS check_name,
        CURRENT_TIMESTAMP() AS check_date,
        CASE WHEN COUNT(*) = COUNT(CASE WHEN column_name = expected_value THEN 1 END) THEN 'PASS' ELSE 'FAIL' END AS check_status,
        COUNT(*) - COUNT(CASE WHEN column_name = expected_value THEN 1 END) AS failed_records,
        COUNT(*) AS total_records,
        NULL AS error_message
    FROM
        table_name;
    
  4. Automate the Data Quality Checks:

    • Create a Snowflake stored procedure to automate the execution of these checks.
    CREATE OR REPLACE PROCEDURE run_data_quality_checks()
    RETURNS STRING
    LANGUAGE SQL
    AS
    $$
    BEGIN
        -- Completeness Check
        INSERT INTO data_quality_results ...
        -- Accuracy Check
        INSERT INTO data_quality_results ...
        RETURN 'Data Quality Checks Completed';
    END;
    $$;
    

In practice:

  • Implement regular data quality checks within Snowflake using SQL and stored procedures.
  • Store the results in a dedicated table for easy tracking and monitoring.
  • Automate the execution of these checks to ensure continuous data quality.
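
If the checks are orchestrated from outside Snowflake, the stored procedure can also be invoked from Python. The sketch below uses the snowflake-connector-python package with placeholder connection parameters; it assumes the run_data_quality_checks procedure and data_quality_results table defined above.

import snowflake.connector

# Placeholder credentials; supply your own account details
conn = snowflake.connector.connect(
    account="your_account",
    user="your_user",
    password="your_password",
    warehouse="your_warehouse",
    database="your_database",
    schema="your_schema",
)

try:
    cur = conn.cursor()
    # Run the stored procedure defined above
    cur.execute("CALL run_data_quality_checks()")
    print(cur.fetchone())

    # Inspect the latest results
    cur.execute(
        "SELECT check_name, check_status, failed_records, total_records "
        "FROM data_quality_results ORDER BY check_date DESC LIMIT 10"
    )
    for row in cur.fetchall():
        print(row)
finally:
    conn.close()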

Data Quality with dbt

dbt (Data Build Tool) allows you to define and enforce data quality rules within your transformation workflows. By creating tests and using dbt’s built-in capabilities, you can ensure that your transformed data meets the defined quality standards.

Figure: dbt Testing in dbt Cloud

Example: Data Quality Tests with dbt

  1. Define a Test for Null Values:

    • Use dbt’s built-in not_null test to check for NULL values in critical columns, defined in a schema (properties) YAML file.
    version: 2
    models:
      - name: your_model
        columns:
          - name: column_name
            tests:
              - not_null
    
  2. Check for Data Consistency:

    • Ensure that a column contains only valid predefined values.
    version: 2
    models:
      - name: your_model
        columns:
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'failed']
    
  3. Run the Tests:

    • Execute the dbt tests as part of your transformation pipeline.
    dbt test
    
  4. Create a Custom Data Quality Model:

    • Build a custom model to validate and log data quality checks within your dbt project.
    -- models/data_quality_checks.sql
    WITH null_check AS (
        SELECT COUNT(*) AS null_count
        FROM {{ ref('your_table') }}
        WHERE column_name IS NULL
    )
    SELECT
        'Null Check' AS check_name,
        CURRENT_TIMESTAMP() AS check_date,
        CASE WHEN null_count = 0 THEN 'PASS' ELSE 'FAIL' END AS check_status
    FROM null_check;
    
  5. Schedule Regular Tests:

    • Use dbt’s scheduling capabilities (e.g., with Airflow) to run tests regularly, as sketched below.
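
A minimal sketch of such an Airflow DAG, assuming dbt is installed on the Airflow workers and the project lives at a placeholder path:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Placeholder project directory; point this at your dbt project
DBT_PROJECT_DIR = "/opt/airflow/dbt/my_project"

with DAG(
    dag_id="dbt_data_quality_tests",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_dbt_tests = BashOperator(
        task_id="dbt_test",
        bash_command=f"cd {DBT_PROJECT_DIR} && dbt test",
    )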

Data Quality with Deequ

Installing Deequ on the Spark cluster and initializing the Spark session

import os
os.environ["SPARK_VERSION"] = "3.3.1"

from pyspark.sql import SparkSession, Row, DataFrame
import json
import pandas as pd
import pydeequ

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("spark.driver.memory", "8g")
    .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    .getOrCreate())

  1. Using Analyzers in Deequ

Analyzers are used for data profiling, computing metrics such as size, completeness, distinctness, and compliance.


from pydeequ.analyzers import (AnalysisRunner, AnalyzerContext, Size,
                               Completeness, Distinctness, Compliance)

# df is assumed to be a Spark DataFrame loaded earlier,
# here with Zone, service_zone, and LocationID columns
analysisResult = AnalysisRunner(spark) \
                    .onData(df) \
                    .addAnalyzer(Size()) \
                    .addAnalyzer(Completeness("Zone")) \
                    .addAnalyzer(Distinctness("Zone")) \
                    .addAnalyzer(Distinctness("service_zone")) \
                    .addAnalyzer(Completeness("service_zone")) \
                    .addAnalyzer(Compliance("LocationID", "LocationID >= 0.0")) \
                    .run()

analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
+-------+------------+------------+------------------+
| entity|    instance|        name|             value|
+-------+------------+------------+------------------+
| Column|        Zone|Completeness|               1.0|
| Column|service_zone|Completeness|0.8333333333333334|
| Column|service_zone|Distinctness|               0.6|
| Column|  LocationID|  Compliance|               1.0|
|Dataset|           *|        Size|              12.0|
| Column|        Zone|Distinctness|               1.0|
+-------+------------+------------+------------------+

  2. Using Verification

Verification suites run data quality checks against the data.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

# Define a check at warning level and attach the constraints to it
check = Check(spark, CheckLevel.Warning, "Data quality checks")

checkResult = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(
        check.hasSize(lambda x: x >= 3000) \
        .isContainedIn("service_zone", ["Zone1", "Zone2", "Zone3"]) \
        .isUnique("Zone") \
        .isComplete("service_zone") \
        .isComplete("Zone")).run()

Figure: Deequ Verification Results

  3. Visualizing DQ Charts

Figure: Data Quality Pie Chart

  4. Using Auto Suggestion

from pydeequ.suggestions import *

suggestionResult = ConstraintSuggestionRunner(spark) \
             .onData(df) \
             .addConstraintRule(DEFAULT()) \
             .run()

# Pretty-print the suggested constraints (json was imported in the setup step)
print(json.dumps(suggestionResult, indent=2))

Sample output (truncated):

  "constraint_suggestions": [
    {
      "constraint_name": "CompletenessConstraint(Completeness(LocationID,None,None))",
      "column_name": "LocationID",
      "current_value": "Completeness: 1.0",
      "description": "'LocationID' is not null",
      "suggesting_rule": "CompleteIfCompleteRule()",
      "rule_description": "If a column is complete in the sample, we suggest a NOT NULL constraint",
      "code_for_constraint": ".isComplete(\"LocationID\")"
    },
    {
      "constraint_name": "ComplianceConstraint(Compliance('LocationID' has no negative values,LocationID >= 0,None,List(LocationID),None))",
      "column_name": "LocationID",
      "current_value": "Minimum: 1.0",
      "description": "'LocationID' has no negative values",
      "suggesting_rule": "NonNegativeNumbersRule()",
      "rule_description": "If we see only non-negative numbers in a column, we suggest a corresponding constraint",
      "code_for_constraint": ".isNonNegative(\"LocationID\")"
    }
  ]

Summary

These sections provide practical guidance on how to define, implement, and monitor data quality metrics using Python, Snowflake, dbt, and Deequ, ensuring that your data meets the required standards throughout the data lifecycle.