text: make auto-compaction actually fire — fix config lookup + max_tokens-aware layered trimming

Auto-compaction never triggered: multi_model_manager.config stores the
whitelisted build_runtime_kwargs() dict, which drops the per-model
auto_compact* keys (they survive only under _raw_cfg), so _resolve_compaction
always read the global default (False) and returned None. Read the keys via a
_raw_cfg fallback so per-model compaction config is honoured.

Also rework the over-context handling to count the reply reservation, since the
reply is generated into the same window (prompt + max_tokens <= n_ctx). Four
layers, cheapest first:
  1. fits as-is              -> nothing
  2. overflow within tol     -> trim max_tokens to fit (lossless)
  3. beyond tol & big prompt -> compact history (drop/summarize)
  4. single message too big  -> slice it (summarize its middle, keep head/tail)

The chars/4 estimate undercounts token-dense code/JSON, so trimming to the exact
n_ctx edge could still overflow; inflate the estimate by a configurable
estimate_safety (default 1.15) for all physical-fit decisions.

New CompactionConfig knobs (per-model overridable): tolerance_pct (20),
min_output (512), estimate_safety (1.15). Effective max_tokens is threaded back
to both the streaming and non-streaming generation paths.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
parent 34d666d6
......@@ -320,9 +320,10 @@ def _estimate_tokens(messages) -> int:
return int(total / 4) + 8
def _compact_messages(messages, n_ctx, pct, strategy, summary_text=None):
"""Shrink an over-long message list to ~65% of n_ctx, keeping system messages
and the most recent turns. Returns (new_messages, info|None). Strategies:
def _compact_messages(messages, n_ctx, pct, strategy, summary_text=None, target=None):
"""Shrink an over-long message list to fit ``target`` tokens (default ~65% of
n_ctx), keeping system messages and the most recent turns. Returns
(new_messages, info|None). Strategies:
- drop_oldest : keep only system + the recent tail that fits.
- keep_head_tail: also keep the first user turn (context anchor) + a note.
- summarize : keep_head_tail, but replace the dropped middle with an LLM
......@@ -337,9 +338,14 @@ def _compact_messages(messages, n_ctx, pct, strategy, summary_text=None):
pct = 85.0
pct = min(99.0, max(50.0, pct))
est = _estimate_tokens(messages)
if est < n_ctx * pct / 100.0:
return messages, None
target = int(n_ctx * 0.65)
if target is None:
target = int(n_ctx * 0.65)
if est < n_ctx * pct / 100.0:
return messages, None
else:
target = max(1, int(target))
if est <= target:
return messages, None
sys_msgs = [m for m in messages if m.get("role") == "system"]
body = [m for m in messages if m.get("role") != "system"]
......@@ -472,6 +478,54 @@ async def _summarize_for_compact(manager, messages, keep_recent: int = 2,
return None
async def _slice_oversized(manager, messages, target, compact_n_ctx, progress=None):
    """Last-resort layer of auto-compaction: shrink individually oversized messages.

    When a SINGLE message is itself larger than the per-message cap derived from
    ``target`` (so dropping other turns can't make the prompt fit), that message
    is rewritten: a verbatim head and tail are kept (so the actual instruction /
    code survives) and the bulk middle is replaced by an LLM summary produced
    via ``manager``. The summary is truncated to the character budget left over
    after head + tail, so the rewritten message stays close to the cap instead
    of growing with the number of summarized chunks.

    Args:
        manager: model manager handed to ``_summarize_one`` for each chunk.
        messages: list of chat message dicts (``role`` / ``content``).
        target: prompt token budget; each message is capped at ~60% of it
            (never below 512 tokens).
        compact_n_ctx: passed to ``_summary_chunk_chars`` to size the chunks
            fed to the summarizer.
        progress: optional async callback receiving human-readable status
            strings; failures inside it are ignored.

    Returns:
        A new message list. System messages, non-string contents and messages
        already under the cap are passed through unchanged; a message whose
        slicing fails is also left untouched (best-effort).
    """
    async def _emit(msg):
        # Progress reporting is best-effort: a broken callback must never
        # abort the slicing itself.
        if progress:
            try:
                await progress(msg)
            except Exception:
                pass

    # No single message should occupy more than ~60% of the prompt target.
    per_msg_cap = max(512, int(target * 0.6))
    # Character budgets use the same chars/4 heuristic as the token estimate.
    cap_chars = per_msg_cap * 4
    head_len = int(cap_chars * 0.35)
    tail_len = int(cap_chars * 0.15)
    # Rough size of the fixed "[… N-token message sliced …]" wrapper text.
    note_overhead_chars = 160
    chunk_chars = _summary_chunk_chars(compact_n_ctx)

    out = []
    for m in messages:
        c = m.get("content")
        if m.get("role") == "system" or not isinstance(c, str):
            out.append(m)
            continue
        orig_tok = _estimate_tokens([m])  # computed once, reused below
        if orig_tok <= per_msg_cap:
            out.append(m)
            continue
        try:
            head = c[:head_len]
            tail = c[-tail_len:] if tail_len else ""
            middle = c[len(head):len(c) - len(tail)]
            await _emit(f"summarizing a large {m.get('role', '?')} message (~{orig_tok} tok)…")
            chunks = [middle[i:i + chunk_chars]
                      for i in range(0, len(middle), chunk_chars)] or [middle]
            parts = []
            for ch in chunks:
                s = await _summarize_one(manager, ch)
                if s:
                    parts.append(s)
            summary = "\n".join(parts).strip()
            # Bound the summary: many chunks can add up to more text than the
            # cap allows, which would leave the message oversized after all.
            budget = max(0, cap_chars - len(head) - len(tail) - note_overhead_chars)
            if len(summary) > budget:
                clipped = summary[:budget].rstrip()
                summary = f"{clipped} …" if clipped else ""
            summary = summary or "[content omitted]"
            nm = dict(m)
            nm["content"] = (f"{head}\n\n[… {orig_tok}-token message sliced to fit the "
                             f"context window; middle summarized:\n{summary}\n…]\n\n{tail}")
            out.append(nm)
        except Exception as e:
            # Deliberate best-effort: keep the original message rather than
            # failing the whole request.
            print(f"[auto-compact] slice failed: {e}", flush=True)
            out.append(m)
    return out
def _resolve_compaction(request, current_manager):
"""Resolve effective auto-compaction settings for a request by merging the
per-model config over the global ``compaction`` defaults. Returns a plan dict
......@@ -483,6 +537,22 @@ def _resolve_compaction(request, current_manager):
except Exception:
_mmm = None
_cc = {}
# ``multi_model_manager.config`` stores a runtime-kwargs dict (built by
# build_runtime_kwargs), NOT the raw models.json entry — the per-model
# ``auto_compact*`` keys are only preserved under ``_raw_cfg``. Read from the
# runtime dict first, then fall back to the raw entry, so the flags aren't
# silently lost (which disabled compaction entirely).
_raw = _cc.get("_raw_cfg") if isinstance(_cc, dict) else None
if not isinstance(_raw, dict):
_raw = {}
def _mc(key, default):
if isinstance(_cc, dict) and key in _cc:
return _cc[key]
if key in _raw:
return _raw[key]
return default
_g = None
try:
from codai.admin.routes import config_manager as _cm
......@@ -494,12 +564,22 @@ def _resolve_compaction(request, current_manager):
def _gv(attr, default):
return getattr(_g, attr, default) if _g is not None else default
enabled = _cc.get("auto_compact", _gv("enabled", False))
enabled = _mc("auto_compact", _gv("enabled", False))
if not enabled:
return None
pct = _cc.get("auto_compact_pct", _gv("pct", 85)) or 85
strategy = (_cc.get("auto_compact_strategy") or _gv("strategy", "drop_oldest") or "drop_oldest").strip()
compact_model = (_cc.get("auto_compact_model") or _gv("model", "") or "").strip()
pct = _mc("auto_compact_pct", _gv("pct", 85)) or 85
strategy = (_mc("auto_compact_strategy", None) or _gv("strategy", "drop_oldest") or "drop_oldest").strip()
compact_model = (_mc("auto_compact_model", None) or _gv("model", "") or "").strip()
tol_pct = _mc("auto_compact_tolerance_pct", _gv("tolerance_pct", 20))
try:
tol_pct = max(0.0, float(tol_pct))
except (TypeError, ValueError):
tol_pct = 20.0
min_output = _mc("auto_compact_min_output", _gv("min_output", 512)) or 512
try:
safety = max(1.0, float(_mc("auto_compact_estimate_safety", _gv("estimate_safety", 1.15))))
except (TypeError, ValueError):
safety = 1.15
try:
n_ctx = current_manager.get_context_size() if current_manager else 0
......@@ -509,9 +589,14 @@ def _resolve_compaction(request, current_manager):
# NOTE: the summarizer model (``compact_model``) is resolved LAZILY in
# _auto_compact_events, only when the prompt is actually over threshold — so a
# configured separate model isn't loaded on every (under-threshold) request.
# ``max_tokens`` (the reply reservation) is counted against the window too —
# the model writes its reply into the same n_ctx as the prompt.
return {
"pct": float(pct), "strategy": strategy, "n_ctx": n_ctx,
"compact_model": compact_model, "current_manager": current_manager,
"max_tokens": getattr(request, "max_tokens", None),
"tolerance": 1.0 + tol_pct / 100.0, "min_output": int(min_output),
"safety": safety,
}
......@@ -552,55 +637,127 @@ async def _auto_compact_events(plan, messages):
n_ctx = plan["n_ctx"]
pct = plan["pct"]
strategy = plan["strategy"]
est = _estimate_tokens(messages)
if not n_ctx or est < n_ctx * pct / 100.0:
mt = int(plan.get("max_tokens") or 0)
tol = float(plan.get("tolerance") or 1.20)
min_out = int(plan.get("min_output") or 512)
safety = float(plan.get("safety") or 1.15)
if not n_ctx:
yield ("done", messages, None, None)
return
summary = None
if strategy == "summarize":
# Resolve the summarizer model now (may load a separate, smaller model).
compact_manager, compact_name, compact_n_ctx = _resolve_compact_manager(plan)
via = f" via {compact_name}" if compact_name and compact_name != "the model" else ""
yield ("status", f"🗜 Compacting context (~{est} tokens ≥ {int(pct)}% of {n_ctx})"
f" using '{strategy}'{via}…\n")
# Bridge the summarizer's progress callback to this generator through a
# queue so status lines stream to the client LIVE while it summarizes
# (summarization can take minutes on a large model).
_q: asyncio.Queue = asyncio.Queue()
_DONE = object()
async def _cb(msg):
await _q.put(f" • {msg}\n")
async def _run():
try:
return await _summarize_for_compact(
compact_manager, messages,
compact_n_ctx=compact_n_ctx, progress=_cb)
finally:
await _q.put(_DONE)
_task = asyncio.create_task(_run())
while True:
_ev = await _q.get()
if _ev is _DONE:
break
yield ("status", _ev)
summary = await _task
else:
yield ("status", f"🗜 Compacting context (~{est} tokens ≥ {int(pct)}% of {n_ctx})"
f" using '{strategy}'…\n")
new_messages, info = _compact_messages(messages, n_ctx, pct, strategy, summary)
if info:
yield ("status", f"✅ Context compacted: dropped {info['dropped']} message(s), "
f"~{info['before_tokens']}→{info['after_tokens']} tokens.\n")
# The reply is generated INTO the same window as the prompt, so the real
# constraint is prompt + max_tokens ≤ n_ctx. Four layers, cheapest first:
# 1. fits as-is → do nothing
# 2. overflow within ``tol`` of n_ctx → trim max_tokens to fit (lossless)
# 3. overflow beyond tolerance AND the prompt itself over target → compact history (drop/summarize)
# 4. a single message still over target → slice that message (summarize it)
# ``pest`` inflates the cheap chars/4 estimate by ``safety`` for every
# physical-fit decision — the raw estimate undercounts token-dense code/JSON
# prompts, and trimming to the exact n_ctx edge off an undercount still
# overflows the backend.
prompt_est = _estimate_tokens(messages)
pest = int(prompt_est * safety)
if pest + mt <= n_ctx:
yield ("done", messages, None, None)
return
tol_ctx = int(n_ctx * tol)
# Keep room for a real reply: reserve the requested output (capped at half the
# window so the prompt isn't squeezed to nothing); the kept prompt targets the
# rest, never more than ~65% of n_ctx.
reserve = max(min_out, min(mt or min_out, int(n_ctx * 0.5)))
target = min(int(n_ctx * 0.65), max(min_out, n_ctx - reserve))
info = None
compact_manager = compact_n_ctx = None
# Layer 3 — only when the overflow is beyond tolerance AND compacting history
# can actually help (the prompt itself exceeds the target; if it's small and a
# huge max_tokens is the problem, layer 2's trim handles it).
if pest + mt > tol_ctx and pest > target:
summary = None
if strategy == "summarize":
compact_manager, compact_name, compact_n_ctx = _resolve_compact_manager(plan)
via = f" via {compact_name}" if compact_name and compact_name != "the model" else ""
yield ("status", f"🗜 Compacting context (~{prompt_est}+{mt} tok > {int(tol * 100)}% "
f"of {n_ctx}) using '{strategy}'{via}…\n")
# Bridge the summarizer's progress callback to this generator through a
# queue so status lines stream to the client LIVE while it summarizes
# (summarization can take minutes on a large model).
_q: asyncio.Queue = asyncio.Queue()
_DONE = object()
async def _cb(msg):
await _q.put(f" • {msg}\n")
async def _run():
try:
return await _summarize_for_compact(
compact_manager, messages,
compact_n_ctx=compact_n_ctx, progress=_cb)
finally:
await _q.put(_DONE)
_task = asyncio.create_task(_run())
while True:
_ev = await _q.get()
if _ev is _DONE:
break
yield ("status", _ev)
summary = await _task
else:
yield ("status", f"🗜 Compacting context (~{prompt_est}+{mt} tok > {int(tol * 100)}% "
f"of {n_ctx}) using '{strategy}'…\n")
messages, info = _compact_messages(messages, n_ctx, pct, strategy, summary, target=target)
if info:
yield ("status", f"✅ Context compacted: dropped {info['dropped']} message(s), "
f"~{info['before_tokens']}→{info['after_tokens']} tokens.\n")
prompt_est = _estimate_tokens(messages)
pest = int(prompt_est * safety)
# Layer 4 — a single message is still bigger than the target; slice it.
if pest > target:
if compact_manager is None:
compact_manager, _cn, compact_n_ctx = _resolve_compact_manager(plan)
yield ("status", "✂ A single message exceeds the target; slicing it…\n")
_q2: asyncio.Queue = asyncio.Queue()
_DONE2 = object()
async def _cb2(msg):
await _q2.put(f" • {msg}\n")
async def _run2():
try:
return await _slice_oversized(
compact_manager, messages, target, compact_n_ctx, progress=_cb2)
finally:
await _q2.put(_DONE2)
_t2 = asyncio.create_task(_run2())
while True:
_ev = await _q2.get()
if _ev is _DONE2:
break
yield ("status", _ev)
messages = await _t2
prompt_est = _estimate_tokens(messages)
pest = int(prompt_est * safety)
# Layer 2 — trim the reply reservation so prompt + output fits the window.
if mt and pest + mt > n_ctx:
eff = max(min_out, n_ctx - pest)
if eff < mt:
plan["effective_max_tokens"] = eff
yield ("status", f"✂ Reducing max_tokens {mt}→{eff} to fit the {n_ctx}-token window.\n")
# Final guard: even a minimal reply won't fit — the prompt alone is too big.
err = None
if _estimate_tokens(new_messages) > n_ctx:
if pest + min_out > n_ctx:
err = ("The request is too large for this model's context window "
f"(~{_estimate_tokens(new_messages)} tokens vs n_ctx={n_ctx}) "
"even after auto-compaction. Shorten the latest message or "
"increase the model's context size (n_ctx).")
yield ("done", new_messages, info, err)
f"(~{prompt_est} prompt tokens vs n_ctx={n_ctx}) even after "
"auto-compaction. Shorten the latest message or increase the "
"model's context size (n_ctx).")
yield ("done", messages, info, err)
@router.post("/v1/chat/completions", summary="Chat completions")
......@@ -1205,6 +1362,11 @@ async def chat_completions(request: ChatCompletionRequest, http_request: Request
flush=True)
if _cerr:
raise HTTPException(status_code=400, detail=_cerr)
# Apply any max_tokens trim decided by the layered compaction so the reply
# reservation fits the window (the reply shares n_ctx with the prompt).
_eff_mt = _compact_plan.get("effective_max_tokens")
if _eff_mt is not None:
request.max_tokens = _eff_mt
# Convert tools to dict format if present
......@@ -2028,6 +2190,11 @@ async def stream_chat_response(
return
except Exception as _ce:
print(f"[auto-compact] streaming compaction failed: {_ce}", flush=True)
# Apply any max_tokens trim decided by the layered compaction so the reply
# reservation fits the window (the reply shares n_ctx with the prompt).
_eff_mt = compact_plan.get("effective_max_tokens")
if _eff_mt is not None:
max_tokens = _eff_mt
# Check if model is loaded - if not, notify waiting clients
# The model manager exists but backend may not be loaded yet in on-demand mode
......
......@@ -222,17 +222,28 @@ class CompactionConfig:
"""Global defaults for auto-compaction of an over-long chat history.
Per-model settings in a models.json entry (``auto_compact``,
``auto_compact_pct``, ``auto_compact_strategy``, ``auto_compact_model``)
OVERRIDE the values here; when a model leaves one unset, the global default
below applies. ``model`` selects which model performs the summarization for
the ``summarize`` strategy — empty means use the same model that serves the
request. Pointing it at a smaller/faster model lets that model summarize the
old turns while the big model answers; the dropped history is chunked to fit
the chosen summarizer's own context window before it is summarized."""
``auto_compact_pct``, ``auto_compact_strategy``, ``auto_compact_model``,
``auto_compact_tolerance_pct``, ``auto_compact_min_output``,
``auto_compact_estimate_safety``) OVERRIDE the values here; when a model leaves
one unset, the global default below applies. ``model`` selects which model
performs the summarization for the ``summarize`` strategy — empty means use
the same model that serves the request. Pointing it at a smaller/faster model
lets that model summarize the old turns while the big model answers; the
dropped history is chunked to fit the chosen summarizer's own context window
before it is summarized.
The over-context check counts the prompt PLUS the request's ``max_tokens``
(the reply is generated into the same window). When that total overflows
``n_ctx`` by no more than ``tolerance_pct`` we leave history alone and just
trim ``max_tokens`` to fit (lossless); beyond that we compact, and as a last
resort slice any single message still larger than the target. ``min_output``
is the smallest reply (in tokens) we always try to leave room for."""
enabled: bool = False
pct: int = 85 # compact when the prompt reaches this % of n_ctx
strategy: str = "drop_oldest" # drop_oldest | keep_head_tail | summarize
model: str = "" # model id/alias that summarizes; "" = same as request
tolerance_pct: int = 20 # accept prompt+max_tokens up to this % over n_ctx (trim instead of compact)
min_output: int = 512 # always try to leave at least this many tokens for the reply
estimate_safety: float = 1.15 # inflate the chars/4 prompt estimate by this factor for fit/trim (it undercounts code/JSON)
@dataclass
......@@ -666,6 +677,9 @@ class ConfigManager:
"pct": self.config.compaction.pct,
"strategy": self.config.compaction.strategy,
"model": self.config.compaction.model,
"tolerance_pct": self.config.compaction.tolerance_pct,
"min_output": self.config.compaction.min_output,
"estimate_safety": self.config.compaction.estimate_safety,
},
"broker": {
"enabled": self.config.broker.enabled,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment