Engineering

Scaling a Pandas ETL Job to 600GB

We all know the convenience that comes with processing a Pandas DataFrame at sweet in-memory speed. Recently, I was working with a small sample of company data from a single city. I had written a quick Python script to load raw data from a Postgres table, transform and clean it for some downstream machine learning processes, and write the cleaned data to another Postgres table. The ETL job ran every couple of weeks to add new cities, but then the client asked us to scale the job to every city in the United States. With the current script running locally on my MacBook, this would have taken an impractical 120 hours, about a single working week.

Given the exploratory nature of our work, it was more than likely that we’d have to tweak our data cleaning and transformation process several times in the near future. We needed a sub-24-hour runtime to allow for quick iterations and pivots. Here’s how we scaled our simple Pandas ETL workflow to process 600GB of data.

Our Options

  1. Dask. A framework that makes it easy to parallelize existing NumPy, Pandas, and scikit-learn operations, but we didn’t want to spend the time spinning up our own cloud cluster with Kubernetes.

  2. Spark. This option would have been less infrastructure-heavy given Enigma’s familiarity with Databricks and AWS EMR, but the time and bugs that would have come with porting our Pandas code to RDDs made us take another look at reusing our existing Pandas workflow.

  3. Scaling our existing Pandas job to process 600GB of data in parallel chunks.


We Chose Option 3

We settled on the existing Pandas ETL job. Why? We wanted to avoid the inevitable errors that would come with porting our Pandas ETL workflow to Spark, and we preferred a solution built on existing, familiar infrastructure. We knew we could get a 2–6x speedup by adding parallelization to the ETL script, and we could rely on horizontal scaling across EC2 boxes if we needed even quicker runtimes.

Flow chart showcasing the inputs and outputs of the workflow described above

Architecture Overview

Here’s the breakdown of our approach. We had 600GB of raw data in a Postgres table, and we wanted to be able to scale to any number of “compute nodes” (in our case, EC2 boxes with our script downloaded). Since the script used multiprocessing to operate on multiple chunks of data in parallel, we needed some way to partition the data into evenly sized chunks.

Partitioning and ETL Job Metadata Table

Example of the ETL job metadata table, with zipcode, status_msg, exception_msg, and timestamp as the columns

Since we were processing data across the United States, we decided to treat the data within each zip code as one chunk. There are over 40,000 zip codes, which gave us chunks small enough to parallelize while keeping their sizes reasonably consistent.

We created a Postgres table to hold the metadata across all the compute nodes’ ETL jobs. Whenever the script was ready to consume another zip code, it queried this table to find a zip code that had not yet been claimed by a job.

If an ETL job found that Enigma HQ’s zip code (10016) was available, for example, it would set that row’s status to “RUNNING” and then query the raw-data Postgres table for all rows in the 10016 zip code.
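
The code we share below only covers a single compute node and skips this claiming step, so here is a minimal sketch of how a job could atomically claim the next available zip code. It assumes the metadata table is pre-seeded with one row per zip code (status NULL until a job claims it) and a Postgres version that supports SKIP LOCKED; it illustrates the idea rather than the exact query from our script.

<div class="code-wrap"><code>import psycopg2

JOB_METADATA_TABLE_NAME = "etl_job_metadata"  # placeholder name for illustration

def claim_next_zipcode(db_connection):
    # Atomically pick one unclaimed zip code and mark it RUNNING, so no
    # other compute node can grab the same chunk.
    claim_sql = """
        UPDATE {table}
        SET status_message = 'RUNNING', "timestamp" = current_timestamp
        WHERE zipcode = (
            SELECT zipcode FROM {table}
            WHERE status_message IS NULL
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING zipcode;
    """.format(table=JOB_METADATA_TABLE_NAME)
    with db_connection.cursor() as db_cursor:
        db_cursor.execute(claim_sql)
        row = db_cursor.fetchone()
    db_connection.commit()
    return row[0] if row else None  # None means every zip code has been claimed
</code></div>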

Performance Enhancements

Our approach was a quick solution that helped us scale our Pandas ETL workflow and keep our clients happy, and it included a number of interesting performance enhancements:

  • We reduced Pandas DataFrame memory usage by 50% by downcasting the default 64-bit numeric types (a short sketch follows this list). Read more about downcasting here.

  • We used a Python memory profiler called mprof to measure the memory usage of a data chunk’s DataFrame compared to its raw size in Postgres (a minimal usage example also follows this list). We needed to know the DataFrames’ sizes in order to put a ceiling on the number of parallel processes in the script; the last thing we wanted was memory overflows cropping up all over the place. Here is a guide that shows how easy it is to set up mprof.

  • Using psycopg2 rather than Pandas’ .to_sql() function to write DataFrames to the database cut write time by an order of magnitude. Why? Pandas’ to_sql() creates a SQL INSERT statement for each row in the DataFrame, which is inefficient in terms of both SQL execution and network I/O. We used psycopg2, a popular PostgreSQL adapter for Python, to leverage Postgres’ efficient COPY command for bulk inserts; the _clean_data function in the code below shows this pattern. Read more about copying data via psycopg2 here. You can find other bulk insert approaches here.

  • The Postgres DB box and the EC2 boxes were located close together, which avoided unneeded network I/O. This was largely a side effect of our infrastructure at Enigma: because our Postgres box and EC2 instances were hosted in the same AWS region, we skipped the extra network round trips that would have come with, say, a database hosted elsewhere.
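
As a rough illustration of the downcasting mentioned above, here is a minimal sketch (not the exact helper from our script) that shrinks Pandas’ default 64-bit numeric columns down to the smallest subtype that still fits the data:

<div class="code-wrap"><code>import pandas as pd

def downcast_numeric_columns(df):
    # Pandas defaults to int64/float64; downcast each numeric column to the
    # smallest integer/float subtype that can still hold its values.
    for col in df.columns:
        if pd.api.types.is_integer_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], downcast="integer")
        elif pd.api.types.is_float_dtype(df[col]):
            df[col] = pd.to_numeric(df[col], downcast="float")
    return df

df = pd.DataFrame({"count": [1, 2, 3], "score": [0.1, 0.2, 0.3]})
print(df.memory_usage(deep=True).sum())                             # before
print(downcast_numeric_columns(df).memory_usage(deep=True).sum())   # after
</code></div>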

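And here is roughly how mprof’s Python API can measure the memory footprint of loading one chunk; the DB_URL and table name are placeholders, and this is an illustration rather than the measurement code we actually ran. You can also run "mprof run your_script.py" followed by "mprof plot" to chart memory usage over time.

<div class="code-wrap"><code>from memory_profiler import memory_usage
import pandas as pd
from sqlalchemy import create_engine

DB_URL = "postgresql://user:password@host:5432/dbname"  # placeholder
RAW_DATA_TABLE_NAME = "raw_data"                         # placeholder

def load_chunk(zipcode):
    # Load one zip code's worth of raw data, the same way the ETL worker does
    engine = create_engine(DB_URL)
    query = "select * from {} where \"ZIP_CODE\" = '{}'".format(RAW_DATA_TABLE_NAME, zipcode)
    df = pd.read_sql(con=engine, sql=query)
    engine.dispose()
    return df

# Sample memory usage (in MiB) every 0.1s while load_chunk runs, then report
# the peak increase over the baseline.
samples = memory_usage((load_chunk, ("10016",), {}), interval=0.1)
print("peak DataFrame footprint: {:.1f} MiB".format(max(samples) - min(samples)))
</code></div>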

Testing Setup

I removed it from the code below in the interest of offering a clean template to copy, but testing the ETL module was crucial for moving quickly. We created TEST_FLAG and DEBUG_FLAG parameters for the ETL script (a rough sketch of how such flags can gate reads and writes follows this list).

  • DEBUG_FLAG told the script to avoid writing data to the database. This allowed us to debug parts of the ETL job without worrying about accidental side effects on the real data being cleaned.

  • TEST_FLAG told the script to use a small Postgres table containing a sample of the actual raw data. The script then ran end to end, wrote the cleaned data to a test table, and finally compared the result against another database table that served as the source of truth for a correctly functioning ETL.
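
Neither flag appears in the template below, but here is a rough sketch of how such flags could gate reads and writes; the test table names are hypothetical and used only for illustration.

<div class="code-wrap"><code>import logging

DEBUG_FLAG = False   # when True, never write to the database
TEST_FLAG = False    # when True, read from / write to small test tables

# Hypothetical table names, for illustration only
RAW_DATA_TABLE_NAME = "raw_data"
CLEANED_DATA_TABLE_NAME = "cleaned_data"
TEST_RAW_DATA_TABLE_NAME = "raw_data_sample"
TEST_CLEANED_DATA_TABLE_NAME = "cleaned_data_test"

def pick_tables():
    # TEST_FLAG swaps in the small sample tables so a test run stays cheap
    if TEST_FLAG:
        return TEST_RAW_DATA_TABLE_NAME, TEST_CLEANED_DATA_TABLE_NAME
    return RAW_DATA_TABLE_NAME, CLEANED_DATA_TABLE_NAME

def write_chunk(db_cursor, str_buffer, table_name):
    # DEBUG_FLAG short-circuits the COPY so debugging can never touch real data
    if DEBUG_FLAG:
        logging.info("DEBUG_FLAG set, skipping write to %s", table_name)
        return
    db_cursor.copy_from(str_buffer, table_name, null="")
    db_cursor.connection.commit()
</code></div>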


The Code

Here’s a version of the code that worked on a single compute node; with slight tweaks it will work for the multi-node approach as well.

<div class="code-wrap"><code>import io
import logging
import psycopg2

from sqlalchemy import create_engine, text
import multiprocessing as mp
import pandas as pd

DB_URL = <Database URL (RFC-1738 format)>
NUM_PROCESSES = 8

RAW_DATA_TABLE_NAME = <existing_postgres_raw_data_table>
CLEANED_DATA_TABLE_NAME = <postgres_cleaned_data_table_to_create>
JOB_METADATA_TABLE_NAME = <postgres_metadata_table_to_create>
ZIP_CODE_TABLE_NAME = <existing_postgres_zip_code_table>

def create_cleaned_data():
   zipcodes = []

   # Create new Postgres table to insert cleaned data into
   engine = create_engine(DB_URL)
   # Hack to create an empty Postgres table with the schema from another table
   query = "create table {} as select * from \"{}\" where 1 = 2".format(CLEANED_DATA_TABLE_NAME, <existing_postgres_cleaned_data_table>)
   engine.execute(query)

   # Create ETL Job Metadata table
   create_metadata_table_sql = 'create table {table_name} \
                       (zipcode VARCHAR (50), \
                        status_message VARCHAR (50), \
                        exception_message VARCHAR (1000), \
                        timestamp TIMESTAMP WITH TIME ZONE)'.format(table_name=JOB_METADATA_TABLE_NAME)
   engine.execute(create_metadata_table_sql)

   # Retrieve the list of zipcodes before releasing the engine's connections
   query = "select * from {}".format(ZIP_CODE_TABLE_NAME)
   zipcodes = pd.read_sql(con=engine, sql=query)
   zipcodes = zipcodes["ZIP_CODE"].values
   engine.dispose()
   
   # Parallelize mapping data per zip code (assign zipcode per process)
   pool = mp.Pool(processes=NUM_PROCESSES)
   data_cleaning_processes = [pool.apply_async(_clean_data, args=(zipcode,)) for zipcode in zipcodes]
   # .get() blocks until a worker finishes and re-raises any exception it hit
   for process in data_cleaning_processes:
        process.get()
   pool.close()
   pool.join()

def _clean_data(zipcode):
   
   db_connection = psycopg2.connect(user=<postgres user>, host=<postgres host>, dbname=<postgres database name>, password=<postgres user password>)
   db_cursor = db_connection.cursor()
   str_buffer = io.StringIO()

   data_chunk = None

   # Load data chunk from zip code to transform
   engine = create_engine(DB_URL)
   query = "select * from {} WHERE \"ZIP_CODE\" = '{}'".format(RAW_DATA_TABLE_NAME, zipcode)
   data_chunk = pd.read_sql(con=engine, sql=query)
   engine.dispose()

   _add_audit_row(zipcode, "started")

   try:
       mapped_data_chunk = <Python function that transforms data>(data_chunk)

       # Write CSV to 'str_buffer' buffer instead of to a file on disk
       mapped_data_chunk.to_csv(str_buffer, sep='\t', header=False, index=False)
       str_buffer.seek(0)
       
       # psycopg2 cursor's efficient COPY_FROM command to bulk insert data
       db_cursor.copy_from(str_buffer, CLEANED_DATA_TABLE_NAME, null="")
       db_cursor.connection.commit()

       logging.info("cleaned & pushed a data chunk to db")
       _add_audit_row(zipcode, "finished")

   except Exception as e:
       _add_audit_row(zipcode, "error", str(e))

   str_buffer.close()
   db_cursor.close()
   db_connection.close()

def _add_audit_row(zipcode, status_message="", exception_message=""):

   # Bind values as parameters so quotes in an exception message can't break the SQL
   sql = "INSERT INTO {}(zipcode, status_message, exception_message, timestamp) VALUES (:zipcode, :status_message, :exception_message, current_timestamp);".format(JOB_METADATA_TABLE_NAME)
   engine = create_engine(DB_URL)
   engine.execute(text(sql), zipcode=zipcode, status_message=status_message, exception_message=exception_message)
   engine.dispose()

def main():
   # Surface the logging.info calls above; the default level would hide them
   logging.basicConfig(level=logging.INFO)
   create_cleaned_data()

if __name__ == '__main__':
   main()</code></div>

I hope you’ve found our quick Pandas scaling adventure useful. Cheers!

- Ezzy

At Enigma we provide the content, tools, and expertise to empower organizations looking to make sense of the world through data. It’s an ambitious project, so we’re recruiting aggressively to find not only the smartest people in the world, but also those who are passionate about our mission. Join us—we’re hiring.