Observability
cogcache ships with first-class observability via MetricsCollector —
a thread-safe, bounded event aggregator with pluggable sinks.
JSON snapshot
from cogcache import CogniCache, MetricsCollector
metrics = MetricsCollector(history_size=10_000)
cache = CogniCache(similarity_threshold=0.92, metrics=metrics)
# ... handle traffic ...
print(metrics.snapshot())
{
  "total": 1247,
  "hits": 542,
  "misses": 705,
  "hit_rate": 0.4346,
  "tokens_saved": 162600,
  "tokens_used": 211500,
  "token_savings_ratio": 0.4346,
  "latency_ms": {
    "all":  { "p50": 1200.0, "p95": 6800.0, "p99": 9100.0 },
    "hit":  { "p50": 0.8,    "p95": 1.4,    "p99": 2.1 },
    "miss": { "p50": 3200.0, "p95": 7100.0, "p99": 9300.0 }
  },
  "window_size": 1247,
  "since": 1735678900.123
}
The collector keeps a bounded ring buffer (deque(maxlen=history_size)),
so memory use stays bounded under load: once the buffer is full, each new
event evicts the oldest one.
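The effect of the bound can be illustrated with a small standalone sketch. It does not reproduce MetricsCollector's internals: `BoundedLatencyWindow` and its nearest-rank percentile are hypothetical stand-ins, and only the `deque(maxlen=...)` behaviour is taken from the text above.

```python
from collections import deque
import math


class BoundedLatencyWindow:
    """Hypothetical stand-in for a bounded event history (not cogcache's class)."""

    def __init__(self, history_size: int) -> None:
        # A deque with maxlen silently drops the oldest item once it is full.
        self._samples = deque(maxlen=history_size)

    def record(self, latency_ms: float) -> None:
        self._samples.append(latency_ms)

    def __len__(self) -> int:
        return len(self._samples)

    def percentile(self, q: float) -> float:
        """Nearest-rank percentile over the retained samples, for q in (0, 100]."""
        if not self._samples:
            raise ValueError("no samples recorded")
        ordered = sorted(self._samples)
        rank = max(1, math.ceil(q / 100 * len(ordered)))
        return ordered[rank - 1]


window = BoundedLatencyWindow(history_size=3)
for latency in (5.0, 1.0, 9.0, 2.0, 7.0):
    window.record(latency)

# Only the three most recent samples (9.0, 2.0, 7.0) are still held.
retained = len(window)
median = window.percentile(50)
```

Because percentiles are computed over whatever the buffer currently holds, a small `history_size` trades statistical stability for memory.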
Prometheus
from cogcache import CogniCache, MetricsCollector
from cogcache.metrics.prometheus import PrometheusSink
metrics = MetricsCollector()
prom = PrometheusSink() # creates its own CollectorRegistry
metrics.attach(prom)
cache = CogniCache(metrics=metrics)
Then expose the metrics in your web framework:
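One framework-agnostic way to do this is a small WSGI endpoint; the sketch below uses only the standard library. `make_metrics_app`, `fake_render`, and the `render` callable are hypothetical names introduced here. With the `prometheus_client` package, `render` would typically wrap `generate_latest(registry)`; how to obtain the registry from `PrometheusSink` is not shown in this section, so check the sink's API before wiring it up.

```python
from typing import Callable, Iterable, List, Tuple

# Content type of the Prometheus text exposition format, version 0.0.4.
PROMETHEUS_CONTENT_TYPE = "text/plain; version=0.0.4; charset=utf-8"


def make_metrics_app(render: Callable[[], bytes]):
    """Build a WSGI app that serves render() at /metrics and 404s elsewhere."""

    def app(environ, start_response) -> Iterable[bytes]:
        if environ.get("PATH_INFO") != "/metrics":
            start_response("404 Not Found", [("Content-Type", "text/plain")])
            return [b"not found\n"]
        body = render()
        headers: List[Tuple[str, str]] = [
            ("Content-Type", PROMETHEUS_CONTENT_TYPE),
            ("Content-Length", str(len(body))),
        ]
        start_response("200 OK", headers)
        return [body]

    return app


def fake_render() -> bytes:
    # Placeholder payload; a real deployment would render the sink's registry.
    return b"cogcache_queries_total 3.0\n"


app = make_metrics_app(fake_render)

# Call the app directly, the way a WSGI server would.
captured = {}


def start_response(status, headers):
    captured["status"] = status
    captured["headers"] = dict(headers)


body = b"".join(app({"PATH_INFO": "/metrics"}, start_response))
```

To try it locally, pass `app` to `wsgiref.simple_server.make_server("", 8000, app)` and call `serve_forever()`; the port is arbitrary. Most web frameworks can mount a WSGI callable like this one under a route.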
Exported metric names
| Metric | Type | Labels |
|---|---|---|
| `cogcache_queries_total` | Counter | `cache_hit`, `route`, `intent` |
| `cogcache_query_latency_seconds` | Histogram | `cache_hit` |
| `cogcache_tokens_total` | Counter | `kind` (`saved`, `used`) |
| `cogcache_quality_score` | Gauge | (none) |
Recommended Grafana panels
# Hit rate (rolling 5 min)
sum(rate(cogcache_queries_total{cache_hit="true"}[5m]))
  /
sum(rate(cogcache_queries_total[5m]))

# P99 latency by hit/miss
histogram_quantile(0.99,
  sum(rate(cogcache_query_latency_seconds_bucket[5m]))
  by (cache_hit, le)
)

# Tokens saved per second
rate(cogcache_tokens_total{kind="saved"}[1m])

# Cost savings over the last 24h (assuming $0.0002 per 1K tokens; substitute your own price)
increase(cogcache_tokens_total{kind="saved"}[24h]) * 0.0002 / 1000
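The arithmetic in the cost query can be checked by hand. The sketch below applies the same conversion to the `tokens_saved` figure from the JSON snapshot above; `usd_saved` is a hypothetical helper, and $0.0002 per 1K tokens is only the placeholder price used in the query.

```python
def usd_saved(tokens_saved: float, usd_per_1k_tokens: float) -> float:
    """Convert a token count into dollars at a per-1K-token price."""
    return tokens_saved * usd_per_1k_tokens / 1000


# 162,600 saved tokens at the placeholder price of $0.0002 per 1K tokens.
savings = usd_saved(162_600, 0.0002)
print(f"${savings:.5f}")  # prints $0.03252
```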
Alert rules
groups:
  - name: cogcache
    rules:
      - alert: CogCacheHitRateLow
        expr: |
          (sum(rate(cogcache_queries_total{cache_hit="true"}[5m])) /
           sum(rate(cogcache_queries_total[5m]))) < 0.10
        for: 30m
        annotations:
          summary: "Hit rate dropped below 10%; threshold may be too strict"
      - alert: CogCacheHitLatencyHigh
        expr: |
          histogram_quantile(0.99,
            sum(rate(cogcache_query_latency_seconds_bucket{cache_hit="true"}[5m]))
            by (le)
          ) > 0.05
        for: 10m
        annotations:
          summary: "P99 hit latency exceeds 50ms; embedding may be the bottleneck"
Custom sinks
Implement the MetricsSink abstract base class (ABC) to push events to
InfluxDB, Datadog, CloudWatch, or another backend:
from cogcache.metrics import MetricsSink, QueryEvent


class InfluxSink(MetricsSink):
    """Forward each query event to InfluxDB as a single point."""

    def __init__(self, client):
        self._client = client

    def record_query(self, event: QueryEvent) -> None:
        self._client.write_point({
            "measurement": "cogcache_query",
            # Tags are the dimensions you filter and group by.
            "tags": {
                "cache_hit": event.cache_hit,
                "route": event.route,
                "intent": event.intent,
            },
            # Fields carry the measured values.
            "fields": {
                "latency_ms": event.latency_ms,
                "tokens_saved": event.tokens_saved,
                "quality_score": event.quality_score,
            },
            "time": event.timestamp,
        })


metrics.attach(InfluxSink(my_influx_client))
A crashing sink does not break event recording: its exception is caught and logged, and the other sinks still receive the event.
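That isolation can be pictured with the following sketch. It is not cogcache's implementation: `FanOut`, `BrokenSink`, and `ListSink` are hypothetical, and the only behaviour taken from the text is that a sink's exception is caught and logged while the remaining sinks still get the event.

```python
import logging
from typing import Any, List

logger = logging.getLogger("sink_demo")


class FanOut:
    """Hypothetical dispatcher that delivers each event to every attached sink."""

    def __init__(self) -> None:
        self._sinks: List[Any] = []

    def attach(self, sink: Any) -> None:
        self._sinks.append(sink)

    def record_query(self, event: Any) -> None:
        for sink in self._sinks:
            try:
                sink.record_query(event)
            except Exception:
                # Log with traceback, then continue with the next sink.
                logger.exception("sink %r failed", sink)


class BrokenSink:
    """Always fails, simulating an unreachable backend."""

    def record_query(self, event: Any) -> None:
        raise RuntimeError("backend unreachable")


class ListSink:
    """Keeps every event it receives in memory."""

    def __init__(self) -> None:
        self.events: List[Any] = []

    def record_query(self, event: Any) -> None:
        self.events.append(event)


fan_out = FanOut()
healthy = ListSink()
fan_out.attach(BrokenSink())  # attached first, so it fails before healthy runs
fan_out.attach(healthy)
fan_out.record_query({"cache_hit": True, "latency_ms": 0.8})
```

After the call, `healthy.events` holds the event even though the first sink raised. Catching `Exception` rather than `BaseException` lets `KeyboardInterrupt` and `SystemExit` propagate as usual.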