Skip to content

Commit

Permalink
Merge pull request #5779 from cylc/8.2.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.2.x-sync into master
  • Loading branch information
oliver-sanders authored Oct 23, 2023
2 parents cd409a2 + 38cedcd commit 30cb615
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 37 deletions.
1 change: 1 addition & 0 deletions changes.d/5776.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that submit-failed tasks are marked as incomplete (so remain visible) when running in back-compat mode.
43 changes: 29 additions & 14 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,21 +531,23 @@ def initiate_data_model(self, reloaded=False):
self.generate_definition_elements()

# Update workflow statuses and totals (assume needed)
self.update_workflow()
self.update_workflow(True)

# Apply current deltas
self.batch_deltas()
self.apply_delta_batch()
# Clear deltas after application
self.clear_delta_store()
self.clear_delta_batch()

if not reloaded:
# Gather this batch of deltas for publish
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()
# Gather the store as batch of deltas for publishing
self.batch_deltas(True)
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

self.updates_pending = False

# Clear deltas after application and publishing
self.clear_delta_store()
# Clear second batch after publishing
self.clear_delta_batch()

def generate_definition_elements(self):
Expand All @@ -563,6 +565,8 @@ def generate_definition_elements(self):
workflow.id = self.workflow_id
workflow.last_updated = update_time
workflow.stamp = f'{workflow.id}@{workflow.last_updated}'
# Treat play/restart as hard reload of definition.
workflow.reloaded = True

graph = workflow.edges
graph.leaves[:] = config.leaves
Expand Down Expand Up @@ -1493,7 +1497,7 @@ def insert_db_job(self, row_idx, row):
tp_delta.jobs.append(j_id)
self.updates_pending = True

def update_data_structure(self, reloaded=False):
def update_data_structure(self):
"""Workflow batch updates in the data structure."""
# load database history for flagged nodes
self.apply_task_proxy_db_history()
Expand All @@ -1520,11 +1524,7 @@ def update_data_structure(self, reloaded=False):
# Apply all deltas
self.apply_delta_batch()

if reloaded:
self.clear_delta_batch()
self.batch_deltas(reloaded=True)

if self.updates_pending or reloaded:
if self.updates_pending:
self.apply_delta_checksum()
# Gather this batch of deltas for publish
self.publish_deltas = self.get_publish_deltas()
Expand All @@ -1535,6 +1535,18 @@ def update_data_structure(self, reloaded=False):
self.clear_delta_batch()
self.clear_delta_store()

def update_workflow_states(self):
"""Batch workflow state updates."""

# update the workflow state in the data store
self.update_workflow()

# push out update deltas
self.batch_deltas()
self.apply_delta_batch()
self.apply_delta_checksum()
self.publish_deltas = self.get_publish_deltas()

def prune_data_store(self):
"""Remove flagged nodes and edges not in the set of active paths."""

Expand Down Expand Up @@ -1807,7 +1819,7 @@ def set_graph_window_extent(self, n_edge_distance):
self.next_n_edge_distance = n_edge_distance
self.updates_pending = True

def update_workflow(self):
def update_workflow(self, reloaded=False):
"""Update workflow element status and state totals."""
# Create new message and copy existing message content
data = self.data[self.workflow_id]
Expand Down Expand Up @@ -1863,6 +1875,9 @@ def update_workflow(self):
w_delta.status_msg = status_msg
delta_set = True

if reloaded is not w_data.reloaded:
w_delta.reloaded = reloaded

if self.schd.pool.main_pool:
pool_points = set(self.schd.pool.main_pool)
oldest_point = str(min(pool_points))
Expand Down
3 changes: 3 additions & 0 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,9 @@ async def subscribe_delta(self, root, info, args):
workflow_id=w_id)
delta_store[DELTA_ADDED] = (
self.data_store_mgr.data[w_id])
delta_store[DELTA_ADDED][
WORKFLOW
].reloaded = True
deltas_queue.put(
(w_id, 'initial_burst', delta_store))
elif w_id in self.delta_store[sub_id]:
Expand Down
38 changes: 16 additions & 22 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ async def command_reload_workflow(self) -> None:
self._update_workflow_state()

# Re-initialise data model on reload
self.data_store_mgr.initiate_data_model(reloaded=True)
self.data_store_mgr.initiate_data_model(self.is_reloaded)

