Phase 1: Configuration foundation - move CLI to JSON config

- Refactor cli.py to only support --debug and --config options
- Create ConfigManager class for loading/saving JSON configs
- Implement per-model configuration approach in models.json
- Create comprehensive design document for admin dashboard
- Set up admin package structure
- All model-specific settings now stored per-model instead of global defaults
parent bf1d3f52
"""Admin dashboard package for coderai."""
from .routes import router
__all__ = ['router']
"""Command-line argument parsing for codai server."""
import argparse
import json
import os
from pathlib import Path
def load_config_file(config_dir: Path) -> dict:
    """Load the main ``config.json`` file from *config_dir*.

    Args:
        config_dir: Directory expected to contain ``config.json``.

    Returns:
        The parsed JSON content, or an empty dict when the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    config_path = config_dir / "config.json"
    try:
        # JSON is UTF-8 by specification, so do not rely on the locale encoding.
        with open(config_path, 'r', encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # Opening directly instead of exists() + open() avoids a race with a
        # concurrent delete between the check and the read.
        return {}
def load_models_file(config_dir: Path) -> dict:
    """Load the ``models.json`` file from *config_dir*.

    Args:
        config_dir: Directory expected to contain ``models.json``.

    Returns:
        The parsed JSON content, or an empty dict when the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    models_path = config_dir / "models.json"
    try:
        # JSON is UTF-8 by specification, so do not rely on the locale encoding.
        with open(models_path, 'r', encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # A missing file is not an error: callers get an empty registry.
        return {}
def load_auth_file(config_dir: Path) -> dict:
    """Load the ``auth.json`` file from *config_dir*.

    Args:
        config_dir: Directory expected to contain ``auth.json``.

    Returns:
        The parsed JSON content, or an empty dict when the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    auth_path = config_dir / "auth.json"
    try:
        # JSON is UTF-8 by specification, so do not rely on the locale encoding.
        with open(auth_path, 'r', encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError):
        # A missing file is not an error: callers get an empty dict.
        return {}
def setup_default_config(config_dir: Path):
    """Create default configuration files if they don't exist.

    Creates *config_dir* (including parents) and then writes ``config.json``,
    ``models.json`` and ``auth.json`` with default content.  A file that is
    already present is never overwritten.

    The default ``auth.json`` contains a single ``admin`` user whose password
    is ``admin`` and who is flagged ``must_change_password``.

    Args:
        config_dir: Directory that should hold the configuration files.
    """
    config_dir.mkdir(parents=True, exist_ok=True)

    # Default config.json
    default_config = {
        "version": "1.0",
        "server": {
            "host": "0.0.0.0",
            "port": 8000,
            "https": False,
            "https_key_path": None,
            "https_cert_path": None
        },
        "backend": {
            "type": "auto",
            "image_backend": "auto",
            "audio_backend": "auto",
            "tts_backend": "auto"
        },
        "models": {
            "default_load_mode": "ondemand",
            "loaded": [],
            "preload": [],
            "unloaded": []
        },
        "offload": {
            "directory": "./offload",
            "strategy": "auto",
            "max_gpu_percent": None,
            "no_ram": False,
            "load_in_4bit": False,
            "load_in_8bit": False,
            "manual_ram_gb": None,
            "flash_attention": False
        },
        "vulkan": {
            "n_gpu_layers": -1,
            "n_ctx": 2048,
            "device_id": 0,
            "single_gpu": False
        },
        "image": {
            "llm_path": None,
            "vae_path": None,
            "sample_method": "res_multistep",
            "steps": 4,
            "width": 512,
            "height": 512,
            "cfg_scale": 1.0,
            "precision": "f32",
            "cpu_offload": False,
            "seed": None,
            "vae_tiling": False,
            "clip_on_cpu": False
        },
        "whisper": {
            "server_path": None,
            "server_port": 8744
        },
        "system_prompt": None,
        "tools_closer_prompt": False,
        "grammar_guided": False,
        "file_path": None,
        "hf_chat_templates": [],
        "reasoning_options": [],
        "parser": "auto"
    }
    config_path = config_dir / "config.json"
    if not config_path.exists():
        with open(config_path, 'w', encoding="utf-8") as f:
            json.dump(default_config, f, indent=2)

    # Default models.json
    default_models = {
        "text_models": [],
        "image_models": [],
        "audio_models": [],
        "vision_models": [],
        "tts_model": None,
        "aliases": {}
    }
    models_path = config_dir / "models.json"
    if not models_path.exists():
        with open(models_path, 'w', encoding="utf-8") as f:
            json.dump(default_models, f, indent=2)

    # Default auth.json with admin / admin
    auth_path = config_dir / "auth.json"
    if not auth_path.exists():
        # Hash only when the file is actually going to be written, and import
        # argon2 lazily so a missing optional dependency does not abort setup.
        try:
            from argon2 import PasswordHasher
        except ImportError:
            # NOTE(review): this placeholder is not produced by a hasher, so it
            # presumably cannot verify any password - confirm against the auth code.
            default_admin_hash = "argon2id$v=19$m=65536,t=3,p=4$...admin_hash_placeholder"
        else:
            default_admin_hash = PasswordHasher().hash("admin")

        default_auth = {
            "users": [{
                "id": 1,
                "username": "admin",
                "password_hash": default_admin_hash,
                "role": "admin",
                "created_at": "2026-05-03T00:00:00Z",
                "must_change_password": True
            }],
            "tokens": [],
            "sessions": {}
        }
        with open(auth_path, 'w', encoding="utf-8") as f:
            json.dump(default_auth, f, indent=2)
def parse_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="OpenAI-compatible API server supporting NVIDIA (CUDA) and Vulkan backends"
)
parser.add_argument(
"--model",
type=str,
action="append",
default=None,
help="Model name, path, or URL for text-to-text LLM. Can be specified multiple times for multiple models.",
)
parser.add_argument(
"--model-alias",
type=str,
action="append",
default=None,
dest="model_aliases",
nargs=2,
metavar=("ALIAS", "MODEL"),
help="Register an alias for a model. Format: --model-alias <alias_name> <actual_model>",
)
parser.add_argument(
"--backend",
type=str,
choices=["auto", "nvidia", "vulkan", "opencl"],
default="auto",
help="Backend to use: auto (detect), nvidia (CUDA), vulkan (AMD), or opencl",
)
parser.add_argument(
"--image-backend",
type=str,
choices=["auto", "nvidia", "vulkan", "opencl"],
default="auto",
help="Image generation backend: auto, nvidia (CUDA), vulkan (AMD), or opencl",
)
parser.add_argument(
"--audio-backend",
type=str,
choices=["auto", "nvidia", "vulkan", "opencl"],
default="auto",
help="Audio transcription backend: auto, nvidia (CUDA), vulkan (AMD), or opencl",
)
parser.add_argument(
"--tts-backend",
type=str,
choices=["auto", "nvidia", "vulkan", "opencl"],
default="auto",
help="TTS backend: auto, nvidia (CUDA), vulkan (AMD), or opencl",
)
parser.add_argument(
"--host",
type=str,
default="0.0.0.0",
help="Host to bind to (default: 0.0.0.0)",
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="Port to bind to (default: 8000)",
)
parser.add_argument(
"--url",
type=str,
default="auto",
help="Base URL for media downloads: 'auto' (use request IP) or explicit URL (e.g., http://myserver:8000)",
)
parser.add_argument(
"--https",
action="store_true",
help="Enable HTTPS with auto-generated certificate",
)
parser.add_argument(
"--privkey",
type=str,
default=None,
help="Path to HTTPS private key file",
)
parser.add_argument(
"--pubkey",
type=str,
default=None,
help="Path to HTTPS certificate file",
)
parser.add_argument(
"--offload-dir",
type=str,
default="./offload",
help="Directory for disk offload (NVIDIA backend only, default: ./offload)",
)
parser.add_argument(
"--load-in-4bit",
action="store_true",
help="Load model in 4-bit precision (NVIDIA backend only, requires bitsandbytes)",
)
parser.add_argument(
"--load-in-8bit",
action="store_true",
help="Load model in 8-bit precision (NVIDIA backend only, requires bitsandbytes)",
)
parser.add_argument(
"--ram",
type=float,
default=None,
help="Maximum CPU RAM to use for model offloading in GB (NVIDIA backend only). Auto-detected if not specified. Disk offloading only occurs after this limit is exceeded.",
)
parser.add_argument(
"--flash-attn",
action="store_true",
help="Use Flash Attention 2 (NVIDIA backend only, requires flash-attn package)",
)
parser.add_argument(
"--offload-strategy",
type=str,
choices=["auto", "conservative", "balanced", "aggressive", "sequential", "none"],
default="auto",
help="Offload strategy for NVIDIA backend (default: auto). Use 'none' to disable CPU offloading and VRAM auto-detection entirely.",
)
parser.add_argument(
"--max-gpu-percent",
type=float,
default=None,
help="Maximum GPU VRAM to use as percentage (0-100). Overrides offload-strategy. Lower values offload more to CPU/RAM (default: None = use offload-strategy)",
)
parser.add_argument(
"--n-gpu-layers",
type=int,
default=-1,
help="Number of layers to offload to GPU (Vulkan backend only, default: -1 = all layers)",
)
parser.add_argument(
"--n-ctx",
type=int,
action="append",
default=None,
help="Context window size (Vulkan backend). Can be specified multiple times, one per --model.",
)
parser.add_argument(
"--vulkan-device",
type=int,
default=0,
help="Vulkan GPU device ID to use (Vulkan backend only, default: 0). Use --vulkan-list-devices to see available devices",
)
parser.add_argument(
"--vulkan-single-gpu",
action="store_true",
help="Force Vulkan to use only the specified GPU device (prevents layer distribution across multiple GPUs)",
)
parser.add_argument(
"--vulkan-list-devices",
action="store_true",
help="List available Vulkan GPU devices and exit",
)
parser.add_argument(
"--hf-chat-template",
action="append",
default=[],
help="Use HuggingFace apply_chat_template. Examples: --hf-chat-template auto (all models), --hf-chat-template text (all text), --hf-chat-template mymodel:llama3 (specific model with template). Can be repeated.",
)
parser.add_argument(
"--system-prompt",
nargs="?",
const=True,
default=None,
help="Inject a system prompt at the beginning of conversations. Use without a value for a default prompt, or provide custom text.",
)
# Multi-model arguments
parser.add_argument(
"--tts-model",
type=str,
default=None,
help="Model for text-to-speech (e.g., kokoro, or path/URL to Kokoro model). Can be specified multiple times.",
)
parser.add_argument(
"--audio-model",
type=str,
action="append",
default=None,
help="Model for audio transcription (e.g., whisper-1, base, or path to faster-whisper model). Can be specified multiple times for multiple models.",
)
parser.add_argument(
"--audio-1",
action="store_true",
help="Disable request queue for audio models - return 409 if model is busy",
)
parser.add_argument(
"--image-model",
type=str,
action="append",
default=None,
help="Model for image generation (e.g., stable-diffusion-xl-base-1.0). Can be specified multiple times for multiple models.",
)
parser.add_argument(
"--vision-model",
type=str,
action="append",
default=None,
help="Model for image/video-to-text (e.g., llava-1.5, LLaVA). Supports vulkan and cuda backends.",
)
parser.add_argument(
"--image-1",
action="store_true",
help="Disable request queue for image models - return 409 if model is busy",
description="OpenAI-compatible API server supporting NVIDIA (CUDA) and Vulkan backends",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Configuration: All settings are loaded from JSON config files in the
configuration directory (--config DIR, default: ~/.coderai/). Key files:
config.json - Server and backend settings
models.json - Model registry and configurations
auth.json - Users, tokens, and sessions"""
)
parser.add_argument(
"--llm-path",
"--config",
type=str,
default=None,
help="Path to CLIP LLM model for image generation (stable-diffusion-cpp-python).",
)
parser.add_argument(
"--vae-path",
type=str,
default=None,
help="Path to VAE model for image generation (stable-diffusion-cpp-python).",
default=os.path.expanduser("~/.coderai/"),
help="Configuration directory (default: ~/.coderai/)",
)
parser.add_argument(
"--image-sample-method",
type=str,
default="res_multistep",
help="Sample method for image generation (default: res_multistep for Z-Image Turbo).",
)
parser.add_argument(
"--image-steps",
type=int,
default=4,
help="Number of inference steps for image generation (default: 4 for Z-Image Turbo).",
)
parser.add_argument(
"--image-width",
type=int,
default=512,
help="Image width for generation (default: 512).",
)
parser.add_argument(
"--image-height",
type=int,
default=512,
help="Image height for generation (default: 512).",
)
parser.add_argument(
"--image-cfg-scale",
type=float,
default=1.0,
help="CFG scale for image generation (default: 1.0 for Z-Image Turbo).",
)
parser.add_argument(
"--image-precision",
type=str,
default="f32",
choices=["bf16", "f32", "f16", "f8"],
help="Model precision for image generation (default: f32). bf16 recommended for modern GPUs.",
)
parser.add_argument(
"--image-cpu-offload",
action="store_true",
help="Enable sequential CPU offload for image models (lower VRAM usage).",
)
parser.add_argument(
"--image-seed",
type=int,
default=None,
help="Default seed for image generation (default: random).",
)
parser.add_argument(
"--vae-tiling",
action="store_true",
help="Enable VAE tiling for lower VRAM usage (sd.cpp only).",
)
parser.add_argument(
"--clip-on-cpu",
action="store_true",
help="Run CLIP on CPU to save VRAM (sd.cpp only).",
)
parser.add_argument(
"--loadall",
action="store_true",
help="Load all models at startup. Tries VRAM first, offloads to CPU RAM if VRAM is full.",
)
parser.add_argument(
"--loadswap",
"--debug",
action="store_true",
help="Load first model in VRAM, others in CPU RAM. Swap active model between VRAM and CPU RAM on switch.",
help="Enable debug mode - dumps full request/response to stdout for troubleshooting",
)
parser.add_argument(
"--nopreload",
"--dump",
action="store_true",
help="Skip model pre-loading at startup. Models will load on first request using the active mode strategy (ondemand/loadswap/loadall).",
)
parser.add_argument(
"--audio-ctx",
type=int,
action="append",
default=None,
help="Audio model context size in milliseconds. Can be specified multiple times, one per --audio-model.",
)
parser.add_argument(
"--audio-offload",
type=float,
default=None,
help="Audio model GPU offload percentage (0-100). If not set, uses CPU",
)
parser.add_argument(
"--audio-vulkan-device",
type=int,
default=0,
help="Vulkan GPU device ID to use for Whisper audio transcription (default: 0). Only used when using Vulkan backend.",
)
parser.add_argument(
"--image-vulkan-device",
type=int,
default=None,
help="Vulkan GPU device ID to use for image generation models (default: same as --vulkan-device). Use --vulkan-list-devices to see available devices",
)
parser.add_argument(
"--whisper-cpp",
type=str,
default=None,
help="Path to whisper.cpp CLI executable (e.g., ~/whisper.cpp/build/bin/whisper-cli). Uses Vulkan if available.",
)
parser.add_argument(
"--whisper-server",
type=str,
default=None,
help="Path to whisper.cpp server executable (e.g., ~/whisper.cpp/build/bin/whisper-server). Keeps model loaded in VRAM.",
)
parser.add_argument(
"--whisper-server-port",
type=int,
default=8744,
help="Port for whisper-server (default: 8744).",
)
parser.add_argument(
"--image-ctx",
type=int,
action="append",
default=None,
help="Image model context size. Can be specified multiple times, one per --image-model.",
)
parser.add_argument(
"--image-offload",
type=float,
default=None,
help="Vision model GPU offload percentage (0-100). If not set, loads fully on GPU",
help="Dump model output: raw output, parsed output, and litellm debug info",
)
parser.add_argument(
"--list-cached-models",
......@@ -378,69 +203,9 @@ def parse_args():
help="File pattern for HuggingFace model downloads (e.g., .gguf, .safetensors). Default: .gguf for text models",
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode - dumps full request/response to stdout for troubleshooting",
)
parser.add_argument(
"--dump",
action="store_true",
help="Dump model output: raw output, parsed output, and litellm debug info",
)
parser.add_argument(
"--file-path",
type=str,
default=None,
help="Path to store generated files (images, audio). If specified, files will be saved here and served over web.",
)
parser.add_argument(
"--parser",
type=str,
default="auto",
choices=["auto", "litellm"],
help="Tool call parser to use: 'auto' for internal parser, 'litellm' for LiteLLM's parser. Default: auto",
)
# Custom type for comma-separated reasoning options
def reasoning_choices(value):
    """Argparse ``type`` callable for a comma-separated list of reasoning options.

    Each item is stripped and lower-cased before validation.  A falsy *value*
    yields an empty list.  When ``all`` is among the items the result is the
    fixed expansion list (which does not contain ``stop``).

    Raises:
        argparse.ArgumentTypeError: If any item is not a recognised option.
    """
    if not value:
        return []

    allowed = {'chat', 'stop', 'inject', 'prompt', 'all', 'twopass', 'mock', 'raw'}
    requested = []
    for part in value.split(','):
        requested.append(part.strip().lower())

    rejected = [name for name in requested if name not in allowed]
    if rejected:
        raise argparse.ArgumentTypeError(f"Invalid choices: {rejected}. Valid options: {allowed}")

    # 'all' replaces whatever else was listed with the full expansion.
    if 'all' in requested:
        return ['chat', 'inject', 'prompt', 'mock', 'raw', 'twopass']
    return requested
parser.add_argument(
"--force-reasoning",
type=reasoning_choices,
default=None,
help="Force reasoning. Options: 'chat' (API), 'stop' (tokens), 'inject' (sys prompt), 'prompt' (seeding), 'twopass' (2 calls), 'mock' (fake stats), 'raw' (raw completion), 'all' (all options). Combine: --force-reasoning chat,inject.",
)
parser.add_argument(
"--grammar-guided-gen",
"--ggg",
action="store_true",
default=False,
help="Enable grammar-guided generation to reduce model hallucinations when using tools. Uses GBNF grammar for Vulkan backend and outlines for CUDA backend.",
)
parser.add_argument(
"--tools-closer-prompt",
action="store_true",
default=False,
help="Enable prompt distillation: place tool definitions right before the user's latest request instead of in the system prompt. This can improve tool call accuracy.",
)
parser.add_argument(
"--no-ram",
"--vulkan-list-devices",
action="store_true",
default=False,
help="Force model loading to maximize VRAM usage without CPU RAM spilling. "
"For llama-cpp-python: sets n_gpu_layers=-1, use_mmap=False, ignores --n-ctx. "
"For HuggingFace transformers: sets device_map='cuda:0', low_cpu_mem_usage=True, torch_dtype='auto'. "
"For diffusers: forces full GPU loading without CPU offload. "
"For sd.cpp: maximizes GPU layer offloading.",
help="List available Vulkan GPU devices and exit",
)
return parser.parse_args()
"""Configuration management for coderai."""
import json
import os
from pathlib import Path
from typing import Any, Dict, Optional
from dataclasses import dataclass, field
@dataclass
class ServerConfig:
    """Values of the ``server`` section of config.json."""

    # Listening address and port.
    host: str = "0.0.0.0"
    port: int = 8000
    # HTTPS switch plus the optional key / certificate file locations.
    https: bool = False
    https_key_path: Optional[str] = None
    https_cert_path: Optional[str] = None
@dataclass
class BackendConfig:
    """Values of the ``backend`` section of config.json.

    Every selector defaults to ``"auto"``.
    """

    # General backend selector.
    type: str = "auto"
    # Per-modality selectors.
    image_backend: str = "auto"
    audio_backend: str = "auto"
    tts_backend: str = "auto"
@dataclass
class ModelsConfig:
    """Values of the ``models`` section of config.json."""

    # Load mode applied by default; the shipped default is "ondemand".
    default_load_mode: str = "ondemand"
@dataclass
class OffloadConfig:
    """Values of the ``offload`` section of config.json."""

    # Offload directory; relative by default.
    directory: str = "./offload"
@dataclass
class Config:
    """Top-level contents of config.json.

    The four nested sections are separate dataclasses; each instance gets its
    own freshly constructed section objects and lists.
    """

    version: str = "1.0"

    # Nested sections.
    server: ServerConfig = field(default_factory=ServerConfig)
    backend: BackendConfig = field(default_factory=BackendConfig)
    models: ModelsConfig = field(default_factory=ModelsConfig)
    offload: OffloadConfig = field(default_factory=OffloadConfig)

    # Flat top-level settings.
    system_prompt: Optional[str] = None
    tools_closer_prompt: bool = False
    grammar_guided: bool = False
    file_path: Optional[str] = None
    hf_chat_templates: list = field(default_factory=list)
    reasoning_options: list = field(default_factory=list)
    parser: str = "auto"
