Week 5 - Batch Processing

Introduction

Too Long; Didn't Read - TL;DR

What this video covers

Batch data is a bounded collection of data. In data processing terms, batch processing means processing a large dataset in one run rather than record by record; a single run can take several minutes or even a few hours.

The most popular batch processing framework is Apache Spark, which is available almost everywhere: on-premises, in the cloud, and inside SaaS applications.

Check the Introduction to Spark (Official); the document is still being updated, but it is good enough to support a deep dive.

The goal of Spark

To keep the benefits of MapReduce's scalable, distributed, fault-tolerant processing model while making it more efficient and easier to use. The advantages of Spark over MapReduce are:

  • Much faster thanks to in-memory caching and parallel processing
  • Runs multi-threaded tasks inside JVM processes
  • Provides a richer functional programming model
  • A large community and ecosystem

Installation

spark-shell
24/06/19 11:10:14 WARN Utils: Your hostname, Longs-Macbook-Air-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
24/06/19 11:10:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/19 11:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.4:4040
Spark context available as 'sc' (master = local[*], app id = local-1718770218283).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_352)
Type in expressions to have them evaluated.
Type :help for more information.

Installation guides are widely available on the internet, and you can also ask GPT or Gemini.

Then get started with the official Quick Start guide.

Application Components in Spark

First, we need to understand how work is organized in Spark (a small sketch follows the list):

  • Application: the user program that contains a main function
  • Job: when an action is performed on an RDD, a "job" is created and submitted to Spark
  • Stage: a job is divided into "stages" at shuffle boundaries
  • Task: a stage is further divided into "tasks" based on the number of partitions in the RDD; tasks are the smallest unit of work in Spark
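
As a rough illustration (a hypothetical sketch, not taken from the course materials), the snippet below shows that transformations stay lazy until an action creates a job:

from pyspark.sql import SparkSession

# Hypothetical sketch: transformations are lazy, actions create jobs.
spark = SparkSession.builder.appName("components-demo").getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize(range(1_000_000), numSlices=8)  # 8 partitions -> up to 8 tasks per stage
doubled = rdd.map(lambda x: x * 2)                   # transformation: no job yet

# count() is an action: a job is created and submitted, divided into stages
# at shuffle boundaries, and each stage runs one task per partition.
print(doubled.count())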

Spark SQL and DataFrames

A DataFrame looks like a table because, in the end, the data is stored and viewed in tabular form, just like a table.

Spark DataFrames

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brooklyn|           Bay Ridge|   Boro Zone|
|        15|       Queens|Bay Terrace/Fort ...|   Boro Zone|
|        16|       Queens|             Bayside|   Boro Zone|
|        17|     Brooklyn|             Bedford|   Boro Zone|
|        18|        Bronx|        Bedford Park|   Boro Zone|
|        19|       Queens|           Bellerose|   Boro Zone|
|        20|        Bronx|             Belmont|   Boro Zone|
+----------+-------------+--------------------+------------+
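
For reference, a table like the one above can be produced with a few lines of PySpark; the file name below (taxi_zone_lookup.csv) is an assumption based on the public NYC TLC zone lookup data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("zones").getOrCreate()

# Assumed local path to the NYC TLC taxi zone lookup CSV.
df_zones = spark.read \
    .option("header", "true") \
    .csv("taxi_zone_lookup.csv")

df_zones.show(20)  # prints the first 20 rows, as in the table above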

(Optional) Preparing Yellow and Green Taxi Data

Script to prepare the Dataset

    set -e

    TAXI_TYPE=yellow  # change to "green" to download the green taxi data
    YEAR=2024
    # Note: the CloudFront prefix might change; check the backend of the official page to get the new one.
    URL_PREFIX="https://d37ci6vzurychx.cloudfront.net/trip-data/"
    FILE_EXTENSION=".parquet" # Example: yellow_tripdata_2024-01.parquet

    for MONTH in {01..05}; do
        echo "Downloading data for month $MONTH"
        # Brace expansion already zero-pads the month, so it can be used directly.
        FILENAME="${TAXI_TYPE}_tripdata_${YEAR}-${MONTH}${FILE_EXTENSION}"
        URL="${URL_PREFIX}${FILENAME}"
        echo "Downloading $URL"
        wget "$URL" -O "$FILENAME"
        echo "Downloaded $FILENAME"
    done

Read data with Spark

You can find similar scripts everywhere; here is a code example of reading data with Spark. You can also import the script in notebook format.

import pyspark
from pyspark.sql import SparkSession

# These env vars should point at the Python used by the driver/executors; adjust for your machine.
spark = SparkSession.builder \
    .appName('dec') \
    .config("spark.executorEnv.PYSPARK_PYTHON", "/opt/homebrew/lib/python3.10/") \
    .config("spark.executorEnv.PYSPARK_DRIVER_PYTHON", "/opt/homebrew/lib/python3.10/site-packages/pyspark/") \
    .getOrCreate()

# Read every monthly parquet file under the green taxi staging folder.
df_green = spark.read.parquet('data/stg/green/*/*')

print("Row Counts: ", df_green.count())

# Drop down to the RDD API with only the columns we need.
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

# Keep only trips that were not picked up at location 74.
subset_rdd = rdd.filter(lambda row: int(row['PULocationID']) != 74)

# take(10) pulls only a small sample to the driver; avoid collect() on large RDDs.
print(">>>>>>>> Data: ", subset_rdd.take(10))

Note: another way to infer the schema for CSV files (apart from using pandas) is to set the inferSchema option to true while reading the files in Spark.

If you don't know what inferSchema is, see the explanation below.

Explanation of inferSchema in Spark

When reading CSV files in Spark, you have the option to infer the schema of the data automatically. This means that Spark will analyze the data and determine the data types of each column based on the values it finds.

To implement inferSchema in Spark, you need to set the inferSchema option to true while reading the files. Here’s an example:

df = spark.read.format("csv").option("inferSchema", "true").load("path/to/csv/file.csv")
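
You can verify what Spark inferred by printing the schema; for CSV files with a header row you will usually also want the header option so column names are picked up:

# Show the column names and the data types Spark inferred from the file.
df.printSchema()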

By enabling inferSchema:

  • Spark automatically assigns the appropriate data types to each column in the DataFrame.
  • This can be useful when you don't have a predefined schema or when dealing with large datasets where manually specifying the schema is not feasible.
  • It saves time and effort by letting Spark determine the schema of your data.

But here is a disclaimer:

  • inferSchema may not always produce the desired results, especially if the data is inconsistent or contains missing values.
  • In such cases, it's recommended to define the schema manually using the StructType class (see the sketch below).
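
A minimal sketch of defining the schema manually; the columns follow the taxi zone lookup table shown earlier, so adjust them to your own file:

from pyspark.sql import types

# Columns taken from the zone lookup example; replace with your CSV's columns.
schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True),
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv("path/to/csv/file.csv")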

Is there any reason we should not define a schema?

  • Yes. For streaming data we often don't want to enforce a schema at read time because of latency constraints; instead, a schema registry is typically used.

Spark Internals

History: why does Spark exist?

graph TD
%% Add a transparent text node as a watermark
style Watermark fill:none,stroke:none
Watermark[Created by: LongBui];
    subgraph "Node 1"
        A[Mapper 1]
        B[Reducer 1]
    end

    subgraph "Node 2"
        C[Mapper 2]
        D[Reducer 2]
    end

    subgraph "Node 3"
        E[Mapper 3]
        F[Reducer 3]
    end

    A -->|Map Output| D
    C -->|Map Output| B
    E -->|Map Output| F

    B -->|Reduce Input| F
    D -->|Reduce Input| F

    style A fill:#f9f,stroke:#333,stroke-width:2px;
    style B fill:#f9f,stroke:#333,stroke-width:2px;
    style C fill:#f9f,stroke:#333,stroke-width:2px;
    style D fill:#f9f,stroke:#333,stroke-width:2px;
    style E fill:#f9f,stroke:#333,stroke-width:2px;
    style F fill:#f9f,stroke:#333,stroke-width:2px;

Figure: Map-Reduce Overview

From the figure, there are 3 core concepts:

  • Distribute data: when a data file is uploaded into the cluster, it is split into chunks, called data blocks, distributed amongst the data nodes, and replicated across the cluster.
  • Distribute computation: a map function processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function merges all intermediate values associated with the same intermediate key. Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines in the following way:
    • Mappers process data on each node
    • Results from the mappers are sorted/shuffled by the number of reducers, sent across to the reducers, and written as SequenceFiles
    • Reducers process data on each node and write the result to the output file
  • Tolerate faults: both data and computation can tolerate failures by failing over to another node for data or processing.

Example: MapReduce-style code in Spark

# Map: emit (pickup location, trip amount) pairs.
def map_function(row):
    return (row.PULocationID, row.total_amount)

mapped_rdd = subset_rdd.map(map_function)

# Reduce: sum the amounts for each pickup location.
def reduce_function(amount1, amount2):
    return amount1 + amount2

reduced_rdd = mapped_rdd.reduceByKey(reduce_function)
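
To inspect the result of the reduce step, you can pull a few pairs back to the driver:

# Each element is (PULocationID, total_amount summed across that location's trips).
print(reduced_rdd.take(5))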

Wide Transformation and Narrow Transformation

Advanced

GroupBy in Spark

Advanced

Joins in Spark

  • Basically, Spark automatically optimizes the execution of tasks and stages. On top of that, we can tune the configuration of the executors and the cluster manager to further improve job performance (a minimal join sketch follows this list).
  • Understanding the CAP theorem helps here: there is a trade-off between correctness (consistency) and availability.
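
Below is a minimal sketch of a DataFrame join, reusing the df_green and df_zones variables from the earlier snippets (those names are assumptions from this page, not a fixed API). broadcast() hints Spark to ship the small lookup table to every executor instead of shuffling the large trips table:

from pyspark.sql import functions as F

# Small dimension table (zones) joined to the large fact table (trips).
# Broadcasting the small side avoids a full shuffle of df_green.
df_joined = df_green.join(
    F.broadcast(df_zones),
    df_green.PULocationID == df_zones.LocationID,
    how="left",
)

df_joined.select("lpep_pickup_datetime", "Borough", "Zone", "total_amount").show(5)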

Advanced

Spark Job Runs - Deployment Mode

Advanced

Standalone - for development

Cluster Mode (with External Cluster Manager)

Client Mode

Spark Components

graph TD
%% Add a transparent text node as a watermark
style Watermark fill:none,stroke:none
Watermark[Created by: LongBui];

    subgraph StorageLayer["Storage Layer"]
        A[HDFS]
        B[Hive]
        C[HBase]
    end

    subgraph ClusterManagerLayer["Cluster Manager Layer"]
        D[YARN]
        E[Mesos]
        F[Kubernetes]
    end

    subgraph EngineLayer["Engine Layer"]
        G[Spark Core]
        H[Spark SQL]
        I[Spark Streaming]
        J[MLlib]
        K[GraphX]
    end

    subgraph LibraryLayer["Libraries Layer"]
        L[Spark ML]
        M[Spark GraphFrames]
        N[SparkR]
        O[PySpark]
    end

    subgraph ProgrammingLayer["Programming Languages"]
        P[Scala]
        Q[Java]
        R[Python]
    end
    ProgrammingLayer --> LibraryLayer -->EngineLayer--> ClusterManagerLayer --> StorageLayer
    style A fill:#f9f,stroke:#333,stroke-width:2px;
    style B fill:#f9f,stroke:#333,stroke-width:2px;
    style C fill:#f9f,stroke:#333,stroke-width:2px;
    style D fill:#992,stroke:#333,stroke-width:2px;
    style E fill:#992,stroke:#333,stroke-width:2px;
    style F fill:#992,stroke:#333,stroke-width:2px;
    style G fill:#ccf,stroke:#333,stroke-width:2px;
    style H fill:#ccf,stroke:#333,stroke-width:2px;
    style I fill:#ccf,stroke:#333,stroke-width:2px;
    style J fill:#ccf,stroke:#333,stroke-width:2px;
    style K fill:#ccf,stroke:#333,stroke-width:2px;
    style L fill:#8af,stroke:#333,stroke-width:2px;
    style N fill:#8af,stroke:#333,stroke-width:2px;
    style M fill:#8af,stroke:#333,stroke-width:2px;
    style O fill:#8af,stroke:#333,stroke-width:2px;
    style P fill:#6aa,stroke:#333,stroke-width:2px;
    style Q fill:#6aa,stroke:#333,stroke-width:2px;
    style R fill:#6aa,stroke:#333,stroke-width:2px;

Figure: Spark Ecosystem Architecture

  • A variety of programming languages to interact with
  • Libraries that support building applications with Spark
  • Spark Core manages memory, fault recovery, scheduling, and the distribution and monitoring of jobs/stages/tasks on the cluster
  • A cluster manager is hosted so Spark Core can interact with it for resource management, etc.
  • Data can reside on any storage system

Anatomy of a Spark Cluster

Inspecting a Spark cluster, we become familiar with the following components (a small configuration sketch follows the list):

  • Master Node: Controls the overall execution of applications and manages resources.
  • Worker Nodes: Execute tasks and store data.
  • Executor: Process data and execute computations.
  • Driver: Coordinates tasks and communicates with the cluster manager.
  • Cluster Manager: Manages resources across the cluster (e.g., YARN, Mesos, Kubernetes).
  • Storage Level: Determines how RDDs (Resilient Distributed Datasets) are stored in memory or disk.
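
As a hedged example of how these pieces show up in practice (the values are illustrative, not recommendations), executor count and size can be set when building the session or submitting the job; the cluster manager then allocates them across the worker nodes:

from pyspark.sql import SparkSession

# Illustrative executor sizing; tune these for your own cluster and cluster manager.
spark = SparkSession.builder \
    .appName("cluster-anatomy-demo") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()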

Trigger Spark Job

  • Notebook: most people are familiar with Spark and PySpark via notebooks, which are meant for analytical purposes, and Databricks has turned them into a standard for job workflows
  • CLI: spark-submit, spark-shell, pyspark, etc. are the core tools of Spark and the way you interact with a Spark cluster; there is no GUI, and they work with standalone clusters, Kubernetes, etc.

Example

Download the spark job examples here:

Run the command below:

spark-submit --jars ./jdbc_drivers/snowflake-jdbc-3.13.16.jar,./jdbc_drivers/spark-snowflake_2.12-2.9.0-spark_3.1.jar ./spark_reading_sf.py

It returns the row count of the Snowflake table:

24/07/03 07:27:54 INFO SparkConnectorContext$: Spark Connector system config: {
  "spark_connector_version" : "2.9.0",
  "spark_version" : "3.3.1",
  "application_name" : "Spark Snowflake Connector",
  "scala_version" : "2.12.15",
  "java_version" : "1.8.0_352",
  "jdbc_version" : "3.13.16",
  "certified_jdbc_version" : "3.13.3",
  }
}
24/07/03 07:27:54 INFO SnowflakeSQLStatement: Spark Connector Master: execute query with bind variable: select * from (SELECT COUNT(1) FROM WAREHOUSE.PUBLIC.WEB_EVENTS) where 1 = 0
24/07/03 07:27:57 WARN DefaultJDBCWrapper$: JDBC 3.13.16 is being used. But the certified JDBC version 3.13.3 is recommended.
24/07/03 07:27:57 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Ho_Chi_Minh',
...
24/07/03 07:28:00 INFO DAGScheduler: Job 0 finished: showString at NativeMethodAccessorImpl.java:0, took 0.863808 s
24/07/03 07:28:00 INFO CodeGenerator: Code generated in 8.590042 ms
+----------+
|"COUNT(1)"|
+----------+
|   2964624|
+----------+

24/07/03 07:28:00 INFO SparkContext: Invoking stop() from shutdown hook

Example

Reading from an RDD

spark-submit spark_reading_rdd.py

24/07/03 07:18:47 INFO TaskSchedulerImpl: Killing all running tasks in stage 5: Stage finished
24/07/03 07:18:47 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:166, took 0.067395 s
<><><><><><><><><>Data:  [Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 46, 55), PULocationID=236, total_amount=21.66), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 31, 42), PULocationID=65, total_amount=42.66), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 43, 41), PULocationID=33, total_amount=24.25), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 46, 12), PULocationID=166, total_amount=24.37), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 38, 7), PULocationID=226, total_amount=6.2), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 44, 24), PULocationID=7, total_amount=20.88), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 18, 37), PULocationID=42, total_amount=24.24), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 47, 36), PULocationID=41, total_amount=31.74), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 3, 57), PULocationID=130, total_amount=33.54), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 18, 52), PULocationID=41, total_amount=13.1)]

Further Discussion

  • DuckDB has made data analysis work much easier by supporting:
    • In-memory processing
    • A rich SQL dialect
    • Extremely fast analysis for everyone
    • Analyzing data where it resides
  • Batch vs. micro-batch processing

select count(1) from read_parquet('./part-parquet-file.parquet');

Example: DuckDB query
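
The same query can be run from Python with the duckdb package; the parquet file name is a placeholder:

import duckdb

# Query the parquet file in place; no data is loaded into Spark or pandas first.
result = duckdb.sql(
    "select count(1) as row_count from read_parquet('./part-parquet-file.parquet')"
)
print(result.fetchall())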

What makes you better?

  • Understand the core concepts of a distributed processing engine: computing, fault tolerance, partitioning, scheduling, priority, etc.
  • Process data from various sources: Kafka, databases, graphs (advanced), file systems, etc.
  • Manipulate data: handle missing values, outliers, and inconsistencies; filter, aggregate, and join (you can practice this with only a single dataset) (code sample)
  • Know how to optimize Spark job performance (see the sketch after this list) by:
    • Minimizing network shuffling
      • Filter before joins and aggregations
      • Repartition on a different key, or coalesce
    • Improving processing efficiency (parallelism)
    • Applying lazy evaluation for efficient query execution
    • Managing memory usage with caching and broadcast variables
    • Avoiding UDFs and UDAFs (User-Defined Aggregation Functions)
    • Tuning the cluster, executors, and partitions
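
A minimal sketch combining a few of these ideas (filter before joining, broadcast the small side, coalesce before writing); the DataFrames follow the taxi example above, and the output path is an assumption:

from pyspark.sql import functions as F

# 1. Filter early so less data is shuffled and joined.
trips_2024 = df_green.filter(F.col("lpep_pickup_datetime") >= "2024-01-01")

# 2. Broadcast the small lookup table to avoid shuffling the big one.
enriched = trips_2024.join(F.broadcast(df_zones),
                           trips_2024.PULocationID == df_zones.LocationID)

# 3. Aggregate, then coalesce to fewer partitions before writing a small output.
revenue_per_zone = enriched.groupBy("Zone").agg(F.sum("total_amount").alias("revenue"))
revenue_per_zone.coalesce(1).write.mode("overwrite").parquet("data/report/revenue_per_zone")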

We discuss this further in → Advanced