From 0a997cd40e7661850f1d426f596cab615926a854 Mon Sep 17 00:00:00 2001 From: Nick <38217274+NJManganelli@users.noreply.github.com> Date: Tue, 23 Jul 2024 08:53:43 -0400 Subject: [PATCH] fix: set _max_workers in ThreadPoolExecutor with non-None initializer (#1254) * Add _max_workers definition to ThreadPoolExecutor when arg is not None * Add test for ThreadPoolExecutor for None and 1 max_workers * style: pre-commit fixes --------- Co-authored-by: Nick Manganelli Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- src/uproot/source/futures.py | 2 + ..._1254_test_threadpool_executor_for_dask.py | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 tests/test_1254_test_threadpool_executor_for_dask.py diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index 20673a293..53c474d0e 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -217,6 +217,8 @@ def __init__(self, max_workers: int | None = None): import multiprocessing self._max_workers = multiprocessing.cpu_count() + else: + self._max_workers = max_workers self._work_queue = queue.Queue() self._workers = [] diff --git a/tests/test_1254_test_threadpool_executor_for_dask.py b/tests/test_1254_test_threadpool_executor_for_dask.py new file mode 100644 index 000000000..67852de0e --- /dev/null +++ b/tests/test_1254_test_threadpool_executor_for_dask.py @@ -0,0 +1,44 @@ +import pytest +import skhep_testdata + +import uproot + +pytest.importorskip("pandas") + + +def test_decompression_threadpool_executor_for_dask(): + + class TestThreadPoolExecutor(uproot.source.futures.ThreadPoolExecutor): + def __init__(self, max_workers=None): + super().__init__(max_workers=max_workers) + self.submit_count = 0 + + def submit(self, task, /, *args, **kwargs): + self.submit_count += 1 + super().submit(task, *args, **kwargs) + + implicitexecutor = TestThreadPoolExecutor(max_workers=None) + + a = uproot.dask( + {skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"}, + decompression_executor=implicitexecutor, + ) + + a["i4"].compute() + + assert implicitexecutor.max_workers > 0 + + assert implicitexecutor.submit_count > 0 + + explicitexecutor = TestThreadPoolExecutor(max_workers=1) + + b = uproot.dask( + {skhep_testdata.data_path("uproot-sample-6.20.04-uncompressed.root"): "sample"}, + decompression_executor=explicitexecutor, + ) + + b["i4"].compute() + + assert explicitexecutor.max_workers == 1 + + assert explicitexecutor.submit_count > 0