Implement client-side last sync query and accumulated shortfall tracking

- Add accumulated_shortfall field to MatchModel to store historical shortfall at match completion
- Create Migration_040_AddAccumulatedShortfallToMatches for database schema update
- Update extraction flow in games_thread.py to store accumulated shortfall in match records
- Update sync report logic in client.py to use match's stored accumulated shortfall value
- Add test_last_sync_query.py to verify implementation
- Add CLIENT_SYNC_MINIMAL_PROMPT.md with API specifications

This ensures accurate reporting of cap compensation balance as it existed at the time each match was completed, rather than using the current global value.
parent eef784ce
# Minimal Prompt: Client-Side Last Sync Query Implementation
## What Changed on Server
Server now has a new endpoint to query last sync information:
**Endpoint**: `GET /api/reports/last-sync?client_id=<client_id>`
**Authentication**: Bearer token (API token)
**Response Format**:
```json
{
"success": true,
"client_id": "client_unique_identifier",
"last_sync_id": "sync_20260201_214327_abc12345",
"last_sync_timestamp": "2026-02-01T21:43:27.249Z",
"last_sync_type": "incremental",
"total_syncs": 25,
"last_sync_summary": {
"total_payin": 100000.0,
"total_payout": 95000.0,
"net_profit": 5000.0,
"total_bets": 50,
"total_matches": 10,
"cap_compensation_balance": 5000.0
},
"server_timestamp": "2026-02-01T21:43:27.249Z"
}
```
## What You Need to Implement
### 1. Add Function to Query Server
```python
def query_server_last_sync(api_token, client_id):
    """Query server for last sync information"""
    import requests

    url = "https://your-server.com/api/reports/last-sync"
    headers = {"Authorization": f"Bearer {api_token}"}
    params = {"client_id": client_id}
    # Use a timeout so a hung connection doesn't block the sync flow
    response = requests.get(url, headers=headers, params=params, timeout=30)
    return response.json()
```
### 2. Call Before Each Sync
```python
# Before performing sync
server_info = query_server_last_sync(api_token, client_id)
if server_info.get('success'):
    last_sync_id = server_info.get('last_sync_id')
    last_sync_time = server_info.get('last_sync_timestamp')
    # Compare with your local tracking
    # If mismatch detected, perform full sync instead of incremental
```
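The comparison step in the snippet above is left open. Below is one possible sketch of that decision, assuming `last_sync_timestamp` is the ISO-8601 string shown in the response format and that local tracking (see the helpers sketched under section 3) yields a dict with a `timestamp` field; the `choose_sync_type` name and the 60-second drift tolerance are illustrative, not part of the server API.
```python
from datetime import datetime, timezone

def choose_sync_type(server_info, local_tracking):
    """Decide between 'full' and 'incremental' sync.

    server_info: response dict from query_server_last_sync() (or None).
    local_tracking: locally stored dict like {"sync_id": ..., "timestamp": ...} or None.
    """
    if not server_info or not server_info.get('success'):
        # Server state unknown; keep whatever the client would normally do
        return 'incremental' if local_tracking else 'full'
    if not local_tracking:
        # Server has sync history but nothing is tracked locally: resync everything
        return 'full'
    server_time = datetime.fromisoformat(
        server_info['last_sync_timestamp'].replace('Z', '+00:00'))
    local_time = datetime.fromisoformat(str(local_tracking['timestamp']))
    if local_time.tzinfo is None:
        local_time = local_time.replace(tzinfo=timezone.utc)
    # Tolerate a little clock drift; a larger gap is treated as a mismatch
    if abs((server_time - local_time).total_seconds()) > 60:
        return 'full'
    return 'incremental'
```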
### 3. Handle Recovery
If your local tracking is corrupted or lost:
```python
# If no local tracking exists
if not local_tracking_exists():
    # Query server for last sync
    server_info = query_server_last_sync(api_token, client_id)
    # Recover local tracking from server state
    if server_info.get('last_sync_id'):
        update_local_tracking(
            sync_id=server_info['last_sync_id'],
            timestamp=server_info['last_sync_timestamp']
        )
```
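`local_tracking_exists` and `update_local_tracking` above stand in for whatever persistence your client already has. A minimal sketch, assuming the tracking is kept in a small JSON file (the file name and location are illustrative):
```python
import json
from pathlib import Path

# Illustrative location; a real client would point this at its own data directory
TRACKING_FILE = Path("last_sync_tracking.json")

def local_tracking_exists():
    """True if a tracking file is present on disk."""
    return TRACKING_FILE.is_file()

def load_local_tracking():
    """Return the stored tracking dict, or None if missing or unreadable."""
    try:
        return json.loads(TRACKING_FILE.read_text())
    except (OSError, ValueError):
        return None

def update_local_tracking(sync_id, timestamp):
    """Persist the last known sync ID and timestamp."""
    TRACKING_FILE.write_text(json.dumps({
        "sync_id": sync_id,
        "timestamp": timestamp,
    }, indent=2))
```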
## Key Benefits
1. **Verify Server State**: Check what server has before syncing
2. **Detect Corruption**: Compare local tracking with server
3. **Auto-Recovery**: Restore local tracking from server if lost
4. **Prevent Data Loss**: Ensure no syncs are missed
## Integration Point
Add this call to your existing sync flow:
```python
# Existing sync flow
def perform_sync():
    # NEW: Query server first
    server_info = query_server_last_sync(api_token, client_id)

    # Verify and recover if needed
    if needs_recovery(server_info):
        recover_from_server(server_info)

    # Continue with normal sync
    send_sync_data()
```
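`needs_recovery` and `recover_from_server` are not defined in this document; one way they could be composed from the sketches above (again an illustration, not the client's actual API):
```python
def needs_recovery(server_info):
    """Recovery only makes sense when the server reports sync history we lack locally."""
    if not server_info or not server_info.get('success'):
        return False
    return bool(server_info.get('last_sync_id')) and not local_tracking_exists()

def recover_from_server(server_info):
    """Rebuild local tracking from the server's last sync record."""
    update_local_tracking(
        sync_id=server_info['last_sync_id'],
        timestamp=server_info['last_sync_timestamp'],
    )
```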
That's it! Just add the query call before your existing sync logic.
\ No newline at end of file
@@ -936,7 +936,7 @@ class UpdatesResponseHandler(ResponseHandler):
class ReportsSyncResponseHandler(ResponseHandler):
"""Response handler for synchronizing report data to server with offline support and incremental updates"""
def __init__(self, db_manager, user_data_dir, api_client=None, message_bus=None):
self.db_manager = db_manager
@@ -947,7 +947,11 @@ class ReportsSyncResponseHandler(ResponseHandler):
self.max_retries = 5 # Maximum retry attempts for failed syncs
self.retry_backoff_base = 60 # Base backoff time in seconds (exponential)
# Import tracking model
from ..database.models import ReportsSyncTrackingModel
self.ReportsSyncTrackingModel = ReportsSyncTrackingModel
def handle_response(self, endpoint: APIEndpoint, response: requests.Response, report_data: Dict[str, Any] = None) -> Optional[Dict[str, Any]]:
"""Handle server response to report sync submission""" """Handle server response to report sync submission"""
try: try:
data = response.json() data = response.json()
...@@ -968,6 +972,10 @@ class ReportsSyncResponseHandler(ResponseHandler): ...@@ -968,6 +972,10 @@ class ReportsSyncResponseHandler(ResponseHandler):
processed_data['synced_items'] = synced_count processed_data['synced_items'] = synced_count
logger.info(f"Successfully synced {synced_count} report items to server") logger.info(f"Successfully synced {synced_count} report items to server")
# Update tracking records for successfully synced items
if report_data:
self._update_sync_tracking(report_data)
# Clear successfully synced items from queue
self._clear_synced_items()
else:
@@ -1004,10 +1012,26 @@ class ReportsSyncResponseHandler(ResponseHandler):
return error_data
def collect_report_data(self, date_range: str = 'today') -> Dict[str, Any]:
"""Collect report data from database for synchronization (incremental - only new/changed data)"""
try:
# NEW: Query server for last sync information before collecting data
client_id = self._get_client_id()
server_info = self.query_server_last_sync(client_id)
# Check if recovery is needed
if self.needs_recovery(server_info):
logger.warning("Local tracking mismatch detected, recovering from server...")
if server_info and self.recover_local_tracking(server_info):
logger.info("Successfully recovered local tracking from server")
else:
logger.error("Failed to recover local tracking, proceeding with caution")
from ..database.models import (
BetModel, BetDetailModel, ExtractionStatsModel, MatchModel,
ReportsSyncTrackingModel, PersistentRedistributionAdjustmentModel
)
from datetime import datetime, timedelta, date
import hashlib
# Determine date range
now = datetime.utcnow()
@@ -1026,21 +1050,94 @@ class ReportsSyncResponseHandler(ResponseHandler):
session = self.db_manager.get_session()
try:
# Get last sync time for incremental sync
last_sync_time = session.query(
self.ReportsSyncTrackingModel.last_synced_at
).filter(
self.ReportsSyncTrackingModel.entity_type == 'sync'
).order_by(
self.ReportsSyncTrackingModel.last_synced_at.desc()
).first()
# If we have a previous sync, only collect data since then
if last_sync_time:
start_date = last_sync_time.last_synced_at
logger.info(f"Incremental sync: collecting data since {start_date}")
else:
logger.info(f"Full sync: collecting all data (no previous sync found)")
# Collect bets data (incremental - only new/updated bets)
bets_query = session.query(BetModel).filter(
BetModel.bet_datetime >= start_date,
BetModel.bet_datetime <= end_date
)
bets = bets_query.all()
# Filter bets to only include those not yet synced or updated since last sync
bets_to_sync = []
for bet in bets:
# Check if this bet has been synced before
tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='bet',
entity_id=bet.uuid
).first()
if tracking:
# Check if bet has been updated since last sync
if bet.updated_at > tracking.last_synced_at:
bets_to_sync.append(bet)
logger.debug(f"Bet {bet.uuid} updated since last sync, including in sync")
else:
logger.debug(f"Bet {bet.uuid} already synced and not updated, skipping")
else:
# New bet, include in sync
bets_to_sync.append(bet)
logger.debug(f"Bet {bet.uuid} is new, including in sync")
# Collect extraction stats (incremental - only new/updated stats)
stats_query = session.query(ExtractionStatsModel).filter(
ExtractionStatsModel.match_datetime >= start_date,
ExtractionStatsModel.match_datetime <= end_date
)
stats = stats_query.all()
# Filter stats to only include those not yet synced or updated since last sync
stats_to_sync = []
for stat in stats:
# Check if this stat has been synced before
tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='extraction_stat',
entity_id=str(stat.match_id)
).first()
if tracking:
# Check if stat has been updated since last sync
if stat.updated_at > tracking.last_synced_at:
stats_to_sync.append(stat)
logger.debug(f"Extraction stat {stat.match_id} updated since last sync, including in sync")
else:
logger.debug(f"Extraction stat {stat.match_id} already synced and not updated, skipping")
else:
# New stat, include in sync
stats_to_sync.append(stat)
logger.debug(f"Extraction stat {stat.match_id} is new, including in sync")
# Get cap compensation balance (accumulated shortfall)
# Use the global record (date 1970-01-01) to match dashboard display
cap_compensation_balance = 0.0
try:
from datetime import date
global_date = date(1970, 1, 1)
global_record = session.query(PersistentRedistributionAdjustmentModel)\
.filter_by(date=global_date)\
.first()
if global_record:
cap_compensation_balance = float(global_record.accumulated_shortfall)
logger.debug(f"Cap compensation balance (global record): {cap_compensation_balance}")
except Exception as e:
logger.warning(f"Failed to get cap compensation balance: {e}")
# Build report data payload (incremental - only new/changed data)
report_data = {
'sync_id': self._generate_sync_id(),
'client_id': self._get_client_id(),
@@ -1050,11 +1147,14 @@ class ReportsSyncResponseHandler(ResponseHandler):
'end_date': end_date.isoformat(),
'bets': [],
'extraction_stats': [],
'cap_compensation_balance': cap_compensation_balance,
'summary': self._calculate_summary(bets_to_sync, stats_to_sync),
'is_incremental': True, # Flag to indicate this is an incremental sync
'sync_type': 'incremental' if last_sync_time else 'full'
}
# Add bets data (only new/updated bets)
for bet in bets_to_sync:
bet_details = session.query(BetDetailModel).filter_by(
bet_id=bet.uuid
).filter(BetDetailModel.result != 'cancelled').all()
@@ -1085,8 +1185,12 @@ class ReportsSyncResponseHandler(ResponseHandler):
report_data['bets'].append(bet_data)
# Add extraction stats (only new/updated stats)
for stat in stats_to_sync:
# Get the match to retrieve the stored accumulated shortfall at completion time
match = session.query(MatchModel).filter_by(id=stat.match_id).first()
accumulated_shortfall = float(match.accumulated_shortfall) if match and match.accumulated_shortfall is not None else 0.0
stat_data = {
'match_id': stat.match_id,
'fixture_id': stat.fixture_id,
@@ -1102,7 +1206,8 @@ class ReportsSyncResponseHandler(ResponseHandler):
'under_amount': float(stat.under_amount),
'over_bets': stat.over_bets,
'over_amount': float(stat.over_amount),
'result_breakdown': stat.result_breakdown,
'accumulated_shortfall': accumulated_shortfall # Historical value at match completion
}
report_data['extraction_stats'].append(stat_data)
@@ -1321,6 +1426,151 @@ class ReportsSyncResponseHandler(ResponseHandler):
logger.error(f"Unexpected error during sync: {e}")
return False
def query_server_last_sync(self, client_id: str) -> Optional[Dict[str, Any]]:
"""Query server for last sync information"""
try:
if not self.api_client or not hasattr(self.api_client, 'session'):
logger.error("API client not available for last sync query")
return None
# Get sync endpoint configuration to extract base URL and auth
sync_endpoint = self.api_client.endpoints.get('reports_sync')
if not sync_endpoint:
logger.error("Reports sync endpoint not configured")
return None
# Construct last-sync URL from sync endpoint URL
base_url = sync_endpoint.url.replace('/sync', '/last-sync')
# Prepare headers with authentication
headers = {}
if sync_endpoint.auth and sync_endpoint.auth.get('type') == 'bearer':
token = sync_endpoint.auth.get('token')
if token:
headers['Authorization'] = f"Bearer {token}"
# Query server
params = {"client_id": client_id}
response = self.api_client.session.get(
base_url,
headers=headers,
params=params,
timeout=30
)
if response.status_code == 200:
data = response.json()
if data.get('success'):
logger.info(f"Successfully queried server last sync: {data.get('last_sync_id')}")
return data
else:
logger.warning(f"Server returned unsuccessful response: {data.get('error')}")
return None
elif response.status_code == 404:
# No sync record found on server (first sync scenario)
logger.info("No previous sync found on server (first sync)")
return None
else:
logger.error(f"Failed to query server last sync: status {response.status_code}")
return None
except Exception as e:
logger.error(f"Error querying server last sync: {e}")
return None
def recover_local_tracking(self, server_info: Dict[str, Any]) -> bool:
"""Recover local tracking from server state"""
try:
session = self.db_manager.get_session()
try:
# Parse server timestamps
last_sync_timestamp = server_info.get('last_sync_timestamp')
if last_sync_timestamp:
sync_time = datetime.fromisoformat(last_sync_timestamp.replace('Z', '+00:00'))
else:
sync_time = datetime.utcnow()
# Update or create sync tracking record
sync_tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='sync',
entity_id='latest'
).first()
if sync_tracking:
sync_tracking.last_synced_at = sync_time
sync_tracking.sync_count = server_info.get('total_syncs', 1)
sync_tracking.updated_at = sync_time
logger.info(f"Recovered sync tracking: last sync at {sync_time}")
else:
sync_tracking = self.ReportsSyncTrackingModel(
entity_type='sync',
entity_id='latest',
last_synced_at=sync_time,
sync_count=server_info.get('total_syncs', 1)
)
session.add(sync_tracking)
logger.info(f"Created sync tracking from server: first sync at {sync_time}")
session.commit()
logger.info("Successfully recovered local tracking from server")
return True
except Exception as e:
logger.error(f"Failed to recover local tracking: {e}")
session.rollback()
return False
finally:
session.close()
except Exception as e:
logger.error(f"Error recovering local tracking: {e}")
return False
def needs_recovery(self, server_info: Optional[Dict[str, Any]]) -> bool:
"""Check if local tracking needs recovery from server"""
if not server_info:
# No server info available, can't determine recovery need
return False
try:
session = self.db_manager.get_session()
try:
# Check if local tracking exists
local_tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='sync',
entity_id='latest'
).first()
if not local_tracking:
# No local tracking exists - need recovery
logger.info("No local tracking found - recovery needed")
return True
# Compare sync IDs if available
server_sync_id = server_info.get('last_sync_id')
if server_sync_id:
# If server has sync info but local doesn't match, need recovery
# We can't directly compare sync IDs since they're generated differently
# Instead, compare timestamps
server_timestamp = server_info.get('last_sync_timestamp')
if server_timestamp:
server_time = datetime.fromisoformat(server_timestamp.replace('Z', '+00:00'))
# If server time is significantly newer than local, need recovery
time_diff = abs((server_time - local_tracking.last_synced_at).total_seconds())
if time_diff > 60: # More than 1 minute difference
logger.warning(f"Sync time mismatch detected: server={server_time}, local={local_tracking.last_synced_at}")
return True
return False
finally:
session.close()
except Exception as e:
logger.error(f"Error checking recovery need: {e}")
return False
def _generate_sync_id(self) -> str:
"""Generate unique sync ID"""
import uuid
@@ -1423,6 +1673,95 @@ class ReportsSyncResponseHandler(ResponseHandler):
finally:
session.close()
def _update_sync_tracking(self, report_data: Dict[str, Any] = None):
"""Update sync tracking records after successful sync"""
session = self.db_manager.get_session()
try:
# Get the sync timestamp for this sync
sync_time = datetime.utcnow()
# Update or create tracking record for this sync operation
sync_tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='sync',
entity_id='latest'
).first()
if sync_tracking:
# Update existing tracking record
sync_tracking.last_synced_at = sync_time
sync_tracking.sync_count += 1
sync_tracking.updated_at = sync_time
logger.info(f"Updated sync tracking: last sync at {sync_time}, total syncs: {sync_tracking.sync_count}")
else:
# Create new tracking record
sync_tracking = self.ReportsSyncTrackingModel(
entity_type='sync',
entity_id='latest',
last_synced_at=sync_time,
sync_count=1
)
session.add(sync_tracking)
logger.info(f"Created sync tracking: first sync at {sync_time}")
# Track individual bets that were synced
if report_data and 'bets' in report_data:
for bet_data in report_data['bets']:
bet_uuid = bet_data.get('uuid')
if bet_uuid:
bet_tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='bet',
entity_id=bet_uuid
).first()
if bet_tracking:
# Update existing bet tracking
bet_tracking.last_synced_at = sync_time
bet_tracking.sync_count += 1
bet_tracking.updated_at = sync_time
else:
# Create new bet tracking
bet_tracking = self.ReportsSyncTrackingModel(
entity_type='bet',
entity_id=bet_uuid,
last_synced_at=sync_time,
sync_count=1
)
session.add(bet_tracking)
# Track individual extraction stats that were synced
if report_data and 'extraction_stats' in report_data:
for stat_data in report_data['extraction_stats']:
match_id = stat_data.get('match_id')
if match_id:
stat_tracking = session.query(self.ReportsSyncTrackingModel).filter_by(
entity_type='extraction_stat',
entity_id=str(match_id)
).first()
if stat_tracking:
# Update existing stat tracking
stat_tracking.last_synced_at = sync_time
stat_tracking.sync_count += 1
stat_tracking.updated_at = sync_time
else:
# Create new stat tracking
stat_tracking = self.ReportsSyncTrackingModel(
entity_type='extraction_stat',
entity_id=str(match_id),
last_synced_at=sync_time,
sync_count=1
)
session.add(stat_tracking)
session.commit()
if report_data:
logger.info(f"Updated tracking for {len(report_data.get('bets', []))} bets and {len(report_data.get('extraction_stats', []))} extraction stats")
except Exception as e:
logger.error(f"Failed to update sync tracking: {e}")
session.rollback()
finally:
session.close()
def _requeue_failed_items(self):
"""Re-queue failed items for retry in database"""
session = self.db_manager.get_session()
@@ -1673,6 +2012,7 @@ class APIClient(ThreadedComponent):
# Construct full URLs by appending paths to base URL
fastapi_main_url = fastapi_url.rstrip('/') + "/api/updates"
reports_sync_url = fastapi_url.rstrip('/') + "/api/reports/sync"
reports_last_sync_url = fastapi_url.rstrip('/') + "/api/reports/last-sync"
endpoints = {
"fastapi_main": {
"url": fastapi_main_url,
@@ -1691,16 +2031,29 @@ class APIClient(ThreadedComponent):
"method": "POST", # Use POST for reports sync
"headers": headers,
"auth": auth_config,
"interval": 600, # 10 minutes for incremental reports sync
"enabled": enabled, # Enable when token is available
"timeout": 60, # Longer timeout for large report data
"retry_attempts": 5, # More retries for unreliable connections
"retry_delay": 60, # 60 seconds between retries
"response_handler": "reports_sync" # Use reports_sync handler
},
"reports_last_sync": {
"url": reports_last_sync_url,
"method": "GET",
"headers": headers,
"auth": auth_config,
"interval": 600, # 10 minutes
"enabled": enabled,
"timeout": 30,
"retry_attempts": 3,
"retry_delay": 30,
"response_handler": "default" # Use default handler
},
}
logger.debug("ENDPOINT URLS - base_url: {}, fastapi_main: {}, reports_sync: {}, reports_last_sync: {}".format(
fastapi_url, fastapi_main_url, reports_sync_url, reports_last_sync_url))
return endpoints
@@ -1814,6 +2167,39 @@ class APIClient(ThreadedComponent):
# Prepare data/params based on method
request_data = endpoint.params.copy() if endpoint.method == 'GET' else endpoint.data.copy()
# For reports_sync endpoint, collect report data before sending
if endpoint.name == 'reports_sync':
logger.debug("Collecting report data for reports_sync endpoint")
reports_handler = self.response_handlers.get('reports_sync')
if reports_handler and hasattr(reports_handler, 'collect_report_data'):
try:
# Collect report data for all time (not just today)
report_data = reports_handler.collect_report_data(date_range='all')
logger.info(f"Collected report data: {len(report_data.get('bets', []))} bets, {len(report_data.get('extraction_stats', []))} stats")
# Log the complete report data structure for debugging
logger.debug(f"Report data structure: {list(report_data.keys())}")
logger.debug(f"Report data keys: {json.dumps({k: type(v).__name__ for k, v in report_data.items()})}")
# Log sample data (first bet and first stat if available)
if report_data.get('bets'):
logger.debug(f"Sample bet data: {json.dumps(report_data['bets'][0], default=str)}")
if report_data.get('extraction_stats'):
logger.debug(f"Sample extraction stat: {json.dumps(report_data['extraction_stats'][0], default=str)}")
# Calculate payload size
json_payload = json.dumps(report_data, default=str)
payload_size = len(json_payload.encode('utf-8'))
logger.info(f"Reports sync payload size: {payload_size} bytes ({payload_size/1024:.2f} KB)")
request_data = report_data
except Exception as e:
logger.error(f"Failed to collect report data: {e}")
import traceback
logger.error(f"Data collection traceback: {traceback.format_exc()}")
# Send empty data if collection fails
request_data = {}
# For FastAPI /api/updates endpoint, add 'from' parameter and rustdesk_id if provided
if endpoint.name == 'fastapi_main' and 'updates' in endpoint.url.lower():
# Check if forced timestamp is requested (for empty templates table scenario)
@@ -1892,7 +2278,6 @@ class APIClient(ThreadedComponent):
else:
# Standard execution for first attempt or non-token endpoints
response = self.session.request(**request_kwargs)
# Update heartbeat after HTTP request completes
self.heartbeat()
@@ -1906,12 +2291,31 @@ class APIClient(ThreadedComponent):
logger.debug(f"Response body (truncated): {response_text[:1000]}...")
else:
logger.debug(f"Response body: {response_text}")
# For reports_sync endpoint, log detailed error information on 500 errors
if endpoint.name == 'reports_sync' and response.status_code >= 500:
logger.error(f"Reports sync server error - Status: {response.status_code}")
logger.error(f"Server response: {response_text}")
logger.error(f"Request URL: {request_kwargs.get('url')}")
logger.error(f"Request method: {request_kwargs.get('method')}")
logger.error(f"Request headers: {request_kwargs.get('headers')}")
if 'json' in request_kwargs:
json_data = request_kwargs['json']
logger.error(f"Request JSON keys: {list(json_data.keys())}")
logger.error(f"Request JSON size: {len(json.dumps(json_data, default=str).encode('utf-8'))} bytes")
except Exception as e:
logger.debug(f"Could not read response body: {e}")
# Now raise for status to trigger error handling
response.raise_for_status()
# Handle successful response
handler = self.response_handlers.get(endpoint.response_handler, self.response_handlers['default'])
# Pass report_data to handler if this is reports_sync endpoint
if endpoint.name == 'reports_sync' and 'request_data' in locals():
processed_data = handler.handle_response(endpoint, response, request_data)
else:
processed_data = handler.handle_response(endpoint, response)
# Update heartbeat after response processing
self.heartbeat()
...
@@ -3170,6 +3170,12 @@ class GamesThread(ThreadedComponent):
logger.warning(f"Match {match_id} not found for statistics collection")
return
# Store the accumulated shortfall value at the time of match completion
# This historical value will be used in reports instead of the current global value
accumulated_shortfall = self._get_global_redistribution_adjustment(session)
match.accumulated_shortfall = accumulated_shortfall
logger.info(f"💰 [SHORTFALL TRACKING] Stored accumulated shortfall {accumulated_shortfall:.2f} in match {match_id} at completion time")
# Calculate statistics (excluding cancelled bets)
total_bets = session.query(BetDetailModel).join(MatchModel).filter(
BetDetailModel.match_id == match_id,
...
@@ -2943,6 +2943,44 @@ class Migration_039_AddMatchNumberToBetDetails(DatabaseMigration):
return True
class Migration_040_AddAccumulatedShortfallToMatches(DatabaseMigration):
"""Add accumulated_shortfall field to matches table for storing historical shortfall values"""
def __init__(self):
super().__init__("040", "Add accumulated_shortfall field to matches table")
def up(self, db_manager) -> bool:
"""Add accumulated_shortfall column to matches table"""
try:
with db_manager.engine.connect() as conn:
# Check if accumulated_shortfall column already exists
result = conn.execute(text("PRAGMA table_info(matches)"))
columns = [row[1] for row in result.fetchall()]
if 'accumulated_shortfall' not in columns:
# Add accumulated_shortfall column with default value 0.0
conn.execute(text("""
ALTER TABLE matches
ADD COLUMN accumulated_shortfall REAL DEFAULT 0.0 NOT NULL
"""))
conn.commit()
logger.info("accumulated_shortfall column added to matches table")
else:
logger.info("accumulated_shortfall column already exists in matches table")
return True
except Exception as e:
logger.error(f"Failed to add accumulated_shortfall field to matches: {e}")
return False
def down(self, db_manager) -> bool:
"""Remove accumulated_shortfall column - SQLite doesn't support DROP COLUMN easily"""
logger.warning("SQLite doesn't support DROP COLUMN - accumulated_shortfall column will remain")
return True
class Migration_036_AddMatchTemplatesTables(DatabaseMigration):
"""Add matches_templates and match_outcomes_templates tables for storing match templates"""
@@ -3101,6 +3139,7 @@ MIGRATIONS: List[DatabaseMigration] = [
Migration_037_RenameDailyRedistributionShortfallTable(),
Migration_038_AddWin1Win2Associations(),
Migration_039_AddMatchNumberToBetDetails(),
Migration_040_AddAccumulatedShortfallToMatches(),
]
...
@@ -495,6 +495,7 @@ class MatchModel(BaseModel):
running = Column(Boolean, default=False, nullable=False, comment='Match running flag (0=not running, 1=running)')
status = Column(Enum('pending', 'scheduled', 'bet', 'ingame', 'done', 'cancelled', 'failed', 'paused'), default='pending', nullable=False, comment='Match status enum')
fixture_active_time = Column(Integer, nullable=True, comment='Unix timestamp when fixture became active on server')
accumulated_shortfall = Column(Float(precision=2), default=0.0, nullable=False, comment='Accumulated shortfall from redistribution at the time of match completion')
# File metadata
filename = Column(String(1024), nullable=False, comment='Original fixture filename')
@@ -1245,3 +1246,32 @@ class ReportsSyncQueueModel(BaseModel):
def __repr__(self):
return f'<ReportsSyncQueue {self.sync_id}: status={self.status}, retries={self.retry_count}>'
class ReportsSyncTrackingModel(BaseModel):
"""Track what data has been synced to server for incremental updates"""
__tablename__ = 'reports_sync_tracking'
__table_args__ = (
Index('ix_reports_sync_tracking_entity_type', 'entity_type'),
Index('ix_reports_sync_tracking_entity_id', 'entity_id'),
Index('ix_reports_sync_tracking_last_synced_at', 'last_synced_at'),
Index('ix_reports_sync_tracking_composite', 'entity_type', 'entity_id'),
UniqueConstraint('entity_type', 'entity_id', name='uq_reports_sync_tracking_entity'),
)
entity_type = Column(String(50), nullable=False, comment='Type of entity: bet, bet_detail, extraction_stat')
entity_id = Column(String(255), nullable=False, comment='ID of the entity (bet UUID, match ID, etc.)')
last_synced_at = Column(DateTime, default=datetime.utcnow, nullable=False, comment='Last time this entity was synced')
last_synced_hash = Column(String(64), comment='Hash of entity data at last sync for change detection')
sync_count = Column(Integer, default=1, nullable=False, comment='Number of times this entity has been synced')
def update_sync(self, data_hash: str = None):
"""Update sync timestamp and optionally hash"""
self.last_synced_at = datetime.utcnow()
self.sync_count += 1
if data_hash:
self.last_synced_hash = data_hash
self.updated_at = datetime.utcnow()
def __repr__(self):
return f'<ReportsSyncTracking {self.entity_type}:{self.entity_id} synced at {self.last_synced_at}>'
"""
Test script for the new last sync query functionality
"""
import sys
import os
from pathlib import Path
# Add the project root to the path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
def test_imports():
"""Test that all necessary imports work"""
print("Testing imports...")
try:
from mbetterclient.api_client.client import ReportsSyncResponseHandler
print("✓ ReportsSyncResponseHandler imported successfully")
return True
except Exception as e:
print(f"✗ Failed to import ReportsSyncResponseHandler: {e}")
return False
def test_method_exists():
"""Test that new methods exist in ReportsSyncResponseHandler"""
print("\nTesting method existence...")
try:
from mbetterclient.api_client.client import ReportsSyncResponseHandler
from mbetterclient.database.manager import DatabaseManager
from mbetterclient.config.manager import ConfigManager
# Create a mock instance (we won't actually run it)
# Just check that the methods exist
methods_to_check = [
'query_server_last_sync',
'recover_local_tracking',
'needs_recovery'
]
for method_name in methods_to_check:
if hasattr(ReportsSyncResponseHandler, method_name):
print(f"✓ Method '{method_name}' exists")
else:
print(f"✗ Method '{method_name}' NOT found")
return False
return True
except Exception as e:
print(f"✗ Error checking methods: {e}")
return False
def test_endpoint_configuration():
"""Test that the new endpoint is configured"""
print("\nTesting endpoint configuration...")
try:
from mbetterclient.api_client.client import APIClient
from mbetterclient.core.message_bus import MessageBus
from mbetterclient.database.manager import DatabaseManager
from mbetterclient.config.manager import ConfigManager
from mbetterclient.config.settings import ApiConfig
# Check if _get_default_endpoints method exists
if hasattr(APIClient, '_get_default_endpoints'):
print("✓ _get_default_endpoints method exists")
# We can't actually instantiate APIClient without a full setup,
# but we can check the method signature
import inspect
sig = inspect.signature(APIClient._get_default_endpoints)
print(f"✓ Method signature: {sig}")
return True
else:
print("✗ _get_default_endpoints method NOT found")
return False
except Exception as e:
print(f"✗ Error checking endpoint configuration: {e}")
import traceback
traceback.print_exc()
return False
def test_integration():
"""Test that the integration is correct"""
print("\nTesting integration...")
try:
from mbetterclient.api_client.client import ReportsSyncResponseHandler
import inspect
# Check that collect_report_data method exists and has the right signature
if hasattr(ReportsSyncResponseHandler, 'collect_report_data'):
print("✓ collect_report_data method exists")
sig = inspect.signature(ReportsSyncResponseHandler.collect_report_data)
print(f"✓ Method signature: {sig}")
return True
else:
print("✗ collect_report_data method NOT found")
return False
except Exception as e:
print(f"✗ Error checking integration: {e}")
return False
def main():
"""Run all tests"""
print("=" * 60)
print("Testing Last Sync Query Implementation")
print("=" * 60)
results = []
# Run tests
results.append(("Imports", test_imports()))
results.append(("Method Existence", test_method_exists()))
results.append(("Endpoint Configuration", test_endpoint_configuration()))
results.append(("Integration", test_integration()))
# Print summary
print("\n" + "=" * 60)
print("Test Summary")
print("=" * 60)
passed = sum(1 for _, result in results if result)
total = len(results)
for test_name, result in results:
status = "✓ PASS" if result else "✗ FAIL"
print(f"{test_name:.<40} {status}")
print(f"\nTotal: {passed}/{total} tests passed")
if passed == total:
print("\n✓ All tests passed!")
return 0
else:
print(f"\n✗ {total - passed} test(s) failed")
return 1
if __name__ == "__main__":
sys.exit(main())
\ No newline at end of file