Skip to content

AIダンジョンゲーム

モジュール3: ストーリーAPIの実装

StoryApiは、Gameとコンテキスト用のActionリストを受け取り、ストーリーを進行させる単一のAPIgenerate_storyで構成されます。このAPIはPython/FastAPIでストリーミングAPIとして実装され、生成されたコードを目的に合わせて変更する方法も示します。

API実装

APIを作成するには、まず追加の依存関係をインストールする必要があります。

  • boto3はAmazon Bedrockの呼び出しに使用します
  • uvicornLambda Web Adapter (LWA)と組み合わせてAPI起動に使用します
  • copyfilesbundleタスク更新時にクロスプラットフォームのファイルコピーをサポートするnpm依存関係です

以下のコマンドを実行して依存関係をインストールします:

Terminal window
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
Terminal window
pnpm add -Dw copyfiles

次にpackages/story_api/story_api/main.pyの内容を以下のように置き換えます:

packages/story_api/story_api/main.py
import json
from boto3 import client
from fastapi.responses import PlainTextResponse, StreamingResponse
from pydantic import BaseModel
from .init import app, lambda_handler
handler = lambda_handler
bedrock = client('bedrock-runtime')
class Action(BaseModel):
role: str
content: str
class StoryRequest(BaseModel):
genre: str
playerName: str
actions: list[Action]
async def bedrock_stream(request: StoryRequest):
messages = [
{"role": "user", "content": "Continue or create a new story..."}
]
for action in request.actions:
messages.append({"role": action.role, "content": action.content})
response = bedrock.invoke_model_with_response_stream(
modelId='anthropic.claude-3-sonnet-20240229-v1:0',
body=json.dumps({
"system":f"""
You are running an AI text adventure game in the {request.genre} genre.
Player: {request.playerName}. Return less than 200 characters of text.
""",
"messages": messages,
"max_tokens": 1000,
"temperature": 0.7,
"anthropic_version": "bedrock-2023-05-31"
})
)
stream = response.get('body')
if stream:
for event in stream:
chunk = event.get('chunk')
if chunk:
message = json.loads(chunk.get("bytes").decode())
if message['type'] == "content_block_delta":
yield message['delta']['text'] or ""
elif message['type'] == "message_stop":
yield "\n"
@app.post("/story/generate",
openapi_extra={'x-streaming': True, 'x-query': True},
response_class=PlainTextResponse)
def generate_story(request: StoryRequest) -> str:
return StreamingResponse(bedrock_stream(request), media_type="text/plain")

上記のコード分析:

  • クライアントSDK生成時にストリーミングAPIであることを示すx-streaming設定を使用。型安全性を維持したままストリーミング処理が可能
  • POSTリクエストをmutationではなくqueryとして扱うx-query設定を使用。TanStack Queryのストリーミング状態管理を活用
  • media_type="text/plain"response_class=PlainTextResponseでテキストストリームを返す

インフラストラクチャ

以前設定したインフラストラクチャは、すべてのAPIがLambdaと統合するAPI Gatewayを使用する前提でした。story_apiではストリーミング応答をサポートしないためAPI Gatewayを使用せず、レスポンスストリーミング対応のLambda Function URLを使用します。

CDKコンストラクトを以下のように更新します:

