Skip to content

API Reference

Auto-generated from docstrings via mkdocstrings.

cogcache

CogniCache

Source code in cogcache/cache.py
class CogniCache:
    """Semantic response cache placed in front of a ``(str) -> str`` LLM callable.

    Each query is embedded and looked up in the configured store. On a match
    the cached answer is returned; otherwise the LLM callable is invoked and
    its answer is stored, optionally gated by a quality judge.
    """

    def __init__(
        self,
        redis_url: str | None = None,
        similarity_threshold: float = 0.92,
        max_cache_size: int = 100,
        vector_dim: int = 512,
        ttl: int = -1,
        enable_judge: bool = False,
        judge: LLMJudge | None = None,
        write_min_quality: float = 0.8,
        low_quality_ttl: int = 1800,
        judge_on_hit: bool = False,
        hit_min_quality: float = 0.6,
        store: CacheStore | None = None,
        embed_fn: Callable[[str], list[float]] | None = None,
        metrics: MetricsCollector | None = None,
        logger: Callable[[str], None] = print,
    ) -> None:
        """Configure the store, embedding function, judge and metrics.

        Store selection: an explicit ``store`` wins; otherwise a truthy
        ``redis_url`` creates a ``RedisStore``; otherwise a ``MemoryStore``.

        Args:
            redis_url: Redis URL used only when ``store`` is not given.
            similarity_threshold: Passed to the store created here; unused
                when ``store`` is supplied.
            max_cache_size: Passed to the store created here; unused when
                ``store`` is supplied.
            vector_dim: Passed to ``RedisStore`` only.
            ttl: TTL stamped on new entries (coerced with ``int``).
            enable_judge: Master switch; when false the judge is never called.
            judge: Scorer to use; ``NoopJudge`` when omitted.
            write_min_quality: Fresh answers scoring below this are treated
                as low quality at write time.
            low_quality_ttl: TTL given to low-quality answers. A value
                ``<= 0`` rejects the write instead of storing it.
            judge_on_hit: With ``enable_judge``, re-score cache hits in a
                background thread.
            hit_min_quality: Hit scores below this produce a log warning.
            store: Pre-built store to use as-is.
            embed_fn: Embedding function; the module default when omitted.
            metrics: Optional collector that receives one event per query.
            logger: Callable that receives diagnostic messages.
        """
        if store is not None:
            self._store = store
        elif redis_url:
            # Imported lazily so the Redis backend is only loaded when used.
            from cogcache.store.redis_store import RedisStore
            self._store = RedisStore(
                url=redis_url,
                similarity_threshold=similarity_threshold,
                max_cache_size=max_cache_size,
                vector_dim=vector_dim,
            )
        else:
            self._store = MemoryStore(
                similarity_threshold=similarity_threshold,
                max_cache_size=max_cache_size,
            )

        self._embed = embed_fn or _default_embed
        self._default_ttl = int(ttl)
        self._judge: LLMJudge = judge or NoopJudge()
        self._judge_enabled = bool(enable_judge)
        self._write_min_quality = float(write_min_quality)
        self._low_quality_ttl = int(low_quality_ttl)
        self._judge_on_hit = bool(judge_on_hit)
        self._hit_min_quality = float(hit_min_quality)
        self._metrics = metrics
        self._log = logger

    @property
    def metrics(self) -> MetricsCollector | None:
        """The metrics collector passed at construction, if any."""
        return self._metrics

    @property
    def store(self) -> CacheStore:
        """The backing cache store."""
        return self._store

    def query(
        self,
        query: str,
        llm_fn: Callable[[str], str],
        *,
        route: str = "default",
        intent: str = "default",
    ) -> str:
        """Return only the answer text of :meth:`query_with_meta`."""
        return self.query_with_meta(
            query, llm_fn, route=route, intent=intent
        ).answer

    def query_with_meta(
        self,
        query: str,
        llm_fn: Callable[[str], str],
        *,
        route: str = "default",
        intent: str = "default",
    ) -> QueryResult:
        """Answer ``query`` from the cache, or call ``llm_fn`` and cache it.

        Args:
            query: Text to embed and look up.
            llm_fn: Called with ``query`` on a cache miss; exceptions it
                raises propagate to the caller.
            route: Route label passed to the store lookup and stored on new
                entries.
            intent: Intent label passed to the store lookup and stored on new
                entries.

        Returns:
            A ``QueryResult`` holding the answer, hit flag, the similarity
            reported by the store, the quality score, and the matched or
            newly stored entry (``None`` when the write was rejected).
        """
        t0 = time.perf_counter()
        embedding = self._embed(query)
        hit, sim = self._store.find_match(embedding, route=route, intent=intent)
        if hit is not None:
            if self._judge_enabled and self._judge_on_hit:
                self._fire_hit_judge(query, hit)
            latency_ms = (time.perf_counter() - t0) * 1000
            self._emit_event(QueryEvent(
                cache_hit=True, similarity=sim, latency_ms=latency_ms,
                tokens_saved=hit.tokens, tokens_used=0,
                quality_score=hit.quality_score, route=route, intent=intent,
            ))
            return QueryResult(
                answer=hit.answer, cache_hit=True, similarity=sim,
                quality_score=hit.quality_score, entry=hit,
            )

        answer = llm_fn(query)
        # Score once up front so the judged quality is still known when the
        # write is rejected and no entry exists to read it from.
        judged_quality = self._score_for_write(query, answer)
        entry = self._maybe_store(
            query, embedding, answer,
            route=route, intent=intent, quality=judged_quality,
        )
        latency_ms = (time.perf_counter() - t0) * 1000
        tokens_used = entry.tokens if entry is not None else 0
        # A rejected write used to be reported as quality 1.0; report the
        # score that caused the rejection instead.
        quality = entry.quality_score if entry is not None else judged_quality
        self._emit_event(QueryEvent(
            cache_hit=False, similarity=sim, latency_ms=latency_ms,
            tokens_saved=0, tokens_used=tokens_used,
            quality_score=quality, route=route, intent=intent,
        ))
        return QueryResult(
            answer=answer, cache_hit=False, similarity=sim,
            quality_score=quality, entry=entry,
        )

    def _emit_event(self, event: QueryEvent) -> None:
        """Forward ``event`` to the collector; log instead of raising."""
        if self._metrics is None:
            return
        try:
            self._metrics.record_query(event)
        except Exception as e:
            # Metrics are best-effort: a failing collector must not fail the query.
            self._log(f"[CogniCache] metrics record failed: {type(e).__name__}: {e}")

    def cached(
        self,
        threshold: float | None = None,
        *,
        route: str = "default",
        intent: str = "default",
    ) -> Callable[[Callable[[str], str]], Callable[[str], str]]:
        """Decorator: drop-in cache wrapper for any ``(str) -> str`` callable.

        Note:
            A non-``None`` ``threshold`` is written to the store's
            ``similarity_threshold`` when this method is called, so it
            applies to every later lookup on this cache, not only to the
            decorated function.
        """
        if threshold is not None:
            self._store.similarity_threshold = float(threshold)

        def decorator(fn: Callable[[str], str]) -> Callable[[str], str]:
            @wraps(fn)
            def wrapper(query: str) -> str:
                return self.query(query, fn, route=route, intent=intent)
            return wrapper

        return decorator

    def clear(self) -> None:
        """Remove every entry from the backing store."""
        self._store.clear()

    def list_entries(self) -> list[CacheEntry]:
        """Return the entries reported by the backing store."""
        return self._store.list_entries()

    def _score_for_write(self, query: str, answer: str) -> float:
        """Return the judge's score for a fresh answer.

        Returns 1.0 when the judge is disabled or raises (fail-open).
        """
        if not self._judge_enabled:
            return 1.0
        try:
            return self._judge.score(query, answer)
        except Exception as e:
            self._log(
                f"[CogniCache] judge crashed: {type(e).__name__}: {e}; fail-open"
            )
            return 1.0

    def _maybe_store(
        self,
        query: str,
        embedding: list[float],
        answer: str,
        *,
        route: str,
        intent: str,
        quality: float | None = None,
    ) -> CacheEntry | None:
        """Store a fresh answer unless the judge rejects it.

        Args:
            query: Original query text.
            embedding: Embedding already computed for ``query``.
            answer: Fresh answer to cache.
            route: Route label stored on the entry.
            intent: Intent label stored on the entry.
            quality: Score already obtained for this answer; when ``None``
                it is computed here.

        Returns:
            The stored entry, or ``None`` when the answer scored below
            ``write_min_quality`` and ``low_quality_ttl`` is ``<= 0``.
        """
        if quality is None:
            quality = self._score_for_write(query, answer)
        ttl = self._default_ttl

        if self._judge_enabled and quality < self._write_min_quality:
            if self._low_quality_ttl <= 0:
                self._log(
                    f"[CogniCache] reject write quality={quality:.2f} "
                    f"< {self._write_min_quality:.2f}"
                )
                return None
            # Low-quality answers are kept, but only for the shorter TTL.
            ttl = self._low_quality_ttl

        entry = CacheEntry(
            text=query, embedding=embedding, answer=answer, tokens=0,
            original_cost=0.0, route=route, intent=intent, complexity=0.0,
            quality_score=quality, ttl=ttl,
        )
        self._store.store(entry)
        return entry

    def _fire_hit_judge(self, query: str, entry: CacheEntry) -> None:
        """Score a cache hit on a daemon thread and log a warning if it is low.

        The result is only logged; it does not change what the caller gets.
        """
        def _run() -> None:
            try:
                score = self._judge.score(query, entry.answer, reasoning=entry.reasoning)
            except Exception as e:
                self._log(f"[CogniCache] hit-judge crashed: {type(e).__name__}: {e}")
                return
            if score < self._hit_min_quality:
                self._log(
                    f"[CogniCache] hit warning: {query[:60]!r} scored "
                    f"{score:.2f} < {self._hit_min_quality:.2f} "
                    f"(cached_text={entry.text[:60]!r})"
                )

        threading.Thread(target=_run, daemon=True).start()

cached(threshold=None, *, route='default', intent='default')

Decorator: drop-in cache wrapper for any (str) -> str callable.

Source code in cogcache/cache.py
def cached(
    self,
    threshold: float | None = None,
    *,
    route: str = "default",
    intent: str = "default",
) -> Callable[[Callable[[str], str]], Callable[[str], str]]:
    """Decorator: drop-in cache wrapper for any ``(str) -> str`` callable."""
    if threshold is not None:
        self._store.similarity_threshold = float(threshold)

    def decorator(fn: Callable[[str], str]) -> Callable[[str], str]:
        @wraps(fn)
        def wrapper(query: str) -> str:
            return self.query(query, fn, route=route, intent=intent)
        return wrapper

    return decorator

QueryResult dataclass

Source code in cogcache/cache.py
@dataclass
class QueryResult:
    """Outcome of one lookup, as built by ``CogniCache.query_with_meta``."""

    # Answer text: the cached answer on a hit, the fresh LLM answer on a miss.
    answer: str
    # True when the answer came from the cache.
    cache_hit: bool
    # Similarity value returned by the store's ``find_match``.
    similarity: float
    # Quality score attached to the answer.
    quality_score: float
    # Matched entry on a hit, newly stored entry on a miss, or None.
    entry: CacheEntry | None = None

