Orchestrator: - Add orchestrator chat interface with streaming responses - MCP server integration for YouTrack queries - Quick actions for backlog review, triage analysis - Dynamic suggestions based on conversation context - Action approval/rejection workflow Dashboard improvements: - Add font preloading to prevent FOUC - CSS spinner for loading state (no icon font dependency) - Wait for fonts before showing UI - Fix workflow pipeline alignment - Fix user message contrast (dark blue background) - Auto-scroll chat, actions, suggestions panels - Add keyboard shortcuts system - Add toast notifications - Add theme toggle (dark/light mode) - New pages: orchestrator, repos, system, analytics Workflow fixes: - Skip Build state when agent determines no changes needed - Check branch exists before attempting push - Include comments in get_issues MCP response - Simplified orchestrator prompt focused on Backlog management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
258 lines
8.8 KiB
Python
258 lines
8.8 KiB
Python
"""
|
|
Internal API server for MCP tools.
|
|
|
|
Exposes runner functionality via Unix socket for the orchestrator's MCP server.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import threading
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
if TYPE_CHECKING:
    # Needed only for the "Runner" annotation below; not imported at runtime.
    from runner import Runner

logger = logging.getLogger(__name__)

# Filesystem path of the Unix domain socket the internal API server binds to.
SOCKET_PATH = "/run/cleargrow/runner.sock"
|
|
|
|
|
|
class InternalAPIServer:
    """Unix socket server for internal API calls from MCP.

    Each connection carries one request: the client sends a JSON object
    ``{"method": <name>, "params": {...}}`` followed by a newline and gets
    back one newline-terminated JSON object.  Failures are reported to the
    client as ``{"error": <message>}``.
    """

    # Seconds a client may stall on a socket operation before the connection
    # is abandoned.  Accepted sockets are blocking by default, so without
    # this a silent client would pin a handler thread forever.
    _CLIENT_TIMEOUT = 30.0
    # Upper bound on the size of a single request, to keep a misbehaving
    # client from growing the receive buffer without limit.
    _MAX_REQUEST_BYTES = 4 * 1024 * 1024
    # Number of bytes requested per recv() call.
    _RECV_CHUNK = 4096

    def __init__(self, runner: "Runner"):
        """Remember the runner and prepare (but do not start) the server.

        Args:
            runner: Object whose ``config``, ``youtrack`` and ``agent_pool``
                attributes are used by the request handlers.
        """
        self.runner = runner
        self.socket_path = SOCKET_PATH
        self._server_socket: Optional[socket.socket] = None
        self._thread: Optional[threading.Thread] = None
        self._shutdown = threading.Event()

    def start(self):
        """Start the internal API server.

        Creates the socket directory if needed, removes a stale socket file,
        binds and listens on ``self.socket_path`` and starts a daemon thread
        running :meth:`_serve`.

        Raises:
            Exception: Any failure is logged and re-raised.  A socket that
                was created before the failure is closed first.
        """
        server: Optional[socket.socket] = None
        try:
            # Create directory if needed
            socket_dir = os.path.dirname(self.socket_path)
            if socket_dir:
                os.makedirs(socket_dir, mode=0o755, exist_ok=True)

            # Remove existing socket (EAFP avoids an exists/unlink race)
            try:
                os.unlink(self.socket_path)
            except FileNotFoundError:
                pass

            # A previous stop() leaves the event set; clear it so the new
            # serve loop does not exit immediately.
            self._shutdown.clear()

            server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            server.bind(self.socket_path)
            server.listen(5)
            server.settimeout(1.0)  # Allow periodic shutdown checks

            # Make socket accessible
            # NOTE(review): 0o666 lets any local user connect and invoke
            # transition_issue/add_comment - confirm this is intended.
            os.chmod(self.socket_path, 0o666)

            self._server_socket = server
            worker = threading.Thread(target=self._serve, daemon=True)
            worker.start()
            self._thread = worker
            logger.info(f"Internal API server started on {self.socket_path}")
        except Exception as e:
            logger.error(f"Failed to start internal API server: {e}")
            if server is not None:
                # Do not leak the half-initialised socket.
                server.close()
                if self._server_socket is server:
                    self._server_socket = None
            raise

    def stop(self):
        """Stop the internal API server.

        Signals the serve loop to exit, closes the listening socket, waits
        briefly for the serve thread and removes the socket file.  Safe to
        call more than once.
        """
        self._shutdown.set()

        server, self._server_socket = self._server_socket, None
        if server is not None:
            server.close()

        worker, self._thread = self._thread, None
        if worker is not None and worker is not threading.current_thread():
            # The accept() timeout is 1s, so the loop notices the shutdown
            # well within this bound.
            worker.join(timeout=2.0)

        try:
            os.unlink(self.socket_path)
        except FileNotFoundError:
            pass
        logger.info("Internal API server stopped")

    def _serve(self):
        """Main server loop.

        Accepts connections until shutdown is requested and hands each one
        to :meth:`_handle_connection` on its own daemon thread.
        """
        # Bind to the socket this thread was started for, so a later
        # start() cannot make an old loop accept on the new socket.
        server = self._server_socket
        if server is None:
            return

        while not self._shutdown.is_set():
            try:
                conn, _ = server.accept()
            except socket.timeout:
                continue
            except Exception as e:
                if self._shutdown.is_set():
                    break
                logger.error(f"Internal API server error: {e}")
                if server.fileno() == -1:
                    # The listening socket is closed; retrying cannot help.
                    break
                # Back off briefly so a persistent error does not spin.
                self._shutdown.wait(0.1)
                continue

            try:
                threading.Thread(
                    target=self._handle_connection,
                    args=(conn,),
                    daemon=True,
                ).start()
            except Exception as e:
                # Could not hand the connection off; close it instead of
                # leaking the descriptor.
                conn.close()
                if not self._shutdown.is_set():
                    logger.error(f"Internal API server error: {e}")

    def _handle_connection(self, conn: socket.socket):
        """Handle a single client connection.

        Reads until a newline or end-of-stream, decodes the received bytes
        as one JSON document, dispatches it through :meth:`_handle_request`
        and writes the JSON response followed by a newline.  Any failure is
        logged and reported to the client on a best-effort basis.  The
        connection is always closed.

        Args:
            conn: Connected socket returned by ``accept()``.
        """
        try:
            conn.settimeout(self._CLIENT_TIMEOUT)

            received = bytearray()
            while True:
                chunk = conn.recv(self._RECV_CHUNK)
                if not chunk:
                    break
                received += chunk
                if b"\n" in chunk:
                    break
                if len(received) > self._MAX_REQUEST_BYTES:
                    raise ValueError(
                        f"Request exceeds {self._MAX_REQUEST_BYTES} bytes"
                    )

            if received:
                request = json.loads(received.decode().strip())
                response = self._handle_request(request)
                conn.sendall(json.dumps(response).encode() + b"\n")

        except Exception as e:
            logger.error(f"Internal API request error: {e}")
            try:
                conn.sendall(json.dumps({"error": str(e)}).encode() + b"\n")
            except Exception:
                # Best effort only: the peer may already be gone.
                pass
        finally:
            conn.close()

    def _handle_request(self, request: dict) -> dict:
        """Route and handle an API request.

        Args:
            request: Decoded request; expected to be a dict with a
                ``"method"`` name and an optional ``"params"`` dict.

        Returns:
            The handler's result dict, or ``{"error": <message>}`` when the
            request is malformed, the method is unknown, or the handler
            raises.
        """
        if not isinstance(request, dict):
            return {"error": "Request must be a JSON object"}

        method = request.get("method", "")

        handlers = {
            "get_issues": self._get_issues,
            "get_issue_details": self._get_issue_details,
            "get_agent_pool_status": self._get_agent_pool_status,
            "get_workflow_summary": self._get_workflow_summary,
            "transition_issue": self._transition_issue,
            "add_comment": self._add_comment,
        }

        # A non-string method (e.g. a list) is unhashable and can never
        # match, so treat it as unknown instead of raising on lookup.
        handler = handlers.get(method) if isinstance(method, str) else None
        if not handler:
            return {"error": f"Unknown method: {method}"}

        params = request.get("params")
        if params is None:
            # Missing or null params means "no arguments".
            params = {}
        elif not isinstance(params, dict):
            return {"error": "params must be a JSON object"}

        try:
            return handler(params)
        except Exception as e:
            logger.error(f"Handler error for {method}: {e}")
            return {"error": str(e)}

    def _project_name(self) -> str:
        """Return the project name from the runner config (default ``"CG"``)."""
        return self.runner.config.get("project", {}).get("name", "CG")

    def _get_issues(self, params: dict) -> dict:
        """Get issues by state.

        Args:
            params: ``"state"`` (default ``"Ready"``) and ``"limit"``
                (default 20).  ``limit`` must be a non-negative integer;
                ``None`` disables truncation.

        Returns:
            ``{"issues": [...], "count": <total>, "state": <state>}`` where
            ``count`` is the number of issues before truncation and each
            description is cut to 500 characters, or ``{"error": ...}``.
        """
        state = params.get("state", "Ready")
        limit = params.get("limit", 20)

        # A negative limit would silently drop issues from the end of the
        # list (issues[:-1]); reject it together with non-integers.
        if limit is not None and (
            isinstance(limit, bool) or not isinstance(limit, int) or limit < 0
        ):
            return {"error": "limit must be a non-negative integer"}

        try:
            project = self._project_name()
            issues = self.runner.youtrack.get_issues_by_state(project, state)

            if not issues:
                return {"issues": [], "count": 0, "state": state}

            result = [
                {
                    "id": issue.id,
                    "summary": issue.summary,
                    "type": getattr(issue, "type", "Task"),
                    "priority": getattr(issue, "priority", "Normal"),
                    "description": (getattr(issue, "description", "") or "")[:500],
                }
                for issue in issues[:limit]
            ]

            return {"issues": result, "count": len(issues), "state": state}

        except Exception as e:
            return {"error": str(e)}

    def _get_issue_details(self, params: dict) -> dict:
        """Get detailed info for a single issue.

        Args:
            params: Must contain ``"issue_id"``.

        Returns:
            A dict with id, summary, type, priority, state, description,
            created and updated, or ``{"error": ...}`` when the id is
            missing, the issue is not found, or the lookup raises.
        """
        issue_id = params.get("issue_id")
        if not issue_id:
            return {"error": "issue_id required"}

        try:
            issue = self.runner.youtrack.get_issue(issue_id)
            if not issue:
                return {"error": f"Issue {issue_id} not found"}

            return {
                "id": issue.id,
                "summary": issue.summary,
                "type": getattr(issue, "type", "Task"),
                "priority": getattr(issue, "priority", "Normal"),
                "state": getattr(issue, "state", "Unknown"),
                "description": getattr(issue, "description", ""),
                "created": str(getattr(issue, "created", "")),
                "updated": str(getattr(issue, "updated", "")),
            }

        except Exception as e:
            return {"error": str(e)}

    def _get_agent_pool_status(self, params: dict) -> dict:
        """Get agent pool status.

        Args:
            params: Unused.

        Returns:
            ``{"active", "max_agents", "tasks"}`` taken from
            ``runner.agent_pool.get_status()`` with defaults 0, 10 and
            ``[]``, or ``{"error": ...}`` if the call raises.
        """
        try:
            status = self.runner.agent_pool.get_status()
            return {
                "active": status.get("active", 0),
                "max_agents": status.get("max_agents", 10),
                "tasks": status.get("tasks", []),
            }
        except Exception as e:
            return {"error": str(e)}

    def _get_workflow_summary(self, params: dict) -> dict:
        """Get summary of all workflow states.

        Args:
            params: Unused.

        Returns:
            A dict keyed by workflow state, each holding the issue count
            and up to five ``{"id", "summary"}`` entries, plus an
            ``"agents"`` entry with ``active``/``max``.  A state or pool
            query that fails is logged and reported with zero/default
            values, so one failure does not hide the rest of the summary.
        """
        try:
            project = self._project_name()
            states = ["Backlog", "Triage", "Ready", "In Progress", "Build", "Verify", "Document", "Review"]

            summary = {}
            for state in states:
                try:
                    issues = self.runner.youtrack.get_issues_by_state(project, state)
                    count = len(issues) if issues else 0
                    summary[state] = {
                        "count": count,
                        "issues": [{"id": i.id, "summary": i.summary} for i in (issues or [])[:5]],
                    }
                except Exception as e:
                    # Best effort per state, but leave a trace: a reported
                    # count of 0 is otherwise indistinguishable from a
                    # failed query.
                    logger.warning(f"Workflow summary: failed to query state {state}: {e}")
                    summary[state] = {"count": 0, "issues": []}

            # Also get agent status
            try:
                pool = self.runner.agent_pool.get_status()
                summary["agents"] = {
                    "active": pool.get("active", 0),
                    "max": pool.get("max_agents", 10),
                }
            except Exception as e:
                logger.warning(f"Workflow summary: failed to query agent pool: {e}")
                summary["agents"] = {"active": 0, "max": 10}

            return summary

        except Exception as e:
            return {"error": str(e)}

    def _transition_issue(self, params: dict) -> dict:
        """Transition an issue to a new state.

        Args:
            params: Must contain ``"issue_id"`` and ``"new_state"``.

        Returns:
            ``{"success": True, "issue_id", "new_state"}`` or
            ``{"error": ...}`` when a parameter is missing or the update
            raises.
        """
        issue_id = params.get("issue_id")
        new_state = params.get("new_state")

        if not issue_id or not new_state:
            return {"error": "issue_id and new_state required"}

        try:
            self.runner.youtrack.update_issue_state(issue_id, new_state)
            return {"success": True, "issue_id": issue_id, "new_state": new_state}
        except Exception as e:
            return {"error": str(e)}

    def _add_comment(self, params: dict) -> dict:
        """Add a comment to an issue.

        Args:
            params: Must contain ``"issue_id"`` and non-empty ``"text"``.

        Returns:
            ``{"success": True, "issue_id"}`` or ``{"error": ...}`` when a
            parameter is missing or the call raises.
        """
        issue_id = params.get("issue_id")
        text = params.get("text")

        if not issue_id or not text:
            return {"error": "issue_id and text required"}

        try:
            self.runner.youtrack.add_comment(issue_id, text)
            return {"success": True, "issue_id": issue_id}
        except Exception as e:
            return {"error": str(e)}
|