Dask utilities

This module contains helpers for using dask: https://dask.pydata.org/en/latest/

Starting or connecting to a server

connect(args) – Connect to the dask cluster specified by the arguments in args
add_dask_options(parser, num_procs, …) – Add options for connecting to and/or controlling a local dask cluster
add_dask_values_to_args(args, num_procs, …) – Add the options for a dask cluster to the given argparse namespace

Submitting jobs

apply_iter(it, client, func, *args, …) – Distribute calls to func on each item in it across client.
apply_df(data_frame, client, func, *args, …) – Distribute calls to func on each row in data_frame across client.
apply_groups(groups, client, func, *args, …) – Distribute calls to func on each group in groups across client.

Other dask helpers

check_status(f_list) – Collect the status counts of a list of futures
collect_results(f_list, finished_only, …) – Collect the results from a list of futures
cancel_all(f_list[, pending_only]) – Cancel all (pending) tasks in the list
get_dask_cmd_options(args) – Extract the flags and options specified for dask from the parsed arguments

scikit-learn helpers

dask_pipeline(pipeline, dask_client) – A simple wrapper to submit an sklearn pipeline to a dask cluster for fitting.

Definitions

pyllars.dask_utils.add_dask_options(parser: argparse.ArgumentParser, num_procs: int = 1, num_threads_per_proc: int = 1, cluster_location: str = 'LOCAL') → None

Add options for connecting to and/or controlling a local dask cluster

Parameters:
  • parser (argparse.ArgumentParser) – The parser to which the options will be added
  • num_procs (int) – The default number of processes for a local cluster
  • num_threads_per_proc (int) – The default number of threads for each process for a local cluster
  • cluster_location (str) – The default location of the cluster
Returns:

None – A “dask cluster options” group is added to the parser

Return type:

None
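
Examples

A minimal sketch of adding the dask group to a script's own parser; the exact flag names added by the group are defined by the library, so they are not spelled out here:

import argparse

import pyllars.dask_utils as dask_utils

parser = argparse.ArgumentParser(description="A hypothetical script")
parser.add_argument('--input', help="An unrelated, script-specific option")

# add the "dask cluster options" group, defaulting to four worker
# processes for a local cluster
dask_utils.add_dask_options(parser, num_procs=4)

args = parser.parse_args()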

pyllars.dask_utils.add_dask_values_to_args(args: argparse.Namespace, num_procs: int = 1, num_threads_per_proc: int = 1, cluster_location: str = 'LOCAL', client_restart: bool = False) → None

Add the options for a dask cluster to the given argparse namespace

This function is mostly intended as a helper for use in IPython notebooks.

Parameters:
  • args (argparse.Namespace) – The namespace on which the arguments will be set
  • num_procs (int) – The number of processes for a local cluster
  • num_threads_per_proc (int) – The number of threads for each process for a local cluster
  • cluster_location (str) – The location of the cluster
  • client_restart (bool) – Whether to restart the client after connection
Returns:

None – The respective options will be set on the namespace

Return type:

None
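
Examples

In a notebook there is no command line to parse, so the dask fields can be set directly on an empty namespace. A minimal sketch:

import argparse

import pyllars.dask_utils as dask_utils

# create an empty namespace and set the dask fields on it directly
args = argparse.Namespace()
dask_utils.add_dask_values_to_args(args, num_procs=4, num_threads_per_proc=2)

# the namespace now carries the fields expected by connect
client, cluster = dask_utils.connect(args)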

pyllars.dask_utils.apply_df(data_frame: pandas.core.frame.DataFrame, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List

Distribute calls to func on each row in data_frame across client.

Additionally, args and kwargs are passed to the function call.

Parameters:
  • data_frame (pandas.DataFrame) – A data frame
  • client (dask.distributed.Client) – A dask client
  • func (typing.Callable) – The function to apply to each row in data_frame
  • args – Positional arguments to pass to func
  • kwargs – Keyword arguments to pass to func
  • return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
  • progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
  • priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns:

results – Either the result of each function call or a future which will give the result, depending on the value of return_futures

Return type:

typing.List
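
Examples

A minimal sketch applying a hypothetical row_total function to each row; it assumes each row is passed to func as a pandas.Series:

import pandas as pd
from dask.distributed import Client

import pyllars.dask_utils as dask_utils

def row_total(row, scale=1):
    # `row` is assumed to arrive as a pandas.Series holding one row
    return scale * (row['a'] + row['b'])

client = Client()  # or the client returned by connect
df = pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})

# with the defaults, this blocks until all rows have been processed
# and shows a progress bar
totals = dask_utils.apply_df(df, client, row_total, scale=10)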

pyllars.dask_utils.apply_groups(groups: pandas.core.groupby.generic.DataFrameGroupBy, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List

Distribute calls to func on each group in groups across client.

Additionally, args and kwargs are passed to the function call.

Parameters:
  • groups (pandas.DataFrameGroupBy) – The result of a call to groupby on a data frame
  • client (distributed.Client) – A dask client
  • func (typing.Callable) – The function to apply to each group in groups
  • args – Positional arguments to pass to func
  • kwargs – Keyword arguments to pass to func
  • return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
  • progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
  • priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns:

results – Either the result of each function call or a future which will give the result, depending on the value of return_futures.

Return type:

typing.List
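
Examples

A minimal sketch applying a hypothetical per-group summary; it assumes each group is passed to func as its sub-data-frame:

import pandas as pd
from dask.distributed import Client

import pyllars.dask_utils as dask_utils

def summarize(group):
    # `group` is assumed to be the sub-data-frame for one key
    return group['value'].mean()

client = Client()
df = pd.DataFrame({'key': ['a', 'a', 'b'], 'value': [1.0, 2.0, 3.0]})
groups = df.groupby('key')

# one task is submitted per group
means = dask_utils.apply_groups(groups, client, summarize)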

pyllars.dask_utils.apply_iter(it: Iterable, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List

Distribute calls to func on each item in it across client.

Parameters:
  • it (typing.Iterable) – The inputs for func
  • client (dask.distributed.Client) – A dask client
  • func (typing.Callable) – The function to apply to each item in it
  • args – Positional arguments to pass to func
  • kwargs – Keyword arguments to pass to func
  • return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
  • progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
  • priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns:

results – Either the result of each function call or a future which will give the result, depending on the value of return_futures

Return type:

typing.List
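
Examples

With return_futures=True the call returns immediately; the resulting futures can then be inspected with check_status, collected with collect_results, or cancelled with cancel_all (all described below). A minimal sketch:

from dask.distributed import Client

import pyllars.dask_utils as dask_utils

def square(x):
    return x * x

client = Client()

# returns immediately with one future per item
futures = dask_utils.apply_iter(range(1000), client, square,
                                return_futures=True)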

pyllars.dask_utils.cancel_all(f_list: Iterable[distributed.client.Future], pending_only=True) → None

Cancel all (pending) tasks in the list

By default, only pending tasks are cancelled.

Parameters:
  • f_list (Iterable[dask.distributed.client.Future]) – The list of futures
  • pending_only (bool) – Whether to cancel only tasks whose status is ‘pending’
Returns:

None – The specified tasks are cancelled.

Return type:

None
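
Examples

Continuing the apply_iter sketch above, where futures is a list of futures:

# cancel only the tasks which have not yet started
dask_utils.cancel_all(futures)

# cancel all tasks, regardless of status
dask_utils.cancel_all(futures, pending_only=False)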

pyllars.dask_utils.check_status(f_list: Iterable[distributed.client.Future]) → collections.Counter

Collect the status counts of a list of futures

This is primarily intended to check the status of jobs submitted with the various apply functions when return_futures is True.

Parameters: f_list (typing.List[dask.distributed.client.Future]) – The list of futures
Returns: status_counter – The number of futures with each status
Return type: collections.Counter
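
Examples

Continuing the apply_iter sketch above; the counts shown in the comment are purely illustrative:

status_counter = dask_utils.check_status(futures)

# e.g., Counter({'finished': 950, 'pending': 50})
print(status_counter)
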
pyllars.dask_utils.collect_results(f_list: Iterable[distributed.client.Future], finished_only: bool = True, progress_bar: bool = False) → List

Collect the results from a list of futures

By default, only results from finished tasks will be collected. Thus, the function is (more or less) non-blocking.

Parameters:
  • f_list (typing.List[dask.distributed.client.Future]) – The list of futures
  • finished_only (bool) – Whether to collect only results for jobs whose status is ‘finished’
  • progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when finished_only is False.
Returns:

results – The results for each (finished, if specified) task

Return type:

typing.List
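
Examples

Continuing the apply_iter sketch above:

# grab only the results which are already available; does not block
finished = dask_utils.collect_results(futures)

# block until all tasks are done, showing a progress bar
all_results = dask_utils.collect_results(
    futures, finished_only=False, progress_bar=True)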

pyllars.dask_utils.connect(args: argparse.Namespace) → Tuple[distributed.client.Client, Optional[distributed.deploy.local.LocalCluster]]

Connect to the dask cluster specified by the arguments in args

Specifically, this function uses args.cluster_location to determine whether to start a dask.distributed.LocalCluster (in case args.cluster_location is “LOCAL”) or to (attempt to) connect to an existing cluster (any other value).

If a local cluster is started, it will use a number of worker processes equal to args.num_procs. Each process will use args.num_threads_per_proc threads. The scheduler for the local cluster will listen on a random port.

Parameters: args (argparse.Namespace) –

A namespace containing the following fields:

  • cluster_location
  • client_restart
  • num_procs
  • num_threads_per_proc
Returns:
  • client (dask.distributed.Client) – The client for the dask connection
  • cluster (dask.distributed.LocalCluster or None) – If a local cluster is started, the reference to the local cluster object is returned. Otherwise, None is returned.
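
Examples

A minimal sketch; the scheduler address mentioned in the comment is hypothetical:

import argparse

import pyllars.dask_utils as dask_utils

args = argparse.Namespace(
    cluster_location="LOCAL",  # or an address such as "tcp://127.0.0.1:8786"
    client_restart=False,
    num_procs=4,
    num_threads_per_proc=1,
)

# cluster is the LocalCluster here; it would be None when
# connecting to an existing cluster
client, cluster = dask_utils.connect(args)
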
class pyllars.dask_utils.dask_pipeline(pipeline, dask_client)

This class is a simple wrapper to submit an sklearn pipeline to a dask cluster for fitting.

Examples

import sklearn.pipeline
from pyllars.dask_utils import dask_pipeline

# `steps`, `X`, `y`, and `dask_client` are assumed to be defined
my_pipeline = sklearn.pipeline.Pipeline(steps)
d_pipeline = dask_pipeline(my_pipeline, dask_client)
d_pipeline_fit = d_pipeline.fit(X, y)
pipeline_fit = d_pipeline_fit.collect_results()

collect_results()

Collect the fitted pipeline from dask_client. Then, clean up the references to the future and the client.

fit(X, y)

Submit the call to fit of the underlying pipeline to dask_client

pyllars.dask_utils.get_dask_cmd_options(args: argparse.Namespace) → List[str]

Extract the flags and options specified for dask from the parsed arguments.

Presumably, these were added with add_dask_options. This function returns the arguments as a list of strings, so they are suitable for use with subprocess.run and similar functions.

Parameters: args (argparse.Namespace) – The parsed arguments
Returns: dask_options – The list of dask options and their values.
Return type: typing.List[str]
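
Examples

A minimal sketch forwarding the current dask settings to a child process; my_script.py is a hypothetical script that also uses add_dask_options:

import argparse
import subprocess

import pyllars.dask_utils as dask_utils

parser = argparse.ArgumentParser()
dask_utils.add_dask_options(parser)
args = parser.parse_args()

dask_options = dask_utils.get_dask_cmd_options(args)

# launch the child script with the same dask flags and values
cmd = ["python", "my_script.py"] + dask_options
subprocess.run(cmd)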