AI Dungeon Game
Module 3: Implementing the Story API
The StoryApi consists of a single API, generate_story, which takes a Game and a list of Actions as context and generates the next part of the story. This API will be implemented as a streaming API in Python/FastAPI, and it will also demonstrate how to modify generated code to make it fit for purpose.
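To make the input side concrete, here is a minimal sketch of how a game and its prior actions could be flattened into the chat-style message list that a text-generation model consumes. The names GameContext, ActionEntry, and build_messages are illustrative assumptions rather than part of the generated code; the seed prompt and field names mirror the request model used in the implementation further down.

```python
from dataclasses import dataclass, field


@dataclass
class ActionEntry:
    # Hypothetical stand-in for an Action: who spoke and what was said
    role: str  # "user" or "assistant"
    content: str


@dataclass
class GameContext:
    # Hypothetical stand-in for the Game fields the story prompt needs
    genre: str
    player_name: str
    actions: list[ActionEntry] = field(default_factory=list)


def build_messages(game: GameContext) -> list[dict[str, str]]:
    """Return the seed prompt followed by every prior action, in order."""
    messages = [{"role": "user", "content": "Continue or create a new story..."}]
    for action in game.actions:
        messages.append({"role": action.role, "content": action.content})
    return messages
```

Keeping the actions in their original order matters here, because the model reads the list as a conversation and continues from the last entry.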
Implementing the API
To create our API, we first need to install a few additional dependencies:
- boto3 will be used to call Amazon Bedrock;
- uvicorn will be used to start our API in conjunction with the Lambda Web Adapter (LWA);
- copyfiles is an npm dependency needed to support cross-platform file copying when we update our bundle task.
To install these dependencies, run the following commands:
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
yarn nx run dungeon_adventure.story_api:add --args boto3 uvicorn
npx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
bunx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
pnpm add -Dw copyfiles
yarn add -D copyfiles
npm install --legacy-peer-deps -D copyfiles
bun install -D copyfiles
Now we will replace the contents of the following files in packages/story_api/story_api:
# main.py
import json

from boto3 import client
from fastapi.responses import PlainTextResponse, StreamingResponse
from pydantic import BaseModel

from .init import app, lambda_handler

handler = lambda_handler

bedrock = client('bedrock-runtime')


class Action(BaseModel):
    role: str
    content: str


class StoryRequest(BaseModel):
    genre: str
    playerName: str
    actions: list[Action]


async def bedrock_stream(request: StoryRequest):
    # Seed the conversation, then replay the previous actions as context
    messages = [
        {"role": "user", "content": "Continue or create a new story..."}
    ]

    for action in request.actions:
        messages.append({"role": action.role, "content": action.content})

    response = bedrock.invoke_model_with_response_stream(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "system": f"""
            You are running an AI text adventure game in the {request.genre} genre.
            Player: {request.playerName}. Return less than 200 characters of text.
            """,
            "messages": messages,
            "max_tokens": 1000,
            "temperature": 0.7,
            "anthropic_version": "bedrock-2023-05-31"
        })
    )

    # Forward each text delta to the caller as soon as it arrives
    stream = response.get('body')
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if chunk:
                message = json.loads(chunk.get("bytes").decode())
                if message['type'] == "content_block_delta":
                    yield message['delta']['text'] or ""
                elif message['type'] == "message_stop":
                    yield "\n"


@app.post("/story/generate",
          openapi_extra={'x-streaming': True},
          response_class=PlainTextResponse)
def generate_story(request: StoryRequest) -> str:
    return StreamingResponse(bedrock_stream(request), media_type="text/plain")

# init.py
import os
import uuid
from collections.abc import Callable

from aws_lambda_powertools import Logger, Metrics, Tracer
from aws_lambda_powertools.metrics import MetricUnit
from fastapi import FastAPI, Request, Response
from fastapi.openapi.utils import get_openapi
from fastapi.responses import JSONResponse
from fastapi.routing import APIRoute
from mangum import Mangum
from pydantic import BaseModel
from starlette.middleware.exceptions import ExceptionMiddleware

os.environ["POWERTOOLS_METRICS_NAMESPACE"] = "StoryApi"
os.environ["POWERTOOLS_SERVICE_NAME"] = "StoryApi"

logger: Logger = Logger()
metrics: Metrics = Metrics()
tracer: Tracer = Tracer()


class InternalServerErrorDetails(BaseModel):
    detail: str


app = FastAPI(
    title="StoryApi",
    responses={
        500: {"model": InternalServerErrorDetails}
    }
)
lambda_handler = Mangum(app)

# Add tracing
lambda_handler.__name__ = "handler"  # tracer requires __name__ to be set
lambda_handler = tracer.capture_lambda_handler(lambda_handler)
# Add logging
lambda_handler = logger.inject_lambda_context(lambda_handler, clear_state=True)
# Add metrics last to properly flush metrics.
lambda_handler = metrics.log_metrics(lambda_handler, capture_cold_start_metric=True)

# Add exception middleware(s)
app.add_middleware(ExceptionMiddleware, handlers=app.exception_handlers)


@app.exception_handler(Exception)
async def unhandled_exception_handler(request, err):
    logger.exception("Unhandled exception")

    metrics.add_metric(name="Failure", unit=MetricUnit.Count, value=1)

    return JSONResponse(status_code=500,
                        content=InternalServerErrorDetails(
                            detail="Internal Server Error").model_dump())


@app.middleware("http")
async def metrics_handler(request: Request, call_next):
    metrics.add_dimension("route", f"{request.method} {request.url.path}")
    metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)

    response = await call_next(request)

    if response.status_code == 200:
        metrics.add_metric(name="Success", unit=MetricUnit.Count, value=1)

    return response


# Add correlation id middleware
@app.middleware("http")
async def add_correlation_id(request: Request, call_next):
    # Get correlation id from X-Correlation-Id header
    corr_id = request.headers.get("x-correlation-id")
    if not corr_id and "aws.context" in request.scope:
        # If empty, use request id from aws context
        corr_id = request.scope["aws.context"].aws_request_id
    elif not corr_id:
        # If still empty, use uuid
        corr_id = uuid.uuid4().hex

    # Add correlation id to logs
    logger.set_correlation_id(corr_id)

    response = await call_next(request)

    # Return correlation header in response
    response.headers["X-Correlation-Id"] = corr_id
    return response


class LoggerRouteHandler(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def route_handler(request: Request) -> Response:
            # Add fastapi context to logs
            ctx = {
                "path": request.url.path,
                "route": self.path,
                "method": request.method,
            }
            logger.append_keys(fastapi=ctx)
            logger.info("Received request")

            return await original_route_handler(request)

        return route_handler


app.router.route_class = LoggerRouteHandler


def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    for route in app.routes:
        if isinstance(route, APIRoute):
            route.operation_id = route.name
    openapi_schema = get_openapi(
        title=app.title,
        version=app.version,
        openapi_version=app.openapi_version,
        description=app.description,
        routes=app.routes,
    )
    app.openapi_schema = openapi_schema
    return app.openapi_schema


app.openapi = custom_openapi

Analyzing the code above:
- We use the x-streaming setting to indicate that this is a streaming API when we generate our client SDK. This lets us consume the API in a streaming fashion while maintaining type safety.
- Our API simply returns a stream of text, as defined by both media_type="text/plain" and response_class=PlainTextResponse.
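The event handling in the streaming loop can be tried out without calling Amazon Bedrock. The sketch below feeds hand-written events, shaped like the ones the loop in bedrock_stream reads (a chunk whose bytes hold a JSON message), through the same filtering logic. The helper names text_from_events and fake_event, and the sample events themselves, are assumptions made for illustration.

```python
import json
from collections.abc import Iterable, Iterator


def text_from_events(events: Iterable[dict]) -> Iterator[str]:
    """Yield text for content deltas and a newline when the message stops."""
    for event in events:
        chunk = event.get("chunk")
        if not chunk:
            continue  # ignore events that carry no payload
        message = json.loads(chunk["bytes"].decode())
        if message["type"] == "content_block_delta":
            yield message["delta"]["text"] or ""
        elif message["type"] == "message_stop":
            yield "\n"


def fake_event(message: dict) -> dict:
    # Hypothetical helper: wrap a message the way the loop expects to receive it
    return {"chunk": {"bytes": json.dumps(message).encode()}}


sample_events = [
    fake_event({"type": "message_start"}),
    fake_event({"type": "content_block_delta", "delta": {"text": "Once "}}),
    fake_event({"type": "content_block_delta", "delta": {"text": "upon a time"}}),
    fake_event({"type": "message_stop"}),
]

# Joining the generator output reproduces what a client would see in total
story = "".join(text_from_events(sample_events))
```

Event types other than content_block_delta and message_stop are skipped, so only story text and the final newline reach the client.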
Infrastructure
The infrastructure we set up previously assumes that all APIs use an API Gateway integrated with Lambda functions. For our story_api we do not want to use API Gateway, as it does not support streaming responses. Instead, we will use a Lambda Function URL configured with response streaming.
To support this, we will first update our CDK constructs:
import { Duration, Stack, CfnOutput } from 'aws-cdk-lib';
import { IGrantable, Grant } from 'aws-cdk-lib/aws-iam';
import {
  Runtime,
  Code,
  Tracing,
  LayerVersion,
  FunctionUrlAuthType,
  InvokeMode,
  Function,
} from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import url from 'url';
import { RuntimeConfig } from '../../core/runtime-config.js';

export class StoryApi extends Construct {
  public readonly handler: Function;

  constructor(scope: Construct, id: string) {
    super(scope, id);

    this.handler = new Function(this, 'Handler', {
      runtime: Runtime.PYTHON_3_12,
      handler: 'run.sh',
      code: Code.fromAsset(
        url.fileURLToPath(
          new URL(
            '../../../../../../dist/packages/story_api/bundle',
            import.meta.url,
          ),
        ),
      ),
      timeout: Duration.seconds(30),
      tracing: Tracing.ACTIVE,
      environment: {
        AWS_CONNECTION_REUSE_ENABLED: '1',
      },
    });

    const stack = Stack.of(this);
    this.handler.addLayers(
      LayerVersion.fromLayerVersionArn(
        this,
        'LWALayer',
        `arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`,
      ),
    );
    this.handler.addEnvironment('PORT', '8000');
    this.handler.addEnvironment('AWS_LWA_INVOKE_MODE', 'response_stream');
    this.handler.addEnvironment('AWS_LAMBDA_EXEC_WRAPPER', '/opt/bootstrap');
    const functionUrl = this.handler.addFunctionUrl({
      authType: FunctionUrlAuthType.AWS_IAM,
      invokeMode: InvokeMode.RESPONSE_STREAM,
      cors: {
        allowedOrigins: ['*'],
        allowedHeaders: [
          'authorization',
          'content-type',
          'x-amz-content-sha256',
          'x-amz-date',
          'x-amz-security-token',
        ],
      },
    });

    new CfnOutput(this, 'StoryApiUrl', { value: functionUrl.url });

    // Register the API URL in runtime configuration for client discovery
    RuntimeConfig.ensure(this).config.apis = {
      ...RuntimeConfig.ensure(this).config.apis!,
      StoryApi: functionUrl.url,
    };
  }

  public grantInvokeAccess(grantee: IGrantable) {
    Grant.addToPrincipal({
      grantee,
      actions: ['lambda:InvokeFunctionUrl'],
      resourceArns: [this.handler.functionArn],
      conditions: {
        StringEquals: {
          'lambda:FunctionUrlAuthType': 'AWS_IAM',
        },
      },
    });
  }
}

import {
  GameApi,
  GameUI,
  StoryApi,
  UserIdentity,
} from ':dungeon-adventure/common-constructs';
import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { ElectrodbDynamoTable } from '../constructs/electrodb-table.js';
import { PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';

export class ApplicationStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // The code that defines your stack goes here
    const userIdentity = new UserIdentity(this, 'UserIdentity');

    const electroDbTable = new ElectrodbDynamoTable(this, 'ElectroDbTable');

    const gameApi = new GameApi(this, 'GameApi', {
      integrations: GameApi.defaultIntegrations(this)
        .withDefaultOptions({
          environment: {
            TABLE_NAME: electroDbTable.tableName,
          },
        })
        .build(),
    });

    // Grant read/write access to each handler depending on the permissions it requires
    electroDbTable.grantReadData(gameApi.integrations['actions.query'].handler);
    electroDbTable.grantReadData(gameApi.integrations['games.query'].handler);
    electroDbTable.grantReadWriteData(
      gameApi.integrations['actions.save'].handler,
    );
    electroDbTable.grantReadWriteData(
      gameApi.integrations['games.save'].handler,
    );

    // Replaces the previous StoryApi instantiation that passed `integrations`
    const storyApi = new StoryApi(this, 'StoryApi');
    // Allow the handler to stream responses from the Bedrock model it invokes
    storyApi.handler.addToRolePolicy(
      new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ['bedrock:InvokeModelWithResponseStream'],
        resources: [
          'arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0',
        ],
      }),
    );

    // grant our authenticated role access to invoke our APIs
    [storyApi, gameApi].forEach((api) =>
      api.grantInvokeAccess(userIdentity.identityPool.authenticatedRole),
    );

    // Ensure this is instantiated last so our runtime-config.json can be automatically configured
    new GameUI(this, 'GameUI');
  }
}

Now we will update the story_api to support deployment with the Lambda Web Adapter:
#!/bin/bash

PATH=$PATH:$LAMBDA_TASK_ROOT/bin \
    PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \
    exec python -m uvicorn --port=$PORT story_api.main:app

{
  "name": "dungeon_adventure.story_api",
  ...
  "targets": {
    ...
    "bundle": {
      "cache": true,
      "executor": "nx:run-commands",
      "outputs": ["{workspaceRoot}/dist/packages/story_api/bundle"],
      "options": {
        "commands": [
          "uv export --frozen --no-dev --no-editable --project story_api -o dist/packages/story_api/bundle/requirements.txt",
          "uv pip install -n --no-installer-metadata --no-compile-bytecode --python-platform x86_64-manylinux2014 --target dist/packages/story_api/bundle -r dist/packages/story_api/bundle/requirements.txt",
          "copyfiles -f packages/story_api/run.sh dist/packages/story_api/bundle"
        ],
        "parallel": false
      },
      "dependsOn": ["compile"]
    },
    ...
  }
}

Deployment and testing
First, let's build the codebase:
pnpm nx run-many --target build --all
yarn nx run-many --target build --all
npx nx run-many --target build --all
bunx nx run-many --target build --all
You can now deploy the application by running:
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
yarn nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
npx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
bunx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
This deployment will take approximately 2 minutes.
Deployment command
You can deploy all of the stacks in the CDK application by running:
pnpm nx run @dungeon-adventure/infra:deploy --all
yarn nx run @dungeon-adventure/infra:deploy --all
npx nx run @dungeon-adventure/infra:deploy --all
bunx nx run @dungeon-adventure/infra:deploy --all
This is not recommended, since you may have separate stacks for different deployment stages (for example, infra-prod). In that case, the --all flag would attempt to deploy every stack, which could result in unintended deployments.
Once the deployment completes, you should see outputs similar to the following (some values have been omitted):
dungeon-adventure-infra-sandbox
dungeon-adventure-infra-sandbox: deploying... [2/2]

 ✅  dungeon-adventure-infra-sandbox

✨  Deployment time: 354s

Outputs:
dungeon-adventure-infra-sandbox.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-ElectroDbTableXXX-YYY
dungeon-adventure-infra-sandbox.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/
dungeon-adventure-infra-sandbox.GameUIDistributionDomainNameXXX = xxx.cloudfront.net
dungeon-adventure-infra-sandbox.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/
dungeon-adventure-infra-sandbox.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxx
dungeon-adventure-infra-sandbox.UserIdentityUserIdentityUserPoolIdXXX = region_xxx

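If you want to pick the Story API URL out of the deploy output programmatically, a small parser is enough. This is only a sketch: it assumes each output appears as a name, then " = ", then a value on its own line. parse_outputs and find_output are hypothetical helper names, and the sample text reuses the placeholder values from the listing.

```python
def parse_outputs(text: str) -> dict[str, str]:
    """Collect `name = value` lines from deploy output into a dict."""
    outputs = {}
    for line in text.splitlines():
        name, sep, value = line.partition(" = ")
        if sep:  # skip lines that are not outputs
            outputs[name.strip()] = value.strip()
    return outputs


def find_output(outputs: dict[str, str], fragment: str) -> str:
    """Return the single output whose name contains `fragment`."""
    matches = [value for name, value in outputs.items() if fragment in name]
    if len(matches) != 1:
        raise LookupError(
            f"expected one output matching {fragment!r}, found {len(matches)}"
        )
    return matches[0]


# Placeholder values copied from the sample deploy output
sample = """\
Outputs:
dungeon-adventure-infra-sandbox.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/
dungeon-adventure-infra-sandbox.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/
"""

story_api_url = find_output(parse_outputs(sample), "StoryApiUrl")
```

Matching on a fragment of the name sidesteps the generated suffix on each output name, which is shown as XXX here.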
We can test our API in either of two ways:
- Start a local instance of the FastAPI server and invoke the API using curl.
- Call the deployed API directly using Sigv4-enabled curl.
Sigv4-enabled curl
You can either add the following script to your ~/.bashrc file (and source it) or paste it directly into the terminal:

acurl () {
    REGION=$1
    SERVICE=$2
    shift; shift;
    curl --aws-sigv4 "aws:amz:$REGION:$SERVICE" --user "$(aws configure get aws_access_key_id):$(aws configure get aws_secret_access_key)" -H "X-Amz-Security-Token: $(aws configure get aws_session_token)" "$@"
}

If you use PowerShell, add this function to your PowerShell profile or paste it into the current session:

function acurl {
    param(
        [Parameter(Mandatory=$true)][string]$Region,
        [Parameter(Mandatory=$true)][string]$Service,
        [Parameter(ValueFromRemainingArguments=$true)][string[]]$CurlArgs
    )
    $AccessKey = aws configure get aws_access_key_id
    $SecretKey = aws configure get aws_secret_access_key
    $SessionToken = aws configure get aws_session_token
    & curl --aws-sigv4 "aws:amz:$Region`:$Service" --user "$AccessKey`:$SecretKey" -H "X-Amz-Security-Token: $SessionToken" @CurlArgs
}

Usage examples (the same in both shells):
API Gateway
acurl ap-southeast-2 execute-api -X GET https://xxx
Lambda Function URL with streaming
acurl ap-southeast-2 lambda -N -X POST https://xxx

Start the FastAPI server locally with:
pnpm nx run dungeon_adventure.story_api:serve
yarn nx run dungeon_adventure.story_api:serve
npx nx run dungeon_adventure.story_api:serve
bunx nx run dungeon_adventure.story_api:serve
Once the server is running, call the API with:

curl -N -X POST http://127.0.0.1:8000/story/generate \
  -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \
  -H "Content-Type: application/json"

To call the deployed API instead, use the Sigv4-enabled acurl with your Lambda Function URL:

acurl ap-southeast-2 lambda -N -X POST \
  https://xxx.lambda-url.ap-southeast-2.on.aws/story/generate \
  -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \
  -H "Content-Type: application/json"

If the command runs successfully, you will see a response streamed back, similar to:
UnnamedHero stood tall, his cape billowing in the wind....
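The same local call can be made from Python. The following is a sketch using only the standard library. It assumes the locally served API is listening on 127.0.0.1:8000, and the helper names build_request, iter_text, and print_story are illustrative. It targets the local server only, since the deployed Function URL additionally requires Sigv4-signed requests.

```python
import codecs
import io
import json
import urllib.request
from collections.abc import Iterator

LOCAL_URL = "http://127.0.0.1:8000/story/generate"  # assumed local address


def build_request(genre: str, player_name: str, actions: list[dict]) -> urllib.request.Request:
    """Build the POST request with the same JSON body as the curl example."""
    body = json.dumps({"genre": genre, "actions": actions, "playerName": player_name})
    return urllib.request.Request(
        LOCAL_URL,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def iter_text(stream: io.BufferedIOBase, chunk_size: int = 64) -> Iterator[str]:
    """Yield decoded text as it arrives instead of waiting for the full body."""
    # An incremental decoder copes with multi-byte characters split across reads
    decoder = codecs.getincrementaldecoder("utf-8")()
    while True:
        data = stream.read1(chunk_size)  # returns as soon as some data is available
        if not data:
            break
        text = decoder.decode(data)
        if text:
            yield text


def print_story(genre: str, player_name: str) -> None:
    """Call the local API and print the story as it streams in."""
    with urllib.request.urlopen(build_request(genre, player_name, [])) as response:
        for text in iter_text(response):
            print(text, end="", flush=True)
```

With the local server running, calling print_story("superhero", "UnnamedHero") should print the story incrementally, much like curl -N does.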
Congratulations! You have built and deployed your first API using FastAPI. 🎉🎉🎉