Bedrock invoke

bedrock_invoke

BedrockInvoke

BedrockInvoke(model_id, endpoint_name=None, region=None, bedrock_boto3_client=None, max_attempts=3, generated_text_jmespath='choices[0].message.content', generated_token_count_jmespath='usage.completion_tokens', input_text_jmespath='messages[].content[].text', input_token_count_jmespath='usage.prompt_tokens')

Bases: BedrockInvokeBase[InvokeModelResponseTypeDef]

LLMeter Endpoint for the Amazon Bedrock InvokeModel API (non-streaming)

The default ..._jmespath parameters assume your target model uses an OpenAI ChatCompletions-like API, which is true for many (but not all) Bedrock models. You'll need to override these if targeting a model with a different request/response format.
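
For example, Anthropic Claude models on Bedrock natively use the Messages API schema rather than a ChatCompletions-like one. Below is a minimal sketch of the overrides that shape would need (the model ID is illustrative, and the paths assume a response like `{"content": [{"type": "text", "text": ...}], "usage": {"input_tokens": ..., "output_tokens": ...}}`):

```python
from llmeter.endpoints.bedrock_invoke import BedrockInvoke

# Sketch: override the JMESPath defaults for Anthropic's native Messages schema.
claude_endpoint = BedrockInvoke(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",  # illustrative model ID
    generated_text_jmespath="content[0].text",
    generated_token_count_jmespath="usage.output_tokens",
    input_text_jmespath="messages[].content[].text",
    input_token_count_jmespath="usage.input_tokens",
)
```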

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_id` | `str` | The identifier for the model to use. | *required* |
| `endpoint_name` | `str \| None` | Name of the endpoint. | `None` |
| `region` | `str \| None` | AWS region to use. Defaults to the `bedrock_boto3_client`'s region, or the region configured for the AWS CLI. | `None` |
| `bedrock_boto3_client` | `Any` | Optional pre-configured boto3 client; otherwise one will be created. | `None` |
| `max_attempts` | `int` | Maximum number of retry attempts. | `3` |
| `generated_text_jmespath` | `str` | JMESPath query to extract generated text from the model response. | `'choices[0].message.content'` |
| `generated_token_count_jmespath` | `str \| None` | JMESPath query to extract the generated token count from the model response. | `'usage.completion_tokens'` |
| `input_text_jmespath` | `str` | JMESPath query to extract input text from the model request payload. | `'messages[].content[].text'` |
| `input_token_count_jmespath` | `str \| None` | JMESPath query to extract the input token count from the response. | `'usage.prompt_tokens'` |

Source code in llmeter/endpoints/bedrock_invoke.py
def __init__(
    self,
    model_id: str,
    endpoint_name: str | None = None,
    region: str | None = None,
    bedrock_boto3_client: Any = None,
    max_attempts: int = 3,
    generated_text_jmespath: str = "choices[0].message.content",
    generated_token_count_jmespath: str | None = "usage.completion_tokens",
    input_text_jmespath: str = "messages[].content[].text",
    input_token_count_jmespath: str | None = "usage.prompt_tokens",
):
    """Create a Bedrock InvokeModel API-based Endpoint

    The default ..._jmespath parameters assume your target model uses an OpenAI
    ChatCompletions-like API, which is true for many (but not all) Bedrock models. You'll need
    to override these if targeting a model with a different request/response format.

    Args:
        model_id:
            The identifier for the model to use
        endpoint_name:
            Name of the endpoint. Defaults to None.
        region:
            AWS region to use. Defaults to the bedrock_boto3_client's region, or the region configured for the AWS CLI.
        bedrock_boto3_client:
            Optional pre-configured boto3 client, otherwise one will be created.
        max_attempts:
            Maximum number of retry attempts. Defaults to 3.
        generated_text_jmespath:
            JMESPath query to extract generated text from model response.
        generated_token_count_jmespath:
            JMESPath query to extract generated token count from model response.
        input_text_jmespath:
            JMESPath query to extract input text from the model request payload.
        input_token_count_jmespath:
            JMESPath query to extract input token count from the response.
    """
    super().__init__(
        model_id=model_id,
        endpoint_name=endpoint_name,
        region=region,
        bedrock_boto3_client=bedrock_boto3_client,
        max_attempts=max_attempts,
        generated_text_jmespath=generated_text_jmespath,
        generated_token_count_jmespath=generated_token_count_jmespath,
        input_text_jmespath=input_text_jmespath,
        input_token_count_jmespath=input_token_count_jmespath,
    )
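
A minimal usage sketch. It assumes a model ID whose request/response follows the ChatCompletions-like defaults, and that the `llmeter_invoke` wrapper returns the populated `InvocationResponse` (as suggested by `process_raw_response` below):

```python
from llmeter.endpoints.bedrock_invoke import BedrockInvoke

endpoint = BedrockInvoke(model_id="your-chatcompletions-compatible-model-id")

# Build a ChatCompletions-like request body (see create_payload below):
payload = BedrockInvoke.create_payload("What is Amazon Bedrock?", max_tokens=128)

response = endpoint.invoke(payload)
print(response.response_text)
print(response.time_to_last_token, response.num_tokens_output)
```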

invoke

invoke(payload)

Invoke the Bedrock InvokeModel API with the given payload.

Source code in llmeter/endpoints/bedrock_invoke.py
@BedrockInvokeBase.llmeter_invoke
def invoke(self, payload: dict) -> InvokeModelResponseTypeDef:
    """Invoke the Bedrock InvokeModel API with the given payload."""
    req_body = json.dumps(payload).encode("utf-8")

    client_response = self._bedrock_client.invoke_model(  # type: ignore
        accept="application/json",
        body=req_body,
        contentType="application/json",
        modelId=self.model_id,
        # TODO: Provide config for other optional arguments
        # trace, guardrailIdentifier/Version, performanceConfigLatency, serviceTier
    )
    return client_response

process_raw_response

process_raw_response(raw_response, start_t, response)

Parse the response from a Bedrock InvokeModel API call.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `raw_response` | | Raw response from the Bedrock API. | *required* |
| `start_t` | `float` | The timestamp when the request was initiated. | *required* |
| `response` | `InvocationResponse` | LLMeter `InvocationResponse` object on which results will be saved (in-place). | *required* |

Source code in llmeter/endpoints/bedrock_invoke.py
def process_raw_response(
    self, raw_response, start_t: float, response: InvocationResponse
) -> None:
    """Parse the response from a Bedrock InvokeModel API call.

    Args:
        raw_response: Raw response from the Bedrock API.
        start_t: The timestamp when the request was initiated.
        response: LLMeter InvocationResponse object on which results will be saved (in-place)
    """
    response_body_json = raw_response["body"].read().decode("utf-8")
    # Stop timer as soon as response is fully received, before parsing out components:
    response.time_to_last_token = time.perf_counter() - start_t

    response_body = json.loads(response_body_json)

    response.id = response_body.get("id") or raw_response.get(
        "ResponseMetadata", {}
    ).get("RequestId")
    response.retries = raw_response.get("ResponseMetadata", {}).get("RetryAttempts")

    response_text = jmespath.search(self.generated_text_jmespath, response_body)
    if isinstance(response_text, list):
        response_text = "\n".join(response_text)
    response.response_text = response_text

    response.num_tokens_input = (
        jmespath.search(self.input_token_count_jmespath, response_body)
        if self.input_token_count_jmespath
        else None
    )
    response.num_tokens_output = (
        jmespath.search(self.generated_token_count_jmespath, response_body)
        if self.generated_token_count_jmespath
        else None
    )
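
To make the extraction concrete, here is roughly what the default JMESPath queries pull out of a ChatCompletions-style body (the sample body below is illustrative, not captured from a real model):

```python
import jmespath

# Illustrative ChatCompletions-style response body:
body = {
    "id": "cmpl-123",
    "choices": [{"message": {"role": "assistant", "content": "Hello!"}}],
    "usage": {"prompt_tokens": 12, "completion_tokens": 3},
}

print(jmespath.search("choices[0].message.content", body))  # Hello!
print(jmespath.search("usage.prompt_tokens", body))  # 12
print(jmespath.search("usage.completion_tokens", body))  # 3
```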

BedrockInvokeBase

BedrockInvokeBase(model_id, generated_text_jmespath, input_text_jmespath, generated_token_count_jmespath=None, input_token_count_jmespath=None, endpoint_name=None, region=None, bedrock_boto3_client=None, max_attempts=3)

Bases: Endpoint[TBedrockInvokeResponse], Generic[TBedrockInvokeResponse]

Source code in llmeter/endpoints/bedrock_invoke.py
def __init__(
    self,
    model_id: str,
    generated_text_jmespath: str,
    input_text_jmespath: str,
    generated_token_count_jmespath: str | None = None,
    input_token_count_jmespath: str | None = None,
    endpoint_name: str | None = None,
    region: str | None = None,
    bedrock_boto3_client: Any = None,
    max_attempts: int = 3,
):
    """Shared constructor logic for Bedrock Invoke*-API endpoints"""
    super().__init__(
        model_id=model_id,
        endpoint_name=endpoint_name or "amazon bedrock",
        provider="bedrock",
    )

    self.generated_text_jmespath = generated_text_jmespath
    self.generated_token_count_jmespath = generated_token_count_jmespath
    self.input_text_jmespath = input_text_jmespath
    self.input_token_count_jmespath = input_token_count_jmespath

    self.region = (
        region
        or (bedrock_boto3_client and bedrock_boto3_client.meta.region_name)
        or boto3.session.Session().region_name
    )
    logger.info(f"Using AWS region: {self.region}")

    self._bedrock_client = bedrock_boto3_client
    if self._bedrock_client is None:
        config = Config(retries={"max_attempts": max_attempts, "mode": "standard"})
        self._bedrock_client = boto3.client(
            "bedrock-runtime", region_name=self.region, config=config
        )

create_payload staticmethod

create_payload(user_message, max_tokens=256, **kwargs)

Create a payload, assuming your target Bedrock model supports a ChatCompletions-like API

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_message` | `str \| list[str]` | The user's message or a sequence of messages. | *required* |
| `max_tokens` | `int \| None` | The maximum number of tokens to generate. | `256` |
| `**kwargs` | `Any` | Additional keyword arguments to include in the payload. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `dict` | The formatted payload for the Bedrock API request. |

Raises:

| Type | Description |
| --- | --- |
| `TypeError` | If `user_message` is not a string or list of strings. |
| `ValueError` | If `max_tokens` is not a positive integer. |

Source code in llmeter/endpoints/bedrock_invoke.py
@staticmethod
def create_payload(
    user_message: str | list[str], max_tokens: int | None = 256, **kwargs: Any
) -> dict:
    """
    Create a payload, assuming your target Bedrock model supports a ChatCompletions-like API

    Args:
        user_message: The user's message or a sequence of messages.
        max_tokens: The maximum number of tokens to generate. Defaults to 256.
        **kwargs: Additional keyword arguments to include in the payload.

    Returns:
        dict: The formatted payload for the Bedrock API request.

    Raises:
        TypeError: If user_message is not a string or list of strings
        ValueError: If max_tokens is not a positive integer
    """
    if not isinstance(user_message, (str, list)):
        raise TypeError("user_message must be a string or list of strings")

    if isinstance(user_message, list):
        if not all(isinstance(msg, str) for msg in user_message):
            raise TypeError("All messages must be strings")
        if not user_message:
            raise ValueError("user_message list cannot be empty")

    # max_tokens is optional; when given, it must be a positive integer:
    if max_tokens is not None and (
        not isinstance(max_tokens, int) or max_tokens <= 0
    ):
        raise ValueError("max_tokens must be a positive integer")

    if isinstance(user_message, str):
        user_message = [user_message]

    try:
        payload: dict = {
            "messages": [
                {"role": "user", "content": [{"text": k, "type": "text"}]}
                for k in user_message
            ],
        }

        if max_tokens:
            payload["max_tokens"] = max_tokens

        payload.update(kwargs)
        return payload

    except Exception as e:
        logger.exception("Failed to create InvokeModel payload")
        raise RuntimeError(f"Failed to create payload: {str(e)}") from e
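
For reference, a call like the following (illustrative values; extra keyword arguments pass straight through to the payload, so they must be parameters your target model accepts) produces the dict shown in the comment:

```python
from llmeter.endpoints.bedrock_invoke import BedrockInvoke

payload = BedrockInvoke.create_payload("Hello!", max_tokens=64, temperature=0.5)
# {
#     "messages": [{"role": "user", "content": [{"text": "Hello!", "type": "text"}]}],
#     "max_tokens": 64,
#     "temperature": 0.5,
# }
```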

BedrockInvokeStream

BedrockInvokeStream(model_id, endpoint_name=None, region=None, bedrock_boto3_client=None, max_attempts=3, generated_text_jmespath='choices[0].delta.content', generated_token_count_jmespath='"amazon-bedrock-invocationMetrics".outputTokenCount', input_text_jmespath='messages[].content[].text', input_token_count_jmespath='"amazon-bedrock-invocationMetrics".inputTokenCount')

Bases: BedrockInvokeBase[InvokeModelWithResponseStreamResponseTypeDef]

LLMeter Endpoint for the Amazon Bedrock InvokeModelWithResponseStream API

The default ..._jmespath parameters assume your target model uses an OpenAI ChatCompletions-like streaming API, which is true for many (but not all) Bedrock models. You'll need to override these if targeting a model with a different request/response format.
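
For example, models streaming Anthropic's native event shapes deliver incremental text as `content_block_delta` events. A sketch of the override (illustrative model ID; the token-count defaults usually need no change, since Bedrock typically appends the `amazon-bedrock-invocationMetrics` chunk regardless of the model's own schema):

```python
from llmeter.endpoints.bedrock_invoke import BedrockInvokeStream

# Sketch: chunks shaped like {"type": "content_block_delta", "delta": {"text": ...}}
claude_stream = BedrockInvokeStream(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",  # illustrative model ID
    generated_text_jmespath="delta.text",
)
```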

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model_id` | `str` | The identifier for the model to use. | *required* |
| `endpoint_name` | `str \| None` | Name of the endpoint. | `None` |
| `region` | `str \| None` | AWS region to use. Defaults to the `bedrock_boto3_client`'s region, or the region configured for the AWS CLI. | `None` |
| `bedrock_boto3_client` | `Any` | Optional pre-configured boto3 client; otherwise one will be created. | `None` |
| `max_attempts` | `int` | Maximum number of retry attempts. | `3` |
| `generated_text_jmespath` | `str` | JMESPath query to extract incremental text from a chunk of the model response. | `'choices[0].delta.content'` |
| `generated_token_count_jmespath` | `str \| None` | JMESPath query to extract the generated token count from a chunk of the model response. | `'"amazon-bedrock-invocationMetrics".outputTokenCount'` |
| `input_text_jmespath` | `str` | JMESPath query to extract input text from the model request payload. | `'messages[].content[].text'` |
| `input_token_count_jmespath` | `str \| None` | JMESPath query to extract the input token count from a chunk of the model response. | `'"amazon-bedrock-invocationMetrics".inputTokenCount'` |

Source code in llmeter/endpoints/bedrock_invoke.py
def __init__(
    self,
    model_id: str,
    endpoint_name: str | None = None,
    region: str | None = None,
    bedrock_boto3_client: Any = None,
    max_attempts: int = 3,
    generated_text_jmespath: str = "choices[0].delta.content",
    generated_token_count_jmespath: str
    | None = '"amazon-bedrock-invocationMetrics".outputTokenCount',
    input_text_jmespath: str = "messages[].content[].text",
    input_token_count_jmespath: str
    | None = '"amazon-bedrock-invocationMetrics".inputTokenCount',
):
    """Create a Bedrock InvokeModelWithResponseStream API-based Endpoint

    The default ..._jmespath parameters assume your target model uses an OpenAI
    ChatCompletions-like streaming API, which is true for many (but not all) Bedrock models.
    You'll need to override these if targeting a model with a different request/response format.

    Args:
        model_id:
            The identifier for the model to use
        endpoint_name:
            Name of the endpoint. Defaults to None.
        region:
            AWS region to use. Defaults to the bedrock_boto3_client's region, or the region configured for the AWS CLI.
        bedrock_boto3_client:
            Optional pre-configured boto3 client, otherwise one will be created.
        max_attempts:
            Maximum number of retry attempts. Defaults to 3.
        generated_text_jmespath:
            JMESPath query to extract incremental text from *a chunk of* the model response.
        generated_token_count_jmespath:
            JMESPath query to extract generated token count from *a chunk of* model response.
        input_text_jmespath:
            JMESPath query to extract input text from the model request payload.
        input_token_count_jmespath:
            JMESPath query to extract input token count from *a chunk of* the model response.
    """
    super().__init__(
        model_id=model_id,
        endpoint_name=endpoint_name,
        region=region,
        bedrock_boto3_client=bedrock_boto3_client,
        max_attempts=max_attempts,
        generated_text_jmespath=generated_text_jmespath,
        generated_token_count_jmespath=generated_token_count_jmespath,
        input_text_jmespath=input_text_jmespath,
        input_token_count_jmespath=input_token_count_jmespath,
    )
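
A usage sketch, again assuming the `llmeter_invoke` wrapper returns the populated `InvocationResponse` (placeholder model ID; the timing fields come from `process_raw_response` below):

```python
from llmeter.endpoints.bedrock_invoke import BedrockInvokeStream

endpoint = BedrockInvokeStream(model_id="your-streaming-capable-model-id")
payload = BedrockInvokeStream.create_payload("Tell me a short story", max_tokens=200)

response = endpoint.invoke(payload)
print(response.time_to_first_token)  # seconds until the first text chunk
print(response.time_to_last_token)  # seconds until the stream completed
```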

process_raw_response

process_raw_response(raw_response, start_t, response)

Parse the streaming response from Bedrock InvokeModelWithResponseStream API.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `raw_response` | | The raw response from the Bedrock API. | *required* |
| `start_t` | `float` | The timestamp when the request was initiated. | *required* |
| `response` | `InvocationResponse` | LLMeter `InvocationResponse` object on which results will be saved (in-place). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `None` | Results are saved in-place on `response`. |

Source code in llmeter/endpoints/bedrock_invoke.py
def process_raw_response(
    self, raw_response, start_t: float, response: InvocationResponse
) -> None:
    """Parse the streaming response from Bedrock InvokeModelWithResponseStream API.

    Args:
        raw_response: The raw response from the Bedrock API.
        start_t: The timestamp when the request was initiated.
        response: LLMeter InvocationResponse object on which results will be saved (in-place)
    """
    chunks = []
    resp_meta = raw_response.get("ResponseMetadata", {})
    response.id = resp_meta.get("RequestId")
    response.retries = resp_meta.get("RetryAttempts")
    response.time_to_first_token = None
    response.time_to_last_token = None

    for event in raw_response["body"]:
        now = time.perf_counter()
        if "chunk" in event:
            chunk_bytes = event["chunk"]["bytes"]
            chunk_data = json.loads(chunk_bytes)
            if "id" in chunk_data:
                response.id = chunk_data["id"]
            chunk_text = jmespath.search(self.generated_text_jmespath, chunk_data)
            if isinstance(chunk_text, list):
                chunk_text = "".join(chunk_text)
            if chunk_text:
                if response.time_to_first_token is None:
                    response.time_to_first_token = now - start_t
                response.time_to_last_token = now - start_t
                if response.response_text is None:
                    response.response_text = chunk_text
                else:
                    response.response_text += chunk_text
            chunks.append(chunk_data)
        else:
            # Non-chunk events: check for Bedrock error events, skip
            # everything else (e.g. messageStart, contentBlockStart).
            for error_type in BEDROCK_STREAM_ERROR_TYPES:
                if error_type in event:
                    response.error = (
                        f"Bedrock {error_type}: {event[error_type]['message']}"
                    )
                    response.time_to_last_token = now - start_t
                    # We don't raise an error here yet, because we still want to loop
                    # through the received chunks again below.
                    break

    # Post-process additional (token count) data from chunks
    # (after performance timing, to avoid counting JMESPath overhead)
    # TODO: Count cache read tokens if returned by this endpoint?
    for chunk in chunks:
        chk_tokens_input = (
            jmespath.search(self.input_token_count_jmespath, chunk)
            if self.input_token_count_jmespath
            else None
        )
        if chk_tokens_input is not None:
            response.num_tokens_input = chk_tokens_input
        chk_tokens_output = (
            jmespath.search(self.generated_token_count_jmespath, chunk)
            if self.generated_token_count_jmespath
            else None
        )
        if chk_tokens_output is not None:
            response.num_tokens_output = chk_tokens_output
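
Note the double quotes inside the default token-count queries: `amazon-bedrock-invocationMetrics` contains hyphens, so JMESPath requires it to be a quoted identifier. A quick illustration against a mock final chunk (field values are made up):

```python
import jmespath

# Illustrative final stream chunk carrying Bedrock invocation metrics:
final_chunk = {
    "amazon-bedrock-invocationMetrics": {
        "inputTokenCount": 12,
        "outputTokenCount": 87,
    }
}

print(jmespath.search('"amazon-bedrock-invocationMetrics".inputTokenCount', final_chunk))  # 12
print(jmespath.search('"amazon-bedrock-invocationMetrics".outputTokenCount', final_chunk))  # 87
```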