
Pandas and Multiprocessing: How to create dataframes in a parallel way

Scenario: Read a large number of XLS files with pandas, convert them to dataframes, and concatenate them into a single dataframe.


A simple and easy way to do this is to perform the following:

  1. Read the XLS file
  2. Create a dataframe
  3. Append the dataframe to a list
  4. Concatenate the list of dataframes into a single dataframe

This will work, but it has a big disadvantage: it takes advantage neither of the ability of modern operating systems to perform tasks in parallel, nor of the multiple CPU cores a computer might have. For each XLS file, steps 1–4 will be executed in sequence on a single CPU core. This is a waste of hardware resources and time, since the computer might have CPU cores that sit idle.
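
For reference, a minimal sketch of this sequential approach might look like the code below (the ./xls directory and *.xls pattern are assumptions matching the example later in the post):

import pandas
from pathlib import Path

# Sequential version: every file is read on a single core, one after the other
dfs = []
for path in Path('./xls').rglob('*.xls'):         # step 1: find and read each XLS file
    df = pandas.read_excel(path, index_col=None)  # step 2: create a dataframe
    dfs.append(df)                                # step 3: append it to a list
data = pandas.concat(dfs, ignore_index=True)      # step 4: concat into a single dataframe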

We can overcome this with Python's multiprocessing library. multiprocessing allows us to create a pool of processes to which we can assign specific functions to run; those processes run in parallel, which reduces the total time needed to complete the task. Let's see how we can do this, comments inline.

Complete script: pandas_multi_example.py

#!/usr/bin/env python
import pandas
import psutil
import time
import os
from pathlib import Path
from multiprocessing import Pool


def get_files(directory, pattern):
    '''
    Yield the files of a directory that match a glob pattern
    '''
    for path in Path(directory).rglob(pattern):
        yield path.absolute()


def process_file(filename):
    '''
    Read an xls file, return a dataframe
    '''
    return pandas.read_excel(filename, index_col=None)


def pd_wrapper(directory, pattern, processes=-1):
    # Decide how many processes will be created
    if processes <= 0:
        num_cpus = psutil.cpu_count(logical=False)
    else:
        num_cpus = processes
    files = []
    sum_size = 0
    # Get files based on pattern and their total size
    for file in get_files(directory=directory, pattern=pattern):
        sum_size = sum_size + os.path.getsize(file)
        files.append(file)
    print('files:%s,size:%s bytes, procs:%s' % (len(files), sum_size, num_cpus))
    # Create the pool
    process_pool = Pool(processes=num_cpus)
    start = time.time()
    # Start processes in the pool
    dfs = process_pool.map(process_file, files)
    process_pool.close()
    process_pool.join()
    # Concat dataframes to one dataframe
    data = pandas.concat(dfs, ignore_index=True)
    end = time.time()
    print('Completed in: %s sec' % (end - start))
    return data


if __name__ == '__main__':
    df = pd_wrapper(directory='./xls', pattern='*.xls', processes=-1)
    print(df.count())

Some performance tests:

My computer has 4 physical cores. I changed the script to execute the following calls, where the processes argument controls how many processes are created in the pool to execute the task:

pd_wrapper(directory='./xls', pattern='*.xls', processes=1)
pd_wrapper(directory='./xls', pattern='*.xls', processes=2)
pd_wrapper(directory='./xls', pattern='*.xls', processes=3)
pd_wrapper(directory='./xls', pattern='*.xls', processes=-1)
pd_wrapper(directory='./xls', pattern='*.xls', processes=5)
pd_wrapper(directory='./xls', pattern='*.xls', processes=6)
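
If you prefer not to edit the call by hand each time, one way to run the whole sweep might be a small loop like this (same process counts as above; -1 again means "use the number of physical cores"):

if __name__ == '__main__':
    # Benchmark the same workload with different pool sizes
    for procs in (1, 2, 3, -1, 5, 6):
        pd_wrapper(directory='./xls', pattern='*.xls', processes=procs)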

Results:

files:40,size:26976744 bytes, procs:1
Completed in: 38.6713969707489 sec
files:40,size:26976744 bytes, procs:2
Completed in: 25.472070455551147 sec
files:40,size:26976744 bytes, procs:3
Completed in: 20.10475254058838 sec
files:40,size:26976744 bytes, procs:4
Completed in: 18.09688663482666 sec
files:40,size:26976744 bytes, procs:5
Completed in: 17.657267808914185 sec
files:40,size:26976744 bytes, procs:6
Completed in: 17.992421627044678 sec

We can see that the optimal results come when the pool has roughly 4–5 processes. This happens because the computer I am running this on has 4 physical CPU cores, so the workload uses close to 100% of the available resources.

Note: processes=-1 in the pd_wrapper function means that the number of processes will be equal to the number of physical CPU cores.
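
This comes from the psutil call used in the script: cpu_count(logical=False) counts only physical cores, while the default also counts logical (hyper-threaded) cores. The numbers in the comments are just an illustration for a 4-core machine:

import psutil

print(psutil.cpu_count(logical=False))  # physical cores, e.g. 4
print(psutil.cpu_count())               # logical cores, e.g. 8 with hyper-threading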

Further reading: Pandas Cheatsheet.



