跳转到内容

实现和配置Story代理

故事代理是一个在 模块 1 中使用 --protocol=AG-UI 生成的 Strands 代理,因此 UI 可以通过 CopilotKit 以 Agent-User Interaction 协议从中流式传输数据。它使用库存管理控制协议(MCP)服务器来管理玩家的物品,并使用 Strands 内置的 S3SessionManager 将对话历史持久化到我们在模块 2 中配置的会话存储桶中。

更新 packages/story/dungeon_adventure_story/agent 目录下的以下文件:

import logging
import os
import uuid
from functools import cache
from typing import Any, cast
from ag_ui.core import RunAgentInput
from ag_ui_strands import StrandsAgent, StrandsAgentConfig, create_strands_app
from aws_lambda_powertools.utilities import parameters
from dungeon_adventure_agent_connection import session_id_context
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
from strands.session import FileSessionManager, S3SessionManager, SessionManager
from .agent import get_agent
logging.basicConfig(level=logging.INFO)
SESSION_ID_HEADER = "x-amzn-bedrock-agentcore-runtime-session-id"
@cache
def _resolve_sessions_bucket() -> str:
"""Read the conversation-history bucket name from runtime config.
Resolved lazily (and memoised) so `fastapi dev` can import this module
before ``RUNTIME_CONFIG_APP_ID`` is in the environment.
"""
application = os.environ.get("RUNTIME_CONFIG_APP_ID")
if not application:
raise RuntimeError("RUNTIME_CONFIG_APP_ID is not set — cannot resolve the StorySessions bucket.")
provider = parameters.AppConfigProvider(environment="default", application=application)
buckets = cast(dict[str, Any], provider.get("buckets", transform="json"))
return buckets["StorySessions"]["bucketName"]
def _session_manager_provider(input_data: RunAgentInput) -> SessionManager:
"""Create a session manager keyed by the AG-UI thread_id.
- In AgentCore (and `agent-serve`), persist to the shared ``StorySessions``
S3 bucket — the same bucket the Game API reads from to rebuild
conversation history on revisit.
- In `agent-serve-local` (`SERVE_LOCAL=true`), persist to a temp
directory so the agent can run fully offline against the local MCP
server without any AWS calls.
"""
session_id = input_data.thread_id or "default"
if os.environ.get("SERVE_LOCAL") == "true":
return FileSessionManager(session_id=session_id, storage_dir="/tmp/strands-sessions")
return S3SessionManager(session_id=session_id, bucket=_resolve_sessions_bucket())
# The template Agent is cloned per thread_id by ``StrandsAgent`` — we plug in
# a ``session_manager_provider`` so each thread gets its own session manager
# and conversation history is replayed on subsequent turns and survives agent
# restarts.
_agent_ctx = get_agent()
_agent = _agent_ctx.__enter__()
agui_agent = StrandsAgent(
agent=_agent,
name="StoryAgent",
description="A Strands Agent exposed via the AG-UI protocol.",
config=StrandsAgentConfig(session_manager_provider=_session_manager_provider),
)
class _SessionIdMiddleware(BaseHTTPMiddleware):
"""Bind the session ID for this request so downstream MCP / A2A clients forward it on outbound calls."""
async def dispatch(self, request: Request, call_next):
session_id = request.headers.get(SESSION_ID_HEADER) or str(uuid.uuid4())
with session_id_context(session_id):
return await call_next(request)
app = create_strands_app(agui_agent, path="/invocations")
app.add_middleware(_SessionIdMiddleware)

更改内容包括:

  • main.py 添加了一个 session_manager_provider,在部署时为每个 thread_id 创建一个 S3SessionManager,在 agent-serve-localSERVE_LOCAL=true)下运行时则回退到 /tmp/strands-sessions 下的磁盘 FileSessionManager。部署时,S3 存储桶与 Game API 的 queryActions 读取的存储桶相同,因此浏览器可以在重新访问时重建对话记录;在本地,代理完全离线运行,针对本地 MCP 服务器,无需调用 AWS。
  • agent.py 删除了示例 subtract 工具,并将系统提示替换为地牢主持人提示,该提示邀请第一条用户消息说明玩家的姓名和类型,并使用库存 MCP 服务器的工具。

要构建代码:

Terminal window
pnpm build

要部署应用程序,请运行以下命令:

Terminal window
pnpm nx deploy infra "dungeon-adventure-infra-sandbox/*"

部署过程约需 2 分钟完成。

部署完成后,将看到类似以下输出(部分值已脱敏):

Terminal window
dungeon-adventure-infra-sandbox-Application
dungeon-adventure-infra-sandbox-Application: deploying... [2/2]
dungeon-adventure-infra-sandbox-Application
部署时间:354秒
输出:
dungeon-adventure-infra-sandbox-Application.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-Application-ElectroDbTableXXX-YYY
dungeon-adventure-infra-sandbox-Application.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/
dungeon-adventure-infra-sandbox-Application.GameUIDistributionDomainNameXXX = xxx.cloudfront.net
dungeon-adventure-infra-sandbox-Application.InventoryMcpArn = arn:aws:bedrock-agentcore:region:xxxxxxx:runtime/dungeonadventureventoryMcpServerXXXX-YYYY
dungeon-adventure-infra-sandbox-Application.RuntimeConfigApplicationId = xxxx
dungeon-adventure-infra-sandbox-Application.StoryAgentArn = arn:aws:bedrock-agentcore:region:xxxxxxx:runtime/dungeonadventurecationStoryAgentXXXX-YYYY
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxx
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityUserPoolIdXXX = region_xxx

可通过以下方式测试代理:

  • 启动代理服务器本地实例并通过生成的 agent-chat 目标与其聊天,或
  • 使用携带 JWT 令牌的 curl 调用已部署 API。

使用生成的 agent-chat 目标打开针对本地服务的 AG-UI 代理副本的交互式 REPL:

Terminal window
pnpm nx agent-chat story

agent-chat 具有 dependsOn: ['agent-serve-local'],因此无需单独启动代理服务器 —— Nx 将生成 agent-serve-local(它会在本地启动库存 MCP 服务器),然后针对 http://localhost:8081/invocations 启动 agent-chat-cli AG-UI REPL。您的第一条消息应该告诉代理您的英雄姓名和类型(例如:My name is Alice. Start my zombie adventure.),故事将流式返回。

若命令运行成功,将开始看到故事以 AG-UI 事件流(Server-Sent Events)的形式流式返回 —— RUN_STARTED / TEXT_MESSAGE_START / TEXT_MESSAGE_CONTENT 块包装实际的故事令牌:

data: {"type":"RUN_STARTED","threadId":"Alice-zombie00000000000000000000000",...}
data: {"type":"TEXT_MESSAGE_START","messageId":"...","role":"assistant"}
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"...","delta":"Greetings, Alice. "}
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"...","delta":"The moans grow louder beyond the barricaded door..."}

恭喜!您已成功在 Bedrock AgentCore 运行时上构建并部署首个 Strands 代理!🎉🎉🎉