diff --git a/lint.sh b/lint.sh index 0dcf5b3..49da032 100755 --- a/lint.sh +++ b/lint.sh @@ -2,6 +2,6 @@ result=0 flake8 shub_workflow/ tests/ --application-import-names=shub_workflow --import-order-style=pep8 result=$(($result | $?)) -mypy --ignore-missing-imports --disable-error-code=method-assign shub_workflow/ tests/ +mypy --ignore-missing-imports --disable-error-code=method-assign --check-untyped-defs shub_workflow/ tests/ result=$(($result | $?)) exit $result diff --git a/shub_workflow/clone_job.py b/shub_workflow/clone_job.py index 36b94a4..e99962f 100644 --- a/shub_workflow/clone_job.py +++ b/shub_workflow/clone_job.py @@ -128,7 +128,7 @@ def add_argparser_options(self): def run(self): if self.args.key: - keys = filter(lambda x: not self.is_cloned_by_jobkey(x), self.args.key) + keys = list(filter(lambda x: not self.is_cloned_by_jobkey(x), self.args.key)) elif self.args.tag_spider: keys = [] project_id, tag, spider = self.args.tag_spider.split("/") diff --git a/shub_workflow/crawl.py b/shub_workflow/crawl.py index 8f19c0d..8c17372 100644 --- a/shub_workflow/crawl.py +++ b/shub_workflow/crawl.py @@ -240,6 +240,9 @@ def __init__(self): self.__next_job_seq = 1 self._jobuids = self.create_dupe_filter() + def get_delayed_jobs(self) -> List[FullJobParams]: + return deepcopy(self.__delayed_jobs) + @classmethod def create_dupe_filter(cls) -> DupesFilterProtocol: return BloomFilter(max_elements=1e6, error_rate=1e-6) diff --git a/shub_workflow/deliver/futils.py b/shub_workflow/deliver/futils.py index cbd5a31..95563e8 100644 --- a/shub_workflow/deliver/futils.py +++ b/shub_workflow/deliver/futils.py @@ -180,13 +180,13 @@ def upload_file(path, dest, aws_key=None, aws_secret=None, aws_token=None, **kwa gcstorage.upload_file(path, dest) -def get_glob(path, aws_key=None, aws_secret=None, aws_token=None, **kwargs): +def get_glob(path, aws_key=None, aws_secret=None, aws_token=None, **kwargs) -> List[str]: region = kwargs.pop("region", None) if check_s3_path(path): fs = S3FileSystem(**s3_credentials(aws_key, aws_secret, aws_token, region), **kwargs) fp = [_S3_ATTRIBUTE + p for p in fs.glob(s3_path(path))] else: - fp = iglob(path) + fp = list(iglob(path)) return fp diff --git a/shub_workflow/deliver/gcstorage.py b/shub_workflow/deliver/gcstorage.py index 5aadc45..0deddfa 100644 --- a/shub_workflow/deliver/gcstorage.py +++ b/shub_workflow/deliver/gcstorage.py @@ -26,12 +26,12 @@ def set_credential_file_environ(module, resource, check_exists=True): os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credfile -def upload_file(src_path, dest_path): +def upload_file(src_path: str, dest_path: str): storage_client = storage.Client() - try: - bucket_name, destination_blob_name = _GS_FOLDER_RE.match(dest_path).groups() - except AttributeError: + m = _GS_FOLDER_RE.match(dest_path) + if m is None: raise ValueError(f"Invalid destination {dest_path}") + bucket_name, destination_blob_name = m.groups() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) blob.upload_from_filename(src_path, retry=storage.retry.DEFAULT_RETRY) diff --git a/shub_workflow/graph/__init__.py b/shub_workflow/graph/__init__.py index 260749d..69dd202 100644 --- a/shub_workflow/graph/__init__.py +++ b/shub_workflow/graph/__init__.py @@ -264,7 +264,7 @@ def run_job(self, job: TaskId, is_retry=False) -> Optional[JobKey]: if task is not None: idx = jobconf["index"] return task.run(self, is_retry, index=idx) - return None + raise RuntimeError(f"Failed to run task {job}") def _must_wait_time(self, job: TaskId) -> bool: status = self.__pending_jobs[job] @@ -300,6 +300,7 @@ def run_pending_jobs(self): if job_can_run: try: jobid = self.run_job(task_id, status["is_retry"]) + assert jobid is not None, f"Failed to run task {task_id}" except Exception: self._release_resources(task_id) raise @@ -330,6 +331,7 @@ def run_pending_jobs(self): if job_can_run: try: jobid = self.run_job(task_id, status["is_retry"]) + assert jobid is not None, f"Failed to run task {task_id}" except Exception: self._release_resources(task_id) raise diff --git a/shub_workflow/graph/task.py b/shub_workflow/graph/task.py index 6c3579d..143f670 100644 --- a/shub_workflow/graph/task.py +++ b/shub_workflow/graph/task.py @@ -43,6 +43,9 @@ class JobGraphDict(TypedDict): origin: NotRequired[TaskId] index: NotRequired[int] + spider: NotRequired[str] + spider_args: NotRequired[Dict[str, str]] + class BaseTask(abc.ABC): def __init__( @@ -283,7 +286,7 @@ def get_spider_args(self): spider_args.update({"job_settings": self.__job_settings}) return spider_args - def as_jobgraph_dict(self): + def as_jobgraph_dict(self) -> JobGraphDict: jdict = super().as_jobgraph_dict() jdict.update({"spider": self.spider, "spider_args": self.get_spider_args()}) return jdict diff --git a/shub_workflow/script.py b/shub_workflow/script.py index e2c76d3..ce2b33e 100644 --- a/shub_workflow/script.py +++ b/shub_workflow/script.py @@ -516,6 +516,14 @@ def _run_loops(self) -> Generator[bool, None, None]: def base_loop_tasks(self): ... + @abc.abstractmethod + def _on_start(self): + ... + + @abc.abstractmethod + def _close(self): + ... + class BaseLoopScript(BaseScript, BaseLoopScriptProtocol): diff --git a/shub_workflow/utils/sesemail.py b/shub_workflow/utils/sesemail.py index 7000cef..5204ed1 100644 --- a/shub_workflow/utils/sesemail.py +++ b/shub_workflow/utils/sesemail.py @@ -11,6 +11,8 @@ import boto3 from botocore.client import Config +from shub_workflow.script import BaseScriptProtocol + logger = logging.getLogger(__name__) @@ -100,7 +102,7 @@ def build_email_message( return msg -class SESMailSenderMixin: +class SESMailSenderMixin(BaseScriptProtocol): """Use this mixin for enabling ses email sending capabilities on your script class""" def __init__(self): diff --git a/tests/test_base_manager.py b/tests/test_base_manager.py index 107407d..5996092 100644 --- a/tests/test_base_manager.py +++ b/tests/test_base_manager.py @@ -55,7 +55,7 @@ def workflow_loop(self): self.assertEqual(manager.name, "my_fantasy_name") manager._on_start() - self.assertFalse(manager._check_resume_workflow.called) + self.assertFalse(mocked_check_resume_workflow.called) @patch("shub_workflow.base.WorkFlowManager._check_resume_workflow") def test_check_resume_workflow_called( @@ -72,7 +72,7 @@ def workflow_loop(self): self.assertEqual(manager.name, "my_fantasy_name") manager._on_start() - self.assertTrue(manager._check_resume_workflow.called) + self.assertTrue(mocked_check_resume_workflow.called) def test_project_id_override(self, mocked_update_metadata, mocked_get_job_tags): class TestManager(WorkFlowManager): diff --git a/tests/test_crawl_manager.py b/tests/test_crawl_manager.py index 97126d1..24d8ef5 100644 --- a/tests/test_crawl_manager.py +++ b/tests/test_crawl_manager.py @@ -4,7 +4,7 @@ from shub_workflow.crawl import CrawlManager, PeriodicCrawlManager, GeneratorCrawlManager from shub_workflow.utils.contexts import script_args -from shub_workflow.script import SpiderName +from shub_workflow.script import SpiderName, Outcome class TestManager(CrawlManager): @@ -76,7 +76,7 @@ def test_schedule_spider(self, mocked_super_schedule_spider, mocked_add_job_tags self.assertTrue(result) # third loop: spider is finished. Stop. - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None mocked_super_schedule_spider.reset_mock() result = next(manager._run_loops()) @@ -121,7 +121,7 @@ def test_schedule_spider_bad_outcome(self, mocked_super_schedule_spider, mocked_ self.assertTrue(result) # third loop: spider is cancelled. Stop. Manager must be closed with cancelled close reason - manager.is_finished = lambda x: "cancelled" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("cancelled") if x == "999/1/1" else None mocked_super_schedule_spider.reset_mock() result = next(manager._run_loops()) @@ -147,7 +147,7 @@ def test_schedule_spider_with_resume(self, mocked_super_schedule_spider, mocked_ manager._on_start() self.assertTrue(manager.is_resumed) self.assertEqual(len(manager._running_job_keys), 1) - self.assertEqual(manager.get_jobs.call_count, len(mocked_get_jobs_side_effect)) + self.assertEqual(mocked_get_jobs.call_count, len(mocked_get_jobs_side_effect)) mocked_add_job_tags.assert_any_call(tags=["FLOW_ID=3a20", "NAME=test", "OTHER=other"]) # first loop: spider still running in workflow. Continue. @@ -156,7 +156,7 @@ def test_schedule_spider_with_resume(self, mocked_super_schedule_spider, mocked_ self.assertTrue(result) # second loop: spider is finished. Stop. - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None result = next(manager._run_loops()) self.assertFalse(result) @@ -177,7 +177,7 @@ def test_schedule_spider_with_resume_not_found( manager._on_start() self.assertFalse(manager.is_resumed) self.assertEqual(len(manager._running_job_keys), 0) - self.assertEqual(manager.get_jobs.call_count, len(mocked_get_jobs_side_effect)) + self.assertEqual(mocked_get_jobs.call_count, len(mocked_get_jobs_side_effect)) @patch("shub_workflow.crawl.WorkFlowManager.schedule_spider") def test_schedule_spider_with_resume_not_owned( @@ -206,7 +206,7 @@ def test_schedule_spider_with_resume_not_owned( self.assertEqual(mocked_super_schedule_spider.call_count, 1) # second loop: spider is finished. Stop. - manager.is_finished = lambda x: "finished" if x == "999/1/2" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/2" else None result = next(manager._run_loops()) self.assertFalse(result) @@ -232,7 +232,7 @@ def test_schedule_spider_periodic(self, mocked_super_schedule_spider, mocked_add self.assertTrue(result) # third loop: spider is finished. Schedule again. - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None mocked_super_schedule_spider.reset_mock() mocked_super_schedule_spider.side_effect = ["999/1/2"] result = next(manager._run_loops()) @@ -241,7 +241,7 @@ def test_schedule_spider_periodic(self, mocked_super_schedule_spider, mocked_add mocked_super_schedule_spider.assert_any_call("myspider", units=None, job_settings={}) # four loop: spider is cancelled. Schedule again. - manager.is_finished = lambda x: "cancelled" if x == "999/1/2" else None + manager.is_finished = lambda x: Outcome("cancelled") if x == "999/1/2" else None mocked_super_schedule_spider.reset_mock() mocked_super_schedule_spider.side_effect = ["999/1/3"] result = next(manager._run_loops()) @@ -278,7 +278,7 @@ def test_schedule_spider_list_bad_outcome_hook( self.assertEqual(mocked_super_schedule_spider.call_count, 2) # third loop: finish one job. We can schedule last one with third set of arguments - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 3) @@ -293,7 +293,7 @@ def test_schedule_spider_list_bad_outcome_hook( # fifth loop: second job finished with failed outcome. Retry according # to bad outcome hook - manager.is_finished = lambda x: "cancelled (stalled)" if x == "999/1/2" else None + manager.is_finished = lambda x: Outcome("cancelled (stalled)") if x == "999/1/2" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) @@ -302,13 +302,13 @@ def test_schedule_spider_list_bad_outcome_hook( ) # sixth loop: third job finished. - manager.is_finished = lambda x: "finished" if x == "999/1/3" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/3" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) # seventh loop: retried job failed again. Retry with new argument. - manager.is_finished = lambda x: "memusage_exceeded" if x == "999/1/4" else None + manager.is_finished = lambda x: Outcome("memusage_exceeded") if x == "999/1/4" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 5) @@ -317,7 +317,7 @@ def test_schedule_spider_list_bad_outcome_hook( ) # eighth loop: retried job finished. Exit. - manager.is_finished = lambda x: "finished" if x == "999/1/5" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/5" else None result = next(manager._run_loops()) self.assertFalse(result) self.assertEqual(mocked_super_schedule_spider.call_count, 5) @@ -330,7 +330,7 @@ class _ListTestManager(GeneratorCrawlManager): name = "test" default_max_jobs = 2 - spider = "myspider" + spider = SpiderName("myspider") def set_parameters_gen(self): parameters_list = [ @@ -366,7 +366,7 @@ class _ListTestManager(GeneratorCrawlManager): name = "test" default_max_jobs = 2 - spider = "myspider" + spider = SpiderName("myspider") def set_parameters_gen(self): parameters_list = [ @@ -400,7 +400,9 @@ def set_parameters_gen(self): project_id=999, tags=["CHECKED", "JOBSEQ=0000000001"], ) - jobid = GeneratorCrawlManager.get_job_unique_id({"spider": "myspider", "spider_args": {"argA": "valA"}}) + jobid = GeneratorCrawlManager.get_job_unique_id( + {"spider": SpiderName("myspider"), "spider_args": {"argA": "valA"}} + ) self.assertTrue(jobid in manager._jobuids) @patch("shub_workflow.crawl.WorkFlowManager.schedule_spider") @@ -409,7 +411,7 @@ class _ListTestManager(GeneratorCrawlManager): name = "test" default_max_jobs = 2 - spider = "myspider" + spider = SpiderName("myspider") def set_parameters_gen(self): parameters_list = [ @@ -452,7 +454,7 @@ def set_parameters_gen(self): ) # second loop: finished second spider. Finish execution - manager.is_finished = lambda x: "finished" if x == "999/2/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/2/1" else None result = next(manager._run_loops()) self.assertEqual(mocked_super_schedule_spider.call_count, 1) self.assertFalse(result) @@ -465,7 +467,7 @@ class _ListTestManager(ListTestManager): name = "test" default_max_jobs = 2 - spider = "myspider" + spider = SpiderName("myspider") def set_parameters_gen(self): parameters_list = [ @@ -505,7 +507,7 @@ def set_parameters_gen(self): # second loop: second job finished with failed outcome. Retry according # to bad outcome hook - manager.is_finished = lambda x: "cancelled (stalled)" + manager.is_finished = lambda x: Outcome("cancelled (stalled)") mocked_super_schedule_spider.side_effect = ["999/2/2"] result = next(manager._run_loops()) self.assertTrue(result) @@ -525,7 +527,7 @@ class _ListTestManager(GeneratorCrawlManager): name = "test" default_max_jobs = 1 - spider = "myspider" + spider = SpiderName("myspider") def set_parameters_gen(self): parameters_list = [ @@ -550,7 +552,7 @@ def set_parameters_gen(self): ) # second loop: finish job with bad outcome, but there is no retry. Stop. - manager.is_finished = lambda x: "cancelled (stalled)" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("cancelled (stalled)") if x == "999/1/1" else None result = next(manager._run_loops()) self.assertFalse(result) self.assertEqual(mocked_super_schedule_spider.call_count, 1) @@ -561,7 +563,7 @@ class _ListTestManager(GeneratorCrawlManager): name = "test" default_max_jobs = 2 - spider = "myspider" + spider = SpiderName("myspider") MAX_RETRIES = 2 @@ -593,7 +595,7 @@ def set_parameters_gen(self): ) # second loop: finish first job with bad outcome, retry 1. - manager.is_finished = lambda x: "cancelled (stalled)" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("cancelled (stalled)") if x == "999/1/1" else None mocked_super_schedule_spider.side_effect = ["999/1/2"] result = next(manager._run_loops()) self.assertTrue(result) @@ -603,13 +605,13 @@ def set_parameters_gen(self): ) # second loop: second job finishes with "cancelled", don't retry it. - manager.is_finished = lambda x: "cancelled" if x == "999/2/1" else None + manager.is_finished = lambda x: Outcome("cancelled") if x == "999/2/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 3) # third loop: first job finishes again with abnormal reason, retry it. - manager.is_finished = lambda x: "cancelled (stalled)" if x == "999/1/2" else None + manager.is_finished = lambda x: Outcome("cancelled (stalled)") if x == "999/1/2" else None mocked_super_schedule_spider.side_effect = ["999/1/3"] result = next(manager._run_loops()) self.assertTrue(result) @@ -619,7 +621,7 @@ def set_parameters_gen(self): ) # fourth loop: first job finishes again with abnormal reason, but max retries reached. Stop. - manager.is_finished = lambda x: "cancelled (stalled)" if x == "999/1/3" else None + manager.is_finished = lambda x: Outcome("cancelled (stalled)") if x == "999/1/3" else None result = next(manager._run_loops()) self.assertFalse(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) @@ -663,7 +665,7 @@ def set_parameters_gen(self): mocked_super_schedule_spider.assert_any_call( spider, units=None, argA=0, tags=[f"JOBSEQ={seq:010d}"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 297) + self.assertEqual(len(manager.get_delayed_jobs()), 297) # second loop. No job finished. manager.is_finished = lambda x: None @@ -673,36 +675,36 @@ def set_parameters_gen(self): # third loop. spiderB job finished. Can schedule new spiderB job. mocked_super_schedule_spider.side_effect = ["999/2/2"] - manager.is_finished = lambda x: "finished" if x == "999/2/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/2/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) mocked_super_schedule_spider.assert_any_call( "spiderB", units=None, argA=1, tags=["JOBSEQ=0000000004"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 296) + self.assertEqual(len(manager.get_delayed_jobs()), 296) # fourth loop. spiderB job finished. Can schedule new spiderB job. mocked_super_schedule_spider.side_effect = ["999/2/3"] - manager.is_finished = lambda x: "finished" if x == "999/2/2" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/2/2" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 5) mocked_super_schedule_spider.assert_any_call( "spiderB", units=None, argA=2, tags=["JOBSEQ=0000000005"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 295) + self.assertEqual(len(manager.get_delayed_jobs()), 295) # fifth loop. spiderC job finished. Can schedule new spiderC job. mocked_super_schedule_spider.side_effect = ["999/3/2"] - manager.is_finished = lambda x: "finished" if x == "999/3/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/3/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 6) mocked_super_schedule_spider.assert_any_call( "spiderC", units=None, argA=1, tags=["JOBSEQ=0000000006"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 294) + self.assertEqual(len(manager.get_delayed_jobs()), 294) @patch("shub_workflow.crawl.WorkFlowManager.schedule_spider") def test_max_jobs_per_spider_with_max_next_params_limit( @@ -742,7 +744,7 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=0, tags=["JOBSEQ=0000000001"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 2nd loop. Schedule spiderB manager.is_finished = lambda x: None @@ -752,17 +754,17 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderB", units=None, argA=0, tags=["JOBSEQ=0000000002"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 3rd loop. SpiderA finishes, but spiderC is next. - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 3) mocked_super_schedule_spider.assert_any_call( "spiderC", units=None, argA=0, tags=["JOBSEQ=0000000003"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 4th loop. Schedule spiderA mocked_super_schedule_spider.side_effect = ["999/1/2"] @@ -773,14 +775,14 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=1, tags=["JOBSEQ=0000000004"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 5th loop. Nothing scheduled. But delayed jobs queue got filled with all next jobs params manager.is_finished = lambda x: None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 296) + self.assertEqual(len(manager.get_delayed_jobs()), 296) @patch("shub_workflow.crawl.WorkFlowManager.schedule_spider") def test_max_jobs_per_spider_with_known_spiders( @@ -823,7 +825,7 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=0, tags=["JOBSEQ=0000000001"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 2nd loop. Schedule spiderB manager.is_finished = lambda x: None @@ -833,17 +835,17 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderB", units=None, argA=0, tags=["JOBSEQ=0000000002"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 3rd loop. SpiderA finishes, but spiderC is next. - manager.is_finished = lambda x: "finished" if x == "999/1/1" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/1" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 3) mocked_super_schedule_spider.assert_any_call( "spiderC", units=None, argA=0, tags=["JOBSEQ=0000000003"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 4th loop. Schedule spiderA mocked_super_schedule_spider.side_effect = ["999/1/2"] @@ -854,45 +856,45 @@ def get_max_next_params(self) -> int: mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=1, tags=["JOBSEQ=0000000004"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 5th loop. Nothing scheduled, and nothing added to delayed jobs. manager.is_finished = lambda x: None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 4) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 0) + self.assertEqual(len(manager.get_delayed_jobs()), 0) # 6th loop. SpiderA finishes and rescheduled. SpiderB and SpiderC need to go to delayed jobs. mocked_super_schedule_spider.side_effect = ["999/1/3"] - manager.is_finished = lambda x: "finished" if x == "999/1/2" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/2" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 5) mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=2, tags=["JOBSEQ=0000000005"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 2) + self.assertEqual(len(manager.get_delayed_jobs()), 2) self.assertEqual(manager.get_delayed_spiders(), {"spiderB", "spiderC"}) # 7th loop. SpiderA finishes, reschedule one more. Two more spiderB and spiderC next params added to # delayed jobs mocked_super_schedule_spider.side_effect = ["999/1/4"] - manager.is_finished = lambda x: "finished" if x == "999/1/3" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/3" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 6) mocked_super_schedule_spider.assert_any_call( "spiderA", units=None, argA=3, tags=["JOBSEQ=0000000006"], job_settings={} ) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 4) + self.assertEqual(len(manager.get_delayed_jobs()), 4) self.assertEqual(manager.get_delayed_spiders(), {"spiderB", "spiderC"}) # 8th loop. SpiderA finishes, but there are no more next params for spiderA. Delayed job become filled # with all next params - manager.is_finished = lambda x: "finished" if x == "999/1/4" else None + manager.is_finished = lambda x: Outcome("finished") if x == "999/1/4" else None result = next(manager._run_loops()) self.assertTrue(result) self.assertEqual(mocked_super_schedule_spider.call_count, 6) - self.assertEqual(len(manager._GeneratorCrawlManager__delayed_jobs), 198) + self.assertEqual(len(manager.get_delayed_jobs()), 198) self.assertEqual(manager.get_delayed_spiders(), {"spiderB", "spiderC"})