
Mbio airflow dags rx test #4

Open · wants to merge 12 commits into base: main

Conversation

ruicatxiao (Collaborator):

Validated as working on 4 sets of 16S amplicon-seq datasets, consisting of both single-end and paired-end reads.

@ruicatxiao ruicatxiao requested a review from d-callan July 15, 2024 14:59
@@ -0,0 +1,5 @@
study,studyPath
Collaborator:

So I don't want references to real data in here; those should live in https://github.com/microbiomeDB/amplicon_sequencing. I'm OK with test data, so long as we're very clear that's what it is. Maybe rename this file to include the word 'test' explicitly.

Collaborator Author:

Got it. Yes, these are all test data.

@@ -0,0 +1,5 @@

Collaborator:

This file name should probably include the word 'template'.

Collaborator Author:

Roger

@@ -0,0 +1 @@
study,timestamp,code_revision
Collaborator:

And this file name should probably also include the word 'test'.

dags/ampliseq.py Outdated
# When hitting the "cannot read served logs" bug, clear the failed tasks and they will attempt to re-run; this completes most of the time


# Potential to-do: schedule the DAG to run and check on a daily or weekly basis. Alternatively, keep it on manual triggering
Collaborator:

I'd leave it manually triggered for now; this is still in dev.
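For reference, keeping it manual just means leaving the DAG unscheduled; a minimal sketch (the dag_id and dates here are illustrative, not the actual values):

```python
import pendulum
from airflow import DAG

# Sketch: with no schedule, the DAG only runs when triggered manually from the UI/CLI.
with DAG(
    dag_id="automated_ampliseq",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule_interval=None,  # use `schedule=None` on Airflow >= 2.4
    catchup=False,
) as dag:
    ...
```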

dags/ampliseq.py Outdated
# i think we want a dict of dicts

with DAG(
dag_id="rx_test_automated_ampliseq_v6",
Collaborator:

We should change this dag_id back to automated_ampliseq, or whatever it was. We could drop the word 'automated' too, I suppose, if we'd like.

dags/ampliseq.py Outdated
'path': path,
'current_timestamp': current_timestamp
})
return studies
Collaborator:

Similar naming issue here, I think. It looks like we only add to `studies` if we plan to run it again, so this should be named something like `studies_to_run` or similar.
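A minimal sketch of the rename; the loop body, condition, and helpers here are placeholders rather than the actual code:

```python
def load_studies():
    # Only studies that still need (re)processing get collected,
    # so name the list to say exactly that.
    studies_to_run = []
    for study, path in read_study_table():   # hypothetical helper
        if needs_processing(study):          # hypothetical condition
            studies_to_run.append({
                'study': study,
                'path': path,
                'current_timestamp': current_timestamp,
            })
    return studies_to_run
```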

Collaborator:

This looks like it still needs to be resolved. It's not clear which studies these are just by reading the name of the variable: all of them? Ones already processed? Ones we still need to process?

dags/ampliseq.py Outdated
nextflow_task = BashOperator.partial(
task_id='nextflow_task',
bash_command=textwrap.dedent("""\
nextflow run nf-core/ampliseq -with-tower -r 2.9.0 \
Collaborator:

The 2.9.0 here should be replaced with AMPLISEQ_VERSION.
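i.e., something along these lines, assuming the command string is built as an f-string (a sketch; the remaining pipeline flags are elided):

```python
import textwrap
from airflow.operators.bash import BashOperator

AMPLISEQ_VERSION = '2.9.0'

# Sketch: interpolate the pinned pipeline version instead of hard-coding it.
nextflow_task = BashOperator.partial(
    task_id='nextflow_task',
    bash_command=textwrap.dedent(f"""\
        nextflow run nf-core/ampliseq -with-tower -r {AMPLISEQ_VERSION} \\
        -profile singularity
        echo "nextflow_done"
    """),
)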

dags/ampliseq.py Outdated

loaded_studies = load_studies()

with TaskGroup("nextflow_tasks", tooltip="Nextflow processing tasks") as nextflow_tasks:
Collaborator:

I'd prefer not to make one task group of all the Nextflow tasks and another of all the R tasks. I'd rather have a task group called something like run_ampliseq_with_post_processing, containing two tasks per study: one for the Nextflow task and one for the Rscript task.

As this currently stands, I believe all Nextflow tasks will have to complete successfully before any Rscript ones execute. Instead, we want any given study to move on to its own Rscript task as soon as its Nextflow task completes.
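A rough sketch of that shape, assuming the study list is available at DAG-parse time (e.g., load_studies is a plain function rather than a mapped @task) and with the command builders as hypothetical helpers:

```python
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

# Sketch: one group per study, so each study's Rscript step can start
# as soon as that same study's Nextflow step finishes.
for study in load_studies():
    with TaskGroup(group_id=f"run_ampliseq_with_post_processing_{study['study']}"):
        run_nextflow = BashOperator(
            task_id="run_nextflow",
            bash_command=build_nextflow_command(study),  # hypothetical helper
        )
        run_rscript = BashOperator(
            task_id="run_rscript",
            bash_command=build_rscript_command(study),   # hypothetical helper
        )
        run_nextflow >> run_rscript
```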

@d-callan d-callan (Collaborator) left a comment:

I'm going to approve, because I don't see anything really conceptually or functionally wrong here. But I'd like to see some things renamed for clarity before you merge, please.

@@ -0,0 +1 @@
# For proper runs, the files need to be renamed without "test_" in the file name
Collaborator:

This could be a README file, and could explicitly mention the paired data repo as an example of how to use this.

dags/ampliseq.py Outdated
'path': path,
'current_timestamp': current_timestamp
})
return studies
Collaborator:

This looks like it still needs to be resolved. It's not clear which studies these are just by reading the name of the variable: all of them? Ones already processed? Ones we still need to process?

dags/ampliseq.py Outdated

commands.append(command)
with TaskGroup("processing_tasks", tooltip="Processing tasks") as processing_tasks: # Merged task groups
Collaborator:

I think we can improve the naming here as well. What is the 'processing_task' doing? Assume this DAG grows one day and we have multiple processing tasks; always favor overly specific, long names over short or vague ones.

Collaborator:

Some specific suggestions: the task group could be called 'process_amplicon_studies', the Nextflow task within that group could be called 'run_ampliseq', and the Rscript task could be called 'run_r_postprocessing'.

Collaborator:

I'll add another general rule for naming things: putting 'task' in the name of a task doesn't give a reader (who might be your future self) any further information, so I wouldn't bother, personally. You could try always starting task names with a verb, if that helps.

@ruicatxiao ruicatxiao requested a review from d-callan August 21, 2024 18:23
@d-callan d-callan (Collaborator) left a comment:

I know I made a lot of comments, but I hope it's not overwhelming. I think overall you've done a really great job of exploring Airflow for the first time, and have tried and discovered some very worthwhile things.

I think my chief comment currently is that we should work on code factoring, so as to produce code that is easier to maintain.

self.log.info("Jobs still running. Waiting...")
return False
else:
self.log.info("No unfinished job found. Task complete.")
Collaborator:

So this is just looking for any running jobs, not a specific one, it looks like? I think if someone triggers multiple ampliseq studies running remotely at the same time, this will mean none of them copies back from the remote until all of them are finished. That seems less than ideal.
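One way to scope the check, assuming the sensor can query by a per-study job name; the `job_name` parameter and the query helper below are hypothetical, not existing code:

```python
from airflow.sensors.base import BaseSensorOperator

class JobStatusSensor(BaseSensorOperator):
    # Sketch: check only the job submitted for one study, not any running job.
    def __init__(self, job_name: str, **kwargs):
        super().__init__(**kwargs)
        self.job_name = job_name

    def poke(self, context) -> bool:
        if remote_job_is_running(self.job_name):  # hypothetical helper querying the cluster
            self.log.info("Job %s still running. Waiting...", self.job_name)
            return False
        self.log.info("Job %s finished. Task complete.", self.job_name)
        return True
```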

logging.info(f"Executing command: {self.command}")
return super().execute(context)

class JobStatusSensor(BaseSensorOperator):
Collaborator:

I don't like that this is inside a DAG. Classes should have their own homes and be reusable across DAGs.

Also, these classes are conceptually similar to the cluster manager and cluster job sensor. Is there a reason not to use those?
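For example, the sensor (and the SSH operator subclass) could live in a shared module that every DAG imports; the module path and placement below are hypothetical:

```python
# plugins/remote_jobs.py -- hypothetical shared home; Airflow adds the plugins/
# folder to sys.path, so DAGs can import from it directly.
from airflow.sensors.base import BaseSensorOperator

class JobStatusSensor(BaseSensorOperator):
    """Reusable sensor; individual DAGs import it instead of defining it inline."""
    def poke(self, context) -> bool:
        ...

# dags/ampliseq.py
# from remote_jobs import JobStatusSensor   # hypothetical import
```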

USERNAME = 'ruicatx'
KEY_FILE = os.path.join(BASE_PATH, "chmi_rsa")
REMOTE_HOST_SFTP = 'mercury.pmacs.upenn.edu'
REMOTE_HOST_SSH = 'consign.pmacs.upenn.edu'
Collaborator:

A lot of this information I'd prefer not be the DAG's responsibility to track, like what the remote SFTP and SSH servers are, and the login credentials for the same. The idea behind the cluster manager was to delegate managing that information to a reusable class. If you prefer using SSHOperator, SFTPOperator, and similar, we can always consider modifying the cluster manager.

Collaborator:

I am glad you investigated these operators, though. Are they base operators?

REMOTE_HOST_SSH = 'consign.pmacs.upenn.edu'
REMOTE_PATH = '<PATH TO REMOTE DATA DIRECTORY>'
REMOTE_CONFIG = '<PATH TO REMOTE CONFIG>'
POKE_INTERVAL = 600 # Interval in seconds for checking job status
Collaborator:

I wouldn't be inclined to make the poke interval a module-level variable like this. It seems to me that different tasks will likely require different poke intervals if this DAG grows.
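For reference, poke_interval is already a per-task argument on sensors, so each instantiation can carry its own value (a sketch; the sensor class matches the one in this DAG, but the task ids and other arguments are placeholders):

```python
# Sketch: give each sensor its own interval instead of one shared constant.
wait_for_nextflow = JobStatusSensor(
    task_id="wait_for_nextflow_job",
    poke_interval=600,        # check every 10 minutes for the long-running pipeline
    timeout=60 * 60 * 24,     # give up after a day
)

wait_for_transfer = JobStatusSensor(
    task_id="wait_for_file_transfer",
    poke_interval=60,         # check every minute for the quick copy-back step
    timeout=60 * 60,
)
```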

--outdir {REMOTE_PATH}/{study['study']}/out \
-c {REMOTE_CONFIG}/ampliseq.config \
-profile singularity
echo "nextflow_done"
Collaborator:

Looks like this gets launched from your home dir? Nextflow will write logs to a file called .nextflow.log in whatever directory it is launched from. As a consequence, it's very valuable to launch Nextflow from the study directory; that lets you find the log file for a particular study easily later, if needed.

I would add a line something like `cd {REMOTE_PATH}/{study['study']}` to this script before starting Nextflow.
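Roughly like this, inside the generated shell script (a sketch; the variable name `script_body` is illustrative, and the pinned version is written via AMPLISEQ_VERSION per the earlier comment):

```python
script_body = textwrap.dedent(f"""\
    # Launch from the study directory so .nextflow.log ends up alongside that study's data.
    cd {REMOTE_PATH}/{study['study']}
    nextflow run nf-core/ampliseq -with-tower -r {AMPLISEQ_VERSION} \\
        --outdir {REMOTE_PATH}/{study['study']}/out \\
        -c {REMOTE_CONFIG}/ampliseq.config \\
        -profile singularity
    echo "nextflow_done"
""")
```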

loaded_studies = load_studies()

with TaskGroup("process_amplicon_studies") as process_amplicon_studies:
script_names_nextflow = create_shell_script_nextflow.expand(study=loaded_studies)
Collaborator:

So this is interesting. It looks like rather than attempting to expand on a task group, you've expanded every task within the task group. That probably avoids some of the frustrating issues I was seeing, but it is also probably going to produce a very confusing-looking graph.

Is there a reason (until Airflow decides to better support using dynamic task mapping, task groups, and branching operators together) not to build the DAG manually in a loop? I know you were doing exactly that before, and I was the one who suggested trying something different, but that's clearly been more trouble than it's worth, particularly if it has also caused you to split this DAG in two for remote vs. local.
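For example, something like this in place of the .expand() calls, assuming the study list can be read at DAG-parse time and with the task factories as placeholders:

```python
loaded_studies = load_studies()  # assumed readable at parse time in this sketch

# Sketch: build the graph explicitly per study instead of relying on dynamic
# task mapping, so task groups, sensors, and branching compose predictably.
for study in loaded_studies:
    with TaskGroup(group_id=f"process_{study['study']}"):
        make_script = make_create_shell_script_task(study)   # hypothetical factory
        run_remote = make_remote_nextflow_task(study)         # hypothetical factory
        copy_back = make_copy_results_task(study)             # hypothetical factory
        make_script >> run_remote >> copy_back
```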

@@ -0,0 +1,5 @@
study,studyPath
Collaborator:

I don't mind having the example files here, but I wonder if there is a reason?

It also seems worth giving this examples dir its own README file, pointing people to the GitHub repos where they can see these files as they are in production.

CONFIG_PATH = os.path.join(BASE_PATH, "ampliseq.config")
AMPLISEQ_VERSION = '2.9.0'

def create_dag():
Collaborator:

So, if we're going to have two ampliseq DAGs (which I'd rather avoid but won't push you on, so long as things work and you and Dan are both happy), they should probably be refactored some. We want to avoid duplicated code as much as possible in a production system, because it's impossible to maintain (ask me how I know, lol). So I'd recommend considering some alternatives:

1. Writing the tasks you mean to reuse across both DAGs into a separate file, wrapping each in a method like 'make_task_X' that returns a reference to a task (a sketch follows below).
2. Looking into TriggerDagRunOperator.
3. Just writing a single DAG that uses a loop to generate the tasks necessary for each study.
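A minimal sketch of option 1, with hypothetical module, helper, and task names:

```python
# dags/common/ampliseq_tasks.py -- hypothetical shared module
from airflow.operators.bash import BashOperator

def make_run_ampliseq_task(study: dict, remote: bool = False) -> BashOperator:
    """Return a ready-to-wire task for one study; both DAGs import this."""
    return BashOperator(
        task_id=f"run_ampliseq_{study['study']}",
        bash_command=build_nextflow_command(study, remote=remote),  # hypothetical helper
    )

# dags/ampliseq.py
# from common.ampliseq_tasks import make_run_ampliseq_task
# run_ampliseq = make_run_ampliseq_task(study)
```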

@@ -0,0 +1,313 @@
from airflow import DAG
Collaborator:

Do we need this file?

@@ -0,0 +1,462 @@
import os, csv, logging, tarfile, pendulum, textwrap
Collaborator:

Also wondering if we want this file? To me, these types of things, if they don't work out but you don't want to lose them, are the kinds of things you let your Git commit history remember for you.
