Gioco di Dungeon con IA
Modulo 3: Implementazione dell’API per la storia
Sezione intitolata “Modulo 3: Implementazione dell’API per la storia”La StoryApi è composta da una singola API generate_story
che, dato un Game
e una lista di Action
come contesto, farà progredire una storia. Questa API verrà implementata come API streaming in Python/FastAPI e dimostrerà inoltre come apportare modifiche al codice generato per adattarlo allo scopo.
Implementazione dell’API
Sezione intitolata “Implementazione dell’API”Per creare la nostra API, dobbiamo prima installare alcune dipendenze aggiuntive.
boto3
verrà utilizzato per chiamare Amazon Bedrock;uvicorn
verrà utilizzato per avviare la nostra API in combinazione con Lambda Web Adapter (LWA);copyfiles
è una dipendenza npm necessaria per supportare la copia multipiattaforma di file durante l’aggiornamento del taskbundle
.
Per installare queste dipendenze, esegui i seguenti comandi:
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
yarn nx run dungeon_adventure.story_api:add --args boto3 uvicorn
npx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
bunx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
pnpm add -Dw copyfiles
yarn add -D copyfiles
npm install --legacy-peer-deps -D copyfiles
bun install -D copyfiles
Ora sostituiamo il contenuto dei seguenti file in packages/story_api/story_api
:
import json
from boto3 import clientfrom fastapi.responses import PlainTextResponse, StreamingResponsefrom pydantic import BaseModel
from .init import app, lambda_handler
handler = lambda_handler
bedrock = client('bedrock-runtime')
class Action(BaseModel): role: str content: str
class StoryRequest(BaseModel): genre: str playerName: str actions: list[Action]
async def bedrock_stream(request: StoryRequest): messages = [ {"role": "user", "content": "Continue or create a new story..."} ]
for action in request.actions: messages.append({"role": action.role, "content": action.content})
response = bedrock.invoke_model_with_response_stream( modelId='anthropic.claude-3-sonnet-20240229-v1:0', body=json.dumps({ "system":f""" You are running an AI text adventure game in the {request.genre} genre. Player: {request.playerName}. Return less than 200 characters of text. """, "messages": messages, "max_tokens": 1000, "temperature": 0.7, "anthropic_version": "bedrock-2023-05-31" }) )
stream = response.get('body') if stream: for event in stream: chunk = event.get('chunk') if chunk: message = json.loads(chunk.get("bytes").decode()) if message['type'] == "content_block_delta": yield message['delta']['text'] or "" elif message['type'] == "message_stop": yield "\n"
@app.post("/story/generate", openapi_extra={'x-streaming': True}, response_class=PlainTextResponse)def generate_story(request: StoryRequest) -> str: return StreamingResponse(bedrock_stream(request), media_type="text/plain")
import osimport uuidfrom collections.abc import Callable
from aws_lambda_powertools import Logger, Metrics, Tracerfrom aws_lambda_powertools.metrics import MetricUnitfrom fastapi import FastAPI, Request, Responsefrom fastapi.openapi.utils import get_openapifrom fastapi.responses import JSONResponsefrom fastapi.routing import APIRoutefrom mangum import Mangumfrom pydantic import BaseModelfrom starlette.middleware.exceptions import ExceptionMiddleware
os.environ["POWERTOOLS_METRICS_NAMESPACE"] = "StoryApi"os.environ["POWERTOOLS_SERVICE_NAME"] = "StoryApi"
logger: Logger = Logger()metrics: Metrics = Metrics()tracer: Tracer = Tracer()
class InternalServerErrorDetails(BaseModel): detail: str
app = FastAPI( title="StoryApi", responses={ 500: {"model": InternalServerErrorDetails} })lambda_handler = Mangum(app)
# Add tracinglambda_handler.__name__ = "handler" # tracer requires __name__ to be setlambda_handler = tracer.capture_lambda_handler(lambda_handler)# Add logginglambda_handler = logger.inject_lambda_context(lambda_handler, clear_state=True)# Add metrics last to properly flush metrics.lambda_handler = metrics.log_metrics(lambda_handler, capture_cold_start_metric=True)
# Add exception middleware(s)app.add_middleware(ExceptionMiddleware, handlers=app.exception_handlers)
@app.exception_handler(Exception)async def unhandled_exception_handler(request, err): logger.exception("Unhandled exception")
metrics.add_metric(name="Failure", unit=MetricUnit.Count, value=1)
return JSONResponse(status_code=500, content=InternalServerErrorDetails( detail="Internal Server Error").model_dump())
@app.middleware("http")async def metrics_handler(request: Request, call_next): metrics.add_dimension("route", f"{request.method} {request.url.path}") metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)
response = await call_next(request)
if response.status_code == 200: metrics.add_metric(name="Success", unit=MetricUnit.Count, value=1)
return response
# Add correlation id middleware@app.middleware("http")async def add_correlation_id(request: Request, call_next): # Get correlation id from X-Correlation-Id header corr_id = request.headers.get("x-correlation-id") if not corr_id and "aws.context" in request.scope: # If empty, use request id from aws context corr_id = request.scope["aws.context"].aws_request_id elif not corr_id: # If still empty, use uuid corr_id = uuid.uuid4().hex
# Add correlation id to logs logger.set_correlation_id(corr_id)
response = await call_next(request)
# Return correlation header in response response.headers["X-Correlation-Id"] = corr_id return response
class LoggerRouteHandler(APIRoute): def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler()
async def route_handler(request: Request) -> Response: # Add fastapi context to logs ctx = { "path": request.url.path, "route": self.path, "method": request.method, } logger.append_keys(fastapi=ctx) logger.info("Received request")
return await original_route_handler(request)
return route_handler
app.router.route_class = LoggerRouteHandler
def custom_openapi(): if app.openapi_schema: return app.openapi_schema for route in app.routes: if isinstance(route, APIRoute): route.operation_id = route.name openapi_schema = get_openapi( title=app.title, version=app.version, openapi_version=app.openapi_version, description=app.description, routes=app.routes, ) app.openapi_schema = openapi_schema return app.openapi_schema
app.openapi = custom_openapi
Analizzando il codice sopra:
- Utilizziamo l’impostazione
x-streaming
per indicare che si tratta di un’API streaming quando genereremo il nostro client SDK. Questo ci permetterà di consumare questa API in modalità streaming mantenendo la type-safety! - La nostra API restituisce semplicemente un flusso di testo come definito sia da
media_type="text/plain"
che daresponse_class=PlainTextResponse
Infrastruttura
Sezione intitolata “Infrastruttura”L’infrastruttura configurata precedentemente presuppone che tutte le API abbiano un API Gateway integrato con funzioni Lambda. Per la nostra story_api
non vogliamo utilizzare API Gateway poiché non supporta risposte in streaming. Utilizzeremo invece un Lambda Function URL configurato con response streaming.
Per supportare questo, aggiorneremo prima i nostri costrutti CDK come segue:
import { Duration, Stack, CfnOutput } from 'aws-cdk-lib';import { IGrantable, Grant } from 'aws-cdk-lib/aws-iam';import { Runtime, Code, Tracing, LayerVersion, FunctionUrlAuthType, InvokeMode, Function,} from 'aws-cdk-lib/aws-lambda';import { Construct } from 'constructs';import url from 'url';import { RuntimeConfig } from '../../core/runtime-config.js';
export class StoryApi extends Construct { public readonly handler: Function;
constructor(scope: Construct, id: string) { super(scope, id);
this.handler = new Function(this, 'Handler', { runtime: Runtime.PYTHON_3_12, handler: 'run.sh', code: Code.fromAsset( url.fileURLToPath( new URL( '../../../../../../dist/packages/story_api/bundle', import.meta.url, ), ), ), timeout: Duration.seconds(30), tracing: Tracing.ACTIVE, environment: { AWS_CONNECTION_REUSE_ENABLED: '1', }, });
const stack = Stack.of(this); this.handler.addLayers( LayerVersion.fromLayerVersionArn( this, 'LWALayer', `arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`, ), ); this.handler.addEnvironment('PORT', '8000'); this.handler.addEnvironment('AWS_LWA_INVOKE_MODE', 'response_stream'); this.handler.addEnvironment('AWS_LAMBDA_EXEC_WRAPPER', '/opt/bootstrap'); const functionUrl = this.handler.addFunctionUrl({ authType: FunctionUrlAuthType.AWS_IAM, invokeMode: InvokeMode.RESPONSE_STREAM, cors: { allowedOrigins: ['*'], allowedHeaders: [ 'authorization', 'content-type', 'x-amz-content-sha256', 'x-amz-date', 'x-amz-security-token', ], }, });
new CfnOutput(this, 'StoryApiUrl', { value: functionUrl.url });
// Register the API URL in runtime configuration for client discovery RuntimeConfig.ensure(this).config.apis = { ...RuntimeConfig.ensure(this).config.apis!, StoryApi: functionUrl.url, }; }
public grantInvokeAccess(grantee: IGrantable) { Grant.addToPrincipal({ grantee, actions: ['lambda:InvokeFunctionUrl'], resourceArns: [this.handler.functionArn], conditions: { StringEquals: { 'lambda:FunctionUrlAuthType': 'AWS_IAM', }, }, }); }}
import { GameApi, GameUI, StoryApi, UserIdentity,} from ':dungeon-adventure/common-constructs';import * as cdk from 'aws-cdk-lib';import { Stack, StackProps } from 'aws-cdk-lib';import { Construct } from 'constructs';import { ElectrodbDynamoTable } from '../constructs/electrodb-table.js';import { PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';
export class ApplicationStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) {export class ApplicationStack extends Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props);
// The code that defines your stack goes here const userIdentity = new UserIdentity(this, 'UserIdentity');
const electroDbTable = new ElectrodbDynamoTable(this, 'ElectroDbTable');
const gameApi = new GameApi(this, 'GameApi', { integrations: GameApi.defaultIntegrations(this) .withDefaultOptions({ environment: { TABLE_NAME: electroDbTable.tableName, }, }) .build(), });
// Grant read/write access to each procedure's lambda handler according to the permissions it requires // Grant read/write access to each handler depending on the permissions it requires electroDbTable.grantReadData(gameApi.integrations['actions.query'].handler); electroDbTable.grantReadData(gameApi.integrations['games.query'].handler); electroDbTable.grantReadWriteData( gameApi.integrations['actions.save'].handler, ); electroDbTable.grantReadWriteData( gameApi.integrations['games.save'].handler, );
const storyApi = new StoryApi(this, 'StoryApi', { integrations: StoryApi.defaultIntegrations(this).build(), }); const storyApi = new StoryApi(this, 'StoryApi'); storyApi.handler.addToRolePolicy( new PolicyStatement({ effect: Effect.ALLOW, actions: ['bedrock:InvokeModelWithResponseStream'], resources: [ 'arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0', ], }), );
// grant our authenticated role access to invoke our APIs [storyApi, gameApi].forEach((api) => api.grantInvokeAccess(userIdentity.identityPool.authenticatedRole), );
// Ensure this is instantiated last so our runtime-config.json can be automatically configured new GameUI(this, 'GameUI'); }}
Ora aggiorneremo la story_api
per supportare la distribuzione con Lambda Web Adapter.
#!/bin/bash
PATH=$PATH:$LAMBDA_TASK_ROOT/bin \ PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \ exec python -m uvicorn --port=$PORT story_api.main:app
{ "name": "dungeon_adventure.story_api", ... "targets": { ... "bundle": { "cache": true, "executor": "nx:run-commands", "outputs": ["{workspaceRoot}/dist/packages/story_api/bundle"], "options": { "commands": [ "uv export --frozen --no-dev --no-editable --project packages/story_api --package dungeon_adventure.story_api -o dist/packages/story_api/bundle/requirements.txt", "uv pip install -n --no-deps --no-installer-metadata --no-compile-bytecode --python-platform x86_64-manylinux2014 --target dist/packages/story_api/bundle -r dist/packages/story_api/bundle/requirements.txt", "copyfiles -f packages/story_api/run.sh dist/packages/story_api/bundle" ], "parallel": false }, "dependsOn": ["compile"] }, ... }}
Distribuzione e test
Sezione intitolata “Distribuzione e test”Prima di tutto, compiliamo il codice base:
pnpm nx run-many --target build --all
yarn nx run-many --target build --all
npx nx run-many --target build --all
bunx nx run-many --target build --all
Ora puoi distribuire l’applicazione eseguendo:
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
yarn nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
npx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
bunx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox/*
La distribuzione richiederà circa 2 minuti.
Al completamento della distribuzione, dovresti vedere output simili ai seguenti (alcuni valori sono stati oscurati):
dungeon-adventure-infra-sandbox-Applicationdungeon-adventure-infra-sandbox-Application: deploying... [2/2]
✅ dungeon-adventure-infra-sandbox-Application
✨ Tempo di distribuzione: 354s
Outputs:dungeon-adventure-infra-sandbox-Application.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-Application-ElectroDbTableXXX-YYYdungeon-adventure-infra-sandbox-Application.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/dungeon-adventure-infra-sandbox-Application.GameUIDistributionDomainNameXXX = xxx.cloudfront.netdungeon-adventure-infra-sandbox-Application.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/dungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxxdungeon-adventure-infra-sandbox-Application.UserIdentityUserIdentityUserPoolIdXXX = region_xxx
Possiamo testare la nostra API:
- Avviando un’istanza locale del server FastAPI e invocando le API con
curl
- Chiamare l'API distribuita usando curl abilitato per Sigv4
curl abilitato Sigv4
Puoi aggiungere questo script al tuo file
.bashrc
(e eseguiresource
) o incollarlo direttamente nel terminale.~/.bashrc acurl () {REGION=$1SERVICE=$2shift; shift;curl --aws-sigv4 "aws:amz:$REGION:$SERVICE" --user "$(aws configure get aws_access_key_id):$(aws configure get aws_secret_access_key)" -H "X-Amz-Security-Token: $(aws configure get aws_session_token)" "$@"}Esempi di utilizzo:
API Gateway
Sezione intitolata “API Gateway”Terminal window acurl ap-southeast-2 execute-api -X GET https://xxxLambda Function URL streaming
Sezione intitolata “Lambda Function URL streaming”Terminal window acurl ap-southeast-2 lambda -N -X POST https://xxxAggiungi questa funzione al tuo profilo PowerShell o incollala nella sessione corrente.
Terminal window function acurl {param([Parameter(Mandatory=$true)][string]$Region,[Parameter(Mandatory=$true)][string]$Service,[Parameter(ValueFromRemainingArguments=$true)][string[]]$CurlArgs)$AccessKey = aws configure get aws_access_key_id$SecretKey = aws configure get aws_secret_access_key$SessionToken = aws configure get aws_session_token& curl --aws-sigv4 "aws:amz:$Region`:$Service" --user "$AccessKey`:$SecretKey" -H "X-Amz-Security-Token: $SessionToken" @CurlArgs}Esempi di utilizzo:
API Gateway
Sezione intitolata “API Gateway”Terminal window acurl ap-southeast-2 execute-api -X GET https://xxxLambda Function URL streaming
Sezione intitolata “Lambda Function URL streaming”Terminal window acurl ap-southeast-2 lambda -N -X POST https://xxx
Avvia il server FastAPI locale con:
pnpm nx run dungeon_adventure.story_api:serve
yarn nx run dungeon_adventure.story_api:serve
npx nx run dungeon_adventure.story_api:serve
bunx nx run dungeon_adventure.story_api:serve
Esegui la chiamata API con:
curl -N -X POST http://127.0.0.1:8000/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
acurl ap-southeast-2 lambda -N -X POST \ https://xxx.lambda-url.ap-southeast-2.on.aws/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
Se il comando viene eseguito correttamente, dovresti vedere una risposta in streaming simile a:
UnnamedHero si ergeva maestoso, il mantello che sventolava nel vento....
Congratulazioni. Hai costruito e distribuito la tua prima API con FastAPI! 🎉🎉🎉