Files
agentrunner/internal_api.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

258 lines
8.8 KiB
Python

"""
Internal API server for MCP tools.
Exposes runner functionality via Unix socket for the orchestrator's MCP server.
"""
import json
import logging
import os
import socket
import threading
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
    # Imported only for static type checking; this branch is never executed
    # at runtime, so it adds no runtime dependency on the runner module.
    from runner import Runner

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Filesystem path of the Unix domain socket the internal API server binds to.
SOCKET_PATH = "/run/cleargrow/runner.sock"
class InternalAPIServer:
    """Unix socket server for internal API calls from MCP.

    Wire format, as implemented by ``_handle_connection``: a client connects,
    sends one JSON object terminated by a newline
    (``{"method": "<name>", "params": {...}}``) and receives one
    newline-terminated JSON object in reply.  Failures are reported as
    ``{"error": "<message>"}``.  Each connection serves a single request.
    """

    # Number of issues returned by ``get_issues`` when no limit is supplied.
    DEFAULT_ISSUE_LIMIT = 20
    # Largest request accepted from a client; guards against a peer that
    # streams data without ever sending the terminating newline.
    MAX_REQUEST_BYTES = 1024 * 1024
    # Seconds a client socket operation may block before the connection is
    # abandoned, so a silent client cannot pin a worker thread forever.
    CONNECTION_TIMEOUT = 30.0
    # States reported by ``get_workflow_summary``, in reporting order.
    WORKFLOW_STATES = (
        "Backlog",
        "Triage",
        "Ready",
        "In Progress",
        "Build",
        "Verify",
        "Document",
        "Review",
    )

    def __init__(self, runner: "Runner"):
        """Store the runner and prepare (but do not start) the server.

        Args:
            runner: Object whose ``config``, ``youtrack`` and ``agent_pool``
                attributes are used by the request handlers.
        """
        self.runner = runner
        self.socket_path = SOCKET_PATH
        self._server_socket: Optional[socket.socket] = None
        self._thread: Optional[threading.Thread] = None
        self._shutdown = threading.Event()

    def start(self) -> None:
        """Start the internal API server.

        Creates the socket directory if needed, removes a stale socket file,
        binds and listens on ``self.socket_path`` and starts a daemon thread
        running ``_serve``.

        Raises:
            Exception: Any error raised during setup is logged and re-raised.
        """
        try:
            # Create directory if needed
            socket_dir = os.path.dirname(self.socket_path)
            if socket_dir:
                os.makedirs(socket_dir, mode=0o755, exist_ok=True)

            # Remove existing socket
            self._remove_socket_file()

            # Allow start() after a previous stop(); otherwise the serve loop
            # would see the stale shutdown flag and exit immediately.
            self._shutdown.clear()

            server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                server_socket.bind(self.socket_path)
                server_socket.listen(5)
                server_socket.settimeout(1.0)  # Allow periodic shutdown checks

                # Make socket accessible
                # NOTE(review): 0o666 lets any local user call this API,
                # including transition_issue/add_comment - confirm intended.
                os.chmod(self.socket_path, 0o666)
            except Exception:
                # Do not leak the descriptor or a half-configured socket file.
                server_socket.close()
                self._remove_socket_file()
                raise

            self._server_socket = server_socket
            self._thread = threading.Thread(target=self._serve, daemon=True)
            self._thread.start()
            logger.info("Internal API server started on %s", self.socket_path)
        except Exception as e:
            logger.error("Failed to start internal API server: %s", e)
            raise

    def stop(self) -> None:
        """Stop the internal API server.

        Signals the serve loop to exit, closes the listening socket and
        removes the socket file.  Does not wait for the serve thread.
        """
        self._shutdown.set()
        server_socket = self._server_socket
        self._server_socket = None
        if server_socket:
            server_socket.close()
        self._remove_socket_file()
        logger.info("Internal API server stopped")

    def _remove_socket_file(self) -> None:
        """Delete the socket file, ignoring the case where it is absent."""
        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            pass

    def _serve(self) -> None:
        """Main server loop.

        Accepts connections until shutdown is signalled or the listening
        socket is closed, handing each one to its own daemon thread.
        """
        # Bind the socket to a local so that stop()/start() replacing
        # self._server_socket cannot redirect this loop to another socket.
        server_socket = self._server_socket
        if server_socket is None:
            return

        while not self._shutdown.is_set():
            try:
                conn, _ = server_socket.accept()
            except socket.timeout:
                # Periodic wake-up to re-check the shutdown flag.
                continue
            except Exception as e:
                # A closed listening socket can never accept again; leave the
                # loop instead of spinning on the same error.
                if self._shutdown.is_set() or server_socket.fileno() == -1:
                    break
                logger.error("Internal API server error: %s", e)
                continue

            try:
                threading.Thread(
                    target=self._handle_connection,
                    args=(conn,),
                    daemon=True,
                ).start()
            except Exception as e:
                # The worker never took ownership, so close the connection here.
                conn.close()
                if not self._shutdown.is_set():
                    logger.error("Internal API server error: %s", e)

    def _handle_connection(self, conn: socket.socket) -> None:
        """Handle a single client connection.

        Reads until the first newline (or EOF), decodes the received bytes as
        JSON, dispatches via ``_handle_request`` and writes the JSON response
        followed by a newline.  Any failure is logged and reported to the
        client as ``{"error": ...}`` on a best-effort basis.  The connection
        is always closed.
        """
        try:
            conn.settimeout(self.CONNECTION_TIMEOUT)
            chunks = []
            received = 0
            while True:
                chunk = conn.recv(4096)
                if not chunk:
                    break
                chunks.append(chunk)
                received += len(chunk)
                # Earlier chunks contained no newline, so checking only the
                # newest chunk is equivalent to checking everything received.
                if b"\n" in chunk:
                    break
                if received > self.MAX_REQUEST_BYTES:
                    raise ValueError(
                        f"Request exceeds {self.MAX_REQUEST_BYTES} bytes"
                    )
            data = b"".join(chunks)
            if data:
                request = json.loads(data.decode().strip())
                response = self._handle_request(request)
                conn.sendall(json.dumps(response).encode() + b"\n")
        except Exception as e:
            logger.error("Internal API request error: %s", e)
            try:
                conn.sendall(json.dumps({"error": str(e)}).encode() + b"\n")
            except Exception:
                # Best effort: the peer may already be gone.
                pass
        finally:
            conn.close()

    def _handle_request(self, request: dict) -> dict:
        """Route and handle an API request.

        Args:
            request: Decoded request.  Expected to be a dict with a
                ``"method"`` name and an optional ``"params"`` dict; a
                missing or null ``"params"`` is treated as empty.

        Returns:
            The handler's result dict, or ``{"error": ...}`` when the request
            is malformed, the method is unknown, or the handler raises.
        """
        if not isinstance(request, dict):
            return {"error": "Request must be a JSON object"}

        method = request.get("method", "")
        params = request.get("params")
        if params is None:
            params = {}
        elif not isinstance(params, dict):
            return {"error": "params must be a JSON object"}

        handlers = {
            "get_issues": self._get_issues,
            "get_issue_details": self._get_issue_details,
            "get_agent_pool_status": self._get_agent_pool_status,
            "get_workflow_summary": self._get_workflow_summary,
            "transition_issue": self._transition_issue,
            "add_comment": self._add_comment,
        }
        # Only strings can name a method; this also avoids a TypeError from
        # looking up an unhashable value such as a list.
        handler = handlers.get(method) if isinstance(method, str) else None
        if not handler:
            return {"error": f"Unknown method: {method}"}
        try:
            return handler(params)
        except Exception as e:
            logger.error("Handler error for %s: %s", method, e)
            return {"error": str(e)}

    @staticmethod
    def _parse_limit(raw, default: int) -> int:
        """Convert a client-supplied limit to a non-negative int.

        Args:
            raw: Value from the request; ``None`` selects ``default``.
            default: Limit used when ``raw`` is ``None``.

        Raises:
            TypeError, ValueError, OverflowError: If ``raw`` cannot be
                converted to an int, or converts to a negative number.
        """
        if raw is None:
            return default
        limit = int(raw)
        if limit < 0:
            raise ValueError("limit must be non-negative")
        return limit

    def _get_issues(self, params: dict) -> dict:
        """Get issues by state.

        Params:
            state: State name to query (default ``"Ready"``).
            limit: Maximum number of issues to return (default
                ``DEFAULT_ISSUE_LIMIT``); must convert to a non-negative int.

        Returns:
            ``{"issues": [...], "count": <total matching>, "state": state}``
            with each description truncated to 500 characters, or
            ``{"error": ...}``.  ``count`` is the total before the limit is
            applied.
        """
        state = params.get("state", "Ready")
        try:
            limit = self._parse_limit(
                params.get("limit"), self.DEFAULT_ISSUE_LIMIT
            )
        except (TypeError, ValueError, OverflowError):
            return {"error": "limit must be a non-negative integer"}

        try:
            project = self.runner.config.get("project", {}).get("name", "CG")
            issues = self.runner.youtrack.get_issues_by_state(project, state)
            if not issues:
                return {"issues": [], "count": 0, "state": state}
            result = [
                {
                    "id": issue.id,
                    "summary": issue.summary,
                    "type": getattr(issue, "type", "Task"),
                    "priority": getattr(issue, "priority", "Normal"),
                    "description": (getattr(issue, "description", "") or "")[:500],
                }
                for issue in issues[:limit]
            ]
            return {"issues": result, "count": len(issues), "state": state}
        except Exception as e:
            return {"error": str(e)}

    def _get_issue_details(self, params: dict) -> dict:
        """Get detailed info for a single issue.

        Params:
            issue_id: Identifier of the issue to fetch (required).

        Returns:
            A dict of issue fields, with defaults substituted for attributes
            the issue object lacks, or ``{"error": ...}``.
        """
        issue_id = params.get("issue_id")
        if not issue_id:
            return {"error": "issue_id required"}
        try:
            issue = self.runner.youtrack.get_issue(issue_id)
            if not issue:
                return {"error": f"Issue {issue_id} not found"}
            return {
                "id": issue.id,
                "summary": issue.summary,
                "type": getattr(issue, "type", "Task"),
                "priority": getattr(issue, "priority", "Normal"),
                "state": getattr(issue, "state", "Unknown"),
                "description": getattr(issue, "description", ""),
                "created": str(getattr(issue, "created", "")),
                "updated": str(getattr(issue, "updated", "")),
            }
        except Exception as e:
            return {"error": str(e)}

    def _get_agent_pool_status(self, params: dict) -> dict:
        """Get agent pool status.

        Returns:
            ``{"active", "max_agents", "tasks"}`` taken from the runner's
            agent pool status (with defaults for missing keys), or
            ``{"error": ...}``.  ``params`` is unused.
        """
        try:
            status = self.runner.agent_pool.get_status()
            return {
                "active": status.get("active", 0),
                "max_agents": status.get("max_agents", 10),
                "tasks": status.get("tasks", []),
            }
        except Exception as e:
            return {"error": str(e)}

    def _get_workflow_summary(self, params: dict) -> dict:
        """Get summary of all workflow states.

        Returns:
            A dict mapping each name in ``WORKFLOW_STATES`` to
            ``{"count", "issues"}`` (at most five issues per state) plus an
            ``"agents"`` entry.  A state or the agent pool that cannot be
            queried is reported with zero/default values rather than failing
            the whole summary.  ``params`` is unused.
        """
        try:
            project = self.runner.config.get("project", {}).get("name", "CG")
            summary = {}
            for state in self.WORKFLOW_STATES:
                try:
                    issues = self.runner.youtrack.get_issues_by_state(project, state)
                    count = len(issues) if issues else 0
                    summary[state] = {
                        "count": count,
                        "issues": [
                            {"id": i.id, "summary": i.summary}
                            for i in (issues or [])[:5]
                        ],
                    }
                except Exception as e:
                    # Best effort: keep the summary usable, but leave a trace
                    # so a zero count is distinguishable from a failed query.
                    logger.warning(
                        "Failed to fetch issues for state %s: %s", state, e
                    )
                    summary[state] = {"count": 0, "issues": []}

            # Also get agent status
            try:
                pool = self.runner.agent_pool.get_status()
                summary["agents"] = {
                    "active": pool.get("active", 0),
                    "max": pool.get("max_agents", 10),
                }
            except Exception as e:
                logger.warning("Failed to fetch agent pool status: %s", e)
                summary["agents"] = {"active": 0, "max": 10}
            return summary
        except Exception as e:
            return {"error": str(e)}

    def _transition_issue(self, params: dict) -> dict:
        """Transition an issue to a new state.

        Params:
            issue_id: Identifier of the issue (required).
            new_state: Target state name (required).

        Returns:
            ``{"success": True, "issue_id", "new_state"}`` or
            ``{"error": ...}``.
        """
        issue_id = params.get("issue_id")
        new_state = params.get("new_state")
        if not issue_id or not new_state:
            return {"error": "issue_id and new_state required"}
        try:
            self.runner.youtrack.update_issue_state(issue_id, new_state)
            return {"success": True, "issue_id": issue_id, "new_state": new_state}
        except Exception as e:
            return {"error": str(e)}

    def _add_comment(self, params: dict) -> dict:
        """Add a comment to an issue.

        Params:
            issue_id: Identifier of the issue (required).
            text: Comment body (required, non-empty).

        Returns:
            ``{"success": True, "issue_id"}`` or ``{"error": ...}``.
        """
        issue_id = params.get("issue_id")
        text = params.get("text")
        if not issue_id or not text:
            return {"error": "issue_id and text required"}
        try:
            self.runner.youtrack.add_comment(issue_id, text)
            return {"success": True, "issue_id": issue_id}
        except Exception as e:
            return {"error": str(e)}