实现和配置Story代理
任务 1:实现故事代理
故事代理是在模块 1 中使用 --protocol=AG-UI 生成的 Strands 代理,因此 UI 可以通过 CopilotKit 以 AG-UI(Agent-User Interaction)协议接收它流式返回的数据。它使用库存 MCP(Model Context Protocol)服务器来管理玩家的物品,并使用 Strands 内置的 S3SessionManager 将对话历史持久化到我们在模块 2 中配置的会话存储桶中。
更新 packages/story/dungeon_adventure_story/agent 目录下的以下文件:
import loggingimport osimport uuidfrom functools import cachefrom typing import Any, cast
from ag_ui.core import RunAgentInputfrom ag_ui_strands import StrandsAgent, StrandsAgentConfig, create_strands_appfrom aws_lambda_powertools.utilities import parametersfrom dungeon_adventure_agent_connection import session_id_contextfrom fastapi import Requestfrom starlette.middleware.base import BaseHTTPMiddlewarefrom strands.session import FileSessionManager, S3SessionManager, SessionManager
from .agent import get_agent
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Name of the request header that _SessionIdMiddleware reads the session ID from.
SESSION_ID_HEADER = "x-amzn-bedrock-agentcore-runtime-session-id"
@cache
def _resolve_sessions_bucket() -> str:
    """Look up the name of the ``StorySessions`` bucket in runtime config.

    The lookup only happens on the first call and the result is memoised by
    ``@cache``, so importing this module (e.g. under `fastapi dev`) does not
    require ``RUNTIME_CONFIG_APP_ID`` to be set yet.

    Returns:
        The ``bucketName`` value of the ``StorySessions`` entry in the
        ``buckets`` configuration.

    Raises:
        RuntimeError: If ``RUNTIME_CONFIG_APP_ID`` is unset or empty.
    """
    app_id = os.environ.get("RUNTIME_CONFIG_APP_ID")
    if app_id:
        config = parameters.AppConfigProvider(environment="default", application=app_id)
        bucket_map = cast(dict[str, Any], config.get("buckets", transform="json"))
        story_sessions = bucket_map["StorySessions"]
        return story_sessions["bucketName"]
    raise RuntimeError("RUNTIME_CONFIG_APP_ID is not set — cannot resolve the StorySessions bucket.")
def _session_manager_provider(input_data: RunAgentInput) -> SessionManager:
    """Build the session manager for one AG-UI thread.

    The session is keyed by ``input_data.thread_id``, falling back to
    ``"default"`` when that value is empty.

    - When ``SERVE_LOCAL`` is ``"true"`` (`agent-serve-local`), history is
      written to ``/tmp/strands-sessions`` so the agent can run offline
      against the local MCP server without any AWS calls.
    - Otherwise (AgentCore and `agent-serve`), history is persisted to the
      shared ``StorySessions`` S3 bucket — the same bucket the Game API reads
      from to rebuild conversation history on revisit.
    """
    thread_key = input_data.thread_id or "default"
    serve_local = os.environ.get("SERVE_LOCAL") == "true"
    if not serve_local:
        return S3SessionManager(session_id=thread_key, bucket=_resolve_sessions_bucket())
    return FileSessionManager(session_id=thread_key, storage_dir="/tmp/strands-sessions")
# ``StrandsAgent`` clones this template Agent once per thread_id. Supplying a
# ``session_manager_provider`` gives every thread its own session manager, so
# conversation history is replayed on later turns and survives agent restarts.
# The context manager is entered here without a matching exit in the code
# shown, so whatever it opens stays open while the module is loaded.
_agent_ctx = get_agent()
_agent = _agent_ctx.__enter__()

agui_agent = StrandsAgent(
    name="StoryAgent",
    description="A Strands Agent exposed via the AG-UI protocol.",
    agent=_agent,
    config=StrandsAgentConfig(session_manager_provider=_session_manager_provider),
)
class _SessionIdMiddleware(BaseHTTPMiddleware):
    """Bind a session ID to each request so downstream MCP / A2A clients forward it on outbound calls."""

    async def dispatch(self, request: Request, call_next):
        """Run the rest of the request pipeline inside ``session_id_context``.

        The session ID is taken from the ``SESSION_ID_HEADER`` request header;
        when the header is missing or empty a fresh UUID4 string is used.
        """
        header_value = request.headers.get(SESSION_ID_HEADER)
        bound_id = header_value or str(uuid.uuid4())
        with session_id_context(bound_id):
            return await call_next(request)
app = create_strands_app(agui_agent, path="/invocations")app.add_middleware(_SessionIdMiddleware)import loggingimport osimport uuidfrom functools import cachefrom typing import Any, cast
from ag_ui_strands import StrandsAgent, create_strands_appfrom ag_ui.core import RunAgentInputfrom ag_ui_strands import StrandsAgent, StrandsAgentConfig, create_strands_appfrom aws_lambda_powertools.utilities import parametersfrom dungeon_adventure_agent_connection import session_id_contextfrom fastapi import Requestfrom starlette.middleware.base import BaseHTTPMiddlewarefrom strands.session import FileSessionManager, S3SessionManager, SessionManager
from .agent import get_agent
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Name of the request header that _SessionIdMiddleware reads the session ID from.
SESSION_ID_HEADER = "x-amzn-bedrock-agentcore-runtime-session-id"
# Create AG-UI agent wrapper
@cache
def _resolve_sessions_bucket() -> str:
    """Look up the name of the ``StorySessions`` bucket in runtime config.

    The lookup only happens on the first call and the result is memoised by
    ``@cache``, so importing this module (e.g. under `fastapi dev`) does not
    require ``RUNTIME_CONFIG_APP_ID`` to be set yet.

    Returns:
        The ``bucketName`` value of the ``StorySessions`` entry in the
        ``buckets`` configuration.

    Raises:
        RuntimeError: If ``RUNTIME_CONFIG_APP_ID`` is unset or empty.
    """
    app_id = os.environ.get("RUNTIME_CONFIG_APP_ID")
    if app_id:
        config = parameters.AppConfigProvider(environment="default", application=app_id)
        bucket_map = cast(dict[str, Any], config.get("buckets", transform="json"))
        story_sessions = bucket_map["StorySessions"]
        return story_sessions["bucketName"]
    raise RuntimeError("RUNTIME_CONFIG_APP_ID is not set — cannot resolve the StorySessions bucket.")
def _session_manager_provider(input_data: RunAgentInput) -> SessionManager:
    """Build the session manager for one AG-UI thread.

    The session is keyed by ``input_data.thread_id``, falling back to
    ``"default"`` when that value is empty.

    - When ``SERVE_LOCAL`` is ``"true"`` (`agent-serve-local`), history is
      written to ``/tmp/strands-sessions`` so the agent can run offline
      against the local MCP server without any AWS calls.
    - Otherwise (AgentCore and `agent-serve`), history is persisted to the
      shared ``StorySessions`` S3 bucket — the same bucket the Game API reads
      from to rebuild conversation history on revisit.
    """
    thread_key = input_data.thread_id or "default"
    serve_local = os.environ.get("SERVE_LOCAL") == "true"
    if not serve_local:
        return S3SessionManager(session_id=thread_key, bucket=_resolve_sessions_bucket())
    return FileSessionManager(session_id=thread_key, storage_dir="/tmp/strands-sessions")
# ``StrandsAgent`` clones this template Agent once per thread_id. Supplying a
# ``session_manager_provider`` gives every thread its own session manager, so
# conversation history is replayed on later turns and survives agent restarts.
# The context manager is entered here without a matching exit in the code
# shown, so whatever it opens stays open while the module is loaded.
_agent_ctx = get_agent()
_agent = _agent_ctx.__enter__()

agui_agent = StrandsAgent(
    name="StoryAgent",
    description="A Strands Agent exposed via the AG-UI protocol.",
    agent=_agent,
    config=StrandsAgentConfig(session_manager_provider=_session_manager_provider),
)
class _SessionIdMiddleware(BaseHTTPMiddleware):
    """Bind a session ID to each request so downstream MCP / A2A clients forward it on outbound calls."""

    async def dispatch(self, request: Request, call_next):
        """Run the rest of the request pipeline inside ``session_id_context``.

        The session ID is taken from the ``SESSION_ID_HEADER`` request header;
        when the header is missing or empty a fresh UUID4 string is used.
        """
        header_value = request.headers.get(SESSION_ID_HEADER)
        bound_id = header_value or str(uuid.uuid4())
        with session_id_context(bound_id):
            return await call_next(request)
# Create FastAPI app with AG-UI endpoint and health checkapp = create_strands_app(agui_agent, path="/invocations")app.add_middleware(_SessionIdMiddleware)from contextlib import contextmanager
from dungeon_adventure_agent_connection import InventoryMcpServerClientfrom strands import Agent
@contextmanagerdef get_agent(): inventory_mcp_server = InventoryMcpServerClient.create() with ( inventory_mcp_server, ): yield Agent( system_prompt="""You are running a text adventure game for a lone adventurer. When a new storybegins, the first user message will tell you the player's name and the genre(one of 'medieval', 'zombie', 'superhero'). Greet the player by name, set thescene in the chosen genre, and populate their inventory with a few startingitems. On subsequent turns, advance the story in response to the player'sactions and keep item state in sync with the narrative.Use the tools to manage the player's inventory as items are obtained or lost.Item names in the inventory must be Title Case — match them exactly.Only use list-inventory-items if you are unsure of the exact item name before modifying it.IMPORTANT: Only call ONE tool per response turn. Never batch multiple tool calls in a single turn.Ensure you specify a suitable emoji when adding items if available.Items should be a key part of the narrative.Keep responses under 100 words.""", tools=[*inventory_mcp_server.list_tools_sync()], )from contextlib import contextmanager
from dungeon_adventure_agent_connection import InventoryMcpServerClientfrom strands import Agent, toolfrom strands_tools import current_timefrom strands import Agent
# Sample tool: returns a minus b.
# Documented with comments rather than a docstring so the function's
# ``__doc__`` stays exactly as in the original.
@tool
def subtract(a: int, b: int) -> int:
    difference = a - b
    return difference