class ConfigManager:
    """Loads and saves the JSON files of a configuration directory.

    Three files are managed, all located directly inside ``config_dir``:

    * ``config.json`` - parsed into a ``Config`` dataclass (``self.config``)
    * ``models.json`` - kept as a plain dict (``self.models_data``)
    * ``auth.json``   - kept as a plain dict (``self.auth_data``)
    """

    def __init__(self, config_dir: str):
        """Initialize the configuration manager.

        Nothing is read from or written to disk here; call :meth:`load`.

        Args:
            config_dir: Path to the configuration directory (``~`` is expanded)
        """
        self.config_dir = Path(config_dir).expanduser()
        self.config_path = self.config_dir / "config.json"
        self.models_path = self.config_dir / "models.json"
        self.auth_path = self.config_dir / "auth.json"
        self.config: Optional[Config] = None
        self.models_data: Dict[str, Any] = {}
        self.auth_data: Dict[str, Any] = {}

    @staticmethod
    def _default_models_data() -> Dict[str, Any]:
        """Return a fresh copy of the default models.json content."""
        return {
            "text_models": [],
            "image_models": [],
            "audio_models": [],
            "vision_models": [],
            "tts_models": [],
            "gguf_models": [],
            "loaded": [],
            "preload": [],
            "unloaded": [],
            "aliases": {}
        }

    @staticmethod
    def _build_section(section_cls, raw):
        """Build the dataclass *section_cls* from the JSON object *raw*.

        Keys that are not fields of *section_cls* are ignored, so a config file
        carrying extra settings does not raise ``TypeError``.  Anything that is
        not a dict (for example JSON ``null``) yields the section defaults.
        """
        from dataclasses import fields

        if not isinstance(raw, dict):
            return section_cls()
        known = {f.name for f in fields(section_cls)}
        return section_cls(**{k: v for k, v in raw.items() if k in known})

    @staticmethod
    def _read_json(path: Path) -> Any:
        """Parse the JSON file at *path* (always decoded as UTF-8)."""
        with open(path, 'r', encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _write_json(path: Path, data: Any) -> None:
        """Write *data* to *path* as indented JSON (always encoded as UTF-8)."""
        with open(path, 'w', encoding="utf-8") as f:
            json.dump(data, f, indent=2)

    def ensure_config_dir(self):
        """Create configuration directory if it doesn't exist."""
        self.config_dir.mkdir(parents=True, exist_ok=True)

    def create_default_configs(self):
        """Create default configuration files.

        Each of the three files is written only when it does not exist yet, and
        a line is printed for every file created.  The default auth.json holds
        one ``admin`` user with password ``admin``.
        """
        self.ensure_config_dir()

        # Create default config.json
        if not self.config_path.exists():
            default_config = {
                "version": "1.0",
                "server": {
                    "host": "0.0.0.0",
                    "port": 8000,
                    "https": False,
                    "https_key_path": None,
                    "https_cert_path": None
                },
                "backend": {
                    "type": "auto",
                    "image_backend": "auto",
                    "audio_backend": "auto",
                    "tts_backend": "auto"
                },
                "models": {
                    "default_load_mode": "ondemand"
                },
                "offload": {
                    "directory": "./offload"
                },
                "system_prompt": None,
                "tools_closer_prompt": False,
                "grammar_guided": False,
                "file_path": None,
                "hf_chat_templates": [],
                "reasoning_options": [],
                "parser": "auto"
            }
            self._write_json(self.config_path, default_config)
            print(f"Created default config: {self.config_path}")

        # Create default models.json
        if not self.models_path.exists():
            self._write_json(self.models_path, self._default_models_data())
            print(f"Created default models config: {self.models_path}")

        # Create default auth.json
        if not self.auth_path.exists():
            try:
                from argon2 import PasswordHasher
                ph = PasswordHasher()
                default_admin_hash = ph.hash("admin")
            except ImportError:
                # argon2 is optional here; without it a placeholder is stored.
                default_admin_hash = "argon2id$v=19$m=65536,t=3,p=4$...admin_hash_placeholder"
            default_auth = {
                "users": [{
                    "id": 1,
                    "username": "admin",
                    "password_hash": default_admin_hash,
                    "role": "admin",
                    "created_at": "2026-05-03T00:00:00Z",
                    "must_change_password": True
                }],
                "tokens": [],
                "sessions": {}
            }
            self._write_json(self.auth_path, default_auth)
            print(f"Created default auth config: {self.auth_path}")
            print("\nDefault credentials: admin / admin")
            print("You will be prompted to change the password on first login.\n")

    def load(self) -> "Config":
        """Load configuration from files.

        Default files are created first when the directory is missing or empty.
        A file that is absent afterwards is replaced by in-memory defaults.

        Returns:
            Config object with loaded settings

        Raises:
            json.JSONDecodeError: If an existing file is not valid JSON.
        """
        # Create defaults if config directory is empty or doesn't exist
        if not self.config_dir.exists() or not any(self.config_dir.iterdir()):
            self.create_default_configs()

        # Load config.json
        if self.config_path.exists():
            config_data = self._read_json(self.config_path)
            # Sections go through _build_section so unknown keys are tolerated.
            self.config = Config(
                version=config_data.get("version", "1.0"),
                server=self._build_section(ServerConfig, config_data.get("server", {})),
                backend=self._build_section(BackendConfig, config_data.get("backend", {})),
                models=self._build_section(ModelsConfig, config_data.get("models", {})),
                offload=self._build_section(OffloadConfig, config_data.get("offload", {})),
                system_prompt=config_data.get("system_prompt"),
                tools_closer_prompt=config_data.get("tools_closer_prompt", False),
                grammar_guided=config_data.get("grammar_guided", False),
                file_path=config_data.get("file_path"),
                hf_chat_templates=config_data.get("hf_chat_templates", []),
                reasoning_options=config_data.get("reasoning_options", []),
                parser=config_data.get("parser", "auto")
            )
        else:
            self.config = Config()

        # Load models.json
        if self.models_path.exists():
            self.models_data = self._read_json(self.models_path)
        else:
            self.models_data = self._default_models_data()

        # Load auth.json
        if self.auth_path.exists():
            self.auth_data = self._read_json(self.auth_path)
        else:
            self.auth_data = {
                "users": [],
                "tokens": [],
                "sessions": {}
            }

        return self.config

    def save_config(self):
        """Save config.json to disk.

        Requires ``self.config`` to be set (normally by :meth:`load`).
        """
        config_dict = {
            "version": self.config.version,
            "server": {
                "host": self.config.server.host,
                "port": self.config.server.port,
                "https": self.config.server.https,
                "https_key_path": self.config.server.https_key_path,
                "https_cert_path": self.config.server.https_cert_path
            },
            "backend": {
                "type": self.config.backend.type,
                "image_backend": self.config.backend.image_backend,
                "audio_backend": self.config.backend.audio_backend,
                "tts_backend": self.config.backend.tts_backend
            },
            "models": {
                "default_load_mode": self.config.models.default_load_mode
            },
            "offload": {
                "directory": self.config.offload.directory
            },
            "system_prompt": self.config.system_prompt,
            "tools_closer_prompt": self.config.tools_closer_prompt,
            "grammar_guided": self.config.grammar_guided,
            "file_path": self.config.file_path,
            "hf_chat_templates": self.config.hf_chat_templates,
            "reasoning_options": self.config.reasoning_options,
            "parser": self.config.parser
        }
        self._write_json(self.config_path, config_dict)

    def save_models(self):
        """Save models.json to disk."""
        self._write_json(self.models_path, self.models_data)

    def save_auth(self):
        """Save auth.json to disk."""
        self._write_json(self.auth_path, self.auth_data)

    def reload(self):
        """Reload all configuration files."""
        return self.load()
#!/usr/bin/env python3
"""
coder - A CLI tool for interacting with coderai API
Connects to OpenAI-compatible API and executes tools automatically.
"""
# Debug: Verify script execution - imports must come first
import sys
import os
# Opt-in startup trace: when the CODER_DEBUG environment variable is set to a
# non-empty value, report the invocation details on stderr.
if os.environ.get('CODER_DEBUG'):
    print(
        "DEBUG: Script started",
        f"DEBUG: Arguments: {sys.argv}",
        f"DEBUG: Python executable: {sys.executable}",
        sep="\n",
        file=sys.stderr,
    )
import sys
import json
import argparse
import subprocess
import readline
import random
import string
from pathlib import Path
from typing import Optional, Dict, Any, List, Callable
from dataclasses import dataclass, field
from datetime import datetime
import requests
# ANSI color codes
class Colors:
    """Namespace of ANSI escape sequences used to style terminal output."""

    # Attribute control
    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
    DIM = "\x1b[2m"

    # Foreground colours (codes 91-97)
    RED = "\x1b[91m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    BLUE = "\x1b[94m"
    MAGENTA = "\x1b[95m"
    CYAN = "\x1b[96m"
    WHITE = "\x1b[97m"
# Default system prompt for normal models.
# It spells out the <tool>{...}</tool> JSON call format, documents the four
# tools (read_file, write_file, apply_diff, execute_command) with one example
# each, and lists usage rules and output style.
# NOTE(review): this is not a raw string, so the "\n" escapes in the apply_diff
# example become real line breaks inside the JSON string shown to the model -
# confirm that is intended.
DEFAULT_SYSTEM_PROMPT = """You are Coder, an AI coding assistant. You help users write, read, and modify code files. You have access to tools for file operations.
## CRITICAL: Response Format
1. ALWAYS maintain proper spacing between words and after punctuation.
2. Use complete sentences with normal spacing.
3. When showing code, use proper code blocks with language identifiers.
## Available Tools
You can invoke tools by outputting JSON inside <tool> tags:
<tool>{"name": "TOOL_NAME", "arguments": {PARAMETERS}}</tool>
### read_file - Read file contents
Purpose: Read one or more files to understand the codebase
Parameters: {"path": "relative/path/to/file"}
Example: <tool>{"name": "read_file", "arguments": {"path": "main.py"}}</tool>
### write_file - Create or overwrite files
Purpose: Write new files or completely replace existing ones
Parameters: {"path": "relative/path", "content": "full file content"}
Example: <tool>{"name": "write_file", "arguments": {"path": "hello.py", "content": "print('Hello World')"}}</tool>
### apply_diff - Modify existing files
Purpose: Make targeted changes to specific sections of files
Parameters: {"path": "relative/path", "diff": "SEARCH/REPLACE block"}
Example: <tool>{"name": "apply_diff", "arguments": {"path": "main.py", "diff": "<<<<<<< SEARCH\ndef old_func():\n pass\n=======\ndef new_func():\n return 42\n>>>>>>> REPLACE"}}</tool>
### execute_command - Run shell commands
Purpose: Execute commands like git, npm, python, ls, etc.
Parameters: {"command": "shell command string"}
Example: <tool>{"name": "execute_command", "arguments": {"command": "ls -la"}}</tool>
## Tool Usage Rules
1. READ FIRST: Always read files before modifying them
2. COMPLETE REPLACEMENTS: When using write_file, include the ENTIRE file content
3. TARGETED EDITS: Use apply_diff for small changes to preserve the rest of the file
4. ONE TOOL AT A TIME: Make one tool call, wait for results, then proceed
5. VERIFY CHANGES: After writing files, read them back to confirm
## Workflow Example
User: "Add a function to main.py"
You: <tool>{"name": "read_file", "arguments": {"path": "main.py"}}</tool>
[Tool result shown]
You: [Explain what you'll add, then call write_file or apply_diff]
## Output Style
- Use markdown for formatting
- Show file paths as [`filename`](path/to/file)
- Include code blocks with language tags
- Maintain normal spacing in all responses"""
# Simplified system prompt for small models (under 7B parameters).
# Keeps the same <tool> call format and the same four tools as the default
# prompt, but reduces the guidance to a short rule list.
# NOTE(review): the code that picks this prompt is not visible here - confirm
# how the 7B threshold is applied.
SMALL_MODEL_SYSTEM_PROMPT = """You are Coder, an AI assistant. Help with coding tasks.
IMPORTANT RULES:
1. Put ONE space between EVERY word.
2. Put ONE space after periods and commas.
3. Use code blocks with triple backticks.
4. Be concise.
TOOLS:
Use <tool>{"name": "TOOL", "arguments": {}}</tool> format.
Available tools:
- read_file: {"path": "file.py"} - Read a file
- write_file: {"path": "file.py", "content": "code"} - Write a file
- apply_diff: {"path": "file.py", "diff": "SEARCH...REPLACE"} - Edit file
- execute_command: {"command": "ls"} - Run command
ALWAYS add spaces between words."""
# Minimal system prompt for tiny models (under 3B parameters).
# Lists the tool names and the <tool> call format without per-tool parameter
# descriptions.
# NOTE(review): the code that picks this prompt is not visible here - confirm
# how the 3B threshold is applied.
TINY_MODEL_SYSTEM_PROMPT = """You are Coder. Help with code.
Rules:
- Space between words
- Space after punctuation
- Use ``` for code
- Be brief
Tools: <tool>{"name":"TOOL","arguments":{}}</tool>
Tools: read_file, write_file, apply_diff, execute_command"""
# Ultra-minimal system prompt for micro models (under 1.5B parameters).
# Shortest variant: two formatting rules, the tool names and the call format.
# NOTE(review): the code that picks this prompt is not visible here - confirm
# how the 1.5B threshold is applied.
MICRO_MODEL_SYSTEM_PROMPT = """Coder AI. Help code.
Rules:
- Space between words
- Use ``` for code blocks
Tools: read_file, write_file, apply_diff, execute_command
Format: <tool>{"name":"TOOL","arguments":{}}</tool>"""
@dataclass
class Config:
    """Configuration for the coder CLI.

    Instances can be persisted to and restored from a JSON file with
    :meth:`save` and :meth:`load`; the default location is
    ``~/.config/coderai/cli.json``.
    """
    api_url: str = "http://localhost:6744/v1"
    token: Optional[str] = None
    system_prompt: str = DEFAULT_SYSTEM_PROMPT
    model: str = "default"
    model_aliases: Dict[str, str] = None  # Alias -> Model mapping ({} after __post_init__)
    small: bool = False  # Use small model optimizations
    tiny: bool = False  # Use tiny model optimizations (minimal)
    micro: bool = False  # Use micro model optimizations (ultra-minimal)
    timeout: int = 600  # Request timeout in seconds
    confirm_all: bool = True  # Confirm before executing tools by default
    confirm_commands: Dict[str, bool] = None  # Per-command confirmation settings ({} after __post_init__)
    debug: bool = False  # Show debug output including raw tool calls
    dump: bool = False  # Show dump output: tools schema, raw response, parsed tool calls
    max_context: int = 32768  # Maximum context size in tokens
    no_prompt: bool = False  # Don't send system prompt
    no_tools: bool = False  # Don't send tool definitions

    def __post_init__(self):
        """Replace the ``None`` placeholders of the dict fields with fresh dicts."""
        if self.confirm_commands is None:
            self.confirm_commands = {}
        if self.model_aliases is None:
            self.model_aliases = {}

    def resolve_model(self, model: str) -> str:
        """Resolve model alias to actual model name.

        Returns *model* unchanged when it is not a registered alias.
        """
        return self.model_aliases.get(model, model)

    @classmethod
    def load(cls, config_path: Optional[str] = None) -> "Config":
        """Load configuration from file or create default.

        Loading is best effort: a missing, unreadable or malformed file prints a
        warning to stderr (except when simply missing) and yields the defaults.

        Args:
            config_path: JSON file to read; defaults to
                ``~/.config/coderai/cli.json``.

        Returns:
            A Config whose persisted fields are overridden by the file content.
        """
        if config_path is None:
            config_path = os.path.expanduser("~/.config/coderai/cli.json")

        config = cls()
        if not os.path.exists(config_path):
            return config

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, IOError) as e:
            print(f"Warning: Could not load config from {config_path}: {e}", file=sys.stderr)
            return config

        if not isinstance(data, dict):
            # Valid JSON that is not an object cannot carry settings.
            print(f"Warning: Could not load config from {config_path}: expected a JSON object", file=sys.stderr)
            return config

        config.api_url = data.get('api_url', config.api_url)
        config.token = data.get('token')
        config.system_prompt = data.get('system_prompt', config.system_prompt)
        config.model = data.get('model', config.model)
        # A stored null must not leave resolve_model() without a dict to query.
        config.model_aliases = data.get('model_aliases') or {}
        config.small = data.get('small', config.small)
        config.tiny = data.get('tiny', config.tiny)
        config.micro = data.get('micro', config.micro)
        config.timeout = data.get('timeout', config.timeout)
        config.debug = data.get('debug', config.debug)
        config.max_context = data.get('max_context', config.max_context)
        config.no_prompt = data.get('no_prompt', config.no_prompt)
        config.no_tools = data.get('no_tools', config.no_tools)
        # NOTE(review): confirm_all, confirm_commands and dump are neither read
        # here nor written by save() - presumably deliberate; confirm.
        return config

    def save(self, config_path: Optional[str] = None) -> None:
        """Save configuration to file.

        Args:
            config_path: JSON file to write; defaults to
                ``~/.config/coderai/cli.json``.  Missing parent directories are
                created.
        """
        if config_path is None:
            config_path = os.path.expanduser("~/.config/coderai/cli.json")

        # Ensure directory exists. A bare filename has an empty dirname, and
        # os.makedirs("") raises, so only create a directory when there is one.
        parent_dir = os.path.dirname(config_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        data = {
            'api_url': self.api_url,
            'token': self.token,
            'system_prompt': self.system_prompt,
            'model': self.model,
            'model_aliases': self.model_aliases,
            'small': self.small,
            'tiny': self.tiny,
            'micro': self.micro,
            'timeout': self.timeout,
            'debug': self.debug,
            'max_context': self.max_context,
            'no_prompt': self.no_prompt,
            'no_tools': self.no_tools
        }
        with open(config_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
class SessionManager:
    """Manages named sessions for the CLI.

    Every session is one JSON file inside ``sessions_dir`` holding the session
    name, a timestamp and the message history.
    """

    def __init__(self, sessions_dir: Optional[str] = None):
        """Use *sessions_dir* (default ``~/.cache/coderai/sessions``), creating it if needed."""
        if sessions_dir is None:
            sessions_dir = os.path.expanduser("~/.cache/coderai/sessions")
        self.sessions_dir = sessions_dir
        os.makedirs(sessions_dir, exist_ok=True)

    def _get_session_path(self, name: str) -> str:
        """Get the file path for a session.

        Only alphanumeric characters, ``-`` and ``_`` of *name* are kept, so
        names differing only in dropped characters share one file.
        """
        # Sanitize name for filesystem
        safe_name = "".join(c for c in name if c.isalnum() or c in ('-', '_'))
        return os.path.join(self.sessions_dir, f"{safe_name}.json")

    def session_exists(self, name: str) -> bool:
        """Check if a session exists."""
        return os.path.exists(self._get_session_path(name))

    def save_session(self, name: str, history: List[Dict[str, Any]]) -> None:
        """Save a session to disk, overwriting an existing one of the same name."""
        session_data = {
            'name': name,
            'timestamp': datetime.now().isoformat(),
            'history': history
        }
        with open(self._get_session_path(name), 'w', encoding='utf-8') as f:
            json.dump(session_data, f, indent=2)

    def load_session(self, name: str) -> Optional[List[Dict[str, Any]]]:
        """Load a session from disk.

        Returns:
            The stored history (``[]`` when the file has none), or ``None`` when
            the session is missing, unreadable or not a JSON object.
        """
        path = self._get_session_path(name)
        if not os.path.exists(path):
            return None
        try:
            with open(path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError, IOError):
            return None
        if not isinstance(data, dict):
            return None
        return data.get('history', [])

    def delete_session(self, name: str) -> bool:
        """Delete a session. Returns True if deleted, False if not found."""
        path = self._get_session_path(name)
        if os.path.exists(path):
            os.remove(path)
            return True
        return False

    def delete_all_sessions(self) -> int:
        """Delete all sessions. Returns count of deleted sessions.

        Every ``.json`` file in the sessions directory is removed.
        """
        count = 0
        for filename in os.listdir(self.sessions_dir):
            if filename.endswith('.json'):
                os.remove(os.path.join(self.sessions_dir, filename))
                count += 1
        return count

    def list_sessions(self) -> List[Dict[str, Any]]:
        """List all available sessions, ordered by file name.

        Files that cannot be read or do not contain a JSON object are skipped.

        Returns:
            One dict per session with ``name``, ``timestamp`` and
            ``message_count``.
        """
        sessions = []
        for filename in sorted(os.listdir(self.sessions_dir)):
            if not filename.endswith('.json'):
                continue
            path = os.path.join(self.sessions_dir, filename)
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError, IOError):
                continue
            if not isinstance(data, dict):
                continue
            sessions.append({
                'name': data.get('name', filename[:-5]),
                'timestamp': data.get('timestamp', ''),
                # A stored null history counts as empty instead of crashing len().
                'message_count': len(data.get('history') or [])
            })
        return sessions

    def generate_unique_name(self, base_name: str) -> str:
        """Generate a unique session name by adding random characters if needed.

        Returns *base_name* itself when it is unused; otherwise
        ``<base_name>_<4 random chars>``, retried until the name is unused.
        """
        if not self.session_exists(base_name):
            return base_name
        alphabet = string.ascii_lowercase + string.digits
        while True:
            # Re-check each candidate: a random suffix alone does not guarantee
            # that the resulting name is free.
            suffix = ''.join(random.choices(alphabet, k=4))
            candidate = f"{base_name}_{suffix}"
            if not self.session_exists(candidate):
                return candidate
class ToolExecutor:
    """Executes tool calls from the LLM.

    Four tools are exposed in OpenAI function-calling format: ``read_file``,
    ``write_file``, ``apply_diff`` and ``execute_command``. Every tool
    returns a JSON-serializable dict. Failures are reported through an
    ``"error"`` key instead of raising.

    NOTE(security): paths are resolved against ``working_dir`` but are not
    confined to it, and ``execute_command`` runs model-supplied text through
    the shell. Callers are expected to gate execution behind confirmation.
    """

    def __init__(self, working_dir: str = "."):
        """Remember the base directory for relative paths and build the schema."""
        self.working_dir = working_dir
        self.tools = self._define_tools()

    def _define_tools(self) -> List[Dict[str, Any]]:
        """Define available tools in OpenAI format."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "read_file",
                    "description": "Read the contents of a file",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to read (relative to working directory)"
                            }
                        },
                        "required": ["path"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "write_file",
                    "description": "Write content to a file (creates or overwrites)",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to write (relative to working directory)"
                            },
                            "content": {
                                "type": "string",
                                "description": "Content to write to the file"
                            }
                        },
                        "required": ["path", "content"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "apply_diff",
                    "description": "Apply a diff/patch to a file. Use SEARCH/REPLACE blocks format.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "path": {
                                "type": "string",
                                "description": "Path to the file to modify"
                            },
                            "diff": {
                                "type": "string",
                                "description": "Diff content in SEARCH/REPLACE format: <<<<<<< SEARCH\\n[old content]\\n=======\\n[new content]\\n>>>>>>> REPLACE"
                            }
                        },
                        "required": ["path", "diff"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "execute_command",
                    "description": "Execute a shell command",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "command": {
                                "type": "string",
                                "description": "The shell command to execute"
                            },
                            "cwd": {
                                "type": "string",
                                "description": "Working directory for the command (optional, defaults to current)"
                            }
                        },
                        "required": ["command"]
                    }
                }
            }
        ]

    def _resolve(self, path: str) -> str:
        """Return the absolute form of ``path`` taken relative to ``working_dir``."""
        return os.path.abspath(os.path.join(self.working_dir, path))

    def execute(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a tool call and return the result.

        Args:
            tool_name: One of the names declared by ``_define_tools``.
            arguments: Decoded arguments of the call.

        Returns:
            The tool's result dict, or ``{"error": ...}`` for an unknown
            tool, a missing required argument, or any unexpected failure.
        """
        try:
            if tool_name == "read_file":
                return self._read_file(arguments["path"])
            if tool_name == "write_file":
                return self._write_file(arguments["path"], arguments["content"])
            if tool_name == "apply_diff":
                return self._apply_diff(arguments["path"], arguments["diff"])
            if tool_name == "execute_command":
                # `or` also covers an explicit null/empty cwd from the model.
                cwd = arguments.get("cwd") or self.working_dir
                return self._execute_command(arguments["command"], cwd)
            return {"error": f"Unknown tool: {tool_name}"}
        except KeyError as e:
            # str(KeyError) is just the quoted key, which tells the model
            # nothing about what went wrong.
            return {"error": f"Missing required argument: {e.args[0]}"}
        except Exception as e:
            return {"error": str(e)}

    def _read_file(self, path: str) -> Dict[str, Any]:
        """Read a file and return its contents.

        Undecodable bytes are replaced rather than raising, so binary-ish
        files still produce a result.
        """
        full_path = self._resolve(path)
        if not os.path.exists(full_path):
            return {"error": f"File not found: {path}"}
        if not os.path.isfile(full_path):
            return {"error": f"Path is not a file: {path}"}
        try:
            with open(full_path, 'r', encoding='utf-8', errors='replace') as f:
                content = f.read()
            return {"content": content, "path": path}
        except Exception as e:
            return {"error": f"Failed to read file: {e}"}

    def _write_file(self, path: str, content: str) -> Dict[str, Any]:
        """Write content to a file, creating parent directories as needed.

        Returns:
            ``{"success": True, "path": ..., "bytes_written": ...}`` where
            ``bytes_written`` is the UTF-8 encoded size of ``content``, or
            ``{"error": ...}`` on failure.
        """
        full_path = self._resolve(path)
        try:
            # Ensure directory exists. Kept inside the try so a permission
            # problem is reported in the same shape as a failed write.
            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)
            # len(content) counts characters, which under-reports any
            # non-ASCII text. Measure the encoded form instead.
            return {
                "success": True,
                "path": path,
                "bytes_written": len(content.encode('utf-8'))
            }
        except Exception as e:
            return {"error": f"Failed to write file: {e}"}

    def _apply_diff(self, path: str, diff: str) -> Dict[str, Any]:
        """Apply a SEARCH/REPLACE diff to a file.

        Every block replaces the first occurrence of its SEARCH text. The
        file is written only if all blocks apply; the first block whose
        SEARCH text is missing aborts the whole operation.
        """
        full_path = self._resolve(path)
        if not os.path.exists(full_path):
            return {"error": f"File not found: {path}"}
        try:
            with open(full_path, 'r', encoding='utf-8') as f:
                content = f.read()

            # Parse and apply SEARCH/REPLACE blocks
            import re
            # The replacement group is optional and lazy (`??`) so that a
            # pure deletion, with nothing between ======= and
            # >>>>>>> REPLACE, is accepted without the match running on
            # into a following block.
            pattern = r'<<<<<<< SEARCH\n(.*?)\n=======\n(?:(.*?)\n)??>>>>>>> REPLACE'
            matches = list(re.finditer(pattern, diff, re.DOTALL))
            if not matches:
                return {"error": "No valid SEARCH/REPLACE blocks found in diff"}

            new_content = content
            replacements = 0
            for match in matches:
                search_text = match.group(1)
                replace_text = match.group(2) or ""
                if search_text not in new_content:
                    return {"error": f"Search text not found in file: {search_text[:50]}..."}
                new_content = new_content.replace(search_text, replace_text, 1)
                replacements += 1

            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(new_content)
            return {
                "success": True,
                "path": path,
                "replacements": replacements
            }
        except Exception as e:
            return {"error": f"Failed to apply diff: {e}"}

    def _execute_command(self, command: str, cwd: str) -> Dict[str, Any]:
        """Execute a shell command.

        Returns:
            A dict with ``success`` (exit status was 0), ``returncode``,
            ``stdout``, ``stderr`` and ``command``; or ``{"error": ...}`` if
            the command timed out or could not be started.
        """
        try:
            # shell=True is deliberate: the tool contract is "run this shell
            # command line". The command text comes from the model, so it
            # must be treated as untrusted by whoever calls this.
            result = subprocess.run(
                command,
                shell=True,
                cwd=cwd,
                capture_output=True,
                text=True,
                timeout=300  # 5 minute timeout
            )
            return {
                "success": result.returncode == 0,
                "returncode": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "command": command
            }
        except subprocess.TimeoutExpired:
            return {"error": "Command timed out after 5 minutes"}
        except Exception as e:
            return {"error": f"Failed to execute command: {e}"}
class CoderClient:
    """Client for interacting with the coderai API.

    The client keeps the conversation history, sends it to the
    ``/chat/completions`` endpoint, renders the (optionally streamed)
    answer, executes any tool calls the model requested and feeds their
    results back until the model answers without calling tools.
    """

    def __init__(self, config: Config, session_manager: SessionManager = None):
        """Bind the client to a configuration and an optional session store."""
        self.config = config
        self.tool_executor = ToolExecutor()
        self.conversation_history: List[Dict[str, Any]] = []
        # Kept for backward compatibility; streaming state is tracked in
        # locals of _handle_streaming_response.
        self.in_tool_call = False
        self.tool_call_buffer = ""
        self.session_manager = session_manager
        self.session_name: Optional[str] = None
        self.input_history: List[str] = []  # Track user inputs for readline

    # ------------------------------------------------------------------
    # Request plumbing
    # ------------------------------------------------------------------

    def _build_messages(self) -> List[Dict[str, Any]]:
        """Return the messages to send: history, preceded by the system prompt.

        The system prompt is omitted when ``config.no_prompt`` is set. This
        applies to the initial request and to follow-up requests alike.
        """
        if self.config.no_prompt:
            return list(self.conversation_history)
        system = {"role": "system", "content": self.config.system_prompt}
        return [system] + self.conversation_history

    def _post_chat(self, stream: bool, dump_label: str) -> requests.Response:
        """POST the current conversation and return the raw response.

        Args:
            stream: Whether to request a streamed (SSE) answer.
            dump_label: Heading used when ``config.dump`` prints the schema.

        Raises:
            requests.exceptions.RequestException: On connection problems,
                timeouts or a non-2xx status.
        """
        headers = {"Content-Type": "application/json"}
        if self.config.token:
            headers["Authorization"] = f"Bearer {self.config.token}"

        # Build payload, conditionally including tools
        payload = {
            "model": self.config.resolve_model(self.config.model),
            "messages": self._build_messages(),
            "stream": stream
        }
        if not self.config.no_tools:
            payload["tools"] = self.tool_executor.tools
            payload["tool_choice"] = "auto"

        # Dump tools schema if enabled
        if self.config.dump:
            print(f"\n{Colors.CYAN}=== DUMP: {dump_label} ==={Colors.RESET}")
            print(json.dumps(self.tool_executor.tools, indent=2))
            print(f"{Colors.CYAN}=== END DUMP ==={Colors.RESET}\n")

        response = requests.post(
            f"{self.config.api_url}/chat/completions",
            headers=headers,
            json=payload,
            stream=stream,
            timeout=self.config.timeout
        )
        response.raise_for_status()
        return response

    def chat(self, message: str, stream: bool = True) -> str:
        """Send a message to the API and get response.

        Args:
            message: The user's message.
            stream: Stream the answer to stdout while it is generated.

        Returns:
            The assistant's final text, or an ``"Error: ..."`` string when
            the HTTP request failed.
        """
        # Add to input history for readline
        self.input_history.append(message)

        # Check context size and compress if needed
        self._manage_context()

        # Add user message to history
        self.conversation_history.append({
            "role": "user",
            "content": message
        })

        try:
            response = self._post_chat(stream=stream, dump_label="TOOLS SCHEMA")
            if stream:
                result = self._handle_streaming_response(response)
            else:
                result = self._handle_non_streaming_response(response)
            # Save session after each interaction
            self._save_current_session()
            return result
        except requests.exceptions.ConnectionError:
            return "Error: Could not connect to API. Is the server running?"
        except requests.exceptions.Timeout:
            return "Error: Request timed out."
        except requests.exceptions.RequestException as e:
            return f"Error: API request failed: {e}"

    def _get_follow_up_response(self) -> str:
        """Get follow-up response after tool execution (always streamed)."""
        response = self._post_chat(
            stream=True, dump_label="TOOLS SCHEMA (FOLLOW-UP)")
        return self._handle_streaming_response(response)

    # ------------------------------------------------------------------
    # Context management
    # ------------------------------------------------------------------

    def _estimate_tokens(self, text: str) -> int:
        """Rough estimation of token count (4 chars per token on average)."""
        return len(text) // 4

    def _manage_context(self):
        """Manage context size and compress if approaching limit."""
        pieces = []
        for msg in self.conversation_history:
            pieces.append(msg.get("content", "") or "")
            for tc in msg.get("tool_calls") or []:
                pieces.append(tc.get("function", {}).get("arguments", ""))
        current_tokens = self._estimate_tokens("".join(pieces))

        threshold = int(self.config.max_context * 0.9)  # 90% threshold
        if current_tokens > threshold:
            print(f"{Colors.YELLOW}[Context at {current_tokens}/{self.config.max_context} tokens - compressing...]{Colors.RESET}")
            self._compress_context()

    def _compress_context(self):
        """Compress context by replacing old messages with a placeholder.

        Roughly the last four messages are kept and everything before them
        is collapsed into one system message saying how many messages were
        dropped. Histories of four messages or fewer are left alone.
        """
        history = self.conversation_history
        if len(history) <= 4:
            return

        cut = len(history) - 4
        # A "tool" message is only valid right after the assistant message
        # that carries the matching tool_calls. Move the cut back so tool
        # results are never separated from that message.
        while cut > 0 and history[cut].get("role") == "tool":
            cut -= 1
        if cut == 0:
            return

        summary = f"[Previous {cut} messages summarized]"
        self.conversation_history = [{"role": "system", "content": summary}] + history[cut:]

        # Save the updated session after compression
        self._save_current_session()

    # ------------------------------------------------------------------
    # Sessions
    # ------------------------------------------------------------------

    def _save_current_session(self):
        """Save current session if it has a name and a non-empty history."""
        if self.session_manager and self.session_name and self.conversation_history:
            self.session_manager.save_session(self.session_name, self.conversation_history)

    def load_session(self, name: str) -> bool:
        """Load a session by name. Returns True when it was found."""
        if not self.session_manager:
            return False
        history = self.session_manager.load_session(name)
        if history is None:
            return False
        self.conversation_history = history
        self.session_name = name
        return True

    def new_session(self, name: str = None) -> str:
        """Start a new session. Returns the session name.

        The current session is saved first. A requested name that is
        already taken gets a unique suffix. Without a name, a timestamped
        one is generated.
        """
        # Save current session if exists
        self._save_current_session()

        # Clear history
        self.conversation_history = []

        if name:
            if self.session_manager and self.session_manager.session_exists(name):
                name = self.session_manager.generate_unique_name(name)
            self.session_name = name
        else:
            # Generate default name
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            self.session_name = f"session_{timestamp}"

        # NOTE: with an empty history this call writes nothing. The session
        # file first appears after the first exchange.
        self._save_current_session()
        return self.session_name

    def clear_history(self):
        """Clear conversation history."""
        self.conversation_history = []

    # ------------------------------------------------------------------
    # Tool-call helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _format_thinking_line(elapsed: int, content: str) -> str:
        """Format the single-line "thinking" status shown while streaming."""
        import re
        # Filter out tool tags and normalize
        display = re.sub(r'<tool.*?>.*?</tool>', '', content, flags=re.DOTALL)
        display = re.sub(r'<tool_call.*?>.*?</tool_call>', '', display, flags=re.DOTALL)
        display = display.replace('\n', ' ').strip()
        if len(display) > 50:
            display = "..." + display[-50:]
        return f"[{elapsed}s] Thinking: [{display}]"

    @staticmethod
    def _filter_tool_call_markup(chunk: str, in_tool_call: bool):
        """Remove ``<tool_call>...</tool_call>`` spans from a streamed chunk.

        Args:
            chunk: Newly received text.
            in_tool_call: Whether the previous chunk ended inside a span.

        Returns:
            ``(visible_text, in_tool_call)`` with the updated state. Any
            number of opening and closing tags within one chunk is handled,
            including a complete call delivered in a single chunk.
        """
        visible = []
        rest = chunk
        while rest:
            if in_tool_call:
                _, closed, rest = rest.partition('</tool_call>')
                if closed:
                    in_tool_call = False
            else:
                before, opened, rest = rest.partition('<tool_call>')
                visible.append(before)
                if opened:
                    in_tool_call = True
        return "".join(visible), in_tool_call

    @staticmethod
    def _parse_tool_calls(text: str) -> List[Dict[str, Any]]:
        """Parse tool calls from content in various formats.

        The supported formats overlap textually: the short ``<tool>`` form
        is contained in its ``<tool_call>``-wrapped variant, and the
        XML-argument forms are contained in the generic wrapped form. Each
        stretch of text is therefore claimed by at most one format, so a
        single call is never returned (and executed) twice. Calls are
        returned in the order they appear in ``text``.
        """
        import re

        found = []    # (start offset, tool name, JSON-encoded arguments)
        claimed = []  # (start, end) spans already turned into a call

        def claim(match, name, args):
            start, end = match.span()
            if any(start < c_end and c_start < end for c_start, c_end in claimed):
                return
            claimed.append((start, end))
            found.append((start, name, json.dumps(args)))

        xml_flags = re.DOTALL | re.IGNORECASE

        # Format 1: <tool_call>{"name": "...", "arguments": {...}}</tool_call>
        for match in re.finditer(r'<tool_call>\s*(\{.*?\})\s*</tool_call>', text, re.DOTALL):
            try:
                tool_data = json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
            claim(match, tool_data.get('name', ''), tool_data.get('arguments', {}))

        # Format 3: Generic <tool_call><tool><name>...</name><arguments>JSON</arguments></tool></tool_call>
        # Also handles incomplete closing tags like <tool_call> without </tool_call>.
        # Runs before the XML forms so JSON arguments are not mistaken for a
        # raw command. Matches whose arguments are not JSON are deferred so
        # the XML forms get the first chance at them.
        generic_pattern = r'<tool_call>\s*<tool>\s*<name>(.*?)</name>\s*<arguments>(.*?)</arguments>\s*</tool>\s*(?:</tool_call>)?'
        deferred = []
        for match in re.finditer(generic_pattern, text, xml_flags):
            name = match.group(1).strip()
            args_str = match.group(2).strip()
            if not name:
                continue
            if not args_str:
                claim(match, name, {})
                continue
            try:
                args = json.loads(args_str)
            except json.JSONDecodeError:
                deferred.append((match, name))
            else:
                claim(match, name, args)

        # Format 2: XML format
        write_file_pattern = r'<tool>\s*<name>write_file</name>\s*<arguments>\s*<file\s+path="([^"]+)">\s*<content>(.*?)</content>\s*</file>\s*</arguments>\s*</tool>'
        for match in re.finditer(write_file_pattern, text, xml_flags):
            claim(match, 'write_file',
                  {'path': match.group(1), 'content': match.group(2).strip()})

        read_file_pattern = r'<tool>\s*<name>read_file</name>\s*<arguments>\s*<path>([^<]+)</path>\s*</arguments>\s*</tool>'
        for match in re.finditer(read_file_pattern, text, xml_flags):
            claim(match, 'read_file', {'path': match.group(1).strip()})

        exec_pattern = r'<tool>\s*<name>execute_command</name>\s*<arguments>\s*(?:<command>)?([^<]+)(?:</command>)?\s*</arguments>\s*</tool>'
        for match in re.finditer(exec_pattern, text, xml_flags):
            claim(match, 'execute_command', {'command': match.group(1).strip()})

        diff_pattern = r'<tool>\s*<name>apply_diff</name>\s*<arguments>\s*<path>([^<]+)</path>\s*<diff>(.*?)</diff>\s*</arguments>\s*</tool>'
        for match in re.finditer(diff_pattern, text, xml_flags):
            claim(match, 'apply_diff',
                  {'path': match.group(1).strip(), 'diff': match.group(2).strip()})

        # Format 5: <tool_call><tool>TOOL_NAME>JSON</tool></tool_call>
        # Format 4: Short format <tool>TOOL_NAME>JSON</tool>
        # Example: <tool>financial_data_fetcher>{"ticker": "AAPL"}</tool>
        pattern_short2 = r'<tool_call>\s*<tool>(\w+)>\s*(\{.*?\})\s*</tool>\s*</tool_call>'
        pattern_short = r'<tool>(\w+)>(\{.*?\})</tool>'
        for short_form in (pattern_short2, pattern_short):
            for match in re.finditer(short_form, text, re.DOTALL):
                name = match.group(1).strip()
                args_str = match.group(2).strip()
                if not name:
                    continue
                try:
                    args = json.loads(args_str) if args_str else {}
                except json.JSONDecodeError:
                    args = {}
                claim(match, name, args)

        # Generic calls with unparseable arguments that no other format
        # recognized are still reported, with empty arguments.
        for match, name in deferred:
            claim(match, name, {})

        found.sort(key=lambda item: item[0])
        return [
            {
                'id': f'call_{index}',
                'type': 'function',
                'function': {'name': name, 'arguments': encoded}
            }
            for index, (_, name, encoded) in enumerate(found)
        ]

    @staticmethod
    def _decode_arguments(raw: Any) -> Dict[str, Any]:
        """Turn a tool call's ``arguments`` field into a dict.

        Accepts a JSON string or an already-decoded dict. Anything else
        (invalid JSON, a JSON value that is not an object) becomes ``{}``.
        """
        if isinstance(raw, dict):
            return raw
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            return {}
        return decoded if isinstance(decoded, dict) else {}

    @staticmethod
    def _describe_arguments(tool_name: str, arguments: Dict[str, Any]) -> str:
        """Return the short argument summary shown next to a tool call."""
        if tool_name == 'execute_command':
            return arguments.get('command', '')
        if tool_name in ('read_file', 'apply_diff'):
            return arguments.get('path', '')
        if tool_name == 'write_file':
            return f"{arguments.get('path', '')} ({len(arguments.get('content', ''))} bytes)"
        return str(arguments)

    def _confirm_tool(self, tool_name: str) -> bool:
        """Ask the user to approve a tool call when confirmation is enabled.

        A per-tool entry in ``config.confirm_commands`` overrides
        ``config.confirm_all``. End-of-input at the prompt counts as "no".
        """
        needs_confirm = self.config.confirm_all
        if tool_name in self.config.confirm_commands:
            needs_confirm = self.config.confirm_commands[tool_name]
        if not needs_confirm:
            return True
        try:
            answer = input(f"{Colors.YELLOW}Execute? (y/N): {Colors.RESET}").strip().lower()
        except EOFError:
            return False
        return answer in ('y', 'yes')

    def _run_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Execute one tool call, honoring the confirmation settings.

        Returns:
            The tool's result, or a result flagged ``"declined": True``
            when the user refused.
        """
        if not self._confirm_tool(tool_name):
            print(f"{Colors.YELLOW}Skipped{Colors.RESET}")
            return {"error": "User declined execution", "declined": True}
        return self.tool_executor.execute(tool_name, arguments)

    # ------------------------------------------------------------------
    # Response handling
    # ------------------------------------------------------------------

    def _handle_streaming_response(self, response: requests.Response) -> str:
        """Handle streaming response from API.

        Streams visible text to stdout. ``<think>`` sections are rendered
        as one updating status line, and ``<tool_call>`` markup is hidden
        unless ``config.debug`` is set. Tool calls embedded in the text
        are then executed, and a follow-up request is made with their
        results.

        Returns:
            The full text of the final assistant answer.
        """
        import time
        import sys

        full_content = ""
        in_thinking = False
        thinking_content = ""
        thinking_start_time = 0
        last_update_time = 0
        displayed_elapsed = 0
        in_tool_call = False

        # Process streaming response line by line
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode('utf-8')

            # Handle SSE format
            if line.startswith('data: '):
                line = line[6:]
            if line == '[DONE]':
                break

            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue

            # Some servers send chunks with an empty "choices" list
            # (e.g. usage-only chunks). Treat them as carrying no delta.
            choices = data.get('choices') or [{}]
            delta = choices[0].get('delta') or {}
            content = delta.get('content')
            if not content:
                continue

            full_content += content
            current_time = time.time()

            # Filter out tool calls from display (unless debug mode)
            display_content = content
            if not self.config.debug:
                display_content, in_tool_call = self._filter_tool_call_markup(
                    content, in_tool_call)

            # Handle thinking state
            if not in_thinking and '<think>' in display_content:
                before, _, display_content = display_content.partition('<think>')
                if before:
                    # Finish the line so the status line does not overwrite it.
                    sys.stdout.write(before + '\n')
                in_thinking = True
                thinking_start_time = current_time
                last_update_time = current_time
                displayed_elapsed = 0
                thinking_content = ""
                sys.stdout.write(f"\r{Colors.DIM}{self._format_thinking_line(0, '')}{Colors.RESET}")
                sys.stdout.flush()
                # Fall through: the rest of this chunk may already hold
                # thoughts, or even the closing tag.

            if in_thinking:
                if '</think>' in display_content:
                    in_thinking = False
                    elapsed = int(current_time - thinking_start_time)
                    thought, _, answer = display_content.partition('</think>')
                    thinking_content += thought
                    sys.stdout.write(f"\r{Colors.DIM}{self._format_thinking_line(elapsed, thinking_content)}{Colors.RESET}\n")
                    # Content after </think>
                    if answer:
                        sys.stdout.write(answer)
                    sys.stdout.flush()
                else:
                    thinking_content += display_content
                    # Update display every 0.1 seconds or on new content
                    elapsed = int(current_time - thinking_start_time)
                    if elapsed != displayed_elapsed or current_time - last_update_time >= 0.1:
                        sys.stdout.write(f"\r{Colors.DIM}{self._format_thinking_line(elapsed, thinking_content)}{Colors.RESET}")
                        sys.stdout.flush()
                        displayed_elapsed = elapsed
                        last_update_time = current_time
            elif display_content:
                sys.stdout.write(display_content)
                sys.stdout.flush()

        if in_thinking:
            sys.stdout.write('\n')
        sys.stdout.write('\n')
        sys.stdout.flush()

        # Dump raw response if enabled
        if self.config.dump:
            print(f"\n{Colors.CYAN}=== DUMP: RAW RESPONSE ==={Colors.RESET}")
            print(full_content)
            print(f"{Colors.CYAN}=== END DUMP ==={Colors.RESET}\n")

        # Parse tool calls from full content after streaming
        tool_calls = self._parse_tool_calls(full_content)

        # Dump output if enabled
        if self.config.dump:
            print(f"\n{Colors.CYAN}=== DUMP: PARSED TOOL CALLS ==={Colors.RESET}")
            print(json.dumps(tool_calls, indent=2))
            print(f"{Colors.CYAN}=== END DUMP ==={Colors.RESET}\n")

        if not tool_calls:
            # Add assistant response to history
            if full_content:
                self.conversation_history.append({
                    "role": "assistant",
                    "content": full_content
                })
            return full_content

        # Execute tool calls
        tool_results = []
        for tc in tool_calls:
            tool_name = tc['function']['name']
            arguments = self._decode_arguments(tc['function']['arguments'])
            args_str = self._describe_arguments(tool_name, arguments)

            # Show tool call with colors: yellow "Calling tool:", red tool name, white args
            print(f"\n{Colors.YELLOW}Calling tool:{Colors.RESET} {Colors.RED}{tool_name}{Colors.RESET} -> {args_str}")

            result = self._run_tool(tool_name, arguments)

            # Show result summary. "declined" is checked first because a
            # declined result also carries an "error" key and "Skipped" has
            # already been printed.
            if result.get('declined'):
                pass
            elif "error" in result:
                print(f"{Colors.RED}Error: {result['error']}{Colors.RESET}")
            else:
                print(f"{Colors.GREEN}Success{Colors.RESET}")
                # Show command output for execute_command only in debug mode
                if self.config.debug and tool_name == 'execute_command' and 'stdout' in result:
                    stdout = result['stdout'].strip()
                    if stdout:
                        # Show first few lines of output
                        output_lines = stdout.split('\n')
                        print(f"{Colors.CYAN}Output:{Colors.RESET}")
                        for output_line in output_lines[:20]:
                            print(f" {output_line}")
                        if len(output_lines) > 20:
                            print(f" {Colors.DIM}... ({len(output_lines) - 20} more lines){Colors.RESET}")

            tool_results.append({
                "tool_call_id": tc['id'],
                "role": "tool",
                "content": json.dumps(result)
            })

        # Add assistant message with tool calls to history
        self.conversation_history.append({
            "role": "assistant",
            "content": full_content or None,
            "tool_calls": [
                {
                    "id": tc['id'],
                    "type": "function",
                    "function": tc['function']
                } for tc in tool_calls
            ]
        })
        # Add tool results to history
        self.conversation_history.extend(tool_results)

        # Get follow-up response with tool results
        print(f"\n{Colors.DIM}[Getting follow-up response...]{Colors.RESET}")
        return self._get_follow_up_response()

    def _handle_non_streaming_response(self, response: requests.Response) -> str:
        """Handle non-streaming response from API.

        Prints the answer, executes any tool calls the server returned
        (subject to the same confirmation settings as the streaming path)
        and requests a follow-up when tools were run.

        Returns:
            The assistant's text. Empty string when the message had none.
        """
        data = response.json()
        choices = data.get('choices') or [{}]
        message = choices[0].get('message') or {}
        # "content" is null on pure tool-call messages.
        content = message.get('content') or ''
        tool_calls = message.get('tool_calls') or []

        # Dump raw response if enabled
        if self.config.dump:
            print(f"\n{Colors.CYAN}=== DUMP: RAW RESPONSE ==={Colors.RESET}")
            print(json.dumps(data, indent=2))
            print(f"{Colors.CYAN}=== END DUMP ==={Colors.RESET}\n")

        # Dump parsed tool calls if enabled
        if self.config.dump and tool_calls:
            print(f"\n{Colors.CYAN}=== DUMP: PARSED TOOL CALLS ==={Colors.RESET}")
            print(json.dumps(tool_calls, indent=2))
            print(f"{Colors.CYAN}=== END DUMP ==={Colors.RESET}\n")

        if content:
            print(content)

        if not tool_calls:
            # Add to history
            self.conversation_history.append({
                "role": "assistant",
                "content": content
            })
            return content

        # Execute tool calls
        print("\n[Executing tools...]")
        tool_results = []
        for tc in tool_calls:
            tool_name = tc['function']['name']
            arguments = self._decode_arguments(tc['function']['arguments'])
            print(f" → {tool_name}({arguments})")

            result = self._run_tool(tool_name, arguments)
            tool_results.append({
                "tool_call_id": tc['id'],
                "role": "tool",
                "content": json.dumps(result)
            })
            if result.get('declined'):
                pass  # Already printed "Skipped"
            elif "error" in result:
                print(f" Error: {result['error']}")
            else:
                print(f" Success")

        # Add to history
        self.conversation_history.append({
            "role": "assistant",
            "content": content or None,
            "tool_calls": tool_calls
        })
        self.conversation_history.extend(tool_results)

        # Get follow-up response
        print("\n[Getting follow-up response...]")
        return self._get_follow_up_response()
def run_interactive_shell(client: CoderClient, session_manager: SessionManager) -> None:
    """Run interactive REPL shell.

    Reads (possibly multi-line) input, handles the ``/``-prefixed shell
    commands locally and sends everything else to the model through
    ``client.chat``. The loop ends on ``/quit``, ``/exit``, ``/q``,
    Ctrl+C, or Ctrl+D on an empty prompt.

    Args:
        client: Client holding the conversation and tool executor.
        session_manager: Store used by the session listing/deletion commands.
    """
    # Set up readline for history
    readline_history_file = os.path.expanduser("~/.cache/coderai/input_history")
    os.makedirs(os.path.dirname(readline_history_file), exist_ok=True)
    try:
        readline.read_history_file(readline_history_file)
    except OSError:
        # History is a convenience: a missing or unreadable file must not
        # prevent the shell from starting.
        pass

    # Save history on exit
    import atexit
    atexit.register(readline.write_history_file, readline_history_file)

    print(f"{Colors.CYAN}{'=' * 60}{Colors.RESET}")
    print(f"{Colors.BOLD}{Colors.GREEN} coder - Interactive Coding Assistant{Colors.RESET}")
    if client.session_name:
        print(f"{Colors.DIM} Session: {client.session_name}{Colors.RESET}")
    print(f"{Colors.CYAN}{'=' * 60}{Colors.RESET}")
    print(f"{Colors.DIM}Type /quit, /exit or press Ctrl+C to exit.{Colors.RESET}")
    print(f"{Colors.DIM}Type /clear to clear conversation history.{Colors.RESET}")
    print(f"{Colors.DIM}Type /help for more commands.{Colors.RESET}")
    print(f"{Colors.DIM}End line with \\ to continue on next line (multiline){Colors.RESET}")
    print(f"{Colors.CYAN}{'-' * 60}{Colors.RESET}")

    while True:
        try:
            # Calculate context usage
            current_tokens = _shell_context_tokens(client)
            max_ctx = client.config.max_context

            # Choose color based on usage
            if current_tokens > int(max_ctx * 0.9):
                ctx_color = Colors.RED
            elif current_tokens > int(max_ctx * 0.7):
                ctx_color = Colors.YELLOW
            else:
                ctx_color = Colors.DIM

            # Colorful prompt with context counter
            prompt = f"{ctx_color}[{current_tokens}/{max_ctx}]{Colors.RESET} {Colors.BOLD}{Colors.BLUE}CoderCLI>{Colors.RESET} "
            user_input = _shell_read_input(prompt)
            if not user_input:
                continue

            # Print separator after user input
            print(f"{Colors.CYAN}{'─' * 40}{Colors.RESET}")

            # Handle commands with / prefix
            cmd = user_input.lower()
            if cmd in ('/quit', '/exit', '/q'):
                print(f"{Colors.GREEN}Goodbye!{Colors.RESET}")
                break

            if cmd in ('/clear', '/c'):
                client.clear_history()
                print(f"{Colors.YELLOW}Conversation history cleared.{Colors.RESET}")
                continue

            if cmd in ('/help', '/h'):
                print_help()
                continue

            if cmd.startswith('/read '):
                path = user_input[6:].strip()
                result = client.tool_executor._read_file(path)
                if 'content' in result:
                    print(f"\n{Colors.CYAN}--- Content of {path} ---{Colors.RESET}")
                    print(result['content'])
                    print(f"{Colors.CYAN}--- End ---{Colors.RESET}")
                else:
                    print(f"{Colors.RED}Error: {result.get('error', 'Unknown error')}{Colors.RESET}")
                continue

            if cmd.startswith('/exec '):
                command = user_input[6:].strip()
                result = client.tool_executor._execute_command(command, ".")
                print(f"\n{Colors.GREEN}$ {command}{Colors.RESET}")
                if result.get('error'):
                    # Timeouts and launch failures only carry an "error"
                    # key. Without this the user would see no output.
                    print(f"{Colors.RED}Error: {result['error']}{Colors.RESET}")
                if result.get('stdout'):
                    print(result['stdout'])
                if result.get('stderr'):
                    print(f"{Colors.RED}stderr: {result['stderr']}{Colors.RESET}", file=sys.stderr)
                if result.get('returncode', 0) != 0:
                    print(f"{Colors.RED}Exit code: {result['returncode']}{Colors.RESET}")
                continue

            if cmd.startswith('/confirm '):
                parts = user_input[9:].strip().split()
                if len(parts) >= 2:
                    tool_name = parts[0]
                    setting = parts[1].lower()
                    if setting in ('yes', 'y', 'true', '1'):
                        client.config.confirm_commands[tool_name] = True
                        print(f"{Colors.GREEN}Confirmation enabled for {tool_name}{Colors.RESET}")
                    elif setting in ('no', 'n', 'false', '0'):
                        client.config.confirm_commands[tool_name] = False
                        print(f"{Colors.YELLOW}Confirmation disabled for {tool_name}{Colors.RESET}")
                    else:
                        print(f"{Colors.RED}Invalid setting. Use 'yes' or 'no'{Colors.RESET}")
                else:
                    print(f"{Colors.RED}Usage: /confirm <tool_name> <yes/no>{Colors.RESET}")
                    print(f" Example: /confirm execute_command no")
                    print(f" Example: /confirm write_file yes")
                continue

            # Handle /new command
            if cmd == '/new' or cmd.startswith('/new '):
                name = user_input[5:].strip() if len(user_input) > 5 else None
                session_name = client.new_session(name)
                print(f"{Colors.GREEN}Started new session: {session_name}{Colors.RESET}")
                continue

            # Handle /session command
            if cmd.startswith('/session '):
                name = user_input[9:].strip()
                if client.load_session(name):
                    print(f"{Colors.GREEN}Loaded session: {name}{Colors.RESET}")
                else:
                    print(f"{Colors.RED}Session not found: {name}{Colors.RESET}")
                continue

            # Handle /delete command
            if cmd.startswith('/delete '):
                target = user_input[8:].strip()
                if target.upper() == 'ALL':
                    count = session_manager.delete_all_sessions()
                    print(f"{Colors.YELLOW}Deleted {count} sessions{Colors.RESET}")
                elif session_manager.delete_session(target):
                    print(f"{Colors.YELLOW}Deleted session: {target}{Colors.RESET}")
                else:
                    print(f"{Colors.RED}Session not found: {target}{Colors.RESET}")
                continue

            # Handle /sessions command (list sessions)
            if cmd in ('/sessions', '/ls'):
                sessions = session_manager.list_sessions()
                if sessions:
                    print(f"{Colors.CYAN}Available sessions:{Colors.RESET}")
                    for s in sessions:
                        current = " (current)" if s['name'] == client.session_name else ""
                        print(f" {Colors.GREEN}{s['name']}{Colors.RESET}{current} - {s['message_count']} messages - {s['timestamp'][:19]}")
                else:
                    print(f"{Colors.DIM}No saved sessions{Colors.RESET}")
                continue

            # Send message to LLM
            client.chat(user_input)

        except KeyboardInterrupt:
            print(f"\n{Colors.GREEN}Goodbye!{Colors.RESET}")
            break
        except EOFError:
            print(f"\n{Colors.GREEN}Goodbye!{Colors.RESET}")
            break


def _shell_context_tokens(client) -> int:
    """Estimate how many tokens the client's conversation currently uses."""
    pieces = []
    for msg in client.conversation_history:
        pieces.append(msg.get("content", "") or "")
        for tc in msg.get("tool_calls") or []:
            pieces.append(tc.get("function", {}).get("arguments", ""))
    return client._estimate_tokens("".join(pieces))


def _shell_wants_continuation(lines) -> bool:
    """Tell whether the input gathered so far looks unfinished.

    Input continues while more brackets are opened than closed, or while
    the last line ends with ``:`` or an opening bracket. A surplus of
    closing brackets (e.g. a message ending in ":)") counts as finished.
    """
    joined = '\n'.join(lines)
    open_brackets = sum(
        joined.count(opener) - joined.count(closer)
        for opener, closer in ('()', '[]', '{}'))
    last_line = lines[-1].strip()
    return open_brackets > 0 or last_line.endswith((':', '(', '[', '{'))


def _shell_read_input(prompt: str) -> str:
    """Read one logical input, which may span several physical lines.

    A trailing backslash, unbalanced opening brackets, or a line ending in
    ``:`` / an opening bracket asks for another line. Ctrl+D ends a
    multi-line input in progress.

    Returns:
        The lines joined with newlines, stripped of surrounding whitespace.

    Raises:
        EOFError: When Ctrl+D is pressed before anything was entered.
    """
    continuation_prompt = f"{Colors.BLUE} ...>{Colors.RESET} "
    lines = []
    while True:
        try:
            line = input(prompt)
        except EOFError:
            # Handle Ctrl+D as end of multiline input
            if lines:
                break
            raise
        lines.append(line)
        # Every further read in this loop is a continuation line.
        prompt = continuation_prompt

        # Check for explicit continuation with backslash
        if line.rstrip().endswith('\\'):
            lines[-1] = line.rstrip()[:-1]  # Remove backslash
            continue

        if _shell_wants_continuation(lines):
            continue

        # Empty line after content - finish input (for paste mode)
        if not line.strip() and len(lines) > 1:
            lines.pop()  # Remove the empty line
        break
    return '\n'.join(lines).strip()
def print_help():
    """Display the interactive shell's help screen on stdout.

    The screen lists the slash commands, the direct shortcuts, the rules
    for multiline input and the tools the assistant can use. All colouring
    comes from the escape codes exposed by ``Colors``.
    """
    # The text is built first and printed in a single call; the lines of the
    # literal start at column 0 on purpose because they are emitted verbatim.
    help_text = f"""
{Colors.BOLD}{Colors.CYAN}Commands:{Colors.RESET}
{Colors.GREEN}/quit, /exit, /q{Colors.RESET} Exit the shell
{Colors.GREEN}/clear, /c{Colors.RESET} Clear conversation history
{Colors.GREEN}/help, /h{Colors.RESET} Show this help message
{Colors.GREEN}/new [name]{Colors.RESET} Start a new session (optional name)
{Colors.GREEN}/session <name>{Colors.RESET} Load a saved session
{Colors.GREEN}/sessions, /ls{Colors.RESET} List all saved sessions
{Colors.GREEN}/delete <name|ALL>{Colors.RESET} Delete a session or all sessions
{Colors.BOLD}{Colors.CYAN}Shortcuts:{Colors.RESET}
{Colors.YELLOW}/read <path>{Colors.RESET} Read a file directly
{Colors.YELLOW}/exec <command>{Colors.RESET} Execute a shell command directly
{Colors.YELLOW}/confirm <tool> <y/n>{Colors.RESET} Enable/disable tool confirmation
{Colors.BOLD}{Colors.CYAN}Multiline Input:{Colors.RESET}
- End a line with {Colors.YELLOW}\\{Colors.RESET} to continue
- Or paste multiline content directly
- Or press Enter twice to finish
- Unclosed brackets ( ) [ ] {{ }} or : continue automatically
Example:
CoderCLI> This is a \\
...> multiline message
{Colors.BOLD}{Colors.CYAN}The assistant can use tools to:{Colors.RESET}
- {Colors.BLUE}read_file{Colors.RESET}: Read file contents
- {Colors.BLUE}write_file{Colors.RESET}: Write/create files
- {Colors.BLUE}apply_diff{Colors.RESET}: Apply patches to files
- {Colors.BLUE}execute_command{Colors.RESET}: Run shell commands
{Colors.DIM}Tool execution requires confirmation by default.{Colors.RESET}
{Colors.DIM}Disable confirmation with: /confirm <tool_name> no{Colors.RESET}
"""
    print(help_text)
def _build_arg_parser():
    """Build the argparse parser describing every option of the ``coder`` CLI."""
    parser = argparse.ArgumentParser(
        description="coder - CLI tool for coderai API",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
coder Start interactive shell
coder "Hello!" Send a single message
coder -m "Fix bug" Send message non-interactively
coder --config /path Use custom config file
coder --ctx 65536 Set max context to 65536 tokens
coder --session myproj Load session 'myproj'
"""
    )
    parser.add_argument(
        'message',
        nargs='?',
        help='Message to send (if not provided, starts interactive shell)'
    )
    parser.add_argument(
        '-m', '--message',
        dest='msg_flag',
        help='Message to send (alternative to positional argument)'
    )
    parser.add_argument(
        '--api-url',
        help='API URL (default: from config or http://localhost:6744/v1)'
    )
    parser.add_argument(
        '--endpoint',
        help='API endpoint URL (same as --api-url, temporary override)'
    )
    parser.add_argument(
        '--token',
        help='API token (default: from config, temporary override)'
    )
    parser.add_argument(
        '--model',
        help='Model name to use (default: "default", temporary override)'
    )
    parser.add_argument(
        '--alias',
        help='Create an alias for the model (alias -> model mapping)'
    )
    parser.add_argument(
        '--config',
        help='Path to config file (default: ~/.config/coderai/cli.json)'
    )
    parser.add_argument(
        '--init-config',
        action='store_true',
        help='Create default config file and exit'
    )
    parser.add_argument(
        '--no-stream',
        action='store_true',
        help='Disable streaming responses'
    )
    parser.add_argument(
        '--small',
        action='store_true',
        help='Use small model mode (simplified prompt for models under 7B parameters)'
    )
    parser.add_argument(
        '--tiny',
        action='store_true',
        help='Use tiny model mode (minimal prompt for models under 3B parameters)'
    )
    parser.add_argument(
        '--micro',
        action='store_true',
        help='Use micro model mode (ultra-minimal prompt for models under 1.5B parameters)'
    )
    parser.add_argument(
        '--no-prompt',
        action='store_true',
        dest='no_prompt',
        help='Do not send system prompt (for custom use cases)'
    )
    # No argparse default here: a default of 600 made the "was it given?"
    # check always true, so a timeout stored in the config was never honoured.
    parser.add_argument(
        '--timeout',
        type=int,
        default=None,
        help='Request timeout in seconds (default: from config, else 600)'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        help='Show debug output including raw tool calls'
    )
    parser.add_argument(
        '--dump',
        action='store_true',
        help='Show dump output: tools schema, raw response, and parsed tool calls'
    )
    parser.add_argument(
        '--no-tools',
        action='store_true',
        dest='no_tools',
        help='Do not send tool definitions to the API (plain chat mode)'
    )
    parser.add_argument(
        '--ctx',
        type=int,
        dest='max_context',
        help='Maximum context size in tokens (default: 32768)'
    )
    parser.add_argument(
        '--session',
        help='Load a saved session by name'
    )
    return parser


def _apply_cli_overrides(config, args):
    """Copy the options given on the command line onto *config*.

    The overrides only change the in-memory object; nothing is saved here.
    """
    if args.api_url:
        config.api_url = args.api_url
    # --endpoint is applied after --api-url, so it wins when both are given.
    if args.endpoint:
        config.api_url = args.endpoint
    if args.token:
        config.token = args.token
    if args.model:
        config.model = args.model
    if args.alias:
        # The alias points at the effective model: --model when given
        # (already copied above), otherwise the model from the config.
        target_model = config.model
        config.model_aliases[args.alias] = target_model
        print(f"[Alias registered: {args.alias} -> {target_model}]")

    # Boolean switches can only turn a setting on, never off.
    for flag in ('small', 'tiny', 'micro', 'debug', 'dump', 'no_prompt', 'no_tools'):
        if getattr(args, flag):
            setattr(config, flag, True)

    # A falsy value (flag absent, or 0 as before) leaves the configured
    # timeout alone; 600 is only used when the config carries none at all.
    if args.timeout:
        config.timeout = args.timeout
    elif getattr(config, 'timeout', None) is None:
        config.timeout = 600
    if args.max_context:
        config.max_context = args.max_context


def main():
    """Entry point of the ``coder`` command-line client.

    Parses the command line, then either writes a default config
    (``--init-config``) or loads the config, applies the command-line
    overrides, prepares a session and finally sends a single message or
    starts the interactive shell. Setting the ``CODER_DEBUG`` environment
    variable prints extra trace lines to stderr.
    """
    args = _build_arg_parser().parse_args()

    # Handle init-config
    if args.init_config:
        config = Config()
        config.save()
        # NOTE(review): the reported path is hard-coded and --config is not
        # consulted in this branch; presumably Config.save() writes to this
        # default location - confirm against Config.
        config_path = os.path.expanduser("~/.config/coderai/cli.json")
        print(f"Created default config at: {config_path}")
        print(json.dumps({
            'api_url': config.api_url,
            'token': config.token,
            'system_prompt': config.system_prompt,
            'model': config.model,
            'model_aliases': config.model_aliases,
            'small': config.small,
            'tiny': config.tiny,
            'max_context': config.max_context,
            'no_tools': config.no_tools
        }, indent=2))
        return

    # Load config, then layer the temporary command-line overrides on top.
    config = Config.load(args.config)
    _apply_cli_overrides(config, args)

    # Reduced system prompts; the smallest enabled mode takes precedence.
    if config.micro:
        config.system_prompt = MICRO_MODEL_SYSTEM_PROMPT
        print("[Micro model mode enabled - using ultra-minimal system prompt]")
    elif config.tiny:
        config.system_prompt = TINY_MODEL_SYSTEM_PROMPT
        print("[Tiny model mode enabled - using minimal system prompt]")
    elif config.small:
        config.system_prompt = SMALL_MODEL_SYSTEM_PROMPT
        print("[Small model mode enabled - using simplified system prompt]")

    session_manager = SessionManager()
    client = CoderClient(config, session_manager)

    # Load the requested session, or create it under that name if missing.
    if args.session:
        if client.load_session(args.session):
            print(f"[Loaded session: {args.session}]")
        else:
            print(f"[Session not found: {args.session}, starting new session]")
            client.new_session(args.session)
    else:
        # Start with a default session name
        client.new_session()

    # The positional message takes precedence over -m/--message.
    message = args.message or args.msg_flag
    env_debug = bool(os.environ.get('CODER_DEBUG'))
    if env_debug:
        print(f"DEBUG: message = {message}", file=sys.stderr)
        print(f"DEBUG: args.no_stream = {args.no_stream}", file=sys.stderr)

    if message:
        # Single message mode - disable confirmations for non-interactive use
        if env_debug:
            print("DEBUG: Entering single message mode", file=sys.stderr)
        client.config.confirm_all = False
        result = client.chat(message, stream=not args.no_stream)
        # Print result if non-streaming (streaming prints internally)
        if args.no_stream and result:
            print(result)
        if env_debug:
            print("DEBUG: chat() returned", file=sys.stderr)
    else:
        # Interactive shell mode
        if env_debug:
            print("DEBUG: Entering interactive shell mode", file=sys.stderr)
        run_interactive_shell(client, session_manager)


if __name__ == "__main__":
    main()
# CoderAI Configuration & Admin Dashboard Design
## Overview
Refactor coderai from a complex CLI-driven application to a configuration-file-based system with a comprehensive web administration dashboard. All command-line options (except `--debug` and `--config`) are replaced by JSON configuration files stored in `~/.coderai/` by default.
## 1. CLI Changes
### Removed CLI Options
All existing options in `codai/cli.py` (446 lines of arguments) are removed, except for the two listed under "Retained CLI Options" below.
### Retained CLI Options
- `--debug`: Enable debug output (default: false)
- `--config DIR`: Set configuration directory (default: `~/.coderai`)
### Initialization Flow
1. Parse `--config` (default: `~/.coderai/`)
2. Create config directory if it doesn't exist
3. If config directory is empty, create default minimal config files
4. Load configuration from JSON files
5. Start server with settings from config
---
## 2. Configuration File Structure
All configuration stored as JSON in the config directory:
### `config.json` - Main Configuration
```json
{
"version": "1.0",
"server": {
"host": "0.0.0.0",
"port": 8000,
"https": false,
"https_key_path": null,
"https_cert_path": null
},
"backend": {
"type": "auto",
"image_backend": "auto",
"audio_backend": "auto",
"tts_backend": "auto"
},
"models": {
"default_load_mode": "ondemand"
},
"offload": {
"directory": "./offload"
},
"system_prompt": null,
"tools_closer_prompt": false,
"grammar_guided": false,
"file_path": null,
"hf_chat_templates": [],
"reasoning_options": [],
"parser": "auto"
}
```
**Note**: All model-specific settings (GPU layers, quantization, context size, image generation parameters, etc.) are now stored per-model in `models.json` rather than as global defaults in `config.json`. This allows different models to have different configurations even if they share the same backend or capability type.
### `models.json` - Model Registry & Configurations
```json
{
"text_models": [
{
"id": "microsoft/DialoGPT-medium",
"backend": "nvidia",
"context_size": 0,
"n_gpu_layers": -1,
"load_in_4bit": false,
"load_in_8bit": false,
"flash_attn": false,
"offload_strategy": "auto",
"manual_ram_gb": null,
"max_gpu_percent": null,
"no_ram": false,
"enabled": true
}
],
"image_models": [
{
"id": "stable-diffusion-xl-base-1.0",
"backend": "nvidia",
"llm_path": null,
"vae_path": null,
"sample_method": "res_multistep",
"steps": 4,
"width": 512,
"height": 512,
"cfg_scale": 1.0,
"precision": "f32",
"cpu_offload": false,
"seed": null,
"vae_tiling": false,
"clip_on_cpu": false,
"enabled": true
}
],
"audio_models": [
{
"id": "openai/whisper-1",
"backend": "nvidia",
"context_ms": 0,
"offload": null,
"vulkan_device": 0,
"enabled": true
}
],
"vision_models": [
{
"id": "llava-1.5",
"backend": "nvidia",
"context_size": 0,
"offload": null,
"n_gpu_layers": -1,
"enabled": true
}
],
"tts_models": [
{
"id": "kokoro",
"backend": "nvidia",
"voice": "af",
"speed": 1.0,
"enabled": true
}
],
"gguf_models": [
{
"id": "llama-2-7b.Q4_K_M.gguf",
"backend": "vulkan",
"context_size": 2048,
"n_gpu_layers": 35,
"vulkan_device": 0,
"vulkan_single_gpu": false,
"enabled": true
}
],
"loaded": [
"microsoft/DialoGPT-medium"
],
"preload": [
"stable-diffusion-xl-base-1.0"
],
"unloaded": [],
"aliases": {
"default": "microsoft/DialoGPT-medium",
"code": "microsoft/DialoGPT-medium",
"sdxl": "stable-diffusion-xl-base-1.0"
}
}
```
### `auth.json` - User Accounts & Tokens
```json
{
"users": [
{
"id": 1,
"username": "admin",
"password_hash": "$argon2id$...",
"role": "admin",
"created_at": "2026-05-03T00:00:00Z"
}
],
"tokens": [
{
"id": 1,
"name": "OpenAI Compatible",
"token": "sk-coderai-...",
"provider": "openai",
"created_at": "2026-05-03T00:00:00Z",
"last_used": null
}
],
"sessions": []
}
```
---
## 3. Web Administration Dashboard
### Layout & Theme
- **Dark theme**: #0d1117 background, #161b22 cards, #21262d borders
- **Accent colors**: #58a6ff (blue), #3fb950 (green), #f85149 (red)
- **Modern fonts**: system-ui, -apple-system, Segoe UI
- **Responsive**: works on desktop and tablet
### Authentication
- Login page at `/login`
- Session-based cookies with CSRF protection
- Default credentials: `admin` / `admin` (forced change on first login)
- Password hashing with Argon2
- Sessions stored in `auth.json` (in-memory hot cache, persisted to disk)
### Pages
#### 1. **Overview Dashboard** (`/admin`)
- System status: uptime, backend type (NVIDIA/Vulkan/OpenCL), GPU info
- Active models: currently loaded, preload queue, memory usage (VRAM/RAM)
- Request stats: total, active, queued
- Quick actions: restart server, clear cache
- Line charts for request volume and latency
#### 2. **Models** (`/admin/models`)
- **Sub-tabs**:
- **Local Models**: List all downloaded GGUF and HuggingFace models, size, format, status
- **Download**: Search HuggingFace with filters (model type, size, license, language)
- **Configuration**: Set loaded models, preload models, backend options per model
- **Model Details**: Click a model to see specs, performance, edit context size, GPU layers
#### 3. **API Tokens** (`/admin/tokens`)
- List all tokens with name, provider, last used
- Generate new token (random 32-char hex, prefixed `sk-coderai-`)
- Revoke/delete tokens
- Copy token to clipboard (one-time reveal)
#### 4. **Users** (`/admin/users`)
- Admin can change own password
- CRUD for other users (username, password, role)
- Role-based: `admin`, `user`, `readonly`
#### 5. **Chat Interface** (`/chat`)
- OpenAI-compatible chat UI
- Model selector dropdown (all available models)
- Streaming responses
- File attachments (images, documents)
- Export conversation
### Routes & Middleware
- Static files: `/static/` (CSS, JS, images)
- Admin routes: `/admin/*` (require admin role)
- Auth routes: `/login`, `/logout`, `/auth/check`
- API routes (FastAPI): `/v1/*` (require bearer token or session auth)
- Web UI routes: Jinja2 templates for admin and chat
---
## 4. Model Management & Loading Strategy
### Model Types & Backend Mapping
| Model Type | Backends | Format | Per-Model Config Fields |
|------------|----------|--------|------------------------|
| Text LLM | NVIDIA (Transformers), Vulkan (llama-cpp) | HF safetensors / GGUF | backend, context_size, n_gpu_layers, load_in_4bit, load_in_8bit, flash_attn, offload_strategy, manual_ram_gb, max_gpu_percent, no_ram |
| Image Generation | NVIDIA (Diffusers), Vulkan (sd.cpp) | HF Diffusers / GGUF-SD | backend, llm_path, vae_path, sample_method, steps, width, height, cfg_scale, precision, cpu_offload, seed, vae_tiling, clip_on_cpu |
| Audio Transcription | NVIDIA (Transformers), Vulkan (whisper.cpp) | HF / GGUF | backend, context_ms, offload, vulkan_device |
| TTS | NVIDIA/Kokoro, Vulkan/kokoro | Kokoro models | backend, voice, speed |
| Vision | NVIDIA (LLaVA), Vulkan (llava.cpp) | HF / GGUF | backend, context_size, offload, n_gpu_layers |
**Key Design Principle**: Each model entry in `models.json` contains ALL configuration specific to that model. This allows:
- Multiple text models with different quantization settings (one 4-bit, one 8-bit)
- Multiple image models with different resolutions (512x512 for speed, 1024x1024 for quality)
- Multiple GGUF models with different GPU layer counts (35 layers for one, all layers for another)
- Same model with different backends (e.g., GGUF on Vulkan for one instance, HF on NVIDIA for another)
### Loading Modes
- **ondemand** (default): Only one model resident in VRAM at a time. Unload on switch.
- **loadall**: All models try to load into VRAM, OOM → CPU RAM offload.
- **loadswap**: First model in VRAM, others in CPU RAM. Swap on demand.
### Pre-load vs Loaded Status
- **Loaded**: Model actively in VRAM (or CPU RAM for loadswap)
- **Preload**: Model configured to be loaded at startup (into VRAM or CPU RAM depending on mode)
- **Unloaded**: Model not loaded; will be loaded on first request if available
### Request Queue & Smart Reordering
1. Request arrives for model X
2. If model X already in VRAM → serve immediately
3. If model X in CPU RAM → move to VRAM (evict current if needed)
4. If model X unloaded → load from disk
5. **Smart reorder**: Queue grouped by model state:
- Requests for currently loaded models served first (preserve order within group)
- Then requests for CPU RAM resident models (FIFO)
- Finally requests for unloaded models (FIFO)
6. **Starvation prevention**: If a model hasn't been served in N requests, boost its priority
### Model Lifecycle
```
Startup:
└─> Load models in "loaded" list (respecting load_mode)
└─> Pre-load "preload" models (into CPU RAM if loadswap)
Runtime:
└─> On API request: check queue → load/swap if needed → serve request
└─> Queue management: group by model availability, preserve FIFO within groups
└─> Periodic cleanup: keep only "loaded" count of models in VRAM
```
---
## 5. Database & Persistence
All data persisted to JSON files in config directory:
| File | Purpose |
|------|---------|
| `config.json` | Server and backend settings |
| `models.json` | Model registry, aliases, per-model config |
| `auth.json` | Users, tokens, active sessions |
| `cache.db` (optional) | Model download cache metadata (existing system) |
---
## 6. API Changes
### Token-Based Authentication
All API endpoints require a bearer token:
```
Authorization: Bearer sk-coderai-<32hex>
```
Tokens validated against `auth.json` tokens list.
### New Admin API Endpoints (FastAPI)
- `GET /admin/api/models` - list all models
- `POST /admin/api/models/download` - download from HuggingFace
- `POST /admin/api/models/remove` - delete local model
- `POST /admin/api/models/configure` - update model settings
- `GET /admin/api/tokens` - list tokens
- `POST /admin/api/tokens` - create token
- `DELETE /admin/api/tokens/{id}` - revoke token
- `GET /admin/api/users` - list users
- `POST /admin/api/users` - create user
- `PUT /admin/api/users/{id}` - update user
- `DELETE /admin/api/users/{id}` - delete user
- `POST /admin/api/system/reload` - reload config without restart
- `GET /admin/api/system/status` - system health
### WebSocket for Real-time Updates
- `/ws/admin` - admin dashboard live updates (requests, model status, VRAM)
- `/ws/chat` - chat streaming (SSE compatible)
---
## 7. Security Considerations
- Session cookies: `HttpOnly`, `Secure` (if HTTPS), `SameSite=strict`
- CSRF tokens for all POST/PUT/DELETE admin forms
- Passwords: Argon2id with salt
- Token generation: cryptographically secure random (32+ bytes)
- Rate limiting: admin endpoints (10 req/s), API (100 req/s per token)
- Input validation: model IDs, file paths sanitized
- File serving: restrict to config directory, no path traversal
---
## 8. Implementation Phases
### Phase 1: Configuration Foundation
1. Refactor `cli.py` → only `--debug` and `--config`
2. Create `ConfigManager` class (load/save/validate JSON)
3. Migrate all CLI defaults to `config.json`
4. Auto-create default configs on first run
5. Update `main.py` to read from config
### Phase 2: Admin Dashboard (FastAPI + Jinja2)
1. Create `admin/` package structure:
- `admin/routes.py` - admin page routes
- `admin/models.py` - model management logic
- `admin/users.py` - user/API token logic
- `admin/dashboard.py` - overview stats
- `admin/templates/` - Jinja2 templates
- `admin/static/` - CSS, JS, images
2. Implement authentication middleware
3. Build login page + session management
4. Build overview page with stats
5. Build models page (list, card grid)
### Phase 3: Models CRUD & Search
1. Integrate `codai/models/cache.py` for download/list
2. Build HuggingFace search API integration
3. Create download/remove model forms
4. Model configuration form (backend, context, GPU layers, quantization)
5. Implement model aliases system
6. Model status polling (WebSocket)
### Phase 4: Users & Tokens
1. User CRUD with Argon2 password hashing
2. Token generation (random secure, `sk-coderai-*` prefix)
3. Token usage tracking (last_used timestamp)
4. Session management (store in auth.json)
5. First-run setup wizard (force password change)
### Phase 5: Chat Interface
1. Chat page template (similar to OpenAI ChatGPT UI)
2. Model selector dropdown
3. Chat history (localStorage)
4. Streaming response handling (SSE)
5. Export/conversation management
### Phase 6: Model Loading & Queue
1. Refactor `MultiModelManager` to respect config (loaded/preload/unloaded)
2. Implement smart request queue with same-model clustering
3. WebSocket updates for model status
4. Graceful degradation (fallback models)
5. Cache management (auto-clean old models if disk full)
### Phase 7: Polish & Testing
1. Dark theme CSS polish
2. Error pages and handling
3. Responsive design
4. Accessibility (ARIA labels, keyboard navigation)
5. Integration tests for API endpoints
6. Load testing with multiple models
---
## 9. Web Interface Pages (Jinja2 Templates)
### Base Layout
- Dark sidebar navigation
- Top bar: server status, user menu, logout
- Main content area (responsive)
### Login Page
```
+-------------------------------------------+
| CoderAI Admin |
| [Logo] |
| |
| Username: [________] |
| Password: [________] |
| |
| [Login] |
| |
| Default: admin / admin |
+-------------------------------------------+
```
### Overview Dashboard
```
+---------------------------------------------------+
| Models | Tokens | Users | Chat [Reload] [Logout]|
+---------------------------------------------------+
| System Status | Active Models |
| - Backend: NVIDIA | - phi-3 (VRAM) |
| - GPU: RTX 4090 24GB | - Llama-2 (CPU RAM) |
| - Uptime: 3d 12h | [Manage Models] |
| | |
| Request Stats | VRAM Usage |
| - Total: 12,453 | [██████████░░░░] 68% |
| - Queued: 3 | |
| - Last hour: 234 | System Health: OK |
+---------------------------------------------------+
| Recent Activity (table) |
+---------------------------------------------------+
```
### Models Page
```
+---------------------------------------------------+
| [Local Models] [Download] [Config] [Search] |
+---------------------------------------------------+
| Local Models: |
| [ ] phi-3-mini.q4.gguf 3.2GB VRAM [Load] |
| [x] Llama-2-7B.Q4_K_M.gguf 4.1GB CPU [Load] |
| [ ] mistral-7b.gguf 4.5GB Cached [Load] |
| |
| Download from HuggingFace: |
| Search: [_____________] [Filters▼] [Search] |
| Results: |
| - model1 (4.2GB, NVIDIA, MIT) [Download] |
| - model2 (3.8GB, Vulkan, Apache) [Download] |
+---------------------------------------------------+
```
### Chat Interface
```
+---------------------------------------------------+
| Models: [phi-3-mini ▼] New Chat History |
+---------------------------------------------------+
| Chat: |
| User: Explain transformers |
| AI: [streaming response...] |
| |
| [Input...] [Send] [Attach] |
+---------------------------------------------------+
```
---
## 10. Data Flow
### Startup Sequence
```
1. main.py: parse --debug, --config
2. ConfigManager.load() → loads config.json, models.json, auth.json
3. Auto-create defaults if missing
4. Initialize ModelManager with settings from config
5. Load models listed in "loaded" and "preload" (respecting load_mode)
6. Start FastAPI server with:
- Static file serving (/static/) and Jinja2 templates
- Admin routes (with session auth)
- API routes (with token auth)
- WebSocket routes
7. Print startup info (backends, loaded models, URL)
```
### Request Handling
```
1. Request arrives at /v1/chat/completions
2. Auth middleware: check Bearer token or session
3. Extract model from request body
4. MultiModelManager.request_model(model):
- Check if model allowed in config
- Check if already loaded in VRAM → return
- Check if in CPU RAM → move to VRAM (evict if needed)
- If unloaded → load from disk
- Apply smart queue reordering
5. Pass to backend for inference
6. Stream/return response
```
### Admin Dashboard
```
1. User visits /admin → redirect to /login if not authenticated
2. POST /login → validate credentials → set session cookie
3. Sidebar navigation between server-rendered pages (full page reloads, no JS framework)
4. Each admin page fetches data via FastAPI endpoints (JSON)
5. Forms POST to endpoints, redirect back with flash messages
6. WebSocket updates push live stats to dashboard
```
---
## 11. File Structure After Refactor
```
coderai/
├── codai/
│ ├── main.py # Entry point (simplified)
│ ├── cli.py # Only --debug, --config parsing
│ ├── config.py # NEW: ConfigManager class
│ ├── api/
│ │ ├── app.py # FastAPI app + routes
│ │ ├── state.py # Global state (reduced)
│ │ ├── text.py
│ │ ├── images.py
│ │ ├── transcriptions.py
│ │ └── tts.py
│ ├── models/
│ │ ├── manager.py # MultiModelManager (updated)
│ │ ├── cache.py # Model download/caching
│ │ ├── parser.py
│ │ └── backends/
│ │ ├── base.py
│ │ ├── nvidia.py
│ │ └── vulkan.py
│ ├── admin/ # NEW: Admin dashboard
│ │ ├── __init__.py
│ │ ├── routes.py
│ │ ├── auth.py # Authentication, sessions, passwords
│ │ ├── models.py # Model CRUD, search, download
│ │ ├── tokens.py # API token management
│ │ ├── users.py # User management
│ │ ├── dashboard.py # Overview stats
│ │ ├── templates/
│ │ │ ├── base.html
│ │ │ ├── login.html
│ │ │ ├── dashboard.html
│ │ │ ├── models.html
│ │ │ ├── tokens.html
│ │ │ ├── users.html
│ │ │ └── chat.html
│ │ └── static/
│ │ ├── style.css # Dark theme
│ │ └── app.js
│ └── pydantic/
│ ├── textrequest.py
│ ├── imagerequest.py
│ └── transcriptionrequest.py
├── docs/
│ └── superpowers/
│ └── specs/
│ └── 2026-05-03-coderai-config-admin-dashboard-design.md ← THIS FILE
├── requirements.txt
├── README.md (updated)
└── AGENTS.md (updated)
```
---
## 12. Benefits
- **Simplified CLI**: Only 2 flags to remember
- **Centralized config**: All settings in one place, version-controllable
- **Visual management**: No need to edit CLI flags or restart manually
- **User management**: Multiple users with roles and tokens
- **Model discovery**: Built-in HuggingFace search
- **Runtime control**: Change settings via dashboard, reload without restart
- **History & monitoring**: See requests, errors, usage stats
- **Backup/restore**: Config files are portable
---
## 13. Backwards Compatibility
- Old command-line invocations will fail with a helpful message
- Migration script can convert existing args → config file
- Existing model cache locations preserved
- API endpoints remain compatible (only auth added)
---
## 14. Risks & Mitigations
| Risk | Mitigation |
|------|------------|
| Significant rewrite, regression bugs | Comprehensive testing, phased rollout |
| Existing users lose configs | Provide migration tool, document manual migration |
| Security vulnerabilities (auth, tokens) | Use proven libraries (passlib, secrets), security audit |
| Web UI becomes maintenance burden | Keep it simple (Jinja2, no heavy JS framework) |
| Model loading complexity breaks | Maintain existing `MultiModelManager` logic, wrap in config layer |
---
## 15. Open Questions & Decisions Needed
1. **Should model search show ONLY GGUF models, or all HF models?**
→ Recommend: filter by GGUF for Vulkan, all for NVIDIA with format indicator
2. **Should admin be able to delete models from disk, or just unregister?**
→ Recommend: delete from cache directory with confirmation
3. **Should chat interface support advanced parameters (temp, top_p, etc)?**
→ Recommend: collapsible advanced panel in chat UI
4. **Should config support environment variable substitution?** (e.g., `${HOME}`)
→ Recommend: yes, for paths
5. **Should there be a "safe mode" if config.json is corrupt?**
→ Recommend: fall back to hardcoded minimal defaults, rebuild default config
6. **Should we keep a command-line flag to bypass the config entirely for debugging?**
→ Recommend: `--force-cli` flag (hidden/undocumented) for dev use
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment