Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【bug】工作流的构建,条件分支再次连接汇合节点,工作流会直接结束 #1079

Open
IceHope opened this issue Jan 13, 2025 · 0 comments

Comments

@IceHope
Copy link

IceHope commented Jan 13, 2025

问题描述

在代码分支feat/0.4.2的基础上,构建一个工作流,工作流的信息如下:
%%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; __start__([<p>__start__</p>]):::first start_dd1cc(start_dd1cc) input_7932b(input_7932b<hr/><small><em>__interrupt = before</em></small>) llm_3b72f(llm_3b72f) llm_24b89(llm_24b89) output_9afde(output_9afde) end_a3447(end_a3447) condition_e3386(condition_e3386) output_9afde_fake(output_9afde_fake<hr/><small><em>__interrupt = before</em></small>) __end__([<p>__end__</p>]):::last __start__ --> start_dd1cc; end_a3447 --> __end__; input_7932b --> condition_e3386; llm_24b89 --> output_9afde; llm_3b72f --> output_9afde; output_9afde --> output_9afde_fake; start_dd1cc --> input_7932b; output_9afde_fake -.-> end_a3447; condition_e3386 -.-> llm_3b72f; condition_e3386 -.-> llm_24b89; classDef default fill:#f2f0ff,line-height:1.2 classDef first fill-opacity:0 classDef last fill:#bfb6fc

大概的流程:
开始节点---输入节点---条件节点----2个大模型节点(llm_24b89 +llm_3b72f )---输出节点output_9afde ---结束节点

可参照类似简单的demo复现

输入的内容,经过条件节点之后,会执行条件分支上2个节点(llm_24b89 ,llm_3b72f )其中之一,但是执行之后,工作流直接执行完了,并没有执行汇合的output_9afde 节点。

原因分析:

`在src/backend/bisheng/workflow/graph/graph_engine.py构建工作流的方法:

  def build_more_fan_in_node(self):
    for node_id, source_ids in self.nodes_fan_in.items():
        if not source_ids or len(source_ids) <= 1:
            continue
        wait_nodes, no_wait_nodes = self.parse_fan_in_node(node_id)
        if wait_nodes:
            logger.debug(f'node {node_id} need wait nodes {wait_nodes}')
            self.graph_builder.add_edge(wait_nodes, node_id)
        if no_wait_nodes:
            for one in no_wait_nodes:
                logger.debug(f'node {node_id} no need wait nodes {one}')
                self.graph_builder.add_edge(one, node_id)

在添加output_9afde 节点的时候,条件分支上的2个节点llm_24b89 ,llm_3b72f 当成了并行节点处理,添加边的时候wait_nodes=[llm_24b89 ,llm_3b72f ],但是条件分支只执行1个,output_9afde 永远不会等到2个节点同时执行完,由于langgrah本身的特性,会直接结束执行,导致汇合节点及其之后的节点都不会执行`

可能的思路:

` def parse_fan_in_node(self, node_id: str):
    source_ids = self.nodes_fan_in.get(node_id)
    all_next_nodes = self.nodes_next_nodes.get(node_id)
    wait_nodes = []
    no_wait_nodes = []
    for one in source_ids:
        # output节点有特殊处理逻辑
        if one.startswith(('output_', 'condition_')):
            continue
        if one in all_next_nodes:
            no_wait_nodes.append(one)
        else:
            wait_nodes.append(one)
    return wait_nodes, no_wait_nodes`

在 parse_fan_in_node()解析节点的时候,是否可以加入溯源的判断? 汇合节点之前的节点
 - 如果是并行节点,则是wait_nodes,
 - 如果是条件分支节点,则是no_wait_nodes。
 不过,并行分支里面嵌套条件分支,条件分支再嵌套并行分支呢,可能比较麻烦

希望评估下问题,以及解决办法,谢谢~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant