Observability

cogcache ships with first-class observability via MetricsCollector — a thread-safe, bounded event aggregator with pluggable sinks.

JSON snapshot

from cogcache import CogniCache, MetricsCollector

metrics = MetricsCollector(history_size=10_000)
cache = CogniCache(similarity_threshold=0.92, metrics=metrics)

# ... handle traffic ...

print(metrics.snapshot())

Example snapshot:

{
  "total": 1247,
  "hits": 542,
  "misses": 705,
  "hit_rate": 0.4346,
  "tokens_saved": 162600,
  "tokens_used": 211500,
  "token_savings_ratio": 0.4346,
  "latency_ms": {
    "all":  { "p50": 1200.0, "p95": 6800.0, "p99": 9100.0 },
    "hit":  { "p50": 0.8,    "p95": 1.4,    "p99": 2.1    },
    "miss": { "p50": 3200.0, "p95": 7100.0, "p99": 9300.0 }
  },
  "window_size": 1247,
  "since": 1735678900.123
}

The collector keeps events in a bounded ring buffer (deque(maxlen=history_size)), so memory use stays capped under load: once the buffer is full, each new event evicts the oldest one.

Prometheus

pip install "cogcache[prometheus]"

from cogcache import CogniCache, MetricsCollector
from cogcache.metrics.prometheus import PrometheusSink

metrics = MetricsCollector()
prom = PrometheusSink()           # creates its own CollectorRegistry
metrics.attach(prom)

cache = CogniCache(metrics=metrics)

Then expose the metrics in your web framework:

FastAPI:

from fastapi import FastAPI, Response

app = FastAPI()

@app.get("/metrics")
def expose_metrics():
    return Response(
        content=prom.expose(),
        media_type=prom.content_type(),
    )

Flask:

from flask import Flask, Response

app = Flask(__name__)

@app.route("/metrics")
def expose_metrics():
    return Response(prom.expose(), mimetype=prom.content_type())

Exported metric names

Metric                           Type       Labels
cogcache_queries_total           Counter    cache_hit, route, intent
cogcache_query_latency_seconds   Histogram  cache_hit
cogcache_tokens_total            Counter    kind (saved, used)
cogcache_quality_score           Gauge      (none)

Example PromQL queries:

# Hit rate (rolling 5min)
sum(rate(cogcache_queries_total{cache_hit="true"}[5m]))
/
sum(rate(cogcache_queries_total[5m]))

# P99 latency by hit/miss
histogram_quantile(0.99,
  sum(rate(cogcache_query_latency_seconds_bucket[5m]))
  by (cache_hit, le)
)

# Tokens saved per second
rate(cogcache_tokens_total{kind="saved"}[1m])

# Estimated cost savings over the last 24h (assuming $0.0002 per 1K tokens; substitute your own price)
increase(cogcache_tokens_total{kind="saved"}[24h]) * 0.0002 / 1000
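
The cost query is plain arithmetic: tokens saved multiplied by the price per token. As a quick sanity check, the same calculation in Python, using the tokens_saved value from the snapshot example and an assumed price of $0.0002 per 1K tokens (the helper name cost_saved_usd is hypothetical; substitute your provider's actual rate):

```python
def cost_saved_usd(tokens_saved, usd_per_1k_tokens):
    """Convert a token count into dollars at a given price per 1,000 tokens."""
    return tokens_saved * usd_per_1k_tokens / 1000


PRICE_PER_1K = 0.0002  # assumed price for illustration, not a real quote

savings = cost_saved_usd(162_600, PRICE_PER_1K)
print(f"${savings:.4f}")
```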

Alert rules

groups:
  - name: cogcache
    rules:
      - alert: CogCacheHitRateLow
        expr: |
          (sum(rate(cogcache_queries_total{cache_hit="true"}[5m])) /
           sum(rate(cogcache_queries_total[5m]))) < 0.10
        for: 30m
        annotations:
          summary: "Hit rate dropped below 10%; similarity threshold may be too strict"

      - alert: CogCacheHitLatencyHigh
        expr: |
          histogram_quantile(0.99,
            sum(rate(cogcache_query_latency_seconds_bucket{cache_hit="true"}[5m]))
            by (le)
          ) > 0.05
        for: 10m
        annotations:
          summary: "P99 hit latency exceeds 50ms; embedding may be the bottleneck"
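
The latency alert depends on histogram_quantile, which estimates a quantile from cumulative bucket counts by interpolating linearly inside the bucket that contains the target rank. The sketch below reproduces that estimate in Python for intuition only. The bucket bounds and counts are invented, and it skips edge cases that Prometheus itself handles.

```python
import math


def histogram_quantile(q, buckets):
    """Estimate quantile q from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with (math.inf, total).
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    lower_bound, lower_count = 0.0, 0
    for i, (upper_bound, count) in enumerate(buckets):
        if count >= rank:
            if math.isinf(upper_bound):
                # Rank falls in the +Inf bucket: report the highest finite bound.
                return buckets[i - 1][0] if i > 0 else math.nan
            if count == lower_count:
                return lower_bound  # empty bucket, nothing to interpolate
            # Interpolate linearly within the bucket.
            fraction = (rank - lower_count) / (count - lower_count)
            return lower_bound + (upper_bound - lower_bound) * fraction
        lower_bound, lower_count = upper_bound, count
    return math.nan


# Made-up cumulative counts for cache-hit latency, bounds in seconds.
buckets = [
    (0.001, 600),
    (0.005, 950),
    (0.01, 985),
    (0.05, 998),
    (0.1, 1000),
    (math.inf, 1000),
]

p99 = histogram_quantile(0.99, buckets)
print(f"estimated p99 = {p99 * 1000:.1f} ms, alert fires: {p99 > 0.05}")
```

Because the estimate can never be more precise than the bucket boundaries, it helps to have a bucket edge at or near the alert threshold (0.05 s here).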

Custom sinks

Implement the MetricsSink ABC to push events to InfluxDB, Datadog, CloudWatch, or another backend:

from cogcache.metrics import MetricsSink, QueryEvent

class InfluxSink(MetricsSink):
    def __init__(self, client):
        self._client = client

    def record_query(self, event: QueryEvent) -> None:
        self._client.write_point({
            "measurement": "cogcache_query",
            "tags": {
                "cache_hit": event.cache_hit,
                "route": event.route,
                "intent": event.intent,
            },
            "fields": {
                "latency_ms": event.latency_ms,
                "tokens_saved": event.tokens_saved,
                "quality_score": event.quality_score,
            },
            "time": event.timestamp,
        })

metrics.attach(InfluxSink(my_influx_client))

A crashing sink does not break event recording: its exception is caught and logged, and the other sinks still receive the event.
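
One way a collector can provide this isolation is to wrap each sink call in its own try/except. The sketch below uses stand-in classes (FanOutCollector, BrokenSink, and ListSink are hypothetical names, and events are plain dicts, not QueryEvent objects). It illustrates the pattern and makes no claim about cogcache's internals.

```python
import logging

logger = logging.getLogger("metrics_fanout_sketch")


class FanOutCollector:
    """Hypothetical collector that forwards each event to every attached sink."""

    def __init__(self):
        self._sinks = []

    def attach(self, sink):
        self._sinks.append(sink)

    def record(self, event):
        for sink in self._sinks:
            try:
                sink.record_query(event)
            except Exception:
                # Log with traceback and keep going, so one bad sink
                # cannot starve the others.
                logger.exception("sink %r failed", sink)


class BrokenSink:
    def record_query(self, event):
        raise RuntimeError("backend unreachable")


class ListSink:
    def __init__(self):
        self.events = []

    def record_query(self, event):
        self.events.append(event)


collector = FanOutCollector()
good = ListSink()
collector.attach(BrokenSink())  # attached first, fails on every event
collector.attach(good)
collector.record({"cache_hit": True, "latency_ms": 0.8})
print(len(good.events))
```

Catching Exception per sink, not around the whole loop, is what lets the healthy sink still receive the event after the broken one raises.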