packages/common/constructs/src/core/http-api.ts
import { Construct } from 'constructs';
import { CfnOutput, Duration, Stack } from 'aws-cdk-lib';
import {
CorsHttpMethod,
HttpApi as _HttpApi,
HttpMethod,
IHttpRouteAuthorizer,
} from 'aws-cdk-lib/aws-apigatewayv2';
import { HttpLambdaIntegration } from 'aws-cdk-lib/aws-apigatewayv2-integrations';
import {
Code,
Function,
FunctionUrl,
FunctionUrlAuthType,
InvokeMode,
LayerVersion,
Runtime,
Tracing,
} from 'aws-cdk-lib/aws-lambda';
import { Grant, IGrantable } from 'aws-cdk-lib/aws-iam';
import { RuntimeConfig } from './runtime-config.js';
export interface HttpApiProps {
readonly apiName: string;
readonly handler: string;
readonly handlerFilePath: string;
readonly runtime: Runtime;
readonly defaultAuthorizer: IHttpRouteAuthorizer;
readonly apiType?: 'api-gateway' | 'function-url-streaming';
readonly allowedOrigins?: string[];
}
export class HttpApi extends Construct {
public readonly api?: _HttpApi;
public readonly routerFunctionUrl?: FunctionUrl;
public readonly routerFunction: Function;
constructor(scope: Construct, id: string, props: HttpApiProps) {
super(scope, id);
this.routerFunction = new Function(this, `${id}Handler`, {
timeout: Duration.seconds(30),
runtime: props.runtime,
handler: props.handler,
code: Code.fromAsset(props.handlerFilePath),
tracing: Tracing.ACTIVE,
environment: {
AWS_CONNECTION_REUSE_ENABLED: '1',
},
});
let apiUrl;
if (props.apiType === 'function-url-streaming') {
const stack = Stack.of(this);
this.routerFunction.addLayers(
LayerVersion.fromLayerVersionArn(
this,
'LWALayer',
`arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`,
),
);
this.routerFunction.addEnvironment('PORT', '8000');
this.routerFunction.addEnvironment(
'AWS_LWA_INVOKE_MODE',
'response_stream',
);
this.routerFunction.addEnvironment(
'AWS_LAMBDA_EXEC_WRAPPER',
'/opt/bootstrap',
);
this.routerFunctionUrl = this.routerFunction.addFunctionUrl({
authType: FunctionUrlAuthType.AWS_IAM,
invokeMode: InvokeMode.RESPONSE_STREAM,
cors: {
allowedOrigins: props.allowedOrigins ?? ['*'],
allowedHeaders: [
'authorization',
'content-type',
'x-amz-content-sha256',
'x-amz-date',
'x-amz-security-token',
],
},
});
apiUrl = this.routerFunctionUrl.url;
} else {
this.api = new _HttpApi(this, id, {
corsPreflight: {
allowOrigins: props.allowedOrigins ?? ['*'],
allowMethods: [CorsHttpMethod.ANY],
allowHeaders: [
'authorization',
'content-type',
'x-amz-content-sha256',
'x-amz-date',
'x-amz-security-token',
],
},
defaultAuthorizer: props.defaultAuthorizer,
});
this.api.addRoutes({
path: '/{proxy+}',
methods: [
HttpMethod.GET,
HttpMethod.DELETE,
HttpMethod.POST,
HttpMethod.PUT,
HttpMethod.PATCH,
HttpMethod.HEAD,
],
integration: new HttpLambdaIntegration(
'RouterIntegration',
this.routerFunction,
),
});
apiUrl = this.api.url;
}
new CfnOutput(this, `${props.apiName}Url`, { value: apiUrl! });
RuntimeConfig.ensure(this).config.httpApis = {
...RuntimeConfig.ensure(this).config.httpApis!,
[props.apiName]: apiUrl,
};
}
public grantInvokeAccess(grantee: IGrantable) {
if (this.api) {
Grant.addToPrincipal({
grantee,
actions: ['execute-api:Invoke'],
resourceArns: [this.api.arnForExecuteApi('*', '/*', '*')],
});
} else if (this.routerFunction) {
Grant.addToPrincipal({
grantee,
actions: ['lambda:InvokeFunctionUrl'],
resourceArns: [this.routerFunction.functionArn],
conditions: {
StringEquals: {
'lambda:FunctionUrlAuthType': 'AWS_IAM',
},
},
});
}
}
}

Lambda Web Adapterのデプロイをサポートするためstory_apiを更新します:

packages/story_api/run.sh
#!/bin/bash
PATH=$PATH:$LAMBDA_TASK_ROOT/bin \
PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \
exec python -m uvicorn --port=$PORT story_api.main:app

デプロイとテスト

まずコードベースをビルドします:

Terminal window
pnpm nx run-many --target build --all

以下のコマンドを実行してアプリケーションをデプロイできます:

Terminal window
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox

このデプロイは約2分かかります。

すべてのスタックを一度にデプロイする方法の詳細はこちらをクリック

デプロイが完了すると、以下のような出力が表示されます(一部の値は編集済み):

Terminal window
dungeon-adventure-infra-sandbox
dungeon-adventure-infra-sandbox: deploying... [2/2]
dungeon-adventure-infra-sandbox
Deployment time: 354s
Outputs:
dungeon-adventure-infra-sandbox.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-ElectroDbTableXXX-YYY
dungeon-adventure-infra-sandbox.GameApiGameApiUrlXXX = https://xxx.region.amazonaws.com/
dungeon-adventure-infra-sandbox.GameUIDistributionDomainNameXXX = xxx.cloudfront.net
dungeon-adventure-infra-sandbox.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/
dungeon-adventure-infra-sandbox.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxx
dungeon-adventure-infra-sandbox.UserIdentityUserIdentityUserPoolIdXXX = region_xxx

以下の方法でAPIをテストできます:

  • FastAPIサーバーをローカルで起動しcurlでAPIを呼び出す
  • デプロイ済みAPIをsigv4対応curlで直接呼び出す

以下のコマンドでローカルFastAPIサーバーを起動:

Terminal window
pnpm nx run dungeon_adventure.story_api:serve

サーバー起動後、以下のコマンドでAPIを呼び出します:

Terminal window
curl -N -X POST http://127.0.0.1:8000/story/generate \
-d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \
-H "Content-Type: application/json"

コマンドが正常に実行されると、以下のようなストリーミング応答が表示されます:

UnnamedHero stood tall, his cape billowing in the wind....

おめでとうございます。FastAPIを使用した初めてのAPIの構築とデプロイに成功しました! 🎉🎉🎉