Week 5 - Batch Processing
- Introduction
- Installation
- Application Components in Spark
- Spark SQL and DataFrames
- Explanation of inferSchema in Spark
- Spark Internals
- Spark Job Runs - Deployment Mode
- Spark Components
- Trigger Spark Job
- Further Discussion
- What makes you better?
Introduction
Too Long; Didn’t Read - TL;DR
What this video covers
Batch data is a collection of records processed together: in data processing, “batch” means processing a large dataset in one go. A batch run can take several minutes or even a few hours.
The most popular batch processing framework is Apache Spark, which is available everywhere: on-premises, in the cloud, and inside SaaS applications.
Check the Introduction to Spark (Official); the document is still being updated, but it is good enough for a deep dive into how Spark works.
The goal of Spark
To keep the benefits of MapReduce’s scalable, distributed, fault-tolerant processing framework while making it more efficient and easier to use. The advantages of Spark over MapReduce are:
- Much faster, thanks to in-memory caching and parallel processing
- Runs multi-threaded tasks inside JVM processes
- Provides a richer functional programming model
- A strong community
Installation
- Make sure you are familiar with my dotfiles
- Get Spark installed via the official instructions here
- Or you can follow the instructions in JAVA + Spark + PySpark Setup
spark-shell
24/06/19 11:10:14 WARN Utils: Your hostname, Longs-Macbook-Air-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
24/06/19 11:10:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/19 11:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.1.4:4040
Spark context available as 'sc' (master = local[*], app id = local-1718770218283).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.1
/_/
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_352)
Type in expressions to have them evaluated.
Type :help for more information.
Installation guides are widely available on the internet, and also from GPT or Gemini.
Then get started with the Quick Start guide.
Application Components in Spark
First, we need to understand how Spark breaks an application into units of work (a sketch follows this list):
- Application: contains a main function
- Job: when an action is performed on an RDD, a “job” is created and submitted to Spark
- Stage: a job is divided into “stages” at shuffle boundaries
- Task: a stage is further divided into “tasks” based on the number of partitions in the RDD, so tasks are the smallest unit of work in Spark
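To make this concrete, here is a minimal sketch (all names and numbers are illustrative) showing that transformations stay lazy until an action triggers a job, which Spark then splits into stages and tasks:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("job-stage-task-demo").getOrCreate()

df = spark.range(0, 1_000_000)              # transformation: nothing runs yet
doubled = df.selectExpr("id * 2 AS id2")    # still lazy, no job submitted

# count() is an action: Spark creates a job, splits it into stages at shuffle
# boundaries, and runs one task per partition.
print(doubled.count())

# The Spark UI (http://localhost:4040 by default) lists the resulting jobs, stages, and tasks.
spark.stop()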
Spark SQL and DataFrames
A DataFrame looks like a table because, in the end, the data is stored and viewed just like a table (a reading sketch follows the sample output below).
Spark DataFrames
+----------+-------------+--------------------+------------+
|LocationID| Borough| Zone|service_zone|
+----------+-------------+--------------------+------------+
| 1| EWR| Newark Airport| EWR|
| 2| Queens| Jamaica Bay| Boro Zone|
| 3| Bronx|Allerton/Pelham G...| Boro Zone|
| 4| Manhattan| Alphabet City| Yellow Zone|
| 5|Staten Island| Arden Heights| Boro Zone|
| 6|Staten Island|Arrochar/Fort Wad...| Boro Zone|
| 7| Queens| Astoria| Boro Zone|
| 8| Queens| Astoria Park| Boro Zone|
| 9| Queens| Auburndale| Boro Zone|
| 10| Queens| Baisley Park| Boro Zone|
| 11| Brooklyn| Bath Beach| Boro Zone|
| 12| Manhattan| Battery Park| Yellow Zone|
| 13| Manhattan| Battery Park City| Yellow Zone|
| 14| Brooklyn| Bay Ridge| Boro Zone|
| 15| Queens|Bay Terrace/Fort ...| Boro Zone|
| 16| Queens| Bayside| Boro Zone|
| 17| Brooklyn| Bedford| Boro Zone|
| 18| Bronx| Bedford Park| Boro Zone|
| 19| Queens| Bellerose| Boro Zone|
| 20| Bronx| Belmont| Boro Zone|
+----------+-------------+--------------------+------------+
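For reference, a DataFrame like the one above can be produced roughly like this (the path data/taxi_zone_lookup.csv is an assumption):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("zones").getOrCreate()

# Read the NYC taxi zone lookup file with a header row.
df_zones = spark.read.option("header", "true").csv("data/taxi_zone_lookup.csv")
df_zones.show(20)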
(Optional) Preparing Yellow and Green Taxi Data
Script to prepare the Dataset
set -e

TAXI_TYPE=yellow
YEAR=2024

# Notes: The CloudFront prefix might change; check the backend of the official page to get the new one.
URL_PREFIX="https://d37ci6vzurychx.cloudfront.net/trip-data/"
FILE_EXTENSION=".parquet" # Example: yellow_tripdata_2024-01.parquet

for MONTH in {01..05}; do
  echo "Downloading data for $MONTH"
  FILENAME="${TAXI_TYPE}_tripdata_${YEAR}-$(printf "%02d" $MONTH)${FILE_EXTENSION}"
  URL="${URL_PREFIX}${FILENAME}"
  echo "Downloading $URL"
  wget "$URL" -O "$FILENAME"
  echo "Downloaded $FILENAME"
done
Read data with Spark
You can find similar scripts everywhere; here is a code example of reading data with Spark. Import the script in Notebook format.
import pyspark
from pyspark.sql import SparkSession

# Build the session; the executor Python paths below are environment-specific.
spark = SparkSession.builder \
    .appName('dec') \
    .config("spark.executorEnv.PYSPARK_PYTHON", "/opt/homebrew/lib/python3.10/") \
    .config("spark.executorEnv.PYSPARK_DRIVER_PYTHON", "/opt/homebrew/lib/python3.10/site-packages/pyspark/") \
    .getOrCreate()

# Read all partitions of the staged green taxi data.
df_green = spark.read.parquet('data/stg/green/*/*')
print("Row Counts: ", df_green.count())

# Drop down to the RDD API with only the columns we need.
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

# Keep only rows whose pickup location is not 74.
subset_rdd = rdd.filter(lambda row: int(row['PULocationID']) != 74)

# collect() pulls the entire RDD onto the driver; only safe for small data.
subset_rdd.collect()
print(">>>>>>>> Data: ", subset_rdd.take(10))
Note: The other way to infer the schema (apart from pandas) for the CSV files is to set the inferSchema option to true while reading the files in Spark. If you don’t know what inferSchema is, check below.
Explanation of inferSchema in Spark
When reading CSV files in Spark, you have the option to infer the schema of the data automatically. This means that Spark will analyze the data and determine the data types of each column based on the values it finds.
To implement inferSchema in Spark, you need to set the inferSchema option to true while reading the files. Here’s an example:
df = spark.read.format("csv").option("inferSchema", "true").load("path/to/csv/file.csv")
By enabling inferSchema (an example of checking the result follows this list):
- Spark will automatically assign the appropriate data types to each column in the DataFrame.
- This can be useful when you don’t have a predefined schema or when dealing with large datasets where manually specifying the schema is not feasible.
- Save time and effort by letting Spark automatically determine the schema of your data.
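For instance (assuming the CSV has a header row), you can check what Spark inferred:
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("path/to/csv/file.csv")

# Print the column names together with the data types Spark inferred.
df.printSchema()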
But here is a disclaimer:
- It’s important to note that inferSchema may not always produce the desired results
- Especially if the data is inconsistent or contains missing values
- In such cases, it’s recommended to define the schema manually using the StructType class (see the sketch below)
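A minimal sketch of a manually defined schema, assuming the same spark session as above and columns like the zone lookup table shown earlier (names and path are illustrative):
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema: Spark skips the extra pass over the data that inference needs.
schema = StructType([
    StructField("LocationID", IntegerType(), True),
    StructField("Borough", StringType(), True),
    StructField("Zone", StringType(), True),
    StructField("service_zone", StringType(), True),
])

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("path/to/csv/file.csv")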
Is there any reason why we should not have a schema defined?
- Yes, for streaming data we often don’t want to enforce a schema at read time because of time constraints (instead we use a schema registry)
Spark Internals
A bit of history: why does Spark exist?
graph TD %% Add a transparent text node as a watermark style Watermark fill:none,stroke:none Watermark[Created by: LongBui]; subgraph "Node 1" A[Mapper 1] B[Reducer 1] end subgraph "Node 2" C[Mapper 2] D[Reducer 2] end subgraph "Node 3" E[Mapper 3] F[Reducer 3] end A -->|Map Output| D C -->|Map Output| B E -->|Map Output| F B -->|Reduce Input| F D -->|Reduce Input| F style A fill:#f9f,stroke:#333,stroke-width:2px; style B fill:#f9f,stroke:#333,stroke-width:2px; style C fill:#f9f,stroke:#333,stroke-width:2px; style D fill:#f9f,stroke:#333,stroke-width:2px; style E fill:#f9f,stroke:#333,stroke-width:2px; style F fill:#f9f,stroke:#333,stroke-width:2px;
Figure: Map-Reduce Overview
From the image, there are three core concepts:
- Distribute data: when a data file is uploaded into the cluster, it is split into chunks, called data blocks, distributed amongst the data nodes, and replicated across the cluster
- Distribute computation: a map function processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function merges all intermediate values associated with the same intermediate key. Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines in the following way:
- Mappers process data on each node
- Results from the mappers are sorted/shuffled by the number of reducers, sent across to the reducers, and written as SequenceFiles
- Reducers process data on each node and write the result to the output file
- Tolerate faults: both data and computation can tolerate failures by failing over to another node for data or processing
Example: MapReduce-style code in Spark
# Map: emit (PULocationID, total_amount) pairs.
def map_function(row):
    return (row.PULocationID, row.total_amount)

mapped_rdd = subset_rdd.map(map_function)

# Reduce: sum the amounts for each PULocationID key.
def reduce_function(amount1, amount2):
    return amount1 + amount2

reduced_rdd = mapped_rdd.reduceByKey(reduce_function)
Wide Transformation and Narrow Transformation
→ Advanced …
GroupBy in Spark
→ Advanced …
Joins in Spark
- Basically, Spark automatically optimizes the execution of tasks and stages. On top of that, we can tune the configuration of the executors and the cluster manager to optimize job performance (a join sketch follows below).
- Understanding the CAP theorem also helps: there is a trade-off between correctness (consistency) and availability.
→ Advanced …
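As an illustration (assuming the df_green trips DataFrame and a df_zones lookup DataFrame from the earlier snippets), broadcasting the small lookup table avoids shuffling the large side of the join:
from pyspark.sql import functions as F

# Broadcast join: ship the small zone lookup table to every executor
# instead of shuffling the large trips DataFrame.
joined = df_green.join(
    F.broadcast(df_zones),
    df_green.PULocationID == df_zones.LocationID,
    how="left",
)

joined.select("lpep_pickup_datetime", "Zone", "total_amount").show(5)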
Spark Job Runs - Deployment Mode
→ Advanced …
Standalone - for development
Cluster Mode (with External Cluster Manager)
Client Mode
Spark Components
graph TD %% Add a transparent text node as a watermark style Watermark fill:none,stroke:none Watermark[Created by: LongBui]; subgraph StorageLayer["Storage Layer"] A[HDFS] B[Hive] C[HBase] end subgraph ClusterManagerLayer["Cluster Manager Layer"] D[YARN] E[Mesos] F[Kubernetes] end subgraph EngineLayer["Engine Layer"] G[Spark Core] H[Spark SQL] I[Spark Streaming] J[MLlib] K[GraphX] end subgraph LibraryLayer["Libraries Layer"] L[Spark ML] M[Spark GraphFrames] N[SparkR] O[PySpark] end subgraph ProgrammingLayer["Programming Languages"] P[Scala] Q[Java] R[Python] end ProgrammingLayer --> LibraryLayer -->EngineLayer--> ClusterManagerLayer --> StorageLayer style A fill:#f9f,stroke:#333,stroke-width:2px; style B fill:#f9f,stroke:#333,stroke-width:2px; style C fill:#f9f,stroke:#333,stroke-width:2px; style D fill:#992,stroke:#333,stroke-width:2px; style E fill:#992,stroke:#333,stroke-width:2px; style F fill:#992,stroke:#333,stroke-width:2px; style G fill:#ccf,stroke:#333,stroke-width:2px; style H fill:#ccf,stroke:#333,stroke-width:2px; style I fill:#ccf,stroke:#333,stroke-width:2px; style J fill:#ccf,stroke:#333,stroke-width:2px; style K fill:#ccf,stroke:#333,stroke-width:2px; style M fill:#ccf,stroke:#333,stroke-width:2px; style L fill:#8af,stroke:#333,stroke-width:2px; style N fill:#8af,stroke:#333,stroke-width:2px; style M fill:#8af,stroke:#333,stroke-width:2px; style O fill:#8af,stroke:#333,stroke-width:2px; style P fill:#6aa,stroke:#333,stroke-width:2px; style Q fill:#6aa,stroke:#333,stroke-width:2px; style R fill:#6aa,stroke:#333,stroke-width:2px;
Figure: Spark EcoSystem Architecture
- A variety of programming languages to interact with
- Libraries that support building applications on Spark
- Spark Core manages memory, fault recovery, scheduling, and the distribution and monitoring of jobs/stages/tasks on the cluster
- The cluster manager is what Spark Core interacts with for resource management, etc.
- Data can reside on any storage system.
Anatomy of a Spark Cluster
Inspecting a Spark cluster, we become familiar with the following (a configuration sketch follows this list):
- Master Node: Controls the overall execution of applications and manages resources.
- Worker Nodes: Execute tasks and store data.
- Executor: Process data and execute computations.
- Driver: Coordinates tasks and communicates with the cluster manager.
- Cluster Manager: Manages resources across the cluster (e.g., YARN, Mesos, Kubernetes).
- Storage Level: Determines how RDDs (Resilient Distributed Datasets) are stored in memory or disk.
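These pieces show up directly in the session configuration; here is a rough sketch (the values are illustrative assumptions, not recommendations): the master URL points at the cluster manager, and the executor settings size the worker processes.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")                     # cluster manager / master URL (local mode here)
    .appName("anatomy-demo")                # the driver registers the application under this name
    .config("spark.executor.memory", "2g")  # memory per executor
    .config("spark.executor.cores", "2")    # cores per executor
    .getOrCreate()
)

print(spark.sparkContext.uiWebUrl)          # the driver’s web UI
spark.stop()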
Trigger Spark Job
- Notebook: everyone is familiar with Spark and PySpark via notebooks, which are used for analytical purposes, and Databricks has turned them into a standard for job workflows
- CLI: spark-submit, spark-shell, spark-sql, etc. are core Spark entry points and are how you interact with a Spark cluster; there is no GUI, and they work for standalone clusters, Kubernetes, etc.
Download the spark job examples here:
Run the command below:
spark-submit --jars ./jdbc_drivers/snowflake-jdbc-3.13.16.jar,./jdbc_drivers/spark-snowflake_2.12-2.9.0-spark_3.1.jar ./spark_reading_sf.py
It returns the row count of the Snowflake table:
24/07/03 07:27:54 INFO SparkConnectorContext$: Spark Connector system config: {
"spark_connector_version" : "2.9.0",
"spark_version" : "3.3.1",
"application_name" : "Spark Snowflake Connector",
"scala_version" : "2.12.15",
"java_version" : "1.8.0_352",
"jdbc_version" : "3.13.16",
"certified_jdbc_version" : "3.13.3",
}
}
24/07/03 07:27:54 INFO SnowflakeSQLStatement: Spark Connector Master: execute query with bind variable: select * from (SELECT COUNT(1) FROM WAREHOUSE.PUBLIC.WEB_EVENTS) where 1 = 0
24/07/03 07:27:57 WARN DefaultJDBCWrapper$: JDBC 3.13.16 is being used. But the certified JDBC version 3.13.3 is recommended.
24/07/03 07:27:57 INFO SnowflakeSQLStatement: Spark Connector Master: execute query without bind variable: alter session set timezone = 'Asia/Ho_Chi_Minh',
...
24/07/03 07:28:00 INFO DAGScheduler: Job 0 finished: showString at NativeMethodAccessorImpl.java:0, took 0.863808 s
24/07/03 07:28:00 INFO CodeGenerator: Code generated in 8.590042 ms
+----------+
|"COUNT(1)"|
+----------+
| 2964624|
+----------+
24/07/03 07:28:00 INFO SparkContext: Invoking stop() from shutdown hook
24/07/03 07:18:47 INFO TaskSchedulerImpl: Killing all running tasks in stage 5: Stage finished
24/07/03 07:18:47 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:166, took 0.067395 s
<><><><><><><><><>Data: [Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 46, 55), PULocationID=236, total_amount=21.66), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 31, 42), PULocationID=65, total_amount=42.66), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 43, 41), PULocationID=33, total_amount=24.25), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 46, 12), PULocationID=166, total_amount=24.37), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 38, 7), PULocationID=226, total_amount=6.2), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 44, 24), PULocationID=7, total_amount=20.88), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 18, 37), PULocationID=42, total_amount=24.24), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 47, 36), PULocationID=41, total_amount=31.74), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 3, 57), PULocationID=130, total_amount=33.54), Row(lpep_pickup_datetime=datetime.datetime(2024, 1, 1, 7, 18, 52), PULocationID=41, total_amount=13.1)]
Further Discussion
- DuckDB has made data analysis work much easier by supporting:
- In-memory processing
- Rich SQL dialect support
- Perform extremely fast analysis for everyone
- Analyze data where it resides
- Batched vs Micro-Batch Processing
select count(1) from read_parquet('./part-parquet-file.parquet')
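The same query can also be run from Python with the duckdb package (the file name is a placeholder):
import duckdb

# DuckDB queries the Parquet file in place; no load step is required.
result = duckdb.sql("select count(1) from read_parquet('./part-parquet-file.parquet')")
print(result.fetchone()[0])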
What makes you better?
- Understand the core concepts of a distributed processing engine: computing, fault tolerance, partitioning, scheduling, priority, etc.
- Process data of various types: Kafka, DBs, graph (advanced), file systems, etc.
- Manipulate data: handle missing values, outliers, inconsistencies, filtering, aggregation, joining (you can practice this with only a single dataset) (Code sample)
- Know how to optimize Spark job performance (a sketch follows this list) by:
- Minimize network shuffling
- Filter before joins and aggregations
- Repartition on a different key, or coalesce
- Improve processing efficiency (parallelism)
- Apply lazy evaluation for efficient query execution
- Manage memory usage by caching and broadcasting
- Avoid UDFs and UDAFs (User-Defined Aggregate Functions)
- Tune the cluster, executors, and partitions
We discussed these in → Advanced …
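A small sketch tying a few of these tips together (the paths and numbers are illustrative assumptions):
from pyspark.sql import functions as F

trips = spark.read.parquet("data/stg/green/*/*")

# Filter early so less data is shuffled by the aggregation below.
paid = trips.filter(F.col("total_amount") > 0)

# Repartition on the grouping key to control shuffle layout and parallelism.
paid = paid.repartition(16, "PULocationID")

# Cache a DataFrame that several actions will reuse.
paid.cache()

revenue = paid.groupBy("PULocationID").agg(F.sum("total_amount").alias("revenue"))
revenue.show(5)

# Coalesce a small result down to fewer partitions before writing it out.
revenue.coalesce(1).write.mode("overwrite").parquet("data/out/revenue_by_zone")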