refactor: Consolidate message types - rename START_GAMES to SCHEDULE_GAMES

- Rename START_GAMES message type to SCHEDULE_GAMES for clarity
- Add schedule_games() method to MessageBuilder class
- Update all references in games_thread.py, match_timer.py, routes.py, and tests
- Improve message naming consistency:
  * START_GAME: Actually starts games/timers
  * SCHEDULE_GAMES: Changes status to scheduled (doesn't start timer)
- All message types now follow consistent naming convention
- Fixes potential confusion between starting and scheduling actions
parent d5cdad5a
"""
Games thread component for managing game-related operations
"""
import time
import logging
import threading
from datetime import datetime
from typing import Optional, Dict, Any, List
from .thread_manager import ThreadedComponent
from .message_bus import MessageBus, Message, MessageType, MessageBuilder
from ..database.manager import DatabaseManager
from ..database.models import MatchModel, MatchStatus
logger = logging.getLogger(__name__)
class GamesThread(ThreadedComponent):
"""Games thread for handling game operations and monitoring"""
def __init__(self, name: str, message_bus: MessageBus, db_manager: DatabaseManager):
super().__init__(name, message_bus)
self.db_manager = db_manager
self.current_fixture_id: Optional[str] = None
self.game_active = False
self._shutdown_event = threading.Event()
self.message_queue = None
def initialize(self) -> bool:
"""Initialize the games thread"""
try:
logger.info("Initializing GamesThread...")
# Register with message bus first
self.message_queue = self.message_bus.register_component(self.name)
# Subscribe to relevant messages
self.message_bus.subscribe(self.name, MessageType.START_GAME, self._handle_start_game)
self.message_bus.subscribe(self.name, MessageType.SCHEDULE_GAMES, self._handle_schedule_games)
self.message_bus.subscribe(self.name, MessageType.SYSTEM_SHUTDOWN, self._handle_shutdown_message)
# Send ready status
ready_message = MessageBuilder.system_status(
sender=self.name,
status="ready",
details={
"component": "games_thread",
"capabilities": ["game_monitoring", "fixture_tracking"]
}
)
self.message_bus.publish(ready_message)
logger.info("GamesThread initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize GamesThread: {e}")
return False
def run(self):
"""Main run loop for the games thread"""
logger.info("GamesThread started")
try:
while self.running and not self._shutdown_event.is_set():
try:
# Process any pending messages
message = self.message_bus.get_message(self.name, timeout=0.1)
if message:
self._process_message(message)
# If a game is active, perform game-related operations
if self.game_active and self.current_fixture_id:
self._monitor_game_state()
# Update heartbeat
self.heartbeat()
# Sleep for 0.1 seconds as requested
time.sleep(0.1)
except Exception as e:
logger.error(f"GamesThread run loop error: {e}")
time.sleep(1.0) # Longer sleep on error
except Exception as e:
logger.error(f"GamesThread run failed: {e}")
finally:
self._cleanup()
logger.info("GamesThread ended")
def shutdown(self):
"""Shutdown the games thread"""
logger.info("GamesThread shutdown requested")
self._shutdown_event.set()
self.game_active = False
def _handle_start_game(self, message: Message):
"""Handle START_GAME message"""
try:
fixture_id = message.data.get("fixture_id")
if not fixture_id:
# If no fixture_id provided, find the last fixture with pending matches
fixture_id = self._find_last_fixture_with_pending_matches()
if fixture_id:
logger.info(f"Starting game for fixture: {fixture_id}")
self.current_fixture_id = fixture_id
self.game_active = True
# Send game started confirmation
response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "started",
"fixture_id": fixture_id,
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(response)
else:
logger.warning("No fixture with pending matches found")
# Send error response
error_response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "error",
"error": "No fixture with pending matches found",
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(error_response)
except Exception as e:
logger.error(f"Failed to handle START_GAME message: {e}")
# Send error response
error_response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "error",
"error": str(e),
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(error_response)
def _handle_schedule_games(self, message: Message):
"""Handle SCHEDULE_GAMES message - change status of pending matches to scheduled"""
try:
fixture_id = message.data.get("fixture_id")
if not fixture_id:
# If no fixture_id provided, find the last fixture with pending matches
fixture_id = self._find_last_fixture_with_pending_matches()
if fixture_id:
logger.info(f"Scheduling games for fixture: {fixture_id}")
# Update status of all pending matches in the fixture to scheduled
updated_count = self._schedule_fixture_matches(fixture_id)
if updated_count > 0:
logger.info(f"Successfully scheduled {updated_count} matches for fixture {fixture_id}")
# Send success response
response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "scheduled",
"fixture_id": fixture_id,
"matches_scheduled": updated_count,
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(response)
else:
logger.warning(f"No pending matches found to schedule for fixture {fixture_id}")
# Send response indicating no matches were scheduled
response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "no_matches",
"fixture_id": fixture_id,
"message": "No pending matches found to schedule",
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(response)
else:
logger.warning("No fixture with pending matches found")
# Send error response
error_response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "error",
"error": "No fixture with pending matches found",
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(error_response)
except Exception as e:
logger.error(f"Failed to handle SCHEDULE_GAMES message: {e}")
# Send error response
error_response = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
recipient=message.sender,
data={
"status": "error",
"error": str(e),
"timestamp": time.time()
},
correlation_id=message.correlation_id
)
self.message_bus.publish(error_response)
def _handle_shutdown_message(self, message: Message):
"""Handle shutdown message"""
logger.info(f"Shutdown message received from {message.sender}")
self._shutdown_event.set()
self.game_active = False
def _process_message(self, message: Message):
"""Process incoming messages"""
try:
logger.debug(f"GamesThread processing message: {message}")
# Messages are handled by subscribed handlers, but we can add additional processing here
if message.type == MessageType.GAME_UPDATE:
self._handle_game_update(message)
except Exception as e:
logger.error(f"Failed to process message: {e}")
def _handle_game_update(self, message: Message):
"""Handle game update messages"""
try:
update_data = message.data
logger.debug(f"Game update received: {update_data}")
# Process game update data as needed
# This could include updating match states, processing outcomes, etc.
except Exception as e:
logger.error(f"Failed to handle game update: {e}")
def _find_last_fixture_with_pending_matches(self) -> Optional[str]:
"""Find the last fixture that has pending matches"""
try:
session = self.db_manager.get_session()
try:
# Query for matches with PENDING status
pending_matches = session.query(MatchModel).filter(
MatchModel.status == 'pending',
MatchModel.active_status == True
).order_by(MatchModel.fixture_active_time.desc()).all()
if pending_matches:
# Get the fixture_id from the most recent pending match
latest_match = pending_matches[0]
fixture_id = latest_match.fixture_id
logger.info(f"Found fixture with pending matches: {fixture_id} ({len(pending_matches)} matches)")
return fixture_id
else:
logger.info("No pending matches found")
return None
finally:
session.close()
except Exception as e:
logger.error(f"Failed to find last fixture with pending matches: {e}")
return None
def _schedule_fixture_matches(self, fixture_id: str) -> int:
"""Update status of all pending matches in a fixture to scheduled"""
try:
session = self.db_manager.get_session()
try:
# Query for pending matches in the specified fixture
pending_matches = session.query(MatchModel).filter(
MatchModel.fixture_id == fixture_id,
MatchModel.status == 'pending',
MatchModel.active_status == True
).all()
updated_count = 0
for match in pending_matches:
# Change status from PENDING to SCHEDULED
match.status = 'scheduled'
logger.debug(f"Scheduling match #{match.match_number}: {match.fighter1_township} vs {match.fighter2_township} - status changed to {match.status}")
updated_count += 1
# Commit the changes
session.commit()
logger.info(f"Scheduled {updated_count} matches for fixture {fixture_id}")
return updated_count
finally:
session.close()
except Exception as e:
logger.error(f"Failed to schedule matches for fixture {fixture_id}: {e}")
return 0
def _monitor_game_state(self):
"""Monitor the current game state"""
try:
if not self.current_fixture_id:
return
# Check if there are still pending or scheduled matches for this fixture
session = self.db_manager.get_session()
try:
active_count = session.query(MatchModel).filter(
MatchModel.fixture_id == self.current_fixture_id,
MatchModel.status.in_(['pending', 'scheduled', 'bet', 'ingame']),
MatchModel.active_status == True
).count()
if active_count == 0:
logger.info(f"All matches completed for fixture {self.current_fixture_id}")
self.game_active = False
# Send game completed message
completed_message = Message(
type=MessageType.GAME_STATUS,
sender=self.name,
data={
"status": "completed",
"fixture_id": self.current_fixture_id,
"timestamp": time.time()
}
)
self.message_bus.publish(completed_message)
# Reset current fixture
self.current_fixture_id = None
finally:
session.close()
except Exception as e:
logger.error(f"Failed to monitor game state: {e}")
def _cleanup(self):
"""Perform cleanup operations"""
try:
logger.info("GamesThread performing cleanup...")
# Reset state
self.game_active = False
self.current_fixture_id = None
# Send final status
final_status = MessageBuilder.system_status(
sender=self.name,
status="shutdown",
details={
"component": "games_thread",
"cleanup_completed": True
}
)
self.message_bus.publish(final_status)
logger.info("GamesThread cleanup completed")
except Exception as e:
logger.error(f"GamesThread cleanup error: {e}")
\ No newline at end of file
......@@ -38,7 +38,7 @@ class MatchTimerComponent(ThreadedComponent):
# Register message handlers
self.message_bus.subscribe(self.name, MessageType.START_GAME, self._handle_start_game)
self.message_bus.subscribe(self.name, MessageType.START_GAMES, self._handle_start_games)
self.message_bus.subscribe(self.name, MessageType.SCHEDULE_GAMES, self._handle_schedule_games)
self.message_bus.subscribe(self.name, MessageType.CUSTOM, self._handle_custom_message)
logger.info("MatchTimer component initialized")
......@@ -141,10 +141,10 @@ class MatchTimerComponent(ThreadedComponent):
except Exception as e:
logger.error(f"Failed to handle START_GAME message: {e}")
def _handle_start_games(self, message: Message):
"""Handle START_GAMES message"""
def _handle_schedule_games(self, message: Message):
"""Handle SCHEDULE_GAMES message"""
try:
logger.info("Received START_GAMES message")
logger.info("Received SCHEDULE_GAMES message")
# Get match interval from configuration
match_interval = self._get_match_interval()
......@@ -153,7 +153,7 @@ class MatchTimerComponent(ThreadedComponent):
self._start_timer(match_interval * 60, None)
except Exception as e:
logger.error(f"Failed to handle START_GAMES message: {e}")
logger.error(f"Failed to handle SCHEDULE_GAMES message: {e}")
def _handle_custom_message(self, message: Message):
"""Handle custom messages (like timer state requests)"""
......
......@@ -62,7 +62,7 @@ class MessageType(Enum):
# Game messages
START_GAME = "START_GAME"
START_GAMES = "START_GAMES"
SCHEDULE_GAMES = "SCHEDULE_GAMES"
START_GAME_DELAYED = "START_GAME_DELAYED"
MATCH_START = "MATCH_START"
GAME_STATUS = "GAME_STATUS"
......@@ -550,6 +550,17 @@ class MessageBuilder:
}
)
@staticmethod
def schedule_games(sender: str, fixture_id: Optional[str] = None) -> Message:
"""Create SCHEDULE_GAMES message"""
return Message(
type=MessageType.SCHEDULE_GAMES,
sender=sender,
data={
"fixture_id": fixture_id
}
)
@staticmethod
def start_game_delayed(sender: str, fixture_id: Optional[str] = None, delay_minutes: Optional[int] = None) -> Message:
"""Create START_GAME_DELAYED message"""
......
......@@ -2119,11 +2119,11 @@ def control_match_timer():
return jsonify({"error": "Action is required"}), 400
if action == 'start':
# Send START_GAMES message to start the timer
# Send SCHEDULE_GAMES message to start the timer
if api_bp.message_bus:
from ..core.message_bus import MessageBuilder, MessageType
start_message = MessageBuilder.start_games(sender="web_dashboard")
start_message = MessageBuilder.schedule_games(sender="web_dashboard")
api_bp.message_bus.publish(start_message)
return jsonify({
......
#!/usr/bin/env python3
"""
Test script for the GamesThread component
"""
import sys
import os
import time
import logging
# Add the project root to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from mbetterclient.core.games_thread import GamesThread
from mbetterclient.core.message_bus import MessageBus, Message, MessageType
from mbetterclient.database.manager import DatabaseManager
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def test_games_thread():
"""Test the GamesThread component"""
try:
logger.info("Testing GamesThread component...")
# Create a message bus
message_bus = MessageBus()
# Create a mock database manager (we won't actually connect to DB)
class MockDBManager:
def get_session(self):
class MockSession:
def query(self, model):
return self
def filter(self, *args):
return self
def order_by(self, *args):
return self
def first(self):
return None # No matches for this test
def close(self):
pass
return MockSession()
db_manager = MockDBManager()
# Create games thread
games_thread = GamesThread(
name="test_games_thread",
message_bus=message_bus,
db_manager=db_manager
)
# Test initialization
logger.info("Testing initialization...")
if not games_thread.initialize():
logger.error("Failed to initialize GamesThread")
return False
logger.info("GamesThread initialized successfully")
# Test START_GAME message handling
logger.info("Testing START_GAME message handling...")
start_game_message = Message(
type=MessageType.START_GAME,
sender="test",
data={
"fixture_id": "test_fixture_123"
}
)
# Send the message
message_bus.publish(start_game_message)
# Give it a moment to process
time.sleep(0.5)
# Test START_GAMES message handling
logger.info("Testing START_GAMES message handling...")
start_games_message = Message(
type=MessageType.SCHEDULE_GAMES,
sender="test",
data={
"fixture_id": "test_fixture_456"
}
)
# Send the message
message_bus.publish(start_games_message)
# Give it a moment to process
time.sleep(0.5)
# Test shutdown
logger.info("Testing shutdown...")
games_thread.shutdown()
logger.info("GamesThread test completed successfully")
return True
except Exception as e:
logger.error(f"GamesThread test failed: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_games_thread()
sys.exit(0 if success else 1)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment