Dask | |
Author: | Matthew Rocklin |
Developer: | Dask |
Released: | |
Latest Release Version: | 2024.2.1 |
Operating System: | Linux, Microsoft Windows, macOS |
Programming Language: | Python |
Genre: | Data analytics |
Language: | Python |
Dask is an open-source Python library for parallel computing. Dask [1] scales Python code from multi-core local machines to large distributed clusters in the cloud. Dask provides a familiar user interface by mirroring the APIs of other libraries in the PyData ecosystem including: Pandas, scikit-learn and NumPy. It also exposes low-level APIs that help programmers run custom algorithms in parallel.
Dask was created by Matthew Rocklin[2] in December 2014[3] and has over 9.8k stars and 500 contributors on GitHub.[4]
Dask is used by retail, financial, governmental organizations, as well as life science and geophysical institutes. Walmart,[5] Wayfair,[6] JDA,[7] GrubHub,[8] General Motors,[9] Nvidia,[10] Harvard Medical School, Capital One[11] and NASA[12] are among the organizations that use Dask.
Dask has two parts:[13]
Dask's high-level parallel collections – DataFrames,[14] Bags,[15] and Arrays[16] – operate in parallel on datasets that may not fit into memory.
Dask’s task scheduler executes task graphs in parallel. It can scale to thousand-node clusters. This powers the high-level collections as well as custom, user-defined workloads using low-level collections.
Dask supports several user interfaces[17] called high-level and low-level collections:
Under the hood, each of these user interfaces adopts the same parallel computing machinery.
Dask's high-level collections are the natural entry point for users who are interested in scaling up their pandas, NumPy or scikit-learn workload. Dask’s DataFrame, Array and Dask-ML are alternatives to Pandas DataFrame, Numpy Array and scikit-learn respectively with slight variations to the original interfaces.
Dask Array is a high-level collection that parallelizes array-based workloads and maintains the familiar NumPy API, such as slicing, arithmetic, reductions, mathematics, etc., making it easy for Numpy users to scale up array operations.
A Dask array comprises many smaller n-dimensional Numpy arrays and uses a blocked algorithm to enable computation on larger-than-memory arrays. During an operation, Dask translates the array operation into a task graph, breaks up large Numpy arrays into multiple smaller chunks, and executes the work on each chunk in parallel. Results from each chunk are combined to produce the final output.
Dask DataFrame is a high-level collection that parallelizes DataFrame based workloads. A Dask DataFrame comprises many smaller Pandas DataFrames partitioned along the index. It maintains the familiar Pandas API, making it easy for Pandas users to scale up DataFrame workloads. During a DataFrame operation, Dask creates a task graph and triggers operations on the constituent DataFrames in a manner that reduces memory footprint and increases parallelism through sharing and deleting of intermediate results.
Dask Bag is an unordered collection of repeated objects, a hybrid between a set and a list. Dask Bag is used to parallelize computation of semi-structured or unstructured data, such as JSON records, text data, log files or user-defined Python objects using operations such as filter, fold, map and groupby. Dask Bags can be created from an existing Python iterable or can load data directly from text files and binary files in the Avro format.
The Dask low-level interface allows for more customization. It is suitable for data that does not fall within the scope of a Dask DataFrame, Bag or Array. Dask has the following low-level collections:
Dask delayed is an interface used to parallelize generic Python code that does not fit into high level collections like Dask Array or Dask DataFrame. Python functions decorated with Dask delayed adopt a lazy evaluation strategy by deferring execution and generating a task graph with the function and its arguments. The Python function will only execute when .compute is invoked. Dask delayed can be used as a function dask.delayed or as a decorator @dask.delayed.
Dask Futures, an immediate (non-lazy) alternative to Dask delayed, provides a real-time task framework that extends Python’s concurrent.futures interface, which provides a high-level interface for asynchronous execution of callables.
It is common to combine high and low-level interfaces. For example, users can run Dask array/bag/dataframe to load and pre-process data, then switch to Dask delayed for a custom algorithm that is specific to their domain, then switch back to Dask array/dataframe to clean up and store results.
Dask’s high and low-level collections create a directed acyclic graph of tasks,[22] which represents the relationship between computation tasks. A node in a task graph represents a Python function that performs a unit of computation and an edge represents the data dependency between the upstream and downstream task. After a task graph is generated, the task scheduler manages the workflow based on the given task graph by assigning tasks to workers in a manner that improves parallelism and respects the data dependencies.
Dask provides two families of schedulers: single-machine scheduler and distributed scheduler.
A single-machine scheduler is the default scheduler which provides basic features on local processes or thread pool and is meant to be used on a single machine. It is simple and cheap to use but does not scale.
Dask’s distributed scheduler[23] can be set up on a local machine or scale out on a cluster. Dask can work with resource managers, such as Hadoop YARN, Kubernetes, or PBS, Slurm, SGD and LSF for High Performance Computing (HPC) clusters.
Dask-ML is compatible with scikit-learn’s estimator API of fit, transform and predict and is well integrated with machine learning and deep learning frameworks such XGBoost, LightGBM, PyTorch, Keras and TensorFlow through scikit-learn compatible wrappers.
Selected scikit-learn estimators and utilities can be parallelized [24] through executing jobs across multiple CPU cores using the Joblib library. The number of processes are determined by the n_jobs parameters. By default, the Joblib library uses loky as its multi-processing back-end. Dask offers an alternative Joblib backend which is useful for scaling of Joblib-backed scikit-learn algorithms out to a cluster of machines for compute constrained workloads.
For memory constrained workloads, Dask offers alternatives, such as Parallel Meta-estimators[25] for parallelizing and scaling out tasks that are not parallelized within scikit-learn and Incremental Hyperparameter Optimization[26] for scaling hyper-parameter search and parallelized estimators.[27]
XGBoost [28] and LightGBM [29] are popular algorithms that are based on Gradient Boosting and both are integrated with Dask for distributed learning. Dask does not power XGBoost or LightGBM, rather it facilitates setting up of the cluster, scheduler, and workers required then hands off the data to the machine learning framework to perform distributed training.
Training an XGBoost model with Dask,[30] a Dask cluster is composed of a central scheduler and multiple distributed workers, is accomplished by spinning up an XGBoost scheduler in the same process running the Dask central scheduler and XGBoost worker in the same process running the Dask workers. Dask workers then hand over the Pandas DataFrame to the local XGBoost worker for distributed training.
Skorch [31] is a scikit-learn compatible wrapper for PyTorch, which enables Dask-ML to be used together with PyTorch.
SciKeras [32] is an scikit-learn compatible wrapper for Keras models which enables Dask-ML to be used with Keras.
Examples of retail use include:
Dask is used for high resolution, 4-dimensional, cellular imagery by Harvard Medical School, Howard Hughes Medical Institute, Chan Zuckerberg Initiative, and the UC Berkeley Advanced Bioimaging Center. They record the evolution and movements of a 3-dimensional cell over time, in maximum detail. This generates large amounts of data that is difficult to analyze with traditional methods. Dask helps them scale their data analysis workflows with its API that resembles NumPy, Pandas, and scikit-learn code.
Dask is also used at the Novartis Institute for Biomedical Research to scale machine learning prototypes.
Dask is used in Climate Science, Energy, Hydrology, Meteorology, and Satellite Imaging by companies such as NASA, LANL, PANGEO:[37] Earth Science and the UK Meteorology Office.[38]
Oceanographers produce massive simulated datasets of the Earth’s oceans and researchers can look at large seismology datasets from sensors around the world, collect a large number of observations from satellites and weather stations, and run big simulations.
Dask is integrated into many libraries, such as Pangeo [39] and xarray; time series software like Prophet [40] and tsfresh;[41] ETL/ML software like scikit-learn,[42] RAPIDS,[43] and XGBoost; workflow management tools like Apache Airflow[44] and Prefect.[45]
Dask was originally developed at Continuum Analytics, a for-profit Python consulting company that eventually became Anaconda, Inc.,[46] the creator of many open-source packages and the Anaconda Python distribution. Dask grew out of the Blaze[47] project, a DARPA[48] funded project to accelerate computation in open source.
Blaze was an ambitious project that tried to redefine computation, storage, compression, and data science APIs for Python, led originally by Travis Oliphant and Peter Wang, the co-founders of Anaconda. However, Blaze’s approach of being an ecosystem-in-a-package meant that it was harder for new users to easily adopt.
Instead of rewriting a software ecosystem, Dask’s team intended to augment the existing one with the right component. With this idea in mind, on December 21, 2014 Matthew Rocklin created Dask. The purpose[49] of Dask was originally to parallelize NumPy so that it could use one full workstation computer, which was common in finance shops at the time.
The first projects to really adopt Dask were Xarray [50] (commonly used in geo sciences) and Scikit-Image[51] (commonly used in image processing). Dask was integrated into Xarray within a few months of being created. It provided Dask with its first user community, which remains to this day.
Understanding that there is demand for a lightweight parallelism solution for Pandas DataFrames[52] and machine learning tools, such as scikit-learn, Dask quickly evolved to support other projects as well.
Since 2018, other teams and institutions in academia, tech companies, and large corporations such as NASA, UK Met Office, Blue Yonder and Nvidia, became interested in Dask and began integrating it into their systems.
Dask received support from a diverse set of sources:[53] the US Government (DARPA grant), the Gordon and Betty Moore Foundation, Anaconda, NSF, NASA (US research grants with collaborations like Pangeo) and Nvidia.
In 2020, Matthew Rocklin founded Coiled Computing, Inc.[54] to provide further support for Dask development and enable companies to deploy Dask clusters in the cloud. In May 2021, the company raised $21 million in series A funding led by Bessemer Venture Partners.[55]