AI Dungeon Game
Module 3: Story API Implementation

The StoryApi comprises a single generate_story API which, given a Game and a list of Action items as context, progresses a story. This API will be implemented as a streaming API in Python/FastAPI, and will also demonstrate how to modify generated code to fit your own use case.
API Implementation

To create our API, we first need to install a few additional dependencies:

- boto3 will be used to call Amazon Bedrock;
- uvicorn will be used to start our API in conjunction with the Lambda Web Adapter (LWA);
- copyfiles is an npm dependency we need to support cross-platform copying of files when updating our bundle task.

To install these dependencies, run the following commands:
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
yarn nx run dungeon_adventure.story_api:add --args boto3 uvicorn
npx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
bunx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
pnpm add -Dw copyfiles
yarn add -D copyfiles
npm install --legacy-peer-deps -D copyfiles
bun install -D copyfiles
Now let's replace the contents of the following files under packages/story_api/story_api:
packages/story_api/story_api/main.py

```python
import json

from boto3 import client
from fastapi.responses import PlainTextResponse, StreamingResponse
from pydantic import BaseModel

from .init import app, lambda_handler

handler = lambda_handler

bedrock = client('bedrock-runtime')

class Action(BaseModel):
    role: str
    content: str

class StoryRequest(BaseModel):
    genre: str
    playerName: str
    actions: list[Action]

async def bedrock_stream(request: StoryRequest):
    messages = [
        {"role": "user", "content": "Continue or create a new story..."}
    ]

    for action in request.actions:
        messages.append({"role": action.role, "content": action.content})

    response = bedrock.invoke_model_with_response_stream(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "system": f"""
            You are running an AI text adventure game in the {request.genre} genre.
            Player: {request.playerName}. Return less than 200 characters of text.
            """,
            "messages": messages,
            "max_tokens": 1000,
            "temperature": 0.7,
            "anthropic_version": "bedrock-2023-05-31"
        })
    )

    stream = response.get('body')
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                message = json.loads(chunk.get("bytes").decode())
                if message['type'] == "content_block_delta":
                    yield message['delta']['text'] or ""
                elif message['type'] == "message_stop":
                    yield "\n"

@app.post("/story/generate",
          openapi_extra={'x-streaming': True},
          response_class=PlainTextResponse)
def generate_story(request: StoryRequest) -> str:
    return StreamingResponse(bedrock_stream(request), media_type="text/plain")
```
packages/story_api/story_api/init.py

```python
import os
import uuid
from collections.abc import Callable

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from fastapi import FastAPI, Request, Response
from fastapi.openapi.utils import get_openapi
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
from mangum import Mangum
from pydantic import BaseModel
from starlette.middleware.exceptions import ExceptionMiddleware

os.environ["POWERTOOLS_METRICS_NAMESPACE"] = "StoryApi"
os.environ["POWERTOOLS_SERVICE_NAME"] = "StoryApi"

logger: Logger = Logger()
metrics: Metrics = Metrics()
tracer: Tracer = Tracer()

class InternalServerErrorDetails(BaseModel):
    detail: str

app = FastAPI(
    title="StoryApi",
    responses={
        500: {"model": InternalServerErrorDetails}
    }
)
lambda_handler = Mangum(app)

# Add tracing
lambda_handler.__name__ = "handler"  # tracer requires __name__ to be set
lambda_handler = tracer.capture_lambda_handler(lambda_handler)
# Add logging
lambda_handler = logger.inject_lambda_context(lambda_handler, clear_state=True)
# Add metrics last to properly flush metrics.
lambda_handler = metrics.log_metrics(lambda_handler, capture_cold_start_metric=True)

# Add exception middleware(s)
app.add_middleware(ExceptionMiddleware, handlers=app.exception_handlers)

@app.exception_handler(Exception)
async def unhandled_exception_handler(request, err):
    logger.exception("Unhandled exception")

    metrics.add_metric(name="Failure", unit=MetricUnit.Count, value=1)

    return JSONResponse(status_code=500, content=InternalServerErrorDetails(
        detail="Internal Server Error").model_dump())

@app.middleware("http")
async def metrics_handler(request: Request, call_next):
    metrics.add_dimension("route", f"{request.method} {request.url.path}")
    metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)

    response = await call_next(request)

    if response.status_code == 200:
        metrics.add_metric(name="Success", unit=MetricUnit.Count, value=1)

    return response

# Add correlation id middleware
@app.middleware("http")
async def add_correlation_id(request: Request, call_next):
    # Get correlation id from X-Correlation-Id header
    corr_id = request.headers.get("x-correlation-id")
    if not corr_id and "aws.context" in request.scope:
        # If empty, use request id from aws context
        corr_id = request.scope["aws.context"].aws_request_id
    elif not corr_id:
        # If still empty, use uuid
        corr_id = uuid.uuid4().hex

    # Add correlation id to logs
    logger.set_correlation_id(corr_id)

    response = await call_next(request)

    # Return correlation header in response
    response.headers["X-Correlation-Id"] = corr_id
    return response

class LoggerRouteHandler(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def route_handler(request: Request) -> Response:
            # Add fastapi context to logs
            ctx = {
                "path": request.url.path,
                "route": self.path,
                "method": request.method,
            }
            logger.append_keys(fastapi=ctx)
            logger.info("Received request")

            return await original_route_handler(request)

        return route_handler

app.router.route_class = LoggerRouteHandler

def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    for route in app.routes:
        if isinstance(route, APIRoute):
            route.operation_id = route.name
    openapi_schema = get_openapi(
        title=app.title,
        version=app.version,
        openapi_version=app.openapi_version,
        description=app.description,
        routes=app.routes,
    )
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi
```
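The correlation-id middleware above picks an id in a fixed precedence order: the incoming X-Correlation-Id header, then the request id from the AWS Lambda context, then a freshly generated uuid. A minimal stand-alone sketch of that precedence (the `resolve_correlation_id` helper and `FakeLambdaContext` stub are hypothetical, for illustration only):

```python
import uuid

def resolve_correlation_id(headers, scope):
    """Mirror the precedence used by the add_correlation_id middleware:
    X-Correlation-Id header first, then the Lambda request id, then a uuid."""
    corr_id = headers.get("x-correlation-id")
    if not corr_id and "aws.context" in scope:
        # Fall back to the request id supplied by the Lambda runtime
        corr_id = scope["aws.context"].aws_request_id
    elif not corr_id:
        # Still empty: mint a fresh id
        corr_id = uuid.uuid4().hex
    return corr_id

class FakeLambdaContext:
    """Hypothetical stand-in for the context object placed in the ASGI scope."""
    aws_request_id = "req-123"

print(resolve_correlation_id({"x-correlation-id": "abc"}, {}))
print(resolve_correlation_id({}, {"aws.context": FakeLambdaContext()}))
```

Propagating the chosen id back in the X-Correlation-Id response header lets a caller correlate its own logs with the CloudWatch entries for the same request.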
A breakdown of the code:

- We use the `x-streaming` parameter to flag this as a streaming API when generating our client SDK. This lets us consume the API in a streaming fashion while still retaining type safety!
- The API simply returns a text stream, as defined by `media_type="text/plain"` and `response_class=PlainTextResponse`.
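The chunk-decoding loop inside bedrock_stream can be exercised in isolation. The following sketch assumes events shaped like Bedrock's response stream (`extract_text` and the sample events are hypothetical, for illustration only):

```python
import json

def extract_text(events):
    """Mimic bedrock_stream's decoding loop: collect text from
    content_block_delta events and emit a newline at message_stop."""
    out = []
    for event in events:
        chunk = event.get('chunk')
        if not chunk:
            continue
        message = json.loads(chunk['bytes'].decode())
        if message['type'] == 'content_block_delta':
            out.append(message['delta']['text'] or "")
        elif message['type'] == 'message_stop':
            out.append("\n")
    return "".join(out)

# Simulated event stream, shaped like the events Bedrock returns
events = [
    {'chunk': {'bytes': json.dumps(
        {'type': 'content_block_delta', 'delta': {'text': 'Once'}}).encode()}},
    {'chunk': {'bytes': json.dumps(
        {'type': 'content_block_delta', 'delta': {'text': ' upon'}}).encode()}},
    {'chunk': {'bytes': json.dumps({'type': 'message_stop'}).encode()}},
]
print(extract_text(events))  # "Once upon\n"
```

In the real handler each piece is yielded immediately rather than collected, which is what lets StreamingResponse forward text to the client as soon as the model produces it.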
Infrastructure

The infrastructure we set up previously assumes that all APIs use an API Gateway integrated with Lambda functions. For our story_api we do not want to use API Gateway, since it does not support streaming responses. Instead, we will use a Lambda function URL configured with response streaming.

To support this, we first update our CDK constructs as follows:
```typescript
import { Duration, Stack, CfnOutput } from 'aws-cdk-lib';
import { IGrantable, Grant } from 'aws-cdk-lib/aws-iam';
import {
  Runtime,
  Code,
  Tracing,
  LayerVersion,
  FunctionUrlAuthType,
  InvokeMode,
  Function,
} from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import url from 'url';
import { RuntimeConfig } from '../../core/runtime-config.js';

export class StoryApi extends Construct {
  public readonly handler: Function;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    this.handler = new Function(this, 'Handler', {
      runtime: Runtime.PYTHON_3_12,
      handler: 'run.sh',
      code: Code.fromAsset(
        url.fileURLToPath(
          new URL(
            '../../../../../../dist/packages/story_api/bundle',
            import.meta.url,
          ),
        ),
      ),
      timeout: Duration.seconds(30),
      tracing: Tracing.ACTIVE,
      environment: {
        AWS_CONNECTION_REUSE_ENABLED: '1',
      },
    });

    const stack = Stack.of(this);
    this.handler.addLayers(
      LayerVersion.fromLayerVersionArn(
        this,
        'LWALayer',
        `arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`,
      ),
    );
    this.handler.addEnvironment('PORT', '8000');
    this.handler.addEnvironment('AWS_LWA_INVOKE_MODE', 'response_stream');
    this.handler.addEnvironment('AWS_LAMBDA_EXEC_WRAPPER', '/opt/bootstrap');
    const functionUrl = this.handler.addFunctionUrl({
      authType: FunctionUrlAuthType.AWS_IAM,
      invokeMode: InvokeMode.RESPONSE_STREAM,
      cors: {
        allowedOrigins: ['*'],
        allowedHeaders: [
          'authorization',
          'content-type',
          'x-amz-content-sha256',
          'x-amz-date',
          'x-amz-security-token',
        ],
      },
    });

    new CfnOutput(this, 'StoryApiUrl', { value: functionUrl.url });

    // Register the API URL in runtime configuration for client discovery
    RuntimeConfig.ensure(this).config.apis = {
      ...RuntimeConfig.ensure(this).config.apis!,
      StoryApi: functionUrl.url,
    };
  }

  public grantInvokeAccess(grantee: IGrantable) {
    Grant.addToPrincipal({
      grantee,
      actions: ['lambda:InvokeFunctionUrl'],
      resourceArns: [this.handler.functionArn],
      conditions: {
        StringEquals: {
          'lambda:FunctionUrlAuthType': 'AWS_IAM',
        },
      },
    });
  }
}
```
```typescript
import {
  GameApi,
  GameUI,
  StoryApi,
  UserIdentity,
} from ':dungeon-adventure/common-constructs';
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { ElectrodbDynamoTable } from '../constructs/electrodb-table.js';
import { PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';

export class ApplicationStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // The code that defines your stack goes here
    const userIdentity = new UserIdentity(this, 'UserIdentity');

    const electroDbTable = new ElectrodbDynamoTable(this, 'ElectroDbTable');

    const gameApi = new GameApi(this, 'GameApi', {
      integrations: GameApi.defaultIntegrations(this)
        .withDefaultOptions({
          environment: {
            TABLE_NAME: electroDbTable.tableName,
          },
        })
        .build(),
    });

    // Grant read/write access to each handler depending on the permissions it requires
    electroDbTable.grantReadData(gameApi.integrations['actions.query'].handler);
    electroDbTable.grantReadData(gameApi.integrations['games.query'].handler);
    electroDbTable.grantReadWriteData(
      gameApi.integrations['actions.save'].handler,
    );
    electroDbTable.grantReadWriteData(
      gameApi.integrations['games.save'].handler,
    );

    const storyApi = new StoryApi(this, 'StoryApi');
    storyApi.handler.addToRolePolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['bedrock:InvokeModelWithResponseStream'],
        resources: [
          'arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0',
        ],
      }),
    );

    // grant our authenticated role access to invoke our APIs
    [storyApi, gameApi].forEach((api) =>
      api.grantInvokeAccess(userIdentity.identityPool.authenticatedRole),
    );

    // Ensure this is instantiated last so our runtime-config.json can be automatically configured
    new GameUI(this, 'GameUI');
  }
}
```
Now let's update the story_api to support deployment with the Lambda Web Adapter.
packages/story_api/run.sh

```bash
#!/bin/bash

PATH=$PATH:$LAMBDA_TASK_ROOT/bin \
    PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \
    exec python -m uvicorn --port=$PORT story_api.main:app
```
packages/story_api/project.json

```json
{
  "name": "dungeon_adventure.story_api",
  ...
  "targets": {
    ...
    "bundle": {
      "cache": true,
      "executor": "nx:run-commands",
      "outputs": ["{workspaceRoot}/dist/packages/story_api/bundle"],
      "options": {
        "commands": [
          "uv export --frozen --no-dev --no-editable --project packages/story_api --package dungeon_adventure.story_api -o dist/packages/story_api/bundle/requirements.txt",
          "uv pip install -n --no-deps --no-installer-metadata --no-compile-bytecode --python-platform x86_64-manylinux2014 --target dist/packages/story_api/bundle -r dist/packages/story_api/bundle/requirements.txt",
          "copyfiles -f packages/story_api/run.sh dist/packages/story_api/bundle"
        ],
        "parallel": false
      },
      "dependsOn": ["compile"]
    },
    ...
  }
}
```
Deployment and testing

First, let's build the codebase:
pnpm nx run-many --target build --all
yarn nx run-many --target build --all
npx nx run-many --target build --all
bunx nx run-many --target build --all
You can now deploy the application by running the following command:
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
yarn nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
npx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
bunx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
This deployment will take around 2 minutes to complete.

Once it finishes, you should see outputs similar to the following (some values have been redacted):
```
dungeon-adventure-infra-sandbox-Application: deploying... [2/2]

✅ dungeon-adventure-infra-sandbox-Application

✨ Deployment time: 354s

Outputs:
dungeon-adventure-infra-sandbox-Application.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-Application-ElectroDbTableXXX-YYY
dungeon-adventure-infra-sandbox-Application.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/
dungeon-adventure-infra-sandbox-Application.GameUIDistributionDomainNameXXX = xxx.cloudfront.net
dungeon-adventure-infra-sandbox-Application.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxx
dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityUserPoolIdXXX = region_xxx
```
We can test our API by either:

- starting a local instance of the FastAPI server and invoking it with curl, or
- calling the deployed API directly with Sigv4-enabled curl.
Sigv4-enabled curl

You can add the following script to your `.bashrc` file (then `source` it), or paste it directly into the terminal you want to run the commands in:

~/.bashrc

```bash
acurl () {
  REGION=$1
  SERVICE=$2
  shift; shift;
  curl --aws-sigv4 "aws:amz:$REGION:$SERVICE" \
    --user "$(aws configure get aws_access_key_id):$(aws configure get aws_secret_access_key)" \
    -H "X-Amz-Security-Token: $(aws configure get aws_session_token)" \
    "$@"
}
```

Example usage:

API Gateway:

```bash
acurl ap-southeast-2 execute-api -X GET https://xxx
```

Streaming Lambda function URL:

```bash
acurl ap-southeast-2 lambda -N -X POST https://xxx
```

Alternatively, add the following function to your PowerShell profile or paste it into the current PowerShell session:

```powershell
function acurl {
    param(
        [Parameter(Mandatory=$true)][string]$Region,
        [Parameter(Mandatory=$true)][string]$Service,
        [Parameter(ValueFromRemainingArguments=$true)][string[]]$CurlArgs
    )

    $AccessKey = aws configure get aws_access_key_id
    $SecretKey = aws configure get aws_secret_access_key
    $SessionToken = aws configure get aws_session_token

    & curl --aws-sigv4 "aws:amz:$Region`:$Service" --user "$AccessKey`:$SecretKey" -H "X-Amz-Security-Token: $SessionToken" @CurlArgs
}
```

Example usage:

API Gateway:

```powershell
acurl ap-southeast-2 execute-api -X GET https://xxx
```

Streaming Lambda function URL:

```powershell
acurl ap-southeast-2 lambda -N -X POST https://xxx
```
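Under the hood, `curl --aws-sigv4` signs each request with AWS Signature Version 4. The first step of that process, deriving the request-signing key from the secret key, date, region, and service, can be sketched in Python (the key-derivation chain follows the published SigV4 algorithm; the credential and date values below are placeholders):

```python
import hashlib
import hmac

def _sign(key: bytes, msg: str) -> bytes:
    """One HMAC-SHA256 step in the SigV4 key-derivation chain."""
    return hmac.new(key, msg.encode(), hashlib.sha256).digest()

def signing_key(secret: str, date: str, region: str, service: str) -> bytes:
    """Derive the SigV4 signing key: secret -> date -> region -> service."""
    k_date = _sign(("AWS4" + secret).encode(), date)
    k_region = _sign(k_date, region)
    k_service = _sign(k_region, service)
    return _sign(k_service, "aws4_request")

# Placeholder credentials and scope, for illustration only
key = signing_key("EXAMPLESECRETKEY", "20240229", "ap-southeast-2", "lambda")
print(key.hex())
```

Note the `service` component: this is why `acurl` takes `execute-api` for API Gateway but `lambda` for function URLs; a key derived for one service is rejected by the other.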
Start a local FastAPI server by running:
pnpm nx run dungeon_adventure.story_api:serve
yarn nx run dungeon_adventure.story_api:serve
npx nx run dungeon_adventure.story_api:serve
bunx nx run dungeon_adventure.story_api:serve
Then invoke it with:

```bash
curl -N -X POST http://127.0.0.1:8000/story/generate \
  -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \
  -H "Content-Type: application/json"
```
Alternatively, invoke the deployed API directly with:

```bash
acurl ap-southeast-2 lambda -N -X POST \
  https://xxx.lambda-url.ap-southeast-2.on.aws/story/generate \
  -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \
  -H "Content-Type: application/json"
```
If the command succeeds, you should see a streamed response similar to:

```
UnnamedHero stood proud, his cape billowing in the wind....
```
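The `-N` flag simply tells curl to print the response body as it arrives instead of buffering it. The same incremental-consumption pattern can be sketched in Python against a throwaway local server (the server, URL, and chunk values here are hypothetical stand-ins for the deployed endpoint):

```python
import http.server
import threading
import urllib.request

class StreamHandler(http.server.BaseHTTPRequestHandler):
    """Toy stand-in for /story/generate: writes the body in several pieces."""
    def do_POST(self):
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        for piece in [b"The hero ", b"sets out", b"\n"]:
            self.wfile.write(piece)
            self.wfile.flush()

    def log_message(self, *args):
        pass  # keep the demo output quiet

server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), StreamHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

url = f"http://127.0.0.1:{server.server_port}/story/generate"
req = urllib.request.Request(url, data=b"{}", method="POST")
chunks = []
with urllib.request.urlopen(req) as resp:
    while True:
        piece = resp.read(16)  # read incrementally rather than all at once
        if not piece:
            break
        chunks.append(piece.decode())
server.shutdown()
print("".join(chunks))
```

A real client (such as the generated SDK) would hand each piece to the UI as it arrives, which is what produces the "typing" effect in the game.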
Congratulations. You have deployed your first API with FastAPI! 🎉🎉🎉