“Data science is a multi-disciplinary field that uses scientific methods, processes, algorithms and systems to extract knowledge and insights from data in various forms.”
There are some requirements for anyone wanting to do Data science projects: software development skills, mathematics and, of course, domain knowledge. What tasks does Data science solve? It mainly covers machine learning, data analysis, data cleaning and financial analysis.
In recent years Python has gained a lot of momentum. Some people call it “The Swiss Army Knife of Coding”. Python is a general-purpose language with a gentle learning curve, and it was designed with readability as one of its most important goals. In my view, the two main reasons for Python’s popularity are:
- Simplicity and time to market
- Large open source community
Python has a great number of scientific computing packages. The most important ones can be found in the following picture:
Today we will mainly focus on Pandas and NumPy (and later their extended versions in Dask).
Pandas is a Python package providing fast, flexible and expressive data structures designed to make working with “relational” or “labelled” data both easy and intuitive. Pandas is mainly used for handling tabular data.
NumPy provides a powerful N-dimensional array object and tools for integrating C/C++ and Fortran code.
People often think that Python is slow. This is true for code that works on standard Python objects; however, the NumPy and Pandas libraries are mostly implemented in compiled code (C/Fortran), which makes them significantly faster.
The next graph shows some statistics about the most visited Python-related questions on Stack Overflow:
You can see that Pandas and NumPy are increasing in popularity.
If they are so popular and fast, what’s the problem with them?
Both packages are designed for:
- in-memory data – i.e. you can’t work with datasets that are larger than your RAM
- single core execution by default.
*Note: I am lying a little bit here; some methods in NumPy are multithreaded by default.
Sometimes when trying to work with large datasets in Pandas, your machine just freezes, and you’re stuck waiting for minutes just to calculate a simple average over a column in your data frame.
This gets really annoying when you have a working solution in Python but are not able to scale it up. Of course you can always buy more RAM, but for a speedup you need to look for other solutions. Sometimes those solutions would require rewriting the whole application from scratch. That is definitely not something your stakeholders want to hear.
This is where Dask comes into play
Dask is an open source project which gives you the possibility to run operations in parallel over NumPy arrays or Pandas DataFrames, and chunk your datasets into smaller pieces which can be stored on multiple worker nodes.
Dask provides high-level Array, Bag and DataFrame collections that mimic NumPy, lists and Pandas, but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.
Don’t worry, Dask is not another open source project maintained by 2-3 people. Anaconda, Inc. stands behind it – the maintainer of the most popular Python distribution for scientific computing.
Advantages of Dask:
- Manipulating large datasets, even when those datasets don’t fit in memory
- Accelerating long computations by using many cores
- API is almost the same as for NumPy and Pandas
- Existing NumPy and Pandas code can be parallelized with minimal changes
- Dask can be scaled out very easily to a cluster
- On a local machine, you can parallelize using Multithreading or Multiprocessing
- Dask array
o Internally coordinates many NumPy arrays
o Co-evolves with NumPy
o Supports reductions, blockwise, overlapping, linear algebra
o Dask arrays coordinate many NumPy arrays arranged into a grid. These NumPy arrays may live on disk or on other machines.
- Dask DataFrames – scalable implementation of Pandas
o Internally coordinates many Pandas DataFrames
o Co-evolves with Pandas
o Dask DataFrames coordinate many Pandas DataFrames arranged along the index
- Dask Bags
o A Dask Bag is able to store and process collections of Python objects that do not fit into memory. Dask Bags are great for processing logs and collections of JSON documents.
- Dask delayed
o Dask is not limited to NumPy and Pandas. You can parallelize custom Python code.
o With Dask Delayed each function call is queued, added to an execution graph and scheduled.
- Dask ML
o Scalable version of Python’s Scikit-Learn library for machine learning.
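To make the Bag collection more concrete, here is a minimal sketch of processing JSON log lines. The log records and field names are made up for illustration; in a real project the lines would more likely come from files on disk.

```python
import json

import dask.bag as db

# Hypothetical log lines; in practice they could be loaded with db.read_text
# from a set of log files instead of an in-memory list.
raw_lines = [
    '{"level": "INFO", "ms": 12}',
    '{"level": "ERROR", "ms": 250}',
    '{"level": "INFO", "ms": 30}',
    '{"level": "ERROR", "ms": 90}',
]

# Build a lazy bag split into two partitions and parse each line as JSON.
records = db.from_sequence(raw_lines, npartitions=2).map(json.loads)

# Keep only the error records; nothing has been executed yet.
errors = records.filter(lambda record: record["level"] == "ERROR")

# compute() triggers the work. The threaded scheduler is chosen here only to
# keep the sketch portable; bags use a process-based scheduler by default.
error_count = errors.count().compute(scheduler="threads")
slowest_error_ms = errors.pluck("ms").max().compute(scheduler="threads")

print(error_count, slowest_error_ms)
```

The same map/filter/reduce style applies when the bag is far larger than memory, because each partition is processed independently.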
Syntax difference between NumPy and Dask Array
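# NumPy: the whole array is created in memory at once
import numpy as np
x = np.random.random((10000, 10000))

# Dask: the array is split into 1000 x 1000 chunks and built lazily
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))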
For the Dask array, you need to provide the chunk size. As a rule of thumb, try to set the chunk size so that the number of chunks is a small integer multiple of the number of cores, while each chunk still fits comfortably in memory.
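The following sketch shows how the chunk layout of a Dask array can be inspected and how a reduction runs over the chunks. It uses a smaller array than the one above, purely so that it finishes quickly.

```python
import dask.array as da

# A smaller array than in the text so the sketch runs quickly.
x = da.ones((4000, 4000), chunks=(1000, 1000))

# Chunk layout: a 4 x 4 grid of blocks, each 1000 x 1000 elements.
print(x.numblocks)
print(x.chunksize)

# Building the expressions is lazy; compute() runs them block by block.
total = x.sum().compute()
column_means = x.mean(axis=0).compute()

print(total, column_means.shape)
```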
So how does Dask work?
The API of the selected collection produces a task graph. As all operations are evaluated lazily, nothing is executed at that moment. Once you run the computation, the scheduler executes the operations from the task graph.
The scheduler can reside on your local machine and use a thread or a process pool, or it can reside on a distributed cluster.
If we choose a distributed scheduler, our network system will look like the following picture (actually, the local system is very similar, but there the workers represent the cores of our CPU).
There are 3 main components:
- Scheduler – coordinates the whole workflow
- Workers – perform the computations and hold the data in distributed memory
- Clients – the interface through which users submit work via the API
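As a rough illustration of how these three components fit together, the sketch below starts a small scheduler and two workers inside the current Python process and talks to them through a client. It assumes the dask.distributed package is installed; the square function is a made-up example task.

```python
from dask.distributed import Client


def square(value):
    # Hypothetical task used only for this illustration.
    return value * value


# processes=False keeps the workers as threads inside this Python process,
# and dashboard_address=None skips starting the diagnostics web server.
client = Client(n_workers=2, threads_per_worker=1, processes=False,
                dashboard_address=None)

# The client sends tasks to the scheduler, which assigns them to workers.
futures = client.map(square, range(5))
squares = client.gather(futures)
print(squares)

client.close()
```

Pointing the client at the address of a remote scheduler instead of starting a local one is what turns the same code into a cluster deployment.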
How do these graphs work?
To understand the concept, let me explain Dask delayed first. The delayed function wraps your functions so that they operate lazily. Rather than executing a function immediately, it defers execution and places the function and its arguments into a task graph. Dask arrays and DataFrames build the same kind of task graph under the hood. In our example, we will use Dask delayed on some simple Python operations.
This example takes the number 1 and increments it, takes the number 2 and increments it, and then adds the two results together (i.e. the result should be 5).
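Here is the Python code (inc and add are ordinary Python functions that increment a number and add two numbers):
import dask
x = dask.delayed(inc)(1)
y = dask.delayed(inc)(2)
z = dask.delayed(add)(x, y)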
The picture shows the generated task graph. As you can see, Dask was so smart that it identified that the two increment operations could be run in parallel.
If we want to calculate the result, we need to call the .compute() method over the Dask object.
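For completeness, here is a self-contained version of the example, written with delayed as a decorator and including the final compute() call. The bodies of inc and add are the obvious ones implied by the description.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


# Calling the decorated functions only records tasks in a graph.
x = inc(1)
y = inc(2)
z = add(x, y)

# Nothing has run so far; compute() executes the graph and returns the value.
result = z.compute()
print(result)  # 5
```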
Of course in real life we get much more complicated task graphs:
The task graph from above is still a simple one; normally you can get graphs with millions of operations.
Before executing the .compute() method, you might consider visualizing the task graph. By looking at it, you can identify potential bottlenecks, check if you are indeed parallelizing, or find places in the code where the tasks depend on each other (which can lead to a great deal of communication). Don’t try to use delayed on every function, and always check the task graph, otherwise you can get even worse results than the original solution.
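To see what such a graph contains without drawing it, it helps to know that at its lowest level a Dask task graph can be written as a plain dictionary. The sketch below builds the increment-and-add graph by hand and runs it with the threaded scheduler; the key names are arbitrary.

```python
from operator import add

from dask.threaded import get


def inc(x):
    return x + 1


# Keys name results; a tuple whose first element is callable is a task,
# and string arguments that match other keys are dependencies.
graph = {
    "x": (inc, 1),
    "y": (inc, 2),
    "z": (add, "x", "y"),
}

# "x" and "y" do not depend on each other, so the threaded scheduler may run
# them in parallel before computing "z".
result = get(graph, "z")
print(result)
```

Tasks that list many other keys as arguments are the ones that force data to move between workers, which is what to look for in a drawn graph as well.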
How to parallelize with Dask
One important thing to note before you start parallelizing:
The key to accelerating your code … is understanding why it’s slow in the first place
As I said earlier for local Dask, you can switch between multithreading and multiprocessing. Multithreading in Python works differently than in other languages. Without going into too many details, the Global Interpreter Lock (GIL) stops multiple Python threads from operating in parallel, i.e. only one thread can be in a state of execution at any point in time.
Fortunately, much of the code in NumPy and Pandas is written in C and releases the GIL while it runs. If you use multiprocessing, the code can run in parallel on the different cores of the CPU; however, sending data to other processes involves serialization and deserialization, so there is additional overhead. As a rule of thumb, use multithreading for I/O-bound tasks and for numeric code that releases the GIL, and multiprocessing for CPU-bound tasks that hold the GIL (e.g. computations written in pure Python). Of course, you can always switch between the two in Dask to see which one is better for your application.
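Switching between local schedulers is a one-argument change. The sketch below runs the same small computation with the single-threaded and the threaded scheduler; the array size is arbitrary.

```python
import dask
import dask.array as da

x = da.ones((2000, 2000), chunks=(500, 500))
expr = (x + 1).sum()

# Single-threaded execution, useful for debugging and as a baseline.
baseline = expr.compute(scheduler="synchronous")

# Thread pool: low overhead, effective when the work releases the GIL.
threaded = expr.compute(scheduler="threads")

# A default can also be set for a block of code instead of per call.
with dask.config.set(scheduler="threads"):
    from_config = expr.compute()

# scheduler="processes" selects the multiprocessing scheduler in the same way;
# it is left out here because it should run under an
# `if __name__ == "__main__":` guard on platforms that spawn processes.
print(baseline, threaded, from_config)
```

Timing each variant on a representative piece of your own workload is the most reliable way to choose between them.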
And then there are operations which are very hard/impossible to parallelize. For example, operations that rearrange the full dataset are hard to do in parallel.
Which operations rearrange the datasets?
- DataFrames: set index, groupby-apply operations, table joins
- Arrays: rechunking, FFT analysis
Let’s imagine the following workflow:
- Reading csv files from a disk into a DataFrame
- Setting the index to a different column
- Resampling the dataset
- Plotting the result
Even if we try to parallelize this workflow with Dask, we won’t get too much improvement. Why?
- Reading from disk is still the same operation
- Setting the index rearranges the dataset – it involves a lot of communication between the workers
- Parsing the csv and resampling the dataset are the two places where we can gain some speed
Of course if you set up a large enough cluster, you can get some serious improvement, but the cost of the infrastructure will be much higher.
This example shows us that in certain cases, Dask is not ideal for speeding up the process (of course it still can be used for handling larger-than-memory datasets). But before you start with parallelization, you have to be sure that this is exactly what your application needs.
If you decide to do parallelization, chunking strategy is the next thing to consider. You should be very careful about how you chunk your dataset:
- Too large chunks – It’s possible that you wouldn’t use all your workers, and the involved workers wouldn’t be able to handle the load because the individual chunks are too large.
- Too small chunks – Each chunk processing involves some additional tasks like communication overhead, serializing or deserializing. If you have too many chunks, these additional tasks could slow down the processing.
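The trade-off can be estimated with simple arithmetic before running anything. The helper below is a hypothetical utility, not part of Dask; it reports how many chunks a given chunking produces and how large each chunk is for a float64 array.

```python
import math


def chunk_stats(shape, chunks, itemsize=8):
    """Return (number of chunks, bytes per full chunk) for a regular chunking."""
    blocks_per_axis = [math.ceil(n / c) for n, c in zip(shape, chunks)]
    n_chunks = math.prod(blocks_per_axis)
    chunk_bytes = math.prod(chunks) * itemsize
    return n_chunks, chunk_bytes


cores = 8  # assumed number of local cores
shape = (10000, 10000)  # float64 array of the size used in the syntax example

for chunks in [(10000, 10000), (1000, 1000), (10, 10)]:
    n_chunks, chunk_bytes = chunk_stats(shape, chunks)
    print(f"chunks={chunks}: {n_chunks} chunks, "
          f"{chunk_bytes / 1e6:.4g} MB each, "
          f"{n_chunks / cores:.4g} chunks per core")
```

A single 800 MB chunk leaves seven of the eight cores idle, while a million 800-byte chunks would spend most of the time on scheduling overhead; 100 chunks of 8 MB sits between the two extremes.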
With more complex algorithms, the optimization becomes much harder. If there are tasks that depend on each other, or if the graph size is too large, it is hard to find an optimal solution.
Fortunately, Dask has its own diagnostics tool, where you can see what the individual workers are doing in real time, how they are communicating with each other, and spot some places with blocking code.
The web interface of the diagnostic tool is launched on the machine where the scheduler is installed.
It contains the following plots:
- Progress – The interface shows the progress of the various computations as well as the exact number completed. Each bar is assigned a colour according to the function being run. On the left, the lighter shade is the number of tasks that have both been completed and released from memory.
- Memory – The interface shows the relative memory use of each function with a horizontal bar sorted by function name.
- Task stream – The task stream plot shows when tasks complete on which workers. Worker cores are on the y-axis and time is on the x-axis. As a worker completes a task, its start and end times are recorded and a rectangle is added to this plot accordingly.
- Resources – The resources plot shows the average CPU and memory use over time as well as average network traffic.
Dask vs. Spark
- Spark is a big data ecosystem that has been used for many years. It is much larger than Dask; however, there are cases where Dask beats Spark.
- Spark DataFrames will be much better when you have large SQL-style queries, where their query optimizer can kick in.
- Dask DataFrames will be much better when queries go beyond typical database queries. This happens most often in time series, random access, and other complex computations.
- Spark will integrate better with JVM and data engineering technology. Spark is its own ecosystem.
- Dask will integrate better with Python code. Dask is designed to integrate with other libraries and pre-existing systems.
- Spark does not include support for multi-dimensional arrays natively.
- Dask fully supports the NumPy model for scalable multi-dimensional arrays. If your project highly relies on NumPy, you should consider using Dask.
Disadvantages of Dask
- Dask is not SQL – it doesn’t optimize complex queries.
- It is not as fast as MPI – communication overhead is worse.
- Dask targets Python and associated languages.
- Sometimes you don’t even need parallelism.
- The API is similar to Pandas and NumPy. However, some of the functions are missing, and to work around the missing functionality, the relevant parts of the code need to be modified.
Enough theory; let’s take a look at a real-life example
For my research, I decided to implement a Monte Carlo simulation for finance applications, as it is highly parallelizable, and used very frequently in the world of finance.
What is a Monte Carlo simulation?
The Monte Carlo model allows researchers to run multiple trials and define all potential outcomes of an event or investment. Together, they create a probability distribution or risk assessment for a given investment or event.
A Monte Carlo analysis is a multivariate modelling technique. All multivariate models can be thought of as complex “what if?” scenarios. Research analysts use them to forecast investment outcomes, to understand the possibilities surrounding their investment exposures, and to better mitigate risks.
So, I started with a solution in NumPy. It worked, but I ran into trouble very quickly. Some products are complex (containing many sub-products) and the forecasting interval is more than 10 years. The resulting dataset can grow to more than 20 GB, and the computation is slow.
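The actual model is not reproduced here. As a stand-in, the sketch below simulates the value of a single asset after one year under geometric Brownian motion, which is a common textbook setup for Monte Carlo in finance. All parameter values are hypothetical.

```python
import numpy as np

# Hypothetical parameters, chosen only for illustration.
start_price = 100.0
drift = 0.05        # expected yearly return
volatility = 0.2    # yearly standard deviation of returns
years = 1.0
n_paths = 200_000

rng = np.random.default_rng(42)

# One standard normal draw per simulated scenario.
shocks = rng.standard_normal(n_paths)

# Terminal price under geometric Brownian motion.
terminal_prices = start_price * np.exp(
    (drift - 0.5 * volatility**2) * years + volatility * np.sqrt(years) * shocks
)

# The simulated outcomes form the distribution used for risk assessment.
mean_price = terminal_prices.mean()
percentile_5 = np.percentile(terminal_prices, 5)
print(f"mean={mean_price:.2f}, 5th percentile={percentile_5:.2f}")
```

With many products, many time steps and many scenarios, the arrays in such a simulation quickly outgrow memory, which is the situation described above.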
- First I tried a Local Dask implementation using all 8 cores of my machine. As the computation is more CPU bound, I achieved the best results with multiprocessing. I had to run multiple experiments to find the optimal number of chunks.
- Then I went to set up a cluster. Using Kubernetes and Google Cloud, I was able to set it up in less than 10 minutes.
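A chunked version of a simulation like this needs only small changes in Dask. The sketch below uses the same hypothetical single-asset setup with geometric Brownian motion, not the actual model; the chunk size is an arbitrary starting point that would be tuned by experiment.

```python
import dask
import dask.array as da

# Hypothetical parameters, chosen only for illustration.
start_price = 100.0
drift = 0.05
volatility = 0.2
years = 1.0
n_paths = 2_000_000
chunk_size = 250_000  # 8 chunks; an assumption to be tuned by experiment

# Seeded random state so that the lazily generated chunks are reproducible.
state = da.random.RandomState(42)
shocks = state.standard_normal(size=n_paths, chunks=chunk_size)

# Same formula as in NumPy, but evaluated lazily and chunk by chunk.
terminal_prices = start_price * da.exp(
    (drift - 0.5 * volatility**2) * years + volatility * years**0.5 * shocks
)

# Computing both statistics in one call lets them share the simulated chunks.
mean_price, loss_probability = dask.compute(
    terminal_prices.mean(),
    (terminal_prices < start_price).mean(),
)
print(f"mean={mean_price:.2f}, P(loss)={loss_probability:.3f}")
```

Because each chunk of scenarios is independent, the same graph can run on local threads, local processes or a cluster without changes to the simulation code.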
The following graph shows the results of parallelization using Dask.
As you can see, Local Dask with multiprocessing gives good enough results. It has to be noted, though, that the clustered version used only 3 nodes of a standard machine type, which was what my subscription allowed. If I had set up a larger cluster with more performant machines, I would have achieved even better results.
Dask is a very promising technology backed by a large company. It simplifies the parallelization of existing Python-based Data science workflows. The developer team behind Dask is currently working on supporting GPU-based computation and parallelization, and on integration with the just-in-time compilation library Numba.