Files
agentrunner/api_server.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

1660 lines
65 KiB
Python

"""
Dashboard API server with REST endpoints and Server-Sent Events (SSE) support.
Extends the existing webhook server to add dashboard functionality.
"""
import asyncio
import json
import logging
import mimetypes
import threading
from datetime import datetime
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Optional, Set
from urllib.parse import parse_qs, urlparse, unquote
if TYPE_CHECKING:
from runner import Runner
from orchestrator import OrchestratorManager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Static file directory for built Svelte app:
# the "static/dashboard" folder located next to this source file.
STATIC_DIR = Path(__file__).parent / "static" / "dashboard"
class EventBroadcaster:
    """
    Fan out dashboard events to in-process subscribers and keep a short backlog.

    Since we're using a simple HTTP server, events are exposed to browsers via
    Server-Sent Events (SSE) instead of WebSockets. This avoids adding asyncio
    complexity to the existing synchronous codebase.

    Every broadcast event is a dict with the keys ``type``, ``timestamp``
    (``datetime.now().isoformat()``) and ``data``. The most recent
    ``_max_queue_size`` events are retained for ``get_recent_events``.
    """

    def __init__(self):
        self._subscribers: Set[Callable[[dict], None]] = set()
        # Guards both the subscriber set and the event backlog.
        self._lock = threading.Lock()
        self._event_queue: list[dict] = []
        self._max_queue_size = 100

    def subscribe(self, callback: Callable[[dict], None]):
        """Add a subscriber callback."""
        with self._lock:
            self._subscribers.add(callback)

    def unsubscribe(self, callback: Callable[[dict], None]):
        """Remove a subscriber callback (no-op if it was never subscribed)."""
        with self._lock:
            self._subscribers.discard(callback)

    def broadcast(self, event_type: str, data: dict):
        """
        Broadcast an event to all subscribers.

        Thread-safe: can be called from any thread. Callbacks are invoked
        *outside* the internal lock, so a callback may itself call
        ``subscribe``/``unsubscribe``/``broadcast`` without deadlocking.
        An exception raised by one callback is logged and does not prevent
        the remaining callbacks from running.
        """
        event = {
            "type": event_type,
            "timestamp": datetime.now().isoformat(),
            "data": data,
        }
        with self._lock:
            # Store in queue for SSE clients to poll, dropping the oldest
            # entries once the backlog exceeds its bound.
            self._event_queue.append(event)
            overflow = len(self._event_queue) - self._max_queue_size
            if overflow > 0:
                del self._event_queue[:overflow]
            # Snapshot while holding the lock; iterate after releasing it.
            subscribers = list(self._subscribers)

        # Notify subscribers
        for callback in subscribers:
            try:
                callback(event)
            except Exception as e:
                logger.error(f"Subscriber callback error: {e}")

    def get_recent_events(self, since_timestamp: Optional[str] = None) -> list[dict]:
        """
        Get events since a timestamp (for SSE polling).

        Args:
            since_timestamp: ISO-8601 timestamp; only events strictly newer
                than it are returned. When omitted, empty, or unparseable,
                the whole retained backlog is returned.

        Returns:
            A new list of event dicts, oldest first.
        """
        with self._lock:
            if not since_timestamp:
                return list(self._event_queue)
            try:
                since = datetime.fromisoformat(since_timestamp)
                return [
                    e for e in self._event_queue
                    if datetime.fromisoformat(e["timestamp"]) > since
                ]
            except (ValueError, TypeError):
                # Bad cursor from the client: fall back to the full backlog.
                return list(self._event_queue)
# Global broadcaster instance
# (module-wide singleton; the request handler below reads its backlog in
# handle_sse and publishes to it from _transition_issue).
broadcaster = EventBroadcaster()
class DashboardAPIHandler(BaseHTTPRequestHandler):
"""HTTP request handler for dashboard API endpoints."""
# Reference to runner (set during server initialization)
runner: Optional["Runner"] = None
dashboard_api = None
oauth = None
orchestrator_manager: Optional["OrchestratorManager"] = None
def log_message(self, format, *args):
    """Send the base handler's per-request log line to our logger at DEBUG level."""
    client = self.address_string()
    rendered = format % args
    logger.debug(f"{client} - {rendered}")
def send_json(self, data: dict, status: int = 200):
    """
    Write ``data`` to the client as a complete JSON HTTP response.

    Args:
        data: JSON-serializable payload.
        status: HTTP status code for the response (default 200).
    """
    payload = json.dumps(data).encode("utf-8")
    self.send_response(status)
    response_headers = (
        ("Content-Type", "application/json"),
        ("Content-Length", len(payload)),
    )
    for header_name, header_value in response_headers:
        self.send_header(header_name, header_value)
    self.end_headers()
    self.wfile.write(payload)
def send_error_json(self, status: int, code: str, message: str):
    """
    Send a JSON error envelope of the form
    ``{"error": true, "code": ..., "message": ...}`` with the given HTTP status.
    """
    envelope = {"error": True}
    envelope["code"] = code
    envelope["message"] = message
    self.send_json(envelope, status)
def get_json_body(self) -> Optional[dict]:
    """
    Parse the request body as a JSON object.

    Returns:
        ``{}`` when the request has no body (missing or zero Content-Length),
        the decoded dict for a valid JSON object, or ``None`` when the request
        is malformed: non-numeric or negative Content-Length, undecodable
        JSON, or a JSON value that is not an object. Callers treat ``None``
        as a 400 and call ``.get()`` on anything else, hence the dict check.
    """
    raw_length = self.headers.get("Content-Length", 0)
    try:
        content_length = int(raw_length)
    except (TypeError, ValueError):
        logger.warning(f"Invalid Content-Length header: {raw_length!r}")
        return None
    if content_length < 0:
        # rfile.read() with a negative size would block until EOF.
        logger.warning(f"Invalid Content-Length header: {raw_length!r}")
        return None
    if content_length == 0:
        return {}
    try:
        body = self.rfile.read(content_length)
        parsed = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        logger.warning(f"Invalid JSON body: {e}")
        return None
    if not isinstance(parsed, dict):
        logger.warning(f"Invalid JSON body: expected an object, got {type(parsed).__name__}")
        return None
    return parsed
def handle_agent_stream(self, task_id: str):
    """
    Stream one agent task's output to the client as Server-Sent Events.

    Replies 503 if no runner is attached and 404 if the task is unknown.
    Otherwise each new message is written as a ``data:`` frame, polling the
    task every 500 ms, until the task's process has exited or the task is no
    longer returned by the pool; a final ``event: complete`` frame is then
    sent. A client disconnect ends the stream quietly.
    """
    import time

    if not self.runner:
        self.send_error_json(503, "NOT_READY", "Runner not initialized")
        return

    # The task must exist before any SSE headers go out.
    if not self.runner.agent_pool.get_task(task_id):
        self.send_error_json(404, "NOT_FOUND", f"Task {task_id} not found")
        return

    self.send_response(200)
    for header_name, header_value in (
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
        ("Connection", "keep-alive"),
        ("Access-Control-Allow-Origin", "*"),
    ):
        self.send_header(header_name, header_value)
    self.end_headers()

    def emit(frame: str):
        """Write one SSE frame and push it to the client immediately."""
        self.wfile.write(frame.encode())
        self.wfile.flush()

    sent_count = 0  # index of the next message to fetch from the task
    try:
        while True:
            # Re-fetch on every pass: the task may have been retired meanwhile.
            current = self.runner.agent_pool.get_task(task_id)
            if not current:
                # Task no longer in active pool - it completed
                emit(f"event: complete\ndata: {json.dumps({'message': 'Task completed'})}\n\n")
                break

            for msg in current.get_messages(sent_count):
                emit(f"data: {json.dumps(msg)}\n\n")
                sent_count += 1

            proc = current.process
            if proc and proc.poll() is not None:
                # Process has exited - send the final event
                emit(f"event: complete\ndata: {json.dumps({'returncode': current.returncode})}\n\n")
                break

            emit(": keepalive\n\n")
            time.sleep(0.5)  # Poll every 500ms
    except (BrokenPipeError, ConnectionResetError):
        logger.debug(f"SSE client disconnected from {task_id}")
    except Exception as e:
        logger.error(f"Error in SSE stream for {task_id}: {e}")
