
parallel

Dask helper functions.

MyProgressBar

Bases: ProgressBar

Show progress of Dask computations.

Copy of distributed.diagnostics.progressbar.TextProgressBar that:

  • Prints to stderr instead of stdout
  • Can have its update interval (in seconds) set with the TQDM_MININTERVAL environment variable
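The two behavioural differences listed above (output on stderr, interval taken from TQDM_MININTERVAL) can be illustrated with a minimal, stdlib-only sketch. It is not the actual MyProgressBar class, which subclasses Dask's ProgressBar. The names min_interval and StderrBar are hypothetical, and the 0.1 s fallback used when TQDM_MININTERVAL is unset is an assumption.

```python
from __future__ import annotations

import os
import sys
import time


def min_interval(default: float = 0.1) -> float:
    """Return the refresh interval in seconds, read from TQDM_MININTERVAL if set."""
    value = os.environ.get("TQDM_MININTERVAL")
    # Assumption: fall back to `default` when the variable is unset or empty.
    return float(value) if value else default


class StderrBar:
    """Hypothetical text progress bar that redraws on stderr at most once per interval."""

    def __init__(self, total: int, interval: float | None = None, width: int = 40) -> None:
        self.total = total
        self.width = width
        self.interval = min_interval() if interval is None else interval
        self._last_draw = float("-inf")  # ensures the first update always draws

    def update(self, done: int, force: bool = False) -> str | None:
        """Redraw if the interval has elapsed (or force=True); return the line drawn, else None."""
        now = time.monotonic()
        if not force and now - self._last_draw < self.interval:
            return None  # throttled: too soon since the previous redraw
        self._last_draw = now
        frac = done / self.total if self.total else 1.0
        bar = "#" * int(self.width * frac)
        line = f"[{bar:<{self.width}}] | {frac:.0%} Completed"
        # Write to stderr instead of stdout; "\r" redraws the same terminal line.
        print(f"\r{line}", end="", file=sys.stderr, flush=True)
        return line
```

Throttling on a monotonic clock keeps frequent updates cheap, while force=True lets the caller always draw the final state.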

configure_dask_scheduler(scheduler_address, name, nproc=1)

Context manager that provides a Dask cluster.

If scheduler_address is None, a local Dask cluster is created for the duration of the context. Otherwise scheduler_address is yielded unchanged, and the caller is responsible for cleaning up that cluster.

Parameters:

  • scheduler_address (str | Cluster | None, required): Address of the Dask scheduler to connect to, or None to create a local cluster.
  • name (str, required): Name for the Dask cluster.
  • nproc (int, default 1): Number of processes to use per worker for CPU support.

Yields:

  • str | Cluster: The scheduler address as a string, or a cluster.
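The ownership rule described above (clean up only a cluster the context manager created itself) can be sketched with contextlib.contextmanager. To keep the sketch runnable without Dask, the cluster factory is injected: configure_scheduler_sketch, make_cluster and the ClusterLike protocol are hypothetical names, and in real use the factory would presumably build something like a distributed.LocalCluster from name and nproc.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from contextlib import contextmanager
from typing import Protocol


class ClusterLike(Protocol):
    """Anything with a close() method, e.g. a Dask LocalCluster (assumption)."""

    def close(self) -> None: ...


@contextmanager
def configure_scheduler_sketch(
    scheduler_address: str | ClusterLike | None,
    name: str,
    nproc: int = 1,
    *,
    make_cluster: Callable[[str, int], ClusterLike],
) -> Iterator[str | ClusterLike]:
    if scheduler_address is not None:
        # Caller-supplied address or cluster: yield it unchanged and never close it.
        yield scheduler_address
        return
    # No address given: create a local cluster that this context manager owns.
    cluster = make_cluster(name, nproc)
    try:
        yield cluster
    finally:
        # Runs on normal exit and when the with-block raises.
        cluster.close()
```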

dask_map_with_progress(client, func, iterable, *args, **kwargs)

Wrapper around Dask's map, progress, and gather that returns a correctly typed list.

Environment variables:
  • Set the interval (in seconds) between progress updates with TQDM_MININTERVAL
  • Disable the progress display by setting TQDM_DISABLE to any value

Parameters:

  • client (Client, required): Dask client.
  • func (Callable[Concatenate[T, P], R], required): Function to map; its first parameter comes from iterable, and any additional parameters can be provided positionally via *args or as keyword arguments via **kwargs.
  • iterable (Collection[T], required): Collection of arguments to map over.
  • *args (P.args, default ()): Additional positional arguments to pass to client.map().
  • **kwargs (P.kwargs, default {}): Additional keyword arguments to pass to client.map().

Returns:

  • list[R]: List of results of the type returned by func.

nr_cpus()

Determine the number of CPU cores to use.

If the environment variable SLURM_CPUS_PER_TASK or OMP_NUM_THREADS is set, its value is used. Otherwise, the number of physical CPU cores is returned.

Returns:

  • int: The number of CPU cores to use.

Raises:

  • ValueError: If the number of physical CPU cores cannot be determined.
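The lookup described above can be sketched as follows. nr_cpus_sketch is a hypothetical name, and several details are assumptions not confirmed by this page: SLURM_CPUS_PER_TASK is checked before OMP_NUM_THREADS, empty values count as unset, and the physical-core count comes from psutil.cpu_count(logical=False), which returns None when the count is undetermined. The standard library's os.cpu_count() counts logical CPUs, which is why a third-party helper is assumed here.

```python
from __future__ import annotations

import os
from collections.abc import Callable, Mapping


def _psutil_physical_cores() -> int | None:
    try:
        import psutil  # third-party; assumed source of the physical-core count
    except ImportError:
        return None
    return psutil.cpu_count(logical=False)  # None if undetermined


def nr_cpus_sketch(
    env: Mapping[str, str] | None = None,
    physical_cores: Callable[[], int | None] = _psutil_physical_cores,
) -> int:
    env = os.environ if env is None else env
    # Assumed precedence: the SLURM allocation first, then the OpenMP setting.
    for var in ("SLURM_CPUS_PER_TASK", "OMP_NUM_THREADS"):
        value = env.get(var)
        if value:
            return int(value)
    cores = physical_cores()
    if cores is None:
        raise ValueError("Could not determine the number of physical CPU cores")
    return cores
```

Injecting env and physical_cores keeps the sketch testable; called with no arguments it reads os.environ and falls back to psutil.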