Files
agentrunner/dashboard_api.py
CI System d218c49b2f fix: resolve API field mismatches and issue count fetching
- Fix PoolStatus.to_dict() field names to match frontend types
  (active_count, active_tasks instead of active, tasks)
- Fix AgentTaskInfo.to_dict() to use start_time instead of started_at
- Fix _get_issues() to use correct YouTrackClient method signature
  (get_issues_by_state takes project and state, not state and limit)
- Fix _get_builds() to use get_running_builds() instead of non-existent
  get_builds() method
- Fix _get_issue_counts() to actually fetch counts from YouTrack API
  instead of returning all zeros

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 07:18:44 -07:00

376 lines
11 KiB
Python

"""
Dashboard API data aggregation layer.
Provides thread-safe access to runner state for the dashboard API.
"""
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from runner import Runner
from agent import AgentPool
logger = logging.getLogger(__name__)
@dataclass
class ServiceHealth:
    """Health snapshot of one external service, as reported to the dashboard."""

    # Short service identifier (the callers in this file pass e.g. "youtrack").
    name: str
    # Whether the service is currently considered reachable/healthy.
    healthy: bool
    # Base URL of the service.
    url: str
    # Timestamp attached to this snapshot; None when not recorded.
    last_check: Optional[datetime] = None
    # Error description, if any.
    error: Optional[str] = None

    def to_dict(self) -> dict:
        """
        Convert the snapshot into a plain dict.

        Returns:
            A dict with keys ``name``, ``healthy``, ``url``, ``last_check``
            and ``error``. ``last_check`` is rendered with ``isoformat()``
            when set, otherwise it is None.
        """
        checked_at = None
        if self.last_check:
            checked_at = self.last_check.isoformat()

        payload = {
            "name": self.name,
            "healthy": self.healthy,
            "url": self.url,
        }
        payload["last_check"] = checked_at
        payload["error"] = self.error
        return payload
@dataclass
class AgentTaskInfo:
    """Describes one agent task for display on the dashboard."""

    # Identifier of the task inside the agent pool.
    task_id: str
    # Identifier of the issue the task works on.
    issue_id: str
    # Repository the task targets.
    repo: str
    # Platform label associated with the task.
    platform: str
    # Kind of work being performed.
    task_type: str
    # When the task started; None if the start time is unknown.
    started_at: Optional[datetime] = None
    # Seconds elapsed since the task started.
    elapsed_seconds: float = 0

    def to_dict(self) -> dict:
        """
        Convert the task info into a plain dict.

        Returns:
            A dict with keys ``task_id``, ``issue_id``, ``repo``,
            ``platform``, ``task_type``, ``start_time`` and
            ``elapsed_seconds``. The start timestamp is exposed under
            ``start_time`` (ISO format or None), and the elapsed time is
            rounded to one decimal place.
        """
        if self.started_at:
            start_time = self.started_at.isoformat()
        else:
            start_time = None
        elapsed = round(self.elapsed_seconds, 1)

        payload = {
            "task_id": self.task_id,
            "issue_id": self.issue_id,
            "repo": self.repo,
            "platform": self.platform,
            "task_type": self.task_type,
        }
        payload["start_time"] = start_time
        payload["elapsed_seconds"] = elapsed
        return payload
@dataclass
class PoolStatus:
    """Summary of the agent pool's capacity and its current tasks."""

    # Maximum number of agents the pool may run.
    max_agents: int
    # Number of agents currently running.
    active: int
    # Number of free agent slots.
    available: int
    # Per-agent timeout, in seconds.
    timeout_seconds: int
    # Task descriptions; each entry must provide ``to_dict()``.
    tasks: list[AgentTaskInfo] = field(default_factory=list)

    def to_dict(self) -> dict:
        """
        Convert the pool status into a plain dict.

        Returns:
            A dict with keys ``max_agents``, ``active_count``,
            ``available``, ``timeout_seconds`` and ``active_tasks``.
            ``active`` is exposed as ``active_count`` and ``tasks`` as
            ``active_tasks`` (a list of each task's ``to_dict()`` result).
        """
        serialized_tasks = []
        for task in self.tasks:
            serialized_tasks.append(task.to_dict())

        payload = {
            "max_agents": self.max_agents,
            "active_count": self.active,
            "available": self.available,
            "timeout_seconds": self.timeout_seconds,
        }
        payload["active_tasks"] = serialized_tasks
        return payload
