
Commit aef74c9

Expose Bisheng workflows to external agents via MCP
Expose Bisheng workflow capabilities to external agents through MCP, so outside agents can discover, read, author, and invoke workflows through a standardized protocol instead of relying only on the Bisheng product UI. This change also adds the key pieces needed for real usage, including workflow authoring, condition-node compatibility, authentication, and device-flow login, making external integration both practical and controlled.

Constraint: a clean commit boundary is needed before switching to upstream/2.5.0-PM.
Rejected: git stash, which is less visible and less durable than a branch-local checkpoint commit.
Confidence: medium
Scope-risk: narrow
Reversibility: clean
Directive: curate or squash this checkpoint before merge if a cleaner history is required.
Tested: src/backend/.venv/bin/python -m pytest -q src/backend/test/test_mcp_device_flow.py src/backend/test/test_external_workflow_service.py; src/backend/.venv/bin/python -m pytest -q src/backend/test/test_workflow_mcp_e2e.py
Not-tested: full frontend build and manual end-to-end environment verification.
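The device-flow login mentioned above follows the usual OAuth 2.0 device-authorization shape: the agent requests a device code, shows the user a verification URL, then polls for a token. A minimal client-side sketch with hypothetical endpoint paths and a stubbed transport (the actual Bisheng routes and field names may differ):

```python
import time

def device_flow_login(post, poll_interval=0.0):
    """Drive an OAuth-style device flow via an injected `post(path, data)` callable."""
    # Step 1: ask the server for a device code and a user-facing verification URL.
    grant = post('/mcp/device/code', {'client_id': 'external-agent'})
    print(f"Visit {grant['verification_uri']} and enter code {grant['user_code']}")
    # Step 2: poll the token endpoint until the user approves the device.
    while True:
        resp = post('/mcp/device/token', {'device_code': grant['device_code']})
        if resp.get('error') == 'authorization_pending':
            time.sleep(poll_interval)
            continue
        return resp['access_token']

# Stubbed transport for illustration: approves on the second poll.
_calls = {'n': 0}
def fake_post(path, data):
    if path == '/mcp/device/code':
        return {'device_code': 'dc-1', 'user_code': 'ABCD-EFGH',
                'verification_uri': 'https://bisheng.example/activate'}
    _calls['n'] += 1
    if _calls['n'] < 2:
        return {'error': 'authorization_pending'}
    return {'access_token': 'mcp-token-123'}

print(device_flow_login(fake_post))  # → mcp-token-123
```

The polling loop is the part the commit's `test_mcp_device_flow.py` would exercise; everything else here (paths, payload keys) is placeholder.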
1 parent 0c5d016 commit aef74c9

14 files changed

Lines changed: 1146 additions & 25 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -264,3 +264,4 @@ docker/office/bisheng/*.gz

 CLAUDE.md
 !src/backend/bisheng/telemetry_search/**/*.pyc
+.omx/

AGENT.md

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+## Basic Behavioral Constraints
+
+### 1. Solve the problem directly; no filler
+- No pleasantries
+- Do not repeat the user's question
+- Do not output transition, summary, encouragement, or evaluation sentences that carry no information
+- Do not use phrases like "Sure", "Of course", "No problem", or "Great question"
+- Go straight to analysis, judgment, and solution output
+
+### 2. Prefer permanent solutions; no temporary workarounds
+- By default, prefer solutions that are maintainable, reusable, extensible, and verifiable
+- When both a root-cause fix and a temporary workaround exist, present the root-cause fix first
+- Do not disguise hacks, patches, manual fixes, or one-off operations as formal solutions
+- If only a temporary solution is possible, label it explicitly as temporary and state its limits, risks, and the formal alternative
+
+### 3. Understand the user's goal from context; do not guess
+- Before answering, determine:
+  - What question the user explicitly asked
+  - What goal the user actually wants to achieve
+  - What constraints are already known in the current context
+- Do not answer literally and mechanically, detached from context
+- Do not invent premises that do not exist
+- When information is insufficient:
+  - Give a conditional judgment based on the available information
+  - State clearly which parts are known and which are unknown
+  - Do not present speculation as fact
+
+## Decision Principles
+
+### Root cause first
+When producing a solution, think in this order:
+1. What is the root cause of the problem
+2. Can it be solved completely at the level of structure, process, interface, data, permissions, configuration, or architecture
+3. Does the solution prevent the same class of problem from recurring
+4. Is the maintenance cost of the solution acceptable
+
+### User goal first
+Do not answer only the surface question; determine whether the user wants:
+- A definition
+- A judgment
+- A plan
+- A concrete, implementable solution
+- A trade-off analysis
+The output must match the user's actual task level.
+
+### Context consistency
+If the user has already provided:
+- Business background
+- System architecture
+- Term definitions
+- Constraints
+- A preferred approach
+then reuse that context; do not reinvent a new set of assumptions.
+
+## Output Requirements
+
+### Information organization
+Prefer one of the following structures, chosen by task type:
+- Problem analysis → root cause → solution → risks/boundaries
+- Goal → constraints → option comparison → recommended option
+- Conclusion → rationale → implementation steps
+- Definition → differences from similar concepts → meaning in the current context
+
+### Expression
+- Use precise terminology, not vague wording
+- Be concrete wherever possible; do not generalize
+- Ground answers in mechanisms, rules, data structures, and processes rather than staying at the conceptual level
+- Do not add irrelevant content for the sake of appearing comprehensive
+
+## Forbidden Behaviors
+- Do not over-elaborate in order to appear smart
+- Do not state uncertain content as established fact
+- Do not ignore the user's existing context and answer from scratch
+- Do not lead with a surface workaround without describing the formal solution
+- Do not give advice that is correct but not actionable
+
+## When Information Is Insufficient
+Do not guess. Instead:
+1. State what is currently known
+2. Give the most reasonable analysis under that information
+3. Mark the parts that require additional information to determine
+4. Where possible, give solutions by case
+
+## Final Standard
+Your answer must simultaneously:
+- Target the user's real goal
+- Solve the problem once rather than papering over it temporarily
+- Stay consistent with the existing context
+- Be actionable, maintainable, and verifiable

docker/bisheng/entrypoint.sh

Lines changed: 8 additions & 9 deletions
@@ -29,9 +29,14 @@ start_default(){
     celery -A bisheng.worker.main worker -l info -c 100 -P threads -Q celery -n celery@%h
 }

+start_min_worker(){
+    # Minimize the number of worker processes to reduce resource usage
+    celery -A bisheng.worker.main worker -l info -c 100 -P threads -Q knowledge_celery,workflow_celery,celery -n min_worker@%h
+}
+
 if [ "$start_mode" = "api" ]; then
     echo "Starting API server..."
-    uvicorn bisheng.main:app --host 0.0.0.0 --port 7860 --no-access-log --workers 8
+    uvicorn bisheng.main:app --host 0.0.0.0 --port 7860 --no-access-log --workers 1
 elif [ "$start_mode" = "knowledge" ]; then
     echo "Starting Knowledge Celery worker..."
     start_knowledge
@@ -49,14 +54,8 @@ elif [ "$start_mode" = "linsight" ]; then
     start_linsight
 elif [ "$start_mode" = "worker" ]; then
     echo "Starting All worker..."
-    # Worker for knowledge-base tasks
-    start_knowledge &
-    # Worker for workflow tasks
-    start_workflow &
-    # Worker for linsight tasks
-    start_linsight &
-    # Default worker for remaining tasks; currently scheduled telemetry statistics
-    start_default &
+    # Minimize the number of worker processes to reduce resource usage
+    start_min_worker &
     start_beat

 echo "All workers started successfully."
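The consolidation works because one Celery worker process can subscribe to several queues at once (`-Q knowledge_celery,workflow_celery,celery`) instead of running a dedicated worker per queue. A toy stdlib illustration of a single consumer draining multiple queues (no Celery dependency; the queue names mirror the script above):

```python
import queue

# Three task queues that previously each had a dedicated worker process.
queues = {name: queue.Queue() for name in ('knowledge_celery', 'workflow_celery', 'celery')}
queues['knowledge_celery'].put('index-doc')
queues['workflow_celery'].put('run-flow')
queues['celery'].put('stats-rollup')

def min_worker_drain(qs):
    """One consumer draining every queue, like a worker started with `-Q a,b,c`."""
    done = []
    for name, q in qs.items():
        while not q.empty():
            done.append((name, q.get()))
    return done

print(min_worker_drain(queues))
# → [('knowledge_celery', 'index-doc'), ('workflow_celery', 'run-flow'), ('celery', 'stats-rollup')]
```

The trade-off is throughput isolation: with one process, a flood on one queue can delay the others, which is acceptable here since the stated goal is reduced resource usage.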

docker/docker-compose.override.yml

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
+services:
+  backend:
+    volumes:
+      - ../src/backend/bisheng:/app/bisheng
+
+  backend_worker:
+    volumes:
+      - ../src/backend/bisheng:/app/bisheng

src/backend/bisheng/api/services/external_workflow.py

Lines changed: 137 additions & 1 deletion
@@ -761,6 +761,141 @@ def _ensure_create_graph_scaffold(cls, graph_data: dict) -> dict:

         return updated_graph

+    @classmethod
+    def _extract_descriptor_param_updates(cls, node_descriptor: dict) -> dict:
+        params = node_descriptor.get('params')
+        if not isinstance(params, dict):
+            return {}
+        updates = {}
+        for key, value in params.items():
+            if isinstance(value, dict) and 'value' in value:
+                updates[key] = copy.deepcopy(value.get('value'))
+            else:
+                updates[key] = copy.deepcopy(value)
+        return updates
+
+    @classmethod
+    def _resolve_descriptor_position(cls,
+                                     node_descriptor: dict,
+                                     existing_node: Optional[dict],
+                                     *,
+                                     axis: str,
+                                     fallback: float) -> float:
+        position = node_descriptor.get('position')
+        if isinstance(position, dict) and axis in position:
+            try:
+                return float(position.get(axis))
+            except Exception:
+                cls._raise_workflow_error(
+                    f'Workflow node {node_descriptor.get("id") or "unknown"} has an invalid {axis} position'
+                )
+        if existing_node is not None:
+            return cls._node_position_value(existing_node, axis)
+        return fallback
+
+    @classmethod
+    def _build_graph_node_from_descriptor(cls,
+                                          node_descriptor: dict,
+                                          *,
+                                          existing_node: Optional[dict] = None,
+                                          node_index: int = 0) -> dict:
+        node_id = node_descriptor.get('id') or (existing_node or {}).get('id')
+        if not node_id:
+            cls._raise_workflow_error('Workflow node descriptor must contain an id')
+
+        node_type = node_descriptor.get('type') or (cls._get_node_type(existing_node) if existing_node else '')
+        if not node_type:
+            cls._raise_workflow_error(f'Workflow node descriptor {node_id} must contain a type')
+
+        name = node_descriptor.get('name')
+        description = node_descriptor.get('description')
+        tab = node_descriptor.get('tab')
+        position_x = cls._resolve_descriptor_position(
+            node_descriptor,
+            existing_node,
+            axis='x',
+            fallback=float(node_index * cls._DEFAULT_HORIZONTAL_NODE_GAP),
+        )
+        position_y = cls._resolve_descriptor_position(
+            node_descriptor,
+            existing_node,
+            axis='y',
+            fallback=0.0,
+        )
+
+        if existing_node is not None and cls._get_node_type(existing_node) == node_type:
+            node_payload = copy.deepcopy(existing_node)
+        else:
+            node_payload = create_graph_node_payload(
+                node_type,
+                node_id=node_id,
+                name=name or '',
+                position_x=position_x,
+                position_y=position_y,
+            )
+            if node_payload is None:
+                raise NotFoundError(msg=f'Workflow node template not found: {node_type}')
+
+        node_payload['id'] = node_id
+        node_payload['position'] = {'x': position_x, 'y': position_y}
+        node_data = node_payload.setdefault('data', {})
+        node_data['id'] = node_id
+        node_data['type'] = node_type
+        if name is not None:
+            node_data['name'] = name
+        else:
+            node_data.setdefault('name', '')
+        if description is not None:
+            node_data['description'] = description
+        if tab is not None:
+            node_data['tab'] = copy.deepcopy(tab)
+
+        param_updates = cls._extract_descriptor_param_updates(node_descriptor)
+        if param_updates:
+            cls._patch_node_fields(node_payload, param_updates)
+
+        return node_payload
+
+    @classmethod
+    def _coerce_editor_graph_input(cls,
+                                   graph_data: dict,
+                                   *,
+                                   base_graph: Optional[dict] = None) -> dict:
+        if not isinstance(graph_data, dict):
+            return graph_data
+
+        nodes = graph_data.get('nodes')
+        if not isinstance(nodes, list):
+            return graph_data
+
+        if all(not isinstance(node, dict) or isinstance(node.get('data'), dict) for node in nodes):
+            return cls._normalize_editor_node_types(copy.deepcopy(graph_data))
+
+        base_nodes_by_id = {}
+        if isinstance(base_graph, dict):
+            for existing_node in base_graph.get('nodes', []):
+                if isinstance(existing_node, dict) and existing_node.get('id'):
+                    base_nodes_by_id[existing_node['id']] = existing_node
+
+        updated_graph = copy.deepcopy(graph_data)
+        rebuilt_nodes = []
+        for index, raw_node in enumerate(nodes):
+            if not isinstance(raw_node, dict):
+                rebuilt_nodes.append(raw_node)
+                continue
+            if isinstance(raw_node.get('data'), dict):
+                rebuilt_nodes.append(copy.deepcopy(raw_node))
+                continue
+            rebuilt_nodes.append(
+                cls._build_graph_node_from_descriptor(
+                    raw_node,
+                    existing_node=base_nodes_by_id.get(raw_node.get('id')),
+                    node_index=index,
+                )
+            )
+        updated_graph['nodes'] = rebuilt_nodes
+        return cls._normalize_editor_node_types(updated_graph)
+
     @classmethod
     def _apply_node_initial_params(cls, node_payload: dict, initial_params: Optional[dict]) -> dict:
         if not initial_params:
@@ -996,6 +1131,7 @@ def _create_workflow_draft_sync(cls,
                                     description: Optional[str] = None,
                                     guide_word: Optional[str] = None) -> tuple[Flow, FlowVersion]:
         cls._assert_workflow_name_available(login_user, name)
+        graph_data = cls._coerce_editor_graph_input(graph_data)
         graph_data = cls._ensure_create_graph_scaffold(graph_data)
         cls._validate_draft_graph(login_user, graph_data, flow_name=name)

@@ -1054,7 +1190,7 @@ async def update_workflow_draft(cls,
         if name is not None and name != flow.name:
             cls._assert_workflow_name_available(login_user, name, exclude_flow_id=flow.id)

-        graph_data = cls._normalize_editor_node_types(copy.deepcopy(graph_data))
+        graph_data = cls._coerce_editor_graph_input(graph_data, base_graph=editable_version.data)
         if has_flow_updates:
             if name is not None:
                 flow.name = name

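The `_extract_descriptor_param_updates` helper above accepts both wrapped and plain parameter values, so an external agent can send either a bare value or the editor's `{'value': ...}` container. A standalone copy of that logic showing the behavior (the descriptor contents are illustrative, not a real node template):

```python
import copy

def extract_descriptor_param_updates(node_descriptor: dict) -> dict:
    """Flatten a descriptor's params, unwrapping {'value': ...} containers
    so downstream patching receives plain values."""
    params = node_descriptor.get('params')
    if not isinstance(params, dict):
        return {}
    updates = {}
    for key, value in params.items():
        if isinstance(value, dict) and 'value' in value:
            updates[key] = copy.deepcopy(value.get('value'))
        else:
            updates[key] = copy.deepcopy(value)
    return updates

descriptor = {
    'id': 'llm_1',
    'type': 'llm',
    'params': {
        'temperature': {'value': 0.2, 'label': 'Temperature'},  # editor-wrapped form
        'model': 'gpt-4o-mini',                                  # plain form
    },
}
print(extract_descriptor_param_updates(descriptor))
# → {'temperature': 0.2, 'model': 'gpt-4o-mini'}
```

Together with `_coerce_editor_graph_input`, this is what lets MCP clients author workflows with a compact node list instead of the full editor graph payload.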
src/backend/bisheng/mcp_server/auth.py

Lines changed: 42 additions & 6 deletions
@@ -127,6 +127,10 @@ def _hash_session_token(token: str) -> str:
     return hashlib.sha256(token.encode('utf-8')).hexdigest()


+def hash_bisheng_session_token(token: str) -> str:
+    return _hash_session_token(token)
+
+
 def _normalize_scopes(scopes: Optional[list[str] | tuple[str, ...]]) -> tuple[str, ...]:
     if not scopes:
         return _MCP_DEFAULT_SCOPES
@@ -137,6 +141,12 @@ def _normalize_scopes(scopes: Optional[list[str] | tuple[str, ...]]) -> tuple[st
     return tuple(normalized or _MCP_DEFAULT_SCOPES)


+def normalize_mcp_scopes(scopes: Optional[list[str] | tuple[str, ...] | str]) -> tuple[str, ...]:
+    if isinstance(scopes, str):
+        scopes = [scope.strip() for scope in scopes.replace(',', ' ').split() if scope.strip()]
+    return _normalize_scopes(scopes)
+
+
 async def _assert_parent_session_valid(user_id: int, parent_session_hash: str):
     redis_client = await get_redis_client()
     current_session = await redis_client.aget(USER_CURRENT_SESSION.format(user_id))
@@ -155,11 +165,11 @@ async def resolve_login_user_from_bisheng_access_token(token: str) -> UserPayloa
     )


-def create_mcp_access_token(login_user: LoginUser,
-                            parent_access_token: str,
-                            *,
-                            scopes: Optional[list[str] | tuple[str, ...]] = None,
-                            expires_in: int = _MCP_DEFAULT_EXPIRES_IN) -> tuple[str, dict]:
+def _create_mcp_access_token_from_session_hash(login_user: LoginUser,
+                                               parent_session_hash: str,
+                                               *,
+                                               scopes: Optional[list[str] | tuple[str, ...]] = None,
+                                               expires_in: int = _MCP_DEFAULT_EXPIRES_IN) -> tuple[str, dict]:
     now = int(datetime.now(timezone.utc).timestamp())
     expires_in = max(60, min(int(expires_in), _MCP_MAX_EXPIRES_IN))
     normalized_scopes = list(_normalize_scopes(scopes))
@@ -174,7 +184,7 @@ def create_mcp_access_token(login_user: LoginUser,
         'jti': generate_uuid(),
         'token_type': _MCP_TOKEN_TYPE,
         'scope': normalized_scopes,
-        'parent_session_hash': _hash_session_token(parent_access_token),
+        'parent_session_hash': parent_session_hash,
     }
     token = jwt.encode(claims, AuthJwt().jwt_secret, algorithm='HS256')
     return token, {
@@ -186,6 +196,32 @@ def create_mcp_access_token(login_user: LoginUser,
     }


+def create_mcp_access_token(login_user: LoginUser,
+                            parent_access_token: str,
+                            *,
+                            scopes: Optional[list[str] | tuple[str, ...]] = None,
+                            expires_in: int = _MCP_DEFAULT_EXPIRES_IN) -> tuple[str, dict]:
+    return _create_mcp_access_token_from_session_hash(
+        login_user,
+        _hash_session_token(parent_access_token),
+        scopes=scopes,
+        expires_in=expires_in,
+    )
+
+
+def create_mcp_access_token_from_session_hash(login_user: LoginUser,
+                                              parent_session_hash: str,
+                                              *,
+                                              scopes: Optional[list[str] | tuple[str, ...]] = None,
+                                              expires_in: int = _MCP_DEFAULT_EXPIRES_IN) -> tuple[str, dict]:
+    return _create_mcp_access_token_from_session_hash(
+        login_user,
+        parent_session_hash,
+        scopes=scopes,
+        expires_in=expires_in,
+    )
+
+
 async def _validate_mcp_access_token(token: str) -> tuple[UserPayload, tuple[str, ...]]:
     try:
         payload = jwt.decode(
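The refactor above splits token creation so callers can pass either a raw parent access token or a precomputed session hash, with both paths producing the same claim set. A dependency-free sketch of that claim shape (the `mcp` token type, the one-hour default, and the 24-hour cap are assumptions; the real module reads its constants and JWT secret from config and signs with `jwt.encode`):

```python
import hashlib
from datetime import datetime, timezone

def hash_session_token(token: str) -> str:
    # Same shape as the commit's _hash_session_token: SHA-256 hex digest.
    return hashlib.sha256(token.encode('utf-8')).hexdigest()

def build_mcp_claims(user_id, session_hash, scopes,
                     expires_in=3600, max_expires_in=86400):
    # Clamp the lifetime as the refactored helper does: at least 60s, at most the cap.
    now = int(datetime.now(timezone.utc).timestamp())
    expires_in = max(60, min(int(expires_in), max_expires_in))
    return {
        'sub': str(user_id),
        'iat': now,
        'exp': now + expires_in,
        'token_type': 'mcp',                    # assumed value of _MCP_TOKEN_TYPE
        'scope': list(scopes),
        'parent_session_hash': session_hash,    # binds the MCP token to the login session
    }

claims = build_mcp_claims(42, hash_session_token('parent-token'), ('workflow:read',))
print(claims['scope'], claims['exp'] - claims['iat'])  # → ['workflow:read'] 3600
```

Carrying `parent_session_hash` in the claims is what lets `_assert_parent_session_valid` revoke every derived MCP token the moment the parent Bisheng session ends.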
