Commit 46faac95 authored by Your Name's avatar Your Name

Fix proper token scoping and permission enforcement

Now:
- Global tokens (aisbf.json) ONLY access global endpoints
- User tokens (database) ONLY access their own /api/u/<username> endpoints
- No cross-access possible
- Admin users still have full access
parent d9965453
......@@ -1288,6 +1288,75 @@ async def shutdown_event():
# Cleanup multiprocessing children (sentence-transformers, torch, etc.)
_cleanup_multiprocessing_children()
# API Token Authorization Middleware
@app.middleware("http")
async def api_token_authorization_middleware(request: Request, call_next):
"""Enforce proper token scope access:
- Global tokens (aisbf.json): CAN access global endpoints, CANNOT access user endpoints
- User tokens (database): CAN access ONLY their own user endpoints, CANNOT access global endpoints
"""
path = request.url.path
# Skip authorization for non-API paths
if (path == "/" or
path.startswith("/dashboard") or
path.startswith("/auth/") or
path.startswith("/api/admin") or
path == "/favicon.ico" or
path.startswith("/.well-known/")):
return await call_next(request)
# Skip for public models endpoints (GET only)
if request.method == "GET" and path in ["/api/models", "/api/v1/models"]:
return await call_next(request)
is_global_token = getattr(request.state, 'is_global_token', False)
user_id = getattr(request.state, 'user_id', None)
is_admin = getattr(request.state, 'is_admin', False)
# Admin users bypass all restrictions
if is_admin:
return await call_next(request)
# --- USER-SPECIFIC ENDPOINTS (/api/u/*) ---
if (path.startswith("/api/u/") or
path.startswith("/mcp/u/") or
path.startswith("/api/v1/u/") or
path.startswith("/mcp/v1/u/")):
if is_global_token:
return JSONResponse(
status_code=403,
content={"error": "Global tokens cannot access user-specific endpoints. Use the user's own API token."}
)
# Verify user is accessing their own endpoints
path_parts = path.split('/')
if len(path_parts) >= 4 and path_parts[2] == 'u':
target_username = path_parts[3]
db = DatabaseRegistry.get_config_database()
authenticated_user = db.get_user_by_id(user_id)
if not authenticated_user or authenticated_user['username'] != target_username:
return JSONResponse(
status_code=403,
content={"error": "You can only access your own user-specific endpoints."}
)
# --- GLOBAL ENDPOINTS (all other API paths) ---
else:
if not is_global_token:
return JSONResponse(
status_code=403,
content={"error": "User tokens cannot access global endpoints. Use your user-specific endpoints at /api/u/<your-username>"}
)
# All checks passed
return await call_next(request)
# Authentication middleware
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment