
Memory usage by master when using clustermq backend #933

Closed

brendanf opened this issue Jul 9, 2019 · 14 comments

brendanf commented Jul 9, 2019

Prework

  • Read and abide by drake's code of conduct.
  • Search for duplicates among the existing issues, both open and closed.
  • Advanced users: verify that the bottleneck still persists in the current development version (i.e. remotes::install_github("ropensci/drake")) and mention the SHA-1 hash of the Git commit you install.

Using drake 7.4.0 and clustermq 0.8.8.

Description

When scheduling targets with the clustermq backend (in my particular case, submitting to SLURM via SSH), the master process sends data to multiple workers concurrently. Depending on how much data needs to be sent, this can kill the master with an out-of-memory error. Even when it does not, it delays the point at which the first worker has all of its data and can start working.

Reproducible example

I'm not sure how to reproduce this locally, so this example would need tweaking for whatever cluster setup you are using.

R script
library(drake)
library(clustermq)

options(clustermq.scheduler = "ssh",
        clustermq.ssh.host = "rackham2",
        clustermq.ssh.log = "cmq_ssh.log",
        clustermq.template = here::here("config/SSH.tmpl"))

plan <- drake_plan(
   big_data = matrix(runif(1e8), nrow = 1e5),
   processing = target(
      {
         Sys.sleep(100)
         big_data
      },
      transform = split(big_data, slices = 100)
   )
)

make(plan,
     memory_strategy = "memory",
     garbage_collection = TRUE,
     parallelism = "clustermq",
     jobs = 100,
     jobs_preprocess = 1,
     console_log_file = "drake.log",
     template = list(project_dir = "oueme-fungi-soils",
                     remote_profile = "config/slurm.Rprofile",
                     memory = 8192,
                     minutes = 10,
                     n_slots = 1,
                     log_file = "logs/slurm_%a.log"))
config/SSH.tmpl
ssh -o "ExitOnForwardFailure yes" -f \
    -R {{ ctl_port }}:localhost:{{ local_port }} \
    -R {{ job_port }}:localhost:{{ fwd_port }} \
    {{ ssh_host }} \
    "conda activate oueme-fungi-soils-drake && \
        cd {{ project_dir | oueme-fungi-soils }} && \
        R_PROFILE={{ remote_profile | config/slurm.Rprofile }} R --no-save --no-restore -e \
        'clustermq:::ssh_proxy(ctl={{ ctl_port }}, job={{ job_port }})' \
        > {{ ssh_log | /dev/null }} 2>&1"
config/slurm.Rprofile
options(clustermq.scheduler = "slurm",
        clustermq.template  = here::here("config/slurm_clustermq.tmpl"))
slurm_clustermq.tmpl
#!/bin/sh
# From https://github.com/mschubert/clustermq/wiki/SLURM
#SBATCH --account=SNIC2018-8-131
#SBATCH --job-name={{ job_name }}           # job name
#SBATCH --partition=core                 # partition
#SBATCH --output={{ log_file | /dev/null }} # you can add .%a for array index
#SBATCH --error={{ log_file | /dev/null }}  # log file
#SBATCH --mem-per-cpu={{ memory | 8192 }}   # memory
#SBATCH --cpus-per-task={{ n_slots | 1 }}
#SBATCH --time={{ minutes | 6:00:00 }}
#SBATCH --array=1-{{ n_jobs }}              # job array

conda activate oueme-fungi-soils-drake
ulimit -v $(( 1024 * {{ memory | 8192 }}  * {{ n_slots | 1 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Benchmarks

drake.log
astrophytum | 4059 | 2019-07-09 11:01:31.437780 | begin make()
astrophytum | 4059 | 2019-07-09 11:01:31.438837 | begin drake_config()
astrophytum | 4059 | 2019-07-09 11:01:31.440881 | cache /home/brendan/Documents/Uppsala/Projects/oueme-fungi-soils/.drake
astrophytum | 4059 | 2019-07-09 11:01:31.450449 | analyze environment
astrophytum | 4059 | 2019-07-09 11:01:31.453230 | analyze plan
astrophytum | 4059 | 2019-07-09 11:01:31.455081 | get knitr hash
astrophytum | 4059 | 2019-07-09 11:01:31.461256 | analyze big_data
astrophytum | 4059 | 2019-07-09 11:01:31.462165 | analyze processing_1
astrophytum | 4059 | 2019-07-09 11:01:31.463033 | analyze processing_2
astrophytum | 4059 | 2019-07-09 11:01:31.463799 | analyze processing_3
...
astrophytum | 4059 | 2019-07-09 11:01:31.533810 | analyze processing_98
astrophytum | 4059 | 2019-07-09 11:01:31.534364 | analyze processing_99
astrophytum | 4059 | 2019-07-09 11:01:31.534910 | analyze processing_100
astrophytum | 4059 | 2019-07-09 11:01:31.537741 | set knitr files
astrophytum | 4059 | 2019-07-09 11:01:31.539416 | connect plan
astrophytum | 4059 | 2019-07-09 11:01:31.539634 | connect big_data
astrophytum | 4059 | 2019-07-09 11:01:31.539803 | connect processing_1
astrophytum | 4059 | 2019-07-09 11:01:31.539984 | connect processing_2
astrophytum | 4059 | 2019-07-09 11:01:31.540160 | connect processing_3
...
astrophytum | 4059 | 2019-07-09 11:01:31.556891 | connect processing_98
astrophytum | 4059 | 2019-07-09 11:01:31.557058 | connect processing_99
astrophytum | 4059 | 2019-07-09 11:01:31.557241 | connect processing_100
astrophytum | 4059 | 2019-07-09 11:01:31.557874 | connect output files
astrophytum | 4059 | 2019-07-09 11:01:31.562491 | end drake_config()
astrophytum | 4059 | 2019-07-09 11:01:32.189539 | begin outdated()
astrophytum | 4059 | 2019-07-09 11:01:32.190873 | find more outdated targets
astrophytum | 4059 | 2019-07-09 11:01:32.193397 | find downstream outdated targets
astrophytum | 4059 | 2019-07-09 11:01:32.196485 | end outdated()
astrophytum | 4059 | 2019-07-09 11:01:32.196684 | isolate oudated targets
astrophytum | 4059 | 2019-07-09 11:01:32.197874 | construct priority queue
astrophytum | 4059 | 2019-07-09 11:01:37.656253 | setting common data
astrophytum | 4059 | 2019-07-09 11:01:37.664995 | begin scheduling targets
astrophytum | 4059 | 2019-07-09 11:04:06.965182 | sending common data
astrophytum | 4059 | 2019-07-09 11:04:07.814152 | target big_data
astrophytum | 4059 | 2019-07-09 11:04:07.936526 | sending common data
astrophytum | 4059 | 2019-07-09 11:04:07.936985 | sending common data
astrophytum | 4059 | 2019-07-09 11:04:07.937368 | sending common data
...
astrophytum | 4059 | 2019-07-09 11:04:16.642524 | sending common data
astrophytum | 4059 | 2019-07-09 11:04:16.642812 | sending common data
astrophytum | 4059 | 2019-07-09 11:04:16.643099 | sending common data
astrophytum | 4059 | 2019-07-09 11:05:17.057571 | store big_data
astrophytum | 4059 | 2019-07-09 11:07:16.540973 | time  big_data  4.849s | 188.21s (~3.14 minutes)  (exec | total)
astrophytum | 4059 | 2019-07-09 11:07:16.543537 | target processing_1
astrophytum | 4059 | 2019-07-09 11:07:18.485839 | target processing_2
astrophytum | 4059 | 2019-07-09 11:07:20.448040 | target processing_3
astrophytum | 4059 | 2019-07-09 11:07:24.064041 | target processing_4
astrophytum | 4059 | 2019-07-09 11:07:45.972504 | target processing_5
astrophytum | 4059 | 2019-07-09 11:08:19.861755 | target processing_6
astrophytum | 4059 | 2019-07-09 11:08:25.591622 | target processing_7
astrophytum | 4059 | 2019-07-09 11:09:09.549872 | target processing_8
astrophytum | 4059 | 2019-07-09 11:09:50.697039 | target processing_9
astrophytum | 4059 | 2019-07-09 11:10:34.376859 | target processing_10
astrophytum | 4059 | 2019-07-09 11:11:16.411776 | target processing_11
astrophytum | 4059 | 2019-07-09 11:12:18.440834 | target processing_12
astrophytum | 4059 | 2019-07-09 11:13:19.942966 | target processing_13

The first few targets are assigned at a rate of about one every 2 seconds. After that, my computer starts to swap and targets are assigned more slowly, about one per minute by the end. Total memory usage by R was about 10GB when it crashed during processing_13; big_data is about 750MB. Outgoing network utilization was about 15-20MB/s while the targets were being queued. I also tried a smaller instance with big_data around 75MB, which completed successfully. The data transfer continued for several minutes after the last job was queued, confirming that new jobs are queued, and their data loaded into memory, before the data for earlier jobs has finished sending.
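As a back-of-the-envelope check of those figures (illustrative arithmetic, not output from the run above): big_data is 1e8 doubles at 8 bytes each, and holding one buffered copy per queued target roughly accounts for the ~10GB observed at the crash during processing_13.

bytes_per_copy <- 1e8 * 8                                                      # big_data: 1e8 doubles, 8 bytes each
print(structure(bytes_per_copy,      class = "object_size"), units = "auto")  # ~763 Mb, consistent with "about 750MB"
print(structure(13 * bytes_per_copy, class = "object_size"), units = "auto")  # ~9.7 Gb, consistent with the ~10GB at the crash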

I reduced the memory usage and total transfer size by doing the split locally as a separate target:

plan <- drake_plan(
   big_data = target(
      matrix(runif(1e8), nrow = 1e5),
      hpc = FALSE
   ),
   split_data = target(
      big_data,
      transform = split(big_data, slices = 100),
      hpc = FALSE
   ),
   processing = target(
      {
         Sys.sleep(100)
         split_data
      },
      transform = map(split_data, .id = FALSE),
      hpc = TRUE
   )
)

This should probably be best practice when using the split() transform on a distributed system, especially over SSH; otherwise the whole dataset is sent to every worker and then split there. In my case, this increased the number of jobs that could be scheduled before crashing, but it did not get me through my whole plan.
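For a rough sense of the totals (again just illustrative arithmetic, assuming the full dependency is shipped to each of the 100 workers when the split happens remotely):

full_copy <- 1e8 * 8                                                       # one copy of big_data, in bytes
print(structure(100 * full_copy, class = "object_size"), units = "auto")   # split on the workers: ~74.5 Gb sent over SSH
print(structure(full_copy,       class = "object_size"), units = "auto")   # split locally first: ~763 Mb total, one slice per worker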

Reducing the number of workers also works, but that defeats the purpose of high-performance computing.


brendanf commented Jul 9, 2019

Accidentally submitted without writing anything, but it's coming...


brendanf commented Jul 9, 2019

Now it's ready.


wlandau commented Jul 9, 2019

Thanks for sending. Memory usage has gone way up on my priority list. For your use case, I recommend make(caching = "worker"). That way, we avoid sending lots of data over the SSH connection.

drake/R/backend-clustermq.R

Lines 101 to 106 in a082e5f

if (identical(config$caching, "master")) {
  manage_memory(target = target, config = config, jobs = 1)
  deps <- cmq_deps_list(target = target, config = config)
} else {
  deps <- NULL
}

Related: in the development version, make(memory_strategy = "autoclean") will unload big_data right after it is built (https://ropenscilabs.github.io/drake-manual/memory.html).
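A sketch of the make() call from the reproducible example with both suggestions applied (assuming the workers can reach the .drake cache, e.g. on a shared filesystem, and that the installed drake is new enough for memory_strategy = "autoclean"):

make(plan,
     parallelism = "clustermq",
     jobs = 100,
     caching = "worker",              # workers read/write the cache themselves instead of
                                      # receiving their data through the master's SSH connection
     memory_strategy = "autoclean",   # unload big_data from the master's memory once it is built
     garbage_collection = TRUE)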


wlandau commented Jul 9, 2019

Above, I meant to say that we can reduce data sent over the SSH connection with worker caching (as long as the SLURM cluster has access to the cache as a mounted file system).

By the way, I really like your idea of splitting locally first. I am actually using it on a real project today.


wlandau commented Jul 9, 2019

Oops, closed the wrong issue.

@wlandau wlandau reopened this Jul 9, 2019

wlandau commented Jul 9, 2019

One disadvantage of splitting locally is that we lose some concurrency. Not sure what we can do about that right now.

@brendanf
Contributor Author

> Above, I meant to say that we can reduce data sent over the SSH connection with worker caching (as long as the SLURM cluster has access to the cache as a mounted file system).

Unfortunately, that is not my case. The whole reason I use the clustermq SSH backend is that I don't have a shared filesystem between my local machine and the cluster, but I still want local access to the cache.

I think it would help me to use a blocking connection to transfer dependencies to workers, or at least to be able to limit the number of simultaneous transfers. I'm not sure whether that's a drake issue or a clustermq issue.


wlandau commented Jul 10, 2019

> I think it would help me to use a blocking connection to transfer dependencies to workers, or at least to be able to limit the number of simultaneous transfers. I'm not sure whether that's a drake issue or a clustermq issue.

Unfortunately, it goes deeper than that. In rzmq, the data is part of the message, so the command and the data are sent together. @mschubert, will memory-conscious blocking be possible after mschubert/clustermq#151?


wlandau commented Jul 10, 2019

> Unfortunately, that is not my case. The whole reason I use the clustermq SSH backend is that I don't have a shared filesystem between my local machine and the cluster, but I still want local access to the cache.

How important is this? My usual recommendation is to run everything on the cluster itself, even the master process that calls make(): https://ropenscilabs.github.io/drake-manual/hpc.html#master-on-a-cluster. make() takes a long time to run, so I think it really belongs on a compute node and not a local machine. It frees you up to do other stuff locally, your workflow is less likely to get interrupted, and you avoid setting your computer on fire.

@brendanf
Contributor Author

> How important is this? My usual recommendation is to run everything on the cluster itself, even the master process that calls make()

That's what I have done in the past. I thought this approach would be nicer because it allows more direct interaction with the results, rather than writing everything of interest to files and transferring them.

Can I transfer the cache using e.g. rsync?

> @mschubert, will memory-conscious blocking be possible after mschubert/clustermq#151?

@mschubert It looks like pbdZMQ uses blocking connections by default. This is also the behavior of the rzmq-compatibility wrapper function, so blocking will come along for the ride by default if clustermq switches to using pbdZMQ.

@mschubert

As much as I'd like to blame rzmq here, I don't think that is what's happening, and switching to pbdZMQ will not change how the clustermq sockets communicate (I will interface with the C library instead of the R functions, which is required for socket monitoring anyway).

The way drake interfaces with clustermq is just not set up for efficiency over SSH. In fact, it's not set up for efficiency at all; that is why @wlandau implemented his own worker caching: every call that drake makes needs to come with its own data (for now).

Of course, data should only be transferred once (clustermq does this for batch processing, but not via drake), and then drake's worker caching shouldn't be necessary. This is on my list (along with mschubert/clustermq#154), but the chances that I'll have time to fix this in the next couple of weeks are quite low.

Once that's fixed, we'll also have to adjust the way drake interfaces with clustermq. I've opened a new issue to track this: mschubert/clustermq#160 and https://github.com/mschubert/clustermq/projects/4.
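For reference, a minimal sketch of the batch interface mentioned above, where clustermq ships common data to each worker only once via the const argument (assumes a configured scheduler; the matrix is a small stand-in for real data):

library(clustermq)

res <- Q(
  fun = function(i, big_data) sum(big_data[, i]),            # `i` varies per call
  i = seq_len(100),
  const = list(big_data = matrix(runif(1e6), nrow = 1e4)),   # sent to each worker once
  n_jobs = 2
)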

@brendanf
Contributor Author

@mschubert That does seem like a more complete solution to the issue of transferring the same data many times. In the case where different targets use different input data, however, this issue would still cause a crash.

The problem is that all of these transfers are started concurrently. If Qsys$private$send() blocked execution until the transfer was complete, the memory requirement would be limited to the dependencies of a single target. As it stands, Qsys$private$send() returns as soon as the transfer is scheduled (because rzmq::send.socket() returns as soon as the transfer is scheduled), so drake schedules the next target, which means filling the buffer for the next transfer before the first has finished. The result is that the dependencies of many targets (whether they are the same data or not) are buffered on the master at the same time.
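A toy illustration of that effect (base R only, not drake or clustermq internals): when each send returns before its transfer finishes, the serialized payloads of all already-scheduled targets sit in the master's memory at once.

payload_bytes <- function(deps) length(serialize(deps, connection = NULL))

one_target_deps <- matrix(runif(1e6), nrow = 1e3)    # stand-in for one target's dependencies
n_in_flight <- 100                                   # targets scheduled before any transfer completes
buffered <- n_in_flight * payload_bytes(one_target_deps)
print(structure(buffered, class = "object_size"), units = "auto")   # ~763 Mb buffered for a ~7.6 Mb payload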

@mschubert

Good point! Now tracked here: mschubert/clustermq#161


wlandau commented Jul 12, 2019

Excellent. Looks like future work on clustermq can cover this.
