Commit 989f1858 authored by Your Name

Refactor: Move QueueManager to codai/queue/manager and restore FastAPI app

parent 001e1708
"""Backend detection module."""
def detect_available_backends():
    """Report which inference backends can be used in this environment.

    Returns:
        dict: Maps backend name to ``True``. ``'cpu'`` is always present;
        ``'nvidia'`` is added when PyTorch imports and reports CUDA as
        available; ``'vulkan'`` is added when ``llama_cpp`` imports.
    """
    available = dict(cpu=True)

    # NVIDIA: PyTorch must be importable and must report CUDA as available.
    try:
        import torch
        cuda_ready = torch.cuda.is_available()
        if cuda_ready:
            available['nvidia'] = True
    except ImportError:
        pass

    # Vulkan: an importable llama-cpp-python is treated as Vulkan support.
    try:
        import llama_cpp  # noqa: F401
        available['vulkan'] = True
    except ImportError:
        pass

    return available
def check_flash_attn_availability() -> bool:
    """Return True when the ``flash_attn`` package can be imported, else False."""
    try:
        import flash_attn  # noqa: F401
    except ImportError:
        return False
    return True
"""Base classes for model backends."""
from abc import ABC, abstractmethod
from typing import AsyncGenerator, List, Optional
class ModelBackend(ABC):
    """Interface that every model backend must implement.

    All six methods are abstract, so a subclass can only be instantiated
    once it overrides each of them.
    """

    @abstractmethod
    def load_model(self, model_name: str, **kwargs) -> None:
        """Load the model identified by ``model_name``."""

    @abstractmethod
    def generate(self, prompt: str, max_tokens: Optional[int] = None,
                 temperature: float = 0.7, top_p: float = 1.0,
                 stop: Optional[List[str]] = None) -> str:
        """Produce the full completion for ``prompt`` in one call."""

    @abstractmethod
    def generate_stream(self, prompt: str, max_tokens: Optional[int] = None,
                        temperature: float = 0.7, top_p: float = 1.0,
                        stop: Optional[List[str]] = None) -> AsyncGenerator[str, None]:
        """Produce the completion for ``prompt`` as a stream of text pieces."""

    @abstractmethod
    def format_messages(self, messages) -> str:
        """Turn ``messages`` into a single prompt string."""

    @abstractmethod
    def get_model_name(self) -> str:
        """Return the name of the loaded model."""

    @abstractmethod
    def cleanup(self) -> None:
        """Release any resources held by the backend."""
"""CUDA backend for NVIDIA GPUs."""
from typing import Optional, List, Dict
from codai.backends.base import ModelBackend
class NvidiaBackend(ModelBackend):
    """Backend for NVIDIA GPUs using HuggingFace Transformers.

    Loading, generation, message formatting and cleanup are placeholders in
    this version; only the Flash Attention bookkeeping and
    ``get_model_name`` carry logic.
    """

    def __init__(self):
        # Nothing is loaded yet; these are populated later.
        self.model = None
        self.tokenizer = None
        self.model_name = None
        self.device = None
        # Flash Attention: whether it was requested / whether it is installed.
        self.use_flash_attn = False
        self.flash_attn_available = False
        self._pending_ram_gb = None
        # Imported here (not at module level) and kept as an attribute so
        # check_flash_attn_support() can call it.
        from codai.backends import check_flash_attn_availability
        self._check_flash_attn_availability = check_flash_attn_availability

    def check_flash_attn_support(self) -> None:
        """Refresh ``flash_attn_available`` and report status if requested.

        When Flash Attention was requested but is not installed, a warning
        is printed and ``use_flash_attn`` is reset to False.
        """
        self.flash_attn_available = self._check_flash_attn_availability()

        # Nothing to report unless the user asked for Flash Attention.
        if not self.use_flash_attn:
            return

        if self.flash_attn_available:
            print("Flash Attention 2: Available and enabled")
            return

        print("Warning: Flash Attention 2 requested but not installed")
        print("Install with: pip install flash-attn --no-build-isolation")
        print("Falling back to standard attention")
        self.use_flash_attn = False

    def load_model(self, model_name: str, **kwargs) -> None:
        """Load the model (placeholder: does nothing)."""
        return None

    def generate(self, prompt: str, max_tokens: Optional[int] = None,
                 temperature: float = 0.7, top_p: float = 1.0,
                 stop: Optional[list] = None) -> str:
        """Generate text non-streaming (placeholder: returns None)."""
        return None

    def generate_stream(self, prompt: str, max_tokens: Optional[int] = None,
                        temperature: float = 0.7, top_p: float = 1.0,
                        stop: Optional[list] = None):
        """Generate text in streaming fashion (placeholder: returns None)."""
        return None

    def format_messages(self, messages) -> str:
        """Format messages into a prompt string (placeholder: returns None)."""
        return None

    def get_model_name(self) -> str:
        """Return ``self.model_name`` (None until a name has been set)."""
        return self.model_name

    def cleanup(self) -> None:
        """Cleanup resources (placeholder: does nothing)."""
        return None
"""Vulkan backend using llama.cpp."""
from typing import Optional, List, Dict
from codai.backends.base import ModelBackend
class VulkanBackend(ModelBackend):
    """Backend for Vulkan GPU inference using llama.cpp.

    Every operation except ``get_model_name`` is a placeholder in this
    version.
    """

    def __init__(self):
        # Nothing is loaded yet.
        self.model = None
        self.model_name = None
        self.device = None

    def load_model(self, model_name: str, **kwargs) -> None:
        """Load the model (placeholder: does nothing)."""
        return None

    def generate(self, prompt: str, max_tokens: Optional[int] = None,
                 temperature: float = 0.7, top_p: float = 1.0,
                 stop: Optional[list] = None) -> str:
        """Generate text non-streaming (placeholder: returns None)."""
        return None

    def generate_stream(self, prompt: str, max_tokens: Optional[int] = None,
                        temperature: float = 0.7, top_p: float = 1.0,
                        stop: Optional[list] = None):
        """Generate text in streaming fashion (placeholder: returns None)."""
        return None

    def format_messages(self, messages) -> str:
        """Format messages into a prompt string (placeholder: returns None)."""
        return None

    def get_model_name(self) -> str:
        """Return ``self.model_name`` (None until a name has been set)."""
        return self.model_name

    def cleanup(self) -> None:
        """Cleanup resources (placeholder: does nothing)."""
        return None
"""Model capabilities module."""
from dataclasses import dataclass
@dataclass
class ModelCapabilities:
    """Boolean flags describing what a model can do; all default to False."""
    text_generation: bool = False   # LLM / chat completion
    image_to_text: bool = False     # image understanding (captioning, VQA)
    image_generation: bool = False  # text-to-image (Stable Diffusion)
    speech_to_text: bool = False    # audio transcription
    text_to_speech: bool = False    # speech synthesis

    def __str__(self):
        """Return the enabled capabilities as "a, b, c", or "none" if empty."""
        # (flag, label) pairs in the order the labels are listed.
        labelled_flags = (
            (self.text_generation, "text"),
            (self.image_to_text, "image-to-text"),
            (self.image_generation, "image"),
            (self.speech_to_text, "speech-to-text"),
            (self.text_to_speech, "text-to-speech"),
        )
        enabled = [label for flag, label in labelled_flags if flag]
        if not enabled:
            return "none"
        return ", ".join(enabled)
def detect_model_capabilities(model_name: str) -> ModelCapabilities:
    """
    Guess a model's capabilities from substrings of its name.

    This is a heuristic: the name is lower-cased and matched against keyword
    lists in a fixed priority order (image generation, vision,
    text-to-speech, speech-to-text, GGUF, then a text-generation default).
    The first category that matches wins. Actual capabilities may vary.

    Args:
        model_name: Model identifier or path. An empty or None name yields a
            ModelCapabilities with every flag left False.

    Returns:
        ModelCapabilities with the flags of the first matching category set.
    """
    caps = ModelCapabilities()
    if not model_name:
        return caps
    name_lower = model_name.lower()

    def has_any(*keywords):
        # True when at least one keyword occurs anywhere in the name.
        return any(keyword in name_lower for keyword in keywords)

    # Image generation models (Stable Diffusion, SDXL, etc.).
    # The 'turbo' keyword also occurs in Whisper checkpoints such as
    # "whisper-large-v3-turbo"; those must fall through to the
    # speech-to-text check instead of being tagged as image generators.
    if not has_any('whisper') and has_any(
            'stable-diffusion', 'sd15', 'sdxl', 'sd-xl', 'turbo', 'playground'):
        caps.image_generation = True
        return caps  # Usually SD models are dedicated

    # Vision models (image-to-text); these are also LLMs.
    if has_any('vision', 'vl-', '-vl', 'llava', 'qwen2-vl', 'qwen-vl',
               'phi-4-mini', 'pixtral', 'clip'):
        caps.image_to_text = True
        caps.text_generation = True
        return caps

    # TTS models.
    if has_any('kokoro', 'tts', 'speech', 'voice'):
        caps.text_to_speech = True
        return caps

    # Whisper models (speech-to-text).
    if has_any('whisper', 'faster-whisper', 'distil-whisper'):
        caps.speech_to_text = True
        return caps

    # GGUF models (typically text models). A plain 'gguf' test also covers
    # the '.gguf' file extension.
    if 'gguf' in name_lower:
        caps.text_generation = True
        return caps

    # Default: assume text generation (most HF models are LLMs).
    caps.text_generation = True
    return caps
"""Model manager module."""
from typing import Optional, Dict, Any, List
class ModelManager:
    """Manager for loading and handling models.

    The load/unload/get operations are placeholders in this version.
    """

    def __init__(self):
        # Registry of models; starts empty.
        self.models = dict()

    def load_model(self, model_name: str, **kwargs):
        """Load a model (placeholder: does nothing)."""
        return None

    def unload_model(self, model_name: str):
        """Unload a model (placeholder: does nothing)."""
        return None

    def get_model(self, model_name: str):
        """Get a loaded model (placeholder: returns None)."""
        return None
class WhisperServerManager:
    """Manager for the Whisper transcription server.

    Loading and transcription are placeholders in this version.
    """

    def __init__(self):
        # No model is loaded at construction time.
        self.model = None

    def load_model(self, model_name: str):
        """Load a Whisper model (placeholder: does nothing)."""
        return None

    def transcribe(self, audio_data: bytes) -> str:
        """Transcribe audio data (placeholder: returns None)."""
        return None
class MultiModelManager:
    """Manager for multiple models.

    The load/unload/generate operations are placeholders in this version.
    """

    def __init__(self):
        # Two separate registries, both empty at start.
        self.models = dict()
        self.active_models = dict()

    def load_model(self, model_name: str, **kwargs):
        """Load a model (placeholder: does nothing)."""
        return None

    def unload_model(self, model_name: str):
        """Unload a model (placeholder: does nothing)."""
        return None

    def generate(self, model_name: str, prompt: str, **kwargs):
        """Generate text with a model (placeholder: returns None)."""
        return None
This diff is collapsed.
"""Utility functions for model handling."""
from typing import Optional
def check_hf_chat_template(model_type: str = "text", model_name: str = None) -> tuple:
    """Report chat-template support for a model.

    Both arguments are currently ignored: the function always answers
    ``(True, "chatml")``.

    Returns:
        tuple: ``(supported, template_name)``.
    """
    supported = True
    template_name = "chatml"
    return supported, template_name
def get_resolved_model_name(requested_model: str, current_manager=None) -> str:
    """Resolve a requested model name.

    Currently an identity mapping: ``requested_model`` is returned as-is and
    ``current_manager`` is not consulted.
    """
    resolved = requested_model
    return resolved
def get_model_family(model_name: str) -> str:
    """Detect the model family from a model name.

    The name is lower-cased and searched for known family substrings in a
    fixed order; the first hit wins.

    Args:
        model_name: Model identifier. An empty or None value is treated as
            unknown instead of raising.

    Returns:
        One of ``'qwen'``, ``'llama'``, ``'mistral'``, ``'deepseek'`` or
        ``'generic'`` when nothing matches.
    """
    if not model_name:
        return 'generic'
    model_lower = model_name.lower()
    # 'deepseek' is checked last so that names which already resolved to an
    # earlier family (e.g. a DeepSeek distill of Qwen) keep their previous
    # result; it is included because get_reasoning_stop_tokens() has a
    # dedicated 'deepseek' branch.
    for family in ('qwen', 'llama', 'mistral', 'deepseek'):
        if family in model_lower:
            return family
    return 'generic'
def get_reasoning_stop_tokens(model_family: str) -> tuple:
    """Return the reasoning-mode stop tokens for ``model_family``.

    ``'qwen'`` and ``'deepseek'`` have dedicated token sets; every other
    family gets the single default token.
    """
    if model_family == 'qwen':
        tokens = ('<|im_end|>', '<|endoftext|>')
    elif model_family == 'deepseek':
        tokens = ('</Thinking>',)
    else:
        tokens = ('<|end|>',)
    return tokens
def get_reasoning_system_prompt(model_family: str) -> str:
    """Return the system-prompt text injected to force reasoning.

    Only the ``'qwen'`` family has an injection; all other families get an
    empty string.
    """
    return "Please think carefully before responding." if model_family == 'qwen' else ""
"""Pydantic models for image generation API."""
from typing import Dict, List, Optional
from pydantic import BaseModel, ConfigDict
class ImageGenerationRequest(BaseModel):
    # Request body for the image generation API.
    # `model` and `prompt` are required; everything else has a default.
    model: str
    prompt: str
    n: int = 1
    size: Optional[str] = "1024x1024"
    # Explicit sampler settings; when given they override the
    # quality-based defaults.
    steps: Optional[int] = None             # number of inference steps
    guidance_scale: Optional[float] = None  # CFG scale
    quality: Optional[str] = "standard"
    style: Optional[str] = None
    response_format: Optional[str] = "url"
    seed: Optional[int] = None
    user: Optional[str] = None

    # Unknown fields are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")
class ImageGenerationResponse(BaseModel):
    # Response body for the image generation API.
    created: int
    data: List[Dict]

    # Unknown fields are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")
"""Pydantic models for API."""
import time
from typing import Dict, List, Optional, Union
from pydantic import BaseModel, Field, field_validator, ConfigDict
class ToolFunction(BaseModel):
    # Function description carried inside a Tool: a required name plus an
    # optional description and parameters mapping.
    name: str
    description: Optional[str] = None
    parameters: Optional[Dict] = None
class Tool(BaseModel):
    # A tool entry in a chat request; `type` defaults to "function".
    type: str = "function"
    function: ToolFunction
class ChatMessage(BaseModel):
    # One message of a chat conversation. Only `role` is required.
    role: str
    content: Optional[Union[str, List[Dict]]] = None
    name: Optional[str] = None
    tool_calls: Optional[List[Dict]] = None
    tool_call_id: Optional[str] = None

    @field_validator('content', mode='before')
    @classmethod
    def convert_content_array_to_string(cls, v):
        """Flatten multipart content into one string for compatibility.

        None and plain strings pass through untouched. A list (the
        multipart format, e.g. from KiloCode:
        [{"type": "text", "text": "..."}, ...]) is rendered piece by piece
        and joined with newlines. Any other value is converted with str().
        """
        if v is None or isinstance(v, str):
            return v
        if not isinstance(v, list):
            return str(v)

        def render(part):
            # Non-dict entries are stringified as-is.
            if not isinstance(part, dict):
                return str(part)
            # Text parts contribute their text verbatim.
            if part.get('type') == 'text' and 'text' in part:
                return part['text']
            # Other content types (image_url, etc.) become a placeholder.
            return f"[{part.get('type', 'unknown')} content]"

        rendered = [render(part) for part in v]
        return '\n'.join(rendered)
class ChatCompletionRequest(BaseModel):
    # Request body for chat completions. `model` and `messages` are
    # required; every other field has a default.
    model: str
    messages: List[ChatMessage]

    # Sampling / generation controls.
    temperature: float = 0.7
    top_p: float = 1.0
    n: int = 1
    max_tokens: Optional[int] = None
    stream: bool = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: float = 0.0
    frequency_penalty: float = 0.0
    repeat_penalty: float = 1.0

    # Tool calling.
    tools: Optional[List[Tool]] = None
    tool_choice: Optional[Union[str, Dict]] = "auto"

    # Fields that clients may send but that are ignored here.
    seed: Optional[int] = None
    logprobs: Optional[bool] = None
    top_logprobs: Optional[int] = None
    response_format: Optional[Dict] = None
    user: Optional[str] = None

    # Turns on thinking/reasoning mode for models that support it.
    enable_thinking: Optional[bool] = False

    # Extra fields are allowed so unknown keys do not cause 422 errors.
    model_config = ConfigDict(extra="allow")
class CompletionRequest(BaseModel):
    # Request body for plain (non-chat) completions. `model` and `prompt`
    # are required; `prompt` may be one string or a list of strings.
    model: str
    prompt: Union[str, List[str]]

    # Sampling / generation controls.
    temperature: float = 0.7
    top_p: float = 1.0
    n: int = 1
    max_tokens: Optional[int] = None
    stream: bool = False
    stop: Optional[Union[str, List[str]]] = None
    presence_penalty: float = 0.0
    frequency_penalty: float = 0.0
    repeat_penalty: float = 1.0

    # Fields that clients may send but that are ignored here.
    seed: Optional[int] = None
    logprobs: Optional[bool] = None
    top_logprobs: Optional[int] = None
    best_of: Optional[int] = None
    echo: Optional[bool] = None
    user: Optional[str] = None

    # Extra fields are allowed so unknown keys do not cause 422 errors.
    model_config = ConfigDict(extra="allow")
class ModelInfo(BaseModel):
    # Description of a single model as returned in a model listing.
    id: str
    object: str = "model"
    # Defaults to the current Unix time (whole seconds) at instantiation.
    created: int = Field(default_factory=lambda: int(time.time()))
    owned_by: str = "huggingface"
class ModelList(BaseModel):
    # Container for a list of ModelInfo entries; `object` defaults to "list".
    object: str = "list"
    data: List[ModelInfo]
"""Pydantic models for transcription API."""
from typing import List, Optional
from pydantic import BaseModel, ConfigDict
class TranscriptionRequest(BaseModel):
    # Request body for the transcription API. Only `model` is required.
    model: str
    # Audio input: both fields are optional, raw bytes or a path string.
    file: Optional[bytes] = None
    file_path: Optional[str] = None
    language: Optional[str] = None
    prompt: Optional[str] = None
    response_format: Optional[str] = "json"
    temperature: Optional[float] = 0.0
    timestamp_granularities: Optional[List[str]] = None

    # Unknown fields are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")
class TranscriptionResponse(BaseModel):
    # Response body for the transcription API: the transcribed text.
    text: str

    # Unknown fields are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")
"""Queue manager module."""
from typing import Dict, Any, Optional
import asyncio
class QueueManager:
    """Manager for handling request queues.

    The three coroutine methods are placeholders in this version.
    """

    def __init__(self):
        # Both containers start empty.
        self.queues = dict()
        self.results = dict()

    async def add_request(self, request_id: str, request_data: Any):
        """Add a request to the queue (placeholder: does nothing)."""
        return None

    async def get_result(self, request_id: str) -> Optional[Any]:
        """Get the result of a request (placeholder: returns None)."""
        return None

    async def process_queue(self):
        """Process the queue (placeholder: does nothing)."""
        return None
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment