[Bug] how to combine with ray.data #1987

Open

mdy666 opened this issue Nov 10, 2024 · 2 comments
Comments

@mdy666

mdy666 commented Nov 10, 2024

Checklist

  • 1. I have searched related issues but cannot get the expected help.
  • 2. The bug has not been fixed in the latest version.
  • 3. Please note that if the bug-related issue you submitted lacks corresponding environment info and a minimal reproducible demo, it will be challenging for us to reproduce and resolve the issue, reducing the likelihood of receiving feedback.
  • 4. If the issue you raised is not a bug but a question, please raise a discussion at https://github.com/sgl-project/sglang/discussions/new/choose. Otherwise, it will be closed.
  • 5. Please use English, otherwise it will be closed.

Describe the bug

ray.data.Dataset.map_batches(**args) works fine with vllm.LLM, but when I replace vllm.LLM with sglang.Engine it fails. How can I make this work? ray.data makes it easy to manage data, so I'm looking forward to your help.

Running 0: 0 bundle [00:00, ? bundle/s]2024-11-10 23:01:22,492 ERROR streaming_executor_state.py:456 -- An exception was raised from a task of operator "MapBatches(LLM)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
⚠️ Dataset execution failed: : 0 bundle [00:00, ? bundle/s]

  • ReadJSON->SplitBlocks(48): 1 active, 0 queued, [cpu: 1.0, objects: 20.5KB]: : 12 bundle [00:00, 187.92 bundle/s]
    2024-11-10 23:01:22,504 WARNING actor_pool_map_operator.py:265 -- To ensure full parallelization across an actor pool of size 1, the Dataset should consist of at least 1 distinct blocks. Consider increasing the parallelism when creating the Dataset.
  • MapBatches(LLM): 1 active, 0 queued, [cpu: 0.0, gpu: 1.0, objects: 256.0MB], 1 actors [locality off]: : 0 bundle [00:00, ? bundle/s]
  • Write: 0 active, 0 queued, [cpu: 0.0, objects: 0.0B]: : 0 bundle [00:00, ? bundle/s]
    2024-11-10 23:01:22,515 ERROR exceptions.py:63 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log files at /tmp/ray/session_2024-11-10_23-00-50_100958_208716/logs/ray-data. To output the full stack trace to stdout, set DataContext.log_internal_stack_trace_to_stdout to True.
    Traceback (most recent call last):
    File "/data/repo/batch_infer/sglang_infer.py", line 129, in
    new_ds.write_json(args.output_path)
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/dataset.py", line 2888, in write_json
    self.write_datasink(
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/dataset.py", line 3610, in write_datasink
    self._write_ds = Dataset(plan, logical_plan).materialize()
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/dataset.py", line 4598, in materialize
    copy._plan.execute()
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/exceptions.py", line 87, in handle_trace
    raise e.with_traceback(None)
    ray.exceptions.RayTaskError(UserCodeException): ray::MapBatches(LLM)() (pid=210888, ip=172.22.197.6, actor_id=e58321fa3f0e04abad4aba2801000000, repr=MapWorker(MapBatches(LLM)))
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/util.py", line 78, in call
    return future.result()
    File "/root/anaconda3/envs/torch/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
    File "/root/anaconda3/envs/torch/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
    File "/root/anaconda3/envs/torch/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
    File "/data/repo/batch_infer/sglang_infer.py", line 89, in call
    responses = self.llm.generate(input_texts, self.sampling_params)
    File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/sglang/srt/server.py", line 760, in generate
    loop = asyncio.get_event_loop()
    File "/root/anaconda3/envs/torch/lib/python3.10/asyncio/events.py", line 656, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
    RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

The above exception was the direct cause of the following exception:

ray::MapBatches(LLM)() (pid=210888, ip=172.22.197.6, actor_id=e58321fa3f0e04abad4aba2801000000, repr=MapWorker(MapBatches(LLM)))
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 364, in submit
yield from _map_task(
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 451, in _map_task
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in call
for data in iter:
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 134, in _udf_timed_iter
output = next(input)
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 236, in call
yield from self._batch_fn(input, ctx)
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 282, in transform_fn
res = fn(batch)
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 186, in fn
_handle_debugger_exception(e)
File "/root/anaconda3/envs/torch/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 210, in _handle_debugger_exception
raise UserCodeException() from e
ray.exceptions.UserCodeException
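
The immediate error is the RuntimeError at the bottom of the user-code trace: sglang/srt/server.py calls asyncio.get_event_loop() inside generate(), but Ray Data runs the actor's __call__ in a ThreadPoolExecutor worker thread, and on Python 3.10 a non-main thread has no event loop unless one is created and set explicitly. A minimal sketch of that pattern outside of Ray (the function name is just for illustration):

import asyncio
from concurrent.futures import ThreadPoolExecutor

def calls_get_event_loop():
    # Code that expects an event loop to already exist in the current thread,
    # as sglang's generate() does in the version shown in the traceback.
    return asyncio.get_event_loop()

with ThreadPoolExecutor(max_workers=1) as pool:
    # In a worker thread this raises:
    # RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
    pool.submit(calls_get_event_loop).result()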

Reproduction

new_ds = ds.map_batches(
    LLM,
    # Set the concurrency to the number of LLM instances.
    concurrency=num_instances,
    # Specify the batch size for inference.
    batch_size=args.batch_size,
    **resources_kwarg,
)

Environment

torch 2.4 cuda 12.4

@merrymercy
Contributor

Can you share a self-contained minimal reproducible script?

@mdy666
Author

mdy666 commented Nov 15, 2024

Can you share a self-contained minimal reproducible script?

ray start --num-gpus=$GPUS_PER_NODE --head

import argparse
import glob
import os

import ray.data
ray.data.DataContext.log_internal_stack_trace_to_stdout = True
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

from transformers import AutoTokenizer

BASE_PROMPT = '''#Question
{question}

#Response
'''

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--core", type=str, default='vllm', choices=['vllm','sglang'])

    parser.add_argument("--ngpus", type=int, default=8)
    parser.add_argument("--tp_size", type=int, default=4)
    parser.add_argument("--nnodes", type=int, default=1)
    parser.add_argument('--node_rank', type=int, default=0)

    parser.add_argument("--input_path", type=str)
    parser.add_argument("--output_path", type=str)
    parser.add_argument("--model_path", type=str, default='qwen25-72b/')

    parser.add_argument("--batch_size", type=int, default=1000)
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--top_p", type=float, default=0.9)
    parser.add_argument("--max_new_tokens", type=int, default=512)
    args = parser.parse_args()

    ray.init()

    num_instances = args.ngpus // args.tp_size
    print(f'info: {args.ngpus}, {args.tp_size}, {num_instances}')

    files = glob.glob(args.input_path)  # the file list can be sliced, e.g. glob.glob(args.input_path)[:10]
    ds = ray.data.read_json(files)
    if args.nnodes > 1:
        ds = ds.split(args.nnodes, equal=True)[args.node_rank]
        args.output_path = os.path.join(args.output_path, f'rank_{str(args.node_rank).zfill(4)}')

    def helpers(args):
        if args.core == 'vllm':
            from vllm import LLM, SamplingParams
            llm = LLM(args.model_path, tensor_parallel_size=args.tp_size)
            sampling_params = SamplingParams(temperature=args.temperature, top_p=args.top_p, max_tokens=args.max_new_tokens)
            def process_responses(responses):
                outputs = []
                for i in range(len(responses)):
                    outputs.append(responses[i].outputs[0].text)
                return outputs
        else:
            import sglang as sgl
            llm = sgl.Engine(model_path=args.model_path, tp_size=args.tp_size)
            sampling_params = {"temperature": args.temperature, "top_p": args.top_p, 'max_new_tokens': args.max_new_tokens}
            def process_responses(responses):
                outputs = []
                for i in range(len(responses)):
                    outputs.append(responses[i]['text'])
                return outputs
        return llm, sampling_params, process_responses

    class LLM:
        def __init__(self):
            self.base_prompt = BASE_PROMPT
            self.tokenizer = AutoTokenizer.from_pretrained(args.model_path)
            self.llm, self.sampling_params, self.process_responses = helpers(args)

        def __call__(self, batch):
            questions = batch['question']
            input_texts = []
            raw_questions = []
            for i in range(len(questions)):
                prompt = self.base_prompt.format(question=questions[i])
                prompt = {'role':'user', 'content':prompt}
                prompt = self.tokenizer.apply_chat_template([prompt], tokenize=False, add_generation_prompt=True)
                input_texts.append(prompt)
                raw_questions.append(questions[i])
            responses = self.llm.generate(input_texts, self.sampling_params)
            print(responses)
            outputs = self.process_responses(responses)
            return {'raw_question': raw_questions, 'answer': outputs}

    # Copied from the vLLM example script.
    # For tensor_parallel_size > 1, we need to create placement groups for vLLM
    # to use. Every actor has to have its own placement group.
    def scheduling_strategy_fn():
        # One bundle per tensor parallel worker
        pg = ray.util.placement_group(
            [{
                "GPU": 1,
                "CPU": 1
            }] * args.tp_size,
            strategy="STRICT_PACK",
        )
        return dict(scheduling_strategy=PlacementGroupSchedulingStrategy(
            pg, placement_group_capture_child_tasks=True))

    resources_kwarg = {}
    if args.tp_size == 1:
        # For tensor_parallel_size == 1, we simply set num_gpus=1.
        resources_kwarg["num_gpus"] = 1
    else:
        # Otherwise, we have to set num_gpus=0 and provide
        # a function that will create a placement group for
        # each instance.
        resources_kwarg["num_gpus"] = 0
        resources_kwarg["ray_remote_args_fn"] = scheduling_strategy_fn

    # Apply batch inference for all input data.
    new_ds = ds.map_batches(
        LLM,
        # Set the concurrency to the number of LLM instances.
        concurrency=num_instances,
        # Specify the batch size for inference.
        batch_size=args.batch_size,
        **resources_kwarg,
    )
    new_ds.write_json(args.output_path)
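
If the missing event loop is the only problem, one possible workaround (just a sketch, untested; _ensure_event_loop is a helper name made up here, not part of sglang or Ray) would be to create and set a loop in the worker thread before calling generate, e.g. at the top of LLM.__call__:

import asyncio

def _ensure_event_loop():
    # Hypothetical helper: give the current thread an event loop if it has none,
    # so that asyncio.get_event_loop() inside sglang's generate() does not raise.
    try:
        asyncio.get_event_loop()
    except RuntimeError:
        asyncio.set_event_loop(asyncio.new_event_loop())

# Inside LLM.__call__, before self.llm.generate(input_texts, self.sampling_params):
#     _ensure_event_loop()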
