Distributed Processing with DASK

July 2019 | Dios Kurniawan

Python and Pandas are great for data analytics tasks, and people love them. However, they have a limitation: the standard Python interpreter effectively runs on a single CPU core at a time. With larger datasets this weakness begins to take its toll, because it prevents us from processing more data than a single core can handle.

Some people say Pandas is still good for files up to 10GB, but in the era of big data, this sounds small. And it is.

Apache Spark may be the answer for large-scale data processing and is currently gaining a lot of traction, but using Spark means you have to rewrite your Python programs, because Spark has a completely different programming model.

Enter Dask.

Dask (https://dask.org) is an emerging framework built around the Python ecosystem that runs Python programs on multiple processors in parallel. As with Spark, we can use it to process large amounts of data. The key difference is that you do not really need to rewrite your code, because Dask is modelled to mimic the Pandas programming style. Only minimal changes are needed to adopt Dask (at least, that’s the original goal).

In this post, I will show you how to create a simple Dask job.

For the purpose of this demonstration, I have set up three Linux machines as Workers and another machine as the Scheduler. For instructions on setting up the environment, the Dask documentation can help you. It is pretty straightforward: I set up my four machines in a matter of minutes.

Four Linux machines are employed in this Dask demonstration

In essence, you need to install Dask on all machines, run the dask-scheduler program on the Scheduler machine to listen for incoming connections, and then run the dask-worker program on every Worker to talk to it. Once everything is running, we can log on to the monitoring web app (called Dask Bokeh), where something like this will be shown:

Dask Bokeh to monitor all 3 Workers
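If you do not have several machines at hand, the same Client interface can also start a scheduler and workers inside a single Python process. The following is a minimal sketch, assuming dask.distributed is installed. The worker count, thread count and memory limit are illustrative values, and the dashboard is switched off here only to keep the example self-contained.

```python
from dask.distributed import Client

# No scheduler address is given, so Client starts a local cluster
# and passes these keyword arguments on to it (illustrative values).
client = Client(
    n_workers=3,
    threads_per_worker=2,
    processes=False,         # run workers as threads in this process
    memory_limit="2GB",
    dashboard_address=None,  # assumption: no dashboard needed for this check
)

# Block until all three workers have registered with the scheduler.
client.wait_for_workers(3)

# Mapping of worker address -> number of threads on that worker.
threads_by_worker = client.nthreads()
worker_count = len(threads_by_worker)
total_threads = sum(threads_by_worker.values())
print(worker_count, "workers,", total_threads, "threads")

client.close()
```

With a real multi-machine setup, the same check works after connecting the Client to the Scheduler address instead.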

At this point, the Dask environment is ready to run programs. To test it, I wrote a new Python program like the one below to activate a Dask Client (your interface to the multiprocessor capabilities) and then load a data file into the Workers:

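import dask.dataframe as dd
from dask.distributed import Client

# Connect to the running Scheduler. Note: n_workers, threads_per_worker, processes and
# memory_limit configure a local cluster and are only used when no address is given.
client = Client('192.168.100.106:8786', n_workers=3, threads_per_worker=2, processes=False, memory_limit='2GB')

# Read the remote CSV and index it by timestamp.
# error_bad_lines=False skips malformed rows (pandas 1.3 and later use on_bad_lines='skip' instead).
df1 = dd.read_csv('http://192.168.100.101/download/rawdata.csv',
                  names=['timestamp','imei','lat','long','signal','lac','cid','mcc','mnc','imsi'],
                  usecols=range(10), parse_dates=['timestamp'], error_bad_lines=False,
                  sep=r',\s*', engine='python', skipinitialspace=True).set_index('timestamp')
df1.head(10)

# Keep the DataFrame in the Workers' memory for later operations.
df1 = df1.persist()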

This loads data from a CSV file located on a remote HTTP server and stores it as a Dask DataFrame. In my example, the CSV file contains GPS tracking data with millions of records (around 7GB in size). The file has ten fields at my disposal, but I am only interested in the key column, the IMEI, which is the unique GPS device identifier (Note: no, I did not take this data illegally from my workplace. Customer data remains protected; no law is violated).

As you can see in the code above, the operation is very similar to working with a Pandas DataFrame: the read_csv() function is used as usual. Take note of the persist() function, which ‘pins’ the DataFrame in the main memory of all Workers, leaving the data available for further processing. We don’t have this in standard Pandas.

In this demonstration, a groupby() aggregation is applied to the DataFrame to count how many records there are for each IMEI. Since the data is distributed over all Workers, the computation is carried out on all machines. Depending on the processing power, this can take several minutes. I was using four virtual machines on my 5-year-old Intel Xeon machine with 16GB RAM, and it took less than 3 minutes to complete.

df1.groupby('imei').long.count().compute()
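The idea behind a distributed count can be sketched with the standard library alone: each worker counts the records in its own partition, and the partial counts are then merged. This is only an illustration of the split-and-combine principle, not Dask's actual implementation. The records, the helper names and the three-way split are all hypothetical.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Hypothetical (timestamp, imei) records, invented for illustration only.
records = [
    ("2019-07-01 08:00:00", "imei-A"),
    ("2019-07-01 08:00:05", "imei-B"),
    ("2019-07-01 08:00:10", "imei-A"),
    ("2019-07-01 08:00:15", "imei-C"),
    ("2019-07-01 08:00:20", "imei-A"),
    ("2019-07-01 08:00:25", "imei-B"),
    ("2019-07-01 08:00:30", "imei-A"),
]


def split_into_partitions(rows, n):
    """Deal the rows out into n partitions, round-robin."""
    return [rows[i::n] for i in range(n)]


def count_partition(rows):
    """Count records per IMEI within a single partition."""
    return Counter(imei for _, imei in rows)


partitions = split_into_partitions(records, 3)

# Each partition is counted independently, one task per "worker".
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_counts = list(pool.map(count_partition, partitions))

# Combine step: add the partial counts together.
totals = sum(partial_counts, Counter())
print(dict(totals))
```

Because counts can simply be added, no worker ever needs to see the whole dataset, which is what makes this kind of aggregation a good fit for a cluster.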

The first few rows of the aggregation result are shown below (there are thousands of IMEIs in the source file):

The example above demonstrates basic data processing with Dask, but Dask has much more to offer. There is also Dask-ML, which allows scikit-learn libraries to run in a parallel environment, all by modifying a few lines of code. I have not had the chance to experiment with it to see if it really works, but Dask-ML is certainly an interesting option for those who need to run models on larger datasets but do not want, or do not have time, to convert to PySpark.

So how does Dask compare with Apache Spark? Spark is clearly more popular; it is the de facto standard for big data processing. Dask is young. Therefore, one might ask: does Dask have a future in challenging Spark’s dominance? Well, for a start, Dask is not meant to compete with Spark. The two have different use cases.

Furthermore, Dask receives funding from Anaconda Inc, one of the prominent players in the data science space, which keeps its development quite active. The future seems quite bright for Dask.