class DashboardAPI:
    """
    Aggregates data from runner components for dashboard display.

    Every method reads from (or writes to) the runner object handed to the
    constructor. The class is intended to be called from API handler
    threads; only ``get_pool_status`` goes through a call that the original
    author documents as lock-protected (``AgentPool.get_status()``), the
    other accessors read runner attributes directly.
    """

    # Services reported by get_health_status(), in output order. Each name is
    # both the key in the health status mapping and the config section that
    # holds the service's ``base_url``.
    _SERVICE_NAMES = ("youtrack", "gitea", "woodpecker")

    # Config keys that must be positive integers when updated at runtime.
    _POSITIVE_INT_KEYS = frozenset({
        "poll_interval_seconds",
        "agent_timeout_seconds",
        "max_parallel_agents",
    })

    # Config keys that must be booleans when updated at runtime.
    _BOOL_KEYS = frozenset({"auto_push"})

    def __init__(self, runner: "Runner"):
        """
        Args:
            runner: The runner whose ``config``, ``health``, ``agent_pool``
                and ``youtrack`` attributes are read by the methods below.
        """
        self._runner = runner
        # Uptime is measured from the creation of this object.
        self._start_time = datetime.now()

    @property
    def uptime_seconds(self) -> float:
        """Seconds elapsed since this DashboardAPI instance was created."""
        return (datetime.now() - self._start_time).total_seconds()

    def get_health_status(self) -> dict:
        """
        Get health status for all services.

        Returns:
            {
                "status": "ok" | "degraded" | "error",
                "services": {
                    "youtrack": {...},
                    "gitea": {...},
                    "woodpecker": {...}
                },
                "uptime_seconds": 12345,
                "youtrack": <flag>, "gitea": <flag>, "woodpecker": <flag>
            }

            ``status`` is "ok" when every service flag is truthy, "degraded"
            when at least one is, and "error" when none is. A service missing
            from the health mapping counts as unhealthy.
        """
        health = self._runner.health
        # NOTE(review): this reads the health object's private ``_status``
        # mapping directly; a public accessor would be preferable if one
        # exists -- confirm against the health class.
        status_dict = health._status
        config = self._runner.config

        services = {}
        flags = {}
        for name in self._SERVICE_NAMES:
            healthy = status_dict.get(name, False)
            flags[name] = healthy
            services[name] = ServiceHealth(
                name=name,
                healthy=healthy,
                url=config.get(name, {}).get("base_url", ""),
                # This is the time the report was built, not necessarily the
                # time the underlying probe ran.
                last_check=datetime.now(),
            ).to_dict()

        if all(flags.values()):
            status = "ok"
        elif any(flags.values()):
            status = "degraded"
        else:
            status = "error"

        return {
            "status": status,
            "services": services,
            "uptime_seconds": round(self.uptime_seconds),
            # Simple per-service flags for frontend compatibility
            **flags,
        }

    def get_pool_status(self) -> dict:
        """
        Get agent pool status.

        Relies on ``AgentPool.get_status()``, which the original author
        documents as acquiring the pool lock.

        Returns:
            {
                "max_agents": 10,
                "active_count": 3,
                "available": 7,
                "timeout_seconds": 1800,
                "active_tasks": [...]
            }

            Each entry of ``active_tasks`` is an ``AgentTaskInfo.to_dict()``
            result. A task whose ``started`` value is missing or cannot be
            parsed gets ``start_time`` None and ``elapsed_seconds`` 0.
        """
        pool = self._runner.agent_pool
        now = datetime.now()
        raw_status = pool.get_status()

        tasks = []
        for task_info in raw_status.get("tasks", []):
            started_at = None
            elapsed = 0
            raw_started = task_info.get("started")
            if raw_started:
                try:
                    started_at = datetime.fromisoformat(raw_started)
                    # A timestamp carrying a UTC offset cannot be subtracted
                    # from a naive ``now``; convert ``now`` (local time) into
                    # the timestamp's zone so elapsed time is still computed
                    # instead of silently falling back to 0.
                    reference = now
                    if started_at.tzinfo is not None:
                        reference = now.astimezone(started_at.tzinfo)
                    elapsed = (reference - started_at).total_seconds()
                except (ValueError, TypeError):
                    # Best effort: keep whatever was parsed and report 0.
                    pass
            tasks.append(AgentTaskInfo(
                task_id=task_info.get("task_id", ""),
                issue_id=task_info.get("issue_id", ""),
                repo=task_info.get("repo", ""),
                platform=task_info.get("platform", ""),
                task_type=task_info.get("task_type", "remediation"),
                started_at=started_at,
                elapsed_seconds=elapsed,
            ))

        return PoolStatus(
            max_agents=raw_status.get("max_agents", 0),
            active=raw_status.get("active", 0),
            available=raw_status.get("available", 0),
            timeout_seconds=pool.timeout_seconds,
            tasks=tasks,
        ).to_dict()

    def get_dashboard_status(self) -> dict:
        """
        Get combined status for dashboard overview.

        Returns:
            {
                "health": {...},
                "pool": {...},
                "issue_counts": {...},
                "last_poll": "..." | None,
                "poll_interval": 10
            }

            ``last_poll`` is None when the runner has no ``last_poll_time``
            attribute or when it is unset.
        """
        health = self.get_health_status()
        pool = self.get_pool_status()
        issue_counts = self._get_issue_counts()
        last_poll = getattr(self._runner, "last_poll_time", None)
        return {
            "health": health,
            "pool": pool,
            "issue_counts": issue_counts,
            "last_poll": last_poll.isoformat() if last_poll else None,
            "poll_interval": self._runner.config.get("poll_interval_seconds", 10),
        }

    def _get_issue_counts(self) -> dict:
        """
        Get issue counts by state.

        Makes one ``get_issues_by_state`` call per tracked state, so use
        sparingly; consider caching in production.

        Returns:
            Mapping of state display name to the number of issues returned
            for it. The "triage", "backlog" and "done" state keys are
            skipped. A state whose lookup raises is logged and counted as 0.
        """
        project_cfg = self._runner.config.get("project", {})
        states = project_cfg.get("states", {})
        project = project_cfg.get("name", "CG")

        counts = {}
        for state_key, state_name in states.items():
            # Skip certain states we don't track
            if state_key in ("triage", "backlog", "done"):
                continue
            try:
                issues = self._runner.youtrack.get_issues_by_state(project, state_name)
                counts[state_name] = len(issues)
            except Exception as e:
                # Deliberate best effort: one failing state must not break
                # the whole dashboard response.
                logger.warning("Failed to get count for state %s: %s", state_name, e)
                counts[state_name] = 0
        return counts

    def get_config(self) -> dict:
        """
        Get current configuration (safe subset).

        Only an explicit list of keys is copied out, so token-like settings
        stored elsewhere in the config are not returned. For each repo only
        ``name``, ``path`` and ``platform`` are exposed.

        Returns:
            Dict with the runtime settings (with defaults applied), a
            ``repos`` mapping and the ``project`` section.
        """
        config = self._runner.config
        repos = {
            name: {
                "name": repo.get("name"),
                "path": repo.get("path"),
                "platform": repo.get("platform"),
            }
            for name, repo in config.get("repos", {}).items()
        }
        return {
            "poll_interval_seconds": config.get("poll_interval_seconds", 10),
            "agent_timeout_seconds": config.get("agent_timeout_seconds", 1800),
            "max_parallel_agents": config.get("max_parallel_agents", 10),
            "auto_push": config.get("auto_push", True),
            "health_check_interval": config.get("health_check_interval", 300),
            "repos": repos,
            "project": config.get("project", {}),
        }

    def update_config(self, updates: dict) -> dict:
        """
        Update configuration values.

        Only allows updating safe, runtime-modifiable settings:
        ``poll_interval_seconds``, ``agent_timeout_seconds``,
        ``max_parallel_agents`` (positive integers) and ``auto_push``
        (boolean). Unknown keys and values of the wrong type are silently
        skipped and do not appear in ``updated``.

        Args:
            updates: Dict of config keys to update

        Returns:
            {"success": True, "updated": [...], "requires_restart": False}
        """
        updated = []
        # None of the currently allowed keys needs a restart.
        requires_restart = False

        for key, value in updates.items():
            if key in self._POSITIVE_INT_KEYS:
                # bool is a subclass of int, so ``True`` would otherwise be
                # accepted as the integer 1.
                if isinstance(value, bool) or not isinstance(value, int) or value < 1:
                    continue
            elif key in self._BOOL_KEYS:
                if not isinstance(value, bool):
                    continue
            else:
                # Not a runtime-modifiable setting.
                continue

            self._runner.config[key] = value
            updated.append(key)

            # Keep the live pool in step with the config.
            if key == "max_parallel_agents":
                self._runner.agent_pool.max_agents = value
            elif key == "agent_timeout_seconds":
                self._runner.agent_pool.timeout_seconds = value

        return {
            "success": True,
            "updated": updated,
            "requires_restart": requires_restart,
        }

    def kill_agent(self, task_id: str) -> dict:
        """
        Kill a running agent task.

        Args:
            task_id: The task ID to kill

        Returns:
            {"success": True/False, "message": "..."}

            Fails when the pool does not report the task as running, when
            the pool has no ``kill_task`` method, or when ``kill_task``
            returns a falsy value.
        """
        pool = self._runner.agent_pool

        if not pool.is_task_running(task_id):
            return {
                "success": False,
                "message": f"Task {task_id} not found or already completed",
            }

        # kill_task may be absent on pools that predate its introduction.
        kill_task = getattr(pool, "kill_task", None)
        if kill_task is not None and kill_task(task_id):
            return {
                "success": True,
                "message": f"Task {task_id} terminated",
            }

        return {
            "success": False,
            "message": f"Failed to terminate task {task_id}",
        }