Stream responses in real time for a better user experience. Auriko supports Server-Sent Events (SSE) for streaming.

Prerequisites

  • An Auriko API key
  • Python 3.10+ with the auriko SDK installed (pip install auriko), or
  • Node.js 18+ with @auriko/sdk installed (npm install @auriko/sdk)
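The examples below read the key from the AURIKO_API_KEY environment variable. You can set it in your shell before running them (the value shown is a placeholder):

```shell
# Set the API key for the current shell session (placeholder value)
export AURIKO_API_KEY="your-api-key-here"
```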

Stream responses

Stream a chat completion response:
import os
from auriko import Client

client = Client(
    api_key=os.environ["AURIKO_API_KEY"],
    base_url="https://api.auriko.ai/v1"
)

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Stream asynchronously (Python)

Stream with the async client:
import os
from auriko import AsyncClient
import asyncio

async def stream_response():
    client = AsyncClient(
        api_key=os.environ["AURIKO_API_KEY"],
        base_url="https://api.auriko.ai/v1"
    )

    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Write a short story"}],
        stream=True
    )
    
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

asyncio.run(stream_response())

Stream events

Each chunk contains:
# ChatCompletionChunk
chunk.id           # "chatcmpl-abc123"
chunk.model        # "gpt-4o"
chunk.created      # 1234567890
chunk.choices[0].delta.content  # Token content (may be None)
chunk.choices[0].delta.role     # "assistant" (first chunk only)
chunk.choices[0].finish_reason  # None until last chunk ("stop")

Handle final chunks

The final content chunk carries finish_reason; a trailing chunk with empty choices carries usage. Auriko forces include_usage: true on all streaming requests, so you don't need to set stream_options manually.
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
)

full_content = ""
usage = None

for chunk in stream:
    if chunk.choices:
        if chunk.choices[0].delta.content:
            full_content += chunk.choices[0].delta.content
        if chunk.choices[0].finish_reason:
            print(f"\n\nFinished: {chunk.choices[0].finish_reason}")
    if chunk.usage:
        usage = chunk.usage

if usage:
    print(f"Tokens used: {usage.total_tokens}")
Auriko forces stream_options.include_usage to true for accurate billing. Setting it explicitly is harmless but unnecessary.

Stream properties

The stream object exposes usage, routing metadata, and response headers after iteration completes.
Property          Python                    TypeScript                Available
Token usage       stream.usage              stream.usage              After iteration
Routing info      stream.routing_metadata   stream.routing_metadata   After iteration
Response headers  stream.response_headers   stream.responseHeaders    Immediately
Close connection  stream.close()            stream.close()            Any time
Use the stream as a context manager to ensure the connection is released:
with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
) as stream:
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

# Available after iteration
if stream.usage:
    print(f"Tokens: {stream.usage.total_tokens}")
if stream.routing_metadata:
    print(f"Provider: {stream.routing_metadata.provider}")
Use an async context manager for automatic cleanup:
async with await client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True
) as stream:
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
routing_metadata and usage are only present in the final chunk (with choices: []). Consume the stream to completion to access them.
In TypeScript, you can only iterate a stream once. A second attempt throws an error.
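Streams in both SDKs are consumed as you iterate. If you need to process the chunks more than once, collect them into a list first. A minimal, SDK-agnostic sketch of that pattern (the string chunks here stand in for SDK chunk objects):

```python
def buffer_stream(stream):
    """Drain an iterable stream into a list so chunks can be revisited."""
    return list(stream)

# Works with any iterable of chunks, e.g. a generator:
chunks = buffer_stream(iter(["Hel", "lo", "!"]))
text = "".join(chunks)               # first pass: assemble the text
lengths = [len(c) for c in chunks]   # second pass over the same chunks
```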

Stream with tools

Accumulate tool call fragments from a streamed response:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather",
            "parameters": {"type": "object", "properties": {"city": {"type": "string"}}}
        }
    }],
    stream=True
)

tool_calls = []
for chunk in stream:
    if not chunk.choices:
        continue
    delta = chunk.choices[0].delta
    
    # Handle tool call streaming
    if delta.tool_calls:
        for tc in delta.tool_calls:
            if tc.index >= len(tool_calls):
                tool_calls.append({"id": tc.id, "function": {"name": "", "arguments": ""}})
            if tc.function and tc.function.name:
                tool_calls[tc.index]["function"]["name"] += tc.function.name
            if tc.function and tc.function.arguments:
                tool_calls[tc.index]["function"]["arguments"] += tc.function.arguments

print(tool_calls)
See Tool Calling Guide for function definitions and multi-turn tool conversations.
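Once the stream ends, each accumulated arguments string is a complete JSON document and can be parsed with the standard library. This parsing step is generic Python, not Auriko-specific; the tool_calls value below mirrors the shape produced by the accumulation loop above:

```python
import json

# Example output of the accumulation loop: one fully assembled tool call
tool_calls = [{"id": "call_1", "function": {"name": "get_weather",
                                            "arguments": '{"city": "Paris"}'}}]

for call in tool_calls:
    # The arguments field is a JSON string; decode it before dispatching
    args = json.loads(call["function"]["arguments"])
    print(call["function"]["name"], args["city"])
```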

Stream with routing options

Pass routing options to a streaming request:
stream = client.chat.completions.create(
    model="gpt-5.4",
    messages=[{"role": "user", "content": "Hello!"}],
    stream=True,
    routing={
        "optimize": "speed",
        "max_ttft_ms": 100,
    }
)

for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

Handle stream errors

Catch errors during streaming:
import os
from auriko import Client, ProviderError, RateLimitError

client = Client(
    api_key=os.environ["AURIKO_API_KEY"],
    base_url="https://api.auriko.ai/v1"
)

try:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)

except ProviderError as e:
    print(f"Provider error: {e}")
except RateLimitError as e:
    print(f"Rate limited: {e}")
See Error Handling Guide for retry strategies and circuit breakers.

SSE format

Raw SSE events look like this. Auriko appends a final event with routing_metadata and usage before [DONE].
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10},"routing_metadata":{"provider":"openai","routing_strategy":"balanced","total_latency_ms":847,"cost":{"billable_cost_usd":0.00015}}}

data: [DONE]
The final event before [DONE] carries routing_metadata and usage with choices: []. SDKs expose these as stream.routing_metadata and stream.usage after iteration.
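If you consume the raw SSE stream without an SDK, each event line of interest starts with "data: " and the terminator is the literal [DONE]. A minimal parsing sketch for a single line, assuming the event shapes shown above:

```python
import json

def parse_sse_data_line(line: str):
    """Decode the JSON payload of a 'data:' line.

    Returns None for blank lines, non-data lines, and the [DONE] terminator.
    """
    if not line.startswith("data: "):
        return None
    payload = line[len("data: "):]
    if payload == "[DONE]":
        return None
    return json.loads(payload)

event = parse_sse_data_line(
    'data: {"id":"chatcmpl-123","choices":'
    '[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}'
)
print(event["choices"][0]["delta"]["content"])
```

In a real client you would apply this to each line of the HTTP response body and stop iterating once [DONE] is seen.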