Langgraph: Long running node causing super step to slow down #2415
collinquome
started this conversation in
Discussions
Replies: 3 comments
-
Found this resource: |
Beta Was this translation helpful? Give feedback.
0 replies
-
So I tried the guides recommendation, still hitting the same problem (Node B waits for Node L to finish) Code from langgraph.graph import START, END, StateGraph, MessagesState, add_messages
from asyncio import sleep
from langchain_core.messages import AIMessage
from datetime import datetime
def wait_and_message_node(m, seconds=1):
async def _node(state: MessagesState):
print(f"Starting node {m} at {datetime.now().second}")
await sleep(seconds)
return {"messages": [AIMessage(m)]}
return _node
graph_builder = StateGraph(MessagesState)
for node in ["a","b","c","d","e","f","g"]:
graph_builder.add_node(node, wait_and_message_node(f"Node: {node}"))
graph_builder.add_node("l", wait_and_message_node("Node: l", seconds=5))
graph_builder.add_edge(START, "a")
graph_builder.add_edge(START, "l")
graph_builder.add_edge("a", "b")
graph_builder.add_edge("b", "c")
graph_builder.add_edge("c", "d")
graph_builder.add_edge(["l", "d"], "e") # THIS is what I needed.
graph_builder.add_edge("e", "f")
graph_builder.add_edge("f", "g")
graph_builder.add_edge("g", END)
agent = graph_builder.compile()
if __name__ == '__main__':
import asyncio
async def main():
async for m in agent.astream({"messages": []}):
print(m)
asyncio.run(main()) Output
|
Beta Was this translation helpful? Give feedback.
0 replies
-
Got it! The solution was adding subgraphs. You can add "subgraph=True" when streaming to get the subgraph state: New Code from langgraph.graph import START, END, StateGraph, MessagesState, add_messages
from asyncio import sleep
from langchain_core.messages import AIMessage
from datetime import datetime
def wait_and_message_node(m, seconds=1):
async def _node(state: MessagesState):
print(f"Starting node {m} at {datetime.now().second}")
await sleep(seconds)
return {"messages": [AIMessage(m)]}
return _node
graph_builder = StateGraph(MessagesState)
subgraph_builder = StateGraph(MessagesState)
for node in ["a", "b", "c", "d"]:
subgraph_builder.add_node(node, wait_and_message_node(f"Node: {node}"))
subgraph_builder.add_edge(START, "a")
subgraph_builder.add_edge("a", "b")
subgraph_builder.add_edge("b", "c")
subgraph_builder.add_edge("c", "d")
subgraph_builder.add_edge("d", END)
subgraph = subgraph_builder.compile()
for node in ["e","f","g"]:
graph_builder.add_node(node, wait_and_message_node(f"Node: {node}"))
graph_builder.add_node("l", wait_and_message_node("Node: l", seconds=5))
graph_builder.add_node("sub_graph", subgraph)
graph_builder.add_edge(START, "sub_graph")
graph_builder.add_edge(START, "l")
graph_builder.add_edge(["l", "sub_graph"], "e") # THIS is what I needed.
graph_builder.add_edge("e", "f")
graph_builder.add_edge("f", "g")
graph_builder.add_edge("g", END)
agent = graph_builder.compile()
if __name__ == '__main__':
import asyncio
async def main():
async for (_, m) in agent.astream({"messages": []}, subgraphs=True):
print(m)
asyncio.run(main()) New Output
|
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Is there a good pattern for allowing a node to run, while other nodes run in the graph?
Currently, B doesn't seem to start until L completes.
What I want to do:
|-- Langgraph
| |-- START
| | A |-- L: Long running Node
| | B |
| | C |
| | D | L Finish
| | E Requires outputs of D and L
| F
| G
| END
Beta Was this translation helpful? Give feedback.
All reactions