Dask utilities
This module contains helpers for using dask: https://dask.pydata.org/en/latest/
Starting or connecting to a server
connect(args) – Connect to the dask cluster specified by the arguments in args
add_dask_options(parser, num_procs, …) – Add options for connecting to and/or controlling a local dask cluster
add_dask_values_to_args(args, num_procs, …) – Add the options for a dask cluster to the given argparse namespace
Submitting jobs
apply_iter(it, client, func, *args, …) – Distribute calls to func on each item in it across client.
apply_df(data_frame, client, func, *args, …) – Distribute calls to func on each row in data_frame across client.
apply_groups(groups, client, func, *args, …) – Distribute calls to func on each group in groups across client.
Other dask helpers
check_status(f_list) – Collect the status counts of a list of futures
collect_results(f_list, finished_only, …) – Collect the results from a list of futures
cancel_all(f_list[, pending_only]) – Cancel all (pending) tasks in the list
scikit-learn helpers
dask_pipeline(pipeline, dask_client) – This class is a simple wrapper to submit an sklearn pipeline to a dask cluster for fitting.
Definitions

pyllars.dask_utils.add_dask_options(parser: argparse.ArgumentParser, num_procs: int = 1, num_threads_per_proc: int = 1, cluster_location: str = 'LOCAL') → None
Add options for connecting to and/or controlling a local dask cluster.
Parameters: - parser (argparse.ArgumentParser) – The parser to which the options will be added
- num_procs (int) – The default number of processes for a local cluster
- num_threads_per_proc (int) – The default number of threads for each process for a local cluster
- cluster_location (str) – The default location of the cluster
Returns: None – A “dask cluster options” group is added to the parser.
Return type: None
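This page does not list the flags that add_dask_options creates. The following is a hypothetical, minimal sketch of what such an argparse option group could look like, not the pyllars implementation: the function name add_dask_options_sketch and the flag spellings (--cluster-location, --num-procs, --num-threads-per-proc, --client-restart) are assumptions. The destinations are chosen to match the namespace fields that connect reads (cluster_location, client_restart, num_procs, num_threads_per_proc); including client_restart in this group is likewise an assumption.

```python
import argparse


def add_dask_options_sketch(parser, num_procs=1, num_threads_per_proc=1,
                            cluster_location="LOCAL"):
    """Hypothetical re-implementation; the flag spellings are assumptions."""
    group = parser.add_argument_group("dask cluster options")
    # Destinations match the namespace fields that connect reads
    group.add_argument("--cluster-location", dest="cluster_location",
                       default=cluster_location)
    group.add_argument("--num-procs", dest="num_procs", type=int,
                       default=num_procs)
    group.add_argument("--num-threads-per-proc", dest="num_threads_per_proc",
                       type=int, default=num_threads_per_proc)
    group.add_argument("--client-restart", dest="client_restart",
                       action="store_true")


parser = argparse.ArgumentParser()
add_dask_options_sketch(parser, num_procs=4)
args = parser.parse_args(["--num-threads-per-proc", "2"])
print(args.cluster_location, args.num_procs, args.num_threads_per_proc)
# LOCAL 4 2
```

Because the function only registers arguments, the values passed to it act as defaults: they are used whenever the corresponding flag is absent from the command line.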

pyllars.dask_utils.add_dask_values_to_args(args: argparse.Namespace, num_procs: int = 1, num_threads_per_proc: int = 1, cluster_location: str = 'LOCAL', client_restart: bool = False) → None
Add the options for a dask cluster to the given argparse namespace.
This function is mostly intended as a helper for use in IPython notebooks.
Parameters: - args (argparse.Namespace) – The namespace on which the arguments will be set
- num_procs (int) – The number of processes for a local cluster
- num_threads_per_proc (int) – The number of threads for each process for a local cluster
- cluster_location (str) – The location of the cluster
- client_restart (bool) – Whether to restart the client after connection
Returns: None – The respective options will be set on the namespace.
Return type: None
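In a notebook there is no command line to parse, which is the situation add_dask_values_to_args targets. The stand-in below, add_dask_values_to_args_sketch, is hypothetical: it assumes the function simply sets, in place, the four fields that connect reads. In pyllars, the resulting namespace would then be passed to connect.

```python
import argparse


def add_dask_values_to_args_sketch(args, num_procs=1, num_threads_per_proc=1,
                                   cluster_location="LOCAL",
                                   client_restart=False):
    """Hypothetical stand-in: set the fields connect reads, in place."""
    args.num_procs = num_procs
    args.num_threads_per_proc = num_threads_per_proc
    args.cluster_location = cluster_location
    args.client_restart = client_restart


# In a notebook there is no command line, so start from an empty namespace
args = argparse.Namespace()
add_dask_values_to_args_sketch(args, num_procs=2, num_threads_per_proc=4)
print(vars(args))
```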

pyllars.dask_utils.apply_df(data_frame: pandas.core.frame.DataFrame, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List
Distribute calls to func on each row in data_frame across client.
Additionally, args and kwargs are passed to the function call.
Parameters: - data_frame (pandas.DataFrame) – A data frame
- client (dask.distributed.Client) – A dask client
- func (typing.Callable) – The function to apply to each row in data_frame
- args – Positional arguments to pass to func
- kwargs – Keyword arguments to pass to func
- return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
- progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
- priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns: results – Either the result of each function call or a future which will give the result, depending on the value of return_futures.
Return type: List

pyllars.dask_utils.apply_groups(groups: pandas.core.groupby.generic.DataFrameGroupBy, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List
Distribute calls to func on each group in groups across client.
Additionally, args and kwargs are passed to the function call.
Parameters: - groups (pandas.DataFrameGroupBy) – The result of a call to groupby on a data frame
- client (dask.distributed.Client) – A dask client
- func (typing.Callable) – The function to apply to each group in groups
- args – Positional arguments to pass to func
- kwargs – Keyword arguments to pass to func
- return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
- progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
- priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns: results – Either the result of each function call or a future which will give the result, depending on the value of return_futures.
Return type: List

pyllars.dask_utils.apply_iter(it: Iterable, client: distributed.client.Client, func: Callable, *args, return_futures: bool = False, progress_bar: bool = True, priority: int = 0, **kwargs) → List
Distribute calls to func on each item in it across client.
Parameters: - it (typing.Iterable) – The inputs for func
- client (dask.distributed.Client) – A dask client
- func (typing.Callable) – The function to apply to each item in it
- args – Positional arguments to pass to func
- kwargs – Keyword arguments to pass to func
- return_futures (bool) – Whether to wait for the results (False, the default) or return a list of dask futures (when True). If a list of futures is returned, the result method should be called on each of them at some point before attempting to use the results.
- progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when return_futures is False.
- priority (int) – The priority of the submitted tasks. Please see the dask documentation for more details: http://distributed.readthedocs.io/en/latest/priority.html
Returns: results – Either the result of each function call or a future which will give the result, depending on the value of return_futures.
Return type: List
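The submit-then-wait pattern behind apply_iter can be sketched with the standard library. In this stand-in, a concurrent.futures.ThreadPoolExecutor plays the role of the dask client, so there is no progress bar and no task priority; with dask, Client.submit (which accepts a priority keyword) would take the place of executor.submit. The name apply_iter_sketch is hypothetical, and passing each item as the first positional argument of func is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def apply_iter_sketch(it, executor, func, *args, return_futures=False,
                      **kwargs):
    """Submit func(item, *args, **kwargs) for every item in it.

    A stand-in for apply_iter: a concurrent.futures executor replaces
    the dask client, and there is no progress bar or priority.
    """
    futures = [executor.submit(func, item, *args, **kwargs) for item in it]
    if return_futures:
        # The caller must call .result() on each future later
        return futures
    wait(futures)  # block until every call has finished
    return [f.result() for f in futures]


def scale(x, factor, offset=0):
    return x * factor + offset


with ThreadPoolExecutor(max_workers=4) as pool:
    results = apply_iter_sketch(range(5), pool, scale, 10, offset=1)
    futures = apply_iter_sketch(range(3), pool, scale, 2, return_futures=True)
    later = [f.result() for f in futures]

print(results)  # [1, 11, 21, 31, 41]
print(later)    # [0, 2, 4]
```

apply_df and apply_groups presumably follow the same shape, with the rows of data_frame or the groups in groups taking the place of the items of it.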

pyllars.dask_utils.cancel_all(f_list: Iterable[distributed.client.Future], pending_only=True) → None
Cancel all (pending) tasks in the list.
By default, only pending tasks are cancelled.
Parameters: - f_list (Iterable[dask.distributed.client.Future]) – The list of futures
- pending_only (bool) – Whether to cancel only tasks whose status is ‘pending’
Returns: None – The specified tasks are cancelled.
Return type: None
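A sketch of pending-only cancellation, again using concurrent.futures as a stand-in for dask futures. concurrent.futures has no 'pending' status string, so here a future that is neither running nor done counts as pending. A running concurrent.futures future cannot be cancelled at all, so in this stand-in pending_only=False adds nothing; according to its docstring, the pyllars version also cancels non-pending tasks in that case. The name cancel_all_sketch is hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def cancel_all_sketch(f_list, pending_only=True):
    """Cancel the futures in f_list (a stand-in for cancel_all).

    A future that is neither running nor done is treated as pending.
    """
    for f in f_list:
        pending = not (f.running() or f.done())
        if pending or not pending_only:
            f.cancel()  # returns False (and does nothing) if already running


started, release = threading.Event(), threading.Event()


def blocker():
    started.set()   # signal that the single worker is now busy
    release.wait()  # hold the worker until released below
    return "first"


with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [pool.submit(blocker)]
    started.wait()  # the first task is now running
    futures += [pool.submit(pow, 2, n) for n in range(3)]  # queued: pending
    cancel_all_sketch(futures)
    release.set()

print([f.cancelled() for f in futures])  # [False, True, True, True]
print(futures[0].result())               # first
```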

pyllars.dask_utils.check_status(f_list: Iterable[distributed.client.Future]) → collections.Counter
Collect the status counts of a list of futures.
This is primarily intended to check the status of jobs submitted with the various apply functions when return_futures is True.
Parameters: f_list (typing.List[dask.distributed.client.Future]) – The list of futures
Returns: status_counter – The number of futures with each status
Return type: collections.Counter
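dask futures expose their state as a status string (for example 'pending' or 'finished'), so counting statuses amounts to a collections.Counter over those strings. The sketch below assumes that is essentially what check_status does; check_status_sketch and FakeFuture are hypothetical stand-ins so the example runs without a cluster.

```python
import collections
from dataclasses import dataclass


@dataclass
class FakeFuture:
    """Hypothetical stand-in for a dask future; only .status is modelled."""
    status: str


def check_status_sketch(f_list):
    # Count how many futures report each status string
    return collections.Counter(f.status for f in f_list)


futures = [FakeFuture("finished"), FakeFuture("pending"),
           FakeFuture("finished"), FakeFuture("error")]
counts = check_status_sketch(futures)
print(counts["finished"], counts["pending"], counts["error"])  # 2 1 1
```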

pyllars.dask_utils.collect_results(f_list: Iterable[distributed.client.Future], finished_only: bool = True, progress_bar: bool = False) → List
Collect the results from a list of futures.
By default, only results from finished tasks will be collected. Thus, the function is (more or less) non-blocking.
Parameters: - f_list (typing.List[dask.distributed.client.Future]) – The list of futures
- finished_only (bool) – Whether to collect only results for jobs whose status is ‘finished’
- progress_bar (bool) – Whether to show a progress bar when waiting for results. The parameter is only relevant when finished_only is False, since only then may the function wait for unfinished tasks.
Returns: results – The results for each (finished, if specified) task
Return type: List
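A minimal sketch of the finished-only filtering, assuming collect_results checks each future's status before calling result(). collect_results_sketch and FakeFuture are hypothetical: where a real dask future's result() would block until the task completes, this stand-in raises instead, which makes it visible that the default filter never touches unfinished futures.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeFuture:
    """Hypothetical stand-in for a dask future with .status and .result()."""
    status: str
    value: Any = None

    def result(self):
        if self.status != "finished":
            raise RuntimeError("a real future would block here")
        return self.value


def collect_results_sketch(f_list, finished_only=True):
    # Skipping unfinished futures avoids blocking on .result()
    return [f.result() for f in f_list
            if not finished_only or f.status == "finished"]


futures = [FakeFuture("finished", 1), FakeFuture("pending"),
           FakeFuture("finished", 3)]
print(collect_results_sketch(futures))  # [1, 3]
```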

pyllars.dask_utils.connect(args: argparse.Namespace) → Tuple[distributed.client.Client, Optional[distributed.deploy.local.LocalCluster]]
Connect to the dask cluster specified by the arguments in args.
Specifically, this function uses args.cluster_location to determine whether to start a dask.distributed.LocalCluster (in case args.cluster_location is “LOCAL”) or to (attempt to) connect to an existing cluster (any other value).
If a local cluster is started, it will use a number of worker processes equal to args.num_procs. Each process will use args.num_threads_per_proc threads. The scheduler for the local cluster will listen on a random port.
Parameters: args (argparse.Namespace) – A namespace containing the following fields:
- cluster_location
- client_restart
- num_procs
- num_threads_per_proc
Returns: - client (dask.distributed.Client) – The client for the dask connection
- cluster (dask.distributed.LocalCluster or None) – If a local cluster is started, the reference to the local cluster object is returned. Otherwise, None is returned.

class pyllars.dask_utils.dask_pipeline(pipeline, dask_client)
This class is a simple wrapper to submit an sklearn pipeline to a dask cluster for fitting.
Examples
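    import sklearn.pipeline
    from pyllars.dask_utils import dask_pipeline
    # steps, X, y and dask_client are assumed to be defined already
    my_pipeline = sklearn.pipeline.Pipeline(steps)
    d_pipeline = dask_pipeline(my_pipeline, dask_client)
    d_pipeline_fit = d_pipeline.fit(X, y)
    pipeline_fit = d_pipeline_fit.collect_results()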

pyllars.dask_utils.get_dask_cmd_options(args: argparse.Namespace) → List[str]
Extract the flags and options specified for dask from the parsed arguments.
Presumably, these were added with add_dask_options. This function returns the arguments as a list of strings. Thus, they are suitable for use with subprocess.run and similar functions.
Parameters: args (argparse.Namespace) – The parsed arguments
Returns: dask_options – The list of dask options and their values.
Return type: typing.List[str]
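A hypothetical sketch of turning the parsed dask fields back into a flat list of strings, for example to forward them to a child script launched with subprocess.run. The function name get_dask_cmd_options_sketch, the flag spellings, and the script name my_script.py are all assumptions, not the values pyllars produces.

```python
import argparse


def get_dask_cmd_options_sketch(args):
    """Hypothetical stand-in: rebuild dask flags from a parsed namespace.

    The flag spellings are assumptions, not necessarily those of pyllars.
    """
    options = [
        "--cluster-location", str(args.cluster_location),
        "--num-procs", str(args.num_procs),
        "--num-threads-per-proc", str(args.num_threads_per_proc),
    ]
    if args.client_restart:
        options.append("--client-restart")  # boolean flag takes no value
    return options


args = argparse.Namespace(cluster_location="LOCAL", num_procs=4,
                          num_threads_per_proc=2, client_restart=False)
cmd = ["python", "my_script.py"] + get_dask_cmd_options_sketch(args)
print(cmd)
# A real launch would then be: subprocess.run(cmd, check=True)
```

Every value is converted with str because subprocess.run expects each element of the argument list to be a string (or path-like), not an int.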