# Reset the remote init map to trigger fresh file installation
self.task_job_mgr.task_remote_mgr.remote_init_map.clear()
Expand Down Expand Up @@ -1548,7 +1548,7 @@ async def workflow_shutdown(self):

# Is the workflow ready to shut down now?
if self.pool.can_stop(self.stop_mode):
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()
self.proc_pool.close()
if self.stop_mode != StopMode.REQUEST_NOW_NOW:
# Wait for process pool to complete,
Expand Down Expand Up @@ -1767,7 +1767,7 @@ async def main_loop(self) -> None:

if has_updated or self.data_store_mgr.updates_pending:
# Update the datastore.
await self.update_data_structure(self.is_reloaded)
await self.update_data_structure()

if has_updated:
if not self.is_reloaded:
Expand Down Expand Up @@ -1838,37 +1838,31 @@ def _update_workflow_state(self):
A cut-down version of update_data_structure which only considers
workflow state changes e.g. status, status message, state totals, etc.
"""
# Publish any existing before potentially creating more
self._publish_deltas()
# update the workflow state in the data store
self.data_store_mgr.update_workflow()

# push out update deltas
self.data_store_mgr.batch_deltas()
self.data_store_mgr.apply_delta_batch()
self.data_store_mgr.apply_delta_checksum()
self.data_store_mgr.publish_deltas = (
self.data_store_mgr.get_publish_deltas()
)
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)

# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
self.data_store_mgr.update_workflow_states()
self._publish_deltas()

async def update_data_structure(self, reloaded: bool = False):
"""Update DB, UIS, Summary data elements"""
# Publish any existing before potentially creating more
self._publish_deltas()
# Collect/apply data store updates/deltas
self.data_store_mgr.update_data_structure(reloaded=reloaded)
# Publish updates:
self.data_store_mgr.update_data_structure()
self._publish_deltas()
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def _publish_deltas(self):
"""Publish pending deltas."""
if self.data_store_mgr.publish_pending:
self.data_store_mgr.publish_pending = False
self.server.publish_queue.put(
self.data_store_mgr.publish_deltas)
# Non-async sleep - yield to other threads rather
# than event loop
sleep(0)
# Database update
self.workflow_db_mgr.put_task_pool(self.pool)

def check_workflow_timers(self):
"""Check timers, and abort or run event handlers as configured."""
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ def remove_if_complete(self, itask):
(C7 failed tasks don't count toward runahead limit)
"""
if cylc.flow.flags.cylc7_back_compat:
if not itask.state(TASK_STATUS_FAILED):
if not itask.state(TASK_STATUS_FAILED, TASK_OUTPUT_SUBMIT_FAILED):
self.remove(itask, 'finished')
if self.compute_runahead():
self.release_runahead_tasks()
Expand Down
59 changes: 59 additions & 0 deletions tests/functional/spawn-on-demand/19-submitted-compat.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------

# Test the submitted and submit-failed triggers work correctly in back-compat
# mode. See https://github.com/cylc/cylc-flow/issues/5771

. "$(dirname "$0")/test_header"
set_test_number 4

init_workflow "${TEST_NAME_BASE}" << __FLOW__
[scheduler]
[[events]]
abort on stall timeout = True
stall timeout = PT0S
[scheduling]
[[graph]]
R1 = """
a
b
"""
[runtime]
[[a]] # should complete
[[b]] # should not complete
platform = broken
__FLOW__

mv "$WORKFLOW_RUN_DIR/flow.cylc" "$WORKFLOW_RUN_DIR/suite.rc"

workflow_run_fail "${TEST_NAME_BASE}-run" \
cylc play "${WORKFLOW_NAME}" --no-detach

grep_workflow_log_ok \
"${TEST_NAME_BASE}-back-compat" \
'Backward compatibility mode ON'
grep_workflow_log_ok \
"${TEST_NAME_BASE}-a-complete" \
'\[1/a running job:01 flows:1\] => succeeded'
grep_workflow_log_ok \
"${TEST_NAME_BASE}-b-incomplete" \
"1/b did not complete required outputs: \['submitted', 'succeeded'\]"

purge

0 comments on commit 30cb615

Please sign in to comment.