@contextmanagerdef get_agent(): inventory_mcp_server = InventoryMcpServerClient.create() with ( inventory_mcp_server, ): yield Agent( name="StoryAgent", description="StoryAgent Strands Agent", system_prompt="""You are a mathematical wizard.Use your tools for mathematical tasks.Refer to tools as your 'spellbook'.You are running a text adventure game for a lone adventurer. When a new storybegins, the first user message will tell you the player's name and the genre(one of 'medieval', 'zombie', 'superhero'). Greet the player by name, set thescene in the chosen genre, and populate their inventory with a few startingitems. On subsequent turns, advance the story in response to the player'sactions and keep item state in sync with the narrative.Use the tools to manage the player's inventory as items are obtained or lost.Item names in the inventory must be Title Case — match them exactly.Only use list-inventory-items if you are unsure of the exact item name before modifying it.IMPORTANT: Only call ONE tool per response turn. Never batch multiple tool calls in a single turn.Ensure you specify a suitable emoji when adding items if available.Items should be a key part of the narrative.Keep responses under 100 words.""", tools=[subtract, current_time, *inventory_mcp_server.list_tools_sync()], tools=[*inventory_mcp_server.list_tools_sync()], )更改内容包括:
- main.py:添加了一个 session_manager_provider,部署后为每个 thread_id 创建一个 S3SessionManager;在 agent-serve-local(SERVE_LOCAL=true)下运行时,则回退为把会话保存在本地磁盘 /tmp/strands-sessions 下的 FileSessionManager。部署后使用的 S3 存储桶与 Game API 的 queryActions 读取的是同一个存储桶,因此浏览器在重新访问时可以重建对话记录;在本地运行时,代理完全离线工作,只连接本地 MCP 服务器,无需调用 AWS。
- agent.py:删除了示例 subtract 工具,并将系统提示替换为地牢主持人(dungeon master)提示,该提示要求第一条用户消息说明玩家的姓名和故事题材,并使用库存 MCP 服务器提供的工具。
任务 2:部署与测试
要构建代码,请运行以下命令之一(按所用的包管理器选择):
pnpm build
yarn build
npm run build
bun build
部署应用程序
要部署应用程序,请运行以下命令之一(按所用的包管理器选择):
pnpm nx deploy infra "dungeon-adventure-infra-sandbox/*"
yarn nx deploy infra "dungeon-adventure-infra-sandbox/*"
npx nx deploy infra "dungeon-adventure-infra-sandbox/*"
bunx nx deploy infra "dungeon-adventure-infra-sandbox/*"
部署过程约需 2 分钟完成。
部署完成后,将看到类似以下输出(部分值已脱敏):
dungeon-adventure-infra-sandbox-Applicationdungeon-adventure-infra-sandbox-Application: deploying... [2/2]
✅ dungeon-adventure-infra-sandbox-Application
✨ 部署时间:354秒
输出:
dungeon-adventure-infra-sandbox-Application.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-Application-ElectroDbTableXXX-YYY
dungeon-adventure-infra-sandbox-Application.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/
dungeon-adventure-infra-sandbox-Application.GameUIDistributionDomainNameXXX = xxx.cloudfront.net
dungeon-adventure-infra-sandbox-Application.InventoryMcpArn = arn:aws:bedrock-agentcore:region:xxxxxxx:runtime/dungeonadventureventoryMcpServerXXXX-YYYY
dungeon-adventure-infra-sandbox-Application.RuntimeConfigApplicationId = xxxx
dungeon-adventure-infra-sandbox-Application.StoryAgentArn = arn:aws:bedrock-agentcore:region:xxxxxxx:runtime/dungeonadventurecationStoryAgentXXXX-YYYY
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxx
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityUserPoolIdXXX = region_xxx
可通过以下方式测试代理:
- 启动代理服务器的本地实例,并通过生成的 agent-chat 目标与其聊天,或
- 使用携带 JWT 令牌的 curl 调用已部署的 API。
使用生成的 agent-chat 目标打开一个交互式 REPL,连接本地运行的 AG-UI 代理:
pnpm nx agent-chat story
yarn nx agent-chat story
npx nx agent-chat story
bunx nx agent-chat story
agent-chat 配置了 dependsOn: ['agent-serve-local'],因此无需单独启动代理服务器 —— Nx 会先启动 agent-serve-local(它会在本地启动库存 MCP 服务器),然后针对 http://localhost:8081/invocations 启动 agent-chat-cli 的 AG-UI REPL。您的第一条消息应该告诉代理您的英雄姓名和故事题材(例如:My name is Alice. Start my zombie adventure.),随后故事将以流式方式返回。
测试已部署代理需通过 Cognito 认证获取 JWT 令牌。首先设置环境变量:
# 从 CDK 输出中设置 Cognito 用户池 ID 和客户端 IDexport POOL_ID="<CDK 输出的 UserPoolId>"export CLIENT_ID="<CDK 输出的 UserPoolClientId>"export REGION="<区域>"创建测试用户并获取认证令牌:
# 禁用 MFA 以简化用户创建aws cognito-idp set-user-pool-mfa-config \ --mfa-configuration OFF \ --user-pool-id $POOL_ID
# 创建用户aws cognito-idp admin-create-user \ --user-pool-id $POOL_ID \ --username "test" \ --temporary-password "TempPass123-" \ --region $REGION \ --message-action SUPPRESS > /dev/null
# 设置永久密码(请替换为更安全的密码!)aws cognito-idp admin-set-user-password \ --user-pool-id $POOL_ID \ --username "test" \ --password "PermanentPass123-" \ --region $REGION \ --permanent > /dev/null
# 用户认证并获取访问令牌export BEARER_TOKEN=$(aws cognito-idp initiate-auth \ --client-id "$CLIENT_ID" \ --auth-flow USER_PASSWORD_AUTH \ --auth-parameters USERNAME='test',PASSWORD='PermanentPass123-' \ --region $REGION \ --query "AuthenticationResult.AccessToken" \ --output text)通过 Bedrock AgentCore 运行时 URL 调用已部署代理:
# 设置 CDK 输出的 Story Agent ARNexport AGENT_ARN="<CDK 输出的 StoryAgentArn>"
# URL 编码 ARNexport ENCODED_ARN=$(echo $AGENT_ARN | sed 's/:/%3A/g' | sed 's/\//%2F/g')
# 构造调用 URLexport AGENT_URL="https://bedrock-agentcore.$REGION.amazonaws.com/runtimes/$ENCODED_ARN/invocations?qualifier=DEFAULT"
# 使用 AG-UI RunAgentInput 负载调用代理
curl -N -X POST "$AGENT_URL" \
  -H "authorization: Bearer $BEARER_TOKEN" \
  -H "Content-Type: application/json" \
  -H "X-Amzn-Bedrock-AgentCore-Runtime-Session-Id: Alice-zombie00000000000000000000000" \
  -d '{
    "threadId": "Alice-zombie00000000000000000000000",
    "runId": "run-1",
    "messages": [{ "id": "m1", "role": "user", "content": "My name is Alice. Start my zombie adventure." }],
    "tools": [],
    "context": [],
    "state": {},
    "forwardedProps": {}
  }'
若命令运行成功,您将看到故事以 AG-UI 事件流(Server-Sent Events)的形式流式返回 —— 实际的故事文本片段(token)包装在 RUN_STARTED / TEXT_MESSAGE_START / TEXT_MESSAGE_CONTENT 事件中:
data: {"type":"RUN_STARTED","threadId":"Alice-zombie00000000000000000000000",...}
data: {"type":"TEXT_MESSAGE_START","messageId":"...","role":"assistant"}
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"...","delta":"Greetings, Alice. "}
data: {"type":"TEXT_MESSAGE_CONTENT","messageId":"...","delta":"The moans grow louder beyond the barricaded door..."}
恭喜!您已成功在 Bedrock AgentCore 运行时上构建并部署首个 Strands 代理!🎉🎉🎉