Stream responses in real time for a better user experience. Auriko supports Server-Sent Events (SSE) for streaming.
## Prerequisites

- An Auriko API key
- Python 3.10+ with the `auriko` SDK installed (`pip install auriko`)
- OR Node.js 18+ with `@auriko/sdk` installed (`npm install @auriko/sdk`)
## Stream responses

Stream a chat completion response:
```python
import os
from auriko import Client

client = Client(
    api_key=os.environ["AURIKO_API_KEY"],
    base_url="https://api.auriko.ai/v1"
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
## Stream asynchronously (Python)

Stream with the async client:
```python
import os
import asyncio

from auriko import AsyncClient

async def stream_response():
    client = AsyncClient(
        api_key=os.environ["AURIKO_API_KEY"],
        base_url="https://api.auriko.ai/v1"
    )
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Write a short story"}],
        stream=True
    )
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_response())
```
## Stream events

Each chunk contains:
```python
# ChatCompletionChunk
chunk.id                        # "chatcmpl-abc123"
chunk.model                     # "gpt-4o"
chunk.created                   # 1234567890
chunk.choices[0].delta.content  # Token content (may be None)
chunk.choices[0].delta.role     # "assistant" (first chunk only)
chunk.choices[0].finish_reason  # None until last chunk ("stop")
```
## Handle final chunks

The last chunk carries `finish_reason` and usage. Auriko forces `stream_options.include_usage` to `true` on all streaming requests for accurate billing, so you don't need to set `stream_options` manually.
```python
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)

full_content = ""
usage = None

for chunk in stream:
    if chunk.choices:
        if chunk.choices[0].delta.content:
            full_content += chunk.choices[0].delta.content
        if chunk.choices[0].finish_reason:
            print(f"\n\nFinished: {chunk.choices[0].finish_reason}")
    if chunk.usage:
        usage = chunk.usage

if usage:
    print(f"Tokens used: {usage.total_tokens}")
```
## Stream properties

The stream object exposes usage, routing metadata, and response headers after iteration completes.
| Property | Python | TypeScript | Available |
|---|---|---|---|
| Token usage | `stream.usage` | `stream.usage` | After iteration |
| Routing info | `stream.routing_metadata` | `stream.routing_metadata` | After iteration |
| Response headers | `stream.response_headers` | `stream.responseHeaders` | Immediately |
| Close connection | `stream.close()` | `stream.close()` | Any time |
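`stream.close()` lets you abort a response early, for example to cap output length instead of paying for tokens you will discard. A minimal sketch; the `collect_until` helper and `max_chars` cap are illustrative, not part of the SDK:

```python
def collect_until(stream, max_chars):
    """Accumulate streamed text, closing the connection once a character cap is hit."""
    text = ""
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            text += chunk.choices[0].delta.content
            if len(text) >= max_chars:
                stream.close()  # release the SSE connection early
                break
    return text
```

Note that closing early means the final usage/routing chunk never arrives, so `stream.usage` will be unset.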
Use the stream as a context manager to ensure the connection is released:
```python
with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
) as stream:
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

# Available after iteration
if stream.usage:
    print(f"Tokens: {stream.usage.total_tokens}")
if stream.routing_metadata:
    print(f"Provider: {stream.routing_metadata.provider}")
```
Use an async context manager for automatic cleanup:
```python
async with await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
) as stream:
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
```
`routing_metadata` and `usage` are only present in the final chunk (which has `choices: []`). Consume the stream to completion to access them.

In TypeScript, a stream can be iterated only once; a second attempt throws an error.
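The Python stream is likewise consumed by iteration. If you need more than one pass over the chunks, buffer them first rather than re-iterating the stream. A sketch (the helper names are illustrative):

```python
def buffer_chunks(stream):
    """Drain the stream into a list so the chunks can be traversed repeatedly."""
    return list(stream)

def joined_text(chunks):
    """Concatenate the content deltas from buffered chunks."""
    return "".join(
        c.choices[0].delta.content
        for c in chunks
        if c.choices and c.choices[0].delta.content
    )
```

Buffering also keeps the final usage/routing chunk around for later inspection.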
## Stream tool calls

Accumulate tool call fragments from a streamed response:
```python
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather",
            "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    # Handle tool call streaming
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"id": tc.id, "function": {"name": "", "arguments": ""}})
            if tc.function and tc.function.name:
                tool_calls[tc.index]["function"]["name"] += tc.function.name
            if tc.function and tc.function.arguments:
                tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

print(tool_calls)
```
See Tool Calling Guide for function definitions and multi-turn tool conversations.
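Once the stream finishes, each accumulated `arguments` string is a complete JSON document that can be decoded and dispatched. A minimal sketch, assuming the `tool_calls` list built above (the `parse_tool_calls` helper is illustrative, not part of the SDK):

```python
import json

def parse_tool_calls(tool_calls):
    """Decode the accumulated argument fragments into Python dicts."""
    return [
        {
            "id": tc["id"],
            "name": tc["function"]["name"],
            "arguments": json.loads(tc["function"]["arguments"]),
        }
        for tc in tool_calls
    ]
```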
## Stream with routing options

Pass routing options to a streaming request:
```python
stream = client.chat.completions.create(
    model="gpt-5.4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    routing={
        "optimize": "speed",
        "max_ttft_ms": 100,
    }
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
```
## Handle stream errors

Catch errors during streaming:
```python
import os

from auriko import Client, ProviderError, RateLimitError

client = Client(
    api_key=os.environ["AURIKO_API_KEY"],
    base_url="https://api.auriko.ai/v1"
)

try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
except ProviderError as e:
    print(f"Provider error: {e}")
except RateLimitError as e:
    print(f"Rate limited: {e}")
```
See Error Handling Guide for retry strategies and circuit breakers.
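As a minimal illustration of the retry idea covered there, wrap the streaming call in exponential backoff. The helper below is a generic sketch, not part of the SDK; pass `RateLimitError` (or any transient error types) as `retry_on`:

```python
import time

def stream_with_retry(make_stream, consume, retry_on, max_attempts=3, base_delay=1.0):
    """Retry a streaming call with exponential backoff on transient errors."""
    for attempt in range(max_attempts):
        try:
            return consume(make_stream())
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
```

For example: `stream_with_retry(lambda: client.chat.completions.create(..., stream=True), consume=print_chunks, retry_on=(RateLimitError,))`, where `print_chunks` is your own loop over the stream.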
## Raw SSE format

Raw SSE events look like this. Auriko appends a final event with `routing_metadata` and `usage` before `[DONE]`.
```text
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10},"routing_metadata":{"provider":"openai","routing_strategy":"balanced","total_latency_ms":847,"cost":{"billable_cost_usd":0.00015}}}

data: [DONE]
```
The final event before `[DONE]` carries `routing_metadata` and `usage` with `choices: []`. SDKs expose these as `stream.routing_metadata` and `stream.usage` after iteration.
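If you consume the SSE feed without an SDK, the wire format above takes only a few lines to parse. A minimal sketch of the parsing logic (transport is left out; pair it with any HTTP client that yields response lines):

```python
import json

def parse_sse_lines(lines):
    """Yield the JSON payload of each `data:` event, stopping at [DONE]."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip blank keep-alive lines and SSE comments
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)
```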