Integration Guide

Everything a service needs to log LLM traffic through Keplor.

How it works

Your application or gateway makes LLM calls as usual, then POSTs event data to Keplor. Keplor computes cost from its bundled pricing catalog, compresses and stores the event, and makes it queryable via the API.

Your App / Gateway                Keplor
      |                              |
      |-- LLM call --> Provider      |
      |<-- response --               |
      |                              |
      |-- POST /v1/events ---------> | validate, compute cost,
      |<-- 201 {id, cost} ---------- | compress, store

Keplor is observational only — it never touches your LLM traffic. It just records what happened.

Minimal example

Send the two required fields (model and provider) plus token counts:

$ curl -X POST http://localhost:8080/v1/events \
  -H "Authorization: Bearer sk-your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-4o",
    "provider": "openai",
    "usage": {"input_tokens": 1000, "output_tokens": 500},
    "latency": {"ttft_ms": 30, "total_ms": 450},
    "user_id": "alice",
    "source": "my-app"
  }'

Response:

{
  "id": "01JA2B3C4D5E6F7G8H9J0KMNPQ",
  "cost_nanodollars": 6250000,
  "model": "gpt-4o",
  "provider": "openai"
}

Cost is auto-computed: 6,250,000 nanodollars = $0.00625.

Authentication

When API keys are configured, include a Bearer token:

Authorization: Bearer sk-your-key

Key formats

Config format           Key ID                Tier
"prod-svc:sk-abc123"    prod-svc              default_tier
"sk-abc123"             key_<sha256-prefix>   default_tier
{ id, secret, tier }    id value              explicit tier

Retention tiers

Each API key is assigned a retention tier that controls how long its events are kept. Configure tiers in [retention]:

[retention]
default_tier = "free"

[[retention.tiers]]
name = "free"
days = 7

[[retention.tiers]]
name = "pro"
days = 90

[[retention.tiers]]
name = "team"
days = 180

Assign tiers to keys using the extended format:

[[auth.api_key_entries]]
id = "pro-user"
secret = "sk-pro-key"
tier = "pro"

Tier names are fully configurable — add "enterprise", "trial", or any custom name. GC runs one pass per tier automatically.

Server-side key attribution

When auth is enabled, Keplor overrides the client-provided api_key_id with the authenticated key's ID and assigns the key's retention tier. This prevents clients from spoofing attribution.

Open mode

When no keys are configured (the default), auth is disabled. All requests are accepted without a Bearer token and assigned to default_tier.

What to send

Only model and provider are required. Everything else is optional with sensible defaults.

Required fields

Field      Type     Description
model      string   Model name ("gpt-4o", "claude-sonnet-4-20250514")
provider   string   Provider key (see supported providers)

Token usage

Field                               Type   Default   Description
usage.input_tokens                  u32    0         Input/prompt tokens
usage.output_tokens                 u32    0         Output/completion tokens
usage.cache_read_input_tokens       u32    0         Tokens served from cache
usage.cache_creation_input_tokens   u32    0         Tokens written to cache
usage.reasoning_tokens              u32    0         Chain-of-thought / thinking tokens
usage.audio_input_tokens            u32    0         Audio input tokens
usage.audio_output_tokens           u32    0         Audio output tokens
usage.image_tokens                  u32    0         Image/vision tokens
usage.tool_use_tokens               u32    0         Tool/function call tokens

Latency

Field                      Type   Description
latency.ttft_ms            u32    Time to first token (ms)
latency.total_ms           u32    End-to-end latency (ms)
latency.time_to_close_ms   u32    Time from last token to stream close (ms)

Attribution

Field        Type     Description
user_id      string   User identity for cost attribution
api_key_id   string   API key ID (overridden by server when auth enabled)
org_id       string   Organization ID
project_id   string   Project ID
route_id     string   Logical route name ("chat", "embeddings")
source       string   Name of the sending system

Flags

Field                     Type   Default   Description
flags.streaming           bool   false     Response was streamed
flags.tool_calls          bool   false     Included tool/function calls
flags.reasoning           bool   false     Used extended thinking
flags.stream_incomplete   bool   false     Stream ended prematurely
flags.cache_used          bool   false     Response served from cache

Other optional fields

Field              Type            Description
cost_nanodollars   i64             Override auto-computed cost (nanodollars)
timestamp          i64 or string   Epoch nanos or ISO 8601 (default: server time)
method             string          HTTP method (default: "POST")
endpoint           string          API path ("/v1/chat/completions")
http_status        u16             Upstream HTTP status code
error.kind         string          Error category ("rate_limited", etc.)
error.message      string          Error message text
error.status       u16             Error HTTP status
trace_id           string          W3C trace ID
request_id         string          Provider request ID
client_ip          string          Client source IP
user_agent         string          Client user-agent
request_body       any JSON        Full request body (stored compressed)
response_body      any JSON        Full response body (stored compressed)
metadata           any JSON        Arbitrary metadata (queryable via user_tag/session_tag)
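
Since everything beyond model and provider is optional, a small helper that drops unset fields keeps payloads lean. A sketch; the name build_event is ours, the field names come from the tables above:

```python
def build_event(model: str, provider: str, **fields) -> dict:
    """Build a Keplor event payload, keeping only fields that are set."""
    event = {"model": model, "provider": provider}
    event.update({k: v for k, v in fields.items() if v is not None})
    return event

# Optional fields left as None never reach the wire:
payload = build_event("gpt-4o", "openai",
                      usage={"input_tokens": 1000, "output_tokens": 500},
                      user_id="alice", org_id=None)
```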

Full event example

{
  "model": "claude-sonnet-4-20250514",
  "provider": "anthropic",
  "cost_nanodollars": null,
  "usage": {
    "input_tokens": 2000,
    "output_tokens": 1000,
    "cache_read_input_tokens": 500,
    "cache_creation_input_tokens": 0,
    "reasoning_tokens": 0
  },
  "latency": {
    "ttft_ms": 45,
    "total_ms": 800,
    "time_to_close_ms": 20
  },
  "timestamp": "2024-11-15T10:30:00Z",
  "method": "POST",
  "endpoint": "/v1/messages",
  "http_status": 200,
  "source": "litellm",
  "user_id": "alice",
  "api_key_id": "my-service",
  "org_id": "acme-corp",
  "project_id": "chatbot-v2",
  "route_id": "chat",
  "flags": {
    "streaming": true,
    "tool_calls": false,
    "reasoning": true,
    "stream_incomplete": false,
    "cache_used": true
  },
  "error": null,
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "request_id": "req_abc123",
  "client_ip": "10.0.1.42",
  "user_agent": "my-app/1.0",
  "request_body": {"messages": [{"role": "user", "content": "Hello"}]},
  "response_body": {"content": [{"type": "text", "text": "Hi there!"}]},
  "metadata": {"session_id": "sess_xyz", "user_tag": "premium"}
}

What you get back

Every successful ingest returns:

Field              Type     Description
id                 string   ULID (time-sortable unique ID)
cost_nanodollars   i64      Computed or overridden cost
model              string   Normalized model name
provider           string   Normalized provider key

Supported providers

Provider key   Service
openai         OpenAI (api.openai.com)
anthropic      Anthropic (api.anthropic.com)
gemini         Google AI Studio
vertex_ai      Google Vertex AI
bedrock        AWS Bedrock
azure          Azure OpenAI
mistral        Mistral AI
groq           Groq
xai            xAI Grok
deepseek       DeepSeek
cohere         Cohere v2
ollama         Ollama (local)

Any unrecognized provider string is treated as OpenAI-compatible. Matching is case-insensitive.

Cost accounting

Costs are stored as int64 nanodollars (10⁻⁹ USD) to avoid floating-point precision issues.

Nanodollars     USD
1,000,000,000   $1.00
1,000,000       $0.001
1,000           $0.000001

When you omit cost_nanodollars, Keplor computes it from the model pricing catalog and your usage token counts. This handles prompt caching discounts, reasoning token pricing, and audio/image tokens automatically.

To override the computed value, set cost_nanodollars yourself. Models missing from the pricing catalog get a cost of 0.
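
The conversion is simple integer arithmetic. A sketch (the function names are ours) for displaying stored costs and computing override values:

```python
NANOS_PER_DOLLAR = 1_000_000_000

def nanodollars_to_usd(nanos: int) -> float:
    """Convert stored integer nanodollars to USD for display."""
    return nanos / NANOS_PER_DOLLAR

def usd_to_nanodollars(usd: float) -> int:
    """Convert a USD amount to nanodollars, e.g. for a cost_nanodollars override."""
    return round(usd * NANOS_PER_DOLLAR)

print(nanodollars_to_usd(6_250_000))   # the minimal example's cost: 0.00625
```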

Batch ingestion

For high-throughput scenarios, use /v1/events/batch with up to 10,000 events per request:

$ curl -X POST http://localhost:8080/v1/events/batch \
  -H "Authorization: Bearer sk-your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {"model": "gpt-4o", "provider": "openai", "usage": {"input_tokens": 500}},
      {"model": "claude-sonnet-4-20250514", "provider": "anthropic",
       "usage": {"input_tokens": 800, "output_tokens": 200}}
    ]
  }'

Response (201 all accepted, 207 partial):

{
  "results": [
    {"id": "01JA...", "cost_nanodollars": 1250000, "model": "gpt-4o", "provider": "openai"},
    {"id": "01JB...", "cost_nanodollars": 4200000, "model": "claude-sonnet-4-20250514", "provider": "anthropic"}
  ],
  "accepted": 2,
  "rejected": 0
}

Batch writes are fire-and-forget: events are validated synchronously but flushed to disk asynchronously (~50ms). Events may be lost on server crash before flush.
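
For loads above the per-request limit, split client-side and sum the counters from each response. A sketch assuming the requests library; chunk_events and send_all are our names:

```python
import requests

KEPLOR = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer sk-your-key", "Content-Type": "application/json"}
MAX_BATCH = 10_000  # server-side per-request limit

def chunk_events(events, size=MAX_BATCH):
    """Split a large event list into batches the server will accept."""
    return [events[i:i + size] for i in range(0, len(events), size)]

def send_all(events):
    accepted = rejected = 0
    for batch in chunk_events(events):
        resp = requests.post(f"{KEPLOR}/v1/events/batch", headers=HEADERS,
                             json={"events": batch})
        body = resp.json()  # 201 = all accepted, 207 = partial
        accepted += body["accepted"]
        rejected += body["rejected"]
    return accepted, rejected
```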

Querying your data

Check cost for a user:

$ curl "http://localhost:8080/v1/quota?user_id=alice&from=1700000000000000000" \
  -H "Authorization: Bearer sk-your-key"
{"cost_nanodollars": 150000000, "event_count": 85}
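
The same check from Python, e.g. to gate requests on a spending cap. The budget logic is our addition, not a Keplor feature; the endpoint and response shape are as above:

```python
import requests

KEPLOR = "http://localhost:8080"
HEADERS = {"Authorization": "Bearer sk-your-key"}

def user_cost(user_id: str, since_ns: int) -> dict:
    """Fetch a user's accumulated cost since an epoch-nanosecond timestamp."""
    resp = requests.get(f"{KEPLOR}/v1/quota", headers=HEADERS,
                        params={"user_id": user_id, "from": since_ns})
    return resp.json()

def over_budget(quota: dict, budget_nanodollars: int) -> bool:
    """Pure threshold check, kept separate so it is easy to test."""
    return quota["cost_nanodollars"] >= budget_nanodollars
```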

See the API Reference for all query endpoints: events, rollups, stats, and quota.

Error handling

Status   Meaning                        Retry?
201      Created                        No
207      Partial success (batch)        Retry failed items
400      Validation error or bad JSON   Fix request
401      Missing or invalid API key     Fix auth
408      Request timeout                Yes
422      Unprocessable entity           Fix payload
429      Rate limit exceeded            Yes, after Retry-After seconds
500      Server error                   Yes, with backoff
503      Server overloaded              Yes, with backoff

All errors return {"error": "message"}. All responses include an X-Request-Id header for log correlation.
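
A retry loop that follows the table above, honoring Retry-After for 429 and falling back to exponential backoff otherwise. A sketch assuming the requests library; the backoff parameters are illustrative:

```python
import time
import requests

RETRYABLE = {408, 429, 500, 503}  # the statuses the table marks retryable

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff: 0.5s, 1s, 2s, ..., capped at 30s."""
    return min(cap, base * (2 ** attempt))

def post_with_retry(url, payload, headers=None, max_attempts=5):
    for attempt in range(max_attempts):
        resp = requests.post(url, headers=headers, json=payload)
        if resp.status_code not in RETRYABLE:
            return resp
        retry_after = resp.headers.get("Retry-After")
        time.sleep(float(retry_after) if retry_after else backoff_delay(attempt))
    return resp
```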

Idempotency

To safely retry failed requests without creating duplicates, include an Idempotency-Key header:

$ curl -X POST http://localhost:8080/v1/events \
  -H "Idempotency-Key: my-unique-key-123" \
  -H "Content-Type: application/json" \
  -d '{"model": "gpt-4o", "provider": "openai"}'

If the same key is sent again within the TTL (default 5 minutes), Keplor returns the cached response without creating a new event.
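
The key point is to generate one key per logical event and reuse it across all retries of that event, within the TTL. A sketch assuming the requests library; the helper names are ours:

```python
import uuid
import requests

def new_idempotency_key() -> str:
    """One unique key per logical event; reuse it across retries."""
    return str(uuid.uuid4())

def post_event_once(event, url="http://localhost:8080/v1/events",
                    headers=None, max_attempts=3):
    key = new_idempotency_key()
    merged = {"Idempotency-Key": key, **(headers or {})}
    last = None
    for _ in range(max_attempts):
        try:
            last = requests.post(url, headers=merged, json=event, timeout=10)
            if last.status_code < 500:
                return last.json()
        except requests.RequestException:
            pass  # network error: retry with the SAME key, so no duplicate event
    raise RuntimeError(f"event not accepted after {max_attempts} attempts")
```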

Integration examples

Python

import requests, time

KEPLOR = "http://localhost:8080"
HEADERS = {
    "Authorization": "Bearer sk-your-key",
    "Content-Type": "application/json",
}

# After each LLM call, log to Keplor
def log_llm_event(model, provider, usage, latency_ms, user_id=None):
    return requests.post(f"{KEPLOR}/v1/events", headers=HEADERS, json={
        "model": model,
        "provider": provider,
        "usage": usage,
        "latency": {"total_ms": latency_ms},
        "http_status": 200,
        "user_id": user_id,
        "source": "my-app",
    }).json()

result = log_llm_event("gpt-4o", "openai", {
    "input_tokens": 1500,
    "output_tokens": 800,
}, 450, user_id="alice")

print(f"Event {result['id']} cost: ${result['cost_nanodollars'] / 1e9:.6f}")

Node.js

const KEPLOR = "http://localhost:8080";
const headers = {
  "Authorization": "Bearer sk-your-key",
  "Content-Type": "application/json",
};

async function logLlmCall(model, provider, usage, latencyMs, userId) {
  const resp = await fetch(`${KEPLOR}/v1/events`, {
    method: "POST",
    headers,
    body: JSON.stringify({
      model, provider, usage,
      latency: { total_ms: latencyMs },
      http_status: 200,
      user_id: userId,
      source: "my-node-app",
    }),
  });
  return resp.json();
}

const result = await logLlmCall("gpt-4o", "openai",
  { input_tokens: 1200, output_tokens: 600 }, 350, "alice");
console.log(`Cost: $${(result.cost_nanodollars / 1e9).toFixed(6)}`);

LiteLLM callback

import litellm, requests

KEPLOR = "http://localhost:8080"

def keplor_callback(kwargs, completion_response, start_time, end_time):
    latency_ms = int((end_time - start_time).total_seconds() * 1000)
    usage = completion_response.get("usage", {})
    requests.post(f"{KEPLOR}/v1/events", json={
        "model": kwargs.get("model", ""),
        "provider": kwargs.get("custom_llm_provider", "openai"),
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
        "latency": {"total_ms": latency_ms},
        "http_status": 200,
        "user_id": kwargs.get("user"),
        "source": "litellm",
    })

litellm.success_callback = [keplor_callback]

Production operations

Configuration

[server]
listen_addr = "0.0.0.0:8080"
shutdown_timeout_secs = 25       # drain batch writer + WAL checkpoint
request_timeout_secs = 30        # per-request timeout (408 on exceed)
max_connections = 10000          # concurrent connection limit (65000 for 50K+ users)

[storage]
db_path = "keplor.db"
retention_days = 90              # legacy global GC (prefer [retention] tiers)
wal_checkpoint_secs = 300        # WAL truncation interval
gc_interval_secs = 3600          # GC run frequency (0 = disabled)
read_pool_size = 4               # SQLite read connections (use 16 for high concurrency)

[auth]
api_keys = ["prod-svc:sk-abc"]   # simple format (empty = open mode)

# Extended format with tier:
# [[auth.api_key_entries]]
# id = "pro-user"
# secret = "sk-pro-key"
# tier = "pro"

[retention]
default_tier = "free"

[[retention.tiers]]
name = "free"
days = 7

[[retention.tiers]]
name = "pro"
days = 90

[pipeline]
batch_size = 64                  # use 256 for high throughput
max_body_bytes = 10485760        # 10 MB
channel_capacity = 32768         # batch writer queue depth

[idempotency]
enabled = true                   # dedup retries via Idempotency-Key header
ttl_secs = 300                   # 5 minute cache TTL
max_entries = 100000

[rate_limit]
enabled = false                  # per-key rate limiting (429 on exceed)
requests_per_second = 100.0
burst = 200

# [tls]                          # optional HTTPS
# cert_path = "/etc/keplor/cert.pem"
# key_path = "/etc/keplor/key.pem"

Override any field with KEPLOR_<SECTION>_<FIELD> environment variables. See Configuration for the full reference.

Event archival (S3 / R2 / MinIO)

For long-term retention beyond what SQLite should hold, archive old events to any S3-compatible object store. Build with the s3 feature and add an [archive] section.

What moves: Entire events — serialized to JSONL, compressed with zstd, uploaded as files partitioned by user and day. Daily rollups stay in SQLite for fast aggregation.

Cloudflare R2

R2 has 10 GB free storage and zero egress fees.

[archive]
bucket = "keplor-archive"
endpoint = "https://<account-id>.r2.cloudflarestorage.com"
region = "auto"
access_key_id = "your-r2-access-key"
secret_access_key = "your-r2-secret-key"
prefix = "events"
archive_after_days = 30

AWS S3

[archive]
bucket = "keplor-archive"
endpoint = "https://s3.us-east-1.amazonaws.com"
region = "us-east-1"
access_key_id = "AKIA..."
secret_access_key = "..."
prefix = "events"
archive_after_days = 30

MinIO (self-hosted)

[archive]
bucket = "keplor-archive"
endpoint = "http://localhost:9000"
region = "us-east-1"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
path_style = true    # required for MinIO
archive_after_days = 30

Archival runs every archive_interval_secs (default 1 hour). Events are grouped by (user_id, day), compressed, and uploaded. Archived events are deleted from SQLite; VACUUM reclaims disk space. S3 connectivity is verified at startup.

Important: Set archive_after_days lower than your shortest retention tier, or GC will delete events before archival. See Event Archival for the full lifecycle.

Build command: cargo build --release --features mimalloc,s3

JSON structured logging

$ keplor run --json-logs

Emits newline-delimited JSON for log aggregation (Loki, Datadog, CloudWatch).

Graceful shutdown

On SIGINT/SIGTERM, Keplor stops accepting connections, drains the batch writer (flushes all pending events), runs a WAL checkpoint, and exits. Drain waits up to shutdown_timeout_secs.

Automated GC

Keplor runs tiered garbage collection every gc_interval_secs (default: 1 hour). Each configured retention tier gets its own pass — free-tier events older than 7 days are deleted independently of pro-tier events at 90 days.

Set gc_interval_secs = 0 to disable. You can still run keplor gc --older-than-days N manually.

Prometheus metrics

Scrape GET /metrics (no auth required).

Metric                              Type        Description
keplor_events_ingested_total        counter     Events ingested, by provider
keplor_events_errors_total          counter     Errors by stage (validation, store, queue_full)
keplor_ingest_duration_seconds      histogram   End-to-end ingest latency (p50/p95/p99)
keplor_batch_flushes_total          counter     Batch flush operations
keplor_batch_events_flushed_total   counter     Events written to SQLite
keplor_batch_flush_errors_total     counter     Batch flush failures
keplor_auth_successes_total         counter     Successful auth attempts
keplor_auth_failures_total          counter     Auth failures (missing or invalid key)

Next steps

API Reference — all endpoints, parameters, and response shapes.
Configuration — TOML config, env vars, auth keys.
Quickstart — install and run Keplor in 2 minutes.