def require_auth(self) -> bool:
    """
    Gate a request on a valid OAuth session cookie.

    Returns:
        True when the request may proceed: either OAuth is not configured
        (everything is allowed) or the Cookie header maps to a session, in
        which case ``self.current_user`` is set to that session's user.
        False after a 401 JSON error has been sent.
    """
    oauth = self.oauth
    if not oauth:
        # No OAuth configured - allow all requests
        return True

    session = oauth.get_session_from_cookie(self.headers.get("Cookie", ""))
    if session:
        # Remember the user on the handler for later use
        self.current_user = session.user
        return True

    self.send_error_json(401, "UNAUTHORIZED", "Authentication required")
    return False
def do_GET(self):
    """
    Route a GET request.

    ``/health`` and ``/oauth/*`` are served without authentication;
    ``/api/*`` and ``/events`` require it; every other path is handed to the
    static file server (which performs its own session check).
    """
    url = urlparse(self.path)
    route = url.path
    params = parse_qs(url.query)

    # Health check (no auth required)
    if route == "/health":
        self.send_json({"status": "healthy"})
        return

    # OAuth endpoints (no auth required)
    if route.startswith("/oauth/"):
        return self.handle_oauth(route, params)

    # API endpoints and the SSE events endpoint share the auth gate
    is_api = route.startswith("/api/")
    if is_api or route == "/events":
        if not self.require_auth():
            return
        if is_api:
            return self.handle_api_get(route, params)
        return self.handle_sse(params)

    # Static files
    return self.serve_static(route)
def do_POST(self):
    """
    Route a POST request: the YouTrack webhook (unauthenticated here, it has
    its own verification), authenticated ``/api/*`` calls, or a JSON 404.
    """
    route = urlparse(self.path).path

    # Webhook endpoint - delegate to the webhook handler
    if route == "/webhook/youtrack":
        return self.handle_webhook()

    if not route.startswith("/api/"):
        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {route}")
        return

    # API endpoints (auth required); require_auth has already replied on failure
    if self.require_auth():
        return self.handle_api_post(route)
def do_PUT(self):
    """Route a PUT request: authenticated ``/api/*`` calls, otherwise a JSON 404."""
    route = urlparse(self.path).path

    if not route.startswith("/api/"):
        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {route}")
        return

    # require_auth has already sent the 401 when it returns False
    if self.require_auth():
        return self.handle_api_put(route)
def do_DELETE(self):
    """
    Route a DELETE request: authenticated ``/api/*`` calls (with the parsed
    query string), otherwise a JSON 404.
    """
    url = urlparse(self.path)
    route, params = url.path, parse_qs(url.query)

    if not route.startswith("/api/"):
        self.send_error_json(404, "NOT_FOUND", f"Unknown endpoint: {route}")
        return

    # require_auth has already sent the 401 when it returns False
    if self.require_auth():
        return self.handle_api_delete(route, params)
def handle_oauth(self, path: str, query: dict):
    """
    Serve the OAuth endpoints.

    * ``/oauth/login``    - 302 to the provider's authorize URL.
    * ``/oauth/callback`` - exchange ``code``/``state`` for a session, set the
      session cookie and 302 to ``/``; 400 if either is missing, 401 if the
      exchange yields no session.
    * ``/oauth/logout``   - end the current session (if any), clear the
      cookie and 302 back to the login endpoint.

    Replies 501 when OAuth is not configured and 404 for any other path.
    """
    oauth = self.oauth
    if not oauth:
        self.send_error_json(501, "NOT_CONFIGURED", "OAuth not configured")
        return

    if path == "/oauth/login":
        # Redirect to the authorize URL (the returned state is not needed here)
        target, _state = oauth.get_authorize_url()
        self.send_response(302)
        self.send_header("Location", target)
        self.end_headers()
        return

    if path == "/oauth/callback":
        code = query.get("code", [None])[0]
        state = query.get("state", [None])[0]
        if not (code and state):
            self.send_error_json(400, "INVALID_CALLBACK", "Missing code or state")
            return

        session = oauth.handle_callback(code, state)
        if not session:
            self.send_error_json(401, "AUTH_FAILED", "Authentication failed")
            return

        # Set session cookie and redirect to dashboard
        self.send_response(302)
        self.send_header("Set-Cookie", oauth.create_session_cookie(session))
        self.send_header("Location", "/")
        self.end_headers()
        return

    if path == "/oauth/logout":
        # Drop the server-side session, if the cookie still maps to one
        session = oauth.get_session_from_cookie(self.headers.get("Cookie", ""))
        if session:
            oauth.logout(session.session_id)
        self.send_response(302)
        self.send_header("Set-Cookie", oauth.create_logout_cookie())
        self.send_header("Location", "/oauth/login")
        self.end_headers()
        return

    self.send_error_json(404, "NOT_FOUND", f"Unknown OAuth endpoint: {path}")
def handle_api_get(self, path: str, query: dict):
    """
    Dispatch an authenticated ``GET /api/...`` request.

    Args:
        path: URL path without the query string.
        query: Query parameters as produced by ``urllib.parse.parse_qs``
            (each value is a list of strings).

    Replies 503 when the dashboard API is not initialized, 404 for unknown
    paths, and 400 ``INVALID_PARAM`` when a numeric query parameter or path
    segment is not an integer (instead of failing without a response).
    """
    if not self.dashboard_api:
        self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
        return

    def param(name, default=None):
        """Return the first value of query parameter ``name``, or ``default``."""
        return query.get(name, [default])[0]

    def to_int(raw, label):
        """Return ``int(raw)``; on failure reply 400 and return None."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            self.send_error_json(400, "INVALID_PARAM", f"{label} must be an integer")
            return None

    # GET /api/user - Current user info
    if path == "/api/user":
        if hasattr(self, 'current_user') and self.current_user:
            self.send_json(self.current_user.to_dict())
        else:
            self.send_json({"login": "anonymous", "is_admin": False})

    # GET /api/health - Service health
    elif path == "/api/health":
        self.send_json(self.dashboard_api.get_health_status())

    # GET /api/status - Full dashboard status
    elif path == "/api/status":
        self.send_json(self.dashboard_api.get_dashboard_status())

    # GET /api/agents - Agent pool status
    elif path == "/api/agents":
        self.send_json(self.dashboard_api.get_pool_status())

    # GET /api/agents/{task_id}/output - Agent streaming output
    elif path.startswith("/api/agents/") and path.endswith("/output"):
        parts = path.split("/")
        task_id = parts[3] if len(parts) > 4 else None
        if not task_id:
            self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")
            return
        since = to_int(param("since", 0), "since")
        if since is None:
            return
        output = self.runner.agent_pool.get_task_output(task_id, since)
        if output:
            self.send_json(output)
        else:
            self.send_error_json(404, "NOT_FOUND", f"Task {task_id} not found")

    # GET /api/agents/{task_id}/stream - SSE stream for agent output
    elif path.startswith("/api/agents/") and path.endswith("/stream"):
        parts = path.split("/")
        task_id = parts[3] if len(parts) > 4 else None
        if task_id:
            self.handle_agent_stream(task_id)
        else:
            self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")

    # GET /api/history - Agent run history with filtering
    elif path == "/api/history":
        limit = to_int(param("limit", 50), "limit")
        if limit is None:
            return
        offset = to_int(param("offset", 0), "offset")
        if offset is None:
            return
        task_type = param("type")
        status = param("status")
        since = param("since")
        until = param("until")
        repo = param("repo")
        search = param("search")

        # Use filtered method if any filters are provided
        if any([task_type, status, since, until, repo, search]):
            runs, total = self.runner.agent_pool.history.get_runs_filtered(
                limit=limit,
                offset=offset,
                task_type=task_type,
                status=status,
                since=since,
                until=until,
                repo=repo,
                search=search,
            )
        else:
            runs = self.runner.agent_pool.history.get_runs(limit, offset)
            # For unfiltered, total is the full index length
            total = len(self.runner.agent_pool.history._index.get("runs", []))
        self.send_json({"runs": runs, "limit": limit, "offset": offset, "total": total})

    # GET /api/history/{task_id} - Single history run
    elif path.startswith("/api/history/"):
        task_id = unquote(path.split("/")[-1])
        run = self.runner.agent_pool.history.get_run(task_id)
        if run:
            self.send_json(run)
        else:
            # Also check if it's an active task
            output = self.runner.agent_pool.get_task_output(task_id)
            if output:
                self.send_json(output)
            else:
                self.send_error_json(404, "NOT_FOUND", f"Run {task_id} not found")

    # GET /api/config - Configuration
    elif path == "/api/config":
        self.send_json(self.dashboard_api.get_config())

    # GET /api/issues - Issue list (proxy to YouTrack)
    elif path == "/api/issues":
        states = param("state", "Ready,Verify,Document").split(",")
        limit = to_int(param("limit", 50), "limit")
        if limit is None:
            return
        self.send_json(self._get_issues(states, limit))

    # GET /api/issues/{id} - Single issue
    elif path.startswith("/api/issues/"):
        issue_id = path.split("/")[-1]
        self.send_json(self._get_issue(issue_id))

    # GET /api/builds - Build list (proxy to Woodpecker)
    elif path == "/api/builds":
        repo = param("repo")
        status = param("status")
        limit = to_int(param("limit", 50), "limit")
        if limit is None:
            return
        self.send_json(self._get_builds(repo, status, limit))

    # GET /api/builds/{repo}/{build_id}/logs - Build logs
    elif "/builds/" in path and path.endswith("/logs"):
        parts = path.split("/")
        # "/api/builds/{repo}/{id}/logs" splits into 6 parts (leading "" included);
        # anything shorter has no build id segment.
        if len(parts) < 6:
            self.send_error_json(400, "INVALID_PATH", "Invalid build logs path")
            return
        repo = parts[3]
        build_id = to_int(parts[4], "build_id")
        if build_id is None:
            return
        lines = to_int(param("lines", 200), "lines")
        if lines is None:
            return
        logs = self.runner.woodpecker.get_build_log_excerpt(f"cleargrow/{repo}", build_id, lines)
        self.send_json({"logs": logs, "build_id": build_id, "repo": repo})

    # GET /api/repos/health - Repository health status
    elif path == "/api/repos/health":
        self.send_json(self._get_repos_health())

    # GET /api/system/overview - System overview with VPS status
    elif path == "/api/system/overview":
        self.send_json(self._get_system_overview())

    # GET /api/analytics/agents - Agent analytics
    elif path == "/api/analytics/agents":
        self.send_json(self._get_agent_analytics(param("period", "7d")))

    # GET /api/analytics/builds - Build analytics
    elif path == "/api/analytics/builds":
        self.send_json(self._get_build_analytics(param("period", "7d")))

    # GET /api/analytics/issues - Issue analytics
    elif path == "/api/analytics/issues":
        self.send_json(self._get_issue_analytics(param("period", "7d")))

    # GET /api/orchestrator/status - Orchestrator session status
    elif path == "/api/orchestrator/status":
        self.send_json(self._get_orchestrator_status())

    # GET /api/orchestrator/messages - Get conversation messages
    elif path == "/api/orchestrator/messages":
        since = to_int(param("since", 0), "since")
        if since is None:
            return
        self.send_json(self._get_orchestrator_messages(since))

    # GET /api/orchestrator/stream - SSE stream for orchestrator output
    elif path == "/api/orchestrator/stream":
        self.handle_orchestrator_stream()

    else:
        self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")
def handle_api_post(self, path: str):
    """
    Dispatch an authenticated ``POST /api/...`` request.

    The JSON body is read first; an unparseable body yields 400
    ``INVALID_JSON``. Replies 503 when the dashboard API is not initialized
    and 404 for unknown paths. Malformed input (a non-integer build id,
    non-string ``text``/``message`` values, non-list id collections) is
    answered with a 400 rather than failing without a response.
    """
    if not self.dashboard_api:
        self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
        return

    body = self.get_json_body()
    if body is None:
        self.send_error_json(400, "INVALID_JSON", "Invalid JSON body")
        return

    def text_field(name):
        """Return the stripped string value of ``body[name]``, or "" if absent/not a string."""
        value = body.get(name, "")
        return value.strip() if isinstance(value, str) else ""

    # POST /api/agents/{task_id}/kill - Kill an agent
    if path.startswith("/api/agents/") and path.endswith("/kill"):
        parts = path.split("/")
        # Same segment-count rule as the other /api/agents/{id}/... routes.
        task_id = parts[3] if len(parts) > 4 else None
        if task_id:
            result = self.dashboard_api.kill_agent(task_id)
            self.send_json(result, 200 if result["success"] else 404)
        else:
            self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")

    # POST /api/agents/retry - Retry a task
    elif path == "/api/agents/retry":
        issue_id = body.get("issue_id")
        if not issue_id:
            self.send_error_json(400, "MISSING_ISSUE_ID", "Issue ID required")
            return
        # TODO: Implement retry logic (body may also carry "task_type",
        # defaulting to "remediation")
        self.send_json({"success": False, "message": "Retry not yet implemented"})

    # POST /api/issues/{id}/transition - Transition issue state
    elif path.startswith("/api/issues/") and path.endswith("/transition"):
        parts = path.split("/")
        issue_id = parts[3] if len(parts) > 4 else None
        new_state = body.get("state")
        if issue_id and new_state:
            result = self._transition_issue(issue_id, new_state)
            self.send_json(result, 200 if result.get("success") else 400)
        else:
            self.send_error_json(400, "MISSING_PARAMS", "Issue ID and state required")

    # POST /api/issues/{id}/comments - Add comment to issue
    elif path.startswith("/api/issues/") and path.endswith("/comments"):
        parts = path.split("/")
        issue_id = parts[3] if len(parts) > 4 else None
        comment_text = text_field("text")
        if issue_id and comment_text:
            result = self._add_issue_comment(issue_id, comment_text)
            self.send_json(result, 200 if result.get("success") else 400)
        else:
            self.send_error_json(400, "MISSING_PARAMS", "Issue ID and comment text required")

    # POST /api/issues - Create new issue
    elif path == "/api/issues":
        result = self._create_issue(body)
        self.send_json(result, 201 if result.get("success") else 400)

    # POST /api/builds/{repo}/{id}/retry - Retry a build
    elif "/builds/" in path and path.endswith("/retry"):
        parts = path.split("/")
        # "/api/builds/{repo}/{id}/retry" splits into 6 parts (leading "" included).
        if len(parts) < 6:
            self.send_error_json(400, "INVALID_PATH", "Invalid build retry path")
            return
        repo = parts[3]
        try:
            build_id = int(parts[4])
        except ValueError:
            self.send_error_json(400, "INVALID_PARAM", "build_id must be an integer")
            return
        result = self.runner.woodpecker.retry_pipeline(f"cleargrow/{repo}", build_id)
        self.send_json(result, 200 if result.get("success") else 400)

    # POST /api/history/bulk-delete - Bulk delete history runs
    elif path == "/api/history/bulk-delete":
        task_ids = body.get("task_ids", [])
        if not task_ids or not isinstance(task_ids, list):
            self.send_error_json(400, "MISSING_TASK_IDS", "task_ids array required")
            return
        deleted = self.runner.agent_pool.history.delete_runs(task_ids)
        self.send_json({"success": True, "deleted": deleted})

    # POST /api/issues/bulk-transition - Bulk transition issues
    elif path == "/api/issues/bulk-transition":
        issue_ids = body.get("issue_ids", [])
        new_state = body.get("state")
        if not issue_ids or not isinstance(issue_ids, list) or not new_state:
            self.send_error_json(400, "MISSING_PARAMS", "issue_ids and state required")
            return

        results = []
        for issue_id in issue_ids:
            result = self._transition_issue(issue_id, new_state)
            results.append({
                "issue_id": issue_id,
                "success": result.get("success", False),
                "error": result.get("message") if not result.get("success") else None,
            })
        self.send_json({"results": results})

    # POST /api/orchestrator/chat - Send message to orchestrator
    elif path == "/api/orchestrator/chat":
        message = text_field("message")
        logger.info(f"[API] POST /api/orchestrator/chat (message length: {len(message)})")
        if not message:
            logger.warning("[API] orchestrator/chat: missing message")
            self.send_error_json(400, "MISSING_MESSAGE", "Message required")
            return
        self.handle_orchestrator_chat(message)

    # POST /api/orchestrator/action - Execute approved action
    elif path == "/api/orchestrator/action":
        action_id = body.get("action_id")
        approved = body.get("approved", False)
        logger.info(f"[API] POST /api/orchestrator/action (action_id={action_id}, approved={approved})")
        if not action_id:
            logger.warning("[API] orchestrator/action: missing action_id")
            self.send_error_json(400, "MISSING_ACTION_ID", "action_id required")
            return
        result = self._execute_orchestrator_action(action_id, approved)
        logger.info(f"[API] orchestrator/action result: {result}")
        self.send_json(result, 200 if result.get("success") else 400)

    # POST /api/orchestrator/stop - Stop orchestrator session
    elif path == "/api/orchestrator/stop":
        logger.info("[API] POST /api/orchestrator/stop")
        result = self._stop_orchestrator()
        logger.info(f"[API] orchestrator/stop result: {result}")
        self.send_json(result)

    # POST /api/orchestrator/start - Start orchestrator session
    elif path == "/api/orchestrator/start":
        logger.info("[API] POST /api/orchestrator/start")
        result = self._start_orchestrator()
        logger.info(f"[API] orchestrator/start result: {result}")
        self.send_json(result, 200 if result.get("success") else 500)

    else:
        self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")
def handle_api_put(self, path: str):
    """
    Dispatch an authenticated ``PUT /api/...`` request.

    Both ``/api/agents/config`` (agent pool config) and ``/api/config``
    (general config) forward the JSON body to ``dashboard_api.update_config``
    and return its result. Replies 503 when the dashboard API is not
    initialized, 400 for an unparseable body and 404 for any other path.
    """
    if not self.dashboard_api:
        self.send_error_json(503, "NOT_READY", "Dashboard API not initialized")
        return

    payload = self.get_json_body()
    if payload is None:
        self.send_error_json(400, "INVALID_JSON", "Invalid JSON body")
        return

    config_routes = ("/api/agents/config", "/api/config")
    if path not in config_routes:
        self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")
        return

    self.send_json(self.dashboard_api.update_config(payload))
def handle_api_delete(self, path: str, query: dict):
    """
    Dispatch an authenticated ``DELETE /api/...`` request.

    * ``/api/history``           - clear all history; requires ``?confirm=true``.
    * ``/api/history/{task_id}`` - delete one run (the id is URL-decoded).

    Replies 503 when no runner is attached and 404 for any other path.
    """
    if not self.runner:
        self.send_error_json(503, "NOT_READY", "Runner not initialized")
        return

    # DELETE /api/history - Clear all history (requires confirm=true)
    if path == "/api/history":
        confirmed = query.get("confirm", ["false"])[0].lower() == "true"
        if not confirmed:
            self.send_error_json(400, "CONFIRMATION_REQUIRED",
                                 "Add ?confirm=true to clear all history")
            return
        removed = self.runner.agent_pool.history.clear_all()
        self.send_json({"success": True, "deleted": removed})
        return

    if not path.startswith("/api/history/"):
        self.send_error_json(404, "NOT_FOUND", f"Unknown API endpoint: {path}")
        return

    # DELETE /api/history/{task_id} - Delete single history run
    task_id = unquote(path.split("/")[-1])
    if not task_id:
        self.send_error_json(400, "MISSING_TASK_ID", "Task ID required")
        return

    if self.runner.agent_pool.history.delete_run(task_id):
        self.send_json({"success": True, "task_id": task_id})
    else:
        self.send_error_json(404, "NOT_FOUND", f"Run {task_id} not found")
def handle_sse(self, query: dict):
    """
    Serve the ``/events`` endpoint: replay the broadcaster's retained events
    (optionally only those newer than ``?since=<iso timestamp>``) as SSE
    frames, then return.

    Note: For true SSE, we'd keep this connection open and stream events.
    Since we're using a simple HTTP server, clients should poll this endpoint.
    """
    cursor = query.get("since", [None])[0]

    self.send_response(200)
    for header_name, header_value in (
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
        ("Connection", "keep-alive"),
    ):
        self.send_header(header_name, header_value)
    self.end_headers()

    # Send recent events
    for pending in broadcaster.get_recent_events(cursor):
        self._send_sse_event(pending)
def _send_sse_event(self, event: dict):
"""Send a single SSE event."""
try:
data = json.dumps(event)
self.wfile.write(f"data: {data}\n\n".encode())
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError):
pass
def serve_static(self, path: str):
    """
    Serve a file from the built dashboard under ``STATIC_DIR``.

    Behaviour:
      * With OAuth configured, requests without a valid session cookie are
        redirected (302) to ``/oauth/login``.
      * ``/`` maps to ``/index.html``; a missing path whose last segment has
        no ``.`` falls back to ``index.html`` (SPA routing).
      * Any path that resolves outside ``STATIC_DIR`` is rejected with 403
        before the filesystem is probed for it.
      * If the static directory itself does not exist, a placeholder page
        with build instructions is returned.
      * ``.js``, ``.css``, ``.woff2`` and ``.svg`` files are sent with a
        long-lived ``Cache-Control`` header.
    """
    # Require auth for dashboard pages (but not for OAuth redirect)
    if self.oauth:
        cookie_header = self.headers.get("Cookie", "")
        session = self.oauth.get_session_from_cookie(cookie_header)
        if not session:
            # Redirect to login
            self.send_response(302)
            self.send_header("Location", "/oauth/login")
            self.end_headers()
            return

    # Normalize path
    if path == "/" or path == "":
        path = "/index.html"

    def resolve_inside_root(candidate):
        """Return ``candidate`` fully resolved, or None if it escapes STATIC_DIR."""
        try:
            root = STATIC_DIR.resolve()
            resolved = candidate.resolve()
        except (ValueError, OSError, RuntimeError):
            return None
        # Component-wise containment: a plain string-prefix test would also
        # accept sibling directories such as "<root>-secret".
        return resolved if resolved.is_relative_to(root) else None

    # Security: prevent path traversal before touching the requested file
    file_path = resolve_inside_root(STATIC_DIR / path.lstrip("/"))
    if file_path is None:
        self.send_error(403, "Forbidden")
        return

    # SPA fallback - serve index.html for routes without file extensions
    if not file_path.exists():
        last_segment = path.split("/")[-1]
        if "." not in last_segment:
            file_path = STATIC_DIR / "index.html"

    if not file_path.exists():
        # If no static files exist yet, show a placeholder
        if not STATIC_DIR.exists():
            self.send_response(200)
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write(b"""
<!DOCTYPE html>
<html>
<head><title>Agent Dashboard</title></head>
<body>
<h1>Agent Dashboard</h1>
<p>Frontend not built yet. Run:</p>
<pre>cd /opt/agent_runner/dashboard && npm run build</pre>
<p><a href="/api/status">View API Status</a></p>
</body>
</html>
""")
            return
        self.send_error(404, f"File not found: {path}")
        return

    # Re-check the file actually being served (the fallback target may be a symlink)
    file_path = resolve_inside_root(file_path)
    if file_path is None:
        self.send_error(403, "Forbidden")
        return

    # Determine content type
    content_type, _ = mimetypes.guess_type(str(file_path))
    if content_type is None:
        content_type = "application/octet-stream"

    # Send file
    try:
        content = file_path.read_bytes()
        self.send_response(200)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", len(content))
        # Cache static assets (exact suffix match; extension-less files are not cached)
        if file_path.suffix in (".js", ".css", ".woff2", ".svg"):
            self.send_header("Cache-Control", "public, max-age=31536000")
        self.end_headers()
        self.wfile.write(content)
    except IOError as e:
        self.send_error(500, f"Error reading file: {e}")
def handle_webhook(self):
    """
    Placeholder for the YouTrack webhook: always answers 501, because the
    real handling is implemented in webhook_server.py.
    """
    status, code = 501, "NOT_IMPLEMENTED"
    self.send_error_json(status, code, "Webhook handling delegated to webhook_server")
# Helper methods for external service calls
def _get_issues(self, states: list[str], limit: int) -> list:
"""Fetch issues from YouTrack."""
if not self.runner:
return []
try:
issues = []
project = self.runner.config.get("project", {}).get("name", "CG")
for state in states:
try:
state_issues = self.runner.youtrack.get_issues_by_state(project, state)
for issue in state_issues[:limit]: # Apply limit after fetching
issues.append({
"id": issue.id,
"summary": issue.summary,
"state": state,
"priority": getattr(issue, 'priority', None),
"type": getattr(issue, 'type', None),
"created": getattr(issue, 'created', None),
"updated": getattr(issue, 'updated', None),
})
except Exception as e:
logger.warning(f"Failed to fetch issues for state {state}: {e}")
return issues
except Exception as e:
logger.error(f"Failed to fetch issues: {e}")
return []
def _get_issue(self, issue_id: str) -> dict:
"""Fetch single issue from YouTrack."""
if not self.runner:
return {"error": "Runner not available"}
try:
issue = self.runner.youtrack.get_issue(issue_id)
if not issue:
return {"error": f"Issue {issue_id} not found"}
comments = self.runner.youtrack.get_issue_comments(issue_id)
# Get issue type from custom fields
issue_type = issue.custom_fields.get('Type', None)
# Convert timestamps (YouTrack uses milliseconds)
created = issue.created
updated = issue.updated
if isinstance(created, int) and created > 0:
from datetime import datetime
created = datetime.fromtimestamp(created / 1000).isoformat()
if isinstance(updated, int) and updated > 0:
from datetime import datetime
updated = datetime.fromtimestamp(updated / 1000).isoformat()
return {
"id": issue.id,
"summary": issue.summary,
"description": getattr(issue, 'description', ''),
"state": getattr(issue, 'state', ''),
"priority": issue.priority,
"type": issue_type,
"reporter": getattr(issue, 'reporter', None),
"created": created,
"updated": updated,
"repository": getattr(issue, 'repository', None),
"comments": [
{
"author": c.author,
"body": c.text,
"created": datetime.fromtimestamp(c.created / 1000).isoformat() if isinstance(c.created, int) and c.created > 0 else "",
}
for c in comments
],
}
except Exception as e:
logger.error(f"Failed to fetch issue {issue_id}: {e}")
return {"error": str(e)}
def _transition_issue(self, issue_id: str, new_state: str) -> dict:
"""Transition issue to new state in YouTrack."""
if not self.runner:
return {"success": False, "message": "Runner not available"}
try:
self.runner.youtrack.update_issue_state(issue_id, new_state)
# Broadcast event
broadcaster.broadcast("issue.state_changed", {
"issue_id": issue_id,
"new_state": new_state,
})
return {"success": True, "issue_id": issue_id, "new_state": new_state}
except Exception as e:
logger.error(f"Failed to transition issue {issue_id}: {e}")
return {"success": False, "message": str(e)}
def _create_issue(self, data: dict) -> dict:
"""Create new issue in YouTrack."""
# TODO: Implement issue creation
return {"success": False, "message": "Issue creation not yet implemented"}
def _add_issue_comment(self, issue_id: str, text: str) -> dict:
"""Add comment to issue in YouTrack."""
if not self.runner:
return {"success": False, "message": "Runner not available"}
try:
success = self.runner.youtrack.add_issue_comment(issue_id, text)
if success:
return {"success": True, "issue_id": issue_id}
else:
return {"success": False, "message": "Failed to add comment"}
except Exception as e:
logger.error(f"Failed to add comment to issue {issue_id}: {e}")
return {"success": False, "message": str(e)}
def _get_builds(self, repo: Optional[str], status: Optional[str], limit: int) -> dict:
"""Fetch ALL builds from Woodpecker (not just running)."""
if not self.runner or not self.runner.woodpecker:
return {"builds": [], "total": 0}
try:
all_builds = []
repos_to_check = ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]
# Filter to single repo if specified
if repo and repo != "all":
repos_to_check = [f"cleargrow/{repo}"]
for repo_name in repos_to_check:
try:
repo_builds = self.runner.woodpecker.get_pipelines_extended(repo_name, limit)
all_builds.extend(repo_builds)
except Exception as e:
logger.warning(f"Failed to fetch builds for {repo_name}: {e}")
# Sort by created timestamp descending (most recent first)
all_builds.sort(key=lambda b: b.get('created') or 0, reverse=True)
# Filter by status if requested
if status:
all_builds = [b for b in all_builds if b.get("status", "").lower() == status.lower()]
total = len(all_builds)
return {"builds": all_builds[:limit], "total": total}
except Exception as e:
logger.error(f"Failed to fetch builds: {e}")
return {"builds": [], "total": 0}
def _get_repos_health(self) -> list:
"""Get health status for all repositories."""
import subprocess
from datetime import datetime, timedelta
if not self.runner:
return []
repos = []
repo_configs = self.runner.config.get("repos", {})
for key, repo_config in repo_configs.items():
repo_path = Path(repo_config.get("path", f"/opt/repos/{key}"))
health = {
"name": key,
"path": str(repo_path),
"platform": repo_config.get("platform", "unknown"),
"latestVersion": None,
"latestCommit": None,
"recentCommits": [],
"buildStatus": {
"lastBuild": None,
"successRate": 0,
"avgDuration": 0,
"buildsToday": 0,
"buildsThisWeek": 0,
},
}
# Get git info if repo exists
if repo_path.exists():
try:
# Get latest version tag
result = subprocess.run(
["git", "describe", "--tags", "--abbrev=0"],
cwd=repo_path,
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
health["latestVersion"] = result.stdout.strip()
# Get recent commits (last 5)
result = subprocess.run(
["git", "log", "--oneline", "-5", "--format=%H|%s|%an|%aI|%D"],
cwd=repo_path,
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
commits = []
for line in result.stdout.strip().split("\n"):
if line:
parts = line.split("|")
if len(parts) >= 4:
commit = {
"sha": parts[0],
"message": parts[1],
"author": parts[2],
"date": parts[3],
"branch": parts[4] if len(parts) > 4 else "",
}
commits.append(commit)
health["recentCommits"] = commits
if commits:
health["latestCommit"] = commits[0]
except (subprocess.TimeoutExpired, Exception) as e:
logger.warning(f"Failed to get git info for {key}: {e}")
# Get build status from Woodpecker
try:
if self.runner.woodpecker:
repo_full = f"cleargrow/{key}"
builds = self.runner.woodpecker.get_pipelines_extended(repo_full, 20)
if builds:
health["buildStatus"]["lastBuild"] = builds[0]
# Calculate stats
now = datetime.now()
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_start = today_start - timedelta(days=7)
successful = 0
total_duration = 0
duration_count = 0
builds_today = 0
builds_week = 0
for build in builds:
status = build.get("status", "").lower()
if status == "success":
successful += 1
# Duration calculation
started = build.get("started")
finished = build.get("finished")
if started and finished:
total_duration += finished - started
duration_count += 1
# Time-based counts
created = build.get("created", 0)
if created:
created_dt = datetime.fromtimestamp(created)
if created_dt >= today_start:
builds_today += 1
if created_dt >= week_start:
builds_week += 1
if builds:
health["buildStatus"]["successRate"] = round(
(successful / len(builds)) * 100, 1
)
if duration_count > 0:
health["buildStatus"]["avgDuration"] = round(
total_duration / duration_count
)
health["buildStatus"]["buildsToday"] = builds_today
health["buildStatus"]["buildsThisWeek"] = builds_week
except Exception as e:
logger.warning(f"Failed to get build status for {key}: {e}")
repos.append(health)
return repos
def _get_system_overview(self) -> dict:
    """
    Build the system overview payload: one entry per VPS plus summary counters.

    Metrics for the three remote servers come from
    ``ssh_metrics.get_server_metrics``; metrics for vps-agents come from
    ``self._get_local_metrics()``. Each service's health is looked up exactly
    once through ``self._check_service_health`` so a server's ``status`` and
    the matching entry in its ``services`` list can never disagree.

    The nginx entries are reported as "running" and postgresql/registry as
    "unknown" unconditionally; no check is performed for them here.

    Returns:
        dict with keys ``servers``, ``totalAgents``, ``activeAgents``,
        ``issuesInFlight``, ``buildsRunning`` and ``lastUpdated``.
    """
    from datetime import datetime
    from ssh_metrics import get_server_metrics

    # Get SSH metrics for remote servers
    vps_git_metrics = get_server_metrics("vps-git")
    vps_track_metrics = get_server_metrics("vps-track")
    vps_ci_metrics = get_server_metrics("vps-ci")

    # One lookup per service; reused for both the server and service status.
    gitea_ok = self._check_service_health("gitea")
    youtrack_ok = self._check_service_health("youtrack")
    woodpecker_ok = self._check_service_health("woodpecker")

    # Single timestamp so every server entry reports the same check time.
    checked_at = datetime.now().isoformat()

    servers = [
        {
            "id": "vps-git",
            "name": "vps-git",
            "ip": "139.144.29.179",
            "privateIp": "10.0.0.10",
            "status": "healthy" if gitea_ok else "offline",
            "services": [
                {"name": "nginx", "status": "running", "port": 443, "url": "https://git.cleargrow.io"},
                {"name": "gitea", "status": "running" if gitea_ok else "error", "port": 3000, "url": "https://git.cleargrow.io"},
                {"name": "postgresql", "status": "unknown", "port": 5432},
            ],
            "metrics": vps_git_metrics,
            "lastCheck": checked_at,
            "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@139.144.29.179",
        },
        {
            "id": "vps-track",
            "name": "vps-track",
            "ip": "170.187.143.45",
            "privateIp": "10.0.0.20",
            "status": "healthy" if youtrack_ok else "offline",
            "services": [
                {"name": "nginx", "status": "running", "port": 443, "url": "https://track.cleargrow.io"},
                {"name": "youtrack", "status": "running" if youtrack_ok else "error", "port": 8080, "url": "https://track.cleargrow.io"},
            ],
            "metrics": vps_track_metrics,
            "lastCheck": checked_at,
            "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@170.187.143.45",
        },
        {
            "id": "vps-ci",
            "name": "vps-ci",
            "ip": "173.230.138.66",
            "privateIp": "10.0.0.30",
            "status": "healthy" if woodpecker_ok else "offline",
            "services": [
                {"name": "nginx", "status": "running", "port": 443, "url": "https://ci.cleargrow.io"},
                {"name": "woodpecker", "status": "running" if woodpecker_ok else "error", "port": 8000, "url": "https://ci.cleargrow.io"},
                {"name": "registry", "status": "unknown", "port": 5000},
            ],
            "metrics": vps_ci_metrics,
            "lastCheck": checked_at,
            "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@173.230.138.66",
        },
        {
            "id": "vps-agents",
            "name": "vps-agents",
            "ip": "45.79.204.236",
            "privateIp": "10.0.0.40",
            # This process is running on vps-agents, so it is reported healthy.
            "status": "healthy",
            "services": [
                {"name": "agent-runner", "status": "running", "port": 8765},
                {"name": "claude-code", "status": "running"},
            ],
            "metrics": self._get_local_metrics(),
            "lastCheck": checked_at,
            "sshCommand": "ssh -i ~/.ssh/cleargrow_ed25519 root@45.79.204.236",
        },
    ]

    # Calculate summary stats
    pool_status = self.runner.agent_pool.get_status() if self.runner else {}
    return {
        "servers": servers,
        "totalAgents": pool_status.get("max", 10),
        "activeAgents": pool_status.get("active", 0),
        "issuesInFlight": self._count_in_flight_issues(),
        "buildsRunning": self._count_running_builds(),
        # Taken last so it reflects when the whole payload was assembled.
        "lastUpdated": datetime.now().isoformat(),
    }
def _check_service_health(self, service: str) -> bool:
"""Check if a service is healthy based on cached health status."""
if not self.dashboard_api:
return False
health = self.dashboard_api.get_health_status()
return health.get(service, False)
def _get_local_metrics(self) -> Optional[dict]:
    """
    Collect CPU, memory, disk and uptime figures for the local host (vps-agents).

    Returns:
        A dict with ``cpuPercent``, ``memoryPercent``, ``memoryUsedMB``,
        ``memoryTotalMB``, ``diskPercent``, ``diskUsedGB``, ``diskTotalGB``
        and ``uptime`` (whole seconds since boot), or ``None`` when psutil
        cannot be imported or any probe fails. Failures are logged as a
        warning rather than raised.
    """
    try:
        # Imported inside the try so a missing psutil install degrades to
        # "no metrics" instead of raising out of the caller.
        import psutil

        # interval=0.1 makes psutil sample CPU usage over a 100 ms window.
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage("/")
        boot_time = psutil.boot_time()
        uptime = int(datetime.now().timestamp() - boot_time)
        return {
            "cpuPercent": round(cpu_percent, 1),
            "memoryPercent": round(memory.percent, 1),
            "memoryUsedMB": round(memory.used / (1024 * 1024)),
            "memoryTotalMB": round(memory.total / (1024 * 1024)),
            "diskPercent": round(disk.percent, 1),
            "diskUsedGB": round(disk.used / (1024 * 1024 * 1024), 1),
            "diskTotalGB": round(disk.total / (1024 * 1024 * 1024), 1),
            "uptime": uptime,
        }
    except Exception as e:
        logger.warning(f"Failed to get local metrics: {e}")
        return None
def _count_in_flight_issues(self) -> int:
"""Count issues currently being processed."""
if not self.runner:
return 0
try:
project = self.runner.config.get("project", {}).get("name", "CG")
count = 0
for state in ["In Progress", "Build", "Verify", "Document"]:
try:
issues = self.runner.youtrack.get_issues_by_state(project, state)
count += len(issues) if issues else 0
except Exception:
pass
return count
except Exception:
return 0
def _count_running_builds(self) -> int:
"""Count currently running builds."""
if not self.runner or not self.runner.woodpecker:
return 0
try:
count = 0
for repo in ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]:
try:
builds = self.runner.woodpecker.get_pipelines_extended(repo, 5)
count += sum(1 for b in builds if b.get("status", "").lower() == "running")
except Exception:
pass
return count
except Exception:
return 0
def _get_agent_analytics(self, period: str) -> dict:
"""Get agent performance analytics."""
from datetime import datetime, timedelta
if not self.runner:
return self._empty_agent_analytics(period)
# Parse period
days = 7
if period == "24h":
days = 1
elif period == "30d":
days = 30
cutoff = datetime.now() - timedelta(days=days)
try:
# Get history runs
runs, total = self.runner.agent_pool.history.get_runs_filtered(
limit=1000,
offset=0,
since=cutoff.isoformat(),
)
if not runs:
return self._empty_agent_analytics(period)
# Calculate analytics
successful = sum(1 for r in runs if r.get("returncode", -1) == 0)
failed = sum(1 for r in runs if r.get("returncode", -1) != 0 and not r.get("timed_out", False))
timed_out = sum(1 for r in runs if r.get("timed_out", False))
# By task type
by_type = {}
for run in runs:
task_type = run.get("task_type", "unknown")
if task_type not in by_type:
by_type[task_type] = {"count": 0, "success": 0, "total_duration": 0}
by_type[task_type]["count"] += 1
if run.get("returncode", -1) == 0:
by_type[task_type]["success"] += 1
# By repo
by_repo = {}
for run in runs:
repo = run.get("repo", "unknown")
if repo not in by_repo:
by_repo[repo] = {"count": 0, "success": 0}
by_repo[repo]["count"] += 1
if run.get("returncode", -1) == 0:
by_repo[repo]["success"] += 1
return {
"period": period,
"totalRuns": len(runs),
"successfulRuns": successful,
"failedRuns": failed,
"timedOutRuns": timed_out,
"successRate": round((successful / len(runs)) * 100, 1) if runs else 0,
"avgDuration": 0, # Would need duration tracking
"byTaskType": {
k: {
"count": v["count"],
"successRate": round((v["success"] / v["count"]) * 100, 1) if v["count"] > 0 else 0,
"avgDuration": 0,
}
for k, v in by_type.items()
},
"byRepo": {
k: {
"count": v["count"],
"successRate": round((v["success"] / v["count"]) * 100, 1) if v["count"] > 0 else 0,
}
for k, v in by_repo.items()
},
"trend": [], # Would need time-series aggregation
}
except Exception as e:
logger.error(f"Failed to get agent analytics: {e}")
return self._empty_agent_analytics(period)
def _empty_agent_analytics(self, period: str) -> dict:
"""Return empty agent analytics structure."""
return {
"period": period,
"totalRuns": 0,
"successfulRuns": 0,
"failedRuns": 0,
"timedOutRuns": 0,
"successRate": 0,
"avgDuration": 0,
"byTaskType": {},
"byRepo": {},
"trend": [],
}
def _get_build_analytics(self, period: str) -> dict:
"""Get build analytics."""
from datetime import datetime, timedelta
if not self.runner or not self.runner.woodpecker:
return self._empty_build_analytics(period)
days = 7
if period == "24h":
days = 1
elif period == "30d":
days = 30
cutoff = datetime.now() - timedelta(days=days)
cutoff_ts = cutoff.timestamp()
try:
all_builds = []
repos_to_check = ["cleargrow/controller", "cleargrow/probe", "cleargrow/docs"]
for repo_name in repos_to_check:
try:
builds = self.runner.woodpecker.get_pipelines_extended(repo_name, 100)
# Filter by date
for build in builds:
created = build.get("created", 0)
if created >= cutoff_ts:
build["repo_key"] = repo_name.split("/")[-1]
all_builds.append(build)
except Exception as e:
logger.warning(f"Failed to fetch builds for {repo_name}: {e}")
if not all_builds:
return self._empty_build_analytics(period)
# Calculate stats
successful = sum(1 for b in all_builds if b.get("status", "").lower() == "success")
total_duration = 0
duration_count = 0
by_repo = {}
for build in all_builds:
repo_key = build.get("repo_key", "unknown")
if repo_key not in by_repo:
by_repo[repo_key] = {"total": 0, "success": 0, "total_duration": 0, "duration_count": 0}
by_repo[repo_key]["total"] += 1
if build.get("status", "").lower() == "success":
by_repo[repo_key]["success"] += 1
started = build.get("started")
finished = build.get("finished")
if started and finished:
duration = finished - started
total_duration += duration
duration_count += 1
by_repo[repo_key]["total_duration"] += duration
by_repo[repo_key]["duration_count"] += 1
return {
"period": period,
"totalBuilds": len(all_builds),
"successRate": round((successful / len(all_builds)) * 100, 1) if all_builds else 0,
"avgDuration": round(total_duration / duration_count) if duration_count > 0 else 0,
"buildsPerDay": round(len(all_builds) / days, 1),
"failurePatterns": [], # Would need log analysis
"byRepo": {
k: {
"total": v["total"],
"success": v["success"],
"avgDuration": round(v["total_duration"] / v["duration_count"]) if v["duration_count"] > 0 else 0,
}
for k, v in by_repo.items()
},
}
except Exception as e:
logger.error(f"Failed to get build analytics: {e}")
return self._empty_build_analytics(period)
def _empty_build_analytics(self, period: str) -> dict:
"""Return empty build analytics structure."""
return {
"period": period,
"totalBuilds": 0,
"successRate": 0,
"avgDuration": 0,
"buildsPerDay": 0,
"failurePatterns": [],
"byRepo": {},
}
def _get_issue_analytics(self, period: str) -> dict:
"""Get issue throughput analytics."""
# This is a simplified version - full implementation would track historical data
return {
"period": period,
"throughput": 0,
"avgTimeInState": {},
"stateDistribution": {},
"trend": [],
}
# Orchestrator helper methods
def _get_orchestrator_status(self) -> dict:
"""Get orchestrator session status."""
if not self.orchestrator_manager:
return {
"session_id": None,
"active": False,
"message_count": 0,
"pending_actions": [],
}
return self.orchestrator_manager.get_status()
def _get_orchestrator_messages(self, since: int) -> dict:
"""Get orchestrator conversation messages."""
if not self.orchestrator_manager:
return {"messages": [], "count": 0}
session = self.orchestrator_manager.get_session()
if not session:
return {"messages": [], "count": 0}
messages = session.get_messages(since)
return {
"messages": messages,
"count": len(messages),
"total": len(session.messages),
}
def _start_orchestrator(self) -> dict:
"""Start a new orchestrator session."""
if not self.orchestrator_manager:
return {"success": False, "error": "Orchestrator not configured"}
try:
session, created = self.orchestrator_manager.start_session()
return {
"success": True,
"session_id": session.session_id,
"created": created,
}
except Exception as e:
logger.error(f"Failed to start orchestrator: {e}")
return {"success": False, "error": str(e)}
def _stop_orchestrator(self) -> dict:
    """
    Stop the current orchestrator session.

    Returns:
        ``{"success": <value returned by stop_session()>}``, or a failure
        dict with an ``error`` message when no orchestrator is configured.
    """
    logger.debug("[API] _stop_orchestrator called")
    manager = self.orchestrator_manager
    if manager:
        stopped = manager.stop_session()
        logger.info(f"[API] _stop_orchestrator: stopped={stopped}")
        return {"success": stopped}

    logger.warning("[API] _stop_orchestrator: orchestrator not configured")
    return {"success": False, "error": "Orchestrator not configured"}
def _execute_orchestrator_action(self, action_id: str, approved: bool) -> dict:
    """
    Approve or reject a pending orchestrator action.

    Args:
        action_id: Identifier forwarded to ``session.execute_action``.
        approved: Forwarded unchanged; the session decides what approval or
            rejection means.

    Returns:
        Whatever ``session.execute_action`` returns, or a
        ``{"success": False, "error": ...}`` dict when the orchestrator is
        not configured or has no active session.
    """
    logger.info(f"[API] _execute_orchestrator_action: action_id={action_id}, approved={approved}")

    manager = self.orchestrator_manager
    if not manager:
        logger.error("[API] _execute_orchestrator_action: orchestrator not configured")
        return {"success": False, "error": "Orchestrator not configured"}

    active_session = manager.get_session()
    if not active_session:
        logger.error("[API] _execute_orchestrator_action: no active session")
        return {"success": False, "error": "No active orchestrator session"}

    # Only the first 8 characters of the session id go to the log.
    logger.debug(f"[API] _execute_orchestrator_action: got session {active_session.session_id[:8]}")
    outcome = active_session.execute_action(action_id, approved)
    logger.info(f"[API] _execute_orchestrator_action: result={outcome}")
    return outcome
def handle_orchestrator_chat(self, message: str):
    """
    Relay a chat message to the orchestrator and stream the reply as SSE.

    A session is obtained with ``orchestrator_manager.start_session()``.
    Every chunk yielded by ``session.send_message(message)`` is
    JSON-encoded and written to the client as one ``data:`` event.

    Error handling:
        * no orchestrator configured -> JSON error, status 503
        * session start raises       -> JSON error, status 500
        * client disconnects         -> logged, stream ends quietly
        * any other error mid-stream -> logged, then a final
          ``{"type": "error"}`` event is attempted (headers are already
          sent at that point, so the status code can no longer change)
    """
    logger.info(f"[API] handle_orchestrator_chat: message length={len(message)}")

    manager = self.orchestrator_manager
    if not manager:
        logger.error("[API] handle_orchestrator_chat: orchestrator not configured")
        self.send_error_json(503, "NOT_CONFIGURED", "Orchestrator not configured")
        return

    # Start session if not active
    try:
        session, created = manager.start_session()
        logger.info(f"[API] handle_orchestrator_chat: session={session.session_id[:8]}, created={created}")
    except Exception as e:
        logger.exception(f"[API] handle_orchestrator_chat: failed to start session: {e}")
        self.send_error_json(500, "START_FAILED", str(e))
        return

    # Send SSE headers
    logger.debug("[API] handle_orchestrator_chat: sending SSE headers")
    self.send_response(200)
    for header_name, header_value in (
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
        ("Connection", "keep-alive"),
        ("Access-Control-Allow-Origin", "*"),
    ):
        self.send_header(header_name, header_value)
    self.end_headers()

    def push(serialized: str) -> None:
        # One SSE event: a "data:" line terminated by a blank line.
        self.wfile.write(f"data: {serialized}\n\n".encode())
        self.wfile.flush()

    sent = 0
    try:
        for sent, chunk in enumerate(session.send_message(message), start=1):
            serialized = json.dumps(chunk)
            logger.debug(f"[API] handle_orchestrator_chat: sending chunk #{sent} type={chunk.get('type')}")
            push(serialized)
        logger.info(f"[API] handle_orchestrator_chat: completed, sent {sent} chunks")
    except (BrokenPipeError, ConnectionResetError):
        logger.warning("[API] handle_orchestrator_chat: client disconnected")
    except Exception as e:
        logger.exception(f"[API] handle_orchestrator_chat: error: {e}")
        try:
            push(json.dumps({"type": "error", "message": str(e)}))
        except Exception:
            # Best effort only: the connection may already be unusable.
            pass
def handle_orchestrator_stream(self):
    """
    Stream orchestrator output to the client via Server-Sent Events.

    Responds with a JSON error (503) when no orchestrator is configured and
    (404) when there is no active session. Otherwise it polls every 0.5 s
    until the client disconnects or the session it started with is gone:

    * new messages from ``session.get_messages(last_index)`` are sent as
      ``data:`` events, one per message;
    * the pending-actions list is sent as a ``{"type": "actions"}`` event,
      but only when it differs from the last list sent on this connection,
      so an unchanged list is not repeated on every poll;
    * a ``: keepalive`` comment line is written on every poll;
    * an ``event: session_ended`` is sent once the active session is
      missing or has a different ``session_id``.
    """
    import time

    if not self.orchestrator_manager:
        self.send_error_json(503, "NOT_CONFIGURED", "Orchestrator not configured")
        return
    session = self.orchestrator_manager.get_session()
    if not session:
        self.send_error_json(404, "NO_SESSION", "No active orchestrator session")
        return

    # Send SSE headers
    self.send_response(200)
    self.send_header("Content-Type", "text/event-stream")
    self.send_header("Cache-Control", "no-cache")
    self.send_header("Connection", "keep-alive")
    self.send_header("Access-Control-Allow-Origin", "*")
    self.end_headers()

    last_index = 0
    # Serialized form of the last actions event sent; comparing the JSON
    # text is unaffected by the session reusing or mutating its own list.
    last_actions_payload = None
    try:
        while True:
            # Check if session still active
            current_session = self.orchestrator_manager.get_session()
            if not current_session or current_session.session_id != session.session_id:
                self.wfile.write("event: session_ended\ndata: {}\n\n".encode())
                self.wfile.flush()
                break

            # Send new messages
            for msg in current_session.get_messages(last_index):
                event_data = json.dumps(msg)
                self.wfile.write(f"data: {event_data}\n\n".encode())
                self.wfile.flush()
                last_index += 1

            # Send pending actions only when they changed
            pending = current_session.get_status().get("pending_actions")
            if pending:
                actions_data = json.dumps({
                    "type": "actions",
                    "actions": pending,
                })
                if actions_data != last_actions_payload:
                    self.wfile.write(f"data: {actions_data}\n\n".encode())
                    self.wfile.flush()
                    last_actions_payload = actions_data
            else:
                # Nothing pending: forget the last payload so an identical
                # list that shows up later is announced again.
                last_actions_payload = None

            # Keepalive
            self.wfile.write(": keepalive\n\n".encode())
            self.wfile.flush()
            time.sleep(0.5)
    except (BrokenPipeError, ConnectionResetError):
        logger.debug("Orchestrator stream client disconnected")
    except Exception as e:
        # logger.exception keeps the traceback, as the chat handler does.
        logger.exception(f"Error in orchestrator stream: {e}")
def setup_api_handler(runner: "Runner", oauth=None):
    """
    Wire the shared dependencies into ``DashboardAPIHandler``.

    Sets the class attributes ``runner``, ``dashboard_api``, ``oauth`` and
    ``orchestrator_manager``, building a ``DashboardAPI`` and an
    ``OrchestratorManager`` from *runner*. Must be called before the HTTP
    server starts serving requests.
    """
    from dashboard_api import DashboardAPI
    from orchestrator import OrchestratorManager

    # The values are class attributes, so every handler instance sees them.
    handler_cls = DashboardAPIHandler
    handler_cls.runner = runner
    handler_cls.dashboard_api = DashboardAPI(runner)
    handler_cls.oauth = oauth
    handler_cls.orchestrator_manager = OrchestratorManager(runner)
    logger.info("Dashboard API handler configured (with orchestrator)")