"""
|
|
Dashboard API server with REST endpoints and WebSocket support.
|
|
|
|
Extends the existing webhook server to add dashboard functionality.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import mimetypes
|
|
import threading
|
|
from datetime import datetime
|
|
from http import HTTPStatus
|
|
from http.server import BaseHTTPRequestHandler
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Callable, Optional, Set
|
|
from urllib.parse import parse_qs, urlparse, unquote
|
|
|
|
if TYPE_CHECKING:
|
|
from runner import Runner
|
|
from orchestrator import OrchestratorManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Static file directory for built Svelte app
|
|
STATIC_DIR = Path(__file__).parent / "static" / "dashboard"
|
|
|
|
|
|


class EventBroadcaster:
    """
    Manages WebSocket-like event broadcasting.

    Since we're using a simple HTTP server, we implement Server-Sent Events (SSE)
    instead of WebSockets for simplicity. This avoids adding asyncio complexity
    to the existing synchronous codebase.
    """

    def __init__(self):
        self._subscribers: Set[Callable[[dict], None]] = set()
        self._lock = threading.Lock()
        self._event_queue: list[dict] = []
        self._max_queue_size = 100

    def subscribe(self, callback: Callable[[dict], None]):
        """Add a subscriber callback."""
        with self._lock:
            self._subscribers.add(callback)

    def unsubscribe(self, callback: Callable[[dict], None]):
        """Remove a subscriber callback."""
        with self._lock:
            self._subscribers.discard(callback)

    def broadcast(self, event_type: str, data: dict):
        """
        Broadcast an event to all subscribers.

        Thread-safe: can be called from any thread.
        """
        event = {
            "type": event_type,
            "timestamp": datetime.now().isoformat(),
            "data": data,
        }

        with self._lock:
            # Store in the queue for SSE clients to poll
            self._event_queue.append(event)
            if len(self._event_queue) > self._max_queue_size:
                self._event_queue.pop(0)

        # Notify subscribers (iterate over a copy so the set can change underneath us)
        for callback in list(self._subscribers):
            try:
                callback(event)
            except Exception as e:
                logger.error(f"Subscriber callback error: {e}")

    def get_recent_events(self, since_timestamp: Optional[str] = None) -> list[dict]:
        """Get events since a timestamp (for SSE polling)."""
        with self._lock:
            if not since_timestamp:
                return list(self._event_queue)

            try:
                since = datetime.fromisoformat(since_timestamp)
                return [
                    e for e in self._event_queue
                    if datetime.fromisoformat(e["timestamp"]) > since
                ]
            except (ValueError, TypeError):
                return list(self._event_queue)


# Global broadcaster instance
broadcaster = EventBroadcaster()
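

# --- Illustrative usage sketch (kept out of any runtime path) ---
# A minimal example of how EventBroadcaster is meant to be used: subscribe a
# callback for push-style delivery, emit an event, then fetch the queue the
# way an SSE poller would. The event name and task id below are hypothetical.
def _example_broadcaster_usage():
    received: list[dict] = []

    def on_event(event: dict) -> None:
        # Invoked synchronously on whichever thread calls broadcast().
        received.append(event)

    broadcaster.subscribe(on_event)
    broadcaster.broadcast("agent.started", {"task_id": "demo-123"})
    broadcaster.unsubscribe(on_event)

    # Polling path: fetch everything newer than the last timestamp we saw.
    return received, broadcaster.get_recent_events(since_timestamp=None)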


class DashboardAPIHandler(BaseHTTPRequestHandler):
    """HTTP request handler for dashboard API endpoints."""

    # References to shared services (set during server initialization,
    # see setup_api_handler at the bottom of this module)
    runner: Optional["Runner"] = None
    dashboard_api = None
    oauth = None
    orchestrator_manager: Optional["OrchestratorManager"] = None

    def log_message(self, format, *args):
        """Override to route request logging through our logger."""
        logger.debug(f"{self.address_string()} - {format % args}")

    def send_json(self, data: dict, status: int = 200):
        """Send a JSON response."""
        body = json.dumps(data).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_json(self, status: int, code: str, message: str):
        """Send a JSON error response."""
        self.send_json({
            "error": True,
            "code": code,
            "message": message,
        }, status)
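
    # For reference, send_error_json produces a JSON envelope shaped like:
    #   {"error": true, "code": "NOT_FOUND", "message": "Task abc not found"}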

    def get_json_body(self) -> Optional[dict]:
        """Parse the JSON request body."""
        content_length = int(self.headers.get("Content-Length", 0))
        if content_length == 0:
            return {}

        try:
            body = self.rfile.read(content_length)
            return json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.warning(f"Invalid JSON body: {e}")
            return None

    def handle_agent_stream(self, task_id: str):
        """
        Stream agent output via Server-Sent Events (SSE).

        Sends new messages as they arrive until the task completes or the
        client disconnects.
        """
        import time

        if not self.runner:
            self.send_error_json(503, "NOT_READY", "Runner not initialized")
            return

        # Check that the task exists
        task = self.runner.agent_pool.get_task(task_id)
        if not task:
            self.send_error_json(404, "NOT_FOUND", f"Task {task_id} not found")
            return

        # Send SSE headers
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

        last_index = 0

        try:
            while True:
                # Re-fetch the task (it may have completed)
                task = self.runner.agent_pool.get_task(task_id)

                if task:
                    # Send any new messages
                    messages = task.get_messages(last_index)
                    for msg in messages:
                        event_data = json.dumps(msg)
                        self.wfile.write(f"data: {event_data}\n\n".encode())
                        self.wfile.flush()
                        last_index += 1

                    # Check whether the task is still running
                    if task.process and task.process.poll() is not None:
                        # Task completed - send the final event
                        self.wfile.write(
                            f"event: complete\ndata: {json.dumps({'returncode': task.returncode})}\n\n".encode()
                        )
                        self.wfile.flush()
                        break
                else:
                    # Task no longer in the active pool - it completed
                    self.wfile.write(f"event: complete\ndata: {json.dumps({'message': 'Task completed'})}\n\n".encode())
                    self.wfile.flush()
                    break

                # Send a keepalive
                self.wfile.write(b": keepalive\n\n")
                self.wfile.flush()

                time.sleep(0.5)  # Poll every 500ms

        except (BrokenPipeError, ConnectionResetError):
            logger.debug(f"SSE client disconnected from {task_id}")
        except Exception as e:
            logger.error(f"Error in SSE stream for {task_id}: {e}")
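
    # For reference, the stream above is plain SSE over HTTP: one
    # "data: <json>" frame per new message (the payload shape comes from
    # task.get_messages()), a ": keepalive" comment frame each poll cycle,
    # and a terminal "event: complete" frame carrying the return code.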

    def require_auth(self) -> bool:
        """
        Check whether the request is authenticated.

        Returns True if authenticated, False otherwise (and sends a 401).
        """
        if not self.oauth:
            # No OAuth configured - allow all requests
            return True

        cookie_header = self.headers.get("Cookie", "")
        session = self.oauth.get_session_from_cookie(cookie_header)

        if not session:
            self.send_error_json(401, "UNAUTHORIZED", "Authentication required")
            return False

        # Store the user on the request for later use
        self.current_user = session.user
        return True

    def do_GET(self):
        """Handle GET requests."""
        parsed = urlparse(self.path)
        path = parsed.path
        query = parse_qs(parsed.query)

        # Health check (no auth required)
        if path == "/health":
            self.send_json({"status": "healthy"})
            return

        # OAuth endpoints (no auth required)
        if path.startswith("/oauth/"):
            return self.handle_oauth(path, query)

        # API endpoints (auth required)
        if path.startswith("/api/"):
            if not self.require_auth():
                return
            return self.handle_api_get(path, query)

        # SSE events endpoint
        if path == "/events":
            if not self.require_auth():
                return
            return self.handle_sse(query)

        # Static files (auth required for the dashboard)
        return self.serve_static(path)

    def do_POST(self):
        """Handle POST requests."""
        parsed = urlparse(self.path)
        path = parsed.path

        # Webhook endpoint (no auth - has its own verification)
        if path == "/webhook/youtrack":
            # Delegate to the webhook handler in webhook_server.py
            return self.handle_webhook()

        # API endpoints (auth required)
        if path.startswith("/api/"):
            if not self.require_auth():
                return
            return self.handle_api_post(path)

        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {path}")

    def do_PUT(self):
        """Handle PUT requests."""
        parsed = urlparse(self.path)
        path = parsed.path

        if path.startswith("/api/"):
            if not self.require_auth():
                return
            return self.handle_api_put(path)

        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {path}")

    def do_DELETE(self):
        """Handle DELETE requests."""
        parsed = urlparse(self.path)
        path = parsed.path
        query = parse_qs(parsed.query)

        if path.startswith("/api/"):
            if not self.require_auth():
                return
            return self.handle_api_delete(path, query)

        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {path}")

    def handle_oauth(self, path: str, query: dict):
        """Handle OAuth endpoints."""
        if not self.oauth:
            self.send_error_json(501, "NOT_CONFIGURED", "OAuth not configured")
            return

        if path == "/oauth/login":
            # Redirect to the Gitea authorize URL
            authorize_url, state = self.oauth.get_authorize_url()
            self.send_response(302)
            self.send_header("Location", authorize_url)
            self.end_headers()

        elif path == "/oauth/callback":
            # Handle the OAuth callback
            code = query.get("code", [None])[0]
            state = query.get("state", [None])[0]

            if not code or not state:
                self.send_error_json(400, "INVALID_CALLBACK", "Missing code or state")
                return

            session = self.oauth.handle_callback(code, state)
            if not session:
                self.send_error_json(401, "AUTH_FAILED", "Authentication failed")
                return

            # Set the session cookie and redirect to the dashboard
            self.send_response(302)
            self.send_header("Set-Cookie", self.oauth.create_session_cookie(session))
            self.send_header("Location", "/")
            self.end_headers()

        elif path == "/oauth/logout":
            # Clear the session
            cookie_header = self.headers.get("Cookie", "")
            session = self.oauth.get_session_from_cookie(cookie_header)
            if session:
                self.oauth.logout(session.session_id)

            self.send_response(302)
            self.send_header("Set-Cookie", self.oauth.create_logout_cookie())
            self.send_header("Location", "/oauth/login")
            self.end_headers()

        else:
            self.send_error_json(404, "NOT_FOUND", f"Unknown OAuth endpoint: {path}")
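
    # The OAuth round trip implemented above:
    #   1. GET /oauth/login    -> 302 to the Gitea authorize URL
    #   2. GET /oauth/callback -> exchanges code+state for a session and sets the cookie
    #   3. GET /oauth/logout   -> invalidates the session and redirects back to login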

    def handle_api_get(self, path: str, query: dict):
        """Handle API GET requests."""
        if not self.dashboard_api:
            self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
            return

        # GET /api/user - Current user info
        if path == "/api/user":
            if hasattr(self, 'current_user') and self.current_user:
                self.send_json(self.current_user.to_dict())
            else:
                self.send_json({"login": "anonymous", "is_admin": False})

        # GET /api/health - Service health
        elif path == "/api/health":
            self.send_json(self.dashboard_api.get_health_status())

        # GET /api/status - Full dashboard status
        elif path == "/api/status":
            self.send_json(self.dashboard_api.get_dashboard_status())

        # GET /api/agents - Agent pool status
        elif path == "/api/agents":
            self.send_json(self.dashboard_api.get_pool_status())

        # GET /api/agents/{task_id}/output - Agent streaming output
        elif path.startswith("/api/agents/") and path.endswith("/output"):
            parts = path.split("/")
            task_id = parts[3] if len(parts) > 4 else None
            if task_id:
                since = int(query.get("since", [0])[0])
                output = self.runner.agent_pool.get_task_output(task_id, since)
                if output:
                    self.send_json(output)
                else:
                    self.send_error_json(404, "NOT_FOUND", f"Task {task_id} not found")
            else:
                self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")

        # GET /api/agents/{task_id}/stream - SSE stream for agent output
        elif path.startswith("/api/agents/") and path.endswith("/stream"):
            parts = path.split("/")
            task_id = parts[3] if len(parts) > 4 else None
            if task_id:
                self.handle_agent_stream(task_id)
            else:
                self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")

        # GET /api/history - Agent run history with filtering
        elif path == "/api/history":
            limit = int(query.get("limit", [50])[0])
            offset = int(query.get("offset", [0])[0])
            task_type = query.get("type", [None])[0]
            status = query.get("status", [None])[0]
            since = query.get("since", [None])[0]
            until = query.get("until", [None])[0]
            repo = query.get("repo", [None])[0]
            search = query.get("search", [None])[0]

            # Use the filtered method if any filters are provided
            if any([task_type, status, since, until, repo, search]):
                runs, total = self.runner.agent_pool.history.get_runs_filtered(
                    limit=limit,
                    offset=offset,
                    task_type=task_type,
                    status=status,
                    since=since,
                    until=until,
                    repo=repo,
                    search=search,
                )
                self.send_json({"runs": runs, "limit": limit, "offset": offset, "total": total})
            else:
                runs = self.runner.agent_pool.history.get_runs(limit, offset)
                # For unfiltered queries, total is the full index length
                total = len(self.runner.agent_pool.history._index.get("runs", []))
                self.send_json({"runs": runs, "limit": limit, "offset": offset, "total": total})

        # GET /api/history/{task_id} - Single history run
        elif path.startswith("/api/history/"):
            task_id = unquote(path.split("/")[-1])
            run = self.runner.agent_pool.history.get_run(task_id)
            if run:
                self.send_json(run)
            else:
                # Also check whether it's an active task
                output = self.runner.agent_pool.get_task_output(task_id)
                if output:
                    self.send_json(output)
                else:
                    self.send_error_json(404, "NOT_FOUND", f"Run {task_id} not found")

        # GET /api/config - Configuration
        elif path == "/api/config":
            self.send_json(self.dashboard_api.get_config())

        # GET /api/issues - Issue list (proxy to YouTrack)
        elif path == "/api/issues":
            states = query.get("state", ["Ready,Verify,Document"])[0].split(",")
            limit = int(query.get("limit", [50])[0])
            self.send_json(self._get_issues(states, limit))

        # GET /api/issues/{id} - Single issue
        elif path.startswith("/api/issues/"):
            issue_id = path.split("/")[-1]
            self.send_json(self._get_issue(issue_id))

        # GET /api/builds - Build list (proxy to Woodpecker)
        elif path == "/api/builds":
            repo = query.get("repo", [None])[0]
            status = query.get("status", [None])[0]
            limit = int(query.get("limit", [50])[0])
            self.send_json(self._get_builds(repo, status, limit))

        # GET /api/builds/{repo}/{build_id}/logs - Build logs
        elif "/builds/" in path and path.endswith("/logs"):
            parts = path.split("/")
            if len(parts) >= 5:
                repo = parts[3]
                build_id = int(parts[4])
                lines = int(query.get("lines", [200])[0])
                logs = self.runner.woodpecker.get_build_log_excerpt(f"cleargrow/{repo}", build_id, lines)
                self.send_json({"logs": logs, "build_id": build_id, "repo": repo})
            else:
                self.send_error_json(400, "INVALID_PATH", "Invalid build logs path")

        # GET /api/repos/health - Repository health status
        elif path == "/api/repos/health":
            self.send_json(self._get_repos_health())

        # GET /api/system/overview - System overview with VPS status
        elif path == "/api/system/overview":
            self.send_json(self._get_system_overview())

        # GET /api/analytics/agents - Agent analytics
        elif path == "/api/analytics/agents":
            period = query.get("period", ["7d"])[0]
            self.send_json(self._get_agent_analytics(period))

        # GET /api/analytics/builds - Build analytics
        elif path == "/api/analytics/builds":
            period = query.get("period", ["7d"])[0]
            self.send_json(self._get_build_analytics(period))

        # GET /api/analytics/issues - Issue analytics
        elif path == "/api/analytics/issues":
            period = query.get("period", ["7d"])[0]
            self.send_json(self._get_issue_analytics(period))

        # GET /api/orchestrator/status - Orchestrator session status
        elif path == "/api/orchestrator/status":
            self.send_json(self._get_orchestrator_status())

        # GET /api/orchestrator/messages - Get conversation messages
        elif path == "/api/orchestrator/messages":
            since = int(query.get("since", [0])[0])
            self.send_json(self._get_orchestrator_messages(since))

        # GET /api/orchestrator/stream - SSE stream for orchestrator output
        elif path == "/api/orchestrator/stream":
            self.handle_orchestrator_stream()

        else:
            self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")
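
    # Example requests against the GET endpoints above (host, port, and cookie
    # name are hypothetical; the session cookie comes from the OAuth flow):
    #   curl -b 'session=...' 'http://localhost:8765/api/history?limit=20&status=failed'
    #   curl -b 'session=...' 'http://localhost:8765/api/builds?repo=controller&status=running'
    #   curl -b 'session=...' 'http://localhost:8765/api/issues?state=Ready,Verify&limit=10'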

    def handle_api_post(self, path: str):
        """Handle API POST requests."""
        if not self.dashboard_api:
            self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
            return

        body = self.get_json_body()
        if body is None:
            self.send_error_json(400, "INVALID_JSON", "Invalid JSON body")
            return

        # POST /api/agents/{task_id}/kill - Kill an agent
        if path.startswith("/api/agents/") and path.endswith("/kill"):
            parts = path.split("/")
            task_id = parts[3] if len(parts) > 4 else None
            if task_id:
                result = self.dashboard_api.kill_agent(task_id)
                self.send_json(result, 200 if result["success"] else 404)
            else:
                self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")

        # POST /api/agents/retry - Retry a task
        elif path == "/api/agents/retry":
            issue_id = body.get("issue_id")
            task_type = body.get("task_type", "remediation")
            if not issue_id:
                self.send_error_json(400, "MISSING_ISSUE_ID", "Issue ID required")
                return
            # TODO: Implement retry logic
            self.send_json({"success": False, "message": "Retry not yet implemented"})

        # POST /api/issues/{id}/transition - Transition issue state
        elif path.startswith("/api/issues/") and path.endswith("/transition"):
            parts = path.split("/")
            issue_id = parts[3] if len(parts) > 4 else None
            new_state = body.get("state")
            if issue_id and new_state:
                result = self._transition_issue(issue_id, new_state)
                self.send_json(result, 200 if result.get("success") else 400)
            else:
                self.send_error_json(400, "MISSING_PARAMS", "Issue ID and state required")

        # POST /api/issues/{id}/comments - Add a comment to an issue
        elif path.startswith("/api/issues/") and path.endswith("/comments"):
            parts = path.split("/")
            issue_id = parts[3] if len(parts) > 4 else None
            comment_text = body.get("text", "").strip()
            if issue_id and comment_text:
                result = self._add_issue_comment(issue_id, comment_text)
                self.send_json(result, 200 if result.get("success") else 400)
            else:
                self.send_error_json(400, "MISSING_PARAMS", "Issue ID and comment text required")

        # POST /api/issues - Create a new issue
        elif path == "/api/issues":
            result = self._create_issue(body)
            self.send_json(result, 201 if result.get("success") else 400)

        # POST /api/builds/{repo}/{id}/retry - Retry a build
        elif "/builds/" in path and path.endswith("/retry"):
            parts = path.split("/")
            if len(parts) >= 5:
                repo = parts[3]
                build_id = int(parts[4])
                result = self.runner.woodpecker.retry_pipeline(f"cleargrow/{repo}", build_id)
                self.send_json(result, 200 if result.get("success") else 400)
            else:
                self.send_error_json(400, "INVALID_PATH", "Invalid build retry path")

        # POST /api/history/bulk-delete - Bulk delete history runs
        elif path == "/api/history/bulk-delete":
            task_ids = body.get("task_ids", [])
            if not task_ids:
                self.send_error_json(400, "MISSING_TASK_IDS", "task_ids array required")
                return

            deleted = self.runner.agent_pool.history.delete_runs(task_ids)
            self.send_json({"success": True, "deleted": deleted})

        # POST /api/issues/bulk-transition - Bulk transition issues
        elif path == "/api/issues/bulk-transition":
            issue_ids = body.get("issue_ids", [])
            new_state = body.get("state")
            if not issue_ids or not new_state:
                self.send_error_json(400, "MISSING_PARAMS", "issue_ids and state required")
                return

            results = []
            for issue_id in issue_ids:
                result = self._transition_issue(issue_id, new_state)
                results.append({
                    "issue_id": issue_id,
                    "success": result.get("success", False),
                    "error": result.get("message") if not result.get("success") else None,
                })

            self.send_json({"results": results})

        # POST /api/orchestrator/chat - Send a message to the orchestrator
        elif path == "/api/orchestrator/chat":
            message = body.get("message", "").strip()
            logger.info(f"[API] POST /api/orchestrator/chat (message length: {len(message)})")
            if not message:
                logger.warning("[API] orchestrator/chat: missing message")
                self.send_error_json(400, "MISSING_MESSAGE", "Message required")
                return
            self.handle_orchestrator_chat(message)

        # POST /api/orchestrator/action - Execute an approved action
        elif path == "/api/orchestrator/action":
            action_id = body.get("action_id")
            approved = body.get("approved", False)
            logger.info(f"[API] POST /api/orchestrator/action (action_id={action_id}, approved={approved})")
            if not action_id:
                logger.warning("[API] orchestrator/action: missing action_id")
                self.send_error_json(400, "MISSING_ACTION_ID", "action_id required")
                return
            result = self._execute_orchestrator_action(action_id, approved)
            logger.info(f"[API] orchestrator/action result: {result}")
            self.send_json(result, 200 if result.get("success") else 400)

        # POST /api/orchestrator/stop - Stop the orchestrator session
        elif path == "/api/orchestrator/stop":
            logger.info("[API] POST /api/orchestrator/stop")
            result = self._stop_orchestrator()
            logger.info(f"[API] orchestrator/stop result: {result}")
            self.send_json(result)

        # POST /api/orchestrator/start - Start an orchestrator session
        elif path == "/api/orchestrator/start":
            logger.info("[API] POST /api/orchestrator/start")
            result = self._start_orchestrator()
            logger.info(f"[API] orchestrator/start result: {result}")
            self.send_json(result, 200 if result.get("success") else 500)

        else:
            self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")

    def handle_api_put(self, path: str):
        """Handle API PUT requests."""
        if not self.dashboard_api:
            self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
            return

        body = self.get_json_body()
        if body is None:
            self.send_error_json(400, "INVALID_JSON", "Invalid JSON body")
            return

        # PUT /api/agents/config - Update the agent pool config
        if path == "/api/agents/config":
            result = self.dashboard_api.update_config(body)
            self.send_json(result)

        # PUT /api/config - Update general config
        elif path == "/api/config":
            result = self.dashboard_api.update_config(body)
            self.send_json(result)

        else:
            self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")

    def handle_api_delete(self, path: str, query: dict):
        """Handle API DELETE requests."""
        if not self.runner:
            self.send_error_json(503, "NOT_READY", "Runner not initialized")
            return

        # DELETE /api/history - Clear all history (requires confirm=true)
        if path == "/api/history":
            confirm = query.get("confirm", ["false"])[0].lower()
            if confirm != "true":
                self.send_error_json(400, "CONFIRMATION_REQUIRED",
                                     "Add ?confirm=true to clear all history")
                return

            deleted = self.runner.agent_pool.history.clear_all()
            self.send_json({"success": True, "deleted": deleted})

        # DELETE /api/history/{task_id} - Delete a single history run
        elif path.startswith("/api/history/"):
            task_id = unquote(path.split("/")[-1])
            if not task_id:
                self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")
                return

            success = self.runner.agent_pool.history.delete_run(task_id)
            if success:
                self.send_json({"success": True, "task_id": task_id})
            else:
                self.send_error_json(404, "NOT_FOUND", f"Run {task_id} not found")

        else:
            self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")

    def handle_sse(self, query: dict):
        """Handle the Server-Sent Events endpoint for real-time updates."""
        since = query.get("since", [None])[0]

        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.end_headers()

        # Send recent events
        events = broadcaster.get_recent_events(since)
        for event in events:
            self._send_sse_event(event)

        # Note: for true SSE we would keep this connection open and stream events.
        # Since we're using a simple HTTP server, clients should poll this endpoint.

    def _send_sse_event(self, event: dict):
        """Send a single SSE event."""
        try:
            data = json.dumps(event)
            self.wfile.write(f"data: {data}\n\n".encode())
            self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError):
            pass
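
    # Pollers should remember the timestamp of the last event they processed
    # and pass it back as ?since=<iso-timestamp> on the next request; see
    # _example_poll_events at the bottom of this module for a sketch.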

    def serve_static(self, path: str):
        """Serve static files from the dashboard build."""
        # Require auth for dashboard pages (but not for the OAuth redirect)
        if self.oauth:
            cookie_header = self.headers.get("Cookie", "")
            session = self.oauth.get_session_from_cookie(cookie_header)
            if not session:
                # Redirect to login
                self.send_response(302)
                self.send_header("Location", "/oauth/login")
                self.end_headers()
                return

        # Normalize the path
        if path == "/" or path == "":
            path = "/index.html"

        # Resolve the file path
        file_path = STATIC_DIR / path.lstrip("/")

        # SPA fallback - serve index.html for routes without file extensions
        if not file_path.exists():
            last_segment = path.split("/")[-1]
            if "." not in last_segment:
                file_path = STATIC_DIR / "index.html"

        if not file_path.exists():
            # If no static files exist yet, show a placeholder
            if not STATIC_DIR.exists():
                self.send_response(200)
                self.send_header("Content-Type", "text/html")
                self.end_headers()
                self.wfile.write(b"""
<!DOCTYPE html>
<html>
<head><title>Agent Dashboard</title></head>
<body>
<h1>Agent Dashboard</h1>
<p>Frontend not built yet. Run:</p>
<pre>cd /opt/agent_runner/dashboard && npm run build</pre>
<p><a href="/api/status">View API Status</a></p>
</body>
</html>
""")
                return

            self.send_error(404, f"File not found: {path}")
            return

        # Security: prevent path traversal
        try:
            file_path = file_path.resolve()
            if not str(file_path).startswith(str(STATIC_DIR.resolve())):
                self.send_error(403, "Forbidden")
                return
        except (ValueError, OSError):
            self.send_error(403, "Forbidden")
            return

        # Determine the content type
        content_type, _ = mimetypes.guess_type(str(file_path))
        if content_type is None:
            content_type = "application/octet-stream"

        # Send the file
        try:
            content = file_path.read_bytes()
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(len(content)))

            # Cache static assets aggressively. (The previous substring check
            # also matched extension-less files; test the suffix directly.)
            if file_path.suffix in (".js", ".css", ".woff2", ".svg"):
                self.send_header("Cache-Control", "public, max-age=31536000")

            self.end_headers()
            self.wfile.write(content)
        except IOError as e:
            self.send_error(500, f"Error reading file: {e}")

    def handle_webhook(self):
        """Placeholder for webhook handling - the actual implementation lives in webhook_server.py."""
        self.send_error_json(501, "NOT_IMPLEMENTED", "Webhook handling delegated to webhook_server")

    # Helper methods for external service calls

    def _get_issues(self, states: list[str], limit: int) -> list:
        """Fetch issues from YouTrack."""
        if not self.runner:
            return []

        try:
            issues = []
            project = self.runner.config.get("project", {}).get("name", "CG")

            for state in states:
                try:
                    state_issues = self.runner.youtrack.get_issues_by_state(project, state)
                    for issue in state_issues[:limit]:  # Apply the limit after fetching
                        issues.append({
                            "id": issue.id,
                            "summary": issue.summary,
                            "state": state,
                            "priority": getattr(issue, 'priority', None),
                            "type": getattr(issue, 'type', None),
                            "created": getattr(issue, 'created', None),
                            "updated": getattr(issue, 'updated', None),
                        })
                except Exception as e:
                    logger.warning(f"Failed to fetch issues for state {state}: {e}")

            return issues
        except Exception as e:
            logger.error(f"Failed to fetch issues: {e}")
            return []

    def _get_issue(self, issue_id: str) -> dict:
        """Fetch a single issue from YouTrack."""
        if not self.runner:
            return {"error": "Runner not available"}

        try:
            issue = self.runner.youtrack.get_issue(issue_id)
            if not issue:
                return {"error": f"Issue {issue_id} not found"}

            comments = self.runner.youtrack.get_issue_comments(issue_id)

            # Get the issue type from custom fields
            issue_type = issue.custom_fields.get('Type', None)

            # Convert timestamps (YouTrack uses epoch milliseconds)
            created = issue.created
            updated = issue.updated
            if isinstance(created, int) and created > 0:
                created = datetime.fromtimestamp(created / 1000).isoformat()
            if isinstance(updated, int) and updated > 0:
                updated = datetime.fromtimestamp(updated / 1000).isoformat()

            return {
                "id": issue.id,
                "summary": issue.summary,
                "description": getattr(issue, 'description', ''),
                "state": getattr(issue, 'state', ''),
                "priority": issue.priority,
                "type": issue_type,
                "reporter": getattr(issue, 'reporter', None),
                "created": created,
                "updated": updated,
                "repository": getattr(issue, 'repository', None),
                "comments": [
                    {
                        "author": c.author,
                        "body": c.text,
                        "created": datetime.fromtimestamp(c.created / 1000).isoformat()
                        if isinstance(c.created, int) and c.created > 0 else "",
                    }
                    for c in comments
                ],
            }
        except Exception as e:
            logger.error(f"Failed to fetch issue {issue_id}: {e}")
            return {"error": str(e)}
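
    # Worked example of the conversion above: a YouTrack "created" value of
    # 1700000000000 (epoch milliseconds) becomes
    # datetime.fromtimestamp(1700000000000 / 1000).isoformat(), i.e.
    # "2023-11-14T22:13:20" in UTC (local server time in practice).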

    def _transition_issue(self, issue_id: str, new_state: str) -> dict:
        """Transition an issue to a new state in YouTrack."""
        if not self.runner:
            return {"success": False, "message": "Runner not available"}

        try:
            self.runner.youtrack.update_issue_state(issue_id, new_state)

            # Broadcast the event
            broadcaster.broadcast("issue.state_changed", {
                "issue_id": issue_id,
                "new_state": new_state,
            })

            return {"success": True, "issue_id": issue_id, "new_state": new_state}
        except Exception as e:
            logger.error(f"Failed to transition issue {issue_id}: {e}")
            return {"success": False, "message": str(e)}

    def _create_issue(self, data: dict) -> dict:
        """Create a new issue in YouTrack."""
        # TODO: Implement issue creation
        return {"success": False, "message": "Issue creation not yet implemented"}

    def _add_issue_comment(self, issue_id: str, text: str) -> dict:
        """Add a comment to an issue in YouTrack."""
        if not self.runner:
            return {"success": False, "message": "Runner not available"}

        try:
            success = self.runner.youtrack.add_issue_comment(issue_id, text)
            if success:
                return {"success": True, "issue_id": issue_id}
            else:
                return {"success": False, "message": "Failed to add comment"}
        except Exception as e:
            logger.error(f"Failed to add comment to issue {issue_id}: {e}")
            return {"success": False, "message": str(e)}

    def _get_builds(self, repo: Optional[str], status: Optional[str], limit: int) -> dict:
        """Fetch ALL builds from Woodpecker (not just running ones)."""
        if not self.runner or not self.runner.woodpecker:
            return {"builds": [], "total": 0}

        try:
            all_builds = []
            repos_to_check = ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]

            # Filter to a single repo if specified
            if repo and repo != "all":
                repos_to_check = [f"cleargrow/{repo}"]

            for repo_name in repos_to_check:
                try:
                    repo_builds = self.runner.woodpecker.get_pipelines_extended(repo_name, limit)
                    all_builds.extend(repo_builds)
                except Exception as e:
                    logger.warning(f"Failed to fetch builds for {repo_name}: {e}")

            # Sort by created timestamp descending (most recent first)
            all_builds.sort(key=lambda b: b.get('created') or 0, reverse=True)

            # Filter by status if requested
            if status:
                all_builds = [b for b in all_builds if b.get("status", "").lower() == status.lower()]

            total = len(all_builds)
            return {"builds": all_builds[:limit], "total": total}
        except Exception as e:
            logger.error(f"Failed to fetch builds: {e}")
            return {"builds": [], "total": 0}

    def _get_repos_health(self) -> list:
        """Get health status for all repositories."""
        import subprocess
        from datetime import datetime, timedelta

        if not self.runner:
            return []

        repos = []
        repo_configs = self.runner.config.get("repos", {})

        for key, repo_config in repo_configs.items():
            repo_path = Path(repo_config.get("path", f"/opt/repos/{key}"))

            health = {
                "name": key,
                "path": str(repo_path),
                "platform": repo_config.get("platform", "unknown"),
                "latestVersion": None,
                "latestCommit": None,
                "recentCommits": [],
                "buildStatus": {
                    "lastBuild": None,
                    "successRate": 0,
                    "avgDuration": 0,
                    "buildsToday": 0,
                    "buildsThisWeek": 0,
                },
            }

            # Get git info if the repo exists
            if repo_path.exists():
                try:
                    # Get the latest version tag
                    result = subprocess.run(
                        ["git", "describe", "--tags", "--abbrev=0"],
                        cwd=repo_path,
                        capture_output=True,
                        text=True,
                        timeout=5,
                    )
                    if result.returncode == 0:
                        health["latestVersion"] = result.stdout.strip()

                    # Get the recent commits (last 5)
                    result = subprocess.run(
                        ["git", "log", "--oneline", "-5", "--format=%H|%s|%an|%aI|%D"],
                        cwd=repo_path,
                        capture_output=True,
                        text=True,
                        timeout=5,
                    )
                    if result.returncode == 0:
                        commits = []
                        for line in result.stdout.strip().split("\n"):
                            if line:
                                parts = line.split("|")
                                if len(parts) >= 4:
                                    commit = {
                                        "sha": parts[0],
                                        "message": parts[1],
                                        "author": parts[2],
                                        "date": parts[3],
                                        "branch": parts[4] if len(parts) > 4 else "",
                                    }
                                    commits.append(commit)

                        health["recentCommits"] = commits
                        if commits:
                            health["latestCommit"] = commits[0]

                except Exception as e:  # includes subprocess.TimeoutExpired
                    logger.warning(f"Failed to get git info for {key}: {e}")

            # Get build status from Woodpecker
            try:
                if self.runner.woodpecker:
                    repo_full = f"cleargrow/{key}"
                    builds = self.runner.woodpecker.get_pipelines_extended(repo_full, 20)

                    if builds:
                        health["buildStatus"]["lastBuild"] = builds[0]

                    # Calculate stats
                    now = datetime.now()
                    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
                    week_start = today_start - timedelta(days=7)

                    successful = 0
                    total_duration = 0
                    duration_count = 0
                    builds_today = 0
                    builds_week = 0

                    for build in builds:
                        status = build.get("status", "").lower()
                        if status == "success":
                            successful += 1

                        # Duration calculation
                        started = build.get("started")
                        finished = build.get("finished")
                        if started and finished:
                            total_duration += finished - started
                            duration_count += 1

                        # Time-based counts
                        created = build.get("created", 0)
                        if created:
                            created_dt = datetime.fromtimestamp(created)
                            if created_dt >= today_start:
                                builds_today += 1
                            if created_dt >= week_start:
                                builds_week += 1

                    if builds:
                        health["buildStatus"]["successRate"] = round(
                            (successful / len(builds)) * 100, 1
                        )
                    if duration_count > 0:
                        health["buildStatus"]["avgDuration"] = round(
                            total_duration / duration_count
                        )
                    health["buildStatus"]["buildsToday"] = builds_today
                    health["buildStatus"]["buildsThisWeek"] = builds_week

            except Exception as e:
                logger.warning(f"Failed to get build status for {key}: {e}")

            repos.append(health)

        return repos
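
    # Worked example for the stats above: 17 successes across 20 fetched
    # builds gives successRate 85.0, and 1200s of total duration over 10
    # timed builds gives avgDuration 120.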

    def _get_system_overview(self) -> dict:
        """Get a system overview with VPS server status."""
        from ssh_metrics import get_server_metrics

        # Get SSH metrics for the remote servers
        vps_git_metrics = get_server_metrics("vps-git")
        vps_track_metrics = get_server_metrics("vps-track")
        vps_ci_metrics = get_server_metrics("vps-ci")

        servers = [
            {
                "id": "vps-git",
                "name": "vps-git",
                "ip": "139.144.29.179",
                "privateIp": "10.0.0.10",
                "status": "healthy" if self._check_service_health("gitea") else "offline",
                "services": [
                    {"name": "nginx", "status": "running", "port": 443, "url": "https://git.cleargrow.io"},
                    {"name": "gitea", "status": "running" if self._check_service_health("gitea") else "error", "port": 3000, "url": "https://git.cleargrow.io"},
                    {"name": "postgresql", "status": "unknown", "port": 5432},
                ],
                "metrics": vps_git_metrics,
                "lastCheck": datetime.now().isoformat(),
                "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@139.144.29.179",
            },
            {
                "id": "vps-track",
                "name": "vps-track",
                "ip": "170.187.143.45",
                "privateIp": "10.0.0.20",
                "status": "healthy" if self._check_service_health("youtrack") else "offline",
                "services": [
                    {"name": "nginx", "status": "running", "port": 443, "url": "https://track.cleargrow.io"},
                    {"name": "youtrack", "status": "running" if self._check_service_health("youtrack") else "error", "port": 8080, "url": "https://track.cleargrow.io"},
                ],
                "metrics": vps_track_metrics,
                "lastCheck": datetime.now().isoformat(),
                "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@170.187.143.45",
            },
            {
                "id": "vps-ci",
                "name": "vps-ci",
                "ip": "173.230.138.66",
                "privateIp": "10.0.0.30",
                "status": "healthy" if self._check_service_health("woodpecker") else "offline",
                "services": [
                    {"name": "nginx", "status": "running", "port": 443, "url": "https://ci.cleargrow.io"},
                    {"name": "woodpecker", "status": "running" if self._check_service_health("woodpecker") else "error", "port": 8000, "url": "https://ci.cleargrow.io"},
                    {"name": "registry", "status": "unknown", "port": 5000},
                ],
                "metrics": vps_ci_metrics,
                "lastCheck": datetime.now().isoformat(),
                "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@173.230.138.66",
            },
            {
                "id": "vps-agents",
                "name": "vps-agents",
                "ip": "45.79.204.236",
                "privateIp": "10.0.0.40",
                "status": "healthy",
                "services": [
                    {"name": "agent-runner", "status": "running", "port": 8765},
                    {"name": "claude-code", "status": "running"},
                ],
                "metrics": self._get_local_metrics(),
                "lastCheck": datetime.now().isoformat(),
                "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@45.79.204.236",
            },
        ]

        # Calculate summary stats
        pool_status = self.runner.agent_pool.get_status() if self.runner else {}

        return {
            "servers": servers,
            "totalAgents": pool_status.get("max", 10),
            "activeAgents": pool_status.get("active", 0),
            "issuesInFlight": self._count_in_flight_issues(),
            "buildsRunning": self._count_running_builds(),
            "lastUpdated": datetime.now().isoformat(),
        }

    def _check_service_health(self, service: str) -> bool:
        """Check whether a service is healthy based on the cached health status."""
        if not self.dashboard_api:
            return False
        health = self.dashboard_api.get_health_status()
        return health.get(service, False)

    def _get_local_metrics(self) -> Optional[dict]:
        """Get local system metrics for vps-agents."""
        import psutil

        try:
            cpu_percent = psutil.cpu_percent(interval=0.1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage("/")
            boot_time = psutil.boot_time()
            uptime = int(datetime.now().timestamp() - boot_time)

            return {
                "cpuPercent": round(cpu_percent, 1),
                "memoryPercent": round(memory.percent, 1),
                "memoryUsedMB": round(memory.used / (1024 * 1024)),
                "memoryTotalMB": round(memory.total / (1024 * 1024)),
                "diskPercent": round(disk.percent, 1),
                "diskUsedGB": round(disk.used / (1024 * 1024 * 1024), 1),
                "diskTotalGB": round(disk.total / (1024 * 1024 * 1024), 1),
                "uptime": uptime,
            }
        except Exception as e:
            logger.warning(f"Failed to get local metrics: {e}")
            return None

    def _count_in_flight_issues(self) -> int:
        """Count the issues currently being processed."""
        if not self.runner:
            return 0
        try:
            project = self.runner.config.get("project", {}).get("name", "CG")
            count = 0
            for state in ["In Progress", "Build", "Verify", "Document"]:
                try:
                    issues = self.runner.youtrack.get_issues_by_state(project, state)
                    count += len(issues) if issues else 0
                except Exception:
                    pass
            return count
        except Exception:
            return 0

    def _count_running_builds(self) -> int:
        """Count the currently running builds."""
        if not self.runner or not self.runner.woodpecker:
            return 0
        try:
            count = 0
            for repo in ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]:
                try:
                    builds = self.runner.woodpecker.get_pipelines_extended(repo, 5)
                    count += sum(1 for b in builds if b.get("status", "").lower() == "running")
                except Exception:
                    pass
            return count
        except Exception:
            return 0

    def _get_agent_analytics(self, period: str) -> dict:
        """Get agent performance analytics."""
        from datetime import datetime, timedelta

        if not self.runner:
            return self._empty_agent_analytics(period)

        # Parse the period
        days = 7
        if period == "24h":
            days = 1
        elif period == "30d":
            days = 30

        cutoff = datetime.now() - timedelta(days=days)

        try:
            # Get history runs
            runs, total = self.runner.agent_pool.history.get_runs_filtered(
                limit=1000,
                offset=0,
                since=cutoff.isoformat(),
            )

            if not runs:
                return self._empty_agent_analytics(period)

            # Calculate analytics
            successful = sum(1 for r in runs if r.get("returncode", -1) == 0)
            failed = sum(1 for r in runs if r.get("returncode", -1) != 0 and not r.get("timed_out", False))
            timed_out = sum(1 for r in runs if r.get("timed_out", False))

            # By task type
            by_type = {}
            for run in runs:
                task_type = run.get("task_type", "unknown")
                if task_type not in by_type:
                    by_type[task_type] = {"count": 0, "success": 0, "total_duration": 0}
                by_type[task_type]["count"] += 1
                if run.get("returncode", -1) == 0:
                    by_type[task_type]["success"] += 1

            # By repo
            by_repo = {}
            for run in runs:
                repo = run.get("repo", "unknown")
                if repo not in by_repo:
                    by_repo[repo] = {"count": 0, "success": 0}
                by_repo[repo]["count"] += 1
                if run.get("returncode", -1) == 0:
                    by_repo[repo]["success"] += 1

            return {
                "period": period,
                "totalRuns": len(runs),
                "successfulRuns": successful,
                "failedRuns": failed,
                "timedOutRuns": timed_out,
                "successRate": round((successful / len(runs)) * 100, 1) if runs else 0,
                "avgDuration": 0,  # Would need duration tracking
                "byTaskType": {
                    k: {
                        "count": v["count"],
                        "successRate": round((v["success"] / v["count"]) * 100, 1) if v["count"] > 0 else 0,
                        "avgDuration": 0,
                    }
                    for k, v in by_type.items()
                },
                "byRepo": {
                    k: {
                        "count": v["count"],
                        "successRate": round((v["success"] / v["count"]) * 100, 1) if v["count"] > 0 else 0,
                    }
                    for k, v in by_repo.items()
                },
                "trend": [],  # Would need time-series aggregation
            }
        except Exception as e:
            logger.error(f"Failed to get agent analytics: {e}")
            return self._empty_agent_analytics(period)

    def _empty_agent_analytics(self, period: str) -> dict:
        """Return an empty agent analytics structure."""
        return {
            "period": period,
            "totalRuns": 0,
            "successfulRuns": 0,
            "failedRuns": 0,
            "timedOutRuns": 0,
            "successRate": 0,
            "avgDuration": 0,
            "byTaskType": {},
            "byRepo": {},
            "trend": [],
        }

    def _get_build_analytics(self, period: str) -> dict:
        """Get build analytics."""
        from datetime import datetime, timedelta

        if not self.runner or not self.runner.woodpecker:
            return self._empty_build_analytics(period)

        days = 7
        if period == "24h":
            days = 1
        elif period == "30d":
            days = 30

        cutoff = datetime.now() - timedelta(days=days)
        cutoff_ts = cutoff.timestamp()

        try:
            all_builds = []
            repos_to_check = ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]

            for repo_name in repos_to_check:
                try:
                    builds = self.runner.woodpecker.get_pipelines_extended(repo_name, 100)
                    # Filter by date
                    for build in builds:
                        created = build.get("created", 0)
                        if created >= cutoff_ts:
                            build["repo_key"] = repo_name.split("/")[-1]
                            all_builds.append(build)
                except Exception as e:
                    logger.warning(f"Failed to fetch builds for {repo_name}: {e}")

            if not all_builds:
                return self._empty_build_analytics(period)

            # Calculate stats
            successful = sum(1 for b in all_builds if b.get("status", "").lower() == "success")
            total_duration = 0
            duration_count = 0
            by_repo = {}

            for build in all_builds:
                repo_key = build.get("repo_key", "unknown")
                if repo_key not in by_repo:
                    by_repo[repo_key] = {"total": 0, "success": 0, "total_duration": 0, "duration_count": 0}

                by_repo[repo_key]["total"] += 1
                if build.get("status", "").lower() == "success":
                    by_repo[repo_key]["success"] += 1

                started = build.get("started")
                finished = build.get("finished")
                if started and finished:
                    duration = finished - started
                    total_duration += duration
                    duration_count += 1
                    by_repo[repo_key]["total_duration"] += duration
                    by_repo[repo_key]["duration_count"] += 1

            return {
                "period": period,
                "totalBuilds": len(all_builds),
                "successRate": round((successful / len(all_builds)) * 100, 1) if all_builds else 0,
                "avgDuration": round(total_duration / duration_count) if duration_count > 0 else 0,
                "buildsPerDay": round(len(all_builds) / days, 1),
                "failurePatterns": [],  # Would need log analysis
                "byRepo": {
                    k: {
                        "total": v["total"],
                        "success": v["success"],
                        "avgDuration": round(v["total_duration"] / v["duration_count"]) if v["duration_count"] > 0 else 0,
                    }
                    for k, v in by_repo.items()
                },
            }
        except Exception as e:
            logger.error(f"Failed to get build analytics: {e}")
            return self._empty_build_analytics(period)

    def _empty_build_analytics(self, period: str) -> dict:
        """Return an empty build analytics structure."""
        return {
            "period": period,
            "totalBuilds": 0,
            "successRate": 0,
            "avgDuration": 0,
            "buildsPerDay": 0,
            "failurePatterns": [],
            "byRepo": {},
        }

    def _get_issue_analytics(self, period: str) -> dict:
        """Get issue throughput analytics."""
        # This is a simplified version - a full implementation would track historical data
        return {
            "period": period,
            "throughput": 0,
            "avgTimeInState": {},
            "stateDistribution": {},
            "trend": [],
        }
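
    # The analytics endpoints above accept period values "24h", "7d"
    # (the default), and "30d"; anything else falls back to 7 days.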

    # Orchestrator helper methods

    def _get_orchestrator_status(self) -> dict:
        """Get the orchestrator session status."""
        if not self.orchestrator_manager:
            return {
                "session_id": None,
                "active": False,
                "message_count": 0,
                "pending_actions": [],
            }
        return self.orchestrator_manager.get_status()

    def _get_orchestrator_messages(self, since: int) -> dict:
        """Get orchestrator conversation messages."""
        if not self.orchestrator_manager:
            return {"messages": [], "count": 0}

        session = self.orchestrator_manager.get_session()
        if not session:
            return {"messages": [], "count": 0}

        messages = session.get_messages(since)
        return {
            "messages": messages,
            "count": len(messages),
            "total": len(session.messages),
        }

    def _start_orchestrator(self) -> dict:
        """Start a new orchestrator session."""
        if not self.orchestrator_manager:
            return {"success": False, "error": "Orchestrator not configured"}

        try:
            session, created = self.orchestrator_manager.start_session()
            return {
                "success": True,
                "session_id": session.session_id,
                "created": created,
            }
        except Exception as e:
            logger.error(f"Failed to start orchestrator: {e}")
            return {"success": False, "error": str(e)}

    def _stop_orchestrator(self) -> dict:
        """Stop the current orchestrator session."""
        logger.debug("[API] _stop_orchestrator called")
        if not self.orchestrator_manager:
            logger.warning("[API] _stop_orchestrator: orchestrator not configured")
            return {"success": False, "error": "Orchestrator not configured"}

        stopped = self.orchestrator_manager.stop_session()
        logger.info(f"[API] _stop_orchestrator: stopped={stopped}")
        return {"success": stopped}

    def _execute_orchestrator_action(self, action_id: str, approved: bool) -> dict:
        """Execute or reject an orchestrator action."""
        logger.info(f"[API] _execute_orchestrator_action: action_id={action_id}, approved={approved}")

        if not self.orchestrator_manager:
            logger.error("[API] _execute_orchestrator_action: orchestrator not configured")
            return {"success": False, "error": "Orchestrator not configured"}

        session = self.orchestrator_manager.get_session()
        if not session:
            logger.error("[API] _execute_orchestrator_action: no active session")
            return {"success": False, "error": "No active orchestrator session"}

        logger.debug(f"[API] _execute_orchestrator_action: got session {session.session_id[:8]}")
        result = session.execute_action(action_id, approved)
        logger.info(f"[API] _execute_orchestrator_action: result={result}")
        return result

    def handle_orchestrator_chat(self, message: str):
        """
        Handle a chat message to the orchestrator via SSE.

        Streams response chunks back to the client.
        """
        logger.info(f"[API] handle_orchestrator_chat: message length={len(message)}")

        if not self.orchestrator_manager:
            logger.error("[API] handle_orchestrator_chat: orchestrator not configured")
            self.send_error_json(503, "NOT_CONFIGURED", "Orchestrator not configured")
            return

        # Start a session if one is not active
        try:
            session, created = self.orchestrator_manager.start_session()
            logger.info(f"[API] handle_orchestrator_chat: session={session.session_id[:8]}, created={created}")
        except Exception as e:
            logger.exception(f"[API] handle_orchestrator_chat: failed to start session: {e}")
            self.send_error_json(500, "START_FAILED", str(e))
            return

        # Send SSE headers
        logger.debug("[API] handle_orchestrator_chat: sending SSE headers")
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

        try:
            chunk_count = 0
            for chunk in session.send_message(message):
                chunk_count += 1
                event_data = json.dumps(chunk)
                logger.debug(f"[API] handle_orchestrator_chat: sending chunk #{chunk_count} type={chunk.get('type')}")
                self.wfile.write(f"data: {event_data}\n\n".encode())
                self.wfile.flush()

            logger.info(f"[API] handle_orchestrator_chat: completed, sent {chunk_count} chunks")

        except (BrokenPipeError, ConnectionResetError):
            logger.warning("[API] handle_orchestrator_chat: client disconnected")
        except Exception as e:
            logger.exception(f"[API] handle_orchestrator_chat: error: {e}")
            try:
                error_data = json.dumps({"type": "error", "message": str(e)})
                self.wfile.write(f"data: {error_data}\n\n".encode())
                self.wfile.flush()
            except Exception:
                pass
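
    # Unlike GET /api/orchestrator/stream below, the chat endpoint answers its
    # own POST with an SSE stream, so the client reads "data:" frames off the
    # same response; see _example_orchestrator_chat at the bottom of this module.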

    def handle_orchestrator_stream(self):
        """
        Stream orchestrator output via SSE.

        Sends new messages as they arrive.
        """
        import time

        if not self.orchestrator_manager:
            self.send_error_json(503, "NOT_CONFIGURED", "Orchestrator not configured")
            return

        session = self.orchestrator_manager.get_session()
        if not session:
            self.send_error_json(404, "NO_SESSION", "No active orchestrator session")
            return

        # Send SSE headers
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Cache-Control", "no-cache")
        self.send_header("Connection", "keep-alive")
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()

        last_index = 0

        try:
            while True:
                # Check whether the session is still active
                current_session = self.orchestrator_manager.get_session()
                if not current_session or current_session.session_id != session.session_id:
                    self.wfile.write(b"event: session_ended\ndata: {}\n\n")
                    self.wfile.flush()
                    break

                # Send new messages
                messages = current_session.get_messages(last_index)
                for msg in messages:
                    event_data = json.dumps(msg)
                    self.wfile.write(f"data: {event_data}\n\n".encode())
                    self.wfile.flush()
                    last_index += 1

                # Send pending actions
                status = current_session.get_status()
                if status.get("pending_actions"):
                    actions_data = json.dumps({
                        "type": "actions",
                        "actions": status["pending_actions"],
                    })
                    self.wfile.write(f"data: {actions_data}\n\n".encode())
                    self.wfile.flush()

                # Keepalive
                self.wfile.write(b": keepalive\n\n")
                self.wfile.flush()

                time.sleep(0.5)

        except (BrokenPipeError, ConnectionResetError):
            logger.debug("Orchestrator stream client disconnected")
        except Exception as e:
            logger.error(f"Error in orchestrator stream: {e}")
def setup_api_handler(runner: "Runner", oauth=None):
|
|
"""
|
|
Configure the API handler with runner reference.
|
|
|
|
Call this before starting the HTTP server.
|
|
"""
|
|
from dashboard_api import DashboardAPI
|
|
from orchestrator import OrchestratorManager
|
|
|
|
DashboardAPIHandler.runner = runner
|
|
DashboardAPIHandler.dashboard_api = DashboardAPI(runner)
|
|
DashboardAPIHandler.oauth = oauth
|
|
DashboardAPIHandler.orchestrator_manager = OrchestratorManager(runner)
|
|
|
|
logger.info("Dashboard API handler configured (with orchestrator)")
|
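

# --- Illustrative client sketches (kept out of any runtime path) ---
# Minimal stdlib-only examples of how clients might talk to this server.
# Host and port are hypothetical, and these sketches assume OAuth is not
# configured (otherwise a Cookie header would be required); a browser client
# would typically use fetch()/EventSource instead.

def _example_poll_events(last_ts: Optional[str] = None,
                         host: str = "localhost", port: int = 8765) -> list[dict]:
    """Poll /events for frames newer than last_ts (see handle_sse)."""
    import http.client
    from urllib.parse import quote

    path = "/events" + (f"?since={quote(last_ts)}" if last_ts else "")
    conn = http.client.HTTPConnection(host, port)
    conn.request("GET", path)
    body = conn.getresponse().read().decode("utf-8")
    conn.close()
    # Each SSE frame is "data: {...}" followed by a blank line.
    return [json.loads(line[6:]) for line in body.splitlines() if line.startswith("data: ")]


def _example_orchestrator_chat(message: str, host: str = "localhost", port: int = 8765):
    """POST a chat message and yield streamed response chunks as they arrive."""
    import http.client

    conn = http.client.HTTPConnection(host, port)
    conn.request(
        "POST",
        "/api/orchestrator/chat",
        body=json.dumps({"message": message}),
        headers={"Content-Type": "application/json"},
    )
    resp = conn.getresponse()
    try:
        for raw in resp:  # HTTPResponse is line-iterable
            line = raw.decode("utf-8").strip()
            if line.startswith("data: "):
                yield json.loads(line[6:])
    finally:
        conn.close()


def _example_serve(runner: "Runner", port: int = 8765):
    """Wire the handler to a threading HTTP server (each SSE stream blocks its thread)."""
    from http.server import ThreadingHTTPServer

    setup_api_handler(runner)
    server = ThreadingHTTPServer(("0.0.0.0", port), DashboardAPIHandler)
    logger.info(f"Dashboard listening on :{port}")
    server.serve_forever()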