We all know the convenience that comes with processing a Pandas DataFrame at sweet in-memory speed. Recently, I was working with a small sample of company data from a single city. I had written a quick Python script to load raw data from a Postgres table, transform and clean it for some downstream machine learning processes, and write the cleaned data to another Postgres table. The ETL job ran every couple of weeks to add new cities, but then the client asked us to scale the job to every city in the United States. With the current script running locally on my MacBook, this would have taken an impractical 120 hours, about a single working week.
Given the exploratory nature of our work, it was more than likely that we’d have to tweak our data cleaning and transformation process several times in the near future. We needed a sub-24-hour runtime to allow for quick iterations and pivots. Here’s how we scaled our simple Pandas ETL workflow to process 600GB of data.
We settled on keeping the existing Pandas ETL job. Why? We wanted to avoid the inevitable errors that would come with porting our Pandas ETL workflow over to Spark, and we preferred a solution built on existing, familiar infrastructure. We knew we could get a 2–6x speedup by adding parallelization to the ETL script, and rely on horizontal scaling across EC2 boxes if we needed quicker runtimes.
Here’s the breakdown of our approach. We had 600GB of raw data in a Postgres table. We wanted to allow scaling to any number of “compute nodes” (in our case, EC2 boxes with our script installed). Since the script used multiprocessing to operate on multiple chunks of data in parallel, we needed a way to partition the data into evenly sized chunks.
Since we were processing data across the United States, we decided to treat all of the data within a single zip code as one chunk. There are over 40,000 zip codes, which gave us chunks small enough to parallelize while keeping chunk sizes reasonably consistent.
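Before committing to zip codes as the partition key, it’s worth sanity-checking how evenly they split the data. Here’s a minimal sketch of the kind of check you could run, assuming the same DB_URL and raw data table placeholders used in the script further down:
<div class="code-wrap"><code>import pandas as pd
from sqlalchemy import create_engine

# Assumes the DB_URL and RAW_DATA_TABLE_NAME placeholders from the ETL script below.
engine = create_engine(DB_URL)

# Count rows per zip code to see how evenly the partition key splits the data.
query = 'select "ZIP_CODE", count(*) as n_rows from {} group by "ZIP_CODE"'.format(RAW_DATA_TABLE_NAME)
chunk_sizes = pd.read_sql(con=engine, sql=query)
engine.dispose()

# Summary stats (min / median / max) give a quick read on chunk size consistency.
print(chunk_sizes["n_rows"].describe())</code></div>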
We created a Postgres table to hold the metadata for all of the compute nodes’ ETL jobs. Whenever the script was ready to consume another zip code, it queried the table to find a zip code that had not yet been consumed by any job.
If an ETL job found that Enigma HQ’s zip code (10016) was available, for example, it would set that row’s status to “RUNNING” and then query the raw data table for all rows in the 10016 zip code.
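The single-node template below skips this coordination step, so here’s a rough sketch of how a compute node might claim an available zip code atomically. It assumes the metadata table has been pre-populated with one row per zip code marked “AVAILABLE”, and uses Postgres’s FOR UPDATE SKIP LOCKED (9.5+) to keep two nodes from grabbing the same row; this is one reasonable approach, not necessarily the exact query we ran:
<div class="code-wrap"><code># Hypothetical helper for the multi-node variant. Table and column names match the
# metadata table created in the script below, but the 'AVAILABLE' rows are assumed
# to be seeded ahead of time (one row per zip code).
CLAIM_ZIPCODE_SQL = """
    UPDATE {table}
    SET status_message = 'RUNNING', timestamp = current_timestamp
    WHERE zipcode = (
        SELECT zipcode FROM {table}
        WHERE status_message = 'AVAILABLE'
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING zipcode;
""".format(table=JOB_METADATA_TABLE_NAME)

def claim_next_zipcode(db_connection):
    """Atomically mark one unclaimed zip code as RUNNING and return it (or None)."""
    with db_connection.cursor() as cursor:
        cursor.execute(CLAIM_ZIPCODE_SQL)
        row = cursor.fetchone()
    db_connection.commit()
    return row[0] if row else None</code></div>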
Our approach was a quick solution that helped us scale our Pandas ETL workflow and keep our clients happy, and it opened the door to a number of interesting performance enhancements along the way.
I removed the testing logic from the code below in the interest of offering a clean template to copy, but testing the ETL module was a crucial element for us to move quickly. We created TEST_FLAG and DEBUG_FLAG parameters for the ETL script.
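The flags themselves are simple; here’s a minimal sketch of what they might look like (the command-line parsing, sample size, and logging levels are illustrative assumptions, not the exact values we used):
<div class="code-wrap"><code>import argparse
import logging

parser = argparse.ArgumentParser()
parser.add_argument("--test", action="store_true", help="run against a small sample of zip codes")
parser.add_argument("--debug", action="store_true", help="enable verbose logging")
args = parser.parse_args()

TEST_FLAG = args.test
DEBUG_FLAG = args.debug

logging.basicConfig(level=logging.DEBUG if DEBUG_FLAG else logging.INFO)

# Inside create_cleaned_data(), after loading the zip code list:
if TEST_FLAG:
    # Only process a handful of zip codes so a test run finishes in minutes.
    zipcodes = zipcodes[:5]</code></div>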
Here’s a version of the code that worked on a single compute node; with slight tweaks (like the claim query sketched above), it will work for the multi-node approach as well.
<div class="code-wrap"><code>import io
import logging
import pickle
import psycopg2
from sqlalchemy import create_engine
import multiprocessing as mp
import numpy as np
import pandas as pd
DB_URL = <Database URL (RFC-1738 format)>
NUM_PROCESSES = 8
RAW_DATA_TABLE_NAME = <existing_postgres_raw_data_table>
CLEANED_DATA_TABLE_NAME = <postgres_cleaned_data_table_to_create>
JOB_METADATA_TABLE_NAME = <postgres_metadata_table_to_create>
ZIP_CODE_TABLE_NAME = <existing_postgres_zip_code_table>
def create_cleaned_data():
zipcodes = []
# Create new Postgres table to insert cleaned data into
engine = create_engine(DB_URL)
# Hack to create an empty Postgres table with the schema from another table
query = "create table {} as select * from \"{}\" where 1 = 2".format(CLEANED_DATA_TABLE_NAME, <existing_postgres_claned_data_table>)
engine.execute(query)
# Create ETL Job Metadata table
create_metadata_table_sql = 'create table {table_name} \
(zipcode VARCHAR (50), \
status_message VARCHAR (50), \
exception_message VARCHAR (1000), \
timestamp TIMESTAMP WITH TIME ZONE)'.format(table_name=JOB_METADATA_TABLE_NAME)
engine.execute(create_metadata_table_sql)
engine.dispose()
# Retrieve the list of zipcodes
query = "select * from {}".format(ZIP_CODE_TABLE_NAME)
zipcodes = pd.read_sql(con=engine, sql=query)
zipcodes = zipcodes["ZIP_CODE"].values
# Parallelize mapping data per zip code (assign zipcode per process)
pool = mp.Pool(processes=NUM_PROCESSES)
data_cleaning_processes = [pool.apply_async(_clean_data, args=(zipcode,)) for zipcode in zipcodes]
for process in data_cleaning_processes:
process.get()
def _clean_data(zipcode):
db_connection = psycopg2.connect(user=<postgres user>, host=<postgres host>, dbname=<postgres database name>, password=<postgres user password>)
db_cursor = db_connection.cursor()
str_buffer = io.StringIO()
data_chunk = None
# Load data chunk from zip code to transform
engine = create_engine(DB_URL)
query = "select * from {} WHERE \"ZIP_CODE\" = '{}'".format(RAW_DATA_TABLE_NAME, zipcode)
data_chunk = pd.read_sql(con=engine, sql=query)
engine.dispose()
_add_audit_row(zipcode, "started")
try:
mapped_data_chunk = <Python function that transforms data>(data_chunk)
# Write CSV to 'str_buffer' buffer instead of to a file on disk
mapped_data_chunk.to_csv(str_buffer, sep='\t', header=False, index=False)
str_buffer.seek(0)
# psycopg2 cursor's efficient COPY_FROM command to bulk insert data
db_cursor.copy_from(str_buffer, CLEANED_DATA_TABLE_NAME, null="")
db_cursor.connection.commit()
logging.info("cleaned & pushed a data chunk to db")
_add_audit_row(zipcode, "finished")
except Exception as e:
_add_audit_row(zipcode, "error", str(e))
str_buffer.close()
db_cursor.close()
db_connection.close()
def _add_audit_row(zipcode, status_message="", exception_message=""):
sql = "INSERT INTO {}(zipcode, status_message, exception_message, timestamp) VALUES ('{}', '{}', '{}', current_timestamp);" \
.format(JOB_METADATA_TABLE_NAME, zipcode, status_message, exception_message)
engine = create_engine(DB_URL)
engine.execute(sql)
engine.dispose()
def main():
create_cleaned_data()
if __name__ == '__main__':
main() </code></div>
I hope you’ve found our quick Pandas scaling adventure useful. Cheers!
- Ezzy
At Enigma we provide the content, tools, and expertise to empower organizations looking to make sense of the world through data. It’s an ambitious project, so we’re recruiting aggressively to find not only the smartest people in the world, but also those who are passionate about our mission. Join us—we’re hiring.