CacheEntry dataclass

Source code in cogcache/core/entry.py
@dataclass
class CacheEntry:
    text: str
    embedding: list[float]
    answer: str
    tokens: int
    original_cost: float
    route: str
    intent: str
    complexity: float
    hit_count: int = 0
    created_at: float = field(default_factory=time.time)

    reasoning: str = ""
    quality_score: float = 1.0
    ttl: int = -1
    tags: list[str] = field(default_factory=list)
    updated_at: float = field(default_factory=time.time)

    @property
    def query_vector(self) -> list[float]:
        return self.embedding

    @property
    def expires_at(self) -> float | None:
        """Wall-clock expiration timestamp; ``None`` if the entry never expires."""
        if self.ttl is None or self.ttl <= 0:
            return None
        return self.created_at + float(self.ttl)

    def is_expired(self, now: float | None = None) -> bool:
        exp = self.expires_at
        if exp is None:
            return False
        return (now if now is not None else time.time()) >= exp

expires_at property

Wall-clock expiration timestamp; None if the entry never expires.


cogcache.store

CacheStore

Bases: ABC

Source code in cogcache/store/base.py
class CacheStore(ABC):
    """Abstract interface every cache backend implements."""

    # Minimum similarity a candidate needs to count as a match.
    similarity_threshold: float
    # Upper bound on the number of entries the store keeps.
    max_cache_size: int

    # Persist ``entry`` in the backend.
    @abstractmethod
    def store(self, entry: CacheEntry) -> None: ...

    # Look up ``embedding``; returns ``(entry_or_None, similarity)``.
    # ``route`` / ``intent`` are optional filters and default to ``None``.
    @abstractmethod
    def find_match(
        self,
        embedding: list[float],
        route: str | None = None,
        intent: str | None = None,
    ) -> tuple[CacheEntry | None, float]: ...

    # Return the entries currently held by the backend.
    @abstractmethod
    def list_entries(self) -> list[CacheEntry]: ...

    # Remove every entry from the backend.
    @abstractmethod
    def clear(self) -> None: ...

    def __len__(self) -> int:
        """Number of entries, computed from ``list_entries()``."""
        return len(self.list_entries())

MemoryStore

Bases: CacheStore

Source code in cogcache/store/memory.py
class MemoryStore(CacheStore):
    """In-process ``CacheStore`` backed by a plain list, oldest entry first."""

    def __init__(
        self,
        similarity_threshold: float = DEFAULT_SIMILARITY_THRESHOLD,
        max_cache_size: int = DEFAULT_MAX_CACHE_SIZE,
    ) -> None:
        """Create an empty store.

        Args:
            similarity_threshold: Minimum cosine similarity for a match.
            max_cache_size: Maximum number of entries kept; the oldest are
                evicted first.
        """
        self.similarity_threshold = float(similarity_threshold)
        self.max_cache_size = int(max_cache_size)
        self._cache: list[CacheEntry] = []

    def store(self, entry: CacheEntry) -> None:
        """Append ``entry`` and evict the oldest entries beyond ``max_cache_size``."""
        self._cache.append(entry)
        overflow = len(self._cache) - self.max_cache_size
        if overflow > 0:
            del self._cache[:overflow]

    def find_match(
        self,
        embedding: list[float],
        route: str | None = None,
        intent: str | None = None,
    ) -> tuple[CacheEntry | None, float]:
        """Return the most similar unexpired entry matching ``route``/``intent``.

        Route and intent are applied while selecting the candidate, so a
        closer entry stored under a different route or intent cannot hide
        a valid match.

        Args:
            embedding: Query vector to compare against stored embeddings.
            route: Only entries with this route are eligible; ``None``
                disables the filter.
            intent: Only entries with this intent are eligible; ``None``
                disables the filter.

        Returns:
            ``(entry, similarity)`` when an eligible entry reaches
            ``similarity_threshold``. Otherwise ``(None, best)``, where
            ``best`` is the highest similarity over all entries, eligible
            or not. Similarities are rounded to four decimals.
        """
        self._purge_expired()
        best_entry: CacheEntry | None = None
        best_eligible = 0.0
        best_overall = 0.0
        for entry in self._cache:
            sim = cosine_similarity(embedding, entry.embedding)
            if sim > best_overall:
                best_overall = sim
            if route is not None and entry.route != route:
                continue
            if intent is not None and entry.intent != intent:
                continue
            # Strict ">" keeps the earliest entry on ties.
            if sim > best_eligible:
                best_entry, best_eligible = entry, sim
        if best_entry is not None and best_eligible >= self.similarity_threshold:
            return best_entry, round(best_eligible, 4)
        return None, round(best_overall, 4)

    def list_entries(self) -> list[CacheEntry]:
        """Return a snapshot list of the unexpired entries, oldest first.

        A copy is returned so callers cannot change the store's contents by
        mutating the result.
        """
        self._purge_expired()
        return list(self._cache)

    def clear(self) -> None:
        """Remove every entry."""
        self._cache.clear()

    def _purge_expired(self) -> int:
        """Drop expired entries in place. Returns the number purged."""
        if not self._cache:
            return 0
        now = time.time()
        kept = [e for e in self._cache if not e.is_expired(now)]
        purged = len(self._cache) - len(kept)
        if purged:
            # Slice assignment keeps the same list object.
            self._cache[:] = kept
        return purged

cogcache.judge

LLMJudge

Bases: ABC

Source code in cogcache/judge/base.py
class LLMJudge(ABC):
    """Abstract interface for components that score a query/answer pair."""

    @abstractmethod
    def score(self, query: str, answer: str, reasoning: str = "") -> float:
        """Return a quality score in [0.0, 1.0].

        Args:
            query: The query text.
            answer: The answer to rate for that query.
            reasoning: Optional reasoning text; defaults to an empty string.
        """

score(query, answer, reasoning='') abstractmethod

Return a quality score in [0.0, 1.0].

Source code in cogcache/judge/base.py
@abstractmethod
def score(self, query: str, answer: str, reasoning: str = "") -> float:
    """Return a quality score in [0.0, 1.0].

    Args:
        query: The query text.
        answer: The answer to rate for that query.
        reasoning: Optional reasoning text; defaults to an empty string.
    """

