Integration Guide
Everything a service needs to log LLM traffic through Keplor.
How it works
Your application or gateway makes LLM calls as usual, then POSTs event data to Keplor. Keplor computes cost from its bundled pricing catalog, compresses and stores the event, and makes it queryable via the API.
Your App / Gateway                      Keplor
        |                                 |
        |-- LLM call --> Provider         |
        |<-- response --                  |
        |                                 |
        |-- POST /v1/events ------------> |  validate, compute cost,
        |<-- 201 {id, cost} ------------- |  compress, store

Keplor is observational only — it never touches your LLM traffic. It just records what happened.
Minimal example
Send the two required fields (model and provider) plus token counts:
$ curl -X POST http://localhost:8080/v1/events \
  -H "Authorization: Bearer sk-your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-4o",
    "provider": "openai",
    "usage": {"input_tokens": 1000, "output_tokens": 500},
    "latency": {"ttft_ms": 30, "total_ms": 450},
    "user_id": "alice",
    "source": "my-app"
  }'

Response:
{
  "id": "01JA2B3C4D5E6F7G8H9J0KMNPQ",
  "cost_nanodollars": 6250000,
  "model": "gpt-4o",
  "provider": "openai"
}

Cost is auto-computed: 6250000 nanodollars = $0.00625.
Authentication
When API keys are configured, include a Bearer token:
Authorization: Bearer sk-your-key

Key formats
| Config format | Key ID | Tier |
|---|---|---|
| "prod-svc:sk-abc123" | prod-svc | default_tier |
| "sk-abc123" | key_<sha256-prefix> | default_tier |
| { id, secret, tier } | id value | explicit tier |
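The key-ID rules in the table can be sketched as follows. This is a hypothetical re-implementation for illustration, not Keplor's actual code; in particular, the length of the SHA-256 prefix is an assumption.

```python
import hashlib

def derive_key_id(entry):
    """Hypothetical sketch of the key-ID rules above.

    entry may be "id:secret", a bare secret string, or a dict with
    explicit id/secret/tier fields."""
    if isinstance(entry, dict):
        return entry["id"]                 # explicit id wins
    if ":" in entry:
        return entry.split(":", 1)[0]      # "prod-svc:sk-abc123" -> "prod-svc"
    # Bare secret: ID derived from a SHA-256 prefix of the secret
    # (prefix length here is an illustrative guess)
    digest = hashlib.sha256(entry.encode()).hexdigest()
    return f"key_{digest[:8]}"

print(derive_key_id("prod-svc:sk-abc123"))  # prod-svc
```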
Retention tiers
Each API key is assigned a retention tier that controls how long its events are kept. Configure tiers in [retention]:
[retention]
default_tier = "free"
[[retention.tiers]]
name = "free"
days = 7
[[retention.tiers]]
name = "pro"
days = 90
[[retention.tiers]]
name = "team"
days = 180

Assign tiers to keys using the extended format:
[[auth.api_key_entries]]
id = "pro-user"
secret = "sk-pro-key"
tier = "pro"

Tier names are fully configurable — add "enterprise", "trial", or any custom name. GC runs one pass per tier automatically.
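Under the config above, each tier's GC pass deletes events older than a per-tier cutoff. A sketch of the cutoff arithmetic (the function name is illustrative, not part of Keplor):

```python
import time

# name -> retention days, mirroring the [retention] config above
TIERS = {"free": 7, "pro": 90, "team": 180}

def gc_cutoff_ns(tier, now_ns=None):
    """Epoch-nanosecond cutoff: events in this tier with a timestamp
    older than the cutoff are eligible for deletion on its GC pass."""
    if now_ns is None:
        now_ns = time.time_ns()
    return now_ns - TIERS[tier] * 86_400 * 1_000_000_000
```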
Server-side key attribution
When auth is enabled, Keplor overrides the client-provided api_key_id with the authenticated key's ID and assigns the key's retention tier. This prevents clients from spoofing attribution.
Open mode
When no keys are configured (the default), auth is disabled. All requests are accepted without a Bearer token and assigned to default_tier.
What to send
Only model and provider are required. Everything else is optional with sensible defaults.
Required fields
| Field | Type | Description |
|---|---|---|
model | string | Model name ("gpt-4o", "claude-sonnet-4-20250514") |
provider | string | Provider key (see supported providers) |
Token usage
| Field | Type | Default | Description |
|---|---|---|---|
usage.input_tokens | u32 | 0 | Input/prompt tokens |
usage.output_tokens | u32 | 0 | Output/completion tokens |
usage.cache_read_input_tokens | u32 | 0 | Tokens served from cache |
usage.cache_creation_input_tokens | u32 | 0 | Tokens written to cache |
usage.reasoning_tokens | u32 | 0 | Chain-of-thought / thinking tokens |
usage.audio_input_tokens | u32 | 0 | Audio input tokens |
usage.audio_output_tokens | u32 | 0 | Audio output tokens |
usage.image_tokens | u32 | 0 | Image/vision tokens |
usage.tool_use_tokens | u32 | 0 | Tool/function call tokens |
Latency
| Field | Type | Description |
|---|---|---|
latency.ttft_ms | u32 | Time to first token (ms) |
latency.total_ms | u32 | End-to-end latency (ms) |
latency.time_to_close_ms | u32 | Time from last token to stream close |
Attribution
| Field | Type | Description |
|---|---|---|
user_id | string | User identity for cost attribution |
api_key_id | string | API key (overridden by server when auth enabled) |
org_id | string | Organization ID |
project_id | string | Project ID |
route_id | string | Logical route name ("chat", "embeddings") |
source | string | Name of the sending system |
Flags
| Field | Type | Default | Description |
|---|---|---|---|
flags.streaming | bool | false | Response was streamed |
flags.tool_calls | bool | false | Included tool/function calls |
flags.reasoning | bool | false | Used extended thinking |
flags.stream_incomplete | bool | false | Stream ended prematurely |
flags.cache_used | bool | false | Response served from cache |
Other optional fields
| Field | Type | Description |
|---|---|---|
cost_nanodollars | i64 | Override auto-computed cost (nanodollars) |
timestamp | i64 or string | Epoch nanos or ISO 8601 (default: server time) |
method | string | HTTP method (default: "POST") |
endpoint | string | API path ("/v1/chat/completions") |
http_status | u16 | Upstream HTTP status code |
error.kind | string | Error category ("rate_limited", etc.) |
error.message | string | Error message text |
error.status | u16 | Error HTTP status |
trace_id | string | W3C trace ID |
request_id | string | Provider request ID |
client_ip | string | Client source IP |
user_agent | string | Client user-agent |
request_body | any JSON | Full request body (stored compressed) |
response_body | any JSON | Full response body (stored compressed) |
metadata | any JSON | Arbitrary metadata (queryable via user_tag/session_tag) |
Full event example
{
  "model": "claude-sonnet-4-20250514",
  "provider": "anthropic",
  "cost_nanodollars": null,
  "usage": {
    "input_tokens": 2000,
    "output_tokens": 1000,
    "cache_read_input_tokens": 500,
    "cache_creation_input_tokens": 0,
    "reasoning_tokens": 0
  },
  "latency": {
    "ttft_ms": 45,
    "total_ms": 800,
    "time_to_close_ms": 20
  },
  "timestamp": "2024-11-15T10:30:00Z",
  "method": "POST",
  "endpoint": "/v1/messages",
  "http_status": 200,
  "source": "litellm",
  "user_id": "alice",
  "api_key_id": "my-service",
  "org_id": "acme-corp",
  "project_id": "chatbot-v2",
  "route_id": "chat",
  "flags": {
    "streaming": true,
    "tool_calls": false,
    "reasoning": true,
    "stream_incomplete": false,
    "cache_used": true
  },
  "error": null,
  "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736",
  "request_id": "req_abc123",
  "client_ip": "10.0.1.42",
  "user_agent": "my-app/1.0",
  "request_body": {"messages": [{"role": "user", "content": "Hello"}]},
  "response_body": {"content": [{"type": "text", "text": "Hi there!"}]},
  "metadata": {"session_id": "sess_xyz", "user_tag": "premium"}
}

What you get back
Every successful ingest returns:
| Field | Type | Description |
|---|---|---|
id | string | ULID (time-sortable unique ID) |
cost_nanodollars | i64 | Computed or overridden cost |
model | string | Normalized model name |
provider | string | Normalized provider key |
Supported providers
| Provider key | Service |
|---|---|
openai | OpenAI (api.openai.com) |
anthropic | Anthropic (api.anthropic.com) |
gemini | Google AI Studio |
vertex_ai | Google Vertex AI |
bedrock | AWS Bedrock |
azure | Azure OpenAI |
mistral | Mistral AI |
groq | Groq |
xai | xAI Grok |
deepseek | DeepSeek |
cohere | Cohere v2 |
ollama | Ollama (local) |
Any unrecognized provider string is treated as OpenAI-compatible. Matching is case-insensitive.
Cost accounting
Costs are stored as int64 nanodollars (10⁻⁹ USD) to avoid floating-point precision issues.
| Nanodollars | USD |
|---|---|
1,000,000,000 | $1.00 |
1,000,000 | $0.001 |
1,000 | $0.000001 |
When you omit cost_nanodollars, Keplor computes it from the model pricing catalog and your usage token counts. This handles prompt caching discounts, reasoning token pricing, and audio/image tokens automatically.
To override: set cost_nanodollars to your own value. Unknown models get cost 0.
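The computation can be sketched as below. The per-token prices here are illustrative assumptions chosen to reproduce the minimal example's result ($2.50/M input, $7.50/M output for gpt-4o), not values from the real pricing catalog:

```python
# Hypothetical catalog: (provider, model) -> nanodollars per token.
# $2.50 per million input tokens = 2_500 nanodollars/token, and so on.
PRICES = {
    ("openai", "gpt-4o"): {"input": 2_500, "output": 7_500},
}

def compute_cost_nanodollars(provider, model, usage):
    """Sketch of catalog-based costing; caching/reasoning/audio
    token classes are omitted for brevity."""
    price = PRICES.get((provider, model))
    if price is None:
        return 0  # unknown models get cost 0
    return (usage.get("input_tokens", 0) * price["input"]
            + usage.get("output_tokens", 0) * price["output"])

cost = compute_cost_nanodollars("openai", "gpt-4o",
                                {"input_tokens": 1000, "output_tokens": 500})
print(cost, f"${cost / 1e9:.5f}")  # 6250000 $0.00625
```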
Batch ingestion
For high-throughput scenarios, use /v1/events/batch with up to 10,000 events per request:
$ curl -X POST http://localhost:8080/v1/events/batch \
  -H "Authorization: Bearer sk-your-key" \
  -H "Content-Type: application/json" \
  -d '{
    "events": [
      {"model": "gpt-4o", "provider": "openai", "usage": {"input_tokens": 500}},
      {"model": "claude-sonnet-4-20250514", "provider": "anthropic",
       "usage": {"input_tokens": 800, "output_tokens": 200}}
    ]
  }'

Response (201 if all events were accepted, 207 on partial success):
{
  "results": [
    {"id": "01JA...", "cost_nanodollars": 1250000, "model": "gpt-4o", "provider": "openai"},
    {"id": "01JB...", "cost_nanodollars": 4200000, "model": "claude-sonnet-4-20250514", "provider": "anthropic"}
  ],
  "accepted": 2,
  "rejected": 0
}

Batch writes are fire-and-forget: events are validated synchronously but flushed to disk asynchronously (~50ms). Events may be lost if the server crashes before the flush.
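A client-side sketch for staying under the 10,000-event limit: split large event lists into chunks and POST each one. The helper names are illustrative and error handling is omitted:

```python
import json
import urllib.request

MAX_BATCH = 10_000  # per-request event limit for /v1/events/batch

def chunk(events, size=MAX_BATCH):
    """Split a large event list into batch-sized chunks."""
    return [events[i:i + size] for i in range(0, len(events), size)]

def send_batches(events, base="http://localhost:8080", key="sk-your-key"):
    """POST each chunk to the batch endpoint; returns accepted/rejected totals."""
    accepted = rejected = 0
    for batch in chunk(events):
        req = urllib.request.Request(
            f"{base}/v1/events/batch",
            data=json.dumps({"events": batch}).encode(),
            headers={"Authorization": f"Bearer {key}",
                     "Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:
            body = json.load(resp)
            accepted += body["accepted"]
            rejected += body["rejected"]
    return accepted, rejected
```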
Querying your data
Check cost for a user:
$ curl "http://localhost:8080/v1/quota?user_id=alice&from=1700000000000000000" \
  -H "Authorization: Bearer sk-your-key"

{"cost_nanodollars": 150000000, "event_count": 85}

See the API Reference for all query endpoints: events, rollups, stats, and quota.
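For display, divide by 10⁹ to get dollars. A small sketch using the quota response above:

```python
def nanodollars_to_usd(n):
    """Convert an int64 nanodollar amount to a USD float for display."""
    return n / 1e9

quota = {"cost_nanodollars": 150000000, "event_count": 85}
print(f"alice spent ${nanodollars_to_usd(quota['cost_nanodollars']):.2f} "
      f"across {quota['event_count']} calls")  # alice spent $0.15 across 85 calls
```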
Error handling
| Status | Meaning | Retry? |
|---|---|---|
201 | Created | No |
207 | Partial success (batch) | Retry failed items |
400 | Validation error or bad JSON | Fix request |
401 | Missing or invalid API key | Fix auth |
408 | Request timeout | Yes |
422 | Unprocessable entity | Fix payload |
429 | Rate limit exceeded | Yes, after Retry-After seconds |
500 | Server error | Yes, with backoff |
503 | Server overloaded | Yes, with backoff |
All errors return {"error": "message"}. All responses include an X-Request-Id header for log correlation.
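A retry policy matching the table might look like this sketch. The backoff constants are arbitrary choices, not Keplor recommendations:

```python
import random

# Statuses the table marks as retryable
RETRYABLE = {408, 429, 500, 503}

def retry_delay(status, attempt, retry_after=None):
    """Seconds to wait before retrying, or None if not retryable.

    Honors Retry-After for 429; otherwise exponential backoff with
    a little jitter to avoid thundering herds."""
    if status not in RETRYABLE:
        return None
    if status == 429 and retry_after is not None:
        return float(retry_after)
    return min(2 ** attempt, 30) + random.uniform(0, 0.5)
```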
Idempotency
To safely retry failed requests without creating duplicates, include an Idempotency-Key header:
curl -X POST http://localhost:8080/v1/events \
  -H "Idempotency-Key: my-unique-key-123" \
  -H "Content-Type: application/json" \
  -d '{"model": "gpt-4o", "provider": "openai"}'

If the same key is sent again within the TTL (default 5 minutes), Keplor returns the cached response without creating a new event.
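The important detail when retrying is to generate the key once per logical event and reuse it on every attempt. A sketch, where session_post stands in for requests.post:

```python
import uuid

def post_with_retries(session_post, url, payload, attempts=3):
    """Retry a POST safely: one Idempotency-Key is generated per logical
    event and reused on every attempt, so server-side dedup within the
    TTL collapses duplicates."""
    key = str(uuid.uuid4())  # stable across all retries of this event
    headers = {"Idempotency-Key": key, "Content-Type": "application/json"}
    resp = None
    for attempt in range(attempts):
        resp = session_post(url, json=payload, headers=headers)
        if resp.status_code < 500:
            return resp
    return resp
```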
Integration examples
Python
import requests
KEPLOR = "http://localhost:8080"
HEADERS = {
    "Authorization": "Bearer sk-your-key",
    "Content-Type": "application/json",
}
# After each LLM call, log to Keplor
def log_llm_event(model, provider, usage, latency_ms, user_id=None):
    return requests.post(f"{KEPLOR}/v1/events", headers=HEADERS, json={
        "model": model,
        "provider": provider,
        "usage": usage,
        "latency": {"total_ms": latency_ms},
        "http_status": 200,
        "user_id": user_id,
        "source": "my-app",
    }).json()

result = log_llm_event("gpt-4o", "openai", {
    "input_tokens": 1500,
    "output_tokens": 800,
}, 450, user_id="alice")
print(f"Event {result['id']} cost: ${result['cost_nanodollars'] / 1e9:.6f}")

Node.js
const KEPLOR = "http://localhost:8080";
const headers = {
  "Authorization": "Bearer sk-your-key",
  "Content-Type": "application/json",
};
async function logLlmCall(model, provider, usage, latencyMs, userId) {
  const resp = await fetch(`${KEPLOR}/v1/events`, {
    method: "POST",
    headers,
    body: JSON.stringify({
      model, provider, usage,
      latency: { total_ms: latencyMs },
      http_status: 200,
      user_id: userId,
      source: "my-node-app",
    }),
  });
  return resp.json();
}

const result = await logLlmCall("gpt-4o", "openai",
  { input_tokens: 1200, output_tokens: 600 }, 350, "alice");
console.log(`Cost: $${(result.cost_nanodollars / 1e9).toFixed(6)}`);

LiteLLM callback
import litellm, requests
KEPLOR = "http://localhost:8080"
def keplor_callback(kwargs, completion_response, start_time, end_time):
    latency_ms = int((end_time - start_time).total_seconds() * 1000)
    usage = completion_response.get("usage", {})
    requests.post(f"{KEPLOR}/v1/events", json={
        "model": kwargs.get("model", ""),
        "provider": kwargs.get("custom_llm_provider", "openai"),
        "usage": {
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
        },
        "latency": {"total_ms": latency_ms},
        "http_status": 200,
        "user_id": kwargs.get("user"),
        "source": "litellm",
    })

litellm.success_callback = [keplor_callback]

Production operations
Configuration
[server]
listen_addr = "0.0.0.0:8080"
shutdown_timeout_secs = 25 # drain batch writer + WAL checkpoint
request_timeout_secs = 30 # per-request timeout (408 on exceed)
max_connections = 10000 # concurrent connection limit (65000 for 50K+ users)
[storage]
db_path = "keplor.db"
retention_days = 90 # legacy global GC (prefer [retention] tiers)
wal_checkpoint_secs = 300 # WAL truncation interval
gc_interval_secs = 3600 # GC run frequency (0 = disabled)
read_pool_size = 4 # SQLite read connections (use 16 for high concurrency)
[auth]
api_keys = ["prod-svc:sk-abc"] # simple format (empty = open mode)
# Extended format with tier:
# [[auth.api_key_entries]]
# id = "pro-user"
# secret = "sk-pro-key"
# tier = "pro"
[retention]
default_tier = "free"
[[retention.tiers]]
name = "free"
days = 7
[[retention.tiers]]
name = "pro"
days = 90
[pipeline]
batch_size = 64 # use 256 for high throughput
max_body_bytes = 10485760 # 10 MB
channel_capacity = 32768 # batch writer queue depth
[idempotency]
enabled = true # dedup retries via Idempotency-Key header
ttl_secs = 300 # 5 minute cache TTL
max_entries = 100000
[rate_limit]
enabled = false # per-key rate limiting (429 on exceed)
requests_per_second = 100.0
burst = 200
# [tls] # optional HTTPS
# cert_path = "/etc/keplor/cert.pem"
# key_path = "/etc/keplor/key.pem"

Override any field with KEPLOR_<SECTION>_<FIELD> environment variables. See Configuration for the full reference.
Event archival (S3 / R2 / MinIO)
For long-term retention beyond what SQLite should hold, archive old events to any S3-compatible object store. Build with the s3 feature and add an [archive] section.
What moves: Entire events — serialized to JSONL, compressed with zstd, uploaded as files partitioned by user and day. Daily rollups stay in SQLite for fast aggregation.
Cloudflare R2
R2 has 10 GB free storage and zero egress fees.
[archive]
bucket = "keplor-archive"
endpoint = "https://<account-id>.r2.cloudflarestorage.com"
region = "auto"
access_key_id = "your-r2-access-key"
secret_access_key = "your-r2-secret-key"
prefix = "events"
archive_after_days = 30

AWS S3
[archive]
bucket = "keplor-archive"
endpoint = "https://s3.us-east-1.amazonaws.com"
region = "us-east-1"
access_key_id = "AKIA..."
secret_access_key = "..."
prefix = "events"
archive_after_days = 30

MinIO (self-hosted)
[archive]
bucket = "keplor-archive"
endpoint = "http://localhost:9000"
region = "us-east-1"
access_key_id = "minioadmin"
secret_access_key = "minioadmin"
path_style = true # required for MinIO
archive_after_days = 30 Archival runs every archive_interval_secs (default 1 hour). Events are grouped by (user_id, day), compressed, and uploaded. Archived events are deleted from SQLite; VACUUM reclaims disk space. S3 connectivity is verified at startup.
Important: Set archive_after_days lower than your shortest retention tier, or GC will delete events before archival. See Event Archival for the full lifecycle.
Build command: cargo build --release --features mimalloc,s3
JSON structured logging
$ keplor run --json-logs

Emits newline-delimited JSON for log aggregation (Loki, Datadog, CloudWatch).
Graceful shutdown
On SIGINT/SIGTERM, Keplor stops accepting connections, drains the batch writer (flushes all pending events), runs a WAL checkpoint, and exits. Drain waits up to shutdown_timeout_secs.
Automated GC
Keplor runs tiered garbage collection every gc_interval_secs (default: 1 hour). Each configured retention tier gets its own pass — free-tier events older than 7 days are deleted independently of pro-tier events at 90 days.
Set gc_interval_secs = 0 to disable. You can still run keplor gc --older-than-days N manually.
Prometheus metrics
Scrape GET /metrics (no auth required).
| Metric | Type | Description |
|---|---|---|
keplor_events_ingested_total | counter | Events ingested by provider |
keplor_events_errors_total | counter | Errors by stage (validation, store, queue_full) |
keplor_ingest_duration_seconds | histogram | End-to-end ingest latency (p50/p95/p99) |
keplor_batch_flushes_total | counter | Batch flush operations |
keplor_batch_events_flushed_total | counter | Events written to SQLite |
keplor_batch_flush_errors_total | counter | Batch flush failures |
keplor_auth_successes_total | counter | Successful auth attempts |
keplor_auth_failures_total | counter | Auth failures (missing or invalid) |
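A minimal Prometheus scrape config for this endpoint might look like the following; the job name and scrape interval are placeholder choices, not Keplor recommendations:

```yaml
scrape_configs:
  - job_name: keplor
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:8080"]   # /metrics is the default metrics path
```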
Next steps
API Reference — all endpoints, parameters, and response shapes.
Configuration — TOML config, env vars, auth keys.
Quickstart — install and run Keplor in 2 minutes.