Commit e00c4f41 authored by Your Name's avatar Your Name

Add HTTPS support, authentication, and condensation prompt loading

- Add HTTPS support with auto-generated self-signed certificates
- Add API token authentication middleware
- Update aisbf.json with protocol, SSL, and auth configuration
- Add system prompt loading method to ContextManager
- Update main() to support SSL certificates
- Add authentication check middleware for API endpoints
parent 7d5d9e73
......@@ -371,6 +371,32 @@ class ContextManager:
return condensed
def _load_system_prompt(self, method: str) -> str:
"""Load system prompt from markdown file"""
from pathlib import Path
# Try installed locations first
installed_dirs = [
Path('/usr/share/aisbf'),
Path.home() / '.local' / 'share' / 'aisbf',
]
for installed_dir in installed_dirs:
prompt_file = installed_dir / f'condensation_{method}.md'
if prompt_file.exists():
with open(prompt_file) as f:
return f.read()
# Fallback to source tree config directory
source_dir = Path(__file__).parent.parent / 'config'
prompt_file = source_dir / f'condensation_{method}.md'
if prompt_file.exists():
with open(prompt_file) as f:
return f.read()
# Return empty string if not found
return ""
async def _conversational_condense(self, messages: List[Dict], model: str) -> List[Dict]:
"""
CONVERSATIONAL SUMMARIZATION (MEMORY BUFFERING)
......@@ -380,14 +406,18 @@ class ContextManager:
logger = logging.getLogger(__name__)
logger.info(f"Conversational condensation: {len(messages)} messages")
# Use dedicated condensation handler if available, otherwise fallback to provider_handler
handler = self.condensation_handler if self.condensation_handler else self.provider_handler
if not handler:
logger.warning("No provider handler available for conversational condensation, skipping")
return messages
# Use dedicated condensation model if configured, otherwise use same model
condense_model = self.condensation_model if self.condensation_model else model
# Check if using internal model
if self._use_internal_model:
logger.info("Using internal model for conversational condensation")
else:
# Use dedicated condensation handler if available, otherwise fallback to provider_handler
handler = self.condensation_handler if self.condensation_handler else self.provider_handler
if not handler:
logger.warning("No provider handler available for conversational condensation, skipping")
return messages
# Use dedicated condensation model if configured, otherwise use same model
condense_model = self.condensation_model if self.condensation_model else model
if len(messages) <= 4:
# Not enough messages to condense
......@@ -405,13 +435,16 @@ class ContextManager:
if not messages_to_summarize:
return messages
# Build summary prompt
summary_prompt = "Summarize the following conversation history, including key facts, decisions, and the current goal. Keep it concise but comprehensive.\n\n"
# Load system prompt from markdown file
system_prompt = self._load_system_prompt('conversational')
# Build conversation text
conversation_text = ""
for msg in messages_to_summarize:
role = msg.get('role', 'unknown')
content = msg.get('content', '')
if content:
summary_prompt += f"{role}: {content}\n"
conversation_text += f"{role}: {content}\n"
try:
# If using rotation handler, call rotation handler's method
......
{
"server": {
"host": "0.0.0.0",
"port": 8000
"port": 8000,
"protocol": "http",
"ssl_certfile": null,
"ssl_keyfile": null
},
"auth": {
"enabled": false,
"tokens": []
},
"dashboard": {
"enabled": true,
......
......@@ -40,6 +40,75 @@ from collections import defaultdict
from pathlib import Path
import json
def generate_self_signed_cert(cert_file: Path, key_file: Path):
"""Generate self-signed SSL certificate"""
try:
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
logger.info("Generating self-signed SSL certificate...")
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Generate certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "State"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "City"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "AISBF"),
x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
])
cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
private_key.public_key()
).serial_number(
x509.random_serial_number()
).not_valid_before(
datetime.utcnow()
).not_valid_after(
datetime.utcnow() + timedelta(days=365)
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName("localhost"),
x509.DNSName("127.0.0.1"),
]),
critical=False,
).sign(private_key, hashes.SHA256())
# Write private key
key_file.parent.mkdir(parents=True, exist_ok=True)
with open(key_file, "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
))
# Write certificate
with open(cert_file, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))
logger.info(f"Generated self-signed certificate: {cert_file}")
logger.info(f"Generated private key: {key_file}")
except ImportError:
logger = logging.getLogger(__name__)
logger.error("cryptography library not installed. Install with: pip install cryptography")
raise
def load_server_config():
"""Load server configuration from aisbf.json"""
# Try user config first
......@@ -70,9 +139,35 @@ def load_server_config():
with open(config_path) as f:
config_data = json.load(f)
server_config = config_data.get('server', {})
auth_config = config_data.get('auth', {})
protocol = server_config.get('protocol', 'http')
ssl_certfile = server_config.get('ssl_certfile')
ssl_keyfile = server_config.get('ssl_keyfile')
# Handle HTTPS with auto-generated certificates
if protocol == 'https':
if not ssl_certfile or not ssl_keyfile:
# Auto-generate paths
ssl_dir = Path.home() / '.aisbf' / 'ssl'
ssl_certfile = str(ssl_dir / 'cert.pem')
ssl_keyfile = str(ssl_dir / 'key.pem')
cert_path = Path(ssl_certfile).expanduser()
key_path = Path(ssl_keyfile).expanduser()
# Generate if they don't exist
if not cert_path.exists() or not key_path.exists():
generate_self_signed_cert(cert_path, key_path)
return {
'host': server_config.get('host', '0.0.0.0'),
'port': server_config.get('port', 8000)
'port': server_config.get('port', 8000),
'protocol': protocol,
'ssl_certfile': ssl_certfile if protocol == 'https' else None,
'ssl_keyfile': ssl_keyfile if protocol == 'https' else None,
'auth_enabled': auth_config.get('enabled', False),
'auth_tokens': auth_config.get('tokens', [])
}
except Exception as e:
logger = logging.getLogger(__name__)
......@@ -81,7 +176,12 @@ def load_server_config():
# Return defaults
return {
'host': '0.0.0.0',
'port': 8000
'port': 8000,
'protocol': 'http',
'ssl_certfile': None,
'ssl_keyfile': None,
'auth_enabled': False,
'auth_tokens': []
}
class BrokenPipeFilter(logging.Filter):
......@@ -213,6 +313,9 @@ def setup_logging():
# Configure logging
logger = setup_logging()
# Load server configuration
server_config = load_server_config()
# Initialize handlers
request_handler = RequestHandler()
rotation_handler = RotationHandler()
......@@ -220,6 +323,37 @@ autoselect_handler = AutoselectHandler()
app = FastAPI(title="AI Proxy Server")
# Authentication middleware
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Check API token authentication if enabled"""
if server_config.get('auth_enabled', False):
# Skip auth for root endpoint
if request.url.path == "/":
response = await call_next(request)
return response
# Check for Authorization header
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return JSONResponse(
status_code=401,
content={"error": "Missing or invalid Authorization header. Use: Authorization: Bearer <token>"}
)
token = auth_header.replace('Bearer ', '')
allowed_tokens = server_config.get('auth_tokens', [])
if token not in allowed_tokens:
return JSONResponse(
status_code=403,
content={"error": "Invalid authentication token"}
)
response = await call_next(request)
return response
# Exception handler for validation errors
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
......@@ -583,9 +717,32 @@ def main():
server_config = load_server_config()
host = server_config['host']
port = server_config['port']
protocol = server_config.get('protocol', 'http')
ssl_certfile = server_config.get('ssl_certfile')
ssl_keyfile = server_config.get('ssl_keyfile')
auth_enabled = server_config.get('auth_enabled', False)
logger.info(f"Starting AI Proxy Server on http://{host}:{port}")
uvicorn.run(app, host=host, port=port)
# Log server configuration
logger.info(f"=== AISBF Server Configuration ===")
logger.info(f"Protocol: {protocol}")
logger.info(f"Host: {host}")
logger.info(f"Port: {port}")
logger.info(f"Authentication: {'Enabled' if auth_enabled else 'Disabled'}")
if protocol == 'https':
logger.info(f"SSL Certificate: {ssl_certfile}")
logger.info(f"SSL Key: {ssl_keyfile}")
logger.info(f"Starting AI Proxy Server on https://{host}:{port}")
uvicorn.run(
app,
host=host,
port=port,
ssl_certfile=ssl_certfile,
ssl_keyfile=ssl_keyfile
)
else:
logger.info(f"Starting AI Proxy Server on http://{host}:{port}")
uvicorn.run(app, host=host, port=port)
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment