Jogo de Dungeons com IA
Módulo 3: Implementação da API de História
A StoryApi consiste em uma única API generate_story
que, dado um Game
e uma lista de Action
s como contexto, irá progredir uma história. Esta API será implementada como uma API de streaming em Python/FastAPI e também demonstrará como alterações podem ser feitas no código gerado para adequá-lo ao propósito.
Implementação da API
Para criar nossa API, primeiro precisamos instalar algumas dependências adicionais:
boto3
será usado para chamar o Amazon Bedrock;uvicorn
será usado para iniciar nossa API em conjunto com o Lambda Web Adapter (LWA);copyfiles
é uma dependência npm necessária para suportar a cópia multiplataforma de arquivos ao atualizar nossa tarefabundle
.
Para instalar estas dependências, execute os seguintes comandos:
pnpm nx run dungeon_adventure.story_api:add --args boto3 uvicorn
yarn nx run dungeon_adventure.story_api:add --args boto3 uvicorn
npx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
bunx nx run dungeon_adventure.story_api:add --args boto3 uvicorn
pnpm add -Dw copyfiles
yarn add -D copyfiles
npm install --legacy-peer-deps -D copyfiles
bun install -D copyfiles
Agora vamos substituir o conteúdo dos seguintes arquivos em packages/story_api/story_api
:
import json
from boto3 import clientfrom fastapi.responses import PlainTextResponse, StreamingResponsefrom pydantic import BaseModel
from .init import app, lambda_handler
handler = lambda_handler
bedrock = client('bedrock-runtime')
class Action(BaseModel): role: str content: str
class StoryRequest(BaseModel): genre: str playerName: str actions: list[Action]
async def bedrock_stream(request: StoryRequest): messages = [ {"role": "user", "content": "Continue or create a new story..."} ]
for action in request.actions: messages.append({"role": action.role, "content": action.content})
response = bedrock.invoke_model_with_response_stream( modelId='anthropic.claude-3-sonnet-20240229-v1:0', body=json.dumps({ "system":f""" You are running an AI text adventure game in the {request.genre} genre. Player: {request.playerName}. Return less than 200 characters of text. """, "messages": messages, "max_tokens": 1000, "temperature": 0.7, "anthropic_version": "bedrock-2023-05-31" }) )
stream = response.get('body') if stream: for event in stream: chunk = event.get('chunk') if chunk: message = json.loads(chunk.get("bytes").decode()) if message['type'] == "content_block_delta": yield message['delta']['text'] or "" elif message['type'] == "message_stop": yield "\n"
@app.post("/story/generate", openapi_extra={'x-streaming': True}, response_class=PlainTextResponse)def generate_story(request: StoryRequest) -> str: return StreamingResponse(bedrock_stream(request), media_type="text/plain")
import osimport uuidfrom collections.abc import Callable
from aws_lambda_powertools import Logger, Metrics, Tracerfrom aws_lambda_powertools.metrics import MetricUnitfrom fastapi import FastAPI, Request, Responsefrom fastapi.openapi.utils import get_openapifrom fastapi.responses import JSONResponsefrom fastapi.routing import APIRoutefrom mangum import Mangumfrom pydantic import BaseModelfrom starlette.middleware.exceptions import ExceptionMiddleware
os.environ["POWERTOOLS_METRICS_NAMESPACE"] = "StoryApi"os.environ["POWERTOOLS_SERVICE_NAME"] = "StoryApi"
logger: Logger = Logger()metrics: Metrics = Metrics()tracer: Tracer = Tracer()
class InternalServerErrorDetails(BaseModel): detail: str
app = FastAPI( title="StoryApi", responses={ 500: {"model": InternalServerErrorDetails} })lambda_handler = Mangum(app)
# Add tracinglambda_handler.__name__ = "handler" # tracer requires __name__ to be setlambda_handler = tracer.capture_lambda_handler(lambda_handler)# Add logginglambda_handler = logger.inject_lambda_context(lambda_handler, clear_state=True)# Add metrics last to properly flush metrics.lambda_handler = metrics.log_metrics(lambda_handler, capture_cold_start_metric=True)
# Add exception middleware(s)app.add_middleware(ExceptionMiddleware, handlers=app.exception_handlers)
@app.exception_handler(Exception)async def unhandled_exception_handler(request, err): logger.exception("Unhandled exception")
metrics.add_metric(name="Failure", unit=MetricUnit.Count, value=1)
return JSONResponse(status_code=500, content=InternalServerErrorDetails( detail="Internal Server Error").model_dump())
@app.middleware("http")async def metrics_handler(request: Request, call_next): metrics.add_dimension("route", f"{request.method} {request.url.path}") metrics.add_metric(name="RequestCount", unit=MetricUnit.Count, value=1)
response = await call_next(request)
if response.status_code == 200: metrics.add_metric(name="Success", unit=MetricUnit.Count, value=1)
return response
# Add correlation id middleware@app.middleware("http")async def add_correlation_id(request: Request, call_next): # Get correlation id from X-Correlation-Id header corr_id = request.headers.get("x-correlation-id") if not corr_id and "aws.context" in request.scope: # If empty, use request id from aws context corr_id = request.scope["aws.context"].aws_request_id elif not corr_id: # If still empty, use uuid corr_id = uuid.uuid4().hex
# Add correlation id to logs logger.set_correlation_id(corr_id)
response = await call_next(request)
# Return correlation header in response response.headers["X-Correlation-Id"] = corr_id return response
class LoggerRouteHandler(APIRoute): def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler()
async def route_handler(request: Request) -> Response: # Add fastapi context to logs ctx = { "path": request.url.path, "route": self.path, "method": request.method, } logger.append_keys(fastapi=ctx) logger.info("Received request")
return await original_route_handler(request)
return route_handler
app.router.route_class = LoggerRouteHandler
def custom_openapi(): if app.openapi_schema: return app.openapi_schema for route in app.routes: if isinstance(route, APIRoute): route.operation_id = route.name openapi_schema = get_openapi( title=app.title, version=app.version, openapi_version=app.openapi_version, description=app.description, routes=app.routes, ) app.openapi_schema = openapi_schema return app.openapi_schema
app.openapi = custom_openapi
Analisando o código acima:
- Usamos a configuração
x-streaming
para indicar que esta é uma API de streaming ao gerar nosso client SDK. Isso permitirá consumir esta API de forma streaming mantendo a segurança de tipos! - Nossa API simplesmente retorna um fluxo de texto conforme definido por
media_type="text/plain"
eresponse_class=PlainTextResponse
Infraestrutura
A infraestrutura que configuramos anteriormente assume que todas as APIs têm um API Gateway integrado com funções Lambda. Para nossa story_api
, não queremos usar o API Gateway pois ele não suporta respostas em streaming. Em vez disso, usaremos um Lambda Function URL configurado com response streaming.
Para suportar isso, primeiro atualizaremos nossos construtos CDK da seguinte forma:
import { Duration, Stack, CfnOutput } from 'aws-cdk-lib';import { IGrantable, Grant } from 'aws-cdk-lib/aws-iam';import { Runtime, Code, Tracing, LayerVersion, FunctionUrlAuthType, InvokeMode, Function,} from 'aws-cdk-lib/aws-lambda';import { Construct } from 'constructs';import url from 'url';import { RuntimeConfig } from '../../core/runtime-config.js';
export class StoryApi extends Construct { public readonly handler: Function;
constructor(scope: Construct, id: string) { super(scope, id);
this.handler = new Function(this, 'Handler', { runtime: Runtime.PYTHON_3_12, handler: 'run.sh', code: Code.fromAsset( url.fileURLToPath( new URL( '../../../../../../dist/packages/story_api/bundle', import.meta.url, ), ), ), timeout: Duration.seconds(30), tracing: Tracing.ACTIVE, environment: { AWS_CONNECTION_REUSE_ENABLED: '1', }, });
const stack = Stack.of(this); this.handler.addLayers( LayerVersion.fromLayerVersionArn( this, 'LWALayer', `arn:aws:lambda:${stack.region}:753240598075:layer:LambdaAdapterLayerX86:24`, ), ); this.handler.addEnvironment('PORT', '8000'); this.handler.addEnvironment('AWS_LWA_INVOKE_MODE', 'response_stream'); this.handler.addEnvironment('AWS_LAMBDA_EXEC_WRAPPER', '/opt/bootstrap'); const functionUrl = this.handler.addFunctionUrl({ authType: FunctionUrlAuthType.AWS_IAM, invokeMode: InvokeMode.RESPONSE_STREAM, cors: { allowedOrigins: ['*'], allowedHeaders: [ 'authorization', 'content-type', 'x-amz-content-sha256', 'x-amz-date', 'x-amz-security-token', ], }, });
new CfnOutput(this, 'StoryApiUrl', { value: functionUrl.url });
// Register the API URL in runtime configuration for client discovery RuntimeConfig.ensure(this).config.apis = { ...RuntimeConfig.ensure(this).config.apis!, StoryApi: functionUrl.url, }; }
public grantInvokeAccess(grantee: IGrantable) { Grant.addToPrincipal({ grantee, actions: ['lambda:InvokeFunctionUrl'], resourceArns: [this.handler.functionArn], conditions: { StringEquals: { 'lambda:FunctionUrlAuthType': 'AWS_IAM', }, }, }); }}
import { GameApi, GameUI, StoryApi, UserIdentity,} from ':dungeon-adventure/common-constructs';import * as cdk from 'aws-cdk-lib';import { Construct } from 'constructs';import { ElectrodbDynamoTable } from '../constructs/electrodb-table.js';import { PolicyStatement, Effect } from 'aws-cdk-lib/aws-iam';
export class ApplicationStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: cdk.StackProps) { super(scope, id, props);
// The code that defines your stack goes here const userIdentity = new UserIdentity(this, 'UserIdentity');
const electroDbTable = new ElectrodbDynamoTable(this, 'ElectroDbTable');
const gameApi = new GameApi(this, 'GameApi', { integrations: GameApi.defaultIntegrations(this) .withDefaultOptions({ environment: { TABLE_NAME: electroDbTable.tableName, }, }) .build(), });
// Grant read/write access to each procedure's lambda handler according to the permissions it requires // Grant read/write access to each handler depending on the permissions it requires electroDbTable.grantReadData(gameApi.integrations['actions.query'].handler); electroDbTable.grantReadData(gameApi.integrations['games.query'].handler); electroDbTable.grantReadWriteData( gameApi.integrations['actions.save'].handler, ); electroDbTable.grantReadWriteData( gameApi.integrations['games.save'].handler, );
const storyApi = new StoryApi(this, 'StoryApi', { integrations: StoryApi.defaultIntegrations(this).build(), }); const storyApi = new StoryApi(this, 'StoryApi'); storyApi.handler.addToRolePolicy( new PolicyStatement({ effect: Effect.ALLOW, actions: ['bedrock:InvokeModelWithResponseStream'], resources: [ 'arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0', ], }), );
// grant our authenticated role access to invoke our APIs [storyApi, gameApi].forEach((api) => api.grantInvokeAccess(userIdentity.identityPool.authenticatedRole), );
// Ensure this is instantiated last so our runtime-config.json can be automatically configured new GameUI(this, 'GameUI'); }}
Agora atualizaremos a story_api
para suportar a implantação do Lambda Web Adapter.
#!/bin/bash
PATH=$PATH:$LAMBDA_TASK_ROOT/bin \ PYTHONPATH=$PYTHONPATH:/opt/python:$LAMBDA_RUNTIME_DIR \ exec python -m uvicorn --port=$PORT story_api.main:app
{ "name": "dungeon_adventure.story_api", ... "targets": { ... "bundle": { "cache": true, "executor": "nx:run-commands", "outputs": ["{workspaceRoot}/dist/packages/story_api/bundle"], "options": { "commands": [ "uv export --frozen --no-dev --no-editable --project story_api -o dist/packages/story_api/bundle/requirements.txt", "uv pip install -n --no-installer-metadata --no-compile-bytecode --python-platform x86_64-manylinux2014 --target dist/packages/story_api/bundle -r dist/packages/story_api/bundle/requirements.txt", "copyfiles -f packages/story_api/run.sh dist/packages/story_api/bundle" ], "parallel": false }, "dependsOn": ["compile"] }, ... }}
Implantação e testes
Primeiro, vamos construir a base de código:
pnpm nx run-many --target build --all
yarn nx run-many --target build --all
npx nx run-many --target build --all
bunx nx run-many --target build --all
Sua aplicação agora pode ser implantada executando o seguinte comando:
pnpm nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
yarn nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
npx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
bunx nx run @dungeon-adventure/infra:deploy dungeon-adventure-infra-sandbox
Esta implantação levará aproximadamente 2 minutos para ser concluída.
Comando de implantação
Você também pode implantar todas as stacks contidas na aplicação CDK executando:
pnpm nx run @dungeon-adventure/infra:deploy --all
yarn nx run @dungeon-adventure/infra:deploy --all
npx nx run @dungeon-adventure/infra:deploy --all
bunx nx run @dungeon-adventure/infra:deploy --all
Isso não é recomendado pois você pode optar por separar seus estágios de implantação como stacks separadas ex: infra-prod
. Neste caso, o flag --all
tentará implantar todas as stacks, o que pode resultar em implantações indesejadas!
Após a conclusão da implantação, você verá saídas semelhantes às seguintes (alguns valores foram omitidos):
dungeon-adventure-infra-sandboxdungeon-adventure-infra-sandbox: deploying... [2/2]
✅ dungeon-adventure-infra-sandbox
✨ Tempo de implantação: 354s
Outputs:dungeon-adventure-infra-sandbox.ElectroDbTableTableNameXXX = dungeon-adventure-infra-sandbox-ElectroDbTableXXX-YYYdungeon-adventure-infra-sandbox.GameApiEndpointXXX = https://xxx.execute-api.region.amazonaws.com/prod/dungeon-adventure-infra-sandbox.GameUIDistributionDomainNameXXX = xxx.cloudfront.netdungeon-adventure-infra-sandbox.StoryApiStoryApiUrlXXX = https://xxx.lambda-url.ap-southeast-2.on.aws/dungeon-adventure-infra-sandbox.UserIdentityUserIdentityIdentityPoolIdXXX = region:xxxdungeon-adventure-infra-sandbox.UserIdentityUserIdentityUserPoolIdXXX = region_xxx
Podemos testar nossa API de duas formas:
- Iniciando uma instância local do servidor FastAPI e invocando as APIs usando
curl
- Chamando a API implantada diretamente usando curl com Sigv4
curl com Sigv4 habilitado
Você pode adicionar o seguinte script ao seu arquivo
.bashrc
(e executarsource
) ou colar diretamente no terminal onde executará o comando.~/.bashrc acurl () {REGION=$1SERVICE=$2shift; shift;curl --aws-sigv4 "aws:amz:$REGION:$SERVICE" --user "$(aws configure get aws_access_key_id):$(aws configure get aws_secret_access_key)" -H "X-Amz-Security-Token: $(aws configure get aws_session_token)" "$@"}Exemplos de uso:
API Gateway
Terminal window acurl ap-southeast-2 execute-api -X GET https://xxxURL de função Lambda com streaming
Terminal window acurl ap-southeast-2 lambda -N -X POST https://xxxAdicione esta função ao seu perfil PowerShell ou cole diretamente na sessão atual.
Terminal window function acurl {param([Parameter(Mandatory=$true)][string]$Region,[Parameter(Mandatory=$true)][string]$Service,[Parameter(ValueFromRemainingArguments=$true)][string[]]$CurlArgs)$AccessKey = aws configure get aws_access_key_id$SecretKey = aws configure get aws_secret_access_key$SessionToken = aws configure get aws_session_token& curl --aws-sigv4 "aws:amz:$Region`:$Service" --user "$AccessKey`:$SecretKey" -H "X-Amz-Security-Token: $SessionToken" @CurlArgs}Exemplos de uso:
API Gateway
Terminal window acurl ap-southeast-2 execute-api -X GET https://xxxURL de função Lambda com streaming
Terminal window acurl ap-southeast-2 lambda -N -X POST https://xxx
Inicie o servidor FastAPI localmente com:
pnpm nx run dungeon_adventure.story_api:serve
yarn nx run dungeon_adventure.story_api:serve
npx nx run dungeon_adventure.story_api:serve
bunx nx run dungeon_adventure.story_api:serve
Após iniciar, execute o comando:
curl -N -X POST http://127.0.0.1:8000/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
acurl ap-southeast-2 lambda -N -X POST \ https://xxx.lambda-url.ap-southeast-2.on.aws/story/generate \ -d '{"genre":"superhero", "actions":[], "playerName":"UnnamedHero"}' \ -H "Content-Type: application/json"
Se executado com sucesso, você verá uma resposta em streaming similar a:
UnnamedHero stood tall, his cape billowing in the wind....
Parabéns. Você construiu e implantou sua primeira API usando FastAPI! 🎉🎉🎉