Detecting Data Quality Issues by Identifying Outliers

March 2020 | Dios Kurniawan

In my previous post on the Data Quality Framework, I discussed the concept of the DQ (data quality) Test Point. The idea is to detect data quality issues by probing the data: at these DQ Test Points, DQ metrics are extracted, calculated and then compared against a baseline.

To bolster the DQ metrics, a method called outlier detection is added to the DQ Test Points. This is a pretty simple check for abnormalities in the data: we look for outliers. Outliers are, as you must have guessed, values which are very different from most of the population. For example, if most Telkomsel subscribers make 5-6 voice calls per day, then those who place 100 voice calls per day are clear outliers. Outliers can be legitimate, as in the previous example, but they can also be invalid or unwanted data. For example, a person whose age is 250 years is certainly invalid data, something we want to erase from our datasets.

Most outliers are hidden inside the dataset. There will almost always be some, but when we see a lot of them in our tables, or a sudden change in their number, we can take it as a sign of a potential data quality issue. It is important to remove outliers because they generally have a negative effect on analysis and on training predictive models.

To identify outliers, one of the most common methods is to look at the distribution of the data using the Inter-quartile Range (IQR). In the simplest terms, the IQR is the range covering the "middle" half (50%) of the data. Anything that falls beyond a certain distance from the IQR is marked as an outlier. According to "Tukey's Rule" (named after John Tukey, an American mathematician), that distance is 1.5 x IQR (see diagram below).

To perform the computation, first sort all rows in the dataset from the lowest to the highest value, then divide the data into four equal parts (hence the name "quartile"). The boundary of the first 25% of the data is called Q1 and the boundary of the last 25% is called Q3. To get the IQR, subtract Q1 from Q3. After that, find the "min" and "max" limits: "min" is Q1 minus 1.5 x IQR, and "max" is Q3 plus 1.5 x IQR. Once you have these "min" and "max" numbers, simply count the rows whose values are less than "min" or greater than "max". There you get your outliers.
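As a quick illustration of Tukey's Rule with made-up numbers (plain Python, nothing Telkomsel-specific), consider a handful of daily voice-call counts:

import numpy as np

calls = np.array([3, 4, 5, 5, 6, 6, 7, 8, 100])   # hypothetical daily call counts

q1, q3 = np.percentile(calls, [25, 75])        # Q1 = 5.0, Q3 = 7.0
iqr = q3 - q1                                  # IQR = 2.0
low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr     # "min" = 2.0, "max" = 10.0

outliers = calls[(calls < low) | (calls > high)]
print(outliers)   # -> [100]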

Finding the IQR like this is a computationally heavy process, especially with a large number of rows, because it requires sorting. Luckily, our friend Spark provides a tool for this: the approxQuantile() function lets us estimate the quartiles without going through the whole dataset, and this can be done quite easily. You can see a snippet of the PySpark code below:
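The production script itself is not reproduced here, but a minimal sketch of the approach looks roughly like this (the table name, the list of numeric types and the 1% relative error are placeholders and assumptions, not the actual ABT code):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dq_outlier_check").getOrCreate()
df = spark.table("abt.some_table")          # placeholder table name

numeric_cols = [f.name for f in df.schema.fields
                if f.dataType.typeName() in ("integer", "long", "float", "double")]

total = df.count()
for col in numeric_cols:
    # approxQuantile() estimates the quartiles without sorting the whole dataset;
    # the last argument is the allowed relative error
    q1, q3 = df.approxQuantile(col, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    n_outliers = df.filter((F.col(col) < low) | (F.col(col) > high)).count()
    print(col, n_outliers, round(100.0 * n_outliers / total, 2))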

To give an example of how it works, the script was run on one of the tables in the ABT schema, with around 5 million records. Outliers were gathered from all numerical columns, and the script produced the following statistics:

As we can see, the percentage of outliers is generally small; anything under 2% is considered normal. However, in the example above there are a few columns with a large percentage of outliers, up to 15%. This is worth investigating, as it can point to potential quality problems.

What’s Next?

At the moment the IT Data Quality team is working on putting the outlier counts in our BI DQ Dashboard (see below) alongside other DQ metrics. By comparing the outlier statistics with historical data, we may be able to detect issues before they become a problem on the consumer side.
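The exact dashboard logic is still being built, but the idea is simple enough to sketch in a few lines of Python (the column names, thresholds and history values below are made up purely for illustration):

# Hypothetical history of daily outlier percentages per column
history = {
    "voice_call_count": [1.2, 1.3, 1.1, 1.2],
    "data_usage_mb":    [1.8, 1.9, 2.0, 14.7],   # sudden jump on the latest day
}

THRESHOLD_PCT = 2.0   # outlier rate considered abnormal on its own
MAX_JUMP = 3.0        # flag if the latest rate is 3x the historical average

for column, pct in history.items():
    *past, today = pct
    baseline = sum(past) / len(past)
    if today > THRESHOLD_PCT or today > MAX_JUMP * baseline:
        print(f"Potential DQ issue in {column}: {today}% outliers (baseline {baseline:.1f}%)")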

The method described above is far from perfect. It only captures outliers in one dimension, known as univariate outliers. To catch outliers that span two or more dimensions, called multivariate outliers, different methods employing ML techniques will be needed. Stay tuned for more posts on this matter. Our journey to improve data quality in BI goes on!

Tips: Installing Standalone PySpark on macOS

August 2019 | Dios Kurniawan

I wrote this post to share how to install PySpark on macOS. I was reformatting my laptop last week and found it difficult to reinstall PySpark because the Apache Spark documentation does not say much about macOS. If you Google it, you will find quite a few different ways of doing this, but I can assure you that what I describe here is the most straightforward way to install PySpark in a standalone setup on macOS.

A standalone setup is ideal if you need to write PySpark but do not own or have access to a Hadoop / Spark cluster. If you only need to write PySpark programs locally on your laptop without actually running them on a cluster of machines, for example at home or in a coffee shop, then this is the way to go.

To install PySpark, follow the 7 easy steps below. This assumes you are starting with a clean machine. Be warned: you will need a fast internet connection because the software downloads are quite large.

Steps:

  1. Download and install Java (JDK) SE 8 if you haven't done so. Beware: do not use any other version; if you install a newer one, you will have to downgrade.
  2. Download and install Anaconda (http://anaconda.com). Pick Python 3.x instead of Python 2.x. Test your installation by creating and running a simple Python program before you proceed to the next step.
  3. Download and install Homebrew (https://brew.sh). Homebrew is a package manager for macOS; we will need it to install Spark. Once Homebrew is installed, open a new Terminal and run this command to get the core Apache Spark package:
brew install apache-spark

4. Once finished, go to your Spark directory /usr/local/Cellar/apache-spark as shown below (change the numbers to the actual version installed on your computer; in my case it is "2.4.5"), then edit your ~/.bash_profile file:

cd /usr/local/Cellar/apache-spark/2.4.5
nano ~/.bash_profile

Add these new lines at the very bottom of the file, then save and close:

export SPARK_PATH=/usr/local/Cellar/apache-spark/2.4.5
export PYSPARK_DRIVER_PYTHON="jupyter"
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export PYSPARK_PYTHON=python3
alias snotebook='$SPARK_PATH/bin/pyspark --master local[2]'

5. Apply the changes by sourcing the bash_profile you just edited:

source ~/.bash_profile

6. Next, download and install Findspark and PySpark using Conda:

conda install -c conda-forge findspark
conda install pyspark

7. Test your installation by writing a short Python 3 program like the one below (you can also use a Jupyter notebook):

import findspark
findspark.init()                     # put the Spark installation on the Python path
from pyspark import SparkContext
sc = SparkContext(appName="dios")    # start a local Spark context

If the program returns no error, that's it: your PySpark environment is ready!
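If you want one extra sanity check, you can push a trivial job through the SparkContext you just created; this is just a generic example, nothing specific to this setup:

rdd = sc.parallelize(range(1000))   # distribute a small range across local cores
print(rdd.sum())                    # should print 499500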

Distributed Processing with DASK

July 2019 | Dios Kurniawan

Python + Pandas are great for data analytics tasks and people love them. However, they have a limitation: the standard Python interpreter runs on a single CPU core. When it comes to larger datasets, this weakness begins to take its toll, as it prevents us from processing data larger than what a single core can handle.

Some people say Pandas is still good for files up to 10GB, but in the era of big data, this sounds small. And it is.

Apache Spark may be the answer for large-scale data processing and is currently gaining a lot of traction, but using Spark means you have to rewrite your Python programs, because Spark has a completely different programming model.

Enter Dask.

Dask (https://dask.org) is an emerging framework built around the Python ecosystem which lets Python programs run on multiple processors in parallel. Similar to Spark, we can now run Python programs to process large amounts of data. The key difference is that you do not really need to rewrite your code, as Dask is modelled to mimic the Pandas programming style. Minimal change is needed to adopt Dask (at least, that's the original goal).

In this post, I will show you how to create a simple Dask job.

For the purpose of demonstration, I have set up three Linux machines as Workers and another machine as the Scheduler. For instructions on setting up the environment, the information in this Dask documentation can help you. It is pretty straightforward; I was able to set up my four machines in a matter of minutes.

Four Linux machines are employed in this Dask demonstration

In essence, you need to install Dask on all machines, run the Dask-Scheduler program on the Scheduler machine to listen for incoming connections, and then run a Dask-Worker program on each Worker to talk to it (the commands are sketched after the screenshot below). Once everything is running, we can log on to the monitoring web app (the Dask dashboard, built with Bokeh), where something like this will be shown:

Dask Bokeh to monitor all 3 Workers
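For reference, getting to this point took only a couple of commands; this is roughly what I ran (the scheduler address is the one used later in this demo, and the thread/memory flags simply mirror the per-Worker resources I wanted; adjust both to your own network and machines):

# on the Scheduler machine
dask-scheduler

# on each of the three Worker machines
dask-worker tcp://192.168.100.106:8786 --nthreads 2 --memory-limit 2GB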

At this point, the Dask environment is ready to run programs. To test it, I wrote a new Python program like the one below to activate a Dask Client (your interface to the multiprocessing capabilities) and load a data file into the Workers:

import dask.dataframe as dd
from dask.distributed import Client

# Connect to the remote Scheduler (worker count, threads and memory limits
# are set when the dask-worker processes are started, not here)
client = Client('192.168.100.106:8786')

# Load the CSV from the remote HTTP server as a Dask DataFrame
df1 = dd.read_csv('http://192.168.100.101/download/rawdata.csv',
                  names=['timestamp','imei','lat','long','signal','lac','cid','mcc','mnc','imsi'],
                  usecols=range(10), parse_dates=['timestamp'],
                  error_bad_lines=False, sep=r',\s*', engine='python',
                  skipinitialspace=True).set_index('timestamp')
df1.head(10)
df1 = df1.persist()   # pin the partitions in the Workers' memory

This loads data from a CSV file located on a remote HTTP server into a Dask DataFrame. In my example, the CSV file is GPS tracking data with millions of records (around 7GB in size). There are ten fields in this file at my disposal, but I am only interested in the key column, the IMEI, which is the unique GPS device identifier (note: no, I did not take this data illegally from my workplace; customer data remains protected and no law was violated).

As you can see in the code above, the operation is pretty much the same as with a Pandas DataFrame; the read_csv() function is used as usual. Take note of the persist() function, which "pins" the DataFrame in the Workers' main memory, leaving the data available for further processing. We don't have this in standard Pandas.

In this demonstration, a groupby() aggregation is applied to the DataFrame to count how many records there are for each IMEI. Since the data is distributed over all Workers, the computation is done on all machines. Depending on the processing power, this can take several minutes. I was using four virtual machines on my 5-year-old Intel Xeon machine with 16GB RAM, and it took less than 3 minutes to complete.

df1.groupby('imei').long.count().compute()   # compute() triggers the actual work on the Workers

The first few rows of the aggregation result are shown below (there are thousands of IMEIs in the source file):

The above example demonstrates basic data processing with Dask, but Dask has much more to offer. There is also Dask-ML, which allows Scikit-learn libraries to run in a parallel environment by modifying only a few lines of code. I have not had the chance to actually experiment with it to see if it really works, but Dask-ML is certainly an interesting option for those who need to run models on larger datasets but do not want, or do not have time, to convert to PySpark.
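For the curious, the scikit-learn-via-joblib pattern described in the Dask-ML documentation looks roughly like the sketch below. I have not run this myself, so treat it as a sketch only (it assumes scikit-learn and joblib are installed and that a dask.distributed Client is already connected, as earlier in this post):

import joblib
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier

X, y = make_classification(n_samples=10_000, n_features=20)
model = RandomForestClassifier(n_estimators=200, n_jobs=-1)

# joblib ships scikit-learn's parallel work out to the Dask Workers
with joblib.parallel_backend('dask'):
    model.fit(X, y)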

So how does Dask compare with Apache Spark? Spark is clearly more popular, the de facto standard for big data processing, while Dask is young. One might therefore ask: does Dask have a future challenging Spark's dominance? Well, for a start, Dask is not meant to compete with Spark; the two have different use cases.

Furthermore, Dask receives funding from Anaconda Inc, one of the prominent players in the data science space, which keeps development quite active. The future seems bright for Dask.