parallel

Dask helper functions.

configure_dask_scheduler(scheduler_address, name, nproc=1)

Configure the Dask scheduler by reusing an existing cluster or creating a new one.

Parameters:

scheduler_address (str | Cluster | None, required): Address of the Dask scheduler to connect to, or None for a local cluster.

name (str, required): Name for the Dask cluster.

nproc (int, default 1): Number of processes to use per worker for CPU support.

Returns:

str | Cluster: A Dask Cluster instance or a string address for the scheduler.

dask_map_with_progress(client, func, iterable, *args, **kwargs)

Wrapper for map, progress, and gather of Dask that returns a correctly typed list.

Parameters:

client (Client, required): Dask client.

func (Callable[Concatenate[T, P], R], required): Function to map; first parameter comes from iterable and any additional parameters can be provided positionally via *args or as keyword arguments via **kwargs.

iterable (Collection[T], required): Collection of arguments to map over.

*args (P.args, default ()): Additional positional arguments to pass to client.map().

**kwargs (P.kwargs, default {}): Additional keyword arguments to pass to client.map().

Returns:

list[R]: List of results, typed as the return type of func.

nr_cpus()

Determine the number of CPU cores to use.

If the environment variable SLURM_CPUS_PER_TASK or OMP_NUM_THREADS is set, its value is used. Otherwise, the number of physical CPU cores is returned.

Returns:

int: The number of CPU cores to use.

Raises:

ValueError: If the number of physical CPU cores cannot be determined.
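
The lookup order described for nr_cpus() can be sketched as below. This is an illustration under stated assumptions, not the package's code: the name nr_cpus_sketch and its env parameter are hypothetical, checking SLURM_CPUS_PER_TASK before OMP_NUM_THREADS is an assumed precedence, and os.cpu_count() (which reports logical cores) stands in for the physical core count used by the real function.

```python
import os
from collections.abc import Mapping
from typing import Optional


def nr_cpus_sketch(env: Optional[Mapping[str, str]] = None) -> int:
    # `env` is a hypothetical parameter that makes the lookup easy to test;
    # by default the process environment is used.
    env = os.environ if env is None else env

    # Assumed precedence: the SLURM variable is checked before the OpenMP one.
    for var in ("SLURM_CPUS_PER_TASK", "OMP_NUM_THREADS"):
        value = env.get(var)
        if value:
            return int(value)

    # Stand-in for a physical core count: os.cpu_count() reports logical
    # cores and may return None when the count cannot be determined.
    count = os.cpu_count()
    if count is None:
        raise ValueError("Could not determine the number of CPU cores.")
    return count
```

Honoring the scheduler-provided variables first keeps a job inside the CPU allocation it was given on a shared cluster node instead of oversubscribing all cores of the machine.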