NoopJudge

Bases: LLMJudge

Always returns 1.0. Useful for tests and as a safe default.

Source code in cogcache/judge/base.py
class NoopJudge(LLMJudge):
    """Always returns 1.0. Useful for tests and as a safe default."""

    def score(self, query: str, answer: str, reasoning: str = "") -> float:
        """Return 1.0 regardless of the arguments."""
        return 1.0

LLMJudgeOpenAI

Bases: LLMJudge

Source code in cogcache/judge/llm_judge.py
class LLMJudgeOpenAI(LLMJudge):
    """Judge that obtains a score from ``client.chat.completions.create``.

    Every failure path is fail-open: it logs a message and returns 1.0.
    """

    def __init__(
        self,
        client: Any,
        model: str,
        system_prompt: str = DEFAULT_SYSTEM_PROMPT,
        max_tokens: int = 8,
        temperature: float = 0.0,
        timeout: float = 10.0,
        logger: Callable[[str], None] = print,
    ) -> None:
        """Store the client and request settings.

        Args:
            client: Object exposing ``chat.completions.create(...)``.
            model: Model name sent with each request.
            system_prompt: System message sent with each request.
            max_tokens: ``max_tokens`` request parameter.
            temperature: ``temperature`` request parameter.
            timeout: ``timeout`` request parameter.
            logger: Callable that receives diagnostic messages.
        """
        self._client = client
        self._model = model
        self._system_prompt = system_prompt
        self._max_tokens = max_tokens
        self._temperature = temperature
        self._timeout = timeout
        self._logger = logger

    def score(self, query: str, answer: str, reasoning: str = "") -> float:
        """Ask the model to rate ``answer`` for ``query``.

        Args:
            query: The query text.
            answer: The answer to rate.
            reasoning: Optional reasoning text, appended to the prompt when
                non-empty.

        Returns:
            The first match of ``_SCORE_PATTERN`` in the reply, clamped to
            [0.0, 1.0]; 1.0 when the request fails or no score can be parsed.
        """
        user = f"Query:\n{query}\n\nAnswer:\n{answer}"
        if reasoning:
            user += f"\n\nReasoning:\n{reasoning}"
        try:
            response = self._client.chat.completions.create(
                model=self._model,
                messages=[
                    {"role": "system", "content": self._system_prompt},
                    {"role": "user", "content": user},
                ],
                max_tokens=self._max_tokens,
                temperature=self._temperature,
                timeout=self._timeout,
                # Qwen3 and similar thinking models: ensure the numeric
                # score lands in content rather than reasoning_content.
                extra_body={"enable_thinking": False},
            )
            msg = response.choices[0].message
            # Fall back to reasoning_content when content is missing or empty.
            text = (getattr(msg, "content", None)
                    or getattr(msg, "reasoning_content", None)
                    or "").strip()
        except Exception as e:
            self._logger(f"[judge] scoring failed ({type(e).__name__}: {e}); fail-open=1.0")
            return 1.0

        match = _SCORE_PATTERN.search(text)
        if not match:
            self._logger(f"[judge] unparseable score {text!r}; fail-open=1.0")
            return 1.0
        try:
            value = float(match.group(0))
        except ValueError:
            # Previously silent; log it like the other fail-open paths.
            self._logger(f"[judge] unparseable score {text!r}; fail-open=1.0")
            return 1.0
        return max(0.0, min(1.0, value))

cogcache.metrics

MetricsCollector

