Incremental Load Technique with CDC (Change Data Capture).
Change Data Capture (CDC) : Incremental load with CDC is a strategy in data warehousing and ETL (Extract, Transform, Load) processes in which only changed or newly added data is loaded from the source systems into the target system. It is particularly useful when reprocessing the entire dataset is impractical because of its size, or when real-time or near-real-time updates are essential.
Key Concepts for CDC : CDC is the process of identifying and capturing the changes made to source data since the last extraction, that is, the inserts, updates, and deletes. Instead of reloading the entire dataset, only those changes are applied to the target system, which reduces processing time and resource usage.
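To make the three change types concrete, here is a minimal plain-Python sketch that compares two snapshots of a table keyed by primary key. The function name classify_changes and the sample snapshots are hypothetical and used only for illustration; production CDC tools usually read a database transaction log rather than diffing snapshots.

```python
def classify_changes(previous, current):
    """Split the differences between two snapshots into inserts, updates, deletes.

    Both arguments are dicts mapping a primary key to the remaining column values.
    """
    # Keys present only in the current snapshot are new rows.
    inserts = {k: v for k, v in current.items() if k not in previous}
    # Keys present in both snapshots but with different values are updated rows.
    updates = {k: v for k, v in current.items()
               if k in previous and previous[k] != v}
    # Keys present only in the previous snapshot were deleted at the source.
    deletes = {k: v for k, v in previous.items() if k not in current}
    return inserts, updates, deletes


# Hypothetical snapshots: order_id -> (product_id, quantity, amount)
previous = {1: ('A', 100, 500), 2: ('B', 50, 300), 5: ('D', 10, 80)}
current = {1: ('A', 100, 500), 2: ('B', 70, 400), 3: ('A', 75, 400)}

inserts, updates, deletes = classify_changes(previous, current)
print(inserts)  # {3: ('A', 75, 400)}
print(updates)  # {2: ('B', 70, 400)}
print(deletes)  # {5: ('D', 10, 80)}
```

Only these three small sets would need to be sent to the target, while the unchanged row (order_id 1) is never touched.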
Example with PySpark Code :
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
1. Create a SparkSession :
spark = SparkSession.builder.appName("IncrementalLoadWithCDC").getOrCreate()
2. Sample data for the source table (product_data) :
source_data = [
(1, 'A', 100, 500, '2023-01-01 08:00:00'),
(2, 'B', 50, 300, '2023-01-02 10:30:00'),
(3, 'A', 75, 400, '2023-01-02 15:45:00'),
(4, 'C', 30, 200, '2023-01-03 09:15:00')
]
source_columns = ['order_id', 'product_id', 'quantity', 'amount', 'order_date']
source_df = spark.createDataFrame(source_data, source_columns)
3. Sample data for the target table (dw_product_data) :
target_data = [
(1, 'A', 100, 500, '2023-01-01 08:00:00'),
(2, 'B', 50, 300, '2023-01-02 10:30:00')
]
target_columns = ['order_id', 'product_id', 'quantity', 'amount', 'order_date']
target_df = spark.createDataFrame(target_data, target_columns)
4. Simulate changes in the source data :
changes_data = [
(2, 'B', 70, 400, '2023-01-02 10:30:00'), # Updated record
(3, 'A', 75, 400, '2023-01-02 15:45:00'), # New record
(4, 'C', 30, 200, '2023-01-03 09:15:00') # New record
]
changes_df = spark.createDataFrame(changes_data, source_columns)
5. Identify updated records using CDC (assuming order_id is the primary key) :
updated_records = changes_df.join(target_df, 'order_id', 'inner').filter(
(changes_df['product_id'] != target_df['product_id']) |
(changes_df['quantity'] != target_df['quantity']) |
(changes_df['amount'] != target_df['amount']) |
(changes_df['order_date'] != target_df['order_date'])
).select( # keep only the changes_df columns to avoid ambiguous column names after the join
changes_df['order_id'],
changes_df['product_id'],
changes_df['quantity'],
changes_df['amount'],
changes_df['order_date']
)
6. Get the new records (rows in changes_df with no matching order_id in the target) :
new_records = changes_df.join(target_df, 'order_id', 'left_anti')
7. Update existing records in the target table :
target_df = target_df.join(updated_records, 'order_id', 'left_anti').union(updated_records)
8. Append new records to the target table :
target_df = target_df.union(new_records)
9. Show the updated target table :
target_df.show()
10. Finally, stop the SparkSession :
spark.stop()
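To check what the final target table should contain without starting Spark, the update-then-append logic can be mirrored in plain Python. This is only a sketch on the same sample rows: the helper apply_changes is hypothetical, and it overwrites any row whose order_id appears in the change set, which gives the same result as the join-based steps for this data.

```python
# Same sample rows as in the PySpark example:
# (order_id, product_id, quantity, amount, order_date)
target_rows = [
    (1, 'A', 100, 500, '2023-01-01 08:00:00'),
    (2, 'B', 50, 300, '2023-01-02 10:30:00'),
]
change_rows = [
    (2, 'B', 70, 400, '2023-01-02 10:30:00'),  # updated record
    (3, 'A', 75, 400, '2023-01-02 15:45:00'),  # new record
    (4, 'C', 30, 200, '2023-01-03 09:15:00'),  # new record
]


def apply_changes(target, changes):
    """Upsert the change rows into the target rows, keyed by order_id (first field)."""
    merged = {row[0]: row for row in target}
    for row in changes:
        # An existing key is overwritten (update); an unseen key is added (insert).
        merged[row[0]] = row
    # Sort by order_id so the result is deterministic.
    return [merged[key] for key in sorted(merged)]


result = apply_changes(target_rows, change_rows)
for row in result:
    print(row)
```

The result holds four rows, with order_id 2 carrying the updated quantity 70 and amount 400. Note that target_df.show() does not guarantee any row order, so the Spark output may list the same rows in a different sequence.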