diff --git a/libs/ccc/coef/impl.py b/libs/ccc/coef/impl.py index 29a1f654..18532990 100644 --- a/libs/ccc/coef/impl.py +++ b/libs/ccc/coef/impl.py @@ -1,6 +1,8 @@ """ Contains function that implement the Clustermatch Correlation Coefficient (CCC). """ +from __future__ import annotations + import os from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor from typing import Iterable, Union @@ -515,6 +517,32 @@ def cdist_func(x, y): return max_ari_list, max_part_idx_list, pvalues +def get_n_workers(n_jobs: int | None) -> int: + """ + Helper function to get the number of workers for parallel processing. + + Args: + n_jobs: value specified by the main ccc function. + Returns: + The number of workers to use for parallel processing + """ + n_cpu_cores = os.cpu_count() + if n_cpu_cores is None: + raise ValueError("Could not determine the number of CPU cores. Please specify a positive value of n_jobs") + + n_workers = n_cpu_cores + if n_jobs is None: + return n_workers + + n_workers = os.cpu_count() + n_jobs if n_jobs < 0 else n_jobs + + if n_workers < 1: + raise ValueError(f"The number of threads/processes to use must be greater than 0. Got {n_workers}." + "Please check the n_jobs argument provided") + + return n_workers + + def ccc( x: NDArray, y: NDArray = None, @@ -544,9 +572,10 @@ def ccc( n_chunks_threads_ratio: allows to modify how pairwise comparisons are split across different threads. It's given as the ratio parameter of function get_chunks. - n_jobs: number of CPU cores to use for parallelization. The value + n_jobs: number of CPU cores/threads to use for parallelization. The value None will use all available cores (`os.cpu_count()`), and negative - values will use `os.cpu_count() - n_jobs`. Default is 1. + values will use `os.cpu_count() + n_jobs` (exception will be raised + if this expression yields a result less than 1). Default is 1. pvalue_n_perms: if given, it computes the p-value of the coefficient using the given number of permutations. partitioning_executor: Executor type used for partitioning the data. It @@ -596,7 +625,8 @@ def ccc( X_numerical_type = None if x.ndim == 1 and (y is not None and y.ndim == 1): # both x and y are 1d arrays - assert x.shape == y.shape, "x and y need to be of the same size" + if not x.shape == y.shape: + raise ValueError("x and y need to be of the same size") n_objects = x.shape[0] n_features = 2 @@ -612,10 +642,9 @@ def ccc( # plus we have the features data type (numerical, categorical, etc) if isinstance(x, np.ndarray): - assert get_feature_type_and_encode(x[0, :])[1], ( - "If data is a 2d numpy array, it has to be numerical. Use pandas.DataFrame if " - "you need to mix features with different data types" - ) + if not get_feature_type_and_encode(x[0, :])[1]: + raise ValueError("If data is a 2d numpy array, it has to be numerical. Use pandas.DataFrame if " + "you need to mix features with different data types") n_objects = x.shape[1] n_features = x.shape[0] @@ -639,8 +668,7 @@ def ccc( raise ValueError("Wrong combination of parameters x and y") # get number of cores to use - n_jobs = os.cpu_count() if n_jobs is None else n_jobs - default_n_threads = (os.cpu_count() - n_jobs) if n_jobs < 0 else n_jobs + n_workers = get_n_workers(n_jobs) if internal_n_clusters is not None: _tmp_list = List() @@ -675,11 +703,11 @@ def ccc( max_parts = np.zeros((n_features_comp, 2), dtype=np.uint64) with ( - ThreadPoolExecutor(max_workers=default_n_threads) as executor, - ProcessPoolExecutor(max_workers=default_n_threads) as pexecutor, + ThreadPoolExecutor(max_workers=n_workers) as executor, + ProcessPoolExecutor(max_workers=n_workers) as pexecutor, ): map_func = map - if default_n_threads > 1: + if n_workers > 1: if partitioning_executor == "thread": map_func = executor.map elif partitioning_executor == "process": @@ -695,7 +723,7 @@ def ccc( for f_idx in range(n_features) for c_idx, c in enumerate(range_n_clusters) ], - default_n_threads, + n_workers, n_chunks_threads_ratio, ) @@ -732,7 +760,7 @@ def ccc( cdist_executor = False inner_executor = DummyExecutor() - if default_n_threads > 1: + if n_workers > 1: if n_features_comp == 1: map_func = map cdist_executor = executor @@ -742,14 +770,14 @@ def ccc( map_func = pexecutor.map # iterate over all chunks of object pairs and compute the coefficient - inputs = get_chunks(n_features_comp, default_n_threads, n_chunks_threads_ratio) + inputs = get_chunks(n_features_comp, n_workers, n_chunks_threads_ratio) inputs = [ ( i, n_features, parts, pvalue_n_perms, - default_n_threads, + n_workers, n_chunks_threads_ratio, cdist_executor, inner_executor, diff --git a/tests/test_coef.py b/tests/test_coef.py index 3127c790..359c2728 100644 --- a/tests/test_coef.py +++ b/tests/test_coef.py @@ -1,5 +1,6 @@ from concurrent.futures import ThreadPoolExecutor from random import shuffle +from unittest.mock import patch import time import os @@ -19,6 +20,7 @@ cdist_parts_basic, cdist_parts_parallel, get_chunks, + get_n_workers, ) @@ -1557,3 +1559,27 @@ def test_cm_with_too_few_objects(): ccc(data, internal_n_clusters=3) assert "too few objects" in str(e.value) + + + +@pytest.mark.parametrize("n_jobs, cpu_count, expected", [ + (None, 4, 4), + (2, 4, 2), + (-1, 4, 3), + (6, 4, 6), +]) +def test_get_n_workers_valid(n_jobs, cpu_count, expected): + with patch('os.cpu_count', return_value=cpu_count): + assert get_n_workers(n_jobs) == expected + + +@pytest.mark.parametrize("n_jobs, cpu_count, error_type, error_message", [ + (0, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got 0"), + (-5, 4, ValueError, "The number of threads/processes to use must be greater than 0. Got -1"), + (2, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"), + (None, None, ValueError, "Could not determine the number of CPU cores. Please specify a positive value of n_jobs"), +]) +def test_get_n_workers_invalid(n_jobs, cpu_count, error_type, error_message): + with patch('os.cpu_count', return_value=cpu_count): + with pytest.raises(error_type, match=error_message): + get_n_workers(n_jobs)