Source code in cogcache/metrics/collector.py
class MetricsCollector:
    """Bounded in-memory history of query events with optional sinks."""

    def __init__(self, history_size: int = 10_000) -> None:
        """Create an empty collector keeping at most ``history_size`` events."""
        self._lock = threading.Lock()
        self._events: deque[QueryEvent] = deque(maxlen=history_size)
        self._sinks: list[MetricsSink] = []
        self._logger: Callable[[str], None] = print

    def attach(self, sink: MetricsSink) -> None:
        """Register ``sink`` to receive every event recorded from now on."""
        self._sinks.append(sink)

    def set_logger(self, logger: Callable[[str], None]) -> None:
        """Replace the callable used to report sink failures."""
        self._logger = logger

    def record_query(self, event: QueryEvent) -> None:
        """Append ``event`` to the history and forward it to each sink."""
        with self._lock:
            self._events.append(event)
        for target in self._sinks:
            try:
                target.record_query(event)
            except Exception as exc:
                # A broken sink must never break the request path.
                sink_name = type(target).__name__
                failure = f"{type(exc).__name__}: {exc}"
                self._logger(f"[metrics] sink {sink_name} crashed: {failure}")

    def snapshot(self) -> dict[str, Any]:
        """Summarise the retained events.

        Returns:
            A dict with hit/miss counts, hit rate, token totals and savings
            ratio, p50/p95/p99 latency for all, hit and miss events, the
            number of events summarised, and the timestamp of the oldest
            one (``None`` when no events are retained).
        """
        # Copy under the lock; aggregate outside it.
        with self._lock:
            window = list(self._events)

        quantiles = (("p50", 0.50), ("p95", 0.95), ("p99", 0.99))
        total = len(window)
        if total == 0:
            return {
                "total": 0, "hits": 0, "misses": 0,
                "hit_rate": 0.0,
                "tokens_saved": 0, "tokens_used": 0,
                "token_savings_ratio": 0.0,
                "latency_ms": {
                    group: {label: 0.0 for label, _ in quantiles}
                    for group in ("all", "hit", "miss")
                },
                "window_size": 0,
                "since": None,
            }

        latencies: dict[str, list[float]] = {"all": [], "hit": [], "miss": []}
        tokens_saved = 0
        tokens_used = 0
        for ev in window:
            latencies["all"].append(ev.latency_ms)
            latencies["hit" if ev.cache_hit else "miss"].append(ev.latency_ms)
            tokens_saved += ev.tokens_saved
            tokens_used += ev.tokens_used

        hits = len(latencies["hit"])
        token_total = tokens_saved + tokens_used

        return {
            "total": total,
            "hits": hits,
            "misses": total - hits,
            "hit_rate": hits / total,
            "tokens_saved": tokens_saved,
            "tokens_used": tokens_used,
            "token_savings_ratio": tokens_saved / token_total if token_total else 0.0,
            "latency_ms": {
                group: {label: _percentile(values, q) for label, q in quantiles}
                for group, values in latencies.items()
            },
            "window_size": total,
            "since": window[0].timestamp,
        }

    def reset(self) -> None:
        """Discard every retained event; attached sinks are kept."""
        with self._lock:
            self._events.clear()

QueryEvent dataclass

Source code in cogcache/metrics/base.py
@dataclass(frozen=True)
class QueryEvent:
    """Immutable record describing one query."""

    # True when the query was answered from the cache.
    cache_hit: bool
    # Similarity value associated with the lookup.
    similarity: float
    # Latency of the query in milliseconds.
    latency_ms: float
    tokens_saved: int = 0
    tokens_used: int = 0
    quality_score: float = 1.0
    route: str = "default"
    intent: str = "default"
    # Creation time of the event, taken from ``time.time()`` by default.
    timestamp: float = field(default_factory=time.time)

MetricsSink

Bases: ABC

Source code in cogcache/metrics/base.py
class MetricsSink(ABC):
    """Abstract receiver of query events."""

    # Handle a single recorded event.
    @abstractmethod
    def record_query(self, event: QueryEvent) -> None: ...

cogcache.errors

CogniCacheError

Bases: Exception

Base class for all CogniCache errors.

Source code in cogcache/errors.py
class CogniCacheError(Exception):
    """Base class for all CogniCache errors.

    Catching this type also catches every error class derived from it.
    """

CacheStoreError

Bases: CogniCacheError

Base class for cache-store layer errors.

Source code in cogcache/errors.py
class CacheStoreError(CogniCacheError):
    """Base class for cache-store layer errors.

    Parent of the connection and operation error types.
    """

CacheStoreConnectionError

Bases: CacheStoreError

The store backend could not be reached.

Source code in cogcache/errors.py
class CacheStoreConnectionError(CacheStoreError):
    """The store backend could not be reached.

    A ``CacheStoreError``, so handlers for the store layer catch it too.
    """

CacheStoreOperationError

Bases: CacheStoreError

A store operation (FT.SEARCH, HSET, etc.) failed.

Source code in cogcache/errors.py
class CacheStoreOperationError(CacheStoreError):
    """A store operation (FT.SEARCH, HSET, etc.) failed.

    A ``CacheStoreError``, so handlers for the store layer catch it too.
    """

ConfigurationError

Bases: CogniCacheError

