We all know the convenience of processing a Pandas DataFrame at sweet in-memory speed. Recently, I was working with a small sample of company data for a single city. I had written a quick Python script to load raw data from a Postgres table, clean and transform it for some downstream machine learning processes, and write the cleaned data to another Postgres table. The ETL job ran every couple of weeks to add new cities, but then the client asked us to scale the job to every city in the United States. Running the existing script locally on my MacBook, that would have taken an impractical 120 hours, roughly five full days of nonstop processing.
Given the exploratory nature of our work, we would very likely have to tweak our data cleaning and transformation process several times in the near future. We therefore needed a sub-24-hour runtime to allow for quick iterations and pivots. Here’s how we scaled our simple Pandas ETL workflow to process 600GB of data.
We considered three options:
1. Dask, a framework that makes it easy to parallelize existing NumPy, Pandas, and scikit-learn operations. However, we didn’t want to spend the time it would take to spin up our own cloud cluster on Kubernetes.
2. Spark. This option would have been lighter on infrastructure, given Enigma’s strong familiarity with Databricks and AWS EMR. However, the time and bugs involved in porting our Pandas code to RDDs pushed us to take another look at reusing our existing Pandas workflow.
3. Scaling our existing Pandas job to process the 600GB of data in parallel chunks.
We chose option 3.
We settled on reusing the existing Pandas ETL job. Why? We wanted to avoid the inevitable errors that would come with porting our Pandas ETL workflow to Spark, and we preferred a solution built on existing, familiar infrastructure. We expected a 2–6x speedup from adding parallelization to the ETL script. If we needed faster runtimes, we could also rely on horizontal scaling across EC2 boxes.
Here’s a breakdown of our approach. We had 600GB of raw data in a Postgres table, and we wanted to be able to scale to any number of “compute nodes”. In our case, these were EC2 boxes with our script downloaded onto them. Since the script used multiprocessing to operate on multiple chunks of data in parallel, we needed a way to partition the data into evenly sized chunks.
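A minimal sketch of the parallel structure on one node follows. It assumes each chunk can be processed independently by a single function. `process_chunk`, `run_chunks`, and the zip-code chunk keys are hypothetical names, not the original script’s. The real per-chunk step would load, clean, and write one chunk’s data.

```python
import os
from multiprocessing import Pool
from typing import Optional


def process_chunk(zip_code: str) -> tuple[str, int]:
    """Hypothetical per-chunk ETL step for one zip code.

    In a real job this would read the zip code's raw rows from Postgres,
    clean them with Pandas, and write the result back. Here it returns a
    placeholder "row count" so the sketch runs without a database.
    """
    rows_processed = len(zip_code)  # stand-in for real work
    return zip_code, rows_processed


def run_chunks(chunk_keys: list[str], n_processes: Optional[int] = None) -> dict[str, int]:
    """Process every chunk in a pool of worker processes."""
    # Default to one process per CPU core. Lower this when each chunk's
    # DataFrame is large enough that memory, not CPU, is the limit.
    n_processes = n_processes or os.cpu_count() or 1
    with Pool(processes=n_processes) as pool:
        # imap_unordered hands out chunks one at a time and yields results as
        # they finish, so one slow zip code doesn't hold up the others.
        return dict(pool.imap_unordered(process_chunk, chunk_keys))
```

Call `run_chunks(["10016", "94103"])` from inside an `if __name__ == "__main__":` block. Multiprocessing needs that guard on platforms that start workers by spawning a fresh interpreter, which is the default on macOS and Windows.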
Partitioning and ETL job metadata table
Since we were processing data in the United States, we decided to treat all the data within a single zip code as one chunk. There are over 40,000 zip codes, so this produced chunks small enough to parallelize while keeping chunk sizes reasonably consistent.
We created a Postgres table to hold the metadata for all of the compute nodes’ ETL jobs. Whenever the script was ready to consume another zip code, it queried this table to find a zip code that had not yet been consumed by a job.
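One possible shape for this table is sketched below as SQL run from Python through a DB-API connection such as psycopg2’s. The names `etl_job_metadata` and `raw_companies`, and the extra bookkeeping columns, are hypothetical, since the post doesn’t show the real schema. Seeding one `PENDING` row per distinct zip code, together with that zip code’s row count, also makes it easy to check how evenly sized the chunks really are. `ON CONFLICT` requires Postgres 9.5 or later.

```python
# Hypothetical schema: one row per zip code, tracking which job owns it.
CREATE_JOB_TABLE = """
CREATE TABLE IF NOT EXISTS etl_job_metadata (
    zip_code    TEXT PRIMARY KEY,
    status      TEXT NOT NULL DEFAULT 'PENDING',  -- PENDING / RUNNING / DONE / FAILED
    row_count   BIGINT,                           -- raw rows in this chunk
    node        TEXT,                             -- compute node that claimed it
    started_at  TIMESTAMPTZ,
    finished_at TIMESTAMPTZ
)
"""

# Seed one PENDING row per zip code found in the (hypothetical) raw table.
# ON CONFLICT DO NOTHING makes re-running the seed step harmless.
SEED_JOB_TABLE = """
INSERT INTO etl_job_metadata (zip_code, row_count)
SELECT zip_code, count(*)
FROM raw_companies
GROUP BY zip_code
ON CONFLICT (zip_code) DO NOTHING
"""


def create_and_seed(conn) -> None:
    """Create and seed the metadata table in one transaction on a DB-API connection."""
    cur = conn.cursor()
    try:
        cur.execute(CREATE_JOB_TABLE)
        cur.execute(SEED_JOB_TABLE)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

After seeding, `SELECT min(row_count), avg(row_count), max(row_count) FROM etl_job_metadata` gives a quick read on chunk balance.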
For example, if an ETL job found that Enigma HQ’s zip code (10016) was available, it would set that row’s status to “RUNNING”. It would then query the raw data table for the data located in the 10016 zip code.
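Below is a minimal sketch of the claim step. It assumes a hypothetical `etl_job_metadata` table with `zip_code`, `status`, `node`, `started_at`, and `finished_at` columns, and a psycopg2-style connection. It uses Postgres’ `SELECT ... FOR UPDATE SKIP LOCKED` (Postgres 9.5+), so two jobs polling at the same moment can’t both claim the same zip code. The post doesn’t say how the original script handled that race, so treat this as one reasonable option rather than the method Enigma used.

```python
from typing import Optional

# Atomically claim one PENDING zip code. FOR UPDATE SKIP LOCKED makes a
# concurrent job skip rows another transaction has already locked, so each
# zip code is handed to exactly one job.
CLAIM_NEXT_ZIP = """
UPDATE etl_job_metadata
SET status = 'RUNNING', node = %(node)s, started_at = now()
WHERE zip_code = (
    SELECT zip_code
    FROM etl_job_metadata
    WHERE status = 'PENDING'
    ORDER BY zip_code
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING zip_code
"""

FINISH_ZIP = """
UPDATE etl_job_metadata
SET status = %(status)s, finished_at = now()
WHERE zip_code = %(zip_code)s
"""


def _run(conn, sql: str, params: dict, fetch: bool = False):
    """Execute one statement in its own transaction on a DB-API connection."""
    cur = conn.cursor()
    try:
        cur.execute(sql, params)
        row = cur.fetchone() if fetch else None
        conn.commit()  # commit right away so the row lock is released quickly
        return row
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()


def claim_next_zip(conn, node: str) -> Optional[str]:
    """Mark one PENDING zip code as RUNNING and return it, or None when none are left."""
    row = _run(conn, CLAIM_NEXT_ZIP, {"node": node}, fetch=True)
    return row[0] if row else None


def finish_zip(conn, zip_code: str, status: str = "DONE") -> None:
    """Record the outcome (for example DONE or FAILED) for a claimed zip code."""
    _run(conn, FINISH_ZIP, {"status": status, "zip_code": zip_code})
```

With a zip code claimed, the job can load its chunk with a parameterized query such as `SELECT * FROM raw_companies WHERE zip_code = %(zip)s`, where the table name is again hypothetical.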
Our approach was a quick solution to help us scale our Pandas ETL workflow and keep our clients happy. Along the way, it produced a number of interesting performance improvements:
We reduced Pandas DataFrame memory usage by 50% by downcasting the default types. Read more about downcasting here.
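Downcasting numeric columns is one common form of this optimization, and a minimal sketch of it follows. The function name and sample columns are hypothetical. The post doesn’t list which columns or types were downcast, so the 50% figure is the author’s result, not something this toy example reproduces.

```python
import numpy as np
import pandas as pd


def downcast_numeric(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of df with numeric columns shrunk to smaller dtypes.

    Integer columns are converted from int64 to the smallest signed integer
    type that holds their values, and float columns from float64 to float32.
    float32 keeps only about 7 significant digits, so skip columns that need
    full precision.
    """
    out = df.copy()
    for col in out.select_dtypes(include=[np.integer]).columns:
        out[col] = pd.to_numeric(out[col], downcast="integer")
    for col in out.select_dtypes(include=[np.floating]).columns:
        out[col] = pd.to_numeric(out[col], downcast="float")
    return out


# Example: small counts and amounts stored with Pandas' 64-bit defaults.
sample = pd.DataFrame({
    "employee_count": np.arange(1_000, dtype="int64") % 100,
    "revenue_musd": np.linspace(0.0, 50.0, 1_000),
})
smaller = downcast_numeric(sample)
print(smaller.dtypes)
print(sample.memory_usage(deep=True).sum(), "->", smaller.memory_usage(deep=True).sum(), "bytes")
```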
We used mprof, the command-line memory profiler that ships with the Python memory_profiler package, to measure the memory usage of a data chunk’s DataFrame relative to its raw size in Postgres. We needed to know the DataFrames’ sizes in order to cap the number of parallel processes in the script. The last thing we wanted was out-of-memory errors cropping up all over the place. Here is a guide that shows how ridiculously easy it is to set up mprof.
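With memory_profiler installed, `mprof run etl.py` records a script’s memory usage over time, and `mprof plot` charts it. The script name here is illustrative. Once you know a chunk’s peak footprint, the process cap is simple arithmetic. The sketch below is an assumption about how that cap could be computed, not the original script’s logic. It measures a DataFrame with Pandas’ own `memory_usage(deep=True)` and multiplies by a hypothetical `overhead_factor` for the temporary copies made during cleaning. It then divides that peak into the memory budget for the job.

```python
import math
import os
from typing import Optional

import pandas as pd


def dataframe_bytes(df: pd.DataFrame) -> int:
    """In-memory size of a DataFrame, counting the contents of string (object) columns."""
    return int(df.memory_usage(deep=True).sum())


def max_parallel_processes(
    chunk_bytes: int,
    memory_budget_bytes: int,
    overhead_factor: float = 3.0,
    cpu_count: Optional[int] = None,
) -> int:
    """Largest worker count whose combined peak memory fits in the budget.

    overhead_factor is a placeholder for the temporary copies Pandas makes
    while cleaning a chunk; replace it with the ratio of mprof's measured
    peak to dataframe_bytes() for your own largest chunk.
    """
    if chunk_bytes <= 0:
        raise ValueError("chunk_bytes must be positive")
    peak_per_worker = chunk_bytes * overhead_factor
    by_memory = max(1, math.floor(memory_budget_bytes / peak_per_worker))
    by_cpu = cpu_count or os.cpu_count() or 1
    return min(by_memory, by_cpu)


# Example: a 1 GB chunk, 16 GB of RAM set aside for the job, 8 cores.
print(max_parallel_processes(10**9, 16 * 10**9, overhead_factor=3.0, cpu_count=8))  # → 5
```

Size the cap for the largest zip code rather than the average one. A single oversized chunk is enough to push a node out of memory.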
Using psycopg2 rather than the Pandas .to_sql() function to write DataFrames to the database cut write time by an order of magnitude. Why? By default, Pandas’ to_sql() issues one INSERT statement per row in the DataFrame, which is inefficient in terms of both SQL execution and network I/O. Instead, we used psycopg2, a popular PostgreSQL adapter for Python, to take advantage of Postgres’ efficient COPY command for bulk inserts. Read more about copying data via psycopg2 here. You can find other “bulk” insert approaches here.
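Here is a minimal sketch of the COPY approach. It assumes a psycopg2 connection and an existing target table whose columns match the DataFrame’s. The function name `copy_dataframe` and the table name in the test are hypothetical. The DataFrame is serialized to an in-memory CSV buffer and streamed with psycopg2’s `cursor.copy_expert`, so Postgres ingests all rows in one COPY instead of one INSERT per row.

```python
import io

import pandas as pd


def copy_dataframe(conn, df: pd.DataFrame, table: str) -> None:
    """Bulk-load df into an existing Postgres table with COPY ... FROM STDIN.

    `conn` is assumed to be a psycopg2 connection. `table` and the column
    names are interpolated into the SQL string, so they must come from
    trusted code, never from user input.
    """
    buffer = io.StringIO()
    # CSV without header or index. Missing values are written as empty,
    # unquoted fields, which COPY's CSV format reads as NULL.
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)
    columns = ", ".join(map(str, df.columns))
    sql = f"COPY {table} ({columns}) FROM STDIN WITH (FORMAT csv)"
    cur = conn.cursor()
    try:
        cur.copy_expert(sql, buffer)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

One gotcha: an integer column with missing values becomes float64 in Pandas, so a value is written as `12.0`, which Postgres rejects for an integer column. Converting such columns to Pandas’ nullable `Int64` dtype before the copy avoids this.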
The Postgres DB box and the EC2 boxes were located close together to avoid unneeded network I/O. This was more a side effect of our infrastructure at Enigma than a deliberate choice. Still, because our Postgres box and EC2 instances were hosted in the same AWS region, we avoided the extra network overhead that would have come with, say, a database hosted elsewhere.
Testing the ETL module was crucial to our ability to move quickly, though I removed the testing logic from the code below to offer a clean template to copy. We created two parameters for the ETL script, TEST_FLAG and DEBUG_FLAG.
DEBUG_FLAG told the script to skip writing data to the database. This allowed us to debug parts of the ETL job without worrying about accidental side effects on the real data being cleaned.
TEST_FLAG told the script to use a small Postgres table containing a sample of the actual raw data. The script then ran end to end and wrote the cleaned data to a test table. Finally, it compared that result against another table holding the source-of-truth output of a correctly functioning ETL job.
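The two flags could be wired in along the lines of the sketch below. Exposing them as `--debug` and `--test` command-line options, the table names, and the helper names are all assumptions for illustration, not the original script’s interface. The comparison step sorts rows and columns into a canonical order before calling Pandas’ `assert_frame_equal`, because parallel workers can write rows in any order.

```python
import argparse
from dataclasses import dataclass
from typing import Callable, Optional

import pandas as pd
from pandas.testing import assert_frame_equal


@dataclass(frozen=True)
class EtlConfig:
    debug: bool = False  # DEBUG_FLAG: run everything except the database write
    test: bool = False   # TEST_FLAG: read the sample table, write the test table

    @property
    def source_table(self) -> str:  # hypothetical table names
        return "raw_companies_sample" if self.test else "raw_companies"

    @property
    def target_table(self) -> str:
        return "clean_companies_test" if self.test else "clean_companies"


def parse_config(argv: Optional[list[str]] = None) -> EtlConfig:
    parser = argparse.ArgumentParser(description="Zip-code ETL job")
    parser.add_argument("--debug", action="store_true", help="skip all database writes")
    parser.add_argument("--test", action="store_true", help="run against the sample tables")
    args = parser.parse_args(argv)
    return EtlConfig(debug=args.debug, test=args.test)


def write_output(df: pd.DataFrame, config: EtlConfig,
                 write_fn: Callable[[pd.DataFrame, str], None]) -> bool:
    """Write df via write_fn unless debugging; returns whether a write happened."""
    if config.debug:
        return False
    write_fn(df, config.target_table)
    return True


def assert_matches_source_of_truth(result: pd.DataFrame, expected: pd.DataFrame) -> None:
    """Raise AssertionError, with a diff, if result differs from the known-good output."""
    cols = sorted(expected.columns)
    if sorted(result.columns) != cols:
        raise AssertionError(f"column sets differ: {sorted(result.columns)} vs {cols}")
    left = result[cols].sort_values(cols).reset_index(drop=True)
    right = expected[cols].sort_values(cols).reset_index(drop=True)
    assert_frame_equal(left, right)
```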
Here’s a version of the code that worked on a single compute node, but with slight tweaks will work for the multi-node approach as well.
I hope you’ve found our quick Pandas scaling adventure useful. Cheers!
At Enigma we provide the content, tools, and expertise to empower organizations looking to make sense of the world through data. It’s an ambitious project, so we’re recruiting aggressively to find not only the smartest people in the world, but also those who are passionate about our mission. Join us—we’re hiring.