Jeu de Donjon IA
Module 3 : Implémentation de l’API Story
La StoryApi comprend une unique API generate_story
qui, étant donné un Game
et une liste d’Action
comme contexte, fait progresser une histoire. Cette API sera implémentée comme une API de streaming en Python/FastAPI et démontrera également comment modifier le code généré pour l’adapter à son usage.
Implémentation de l’API
Pour créer notre API, nous devons d’abord installer quelques dépendances supplémentaires.
boto3
sera utilisé pour appeler Amazon Bedrock ;uvicorn
sera utilisé pour démarrer notre API en conjonction avec le Lambda Web Adapter (LWA).copyfiles
est une dépendance npm nécessaire pour supporter la copie multiplateforme de fichiers lors de la mise à jour de notre tâchebundle
.
Pour installer ces dépendances, exécutez les commandes suivantes :
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
yarn nx run dungeon_adventure.story_api:add --args boto3 uvicorn
npx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
bunx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
pnpm add -Dw copyfiles
yarn add -D copyfiles
npm install --legacy-peer-deps -D copyfiles
bun install -D copyfiles
Maintenant remplaçons le contenu des fichiers suivants dans packages/story_api/story_api
:
import json
from boto3 import clientfrom fastapi.responses import PlainTextResponse, StreamingResponsefrom pydantic import BaseModel
from .init import app, lambda_handler
handler = lambda_handler
bedrock = client('bedrock-runtime')
class Action(BaseModel): role: str content: str
class StoryRequest(BaseModel): genre: str playerName: str actions: list[Action]
async def bedrock_stream(request: StoryRequest): messages = [ {"role": "user", "content": "Continue or create a new story..."} ]
for action in request.actions: messages.append({"role": action.role, "content": action.content})
response = bedrock.invoke_model_with_response_stream( modelId='anthropic.claude-3-sonnet-20240229-v1:0', body=json.dumps({ "system":f""" You are running an AI text adventure game in the {request.genre} genre. Player: {request.playerName}. Return less than 200 characters of text. """, "messages": messages, "max_tokens": 1000, "temperature": 0.7, "anthropic_version": "bedrock-2023-05-31" }) )
stream = response.get('body') if stream: for event in stream: chunk = event.get('chunk') if chunk: message = json.loads(chunk.get("bytes").decode()) if message['type'] == "content_block_delta": yield message['delta']['text'] or "" elif message['type'] == "message_stop": yield "\n"
@app.post("/story/generate", openapi_extra={'x-streaming': True}, response_class=PlainTextResponse)def generate_story(request: StoryRequest) -> str: return StreamingResponse(bedrock_stream(request), media_type="text/plain")
import osimport uuidfrom collections.abc import Callable
from aws_lambda_powertools import Logger, Metrics, Tracerfrom aws_lambda_powertools.metrics import MetricUnitfrom fastapi import FastAPI, Request, Responsefrom fastapi.openapi.utils import get_openapifrom fastapi.responses import JSONResponsefrom fastapi.routing import APIRoutefrom mangum import Mangumfrom pydantic import BaseModelfrom starlette.middleware.exceptions import ExceptionMiddleware
os.environ["POWERTOOLS_METRICS_NAMESPACE"] = "StoryApi"os.environ["POWERTOOLS_SERVICE_NAME"] = "StoryApi"
logger: Logger = Logger()metrics: Metrics = Metrics()tracer: Tracer = Tracer()
class InternalServerErrorDetails(BaseModel): detail: str
app = FastAPI( title="StoryApi", responses={ 500: {"model": InternalServerErrorDetails} })lambda_handler = Mangum(app)
# Add tracinglambda_handler.__name__ = "handler" # tracer requires __name__ to be setlambda_handler = tracer.capture_lambda_handler(lambda_handler)# Add logginglambda_handler = logger.inject_lambda_context(lambda_handler, clear_state=True)# Add metrics last to properly flush metrics.lambda_handler = metrics.log_metrics(lambda_handler, capture_cold_start_metric=True)
# Add exception middleware(s)app.add_middleware(ExceptionMiddleware, handlers=app.exception_handlers)
@app.exception_handler(Exception)async def unhandled_exception_handler(request, err): logger.exception("Unhandled exception")
metrics.add_metric(name="Failure", unit=MetricUnit.Count, value=1)
return JSONResponse(status_code=500, content=InternalServerErrorDetails( detail="Internal Server Error").model_dump())
@app.middleware("http")async def metrics_handler(request: Request, call_next): metrics.add_dimension("route", f"{request.method} {request.url.path}") metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)
response = await call_next(request)
if response.status_code == 200: metrics.add_metric(name="Success", unit=MetricUnit.Count, value=1)
return response
# Add correlation id middleware@app.middleware("http")async def add_correlation_id(request: Request, call_next): # Get correlation id from X-Correlation-Id header corr_id = request.headers.get("x-correlation-id") if not corr_id and "aws.context" in request.scope: # If empty, use request id from aws context corr_id = request.scope["aws.context"].aws_request_id elif not corr_id: # If still empty, use uuid corr_id = uuid.uuid4().hex
# Add correlation id to logs logger.set_correlation_id(corr_id)
response = await call_next(request)
# Return correlation header in response response.headers["X-Correlation-Id"] = corr_id return response
class LoggerRouteHandler(APIRoute): def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler()
async def route_handler(request: Request) -> Response: # Add fastapi context to logs ctx = { "path": request.url.path, "route": self.path, "method": request.method, } logger.append_keys(fastapi=ctx) logger.info("Received request")
return await original_route_handler(request)
return route_handler
app.router.route_class = LoggerRouteHandler
def custom_openapi(): if app.openapi_schema: return app.openapi_schema for route in app.routes: if isinstance(route, APIRoute): route.operation_id = route.name openapi_schema = get_openapi( title=app.title, version=app.version, openapi_version=app.openapi_version, description=app.description, routes=app.routes, ) app.openapi_schema = openapi_schema return app.openapi_schema
app.openapi = custom_openapi
Analyse du code :
- Nous utilisons le paramètre
x-streaming
pour indiquer qu’il s’agit d’une API de streaming lors de la génération de notre SDK client. Cela permet de consommer cette API en streaming tout en conservant la sécurité des types ! - Notre API retourne simplement un flux texte comme défini par
media_type="text/plain"
etresponse_class=PlainTextResponse
Infrastructure
L’infrastructure configurée précédemment suppose que toutes les APIs utilisent une API Gateway intégrée à des fonctions Lambda. Pour notre story_api
, nous ne voulons pas utiliser API Gateway car il ne supporte pas les réponses en streaming. À la place, nous utiliserons une URL de fonction Lambda configurée avec le streaming de réponse.
Pour supporter cela, nous allons d’abord mettre à jour nos constructs CDK comme suit :
import { Duration, Stack, CfnOutput } from 'aws-cdk-lib';import { IGrantable, Grant } from 'aws-cdk-lib/aws-iam';import { Runtime, Code, Tracing, LayerVersion, FunctionUrlAuthType, InvokeMode, Function,} from 'aws-cdk-lib/aws-lambda';import { Construct } from 'constructs';import url from 'url';import { RuntimeConfig } from '../../core/runtime-config.js';
export class StoryApi extends Construct { public readonly handler: Function;
constructor(scope: Construct, id: string) { super(scope, id);
this.handler = new Function(this, 'Handler', { runtime: Runtime.PYTHON_3_12, handler: 'run.sh', code: Code.fromAsset( url.fileURLToPath( new URL( '../../../../../../dist/packages/story_api/bundle', import.meta.url, ), ), ), timeout: Duration.seconds(30), tracing: Tracing.ACTIVE, environment: { AWS_CONNECTION_REUSE_ENABLED: '1', }, });
const stack = Stack.of(this); this.handler.addLayers( LayerVersion.fromLayerVersionArn( this, 'LWALayer', `arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`, ), ); this.handler.addEnvironment('PORT', '8000'); this.handler.addEnvironment('AWS_LWA_INVOKE_MODE', 'response_stream'); this.handler.addEnvironment('AWS_LAMBDA_EXEC_WRAPPER', '/opt/bootstrap'); const functionUrl = this.handler.addFunctionUrl({ authType: FunctionUrlAuthType.AWS_IAM, invokeMode: InvokeMode.RESPONSE_STREAM, cors: { allowedOrigins: ['*'], allowedHeaders: [ 'authorization', 'content-type', 'x-amz-content-sha256', 'x-amz-date', 'x-amz-security-token', ], }, });
new CfnOutput(this, 'StoryApiUrl', { value: functionUrl.url });
// Register the API URL in runtime configuration for client discovery RuntimeConfig.ensure(this).config.apis = { ...RuntimeConfig.ensure(this).config.apis!, StoryApi: functionUrl.url, }; }
public grantInvokeAccess(grantee: IGrantable) { Grant.addToPrincipal({ grantee, actions: ['lambda:InvokeFunctionUrl'], resourceArns: [this.handler.functionArn], conditions: { StringEquals: { 'lambda:FunctionUrlAuthType': 'AWS_IAM', }, }, }); }}
import { GameApi, GameUI, StoryApi, UserIdentity,} from ':dungeon-adventure/common-constructs';import * as cdk from 'aws-cdk-lib';import { Construct } from 'constructs';import { ElectrodbDynamoTable } from '../constructs/electrodb-table.js';import { PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';
export class ApplicationStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props);
// The code that defines your stack goes here const userIdentity = new UserIdentity(this, 'UserIdentity');
const electroDbTable = new ElectrodbDynamoTable(this, 'ElectroDbTable');
const gameApi = new GameApi(this, 'GameApi', { integrations: GameApi.defaultIntegrations(this) .withDefaultOptions({ environment: { TABLE_NAME: electroDbTable.tableName, }, }) .build(), });
// Grant read/write access to each procedure's lambda handler according to the permissions it requires // Grant read/write access to each handler depending on the permissions it requires electroDbTable.grantReadData(gameApi.integrations['actions.query'].handler); electroDbTable.grantReadData(gameApi.integrations['games.query'].handler); electroDbTable.grantReadWriteData( gameApi.integrations['actions.save'].handler, ); electroDbTable.grantReadWriteData( gameApi.integrations['games.save'].handler, );
const storyApi = new StoryApi(this, 'StoryApi', { integrations: StoryApi.defaultIntegrations(this).build(), }); const storyApi = new StoryApi(this, 'StoryApi'); storyApi.handler.addToRolePolicy( new PolicyStatement({ effect: Effect.ALLOW, actions: ['bedrock:InvokeModelWithResponseStream'], resources: [ 'arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0', ], }), );
// grant our authenticated role access to invoke our APIs [storyApi, gameApi].forEach((api) => api.grantInvokeAccess(userIdentity.identityPool.authenticatedRole), );
// Ensure this is instantiated last so our runtime-config.json can be automatically configured new GameUI(this, 'GameUI'); }}
Maintenant nous mettrons à jour la story_api
pour supporter le déploiement du Lambda Web Adapter.
#!/bin/bash
PATH=$PATH:$LAMBDA_TASK_ROOT/bin \ PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \ exec python -m uvicorn --port=$PORT story_api.main:app
{ "name": "dungeon_adventure.story_api", ... "targets": { ... "bundle": { "cache": true, "executor": "nx:run-commands", "outputs": ["{workspaceRoot}/dist/packages/story_api/bundle"], "options": { "commands": [ "uv export --frozen --no-dev --no-editable --project story_api -o dist/packages/story_api/bundle/requirements.txt", "uv pip install -n --no-installer-metadata --no-compile-bytecode --python-platform x86_64-manylinux2014 --target dist/packages/story_api/bundle -r dist/packages/story_api/bundle/requirements.txt", "copyfiles -f packages/story_api/run.sh dist/packages/story_api/bundle" ], "parallel": false }, "dependsOn": ["compile"] }, ... }}
Déploiement et tests
D’abord, construisons la base de code :
pnpm nx run-many --target build --all
yarn nx run-many --target build --all
npx nx run-many --target build --all
bunx nx run-many --target build --all
Votre application peut maintenant être déployée avec la commande suivante :
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
yarn nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
npx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
bunx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
Ce déploiement prendra environ 2 minutes.
Commande de déploiement
Vous pouvez également déployer toutes les stacks de l’application CDK avec :
pnpm nx run @dungeon-adventure/infra:deploy --all
yarn nx run @dungeon-adventure/infra:deploy --all
npx nx run @dungeon-adventure/infra:deploy --all
bunx nx run @dungeon-adventure/infra:deploy --all
Ceci n’est pas recommandé car vous pourriez choisir de séparer vos environnements de déploiement en stacks distinctes (ex: infra-prod)
. Dans ce cas, le flag --all
tentera de tout déployer, ce qui peut entraîner des déploiements indésirables !
Une fois le déploiement terminé, vous devriez voir des sorties similaires à ceci (certaines valeurs ont été masquées) :
dungeon-adventure-infra-sandboxdungeon-adventure-infra-sandbox: deploying... [2/2]
✅ dungeon-adventure-infra-sandbox
✨ Temps de déploiement : 354s
Outputs:dungeon-adventure-infra-sandbox.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-ElectroDbTableXXX-YYYdungeon-adventure-infra-sandbox.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/dungeon-adventure-infra-sandbox.GameUIDistributionDomainNameXXX = xxx.cloudfront.netdungeon-adventure-infra-sandbox.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/dungeon-adventure-infra-sandbox.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxxdungeon-adventure-infra-sandbox.UserIdentityUserIdentityUserPoolIdXXX = region_xxx
Nous pouvons tester notre API de deux manières :
- Démarrer une instance locale du serveur FastAPI et appeler les APIs avec
curl
- Appeler l'API déployée directement avec curl activé pour Sigv4
curl avec Sigv4
Vous pouvez soit ajouter ce script à votre fichier
.bashrc
(et lesourcer
), soit le coller directement dans le terminal.~/.bashrc acurl () {REGION=$1SERVICE=$2shift; shift;curl --aws-sigv4 "aws:amz:$REGION:$SERVICE" --user "$(aws configure get aws_access_key_id):$(aws configure get aws_secret_access_key)" -H "X-Amz-Security-Token: $(aws configure get aws_session_token)" "$@"}Exemples d’utilisation :
API Gateway
Fenêtre de terminal acurl ap-southeast-2 execute-api -X GET https://xxxURL de fonction Lambda en streaming
Fenêtre de terminal acurl ap-southeast-2 lambda -N -X POST https://xxxVous pouvez ajouter cette fonction à votre profil PowerShell ou la coller dans la session actuelle.
Fenêtre de terminal function acurl {param([Parameter(Mandatory=$true)][string]$Region,[Parameter(Mandatory=$true)][string]$Service,[Parameter(ValueFromRemainingArguments=$true)][string[]]$CurlArgs)$AccessKey = aws configure get aws_access_key_id$SecretKey = aws configure get aws_secret_access_key$SessionToken = aws configure get aws_session_token& curl --aws-sigv4 "aws:amz:$Region`:$Service" --user "$AccessKey`:$SecretKey" -H "X-Amz-Security-Token: $SessionToken" @CurlArgs}Exemples d’utilisation :
API Gateway
Fenêtre de terminal acurl ap-southeast-2 execute-api -X GET https://xxxURL de fonction Lambda en streaming
Fenêtre de terminal acurl ap-southeast-2 lambda -N -X POST https://xxx
Démarrez le serveur FastAPI local avec :
pnpm nx run dungeon_adventure.story_api:serve
yarn nx run dungeon_adventure.story_api:serve
npx nx run dungeon_adventure.story_api:serve
bunx nx run dungeon_adventure.story_api:serve
Une fois le serveur démarré, appelez-le avec :
curl -N -X POST http://127.0.0.1:8000/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
acurl ap-southeast-2 lambda -N -X POST \ https://xxx.lambda-url.ap-southeast-2.on.aws/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
Si la commande réussit, vous devriez voir une réponse en streaming similaire à :
UnnamedHero se tenait fier, sa cape flottant au vent....
Félicitations. Vous avez créé et déployé votre première API avec FastAPI ! 🎉🎉🎉