July 2019 | Dios Kurniawan
Python and Pandas are great for data analytics tasks and people love them. However, they have a limitation: the standard Python interpreter runs on only a single CPU core. When it comes to larger datasets, this weakness begins to take its toll, as it prevents us from processing data larger than what a single core can handle.
Some people say Pandas is still good for files up to 10GB, but in the era of big data, this sounds small. And it is.
Apache Spark may be the answer for large-scale data processing and is currently gaining a lot of traction, but using Spark means you have to rewrite your Python programs, because Spark has a completely different programming model.
Enter Dask.
Dask (https://dask.org) is an emerging framework built around the Python ecosystem that lets Python programs run on multiple processors in parallel. Similar to Spark, we can now run Python programs that process large amounts of data. The key difference is that you do not really need to rewrite your code, as Dask is modelled to mimic the Pandas programming style. Minimal change is needed to adopt Dask (at least, that's the original goal).
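To give a feel for how close the two APIs are, here is a minimal sketch of my own (the file name and column names are placeholders, not from the example later in this post): the same aggregation written in Pandas and in Dask differs only in the import and a final compute() call.

import pandas as pd
import dask.dataframe as dd

# Pandas: eager, runs on a single core
pdf = pd.read_csv('data.csv')
print(pdf.groupby('key')['x'].mean())

# Dask: lazy and parallel; only the import and the final compute() change
ddf = dd.read_csv('data.csv')
print(ddf.groupby('key')['x'].mean().compute())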
In this post, I will show you how to create a simple Dask job.
For the purpose of demonstration, I have set up three Linux machines as Workers and another machine as the Scheduler. For instructions on setting up the environment, the information in this Dask documentation can help you. It is pretty straightforward; I had my four machines set up in a matter of minutes.
In essence, you will need to install Dask on all machines, run the dask-scheduler program on the Scheduler machine to listen for incoming connections, and then run dask-worker on all Workers to connect to it. Once everything is running, we can log on to the monitoring web app (the Dask dashboard, built with Bokeh, served on port 8787 by default), where something like this will be shown:
At this point, the Dask environment is ready to run programs. To test it, I wrote a new Python program like the one below to activate a Dask Client (your interface to the cluster's parallel processing capabilities) and then load a data file into the Workers:
import dask.dataframe as dd
from dask.distributed import Client

# Connect to the remote scheduler; the number of workers, threads and memory
# limits are set by the dask-worker processes themselves, not by the Client.
client = Client('192.168.100.106:8786')

# Read the CSV from a remote HTTP server into a Dask DataFrame
df1 = dd.read_csv('http://192.168.100.101/download/rawdata.csv',
                  names=['timestamp', 'device_id', 'lat', 'long', 'signal',
                         'lac', 'cid', 'mcc', 'mnc', 'imsi'],
                  usecols=range(10),
                  parse_dates=['timestamp'],
                  error_bad_lines=False,
                  sep=r',\s*',
                  engine='python',
                  skipinitialspace=True).set_index('timestamp')

df1.head(10)          # peek at the first ten rows
df1 = df1.persist()   # keep the partitions in the Workers' memory
This loads data from a CSV file located on a remote HTTP server into a Dask DataFrame. In my example, the CSV file contains GPS tracking data from tracking devices, with millions of records (around 10GB in size). There are ten fields in this file at my disposal, but I am mainly interested in the key column, device_id, the unique GPS device identifier (disclaimer: I did not take this data illegally; customer data remains protected and no law is violated).
As you can see in the code above, the operation is pretty much the same as with a Pandas DataFrame; the read_csv() function is employed as usual. Take note of the persist() function, which 'pins' the DataFrame in the Workers' main memory, keeping the data available for further processing. We don't have this in standard Pandas.
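To illustrate what persist() buys us (this snippet is my own addition, not part of the original job), any computation that follows is served from the Workers' memory instead of re-reading the CSV over HTTP; wait() from dask.distributed simply blocks until the partitions have been loaded:

from dask.distributed import wait

wait(df1)                                    # block until all partitions sit in worker memory
print(len(df1))                              # row count, computed from memory
print(df1['device_id'].nunique().compute())  # number of distinct devices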
In this demonstration, a groupby() aggregation is applied to the DataFrame to count how many records there are for each device_id. Since the data is distributed over all Workers, the computation is carried out on all machines. Depending on the processing power, this can take several minutes. I was using four virtual machines on my 5-year-old Intel Xeon server with 16GB RAM, and it took less than 3 minutes to complete.
df1.groupby('device_id').long.count().compute()
The raw table, which has more than 30 million records, has been aggregated as shown below:
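Because compute() brings the aggregated result back to the driver as an ordinary pandas Series, it can also be sorted and inspected locally; a small follow-up sketch of my own (the counts variable name is hypothetical):

counts = df1.groupby('device_id').long.count().compute()
print(counts.sort_values(ascending=False).head(10))   # the ten busiest devices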
Machine Learning with Dask
The above example demonstrates basic data processing with Dask, but Dask has much more to offer. There is also Dask-ML, which allows scikit-learn-style workloads to run in a parallel environment by modifying just a few lines of code. Dask-ML is certainly an interesting option for those who need to build models on larger datasets but do not want (or do not have time) to convert to Spark. However, the choice of ML algorithms implemented in Dask-ML is still limited, far fewer than what the scikit-learn library already offers.
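As a rough illustration of that "few lines of code" idea, here is a sketch of the joblib integration described in the Dask-ML documentation (not code from this project, and the training data here is synthetic): an ordinary scikit-learn estimator can hand its parallel work to the Dask cluster simply by switching the joblib backend.

import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

client = Client('192.168.100.106:8786')   # the same scheduler as before, assumed reachable

X, y = make_classification(n_samples=10000, n_features=20, random_state=0)
model = RandomForestClassifier(n_estimators=200)

# Only this context manager is Dask-specific; the scikit-learn code is unchanged
with joblib.parallel_backend('dask'):
    model.fit(X, y)

With this approach the training work runs in parallel on the Workers, but the data itself still has to fit on a single machine, which is where Dask-ML's own distributed estimators (like the K-Means below) come in.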
For demonstration, I tried K-Means clustering on the same dataset, but this time the GPS coordinate data (longitude and latitude) was used. The goal is to cluster the physical locations of the tracking devices into groups. In this case, I used 10 clusters for the k parameter. The code is as simple as this:
import dask_ml.cluster
import matplotlib.pyplot as plt

# Keep only the GPS coordinates
df2 = df1[['lat', 'long']]

# Fit K-Means with 10 clusters on the distributed DataFrame
km = dask_ml.cluster.KMeans(n_clusters=10, init_max_iter=2, oversampling_factor=10)
km.fit(df2)

# Plot the cluster centres: column 0 is latitude (y-axis), column 1 is longitude (x-axis)
centroids = km.cluster_centers_
Y = centroids[:, 0]
X = centroids[:, 1]
fig, ax = plt.subplots()
ax.scatter(X, Y)
The resulting clusters are displayed in a scatterplot, with a map of Indonesia superimposed on it. It is clearly seen that the millions of GPS tracking devices are concentrated in only 10 locations:
Conclusion
Dask can be used to quickly develop data analytics and models on large datasets that traditionally could not be handled by the standard Python libraries. It is not intended to compete with or replace Apache Spark. Dask is still in an early stage of development, and much of its functionality is still limited compared with what Spark offers. As the above example demonstrates, Dask is a good choice if you need a quick-and-dirty analysis without the hassle of converting to Spark.
At the time of writing, Dask is at release version 2.0 and is in active development. Dask receives funding from Anaconda Inc., one of the prominent players in the data science space, which keeps the ecosystem quite active. The future seems bright for Dask.