MIPRO Bootstrap + Evaluation Parallelism #1622

Open · wants to merge 3 commits into base: main
Conversation

XenonMolecule (Collaborator):

This PR includes two main updates:

  1. Enables nested parallelism in Evaluate. Previously, the signal handling used for user terminations was incompatible with nested threading: if a user program used multithreading itself (for instance, calling BootstrapFewshotWithRandomSearch as a meta-optimization inside a program being evaluated or optimized), Evaluate would throw an exception. This nested multithreading now works.
  2. MIPRO bootstraps in parallel. In principle this change can be applied to BootstrapFewshotWithRandomSearch as well. All runs of the program that bootstrap the few-shot demonstrations now occur at the same time. This is only enabled by setting bootstrap_parallel=True in the MIPROv2 class (it is False by default); a usage sketch follows below. The main TODO is to fix printing, because currently all the evaluation bars print simultaneously and make for a bit of an ugly printout.
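
For readers of this PR, a minimal usage sketch of the flag described in point 2, assuming it is exposed on the MIPROv2 constructor as bootstrap_parallel as stated above; the metric, program signature, and tiny dataset are placeholders, not code from this PR:

```python
import dspy
from dspy.teleprompt import MIPROv2

# Placeholder metric and dataset, for illustration only.
def exact_match(example, prediction, trace=None):
    return example.answer == prediction.answer

trainset = [
    dspy.Example(
        context="a boy is using a machine",
        question="Is the boy using a typewriter?",
        answer="No",
    ).with_inputs("context", "question"),
]

# bootstrap_parallel is the flag named in this PR description; it defaults to False.
optimizer = MIPROv2(metric=exact_match, bootstrap_parallel=True)
program = dspy.ChainOfThought("context, question -> answer")
optimized = optimizer.compile(program, trainset=trainset)
```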

I would appreciate a more thorough review of this PR, since it touches functionality outside my area of expertise (it modifies Evaluate) and I am a relative novice when it comes to multithreading in Python.

mikeedjones (Contributor):

Which exception was thrown with a nested threadpool? An MRE/tests to avoid regressions would be great :)

dspy/evaluate/evaluate.py (outdated review comment, resolved)
Comment on lines +108 to +109:

```python
if self.cancel_jobs.is_set():
    break  # Exit the loop if jobs are cancelled
```

Contributor:

What happens to the other futures?

XenonMolecule (Collaborator, Author):

My understanding of how the futures work is that when I call f.cancel() below, it prevents all jobs that haven't started yet from starting. The remaining jobs that are already in progress are caught here and terminate as soon as they finish.

Was the implementation with signal better? It seemed to be what was causing the exception, but maybe it is necessary in order to terminate all threads simultaneously?
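
For context, a self-contained sketch of the pattern under discussion (not DSPy's actual code): Future.cancel() only prevents queued futures from starting, already-running jobs finish normally, and a threading.Event is checked while consuming results to stop early.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

cancel_jobs = threading.Event()

def job(i):
    time.sleep(0.1)
    return i

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(job, i): i for i in range(100)}
    try:
        for future in as_completed(futures):
            if cancel_jobs.is_set():
                break  # stop consuming results; jobs already running still finish
            print(futures[future], future.result())
    except KeyboardInterrupt:
        cancel_jobs.set()
        for future in futures:
            future.cancel()  # only cancels futures that have not started yet
        raise
```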

mikeedjones (Contributor), Oct 14, 2024:

I think this is much better - any event-based signaling using threading.Event would require whatever is spawning the threads to also listen for that event, so it's not really viable. Left some other comments/nits :)

XenonMolecule (Collaborator, Author) commented Oct 14, 2024:

> Which exception was thrown with a nested threadpool? An MRE/tests to avoid regressions would be great :)

With inputs:

```text
Example({'context': 'a boy is not concentrating on a machine', 'question': 'Can we logically conclude for sure that a boy is not concentrating on a typewriter?'}) (input_keys={'context', 'question'})
```

Stack trace:

```text
Traceback (most recent call last):
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/evaluate/evaluate.py", line 175, in wrapped_program
    prediction = program(**example.inputs())
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/primitives/program.py", line 26, in __call__
    return self.forward(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/testing/tasks/scone.py", line 68, in forward
    new_prog = optimizer.compile(dspy.ChainOfThought("context,question->answer"), trainset=self.trainset, valset=self.testset)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/teleprompt/random_search.py", line 118, in compile
    score, subscores = evaluate(program, return_all_scores=True)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/evaluate/evaluate.py", line 211, in __call__
    reordered_devset, ncorrect, ntotal = self._execute_multi_thread(
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/evaluate/evaluate.py", line 119, in _execute_multi_thread
    with ThreadPoolExecutor(max_workers=num_threads) as executor, interrupt_handler_manager():
  File "/opt/miniconda3/envs/dspy_maintenance/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/Users/michaelryan/Documents/School/Stanford/misc/dspy_updates/dspy/dspy/evaluate/evaluate.py", line 108, in interrupt_handler_manager
    signal.signal(signal.SIGINT, interrupt_handler)
  File "/opt/miniconda3/envs/dspy_maintenance/lib/python3.12/signal.py", line 58, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: signal only works in main thread of the main interpreter
```
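
(A minimal reproduction independent of DSPy, added here for clarity: signal.signal may only be called from the main thread, so registering a SIGINT handler from a worker thread raises the same ValueError.)

```python
import signal
import threading

def install_handler():
    # Raises: ValueError: signal only works in main thread of the main interpreter
    signal.signal(signal.SIGINT, lambda signum, frame: None)

worker = threading.Thread(target=install_handler)
worker.start()
worker.join()
```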

Comment on the following lines:

```python
# Cancel remaining futures
for f in futures:
    f.cancel()
break
```

mikeedjones (Contributor), Oct 14, 2024:

Do you need to re-raise the KeyboardInterrupt here? If there's some other logic following on from Evaluate, does the process get stopped, or do you have to fire the interrupt again? Or does the exception get re-raised below?

Comment on lines +136 to +137:

```python
if prediction is job_cancelled:
    continue
```

Contributor:

Nice to return early, so maybe handle this case first?
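
Something along these lines, presumably (a sketch of the suggestion with assumed surrounding variable names, not the actual patch):

```python
for future in as_completed(futures):  # assumed loop structure, for illustration only
    prediction = future.result()
    if prediction is job_cancelled:
        continue  # handle the cancelled sentinel first, then fall through
    # ...normal scoring/result handling runs only for real predictions...
```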

Comment on lines +145 to +150:

```python
self.cancel_jobs.set()
dspy.logger.warning("Received KeyboardInterrupt. Cancelling evaluation.")
# Cancel remaining futures
for f in futures:
    f.cancel()
raise
```

Contributor:

Maybe wrap this up as a local function, since it's the same as the block above.
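
For instance (a sketch of the suggested local helper, defined inside the method so it closes over self and futures; names follow the snippet above and are not the merged code):

```python
def cancel_all_jobs(log_warning: bool = False):
    # Shared cancellation path for both the cancel_jobs check and the KeyboardInterrupt handler.
    if log_warning:
        dspy.logger.warning("Received KeyboardInterrupt. Cancelling evaluation.")
    self.cancel_jobs.set()
    for f in futures:
        f.cancel()  # only futures that have not started yet are actually cancelled
```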


okhat (Collaborator) commented Oct 15, 2024:

@XenonMolecule I love this and @mikeedjones thanks for the review :D

I want to merge this and to use it. I see failing tests. Is that expected? Are there blockers for merge?

```
@@ -400,7 +402,7 @@ def _bootstrap_fewshot_examples(
    teacher_settings=self.teacher_settings,
    seed=seed,
    metric_threshold=self.metric_threshold,
    rng=self.rng,
    parallel_bootstrapping=self.parallel_bootstrapping,
```

Collaborator:

Interesting, I gotta check what the semantics are of defaulting to self.attribute...

okhat (Collaborator), Oct 15, 2024:

btw why False by default? just in case it breaks something? imo if stable enough, should just use num_threads

XenonMolecule (Collaborator, Author):

Partially it's in case it breaks something I didn't test for, and partially because the printing is currently ugly, but I can change the logic to support num_threads.

XenonMolecule (Collaborator, Author):

How do we want to handle num_threads? Theoretically, BootstrapFewshotWithRandomSearch will call Evaluate, which means that with num_threads=6 one interpretation would allow 6*6=36 simultaneous threads, and that is likely not what a user expects. We could use two separate variables (num_threads_eval, num_threads_bootstrap), or we could set each to the square root of num_threads (with the bootstrap thread count capped at num_candidates+3).
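
For concreteness, a sketch of the square-root option mentioned above (the names are illustrative only, not part of this PR):

```python
import math

def split_thread_budget(num_threads: int, num_candidates: int) -> tuple[int, int]:
    # Give each level roughly sqrt(num_threads) workers so the nested total stays
    # near the user's request; cap bootstrap threads at num_candidates + 3.
    per_level = max(1, int(math.sqrt(num_threads)))
    num_threads_bootstrap = min(per_level, num_candidates + 3)
    num_threads_eval = per_level
    return num_threads_bootstrap, num_threads_eval

# e.g. num_threads=6 -> 2 bootstrap threads x 2 eval threads (4 in flight),
# instead of the naive 6 x 6 = 36.
```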

Collaborator:

Oh, hmm you're mentioning BootstrapFewshotWithRandomSearch? I thought we're parallelizing across BootstrapFewShot. Is that wrong?

XenonMolecule (Collaborator, Author):

Currently I am parallelizing calls to BootstrapFewShot rather than BootstrapFewShot itself. I see what you are saying, though, and can refactor to that instead. MIPRO has a util function that basically runs BootstrapFewshotWithRandomSearch just without the final evaluation, so my earlier comment about threads within threads is not a concern as long as we parallelize EITHER the calls to BootstrapFewShot OR BootstrapFewShot itself, not both. Parallelizing BootstrapFewShot itself has the benefit that it works for both RandomSearch and MIPRO out of the box.

Collaborator:

> Parallelizing BootstrapFewShot has the benefit that it can work for both RandomSearch and MIPRO out of the box.

Yes, that's the better one.

Comment on the following lines:

```python
# Dictionary to store the results in order
seed_to_result = {}

with ThreadPoolExecutor(max_workers=num_threads) as executor:
```
Collaborator:

This code is probably wrong for a very, very subtle reason that probably has zero effect for @XenonMolecule's use case but will for some other people.

Launching threads in DSPy is tricky, because (as dspy.Evaluate does) you need to handle dspy.settings.stack_by_tid or whatever it's called.
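
(Roughly the pattern being referred to, as a heavily simplified, self-contained sketch: each worker needs the parent thread's per-thread settings copied in before running, because thread-keyed state such as the stack_by_thread mentioned above is not inherited by new threads. The storage and helper names below are stand-ins, not DSPy's actual API.)

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for per-thread settings storage keyed by thread id.
stack_by_thread = {threading.get_ident(): ["parent-settings"]}

def run_with_parent_stack(fn, parent_stack, *args, **kwargs):
    # Copy the parent's stack into this worker thread before doing any work.
    stack_by_thread[threading.get_ident()] = list(parent_stack)
    try:
        return fn(*args, **kwargs)
    finally:
        stack_by_thread.pop(threading.get_ident(), None)

def bootstrap_one_seed(seed):  # placeholder workload
    return seed, stack_by_thread[threading.get_ident()]

parent_stack = stack_by_thread[threading.get_ident()]
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(run_with_parent_stack, bootstrap_one_seed, parent_stack, s)
        for s in range(8)
    ]
    results = [f.result() for f in futures]
```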

Collaborator:

We do have a good opportunity here to refactor dspy.Evaluate's thread launcher into its own reusable parallelizer for DSPy and then use it here too.

XenonMolecule (Collaborator, Author):

This probably wasn't sufficient, but I used Ctrl-F on the repo and GitHub search, and both times the only use of dspy.settings.stack_by_thread was in Evaluate.py. I might've missed it somewhere else, though.

Collaborator:

Hey @XenonMolecule, I don't get the response. I'm saying we cannot launch threads with ThreadPoolExecutor plainly. It's missing the important steps that dspy.Evaluate does when launching a thread. Not sure if you addressed this comment above (I think not).

XenonMolecule (Collaborator, Author):

No, I don't think I addressed this comment fully. My point was that I wasn't sure where else dspy.settings.stack_by_thread was being used, so I was unsure of the expected behavior for this setting. It looked like it was only used in Evaluate.py and nowhere else in the repo. Maybe we can chat offline about how dspy.settings.stack_by_thread needs to be handled.

Collaborator:

This will very subtly break things in strange ways if not handled correctly. Speaking from experience, so don't let this one fall by the wayside.

Collaborator:

Yes, let's make sure we handle threading correctly.

Collaborator:

> We do have a good opportunity here to refactor dspy.Evaluate's thread launcher into its own reusable parallelizer for DSPy and then use it here too.

This would be nice (though I'm not sure it should be part of this PR). Moreover, it would be even nicer if all the logic for handling various control signals were handled by this parallelizer, so we don't have to deal with it every time we use parallelism in DSPy.

XenonMolecule (Collaborator, Author):

> @XenonMolecule I love this and @mikeedjones thanks for the review :D
>
> I want to merge this and to use it. I see failing tests. Is that expected? Are there blockers for merge?

The failing tests only started when I pushed a commit that only changed comments, so I'm not sure what happened. Looking into it.
