Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to Parsl HighThroughputExecutor for default workflow execution #97

Merged
merged 17 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cytotable/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from cloudpathlib.exceptions import InvalidPrefixError
from parsl.app.app import AppBase
from parsl.config import Config
from parsl.executors.threads import ThreadPoolExecutor
from parsl.executors import HighThroughputExecutor

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -126,7 +126,11 @@ def _default_parsl_config():
Return a default Parsl configuration for use with CytoTable.
"""
return Config(
executors=[ThreadPoolExecutor(max_threads=MAX_THREADS, label="local_threads")]
executors=[
HighThroughputExecutor(
label="htex_default_for_cytotable",
)
]
)


Expand Down
108 changes: 96 additions & 12 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""
conftest.py for pytest
"""

# pylint: disable=line-too-long,unused-argument

import pathlib
import shutil
import socket
import sqlite3
import subprocess
import tempfile
from contextlib import closing
from typing import Any, Dict, Generator, List, Tuple

import boto3
Expand All @@ -17,18 +22,62 @@
import pytest
from moto import mock_s3
from moto.server import ThreadedMotoServer
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from pyarrow import csv, parquet
from pycytominer.cyto_utils.cells import SingleCells

from cytotable.utils import _column_sort, _default_parsl_config
from cytotable.utils import _column_sort, _default_parsl_config, _parsl_loaded


@pytest.fixture(name="clear_parsl_config", scope="module")
def fixture_clear_parsl_config() -> None:
    """
    Fixture for resetting any Parsl configuration loaded by earlier tests.

    The load_parsl_* fixtures request this fixture so that they start
    without a previously loaded configuration, which avoids issues with
    overlapping and sometimes mixed sequence test execution.
    """

    # nothing to reset when no configuration has been loaded yet
    if not _parsl_loaded():
        return

    # drop the previously loaded configuration
    parsl.clear()


@pytest.fixture(name="load_parsl", scope="session", autouse=True)
def fixture_load_parsl() -> None:
@pytest.fixture(name="load_parsl_threaded", scope="module")
def fixture_load_parsl_threaded(clear_parsl_config: None) -> None:
"""
Fixture for loading parsl for tests
Fixture for loading parsl ThreadPoolExecutor for testing

See the following for more details.
https://parsl.readthedocs.io/en/stable/stubs/parsl.executors.ThreadPoolExecutor.html

Note: we use the ThreadPoolExecutor on some occasions to avoid issues
with multiprocessing in moto / mocked S3 environments.
See here for more: https://docs.getmoto.org/en/latest/docs/faq.html#is-moto-concurrency-safe
"""
parsl.load(_default_parsl_config())

parsl.load(
Config(executors=[ThreadPoolExecutor(label="tpe_for_cytotable_testing")])
)


@pytest.fixture(name="load_parsl_default", scope="module")
def fixture_load_parsl_default(clear_parsl_config: None) -> None:
    """
    Fixture for loading the default CytoTable Parsl configuration for tests.

    The default configuration leverages Parsl's HighThroughputExecutor.
    See here for more: https://parsl.readthedocs.io/en/stable/stubs/parsl.executors.HighThroughputExecutor.html
    """

    default_config = _default_parsl_config()

    # note: worker debugging is switched on here, on top of the default
    # configuration, to help observe testing issues
    first_executor = default_config.executors[0]
    first_executor.worker_debug = True

    parsl.load(default_config)


# note: we use name here to avoid pylint flagging W0621
Expand Down Expand Up @@ -522,10 +571,46 @@ def fixture_cytominerdatabase_merged_cellhealth(
return control_result


@pytest.fixture(scope="session", name="infer_open_port")
def fixture_infer_open_port() -> int:
    """
    Infers an open port for use with tests.

    Returns:
        int: the port number the operating system assigned to a
        short-lived probe socket bound on localhost.
    """

    # Referenced with modifications from https://stackoverflow.com/a/45690594/22216869.
    # Note: this implementation opens a socket, lets the operating system pick an
    # available port for it, and returns that port number for use in tests.
    # contextlib.closing closes the socket when the block exits, which releases
    # the port so that it can be used by whoever receives the returned value.

    # An IPv4 (family=AF_INET) stream (type=SOCK_STREAM) socket used only as a probe.
    probe_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)

    with closing(probe_socket):
        # Bind to an address of format (hostname, port); port 0 asks for
        # any available open port.
        # see: https://docs.python.org/3/library/socket.html#socket-families
        probe_socket.bind(("localhost", 0))

        # Set the value of 1 to SO_REUSEADDR as a socket option.
        # see bottom of: https://docs.python.org/3/library/socket.html
        # "The SO_REUSEADDR flag tells the kernel to reuse a local socket in TIME_WAIT state,
        # without waiting for its natural timeout to expire."
        # NOTE(review): the option is applied after bind(), as in the referenced
        # snippet - confirm this ordering is intended.
        probe_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # The bound address has the format (hostname, port); keep the port.
        _hostname, open_port = probe_socket.getsockname()

    return open_port


@pytest.fixture(scope="session", name="s3_session")
def fixture_s3_session() -> boto3.session.Session:
def fixture_s3_session(
infer_open_port: int,
) -> Generator[Tuple[boto3.session.Session, int], None, None]:
"""
Yield a mocked boto session for s3 tests.
The yielded tuple also includes the port used by the moto server for this session.

Referenced from:
https://docs.getmoto.org/en/latest/docs/getting_started.html
Expand All @@ -534,16 +619,16 @@ def fixture_s3_session() -> boto3.session.Session:
"""

# start a moto server for use in testing
server = ThreadedMotoServer()
server = ThreadedMotoServer(port=infer_open_port)
server.start()

with mock_s3():
yield boto3.session.Session()
yield boto3.session.Session(), infer_open_port


@pytest.fixture()
def example_s3_endpoint(
s3_session: boto3.session.Session,
s3_session: Tuple[boto3.session.Session, int],
example_local_sources: Dict[str, List[Dict[str, Any]]],
data_dir_cellprofiler_sqlite_nf1: str,
) -> str:
Expand All @@ -554,12 +639,11 @@ def example_s3_endpoint(
https://docs.getmoto.org/en/latest/docs/getting_started.html
"""
# s3_session is a fixture defined above that yields a (boto3 session, port) tuple.
# Feel free to instantiate another boto3 S3 client -- Keep note of the region though.
endpoint_url = "http://localhost:5000"
endpoint_url = f"http://localhost:{s3_session[1]}"
bucket_name = "example"

# create s3 client
s3_client = s3_session.client("s3", endpoint_url=endpoint_url)
s3_client = s3_session[0].client("s3", endpoint_url=endpoint_url)

# create a bucket for content to land in
s3_client.create_bucket(Bucket=bucket_name)
Expand Down
Loading