Invalid configuration (unknown backend, missing kwargs, etc.).

Source code in cogcache/errors.py
class ConfigurationError(CogniCacheError):
    """Invalid configuration (unknown backend, missing kwargs, etc.).

    Derives directly from ``CogniCacheError``.
    """

EmbeddingError

Bases: CogniCacheError

Embedding generation failed and no fallback applied.

Source code in cogcache/errors.py
class EmbeddingError(CogniCacheError):
    """Embedding generation failed and no fallback applied.

    NOTE(review): ``get_embedding`` as listed raises ``RuntimeError`` or
    re-raises the client's exception rather than this type; confirm where
    this error is raised.
    """

JudgeError

Bases: CogniCacheError

A Judge invocation failed.

Most call sites swallow Judge errors and fail open (returning 1.0). This type exists for the rare consumer that wants to opt out of that behavior and propagate failures.

Source code in cogcache/errors.py
class JudgeError(CogniCacheError):
    """A Judge invocation failed.

    Most call sites swallow Judge errors and fail open (returning 1.0).
    This type exists for the rare consumer that wants to opt out of
    that behavior and propagate failures.

    Derives directly from ``CogniCacheError``.
    """

cogcache.embedding

cogcache.local_embedding(text)

Source code in cogcache/embedding/base.py
def local_embedding(text: str) -> list[float]:
    """Build a hashed bag-of-features vector for ``text`` without a model.

    The text is stripped and lower-cased. Its whitespace-separated words
    and the 3-character windows of the space-padded text are each hashed
    with SHA-256 into one of ``_local_dimensions`` buckets. The bucket
    counts are scaled to unit Euclidean length. Blank text yields the
    all-zero vector.
    """
    buckets = [0.0] * _local_dimensions
    cleaned = text.strip().lower()
    if cleaned == "":
        return buckets

    # Two spaces on each side so edge characters also appear in windows.
    framed = f"  {cleaned}  "
    window_count = max(1, len(framed) - 2)
    tokens = cleaned.split()
    tokens.extend(framed[start: start + 3] for start in range(window_count))

    for token in tokens:
        digest = hashlib.sha256(token.encode("utf-8")).digest()
        slot = int.from_bytes(digest[:4], "big") % _local_dimensions
        buckets[slot] += 1.0

    length = math.sqrt(sum(v * v for v in buckets))
    if length == 0.0:
        return buckets
    return [v / length for v in buckets]

cogcache.get_embedding(text)

Source code in cogcache/embedding/base.py
def get_embedding(text: str) -> list[float]:
    """Embed ``text`` using the first source that succeeds.

    Order: the BGE model when ``get_bge_model()`` returns one; then the
    configured client; then ``local_embedding`` if the fallback flag is
    enabled.

    Raises:
        RuntimeError: No client is configured and the fallback is disabled.
        Exception: The client's own error, re-raised when the fallback is
            disabled.
    """
    bge = get_bge_model()
    if bge is not None:
        try:
            return bge.encode(text, normalize_embeddings=True).tolist()
        except Exception as e:
            # Log and continue with the remaining sources.
            _logger(f"[embedding] BGE failed ({e}), falling back to API")

    if _client is not None:
        try:
            response = _client.embeddings.create(
                model=_embedding_model, input=text, encoding_format="float",
            )
            return _extract_embedding(response)
        except Exception:
            if _embedding_fallback_enabled:
                return local_embedding(text)
            raise

    if _embedding_fallback_enabled:
        return local_embedding(text)
    raise RuntimeError("Embedding client not configured")

cogcache.estimate_embedding_tokens(text)

Source code in cogcache/embedding/base.py
def estimate_embedding_tokens(text: str) -> int:
    """Estimate the token count of ``text`` as one token per four characters.

    The estimate is rounded up and never lower than 1.
    """
    quarter_chunks = math.ceil(len(text) / 4)
    return quarter_chunks if quarter_chunks > 1 else 1

cogcache.calc_embedding_cost(text)

Source code in cogcache/embedding/base.py
def calc_embedding_cost(text: str) -> float:
    """Return the estimated embedding cost of ``text``, rounded to 8 decimals.

    The cost is ``_embedding_cost_per_token`` multiplied by the estimate
    from ``estimate_embedding_tokens``.
    """
    token_estimate = estimate_embedding_tokens(text)
    raw_cost = _embedding_cost_per_token * token_estimate
    return round(raw_cost, 8)