Azure Databricks : End To End Project — Part 2 — Delta Lake, Autoloader — Ingest Data to the Bronze Layer
Welcome back to our blog series on Azure Databricks! In the previous post, we explored Unity Catalog, a transformative feature that enhances data governance by providing a centralized catalog for all data assets across various Databricks workspaces. We delved into its three-level namespace and demonstrated how Unity Catalog, in conjunction with PySpark, ensures that complex data processing tasks adhere to stringent governance protocols.
In this second installment, we will shift our focus to Delta Lake, another cornerstone of the Databricks ecosystem. Delta Lake is designed to bring reliability, performance, and scalability to your data lakes. By providing ACID transactions, scalable metadata handling, and unifying streaming and batch data processing, Delta Lake ensures that your data is always accurate and available for analysis. This robust platform not only enhances data reliability but also simplifies data pipeline development and management.
Throughout this post, we will explore the key features and benefits of Delta Lake, including its architecture and integration with Azure Databricks. We will guide you through the setup process, demonstrate how to perform common data operations, and show you how Delta Lake can improve your data workflows. Join us as we dive deep into the world of Delta Lake and discover how it can transform your data analytics and processing capabilities.
Data Lake vs. Traditional Database: Why We Need Delta Lake for the Lakehouse
Understanding the differences between data lakes and traditional databases is key to seeing why Delta Lake is so important for modern data management. Let’s break it down in simple terms and show how Delta Lake helps create a better system called the lakehouse.
Traditional Database
What is it? A traditional database, like SQL Server or MySQL, is used to store structured data — data that fits neatly into tables with rows and columns.
Data Lake
What is it? A data lake is a large storage repository that holds vast amounts of raw data in its native format, whether structured, semi-structured, or unstructured.
Why We Need Delta Lake
While data lakes are great for storing a lot of different types of data, they miss some key features of traditional databases, like reliable transactions and easy data management. Delta Lake brings the best of both worlds to create a lakehouse, which combines the strengths of data lakes and databases.
Key Features of Delta Lake:
ACID Transactions:
Imagine you’re updating customer records in a database. If something goes wrong halfway, you could end up with incomplete or corrupt data. Delta Lake’s ACID transactions ensure that either all changes are made, or none are, keeping data consistent.
Scalable and Efficient:
Handling petabytes of data efficiently without slowing down. Delta Lake manages metadata smartly, so even as your data grows, it remains easy to query and manage.
Unified Batch and Streaming Processing:
You can process historical sales data in batches and simultaneously handle real-time sales transactions. Delta Lake supports both, allowing seamless integration of real-time and historical data.
Time Travel:
You can look at your data as it was at a previous point in time, which is useful for audits and debugging. This feature helps you revert to older versions of data if something goes wrong.
Schema Enforcement and Evolution:
If you have a new data source with a slightly different structure, Delta Lake ensures it matches the existing data schema, preventing errors. Over time, as your data structure changes, Delta Lake allows these changes without breaking your data pipelines.
The Lakehouse Architecture
The lakehouse architecture combines the best features of data lakes and data warehouses to create a unified and highly efficient data platform.
Key Characteristics of the Lakehouse Architecture:
Unified Data Storage: A single storage layer that supports all data types, including structured, semi-structured, and unstructured data.
You can store everything from customer transaction records (structured) to social media feeds (semi-structured) to raw video files (unstructured) in one place.
ACID Transactions: Ensures data consistency and reliability with support for atomic, consistent, isolated, and durable transactions.
Running an update on sales data will be reliable, ensuring no partial or corrupt updates.
Scalable and Efficient: Handles large-scale data operations efficiently with scalable metadata management and optimized query performance.
Even with petabytes of data, queries remain fast and efficient.
Real-Time Data Processing: Supports both batch and streaming data processing for real-time analytics and up-to-date insights.
You can analyze real-time sales data alongside historical data to get immediate insights.
Flexible Schema Management: Allows schema enforcement and evolution, accommodating changing data structures without disruptions.
If the format of your log files changes, you can easily adjust without breaking existing processes.
Built-in Data Versioning: Enables time travel and data versioning for auditing, debugging, and historical analysis.
Accessing last month’s data snapshot to compare with current data trends.
Delta Lake is essential for enabling the lakehouse architecture because it fills the gaps left by traditional data lakes. It brings in the reliability and performance of databases while retaining the flexibility and scalability of data lakes. By adopting Delta Lake, you get a unified, efficient, and powerful data platform that meets the diverse needs of modern data environments. This integration enhances data reliability, streamlines data workflows, and provides the scalability needed to handle today’s data challenges.
Using Delta Lake with PySpark
Delta Lake is a powerful storage layer that brings ACID transactions, scalable metadata handling, and unified streaming and batch data processing to Apache Spark™ and big data workloads. Here are some examples of how to use Delta Lake with PySpark in Azure Databricks.
- Create Delta Table
You can create a Delta table by saving a DataFrame as a Delta table. When you use Delta Lake, your data is stored in the data lake (e.g., Azure Data Lake Storage, S3) as a collection of Parquet files. These files are organized in a directory structure that Delta Lake manages to ensure efficient data retrieval and storage.
Here’s an example using a simple DataFrame:
# Sample DataFrame
data = [
(1, "Alice", 29),
(2, "Bob", 31),
(3, "Cathy", 27)
]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Save DataFrame as a Delta table
df.write.format("delta").save("/delta/table1")
Delta Log
A key component of Delta Lake is the Delta log, a transaction log that keeps track of all changes made to the Delta table. The Delta log files are stored alongside the data files in the same directory. This log enables Delta Lake to provide ACID transactions and time travel capabilities.
How It Works:
- Each transaction (e.g., an update, delete, or merge) creates a new version of the Delta table.
- The Delta log records metadata about these transactions, such as the operation type and affected files.
- Delta Lake uses the Delta log to manage concurrent writes and ensure data consistency.
For example, consider a Delta table stored at /delta/table1. The directory structure might look like this:
/delta/table1/
├── _delta_log/
│ ├── 00000000000000000000.json
│ ├── 00000000000000000001.json
│ ├── 00000000000000000002.json
│ └── ...
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── ...
Each JSON file in the _delta_log directory corresponds to a version of the table. Let’s go through an example of the contents of these log files. For the initial write we performed above, the _delta_log/00000000000000000000.json file might look like this:
{
"commitInfo": {
"timestamp": 1625074800000,
"operation": "WRITE",
"operationParameters": {
"mode": "Overwrite",
"partitionBy": []
}
},
"metaData": {
"id": "d8d8e9a6-7e48-40f6-9b5c-3e229d7399d3",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true},{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true}]}",
"partitionColumns": [],
"configuration": {},
"createdTime": 1625074800000
},
"protocol": {
"minReaderVersion": 1,
"minWriterVersion": 2
},
"add": [
{
"path": "part-00000-abc123.snappy.parquet",
"partitionValues": {},
"size": 12345,
"modificationTime": 1625074800000,
"dataChange": true
},
{
"path": "part-00001-def456.snappy.parquet",
"partitionValues": {},
"size": 67890,
"modificationTime": 1625074800000,
"dataChange": true
}
]
}
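If you want to see this for yourself, you can list the transaction log directly from a notebook. A quick sketch, assuming the example table above was written to /delta/table1:
# List the commit files in the transaction log
display(dbutils.fs.ls("/delta/table1/_delta_log/"))
# Peek at the first commit (first ~10 KB of the JSON file)
print(dbutils.fs.head("/delta/table1/_delta_log/00000000000000000000.json", 10000))
Note that on disk each commit file is newline-delimited JSON, with one action object (commitInfo, metaData, protocol, add, …) per line; the snippet above consolidates them into a single object for readability.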
- Read Delta Table :
To read data from a Delta table, use the following command:
df = spark.read.format("delta").load("/delta/table1")
df.show()
- Perform Upserts (Merge) :
Delta Lake allows you to perform upserts (update and insert) easily. Here’s an example of how to merge data into an existing Delta table:
# New data to merge
new_data = [
(1, "Alice", 30), # Update age for Alice
(4, "David", 25) # Insert new record
]
columns = ["id", "name", "age"]
new_df = spark.createDataFrame(new_data, columns)
# Merge new data into Delta table
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/delta/table1")
(deltaTable.alias("oldData")
.merge(new_df.alias("newData"), "oldData.id = newData.id")
.whenMatchedUpdate(set={"age": "newData.age"})
.whenNotMatchedInsert(values={"id": "newData.id", "name": "newData.name", "age": "newData.age"})
.execute())
- Time Travel :
Delta Lake supports querying previous versions of your data using time travel. You can query by a specific version number or a timestamp:
How It Works:
- The Delta log keeps track of all versions of the table.
- You can query data as of a specific version number or timestamp.
# Query by version number
df_version_0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta/table1")
df_version_0.show()
# Query by timestamp
df_timestamp = spark.read.format("delta").option("timestampAsOf", "2023-07-07T00:00:00.000Z").load("/delta/table1")
df_timestamp.show()
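If you are not sure which versions or timestamps are available, you can inspect the table history first. A minimal sketch using the DeltaTable API (the SQL equivalent is DESCRIBE HISTORY):
from delta.tables import DeltaTable

# List the commits of the table: version, timestamp, operation, ...
deltaTable = DeltaTable.forPath(spark, "/delta/table1")
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)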
- Schema Enforcement and Evolution :
Delta Lake enforces schemas on your data, ensuring that the data conforms to the expected structure. It also supports schema evolution, allowing you to modify the schema as your data changes over time without breaking existing processes.
How It Works:
- When you write data to a Delta table, Delta Lake checks the schema against the defined structure.
- If the schema matches, the data is written. If not, Delta Lake can throw an error or adjust the schema according to the rules you define.
Example: Adding a new column to your Delta table schema. Note the mergeSchema option; without it, schema enforcement would reject the append because the new column is not part of the existing table schema:
from pyspark.sql.functions import lit

df_with_new_column = df.withColumn("new_column", lit("default_value"))
(df_with_new_column.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/delta/table1"))
- Data Optimization :
Delta Lake includes features to optimize data storage and retrieval, such as compaction and data skipping.
Compaction: Combines smaller Parquet files into larger ones to improve read performance.
deltaTable.optimize().executeCompaction()
Data Skipping: Stores statistics about the data in the Delta log to quickly skip irrelevant data when querying.
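Data skipping is most effective when related values sit in the same files. One way to encourage this, not used in this project but worth knowing, is Z-ordering, which clusters the data by the columns you filter on most often. A hedged sketch, with "id" as an arbitrary example column:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/delta/table1")

# Compact the files and co-locate rows with similar "id" values,
# so queries filtering on id can skip more files
deltaTable.optimize().executeZOrderBy("id")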
- Garbage Collection
Delta Lake periodically cleans up old data files that are no longer needed, freeing up storage space and maintaining performance. This process is called vacuuming.
Example: Vacuuming a Delta table to remove old files:
deltaTable.vacuum()
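By default, vacuum only removes files older than the 7-day retention period, which protects recent versions for time travel. You can pass an explicit retention in hours if you need a different window; this is just an illustration, and shortening it can break time travel to older versions:
# Remove files no longer referenced and older than 168 hours (7 days, the default)
deltaTable.vacuum(168)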
Ingestion to Bronze : Customer Orders Analysis
This project involves analyzing a dataset that contains five years of customer orders, from 2017 to 2021, featuring thousands of products sold. The dataset provides a rich source of information for mining insights into customer purchasing patterns and product performance over time.
In this project, we will explore different components of each platform. With Databricks, we will delve into features like Unity Catalog, which organizes and secures data across all Databricks workspaces, and Delta Lake, which provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing. Similarly, in Microsoft Fabric, we will examine its integration capabilities, data management, and analytics services to handle large-scale data efficiently.
The project’s analytical phase will include data ingestion, cleansing, integration, and transformation. Following these preparatory steps, we will focus on advanced analytics, employing techniques such as regression models to predict future trends based on historical data. This hands-on comparison aims to not only highlight each platform’s technical merits but also to demonstrate their practical application in a real-world business scenario, providing valuable insights into which platform might better suit different organizational needs.
In our last part, we embarked on the Customer Orders Analysis project, focusing on a dataset covering five years of customer orders from 2017 to 2021. This rich dataset, with thousands of products sold, offers valuable insights into customer purchasing patterns and product performance over time.
Previously on Part 1 :
Enable Unity Catalog:
- Enabled Unity Catalog in the Databricks Account Console.
- Created a Metastore and assigned Unity Catalog to the workspace.
Create Storage Credentials:
- Created a connector in Azure to obtain a Managed Identity.
- Assigned the Blob Data Contributor role to the Managed Identity for accessing Azure Storage Data Lake.
Create an External Location:
- Set up containers for the medallion model (Bronze, Silver, and Gold), Landing, and Checkpoint.
- Created external locations using a notebook to establish the project architecture.
Create the Development Catalog:
- Created the dev catalog using a Setup Python Notebook.
Create Schemas:
- Created Bronze, Silver, and Gold schemas to organize data stages.
Create Bronze Tables:
- Utilized data from the Wholesale & Retail Orders Dataset (kaggle.com).
- Created tables for Orders.csv and Products-Supplier.csv in the Bronze schema.
This was a recap of what we did in the last part. Now let’s move on and ingest data from the landing container into the Bronze container!
Business Requirement: You have a data pipeline that processes customer orders from various sources. These orders include structured data (like CSV files) and semi-structured data (like JSON files) arriving in real-time into a raw data container. You need to load this raw data into a Bronze container for initial processing and storage.
Purpose of the Bronze Container: The Bronze container acts as the initial landing zone for raw data. It stores data in its original format, serving as the foundation for further refinement and transformation. This helps in preserving the original data and ensures traceability.
Autoloader :
Loading data from a raw container to a Bronze container in ADLS using Autoloader provides a structured and efficient approach to handle raw data ingestion. It ensures data integrity, simplifies data management, supports incremental processing, facilitates schema evolution, and enhances data traceability. This setup is essential for building robust data pipelines and enabling advanced analytics in a scalable and manageable way.
Autoloader is a powerful feature in Azure Databricks designed to simplify the process of ingesting large volumes of data into Delta Lake. One of the critical components of Autoloader’s functionality is the use of checkpoints. Let’s delve into how Autoloader works, what checkpoints are, and how they enhance the data ingestion process.
What is Autoloader?
Autoloader is a scalable, efficient, and easy-to-use data ingestion tool in Azure Databricks that continuously loads data from cloud storage into Delta Lake tables. It is designed to handle both streaming and batch data ingestion with minimal configuration, making it ideal for building data pipelines that require real-time or near-real-time data processing.
Key Features of Autoloader:
- Incremental Data Processing: Processes only new files, avoiding redundancy.
- Schema Evolution: Automatically adapts to changes in data structure.
- Scalability: Efficiently handles billions of files.
- Automatic File Discovery: Detects and processes new files without manual intervention.
- Efficient Metadata Handling: Tracks processed files and their schema.
How Autoloader Works
Autoloader uses two primary mechanisms to ingest data:
- File Notification Mode: Uses cloud-native services (e.g., AWS S3 Event Notifications, Azure Event Grid) for real-time file detection.
- Directory Listing Mode: Periodically lists directories to find new files.
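Directory listing is the default mode; file notification mode has to be enabled explicitly and needs some extra cloud-side setup (for example Azure Event Grid permissions). A minimal sketch of how the mode is typically selected, with placeholder paths rather than the project’s real locations:
# Hypothetical example: switch Autoloader from directory listing to file notification mode
events_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")   # default is directory listing
    .option("cloudFiles.schemaLocation", "<checkpoint-path>/events/schemaInfer")
    .load("<landing-path>/events/"))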
Checkpoints in Autoloader
Checkpoints are a critical component of Autoloader that ensures reliable and consistent data ingestion. They record the state of data processing to ensure that the system can resume where it left off in the event of a failure. This is crucial for ensuring data integrity and consistency in streaming applications.
Key Aspects of Checkpoints:
- State Management: Keeps track of processed files, ensuring that each file is processed only once.
- Fault Tolerance: Allows the system to recover from failures by resuming from the last successful state.
- Progress Tracking: Monitors the progress of data ingestion, enabling efficient resource management.
Checkpointing Mechanism: When Autoloader processes a batch of data, it writes a checkpoint file that contains the state of the processing at that point. This includes information about which files have been processed and the schema of the data. If the processing job fails or is interrupted, Autoloader can use the checkpoint to restart from the last successful state without reprocessing already ingested files.
And that’s why we created a checkpoint container in Part 1.
First let’s load the external locations that we are going to need :
checkpoint = spark.sql("describe external location `checkpoint_location`").select("url").collect()[0][0]
landing = spark.sql("describe external location `landing_location`").select("url").collect()[0][0]
bronze = spark.sql("describe external location `bronze_location`").select("url").collect()[0][0]
Then let’s create a stream to load the supplier and orders data in near real time, as soon as the files arrive in the landing container.
def read_productSupplier():
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
    from pyspark.sql.functions import current_timestamp

    print("Reading the productSupplier Data : ", end='')

    schema = StructType([
        StructField("ProductID", LongType()),
        StructField("ProductLine", StringType()),
        StructField("ProductCategory", StringType()),
        StructField("ProductGroup", StringType()),
        StructField("ProductName", StringType()),
        StructField("SupplierCountry", StringType()),
        StructField("SupplierName", StringType()),
        StructField("SupplierID", IntegerType())
    ])

    productSupplier_stream = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", f"{checkpoint}/productSupplierLoad/schemaInfer")
        .option("header", "true")
        .schema(schema)
        .load(landing + "/productSupplier/")
        .withColumn("Extract_Time", current_timestamp()))

    print("Reading Success !!")
    print("*******************")
    return productSupplier_stream
def read_customerOrders():
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, LongType, DateType
    from pyspark.sql.functions import current_timestamp

    print("Reading the customerOrders Data : ", end='')

    schema = StructType([
        StructField("CustomerID", IntegerType()),
        StructField("CustomerStatus", StringType()),
        StructField("DateOrderPlaced", DateType()),
        StructField("DeliveryDate", DateType()),
        StructField("OrderID", IntegerType()),
        StructField("ProductID", LongType()),
        StructField("QuantityOrdered", IntegerType()),
        StructField("TotalRetailPrice", DoubleType()),
        StructField("CostPricePerUnit", DoubleType())
    ])

    customerOrders_stream = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", f"{checkpoint}/customerOrdersLoad/schemaInfer")
        .option("header", "true")
        .schema(schema)
        .load(landing + "/customerOrders/")
        .withColumn("Extract_Time", current_timestamp()))

    print("Reading Success !!")
    print("*******************")
    return customerOrders_stream
Let’s move on, and create two functions to write data into the bronze layer :
def write_productSupplier(StreamingDF, environment):
    print(f"Writing data to {environment}_catalog product_supplier table", end='')

    write_Stream = (StreamingDF.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint + "/productSupplierLoad/Checkpt")
        .outputMode("append")
        .queryName("productSupplierWriteStream")
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`bronze`.`product_supplier`"))

    write_Stream.awaitTermination()
    print("Write Success")
    print("****************************")
And the second one :
def write_customerOrders(StreamingDF, environment):
    print(f"Writing data to {environment}_catalog customer_orders table", end='')

    write_Stream = (StreamingDF.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint + "/customerOrdersLoad/Checkpt")
        .outputMode("append")
        .queryName("customerOrdersWriteStream")
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`bronze`.`customer_orders`"))

    write_Stream.awaitTermination()
    print("Write Success")
    print("****************************")
Finally let’s call all the functions :
product_supplier_df = read_productSupplier()
customer_orders_df = read_customerOrders()
write_productSupplier(product_supplier_df,env)
write_customerOrders(customer_orders_df, env)
Let’s put our data in the landing container :
Now let’s run the notebook. It fails. Why? Because I wanted to show you the importance of schema enforcement. Here is the error:
I added a new column called Extract_Time; since it does not exist in the destination table schema we created in the first part, the write fails!
To resolve this, we can either drop the Extract_Time column or use the mergeSchema option. The write stream will then look like this:
def write_productSupplier(StreamingDF, environment):
    print(f"Writing data to {environment}_catalog product_supplier table", end='')

    write_Stream = (StreamingDF.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint + "/productSupplierLoad/Checkpt")
        .option("mergeSchema", "true")
        .outputMode("append")
        .queryName("productSupplierWriteStream")
        .trigger(availableNow=True)
        .toTable(f"`{environment}_catalog`.`bronze`.`product_supplier`"))

    write_Stream.awaitTermination()
    print("Write Success")
    print("****************************")
Once this is done, our data is ingested into the Bronze layer:
Let’s load our data to check if it exists :
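A quick way to do that check, assuming env was set to "dev" in the setup notebook so the catalog is dev_catalog:
# Read the Bronze tables back by name to confirm the ingestion worked
display(spark.table("dev_catalog.bronze.product_supplier").limit(10))
display(spark.table("dev_catalog.bronze.customer_orders").limit(10))
# Simple sanity check on row counts
print(spark.table("dev_catalog.bronze.customer_orders").count())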
Full notebook :
Stay tuned for Part 3!