Commit 81c39eb8 authored by Your Name

Refactor: Move backend and manager classes to codai modules

- Move NvidiaBackend to codai/backends/cuda.py
- Move VulkanBackend to codai/backends/vulkan.py
- Move ModelManager, WhisperServerManager, MultiModelManager to codai/models/manager.py
- Move QueueManager to codai/queue/manager.py
- Add proper exports in codai/backends/__init__.py
- Update imports in coderai to use new modules
- Fix import paths for base class and cache functions
parent 7c6b60f0
"""Backend detection module.""" """Backend detection and management module."""
from codai.backends.base import ModelBackend
from codai.backends.cuda import NvidiaBackend
from codai.backends.vulkan import VulkanBackend
def detect_available_backends(): def detect_available_backends():
......
...@@ -3,9 +3,11 @@ ...@@ -3,9 +3,11 @@
import os import os
from typing import Optional, List, Dict from typing import Optional, List, Dict
from threading import Thread from threading import Thread
from abc import ABC
# Import from codai modules
from codai.backends.base import ModelBackend from codai.backends.base import ModelBackend
from codai.models.capabilities import detect_model_capabilities, check_flash_attn_availability from codai.models.capabilities import detect_model_capabilities
from codai.pydantic.textrequest import ChatMessage from codai.pydantic.textrequest import ChatMessage
...@@ -22,7 +24,12 @@ class NvidiaBackend(ModelBackend): ...@@ -22,7 +24,12 @@ class NvidiaBackend(ModelBackend):
def check_flash_attn_support(self) -> None: def check_flash_attn_support(self) -> None:
"""Check and print Flash Attention availability status.""" """Check and print Flash Attention availability status."""
self.flash_attn_available = check_flash_attn_availability() try:
import flash_attn
self.flash_attn_available = True
except ImportError:
self.flash_attn_available = False
if self.use_flash_attn: if self.use_flash_attn:
if self.flash_attn_available: if self.flash_attn_available:
print("Flash Attention 2: Available and enabled") print("Flash Attention 2: Available and enabled")
...@@ -174,4 +181,379 @@ class NvidiaBackend(ModelBackend): ...@@ -174,4 +181,379 @@ class NvidiaBackend(ModelBackend):
print(f" Using sequential offload strategy") print(f" Using sequential offload strategy")
if is_moe: if is_moe:
return [0.80, 0.78, 0.76, 0.74, 0.72, 0.70, 0.68, 0.66, 0.64, 0.62, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35, 0.30, 0.25, 0.20, 0.0] return [0.80, 0.78, 0.76, 0.74, 0.72, 0.70, 0.68, 0.66, 0.64, 0.62, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35, 0.30, 0.25, 0.20, 0.0]
return [0.93, 0.91, 0.89, 0.87, 0.85, 0.83, 0.81, 0.79, 0.77, 0.75, 0.73, 0.71, 0.69, 0.67, 0.65, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35, 0. return [0.93, 0.91, 0.89, 0.87, 0.85, 0.83, 0.81, 0.79, 0.77, 0.75, 0.73, 0.71, 0.69, 0.67, 0.65, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35, 0.30, 0.20, 0.0]
else:
if total_vram_gb < 3:
print(f" Detected small GPU ({total_vram_gb:.1f}GB), using aggressive VRAM usage")
return [0.99, 0.95, 0.90, 0.85, 0.75, 0.65, 0.50, 0.35, 0.20, 0.0]
elif total_vram_gb <= 8:
print(f" Detected medium GPU ({total_vram_gb:.1f}GB), using high VRAM usage")
return [0.96, 0.90, 0.85, 0.75, 0.65, 0.50, 0.35, 0.20, 0.0]
else:
if is_moe:
print(f" Detected large GPU ({total_vram_gb:.1f}GB), using MoE-safe VRAM usage")
return [0.80, 0.75, 0.70, 0.65, 0.60, 0.50, 0.40, 0.30, 0.20, 0.0]
else:
print(f" Detected large GPU ({total_vram_gb:.1f}GB), using conservative VRAM usage")
return [0.93, 0.85, 0.75, 0.65, 0.50, 0.35, 0.20, 0.0]
def _get_vram_percentages_for_gpu(self, model_name: str = "", strategy: str = "auto", max_gpu_percent: float = None) -> list:
"""Get VRAM percentage steps based on GPU memory size."""
import torch
if not torch.cuda.is_available():
return [0.0]
if max_gpu_percent is not None:
max_pct = max(0.05, min(1.0, max_gpu_percent / 100.0))
print(f" Using custom max GPU percent: {max_pct*100:.0f}%")
steps = []
current = max_pct
while current > 0.05:
steps.append(current)
if current > 0.3:
current -= 0.05
elif current > 0.15:
current -= 0.03
else:
current -= 0.02
steps.append(0.0)
return steps
total_vram_gb = 0
for i in range(torch.cuda.device_count()):
props = torch.cuda.get_device_properties(i)
total_vram_gb += props.total_memory / 1e9
is_moe = self._is_moe_model(model_name)
if is_moe:
print(f" Detected MoE model, using extra conservative VRAM limits")
return self._get_vram_percentages_for_strategy(strategy, is_moe, total_vram_gb)
def load_model(self, model_name: str, **kwargs) -> None:
"""Load a HuggingFace causal LM, lowering the GPU memory cap until a load succeeds.

Recognised ``kwargs``: ``offload_dir``, ``load_in_4bit``, ``load_in_8bit``,
``manual_ram_gb``, ``flash_attn``, ``offload_strategy`` and ``max_gpu_percent``.

On success sets ``self.tokenizer``, ``self.model`` (switched to eval mode) and
``self.model_name``, then prints the detected model capabilities.

Raises:
RuntimeError: If every attempt in the VRAM schedule leaves the model unloaded.
"""
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
offload_dir = kwargs.get('offload_dir')
load_in_4bit = kwargs.get('load_in_4bit', False)
load_in_8bit = kwargs.get('load_in_8bit', False)
manual_ram_gb = kwargs.get('manual_ram_gb')
flash_attn = kwargs.get('flash_attn', False)
offload_strategy = kwargs.get('offload_strategy', 'auto')
max_gpu_percent = kwargs.get('max_gpu_percent', None)
# Stashed on the instance so _get_gpu_memory_map_with_limit can pick it up via getattr.
self._pending_ram_gb = manual_ram_gb
print(f"Loading HuggingFace model: {model_name}")
self.use_flash_attn = flash_attn
self.check_flash_attn_support()
self.device = self._detect_device()
self.tokenizer = AutoTokenizer.from_pretrained(
model_name,
trust_remote_code=True,
padding_side="left"
)
# Use the EOS token for padding when the tokenizer defines no pad token.
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
load_kwargs = {'trust_remote_code': True}
if load_in_4bit or load_in_8bit:
# Name-based opt-out: quantization is skipped when the name contains
# 'qwen3.5' together with 'a3b' or 'moe'.
if 'qwen3.5' in model_name.lower() and ('a3b' in model_name.lower() or 'moe' in model_name.lower()):
print(f"Warning: {model_name} does not support bitsandbytes quantization")
else:
try:
# The import only probes availability; the bnb alias is not used afterwards.
import bitsandbytes as bnb
print(f"Using {4 if load_in_4bit else 8}-bit quantization")
load_kwargs['load_in_4bit'] = load_in_4bit
load_kwargs['load_in_8bit'] = load_in_8bit
except ImportError:
print("Warning: bitsandbytes not installed. Quantization disabled.")
# Half precision on CUDA, full precision on any other device.
if self.device == "cuda":
load_kwargs['torch_dtype'] = torch.float16
else:
load_kwargs['torch_dtype'] = torch.float32
if offload_dir:
os.makedirs(offload_dir, exist_ok=True)
load_kwargs['offload_folder'] = offload_dir
# Flash Attention is only requested when it was both asked for and importable.
if self.use_flash_attn and self.flash_attn_available:
load_kwargs['attn_implementation'] = "flash_attention_2"
print("Using Flash Attention 2")
model = None
# Ordered schedule of VRAM fractions to try; the first entry is remembered
# only to report whether the final load needed a reduced limit.
vram_percentages = self._get_vram_percentages_for_gpu(model_name, offload_strategy, max_gpu_percent)
first_vram_pct = vram_percentages[0] if vram_percentages else 0.93
for vram_pct in vram_percentages:
# Non-CUDA device: one direct load attempt without a device map.
if self.device != "cuda":
load_kwargs['device_map'] = None
print("Loading model in CPU-only mode...")
# presumably _try_load_model returns None when the load runs out of memory — verify against its definition
model = self._try_load_model(model_name, load_kwargs, self.device)
if model is not None:
break
if vram_pct > 0:
# Cap every GPU at this fraction and let the loader place layers automatically.
max_memory = self._get_gpu_memory_map_with_limit(vram_pct)
load_kwargs['max_memory'] = max_memory
load_kwargs['device_map'] = 'auto'
print(f"\nTrying with GPU limit: {vram_pct*100:.0f}% VRAM")
model = self._try_load_model(model_name, load_kwargs, self.device)
if model is not None:
print(f" ✓ Model loaded successfully with {vram_pct*100:.0f}% GPU VRAM limit")
if vram_pct < first_vram_pct:
print(f" (Reduced from {first_vram_pct*100:.0f}% due to memory constraints)")
break
else:
print(f" ✗ Out of memory with {vram_pct*100:.0f}% GPU VRAM, trying lower limit...")
# Release cached allocations from the failed attempt before retrying.
if torch.cuda.is_available():
torch.cuda.empty_cache()
else:
# A 0% entry means CPU only: GPU 0 gets a zero-byte budget and the CPU
# gets manual_ram_gb (48 GB when not specified).
print("\nFalling back to CPU-only mode...")
load_kwargs['max_memory'] = {0: 0, 'cpu': int((manual_ram_gb or 48) * 1e9)}
load_kwargs['device_map'] = 'auto'
model = self._try_load_model(model_name, load_kwargs, "cpu")
if model is not None:
print(" ✓ Model loaded successfully on CPU")
break
if model is None:
raise RuntimeError("Failed to load model: Out of memory even with minimum GPU usage")
self.model = model
self.model.eval()
self.model_name = model_name
print(f"\nModel loaded successfully")
# Reports the device of the first parameter only.
print(f"Model device: {next(self.model.parameters()).device}")
caps = detect_model_capabilities(model_name)
print(f"Model capabilities: {caps}")
def _get_gpu_memory_map_with_limit(self, vram_fraction: float) -> Dict:
"""Get max_memory dict with specified VRAM fraction limit."""
import torch
max_memory = {}
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
props = torch.cuda.get_device_properties(i)
total_vram = props.total_memory
usable_vram = int(total_vram * vram_fraction)
max_memory[i] = usable_vram
manual_ram_gb = getattr(self, '_pending_ram_gb', None)
if manual_ram_gb:
max_memory['cpu'] = int(manual_ram_gb * 1e9)
else:
import psutil
available_ram = psutil.virtual_memory().available
usable_ram = max(0, available_ram - int(4e9))
max_memory['cpu'] = usable_ram
return max_memory
def format_messages(self, messages: List[ChatMessage]) -> str:
    """Render a chat history as a plain-text transcript ending in an open assistant turn.

    System, user, assistant and tool messages each become one labelled
    paragraph; messages with any other role are skipped. Tool calls attached
    to an assistant message are appended to its text as ``<tool>...</tool>``
    lines.

    Returns:
        The paragraphs joined by blank lines, followed by a trailing
        ``"Assistant:"`` cue for the model to continue from.
    """
    turns = []
    for message in messages:
        role = message.role
        if role == "system":
            turns.append(f"System: {message.content}")
        elif role == "user":
            turns.append(f"User: {message.content}")
        elif role == "tool":
            turns.append(f"Tool ({message.name}): {message.content}")
        elif role == "assistant":
            text = message.content or ""
            for tc in message.tool_calls or ():
                func = tc.get("function")
                if func:
                    # The arguments value is inserted verbatim, without re-encoding.
                    text += f'\n<tool>{{"name": "{func.get("name", "")}", "arguments": {func.get("arguments", "{}")}}}</tool>'
            turns.append(f"Assistant: {text}")
    turns.append("Assistant:")
    return "\n\n".join(turns)
def _validate_params(self, temperature: float, top_p: float):
"""Validate generation parameters."""
if temperature <= 0:
temperature = 1.0
do_sample = False
else:
temperature = max(0.01, min(temperature, 2.0))
do_sample = True
top_p = max(0.0, min(top_p, 1.0))
return temperature, top_p, do_sample
def generate(self, prompt: str, max_tokens: Optional[int] = None,
             temperature: float = 0.7, top_p: float = 1.0,
             stop: Optional[List[str]] = None) -> str:
    """Generate a completion for ``prompt`` in one shot (non-streaming).

    Args:
        prompt: Fully formatted prompt text.
        max_tokens: Maximum number of new tokens; defaults to 512.
        temperature: Sampling temperature, normalised by ``_validate_params``.
        top_p: Nucleus sampling mass, normalised by ``_validate_params``.
        stop: Optional stop sequences. The returned text is cut just before
            the earliest occurrence of any of them.

    Returns:
        The decoded completion without the prompt. If a memory-related error
        persists after one retry with half the token budget, the literal
        string ``"[Error: Out of memory during generation]"`` is returned.

    Raises:
        RuntimeError: Re-raised when the failure does not look memory/CUDA related.
    """
    import torch
    from transformers import LogitsProcessor, LogitsProcessorList

    class InvalidLogitsProcessor(LogitsProcessor):
        """Replace NaN and infinite logits with large finite values."""

        def __call__(self, input_ids, scores):
            scores = torch.where(torch.isnan(scores), torch.tensor(-1e9, dtype=scores.dtype, device=scores.device), scores)
            scores = torch.where(torch.isinf(scores), torch.tensor(1e9, dtype=scores.dtype, device=scores.device), scores)
            return scores

    inputs = self.tokenizer(prompt, return_tensors="pt", padding=True)
    inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
    if max_tokens is None:
        max_tokens = 512
    temperature, top_p, do_sample = self._validate_params(temperature, top_p)
    prompt_len = inputs["input_ids"].shape[1]

    # Accept a bare string as a single stop sequence.
    stop_sequences = [stop] if isinstance(stop, str) else list(stop or ())

    def run_generation(max_new_tokens: int) -> str:
        """Run one generate call and return the decoded, stop-trimmed text."""
        with torch.no_grad():
            outputs = self.model.generate(
                input_ids=inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                max_new_tokens=max_new_tokens,
                # Sampling knobs are only passed when sampling is enabled.
                temperature=temperature if do_sample else None,
                top_p=top_p if do_sample else None,
                do_sample=do_sample,
                pad_token_id=self.tokenizer.pad_token_id,
                eos_token_id=self.tokenizer.eos_token_id,
                logits_processor=LogitsProcessorList([InvalidLogitsProcessor()]),
            )
        text = self.tokenizer.decode(outputs[0][prompt_len:], skip_special_tokens=True)
        # Honour stop sequences: keep only the text before the earliest match.
        cut = len(text)
        for seq in stop_sequences:
            if not seq:
                continue
            pos = text.find(seq)
            if pos != -1:
                cut = min(cut, pos)
        return text[:cut]

    try:
        return run_generation(max_tokens)
    except (RuntimeError, torch.cuda.OutOfMemoryError) as e:
        error_msg = str(e).lower()
        if not ("out of memory" in error_msg or "cuda" in error_msg or "oom" in error_msg):
            raise
        print(f"Warning: CUDA OOM during generation. Clearing cache and retrying...")
        torch.cuda.empty_cache()
        try:
            # Retry once with half the token budget (at least one token).
            return run_generation(max(1, max_tokens // 2))
        except Exception as e2:
            print(f"Error: Generation failed: {e2}")
            return "[Error: Out of memory during generation]"
async def generate_stream(self, prompt: str, max_tokens: Optional[int] = None,
                          temperature: float = 0.7, top_p: float = 1.0,
                          stop: Optional[List[str]] = None):
    """Generate a completion for ``prompt`` and yield it piece by piece.

    Generation runs in a worker thread that feeds a ``TextIteratorStreamer``;
    this coroutine yields the decoded text chunks as they arrive.

    Args:
        prompt: Fully formatted prompt text.
        max_tokens: Maximum number of new tokens; defaults to 512.
        temperature: Sampling temperature, normalised by ``_validate_params``.
        top_p: Nucleus sampling mass, normalised by ``_validate_params``.
        stop: Optional stop sequences; generation halts once one of them
            appears in the most recently generated tokens.

    Yields:
        Text chunks, followed by a bracketed warning/error line if generation
        failed in the worker thread.
    """
    import torch
    from transformers import TextIteratorStreamer, LogitsProcessor, LogitsProcessorList, StoppingCriteria, StoppingCriteriaList

    class InvalidLogitsProcessor(LogitsProcessor):
        """Replace NaN and infinite logits with large finite values."""

        def __call__(self, input_ids, scores):
            scores = torch.where(torch.isnan(scores), torch.tensor(-1e9, dtype=scores.dtype, device=scores.device), scores)
            scores = torch.where(torch.isinf(scores), torch.tensor(1e9, dtype=scores.dtype, device=scores.device), scores)
            return scores

    inputs = self.tokenizer(prompt, return_tensors="pt", padding=True)
    inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
    prompt_len = inputs["input_ids"].shape[1]
    if max_tokens is None:
        max_tokens = 512
    temperature, top_p, do_sample = self._validate_params(temperature, top_p)
    streamer = TextIteratorStreamer(
        self.tokenizer,
        skip_prompt=True,
        skip_special_tokens=True,
    )
    generation_kwargs = {
        "input_ids": inputs["input_ids"],
        "attention_mask": inputs["attention_mask"],
        "max_new_tokens": max_tokens,
        # Same convention as generate(): no sampling knobs for greedy decoding.
        "temperature": temperature if do_sample else None,
        "top_p": top_p if do_sample else None,
        "do_sample": do_sample,
        "streamer": streamer,
        "pad_token_id": self.tokenizer.pad_token_id,
        "eos_token_id": self.tokenizer.eos_token_id,
        "logits_processor": LogitsProcessorList([InvalidLogitsProcessor()]),
    }
    if stop:
        class StopOnSequence(StoppingCriteria):
            """Stop once any stop sequence shows up in the generated tail."""

            def __init__(self, stop_sequences, tokenizer, prompt_length):
                self.stop_sequences = stop_sequences
                self.tokenizer = tokenizer
                self.prompt_length = prompt_length

            def __call__(self, input_ids, scores, **kwargs):
                # Look only at generated tokens; a stop string that is already
                # part of the prompt must not end generation immediately.
                generated = input_ids[0][self.prompt_length:]
                decoded = self.tokenizer.decode(generated[-20:], skip_special_tokens=True)
                return any(seq in decoded for seq in self.stop_sequences)

        generation_kwargs["stopping_criteria"] = StoppingCriteriaList([
            StopOnSequence(stop, self.tokenizer, prompt_len)
        ])

    generation_error = None

    def generate_with_error_handling():
        nonlocal generation_error
        try:
            self.model.generate(**generation_kwargs)
        except (RuntimeError, torch.cuda.OutOfMemoryError) as e:
            error_msg = str(e).lower()
            if "out of memory" in error_msg or "cuda" in error_msg or "oom" in error_msg:
                generation_error = "oom"
                print(f"Warning: CUDA OOM during streaming generation...")
                torch.cuda.empty_cache()
            else:
                generation_error = str(e) or type(e).__name__
        except Exception as e:
            # Any other failure would otherwise kill the thread silently.
            generation_error = str(e) or type(e).__name__
        finally:
            if generation_error is not None:
                # A failed generate() never signals end-of-stream, so the
                # consumer loop below would block forever without this.
                streamer.end()

    thread = Thread(target=generate_with_error_handling)
    thread.start()
    try:
        # NOTE(review): iterating the streamer blocks while waiting for the
        # next chunk; confirm this is acceptable for the event loop in use.
        for text in streamer:
            yield text
    except Exception as e:
        print(f"Error during stream iteration: {e}")
    thread.join()
    if generation_error == "oom":
        yield "\n[Warning: Generation stopped due to out-of-memory.]"
    elif generation_error:
        yield f"\n[Error during generation: {generation_error}]"
def get_model_name(self) -> str:
    """Return the loaded model's name, or ``"unknown"`` when none is set."""
    name = self.model_name
    return name if name else "unknown"
def cleanup(self) -> None:
    """Drop the loaded model and tokenizer and release cached GPU memory.

    Does nothing when no model is loaded.
    """
    import torch

    if self.model is None:
        return
    # Remove the references first, then restore the attributes as None so
    # later attribute access keeps working.
    for attr in ("model", "tokenizer"):
        delattr(self, attr)
    self.model = None
    self.tokenizer = None
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
"""Vulkan backend using llama.cpp.""" # AI.PROMPT: Add Vulkan backend support for AMD GPUs using llama-cpp-python
# This backend handles GGUF models on AMD GPUs via Vulkan
from typing import Optional, List, Dict import os
import json
from typing import AsyncIterator, Optional, Union, List, Dict, Any
from pathlib import Path
from codai.backends.base import ModelBackend from codai.backends.base import ModelBackend
from codai.models.utils import (
check_hf_chat_template,
get_model_family,
get_reasoning_stop_tokens
)
from codai.models.cache import get_cached_model_path
try:
from llama_cpp import Llama
from llama_cpp.llama_chat_format import ChatFormatterResponse
LLAMA_CPP_AVAILABLE = True
except ImportError:
LLAMA_CPP_AVAILABLE = False
Llama = None
ChatFormatterResponse = None
class VulkanBackend(ModelBackend): class VulkanBackend(ModelBackend):
"""Backend for Vulkan GPU inference using llama.cpp.""" """Backend for Vulkan (AMD GPUs) using llama-cpp-python with GGUF models."""
def __init__(self, original_backend: str = None): def __init__(self, original_backend: str = None):
self.model = None self.model = None
self.model_name = None self.model_name = None
self.device = None self.n_gpu_layers = -1 # Offload all layers to GPU by default
self.n_gpu_layers = -1
self.n_ctx = 2048 self.n_ctx = 2048
self.verbose = True self.verbose = True
self.main_gpu = 0 self.main_gpu = 0 # Default to first GPU
self.chat_template = None self.chat_template = None # Detected chat template name
self.hf_tokenizer = None self.hf_tokenizer = None # HuggingFace tokenizer for apply_chat_template
self.force_cuda = original_backend in ("nvidia", "cuda") self.force_cuda = original_backend in ("nvidia", "cuda") # Force CUDA if original was nvidia
if self.force_cuda: if self.force_cuda:
print("DEBUG: GGUF model will use CUDA backend (forced by --backend nvidia)") print("DEBUG: GGUF model will use CUDA backend (forced by --backend nvidia)")
self._detect_chat_template()
def _detect_chat_template(self):
"""Detect the chat template used by the model."""
try:
# Try to get the chat template from the model
# llama.cpp models have a chat_template attribute
from llama_cpp.llama_chat_format import ChatFormatterResponse
# We'll detect it when the model is loaded
self.chat_template = "unknown"
print("DEBUG: Chat template detection will happen after model load")
except Exception as e:
print(f"DEBUG: Could not initialize chat template detection: {e}")
self.chat_template = None
def _load_huggingface_tokenizer(self, template_name: str = None):
"""Load a HuggingFace tokenizer so apply_chat_template can be used.

Does nothing when a tokenizer is already loaded or no model name is set.
For a ``.gguf`` path the lookup order is:

1. a ``tokenizer_config.json`` in the same directory as the GGUF file;
2. Hub repositories named after the file name, with an optional hash prefix
and quantization suffix stripped, then progressively shorter
dash-separated prefixes of that name;
3. a template name picked from the model family found in the file name
(Qwen, Llama, Phi, Mistral/Mixtral) - no tokenizer is loaded in this case.

Any other path is passed to ``AutoTokenizer.from_pretrained`` directly.
Failures are reported with DEBUG prints and leave ``self.chat_template``
set to ``None``; no exception is propagated for them.

Args:
template_name: Optional specific template to use (e.g., 'llama3', 'chatml').
If None, ``self.chat_template`` is set to a marker describing where the
tokenizer came from ('hf_local', 'hf_hub' or 'hf').
"""
if self.hf_tokenizer is not None:
return # Already loaded
model_path = getattr(self, 'model_name', None)
if not model_path:
print("DEBUG: No model name available for HuggingFace tokenizer")
return
# If model_path is a URL, try to get the cached local path first
if model_path.startswith('http://') or model_path.startswith('https://'):
cached_path = get_cached_model_path(model_path)
if cached_path and os.path.exists(cached_path):
model_path = cached_path
print(f"DEBUG: Using cached model path for HF tokenizer: {model_path}")
try:
from transformers import AutoTokenizer
# If a specific template is provided, we can use it directly without loading tokenizer
if template_name:
self.chat_template = template_name
print(f"DEBUG: Using specified chat template: {template_name}")
# Still need to load tokenizer to get the actual template
# but we can use the specified template name
# Try to determine the model identifier
# If model_path is a GGUF file, try to find the corresponding HF model
if model_path.endswith('.gguf'):
# Try to extract model name from path
# Common patterns: .../models/llama-3.1-8b-instruct-q4_k_m.gguf
model_dir = os.path.dirname(model_path)
model_file = os.path.basename(model_path)
# Try to find a tokenizer config in the model directory
tokenizer_config_path = os.path.join(model_dir, 'tokenizer_config.json')
if os.path.exists(tokenizer_config_path):
# Load from local directory
self.hf_tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
print(f"DEBUG: Loaded HuggingFace tokenizer from local: {model_dir}")
if not template_name:
self.chat_template = "hf_local"
return
# Try to infer model name from file name
# Common patterns: llama-3.1-8b-instruct-q4_k_m.gguf -> llama-3.1-8b-instruct
# Also handle cached files with hash prefix: hash_modelname.gguf -> modelname
model_base = model_file.replace('.gguf', '')
# Remove hash prefix (64 hex chars for SHA-256 followed by underscore)
# NOTE(review): isalnum() accepts any alphanumeric run, not only hex digits,
# and the character at index 64 is not checked to be an underscore.
if len(model_base) > 64 and model_base[:64].isalnum():
model_base = model_base[65:] # Skip hash + underscore
# Remove common quantization suffixes (case-insensitive)
# The list carries explicit lower- and upper-case spellings; mixed case is not matched.
for suffix in ['_q4_k_m', '_q4_k', '_q5_k', '_q5_k_m', '_q8_0', '_f16', '_q4_0', '_q3_k_m', '_q2_k', '_Q4_K_M', '_Q4_K', '_Q5_K', '_Q5_K_M', '_Q8_0', '_F16', '_Q4_0', '_Q3_K_M', '_Q2_K']:
model_base = model_base.replace(suffix, '')
# Try to load from HuggingFace hub
# First try the cleaned model_base
model_names_to_try = [model_base]
# Generate shorter versions of the model name for fallback
# E.g., Qwen3.5-27B-Uncensored-HauhauCS-Aggressive -> try shorter variants
parts = model_base.split('-')
if len(parts) > 1:
# Try progressively shorter names by removing parts from the end
for i in range(len(parts) - 1, 0, -1):
shorter_name = '-'.join(parts[:i])
if shorter_name and shorter_name != model_base:
model_names_to_try.append(shorter_name)
# Also try with just the first part (e.g., "Qwen" from "Qwen3.5-27B...")
# The loop above already ends with parts[0], so this appends a duplicate candidate.
if len(parts) > 1:
model_names_to_try.append(parts[0])
tokenizer_loaded = False
last_error = None
for model_id in model_names_to_try:
try:
self.hf_tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
print(f"DEBUG: Loaded HuggingFace tokenizer from hub: {model_id}")
if not template_name:
self.chat_template = "hf_hub"
tokenizer_loaded = True
break
except Exception as fallback_err:
# Remembered but not read again in this function.
last_error = fallback_err
print(f"DEBUG: Could not load tokenizer from hub ({model_id}): {fallback_err}")
continue
if tokenizer_loaded:
return
# If HF tokenizer loading failed, try to use known template names based on model name
# This helps when we can't find the tokenizer but know the model family
model_base_lower = model_base.lower()
# Check if this looks like a Qwen model
known_templates_to_try = []
if 'qwen' in model_base_lower:
# Try known Qwen template names in order of specificity
if 'qwen3.5' in model_base_lower or 'qwen3' in model_base_lower:
known_templates_to_try = ['qwen3', 'qwen', None] # None means use manual formatting
elif 'qwen2' in model_base_lower:
known_templates_to_try = ['qwen2', 'qwen', None]
else:
known_templates_to_try = ['qwen', None]
elif 'llama' in model_base_lower:
known_templates_to_try = ['llama3', 'llama', None]
elif 'phi' in model_base_lower:
known_templates_to_try = ['phi', None]
elif 'mistral' in model_base_lower or 'mixtral' in model_base_lower:
known_templates_to_try = ['mistral', None]
# Try each known template - directly use the template name without loading tokenizer
# This is the key fix: instead of trying to load more non-existent tokenizers,
# directly set the chat_template to the known template name
# NOTE(review): the loop variable rebinds the template_name parameter, and both
# branches break, so only the first entry of the list is ever used.
for template_name in known_templates_to_try:
if template_name is None:
# No more templates to try, use manual formatting with generic format
self.chat_template = "chatml" # Use ChatML as generic fallback
print(f"DEBUG: No known templates worked, using generic ChatML format")
break
# Directly use this known template name - no need to load tokenizer
# The manual formatting will use <|im_start|> tags which work for most models
self.chat_template = template_name
print(f"DEBUG: Using known template '{template_name}' for model family detection")
# Successfully set template - don't try to load tokenizer
break
# NOTE(review): _detect_chat_template can leave chat_template as "unknown"
# (truthy), in which case this returns even when no family matched and the
# warning below is not reached - confirm that this is intended.
if self.chat_template:
return
# All attempts failed - warn but continue without template
print(f"Warning: Could not load HuggingFace tokenizer for any variant of '{model_base}'")
print(f"Warning: Will not use apply_chat_template - model will use manual formatting")
self.chat_template = None
else:
# Not a GGUF file, try to load directly
self.hf_tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
print(f"DEBUG: Loaded HuggingFace tokenizer from: {model_path}")
if not template_name:
self.chat_template = "hf"
return
except ImportError as e:
print(f"DEBUG: transformers not installed, cannot use HuggingFace chat template: {e}")
self.chat_template = None
except Exception as e:
print(f"DEBUG: Failed to load HuggingFace tokenizer: {e}")
self.chat_template = None
def _finalize_chat_template_detection(self):
    """Finish chat-template setup once a model has been loaded.

    Classifies the model as ``"image"`` when its name starts with ``"image:"``
    and as ``"text"`` otherwise, asks ``check_hf_chat_template`` for a template
    name, and loads the HuggingFace tokenizer for text models that do not
    have one yet.
    """
    model_name = getattr(self, 'model_name', None) or "unknown"
    model_type = "image" if model_name.startswith("image:") else "text"
    # Only the template name is used; the first element of the result is ignored.
    _should_use, template_name = check_hf_chat_template(model_type, model_name)
    if model_type != "text" or self.hf_tokenizer:
        return
    self._load_huggingface_tokenizer(template_name)
def _apply_chat_template(self, messages: List[Dict[str, str]], add_generation_prompt: bool = True) -> str:
"""Apply chat template to messages.
Tries multiple methods in order:
1. HuggingFace tokenizer's apply_chat_template
2. Manual template application based on detected template name
3. Generic ChatML format as fallback
"""
# First try HuggingFace tokenizer
if self.hf_tokenizer:
try:
# Check if tokenizer has apply_chat_template
if hasattr(self.hf_tokenizer, 'apply_chat_template'):
# Use the tokenizer's chat template
# For ChatML/Others: add_generation_prompt=True adds the assistant token
# For Llama3: adds the correct prefix
rendered = self.hf_tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=add_generation_prompt
)
print(f"DEBUG: Applied HuggingFace chat template")
return rendered
except Exception as e:
print(f"DEBUG: HuggingFace apply_chat_template failed: {e}")
# Try llama.cpp's built-in chat template
if hasattr(self, 'model') and self.model is not None:
try:
if hasattr(self.model, 'chat'):
# Get the chat template from llama.cpp
# This works for models that have chat templates defined
from llama_cpp.llama_chat_format import ChatCompletionMessage
# Try to use the model's chat handler
# llama.cpp's create_chat_completion handles templates internally
# But we need raw text for streaming
pass # Fall through to manual
except Exception as e:
print(f"DEBUG: llama.cpp chat handling failed: {e}")
# Manual template application based on detected template
template_name = self.chat_template or "chatml"
# Format messages based on template name
if template_name in ("llama3", "llama-3", "llama-3.1"):
return self._format_llama3(messages, add_generation_prompt)
elif template_name in ("qwen2", "qwen2.5", "qwen3"):
return self._format_qwen(messages, add_generation_prompt)
elif template_name == "chatml":
return self._format_chatml(messages, add_generation_prompt)
elif template_name in ("mistral", "mixtral"):
return self._format_mistral(messages, add_generation_prompt)
else:
# Default to ChatML format
return self._format_chatml(messages, add_generation_prompt)
def _format_llama3(self, messages: List[Dict[str, str]], add_generation_prompt: bool = True) -> str:
"""Format messages for Llama 3.x models."""
result = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
result.append(f"<|start_header_id|>system<|end_header_id|>\n\n{content}<|eot_id|>")
elif role == "user":
result.append(f"<|start_header_id|>user<|end_header_id|>\n\n{content}<|eot_id|>")
elif role == "assistant":
result.append(f"<|start_header_id|>assistant<|end_header_id|>\n\n{content}<|eot_id|>")
if add_generation_prompt:
result.append(f"<|start_header_id|>assistant<|end_header_id|>\n\n")
return "".join(result)
def _format_qwen(self, messages: List[Dict[str, str]], add_generation_prompt: bool = True) -> str:
"""Format messages for Qwen models."""
result = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
result.append(f"<|im_start|>system\n{content}<|im_end|>")
elif role == "user":
result.append(f"<|im_start|>user\n{content}<|im_end|>")
elif role == "assistant":
result.append(f"<|im_start|>assistant\n{content}<|im_end|>")
if add_generation_prompt:
result.append(f"<|im_start|>assistant\n")
return "".join(result)
def _format_chatml(self, messages: List[Dict[str, str]], add_generation_prompt: bool = True) -> str:
"""Format messages using ChatML format."""
result = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
result.append(f"<|im_start|>system\n{content}<|im_end|>")
elif role == "user":
result.append(f"<|im_start|>user\n{content}<|im_end|>")
elif role == "assistant":
result.append(f"<|im_start|>assistant\n{content}<|im_end|>")
if add_generation_prompt:
result.append(f"<|im_start|>assistant\n")
return "".join(result)
def _format_mistral(self, messages: List[Dict[str, str]], add_generation_prompt: bool = True) -> str:
"""Format messages for Mistral/Mixtral models."""
result = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
result.append(f"[INST] <<SYS>>\n{content}\n<</SYS>>\n\n")
elif role == "user":
result.append(f"[INST]{content}[/INST]")
elif role == "assistant":
result.append(f"{content}")
# Mistral uses a different format - the last user message has [/INST]
# and we add the assistant prefix if generation prompt is needed
if add_generation_prompt:
# Find the last user message and add the assistant prefix after it
# Actually, let's reformat more carefully
pass # Fall through to simpler format
# Simpler approach: join with newlines
formatted = ""
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
if role == "system":
formatted += f"<<SYS>>\n{content}\n<</SYS>>\n\n"
elif role == "user":
formatted += f"[INST] {content} [/INST]"
elif role == "assistant":
formatted += f" {content}"
if add_generation_prompt:
formatted += " "
return formatted
async def load_model(self, model_path: str, model_type: str = "text", **kwargs) -> bool:
    """Load a GGUF model with llama-cpp-python.

    Args:
        model_path: Local GGUF path, an http(s) URL, or a HuggingFace repo id.
        model_type: Type of model (text, image, audio). Not used by the
            loading logic itself.
        **kwargs: Optional settings: ``n_gpu_layers``, ``n_ctx``, ``verbose``,
            ``main_gpu``, ``cache_dir``, and the pass-through llama.cpp options
            ``n_threads``, ``n_threads_batch``, ``rope_freq_base`` and
            ``rope_freq_scale``.

    Returns:
        True if the model loaded successfully.

    Raises:
        ImportError: If llama-cpp-python is not installed.
        ValueError: If a URL cannot be resolved to a cached file.
        FileNotFoundError: If the resolved model file does not exist.
    """
    if not LLAMA_CPP_AVAILABLE:
        raise ImportError("llama-cpp-python is required for GGUF models. Install with: pip install llama-cpp-python")

    if model_path.startswith(('http://', 'https://')):
        # URLs are resolved through the model cache and never treated as a
        # Hub repo id.
        cached_path = get_cached_model_path(model_path)
        if not cached_path:
            raise ValueError(f"Could not cache model from URL: {model_path}")
        model_path = cached_path
    elif not model_path.endswith('.gguf') and not os.path.exists(model_path):
        # Looks like a HuggingFace repo id: find a GGUF file in it and download it.
        try:
            from huggingface_hub import hf_hub_download, list_repo_files
            # hf_hub_download needs an exact file name (no glob patterns),
            # so list the repository first.
            gguf_files = sorted(name for name in list_repo_files(model_path) if name.endswith('.gguf'))
            if not gguf_files:
                raise FileNotFoundError(f"No .gguf files found in repository: {model_path}")
            # With several GGUF files present, the first in sorted order is used.
            model_path = hf_hub_download(repo_id=model_path, filename=gguf_files[0], cache_dir=kwargs.get('cache_dir'))
        except Exception as e:
            # Best effort: fall through and let the existence check below decide.
            print(f"Warning: Could not download from HuggingFace: {e}")

    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")
    self.model_name = model_path

    # -1 keeps the layer count already configured on the instance.
    n_gpu_layers = kwargs.get('n_gpu_layers', -1)
    if n_gpu_layers != -1:
        self.n_gpu_layers = n_gpu_layers
    self.n_ctx = kwargs.get('n_ctx', 2048)
    self.verbose = kwargs.get('verbose', True)
    self.main_gpu = kwargs.get('main_gpu', 0)

    llama_kwargs = {
        'model_path': model_path,
        'n_gpu_layers': self.n_gpu_layers,
        'n_ctx': self.n_ctx,
        'verbose': self.verbose,
        'main_gpu': self.main_gpu,
    }
    # Optional llama.cpp settings are forwarded only when the caller set them.
    for option in ('n_threads', 'n_threads_batch', 'rope_freq_base', 'rope_freq_scale'):
        if option in kwargs:
            llama_kwargs[option] = kwargs[option]

    if self.force_cuda:
        # NOTE(review): this overwrites any CUDA_VISIBLE_DEVICES the user set
        # and always selects device 0, regardless of main_gpu — confirm intent.
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'

    try:
        self.model = Llama(**llama_kwargs)
        # Chat template detection needs self.model_name, which is set above.
        self._finalize_chat_template_detection()
        print(f"DEBUG: VulkanBackend loaded model: {model_path}")
        print(f"DEBUG: n_gpu_layers={self.n_gpu_layers}, n_ctx={self.n_ctx}")
        print(f"DEBUG: chat_template={self.chat_template}")
        return True
    except Exception as e:
        print(f"Error loading GGUF model: {e}")
        raise
async def generate(
    self,
    prompt: str,
    **kwargs
) -> str:
    """Produce a complete (non-incremental) text completion.

    Args:
        prompt: Prompt text. A list of chat messages is also accepted and
            is rendered to a string through the chat template first.
        **kwargs: Generation parameters. Recognised keys are
            ``max_tokens``, ``temperature``, ``top_p``, ``top_k``,
            ``repeat_penalty``, ``stop`` and ``stream``.

    Returns:
        The generated text. With ``stream`` set, the chunks produced by
        ``generate_stream`` are joined into a single string.

    Raises:
        RuntimeError: If no model is loaded.
    """
    if self.model is None:
        raise RuntimeError("Model not loaded")

    # A list is treated as chat messages and flattened to a prompt string.
    if isinstance(prompt, list):
        prompt = self._apply_chat_template(prompt, add_generation_prompt=True)

    sampling = {
        'max_tokens': kwargs.get('max_tokens', 256),
        'temperature': kwargs.get('temperature', 0.7),
        'top_p': kwargs.get('top_p', 0.9),
        'top_k': kwargs.get('top_k', 40),
        'repeat_penalty': kwargs.get('repeat_penalty', 1.1),
    }
    want_stream = kwargs.get('stream', False)

    # Fall back to the template-specific stop tokens when none were given.
    stop_sequences = kwargs.get('stop', None)
    if stop_sequences is None:
        stop_sequences = get_reasoning_stop_tokens(self.chat_template)
    sampling['stop'] = stop_sequences

    try:
        if not want_stream:
            completion = self.model.create_completion(prompt=prompt, **sampling)
            return completion['choices'][0]['text']

        # Streaming requested: drain the stream and hand back the joined text.
        pieces = [piece async for piece in self.generate_stream(prompt, **kwargs)]
        return "".join(pieces)
    except Exception as e:
        print(f"Error during generation: {e}")
        raise
async def generate_stream(
    self,
    prompt: str,
    **kwargs
) -> AsyncIterator[str]:
    """Generate text with streaming.

    Args:
        prompt: Input prompt. A list of chat messages is also accepted and
            is rendered to a string through the chat template first.
        **kwargs: Generation parameters. Recognised keys are
            ``max_tokens``, ``temperature``, ``top_p``, ``top_k``,
            ``repeat_penalty`` and ``stop``.

    Yields:
        Non-empty chunks of generated text, in order, up to and including
        the chunk that reports a finish reason.

    Raises:
        RuntimeError: If no model is loaded.
    """
    if self.model is None:
        raise RuntimeError("Model not loaded")

    # Check if prompt looks like messages (list of dicts)
    if isinstance(prompt, list):
        prompt = self._apply_chat_template(prompt, add_generation_prompt=True)

    # Set defaults
    max_tokens = kwargs.get('max_tokens', 256)
    temperature = kwargs.get('temperature', 0.7)
    top_p = kwargs.get('top_p', 0.9)
    top_k = kwargs.get('top_k', 40)
    repeat_penalty = kwargs.get('repeat_penalty', 1.1)

    # Get stop strings, defaulting to the template-specific ones
    stop = kwargs.get('stop', None)
    if stop is None:
        stop = get_reasoning_stop_tokens(self.chat_template)

    try:
        first_chunk = True
        for chunk in self.model.create_completion(
            prompt=prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            top_k=top_k,
            repeat_penalty=repeat_penalty,
            stop=stop,
            stream=True,
        ):
            choice = chunk['choices'][0]
            text = choice.get('text', '')

            if first_chunk:
                first_chunk = False
                # Streamed chunks normally contain only newly generated
                # text. Slicing the first chunk by the prompt length
                # unconditionally would discard the first token(s), so the
                # prompt is stripped only when the chunk really echoes it.
                if (
                    text
                    and isinstance(prompt, str)
                    and prompt
                    and text.startswith(prompt)
                ):
                    text = text[len(prompt):]

            if text:
                yield text

            # Stop consuming once the backend reports why it finished
            if choice.get('finish_reason'):
                break
    except Exception as e:
        print(f"Error during streaming generation: {e}")
        raise
async def chat(
    self,
    messages: List[Dict[str, str]],
    **kwargs
) -> Dict[str, Any]:
    """Chat completion.

    Args:
        messages: List of message dicts with 'role' and 'content'
        **kwargs: Generation parameters. Recognised keys are
            ``max_tokens``, ``temperature``, ``top_p``, ``top_k``,
            ``repeat_penalty``, ``stop`` and ``stream``.

    Returns:
        Without ``stream``: a dict with 'content' (generated text),
        'usage' (the backend's usage dict, or ``{}``) and 'model'.
        With ``stream``: a dict with 'stream' (an async generator of text
        chunks) and an empty 'content'.

    Raises:
        RuntimeError: If no model is loaded.
    """
    if self.model is None:
        raise RuntimeError("Model not loaded")

    # Apply chat template
    prompt = self._apply_chat_template(messages, add_generation_prompt=True)

    # Generation parameters
    max_tokens = kwargs.get('max_tokens', 256)
    temperature = kwargs.get('temperature', 0.7)
    top_p = kwargs.get('top_p', 0.9)
    # Forwarded for parity with generate()/generate_stream(), same default.
    top_k = kwargs.get('top_k', 40)
    repeat_penalty = kwargs.get('repeat_penalty', 1.1)
    stop = kwargs.get('stop', None)
    if stop is None:
        stop = get_reasoning_stop_tokens(self.chat_template)
    stream = kwargs.get('stream', False)

    if stream:
        # Return an async generator for streaming
        async def generate_stream():
            first_chunk = True
            for chunk in self.model.create_completion(
                prompt=prompt,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                top_k=top_k,
                repeat_penalty=repeat_penalty,
                stop=stop,
                stream=True,
            ):
                choice = chunk['choices'][0]
                text = choice.get('text', '')

                if first_chunk:
                    first_chunk = False
                    # Streamed chunks normally contain only newly generated
                    # text. Slicing by prompt length unconditionally would
                    # drop the first token(s), so the prompt is stripped
                    # only when the chunk really echoes it.
                    if (
                        text
                        and isinstance(prompt, str)
                        and prompt
                        and text.startswith(prompt)
                    ):
                        text = text[len(prompt):]

                if text:
                    yield text

                if choice.get('finish_reason'):
                    break

        return {"stream": generate_stream(), "content": ""}

    result = self.model.create_completion(
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        top_p=top_p,
        top_k=top_k,
        repeat_penalty=repeat_penalty,
        stop=stop,
    )
    content = result['choices'][0]['text']
    return {
        "content": content,
        "usage": result.get('usage', {}),
        "model": self.model_name,
    }
async def embed(self, text: str) -> List[float]:
    """Compute an embedding vector for a piece of text.

    Args:
        text: Text to embed.

    Returns:
        The 'embedding' entry of the first item in the backend's response.

    Raises:
        RuntimeError: If no model is loaded.
    """
    loaded_model = self.model
    if loaded_model is None:
        raise RuntimeError("Model not loaded")

    try:
        response = loaded_model.create_embedding(text)
        first_item = response['data'][0]
        return first_item['embedding']
    except Exception as e:
        # Report the failure, then let the caller see the original error.
        print(f"Error generating embeddings: {e}")
        raise
def unload_model(self):
    """Release the loaded model, if any, and clear its recorded name.

    Does nothing when no model is loaded.
    """
    if self.model is None:
        return

    # Drop the reference, then restore the attribute to the "unloaded" state.
    del self.model
    self.model = None
    self.model_name = None
    print("DEBUG: VulkanBackend unloaded model")
@property
def is_loaded(self) -> bool:
    """Report whether this backend currently holds a model."""
    model_missing = self.model is None
    return not model_missing
def load_model(self, model_name: str, **kwargs) -> None: def get_model_info(self) -> Dict[str, Any]:
"""Load the model.""" """Get information about the loaded model."""
pass if self.model is None:
return {"loaded": False}
def generate(self, prompt: str, max_tokens: Optional[int] = None,
temperature: float = 0.7, top_p: float = 1.0, return {
stop: Optional[list] = None) -> str: "loaded": True,
"""Generate text non-streaming.""" "model_name": self.model_name,
pass "chat_template": self.chat_template,
"n_ctx": self.n_ctx,
def generate_chat(self, messages: List[Dict], max_tokens: Optional[int] = None, "n_gpu_layers": self.n_gpu_layers,
temperature: float = 0.7, top_p: float = 1.0, }
stop: Optional[List[str]] = None, tools: Optional[List] = None,
response_format: Optional[Dict] = None) -> str:
"""Generate chat completion non-streaming."""
pass
async def generate_chat_stream(self, messages: List[Dict], max_tokens: Optional[int] = None,
temperature: float = 0.7, top_p: float = 1.0,
stop: Optional[List[str]] = None, tools: Optional[List] = None,
response_format: Optional[Dict] = None):
"""Generate chat completion streaming."""
pass
def generate_stream(self, prompt: str, max_tokens: Optional[int] = None,
temperature: float = 0.7, top_p: float = 1.0,
stop: Optional[list] = None):
"""Generate text in streaming fashion."""
pass
def format_messages(self, messages) -> str:
"""Format messages into a prompt string."""
pass
def get_model_name(self) -> str:
"""Return the loaded model name."""
return self.model_name
def cleanup(self) -> None:
"""Cleanup resources."""
pass
...@@ -13,7 +13,7 @@ from codai.models.parser import ModelParserAdapter ...@@ -13,7 +13,7 @@ from codai.models.parser import ModelParserAdapter
from codai.backends import detect_available_backends from codai.backends import detect_available_backends
from codai.backends.cuda import NvidiaBackend from codai.backends.cuda import NvidiaBackend
from codai.backends.vulkan import VulkanBackend from codai.backends.vulkan import VulkanBackend
from codai.models.cache import get_cached_model_path, download_model from codai.models.cache import get_cached_model_path, download_model, get_model_cache_dir
from codai.pydantic.textrequest import ModelInfo from codai.pydantic.textrequest import ModelInfo
......
"""Queue manager module.""" """Queue manager module - manages request queues for model loading notifications."""
from typing import Dict, Any, Optional from typing import Dict, Optional
import asyncio import asyncio
import time
class QueueManager: class QueueManager:
"""Manager for handling request queues.""" """
Manages request queue for model loading notifications.
When clients are waiting for a model to load, sends them progress updates.
"""
def __init__(self): def __init__(self):
self.queues = {} self.waiting_requests: Dict[str, float] = {} # request_id -> start_time
self.results = {} self.current_request_id: Optional[str] = None
self.model_loading: bool = False
self.model_name: Optional[str] = None
self.lock = asyncio.Lock()
async def add_request(self, request_id: str, request_data: Any): async def add_waiting(self, request_id: str) -> None:
"""Add a request to the queue.""" """Add a request to the waiting queue."""
pass async with self.lock:
self.waiting_requests[request_id] = time.time()
async def get_result(self, request_id: str) -> Optional[Any]: async def remove_waiting(self, request_id: str) -> None:
"""Get the result of a request.""" """Remove a request from the waiting queue."""
pass async with self.lock:
self.waiting_requests.pop(request_id, None)
async def process_queue(self): async def start_processing(self, request_id: str, model_name: str = None) -> None:
"""Process the queue.""" """Mark a request as now processing (model loaded)."""
pass async with self.lock:
self.waiting_requests.pop(request_id, None)
self.current_request_id = request_id
self.model_name = model_name
async def finish_processing(self) -> None:
"""Mark current request as finished."""
async with self.lock:
self.current_request_id = None
async def is_waiting(self, request_id: str) -> bool:
"""Check if a request is in the waiting queue."""
async with self.lock:
return request_id in self.waiting_requests
async def get_wait_time(self, request_id: str) -> float:
"""Get how long a request has been waiting in seconds."""
async with self.lock:
if request_id in self.waiting_requests:
return time.time() - self.waiting_requests[request_id]
return 0.0
async def get_queue_position(self, request_id: str) -> int:
"""Get the position of a request in the queue (1-based)."""
async with self.lock:
keys = list(self.waiting_requests.keys())
try:
return keys.index(request_id) + 1
except ValueError:
return 0
# Global queue manager instance
queue_manager = QueueManager()
...@@ -30,6 +30,11 @@ from threading import Thread ...@@ -30,6 +30,11 @@ from threading import Thread
# Import codai module for enhanced tool call parsing # Import codai module for enhanced tool call parsing
from codai.models import ModelParserDispatcher, OpenAIFormatter from codai.models import ModelParserDispatcher, OpenAIFormatter
# Import from codai modules for use in this file
from codai.models.manager import ModelManager, WhisperServerManager, MultiModelManager
from codai.queue.manager import QueueManager, queue_manager
from codai.backends import NvidiaBackend, VulkanBackend, detect_available_backends
# Per-model semaphores for request concurrency control # Per-model semaphores for request concurrency control
model_semaphores: dict = {} model_semaphores: dict = {}
load_mode = {"mode": "ondemand"} # Track load mode globally load_mode = {"mode": "ondemand"} # Track load mode globally
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment