Python Data Cleaning and Preparation Best Practices
Batch ingestion is a data processing technique whereby large volumes of data are collected, processed, and loaded into a system at scheduled intervals, rather than in real-time. This approach allows organizations to handle substantial amounts of data efficiently by grouping data into batches, which are then processed collectively. For example, a company might collect customer transaction data throughout the day and then process it in a single batch during off-peak hours. This method is particularly useful for organizations that need to process high volumes of data but do not require immediate analysis.
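The collect-now, process-later pattern described above can be sketched in a few lines. This is a minimal illustration only: the in-memory pending_transactions buffer, the collect and run_nightly_batch functions, and the customer_id and amount fields are all hypothetical names chosen for this sketch, and a real system would stage the data in files or a database rather than a Python list.

```python
from collections import defaultdict

# Hypothetical in-memory buffer standing in for a staging table or file.
pending_transactions = []


def collect(transaction):
    """Record a transaction as it happens; no processing is done yet."""
    pending_transactions.append(transaction)


def run_nightly_batch():
    """Process everything collected so far in one pass, then clear the buffer."""
    totals = defaultdict(float)
    for tx in pending_transactions:
        totals[tx["customer_id"]] += tx["amount"]
    processed = len(pending_transactions)
    pending_transactions.clear()
    return dict(totals), processed


# During the day: transactions are only collected.
collect({"customer_id": "c1", "amount": 20.0})
collect({"customer_id": "c2", "amount": 5.5})
collect({"customer_id": "c1", "amount": 4.5})

# During off-peak hours: the whole batch is processed at once.
totals, processed = run_nightly_batch()
print(totals, processed)
```

The point of the sketch is the separation of the two phases: collection is cheap and continuous, while the heavier aggregation runs once per scheduled interval.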
Batch ingestion is beneficial because it optimizes system resources by spreading the processing load across scheduled times, often when the system is underutilized. This reduces the strain on computational resources and can lower costs, especially in cloud-based environments where computing power is metered. Additionally, batch processing simplifies data management, as it allows for the easy application of consistent transformations and validations across large datasets. For organizations with regular, predictable data flows, batch ingestion provides a reliable, scalable, and cost-effective solution for data processing and analytics.
Let’s explore batch ingestion in more detail, starting with its advantages and disadvantages.
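Batch ingestion offers several notable advantages that make it an attractive choice for many data processing needs: efficient use of system resources, lower costs where computing power is metered, and simpler, more consistent data management.
However, batch ingestion also comes with a drawback that follows from its design: data only becomes available after the next scheduled run, so it is a poor fit for use cases that require immediate analysis.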
Let’s look at some common use cases for ingesting data in batch mode.
Any data analytics platform, such as a data warehouse or a data lake, requires regularly updated data for Business Intelligence (BI) and reporting. Batch ingestion is integral here because it refreshes the platform with the latest information at each scheduled run, enabling businesses to perform comprehensive and up-to-date analyses. By processing data in batches, organizations can efficiently handle vast amounts of transactional and operational data, transforming it into a structured format suitable for querying and reporting. This supports BI initiatives, allowing analysts and decision-makers to generate insightful reports, track Key Performance Indicators (KPIs), and make data-driven decisions.
Extract, Transform, and Load (ETL) processes are a cornerstone of data integration projects, and batch ingestion plays a crucial role in these workflows. In ETL processes, data is extracted from various sources, transformed to fit the operational needs of the target system, and loaded into a database or data warehouse. Batch processing allows for efficient handling of these steps, particularly when dealing with large datasets that require significant transformation and cleansing. This method is ideal for periodic data consolidation, where data from disparate systems is integrated to provide a unified view, supporting activities such as data migration, system integration, and master data management.
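The three ETL stages described above can be sketched with the standard library alone. The two CSV strings, the column names, and the warehouse_table list are hypothetical stand-ins invented for this example; in practice, the sources would be files, APIs, or database queries, and the target would be a real table.

```python
import csv
import io

# Hypothetical source extracts from two separate systems.
CRM_CSV = "customer_id,name\n1, Alice \n2,Bob\n"
BILLING_CSV = "customer_id,amount\n1,10.50\n1,4.50\n2,7.00\n"


def extract(csv_text):
    """Extract: read raw rows from a source as dictionaries of strings."""
    return list(csv.DictReader(io.StringIO(csv_text)))


def transform(customers, invoices):
    """Transform: cast types, trim whitespace, and consolidate both sources."""
    totals = {}
    for invoice in invoices:
        customer_id = int(invoice["customer_id"])
        totals[customer_id] = totals.get(customer_id, 0.0) + float(invoice["amount"])
    return [
        {
            "customer_id": int(c["customer_id"]),
            "name": c["name"].strip(),
            "total_billed": totals.get(int(c["customer_id"]), 0.0),
        }
        for c in customers
    ]


def load(rows, target):
    """Load: append the unified rows to the target (a list in this sketch)."""
    target.extend(rows)
    return len(rows)


warehouse_table = []
loaded = load(transform(extract(CRM_CSV), extract(BILLING_CSV)), warehouse_table)
print(loaded, warehouse_table)
```

Keeping each stage in its own function makes it easy to test the transformation logic without touching the sources or the target.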
Batch ingestion is also widely used for backups and archiving, which are critical processes for data preservation and disaster recovery. Periodic batch processing allows for the scheduled backup of databases, ensuring that all data is captured and securely stored at regular intervals. This approach minimizes the risk of data loss and provides a reliable restore point in case of system failures or data corruption. Additionally, batch processing is used for data archiving, where historical data is periodically moved from active systems to long-term storage solutions. This not only helps in managing storage costs but also ensures that important data is retained and can be retrieved for compliance, auditing, or historical analysis purposes.
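As a rough sketch of periodic archiving, the following example moves rows older than a cutoff date from an active table to an archive table in a single transaction. It uses an in-memory SQLite database purely for illustration; the orders and orders_archive tables, their columns, and the archive_before function are assumptions made for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, order_date TEXT, amount REAL);
    CREATE TABLE orders_archive (id INTEGER PRIMARY KEY, order_date TEXT, amount REAL);
""")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, "2022-01-15", 100.0), (2, "2023-06-30", 250.0), (3, "2024-03-01", 75.0)],
)
conn.commit()


def archive_before(conn, cutoff_date):
    """Move rows dated before cutoff_date (ISO format) into the archive table."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.execute(
            "INSERT INTO orders_archive SELECT * FROM orders WHERE order_date < ?",
            (cutoff_date,),
        )
        cursor = conn.execute(
            "DELETE FROM orders WHERE order_date < ?", (cutoff_date,)
        )
    return cursor.rowcount


moved = archive_before(conn, "2024-01-01")
print(f"Archived {moved} rows")
```

Running the copy and the delete inside one transaction means a failure cannot leave a row missing from both tables or present in both.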
Batch ingestion is a methodical process involving several key steps: data extraction, data transformation, data loading, scheduling, and automation. To illustrate these steps, let’s explore a use case involving an investment bank that needs to process and analyze trading data for regulatory compliance and performance reporting.
An investment bank needs to collect, transform, and load trading data from various financial markets into a central data warehouse. This data will be used for generating daily compliance reports, evaluating trading strategies, and making informed investment decisions.
The first step is identifying the sources from which data will be extracted. For the investment bank, this includes trading systems, market data providers, and internal risk management systems. These sources contain critical data such as trade execution details, market prices, and risk assessments. Once the sources are identified, data is collected using connectors or scripts. This involves setting up data pipelines that extract data from trading systems, import real-time market data feeds, and pull risk metrics from internal systems. The extracted data is then temporarily stored in staging areas before processing.
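The extraction step might look roughly like the following sketch, which pulls rows from each source and writes them to a staging directory as JSON Lines files. The three extractor functions return hard-coded rows and are hypothetical placeholders for real connectors; the file naming scheme and the stage_all function are likewise assumptions for illustration.

```python
import json
import tempfile
from pathlib import Path


# Hypothetical extractors standing in for connectors to the real systems.
def extract_trades():
    return [
        {"trade_id": "T1", "symbol": "ABC", "qty": 100, "price": 10.5},
        {"trade_id": "T2", "symbol": "XYZ", "qty": 40, "price": 20.0},
    ]


def extract_market_prices():
    return [{"symbol": "ABC", "close": 10.8}]


def extract_risk_metrics():
    return [{"desk": "equities", "value_at_risk": 125000.0}]


SOURCES = {
    "trades": extract_trades,
    "market_prices": extract_market_prices,
    "risk_metrics": extract_risk_metrics,
}


def stage_all(staging_dir, run_date):
    """Write each source's rows to <staging_dir>/<source>_<run_date>.jsonl."""
    staged = {}
    for name, extractor in SOURCES.items():
        path = Path(staging_dir) / f"{name}_{run_date}.jsonl"
        with path.open("w", encoding="utf-8") as handle:
            for row in extractor():
                handle.write(json.dumps(row) + "\n")
        staged[name] = path
    return staged


staging_dir = tempfile.mkdtemp(prefix="staging_")
staged_files = stage_all(staging_dir, "2024-01-02")
for name, path in staged_files.items():
    print(name, "->", path)
```

Including the run date in each file name keeps one day's extract from overwriting another and makes a failed run easy to repeat.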
The extracted data often contains inconsistencies, duplicates, and missing values. Data cleaning is performed to remove duplicates, fill in missing information, and correct errors. For the investment bank, this ensures that trade records are accurate and complete, providing a reliable foundation for compliance reporting and performance analysis. After cleaning, the data undergoes transformations such as aggregations, joins, and calculations. For example, the investment bank might aggregate trade data to calculate daily trading volumes, join trade records with market data to analyze price movements, and calculate key metrics such as Profit and Loss (P&L) and risk exposure. The transformed data must be mapped to the schema of the target system. This involves aligning the data fields with the structure of the data warehouse. For instance, trade data might be mapped to tables representing transactions, market data, and risk metrics, ensuring seamless integration with the existing data model.
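The cleaning, transformation, and mapping steps above can be illustrated with a small pure-Python sketch. The sample trades, the closing prices, the desk field, the "UNKNOWN" default, and the target column names are all invented for this example, and the P&L calculation is a deliberately simplified mark-to-market figure that assumes every trade is a buy.

```python
from collections import defaultdict

raw_trades = [
    {"trade_id": "T1", "symbol": "ABC", "desk": "equities", "qty": 100, "price": 10.0},
    {"trade_id": "T1", "symbol": "ABC", "desk": "equities", "qty": 100, "price": 10.0},  # duplicate
    {"trade_id": "T2", "symbol": "ABC", "desk": "equities", "qty": 50, "price": 11.0},
    {"trade_id": "T3", "symbol": "XYZ", "desk": None, "qty": 10, "price": 20.0},  # missing desk
]
closing_prices = {"ABC": 12.0, "XYZ": 19.0}  # hypothetical market data


def clean(trades):
    """Drop duplicate trade IDs and fill in a missing desk with a placeholder."""
    seen, cleaned = set(), []
    for trade in trades:
        if trade["trade_id"] in seen:
            continue
        seen.add(trade["trade_id"])
        cleaned.append({**trade, "desk": trade["desk"] or "UNKNOWN"})
    return cleaned


def daily_volume(trades):
    """Aggregate: total traded quantity per symbol."""
    volume = defaultdict(int)
    for trade in trades:
        volume[trade["symbol"]] += trade["qty"]
    return dict(volume)


def mark_to_market_pnl(trades, closes):
    """Join trades with closing prices: (close - trade price) * qty per symbol."""
    pnl = defaultdict(float)
    for trade in trades:
        pnl[trade["symbol"]] += (closes[trade["symbol"]] - trade["price"]) * trade["qty"]
    return dict(pnl)


def to_warehouse_rows(volume, pnl, trade_date):
    """Map the results onto the (assumed) schema of the target table."""
    return [
        {"trade_date": trade_date, "symbol": s, "total_qty": volume[s], "pnl": pnl[s]}
        for s in sorted(volume)
    ]


cleaned = clean(raw_trades)
rows = to_warehouse_rows(
    daily_volume(cleaned), mark_to_market_pnl(cleaned, closing_prices), "2024-01-02"
)
for row in rows:
    print(row)
```

Cleaning runs first on purpose: aggregating before removing the duplicate T1 record would double-count its quantity and P&L.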
The transformed data is processed in batches, which allows the investment bank to handle large volumes of data efficiently, performing complex transformations and aggregations in a single run. Once processed, the data is loaded into the target storage system, such as a data warehouse or data lake. For the investment bank, this means loading the cleaned and transformed trading data into their data warehouse, where it can be accessed for compliance reporting and performance analysis.
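Loading in batches can be sketched as follows, with an in-memory SQLite database standing in for the data warehouse. The trades table, its columns, and the load_in_batches function are assumptions for this example; a real warehouse would typically provide its own bulk-loading mechanism.

```python
import sqlite3


def load_in_batches(conn, rows, batch_size):
    """Insert rows in fixed-size batches, committing one transaction per batch."""
    loaded = 0
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        with conn:  # commit the batch, or roll it back entirely on error
            conn.executemany(
                "INSERT INTO trades (trade_id, symbol, qty, price) "
                "VALUES (:trade_id, :symbol, :qty, :price)",
                batch,
            )
        loaded += len(batch)
    return loaded


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE trades (trade_id TEXT PRIMARY KEY, symbol TEXT, qty INTEGER, price REAL)"
)

# Seven hypothetical, already-transformed records.
rows = [
    {"trade_id": f"T{i}", "symbol": "ABC", "qty": 10 * i, "price": 10.0}
    for i in range(1, 8)
]
loaded = load_in_batches(conn, rows, batch_size=3)
print(f"Loaded {loaded} rows")
```

Committing once per batch is a trade-off: larger batches mean fewer round trips, while smaller batches limit how much work is lost if one insert fails.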
To ensure that the batch ingestion process runs smoothly and consistently, scheduling tools such as Apache Airflow or cron jobs are used. These tools automate the data ingestion workflows, running them at regular intervals, such as every hour or every night. This allows the investment bank to have up-to-date data available for analysis without manual intervention. Monitoring is equally important for tracking the success and performance of batch jobs. Monitoring tools provide insights into job execution, identifying any failures or performance bottlenecks. For the investment bank, this ensures that any issues in the data ingestion process are promptly detected and resolved, maintaining the integrity and reliability of the data pipeline.
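To make the monitoring idea concrete, here is a minimal sketch of a wrapper that runs a batch job, logs failures, retries, and reports the outcome. It is not how Airflow or cron work internally; the run_monitored function, the report fields, and the flaky_ingest job are hypothetical and exist only for this illustration. A scheduler would be responsible for invoking such a script at the chosen interval.

```python
import logging
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("batch_monitor")


def run_monitored(job, job_name, max_attempts=3, retry_delay=0.0):
    """Run job(), retrying on failure, and return a small status report."""
    for attempt in range(1, max_attempts + 1):
        started = time.perf_counter()
        try:
            result = job()
        except Exception:
            logger.exception("%s failed on attempt %d/%d", job_name, attempt, max_attempts)
            if attempt < max_attempts:
                time.sleep(retry_delay)
        else:
            duration = time.perf_counter() - started
            logger.info("%s succeeded in %.3fs (attempt %d)", job_name, duration, attempt)
            return {"job": job_name, "status": "success", "attempts": attempt,
                    "duration_s": duration, "result": result}
    return {"job": job_name, "status": "failed", "attempts": max_attempts,
            "duration_s": None, "result": None}


# Hypothetical job that fails once (for example, a source being unavailable).
calls = {"count": 0}


def flaky_ingest():
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("source temporarily unavailable")
    return 42  # number of records ingested


report = run_monitored(flaky_ingest, "nightly_trades")
print(report["status"], report["attempts"], report["result"])
```

The returned report could be written to a log table or sent to an alerting system so that failed runs are noticed without anyone watching the console.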
Let’s have a look at a simple example of a batch ingestion system written in Python. This example simulates the ETL process: we’ll generate some mock data, process it in batches, and load it into a simulated database.
You can find the code for this part in the GitHub repository at https://github.com/PacktPublishing/Python-Data-Cleaning-and-Preparation-Best-Practices/blob/main/chapter01/1.batch.py. The example needs no third-party libraries; a standard Python 3.x environment is enough. The script is built from the following pieces:
Start by importing the two standard-library modules the script relies on, random and time:

```python
import random
import time
```

Define the generate_mock_data function, which generates a list of mock data records:

```python
def generate_mock_data(num_records):
    data = []
    for _ in range(num_records):
        record = {
            'id': random.randint(1, 1000),
            'value': random.random() * 100
        }
        data.append(record)
    return data
```

Each record is a dictionary with two fields:

id: A random integer between 1 and 1000 (inclusive)
value: A random float from 0 up to, but not including, 100

Let’s have a look at what the data looks like:

```python
print("Original data:", data)
```

Two of the generated records are shown here:

```text
{'id': 449, 'value': 99.79699336555473}
{'id': 991, 'value': 79.65999078145887}
```

A list of dictionaries is returned, each representing a data record.
Define the process_in_batches function, which splits the data into batches:

```python
def process_in_batches(data, batch_size):
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]
```

This function takes two parameters: data, the list of records to process, and batch_size, the number of records per batch. A for loop iterates over the data in steps of batch_size, and the yield keyword turns the function into a generator that produces one batch at a time. Each batch contains batch_size records, except possibly the last one, which holds whatever remains.
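To see how the batching generator behaves when the record count is not a multiple of the batch size, the following sketch repeats process_in_batches and runs it over 25 items. It also adds iter_batches, a hypothetical variant that is not part of the original script, for inputs such as files or generators that do not support len() or slicing.

```python
from itertools import islice


def process_in_batches(data, batch_size):
    """Slice a list into consecutive batches of at most batch_size items."""
    for i in range(0, len(data), batch_size):
        yield data[i:i + batch_size]


def iter_batches(iterable, batch_size):
    """Hypothetical variant for any iterable, including ones without len()."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 25 records with a batch size of 10: two full batches and one partial batch.
sizes = [len(batch) for batch in process_in_batches(list(range(25)), 10)]
stream_sizes = [len(batch) for batch in iter_batches((n for n in range(25)), 10)]
print(sizes, stream_sizes)
```

Both versions are lazy, so only one batch is materialized at a time, which keeps memory use bounded by the batch size rather than by the size of the whole dataset.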
Define the transform_data function, which transforms each record in the batch:

```python
def transform_data(batch):
    transformed_batch = []
    for record in batch:
        transformed_record = {
            'id': record['id'],
            'value': record['value'],
            'transformed_value': record['value'] * 1.1
        }
        transformed_batch.append(transformed_record)
    return transformed_batch
```

This function takes a single argument, batch, which is the list of data records to be transformed. The transformation logic is simple: a new transformed_value field is added to each record, holding the original value multiplied by 1.1. The function returns the list of transformed records. Let’s have a look at some of our transformed records:

```text
{'id': 558, 'value': 12.15160339587219, 'transformed_value': 13.36676373545941}
{'id': 449, 'value': 99.79699336555473, 'transformed_value': 109.77669270211021}
{'id': 991, 'value': 79.65999078145887, 'transformed_value': 87.62598985960477}
```

Define the load_data function to load the data. This function simulates loading each transformed record into a database:

```python
def load_data(batch):
    for record in batch:
        # Simulate loading data into a database
        print(f"Loading record into database: {record}")
```

This function takes batch as a parameter, which is a list of transformed data records that are ready to be loaded. Each record is printed to the console to simulate loading it into a database.
Define the main function, which calls all the aforementioned functions:

```python
def main():
    # Parameters
    num_records = 100  # Total number of records to generate
    batch_size = 10    # Number of records per batch

    # Generate data
    data = generate_mock_data(num_records)

    # Process and load data in batches
    for batch in process_in_batches(data, batch_size):
        transformed_batch = transform_data(batch)
        print("Batch before loading:")
        for record in transformed_batch:
            print(record)
        load_data(transformed_batch)
        time.sleep(1)  # Simulate time delay between batches


# Run the pipeline when the file is executed as a script
if __name__ == "__main__":
    main()
```

This function calls generate_mock_data to create the mock data and uses process_in_batches to divide the data into batches. For each batch, the function transforms the records with transform_data, prints them, and loads them with load_data, pausing for one second before moving on to the next batch.

Now, let’s transition from batch processing to a streaming paradigm. In streaming, data is processed as it arrives, rather than in predefined batches.