fix the trace component in grad component taking parameter version of gradients which makes the input args way too big, and add parameter and output parameter allowed_components
liyin2015 committed Dec 27, 2024
1 parent 32dda35 commit 8ac80e4
Showing 9 changed files with 153 additions and 183 deletions.
97 changes: 52 additions & 45 deletions adalflow/adalflow/components/agent/react.py
@@ -29,13 +29,12 @@
__all__ = ["DEFAULT_REACT_AGENT_SYSTEM_PROMPT", "ReActAgent"]


react_agent_task_desc = r"""{# role/task description #}
react_agent_task_desc = r"""
Answer the user's query using the tools provided below with minimal steps and maximum accuracy.
{# REACT instructions #}
Each step you will read the previous Thought, Action, and Observation(execution result of the action) and then provide the next Thought and Action.
<START_OF_TASK_SPEC>
{# Task specification to teach the agent how to think using 'divide and conquer' strategy #}
- For simple queries: Directly call the ``finish`` action and provide the answer.
- For complex queries:
- Step 1: Read the user query and potentially divide it into subqueries. And get started with the first subquery.
@@ -115,14 +114,44 @@ def call(
"""Append the step_output to the step_history."""
if not step_history:
step_history = []
# make a copy step_history for better tracking
step_history = deepcopy(step_history)

step_history.append(step_output)
# printc(f"step_history: {step_history}", color="yellow")
return step_history


+ class FunctionOutputToStepOutput(GradComponent):
+     def __init__(self):
+         super().__init__()
+         self.name = "FunctionOutputToStepOutput"
+         self._component_desc = "Convert the FunctionOutput to StepOutput"
+
+     def call(
+         self,
+         action_str: FunctionExpression,
+         step: int,
+         result: Union[FunctionOutput, Parameter],
+         func: Function,
+         id: Optional[str] = None,
+     ) -> StepOutput:
+         """Convert the action string to StepOutput."""
+         step_output = StepOutput(step=step)
+         if not isinstance(action_str, FunctionExpression):
+             raise ValueError(f"Expected FunctionExpression, but got {type(action_str)}")
+         step_output.action = action_str
+         step_output.function = func
+         # printc(f"result: {result}", color="blue")
+         result = result.data if isinstance(result, Parameter) else result
+         if isinstance(result, FunctionOutput):
+             step_output.observation = (
+                 result.output.data
+                 if isinstance(result.output, Parameter)
+                 else result.output
+             )
+
+         return step_output


@dataclass
class ReActOutput(DataClass):
r"""Similar to GeneratorOutput, but with additional step history and final answer."""
@@ -309,41 +338,9 @@ def _execute_action(

if isinstance(response, Parameter):

-         class ActionStrToStepOutput(GradComponent):
-             def __init__(self):
-                 super().__init__()
-                 self.name = "ActionStrToStepOutput"
-                 self._component_desc = "Convert the action string to StepOutput."
-
-             def call(
-                 self,
-                 action_str: FunctionExpression,
-                 step: int,
-                 result: Union[FunctionOutput, Parameter],
-                 func: Function,
-             ) -> StepOutput:
-                 """Convert the action string to StepOutput."""
-                 step_output = StepOutput(step=step)
-                 if not isinstance(action_str, FunctionExpression):
-                     raise ValueError(
-                         f"Expected FunctionExpression, but got {type(action_str)}"
-                     )
-                 step_output.action = action_str
-                 step_output.function = func
-                 # printc(f"result: {result}", color="blue")
-                 result = result.data if isinstance(result, Parameter) else result
-                 if isinstance(result, FunctionOutput):
-                     step_output.observation = (
-                         result.output.data
-                         if isinstance(result.output, Parameter)
-                         else result.output
-                     )
-
-                 return step_output

try:

-             tmp_action_str_to_step_output = ActionStrToStepOutput()
+             function_output_to_step_output = FunctionOutputToStepOutput()

# printc(f"response: {response}", color="yellow")
# TO FunctionExpression
@@ -354,7 +351,6 @@ def call(
func: Union[Function, Parameter] = self.tool_manager(
expr_or_fun=response, step="parse"
)
printc(f"tool_manager: {self.tool_manager.training}", color="red")
if not isinstance(func, Parameter):
raise ValueError(
f"Expected Parameter, but got {type(func)}: {func}"
@@ -369,10 +365,13 @@ def call(
result: Parameter = self.tool_manager(expr_or_fun=func, step="execute")
# printc(f"result: {result}", color="red")
result.add_successor_map_fn(
-                 successor=tmp_action_str_to_step_output, map_fn=lambda x: x.data
+                 successor=function_output_to_step_output, map_fn=lambda x: x.data
)
+             response.add_successor_map_fn(
+                 successor=function_output_to_step_output, map_fn=lambda x: x.data
+             )
-             action_step = tmp_action_str_to_step_output.forward(
-                 action_str=response.data,
+             action_step = function_output_to_step_output.forward(
+                 action_str=response,
step=action_step.step,
result=result,
func=func,
@@ -498,7 +497,12 @@ def _run_one_step(

step_output: Parameter = self._execute_action(step_output, response, id)

printc(f"step_output: {step_output}", color="red")
# printc(f"step_output: {step_output}", color="red")
+         if not isinstance(step_output, Parameter):
+             raise ValueError(
+                 f"Ensure step_output to be Parameter at training mode. Got {type(step_output)}.\n\
+                 Please check the observation for error details: {step_output}"
+             )
step_output.add_successor_map_fn(
successor=self.append_step_history, map_fn=lambda x: x.data
)
@@ -546,7 +550,10 @@ def _check_last_step(
def _get_answer(
self, step_history: Union["Parameter", List[str]] = None
) -> Union[str, "Parameter"]:
"""Get the final answer from the step history."""
"""Get the final answer from the step history.
When in training mode, we pass the whole step_history to the backward engine to find the feedback
"""
if not step_history:
return None

@@ -626,11 +633,11 @@ def bicall(
raise e # the only place to raise the error for debugging. In normal cases, the agent should not raise an error.

answer = self._get_answer(step_history)
printc(f"answer: {answer}", color="yellow")
if self.training:
return answer
# wrap the output
output = ReActOutput(step_history=step_history, id=id, answer=answer)
printc(f"answer: {output}", color="yellow")

return output

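The wiring above registers a map_fn on each predecessor Parameter so that the new FunctionOutputToStepOutput component receives unwrapped .data values rather than whole Parameter objects. Below is a minimal, self-contained sketch of that pattern; Param and Converter are illustrative stand-ins, not AdalFlow's actual Parameter or GradComponent.

# Illustrative sketch of the successor map_fn pattern (stand-in classes).

class Param:
    def __init__(self, data, name=""):
        self.data = data
        self.name = name
        self._successor_map_fns = {}  # successor id -> map_fn

    def add_successor_map_fn(self, successor, map_fn):
        # each downstream component gets its own view of this parameter
        self._successor_map_fns[id(successor)] = map_fn

    def map_to_successor(self, successor):
        return self._successor_map_fns[id(successor)](self)

class Converter:
    def forward(self, **kwargs):
        # unwrap Param inputs through the map_fn registered for this component
        return {
            k: (v.map_to_successor(self) if isinstance(v, Param) else v)
            for k, v in kwargs.items()
        }

converter = Converter()
response = Param(data="finish(answer='4')", name="llm_response")
result = Param(data="4", name="tool_result")
for p in (response, result):
    p.add_successor_map_fn(successor=converter, map_fn=lambda x: x.data)

print(converter.forward(action_str=response, result=result, step=0))
# -> {'action_str': "finish(answer='4')", 'result': '4', 'step': 0}

Passing response itself (rather than response.data, as before this commit) keeps the LLM response Parameter in the trace graph while the converter still sees plain data.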
8 changes: 2 additions & 6 deletions adalflow/adalflow/core/generator.py
@@ -51,6 +51,7 @@
OBJECTIVE_INSTRUCTION_BASE,
OBJECTIVE_INSTRUCTION_CHAIN,
)
+ from adalflow.utils.logger import printc

__all__ = ["Generator", "BackwardEngine", "create_teacher_generator"]

@@ -761,7 +762,6 @@ def _backward_through_all_predecessors(
gradient_output: GeneratorOutput = backward_engine(
prompt_kwargs=backward_engine_prompt_kwargs
)
print(f"gradient_output: {gradient_output}")
if not isinstance(gradient_output, GeneratorOutput):
raise ValueError(
f"Generator: Backward Engine should return a GeneratorOutput. Got {gradient_output} instead."
@@ -771,17 +771,14 @@

try:
response_gradient_list = parser.call(gradient_output.data)
print(f"response_gradient_list: {response_gradient_list}")
except Exception as e:
log.error(f"Error parsing the response_gradient_list: {e}")
failure_message = backward_engine.failure_message_to_optimizer(
gradient_output
)
if failure_message:
response_gradient_list = [failure_message] * len(children_params)
print(f"failure_message: {failure_message}")

print(f"response_gradient_list: {response_gradient_list}")
printc(f"failure_message: {failure_message}", color="red")

# generate the gradient for each child
for i, pred in enumerate(children_params):
@@ -796,7 +793,6 @@
if response_gradient_list and len(response_gradient_list) > i
else "Failed to get the gradient."
)
print(f"i: {i}, gradient_data: {gradient_data}")

var_gradient = Gradient(
data=gradient_data,
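The generator change above also shows the failure path: if parsing the backward engine's output raises, one failure message is broadcast as the gradient for every predecessor. A hedged sketch of that fallback shape, assuming a JSON list parser in place of AdalFlow's actual parser:

import json
from typing import List

def parse_gradient_list(raw: str, num_children: int) -> List[str]:
    """On parse failure, broadcast a single failure message to all children,
    mirroring _backward_through_all_predecessors above (parser is assumed)."""
    try:
        gradients = json.loads(raw)
        if not isinstance(gradients, list):
            raise ValueError("expected a JSON list of gradients")
    except Exception as exc:
        failure_message = f"Failed to parse the response_gradient_list: {exc}"
        gradients = [failure_message] * num_children
    return gradients

print(parse_gradient_list('["sharpen the prompt", "keep as is"]', 2))
print(parse_gradient_list("not json", 2))  # both children get the failure note

Downstream, each child indexes into this list and falls back to "Failed to get the gradient." when the list is too short, per the hunk above.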
5 changes: 2 additions & 3 deletions adalflow/adalflow/core/tool_manager.py
@@ -16,7 +16,7 @@
import logging
from copy import deepcopy
import asyncio
- from adalflow.optim.parameter import Parameter, ParameterType
+ from adalflow.optim.parameter import Parameter, ParameterType, OutputParameter
import nest_asyncio
import warnings

@@ -282,10 +282,9 @@ def bicall(
# warnings.info(
# f"Error executing function: {output}", UserWarning
# )
-                 output = Parameter(
+                 output = OutputParameter(
name=func.data.name,
data=output,
-                     eval_input=func.eval_input,
requires_opt=False,
param_type=ParameterType.OUTPUT,
)
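Per the commit message, OutputParameter joins Parameter as an allowed component type, and the tool manager now wraps raw function results in it. A minimal sketch of the presumed relationship; the stand-in classes below only illustrate why existing isinstance checks keep working, and are not the real adalflow.optim.parameter types:

from enum import Enum

class ParameterType(Enum):
    OUTPUT = "output"

class Parameter:  # stand-in for adalflow.optim.parameter.Parameter
    def __init__(self, name, data, requires_opt=False, param_type=None):
        self.name = name
        self.data = data
        self.requires_opt = requires_opt
        self.param_type = param_type

class OutputParameter(Parameter):
    """Stand-in: marks a value produced by a component's forward pass."""

out = OutputParameter(
    name="add", data=7, requires_opt=False, param_type=ParameterType.OUTPUT
)
print(isinstance(out, Parameter), type(out).__name__)  # True OutputParameter

Note the dropped eval_input=func.eval_input in the hunk above: the wrapped output no longer carries the function's eval input.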
35 changes: 10 additions & 25 deletions adalflow/adalflow/optim/grad_component.py
@@ -4,7 +4,6 @@
from collections import OrderedDict
import uuid
import logging
- from copy import deepcopy

if TYPE_CHECKING:
from adalflow.core.generator import BackwardEngine
@@ -124,11 +123,16 @@ def forward(self, *args, **kwargs) -> "Parameter":
call_response = self.call(*unwrapped_args, **unwrapped_kwargs)

if isinstance(call_response, Parameter):
raise ValueError("A GradComponent call should not return Parameter")
predecessors.append(call_response)
return call_response

# 4. Create a Parameter object to trace the forward pass
-         input_args.update(kwargs)
+         # input_args.update(kwargs)
+         # use unwrapped args and unwrapped kwargs to trace the forward pass
+         tracing_args = {i: v for i, v in enumerate(unwrapped_args)}
+         tracing_args.update(**unwrapped_kwargs)

response = OutputParameter(
data=call_response,
name=self.name + "_output",
Expand All @@ -138,7 +142,7 @@ def forward(self, *args, **kwargs) -> "Parameter":
)
response.set_predecessors(predecessors)
response.trace_forward_pass(
-             input_args=input_args,
+             input_args=tracing_args,
full_response=call_response,
id=self.id,
name=self.name,
@@ -172,14 +176,9 @@ def backward(self, *, response: "Parameter", id: str = None, **kwargs):
for pred in children_params:
pred.backward_engine_disabled = True

-         for pred in children_params:
+         for _, pred in enumerate(children_params):
pred.set_score(response._score)
-             from adalflow.utils.logger import printc

-             printc(
-                 f"Retriever: Backward: {pred.name} set_score: {response._score}, {response.name}",
-                 "blue",
-             )
if pred.param_type == ParameterType.DEMOS:
pred.add_score_to_trace(
trace_id=id, score=response._score, is_teacher=self.teacher_mode
@@ -192,30 +191,16 @@

for grad in response.gradients:
# make a copy of the gradient
-                 grad = deepcopy(grad)
+                 # grad = deepcopy(grad)
# update the gradient context and from and to
grad.update_from_to(response, pred)
+                 grad.is_default_copy = True
grad.add_context(
GradientContext(
variable_desc=pred.role_desc,
response_desc=response.name,
# context=f"""""", # TODO: check the forward pass component trace
input_output=f"""{response.component_trace}""",
input_output=f"""{response.component_trace.to_context_str()}""",
)
)

-                 # grad.from_response_id = response.id
-                 # grad.name = f"{grad.name}_to_{pred.name}"

pred.add_gradient(grad)
-                 # pred.add_gradient(
-                 #     gradient=Parameter(
-                 #         name=f"gradient",
-                 #         data=response.get_gradient_and_context_text(
-                 #             skip_correct_sample=True
-                 #         ),
-                 #         param_type=ParameterType.GRADIENT,
-                 #         from_response_id=response.id,
-                 #     )
-                 # )
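The forward-pass change in this file is the "input args way too big" fix from the commit title: instead of tracing Parameter wrappers, whose gradient history grows during training, forward traces the unwrapped values, keyed by position for args and by name for kwargs. A self-contained sketch, assuming unwrapped_args/unwrapped_kwargs hold the .data of any Parameter inputs as in the hunk above:

class Parameter:  # stand-in; the real one carries gradients that bloat traces
    def __init__(self, data):
        self.data = data
        self.gradients = []

def build_tracing_args(*args, **kwargs):
    unwrapped_args = [a.data if isinstance(a, Parameter) else a for a in args]
    unwrapped_kwargs = {
        k: (v.data if isinstance(v, Parameter) else v) for k, v in kwargs.items()
    }
    # positional inputs keyed by index, keyword inputs by name, matching
    # tracing_args = {i: v for i, v in enumerate(unwrapped_args)} above
    tracing_args = {i: v for i, v in enumerate(unwrapped_args)}
    tracing_args.update(**unwrapped_kwargs)
    return tracing_args

query = Parameter("What is 2 + 2?")
print(build_tracing_args(query, step=1))  # {0: 'What is 2 + 2?', 'step': 1}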
