Distributed Processing with Dask

July 2019 | Dios Kurniawan

Python + Pandas are great for data analytics tasks and people love them. However, they have a limitation: the standard Python interpreter runs on a single CPU core. When it comes to larger datasets, this weakness begins to take its toll, as it prevents us from processing data larger than what a single core can handle.

Some people say Pandas is still good for files up to 10GB, but in the era of big data, this sounds small. And it is.

Apache Spark may be the answer for large-scale data processing and is currently gaining a lot of traction, but using Spark would mean rewriting your Python programs, because Spark has a completely different programming model.

Enter Dask.

Dask (https://dask.org) is an emerging framework built around the Python ecosystem which lets Python programs run on multiple processors in parallel. Similar to Spark, we can now run Python programs to process large amounts of data. The key difference is that you do not really need to rewrite your code, as Dask is modelled to mimic the Pandas programming style. Minimal change is needed to adopt Dask (at least, that is the original goal).

In this post, I will show you how to create a simple Dask job.

For the purpose of demonstration, I have set up three Linux machines as Workers and another machine as the Scheduler. For instructions on setting up the environment, the information in this Dask documentation can help you. It is pretty straightforward; I was able to set up my four machines in a matter of minutes.

Four Linux machines are employed in this Dask demonstration

In essence, you need to install Dask on all machines, run the dask-scheduler program on the Scheduler machine to listen for incoming connections, and then run the dask-worker program on all Workers to talk to it. Once everything is running, we can log on to the monitoring web dashboard (built with Bokeh), where something like this will be shown:

The Dask dashboard (Bokeh) monitoring all three Workers

At this point, the Dask environment is ready to run programs. To test it, I wrote a short Python program like the one below to create a Dask Client (your interface to the multiprocessing capabilities) and then load a data file into the Workers:

import dask.dataframe as dd 
from dask.distributed import Client
client = Client('192.168.100.106:8786')  # worker count, threads and memory limits are configured when the workers are started, not here
df1 = dd.read_csv('http://192.168.100.101/download/rawdata.csv',
                  names=['timestamp', 'imei', 'lat', 'long', 'signal', 'lac', 'cid', 'mcc', 'mnc', 'imsi'],
                  usecols=range(10), parse_dates=['timestamp'], error_bad_lines=False,
                  skipinitialspace=True, sep=r',\s*', engine='python').set_index('timestamp')
df1.head(10)
df1 = df1.persist()

This loads data from a CSV file located on a remote HTTP server and stores it as a Dask DataFrame. In my example, the CSV file contains GPS tracking data with millions of records (around 7GB in size). There are ten fields in this file at my disposal, but I am only interested in the key column, the IMEI, which is the unique GPS device identifier (note: no, I did not take this data illegally from my workplace; customer data remains protected and no law is violated).

As you can see in the code above, the operation is pretty much the same as with a Pandas DataFrame; the read_csv() function is employed as usual. Take note of the persist() function, which 'pins' the DataFrame in the Workers' main memory, keeping the data available for further processing. We do not have this in standard Pandas.
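
It is also worth remembering that Dask evaluates lazily: operations on df1 only build a task graph, and nothing runs on the Workers until a concrete result is requested with compute(). As a quick illustration (this snippet is my own addition, not part of the original job), counting the distinct IMEIs:

# building the expression is instant, no work happens on the Workers yet
distinct_imeis = df1['imei'].nunique()
# compute() sends the task graph to the Scheduler and returns a plain number
print(distinct_imeis.compute())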

In this demonstration, a groupby() aggregation is applied to the DataFrame to count how many records there are for each IMEI. Since the data is distributed over all Workers, the computation is carried out on all machines. Depending on the processing power, this can take several minutes. I was using four virtual machines on my five-year-old Intel Xeon server with 16GB RAM, and it took less than three minutes to complete.

df1.groupby('imei').long.count().compute()

The first few rows of the aggregation result are shown below (there are thousands of IMEIs in the source file):
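Since there are thousands of IMEIs, pulling the whole aggregation back to the client is rarely what you want. A small variation (my own addition, not part of the original job) brings back only the ten devices with the most records:

# aggregate on the Workers, then return just the top ten rows to the client
top10 = df1.groupby('imei').long.count().nlargest(10).compute()
print(top10)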

The example above demonstrates basic data processing with Dask, but Dask has much more to offer. There is also Dask-ML, which allows scikit-learn workloads to run in a parallel environment by modifying only a few lines of code. I have not had the chance to actually experiment with it to see if it really works, but Dask-ML is certainly an interesting option for those who need to run models on larger datasets but do not want, or do not have time, to convert to PySpark.
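One of the simplest routes described in the Dask documentation is to point scikit-learn's joblib parallelism at the Dask cluster, so an ordinary grid search fans its model fits out to the Workers. I have not run this myself, so treat the following as a sketch; the toy dataset and parameter grid are placeholders of my own:

import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

client = Client('192.168.100.106:8786')            # same cluster as before
X, y = make_classification(n_samples=5000, n_features=20, random_state=0)  # toy data

param_grid = {'C': [0.1, 1, 10], 'kernel': ['rbf', 'linear']}
search = GridSearchCV(SVC(), param_grid, cv=3)

with joblib.parallel_backend('dask'):              # fan the model fits out to the Workers
    search.fit(X, y)

print(search.best_params_)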

So how does Dask compare with Apache Spark? Spark is clearly more popular, the de facto standard for big data processing, while Dask is young. One might therefore ask: does Dask have a future challenging Spark's dominance? Well, for a start, Dask is not meant to compete with Spark; the two have different use cases.

Furthermore, Dask receives funding from Anaconda Inc, one of the prominent players in the data science space, so development is very much active. The future seems quite bright for Dask.

Installing OpenCV with Anaconda

May 2019 | Dios Kurniawan

OpenCV is an open-source computer vision library which can be used from Python. Installing OpenCV on macOS is a bit cumbersome since there are no binaries provided on the website, but you can do it easily with the help of Anaconda. This is how to install OpenCV with Anaconda:

  1. Open a terminal and create an environment: conda create --name ComputerVision python=3.7
  2. Activate it: source activate ComputerVision
  3. Install OpenCV: conda install -c menpo opencv
  4. Test your installation with the snippet below: import cv2 as cv
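
A minimal sanity check looks like this (the image file name is just a placeholder; any image in your working directory will do):

import cv2 as cv

print(cv.__version__)              # should print the installed OpenCV version
img = cv.imread('test.jpg')        # 'test.jpg' is a placeholder image file
print('OK' if img is not None else 'image not found, but cv2 imported fine')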

That’s all.

Pandas for Data Transformation

December 2018 | Dios Kurniawan

Market Basket Analysis (MBA, as many call it) is an analytical method widely used in the retail business to gather insights into which products are usually purchased together by consumers. This time around, I was given the problem of analyzing transaction data from a client in the food and beverages business, finding purchase patterns so the management can later decide which meals and drinks to bundle into 'paket hemat' (value-meal bundles).

To start with, I had to extract the transaction data from the Point of Sale (POS) system, which sits in a SQL database, into a CSV file. The data, as one might suspect, is in raw format and requires preprocessing. The data is small, fewer than 20,000 rows, so I immediately thought that I would simply use Python and run the process on my laptop.
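For the extraction step itself, something along these lines is enough (the connection string and table name below are placeholders, not the client's actual POS system):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('mssql+pyodbc://user:password@pos_dsn')   # placeholder connection
query = "SELECT TRX_ID, TRX_TS, PRODUCT_NAME, PRICE FROM sales_transactions"
pd.read_sql(query, engine).to_csv(r'D:\data1.csv', index=False)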

Below is an example of the original transaction data format (I changed the product names to obscure the real values for publication in this blog). The data has gone through some cleansing to eliminate nulls and inconsistent values.

I was looking for a quick way to transpose the transactional data above into a one-hot encoded format that spans to the right. Sure, I could do that in SQL, but that would require writing a long query and re-extracting the data from the POS again. I did not want to do that. Pandas came to the rescue:

import pandas as pd
penjualan1 = pd.read_csv(r'D:\data1.csv', parse_dates=['TRX_TS'], index_col=['TRX_ID'])
pivot1 = penjualan1.pivot_table(index='TRX_ID', columns='PRODUCT_NAME', values='PRICE').fillna(0)
pivot1[pivot1 > 0] = 1   # any non-zero price means the product was in the basket

It results in exactly the format I need:
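
To make the shape of that result concrete, here is the same pivot applied to a made-up five-row sample (the product names are invented, just like the obscured ones above):

import pandas as pd

sample = pd.DataFrame({'TRX_ID':       [101, 101, 102, 103, 103],
                       'PRODUCT_NAME': ['Nasi A', 'Teh B', 'Nasi A', 'Kopi C', 'Teh B'],
                       'PRICE':        [15000, 5000, 15000, 8000, 5000]})
onehot = sample.pivot_table(index='TRX_ID', columns='PRODUCT_NAME', values='PRICE').fillna(0)
onehot[onehot > 0] = 1
print(onehot)
# PRODUCT_NAME  Kopi C  Nasi A  Teh B
# TRX_ID
# 101              0.0     1.0    1.0
# 102              0.0     1.0    0.0
# 103              1.0     0.0    1.0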

Voila! The data is transformed within a few seconds. With such a short program, only a few lines of code, my data is ready for further analysis. Pandas is just great.