Files
agentrunner/agent.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

747 lines
27 KiB
Python

"""
Claude Code agent subprocess management.
Adapted for YouTrack + Gitea stack.
"""
import subprocess
import threading
import logging
import time
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class StreamMessage:
    """One parsed event line from Claude Code's stream-json output."""

    # Wall-clock time the line was read from the subprocess stdout.
    timestamp: datetime
    # Stream event kind: "system", "user", "assistant", "result".
    type: str
    # Raw decoded JSON payload for this event.
    content: dict

    def to_dict(self) -> dict:
        """Serialize for JSON transport; timestamp becomes an ISO-8601 string."""
        return dict(
            timestamp=self.timestamp.isoformat(),
            type=self.type,
            content=self.content,
        )
@dataclass
class AgentTask:
    """A unit of work handed to a single Claude Code subprocess."""

    task_id: str
    issue_number: int
    issue_id: str  # YouTrack issue ID (e.g., "CG-123")
    repo: str  # Repository name
    platform: str  # Platform identifier (e.g., "controller", "probe")
    work_dir: str
    prompt: str
    task_type: str = "remediation"  # "remediation", "verification", or "librarian"
    # Runtime state
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    process: Optional[subprocess.Popen] = None
    returncode: Optional[int] = None
    stdout: str = ""
    stderr: str = ""
    timed_out: bool = False  # True if killed due to timeout
    # Streaming output
    messages: list = field(default_factory=list)  # List of StreamMessage
    _messages_lock: threading.Lock = field(default_factory=threading.Lock)
    _output_thread: Optional[threading.Thread] = None

    def add_message(self, msg: "StreamMessage"):
        """Append one stream message; safe to call from the reader thread."""
        with self._messages_lock:
            self.messages.append(msg)

    def get_messages(self, since_index: int = 0) -> list[dict]:
        """Return serialized messages from since_index onward (polling cursor)."""
        # Copy the slice under the lock, then serialize outside it.
        with self._messages_lock:
            snapshot = self.messages[since_index:]
        return [entry.to_dict() for entry in snapshot]

    def message_count(self) -> int:
        """Number of stream messages buffered so far."""
        with self._messages_lock:
            return len(self.messages)

    def to_history_dict(self) -> dict:
        """Flatten this task into a JSON-serializable record for history storage."""
        started = self.started_at.isoformat() if self.started_at else None
        completed = self.completed_at.isoformat() if self.completed_at else None
        record = {
            "task_id": self.task_id,
            "issue_id": self.issue_id,
            "issue_number": self.issue_number,
            "repo": self.repo,
            "platform": self.platform,
            "task_type": self.task_type,
            "started_at": started,
            "completed_at": completed,
            "returncode": self.returncode,
            "timed_out": self.timed_out,
            "messages": self.get_messages(),
        }
        return record
class AgentHistory:
    """Manages persistent history of completed agent runs.

    Each run is written to its own JSON file under ``history_dir`` and a
    bounded summary index (``index.json``) is maintained for fast listing.
    Index mutations are serialized with a lock so the pool's monitor thread
    and web handlers can share one instance.
    """

    # Maximum number of run summaries retained in the index file.
    MAX_INDEX_RUNS = 100

    def __init__(self, history_dir: str = "/opt/agent_runner/history"):
        """Create the store, ensure history_dir exists, and load the index."""
        self.history_dir = Path(history_dir)
        self.history_dir.mkdir(parents=True, exist_ok=True)
        self._index_file = self.history_dir / "index.json"
        self._index_lock = threading.Lock()
        self._load_index()

    def _load_index(self):
        """Load the history index, falling back to an empty one on any error."""
        self._index = {"runs": []}
        if self._index_file.exists():
            try:
                with open(self._index_file, "r") as f:
                    self._index = json.load(f)
            except (json.JSONDecodeError, IOError):
                # Corrupt or unreadable index: start fresh rather than crash.
                self._index = {"runs": []}

    def _save_index(self):
        """Persist the index atomically (temp file + rename).

        Writing in place risked a truncated/corrupt index.json on crash,
        which _load_index would then silently replace with an empty one.
        """
        tmp_file = self._index_file.with_suffix(".json.tmp")
        try:
            with open(tmp_file, "w") as f:
                json.dump(self._index, f, indent=2)
            tmp_file.replace(self._index_file)  # atomic rename on POSIX
        except OSError as e:
            logger.error(f"Failed to save history index: {e}")

    def _safe_filename(self, task_id: str) -> str:
        """Map a task_id to a safe flat filename.

        Previously only ":" was replaced, so a crafted task_id containing
        "/" or ".." segments could resolve outside history_dir. Now every
        character that is not alphanumeric or one of "._-" becomes "_"
        (backward compatible with the old ":" -> "_" scheme for typical ids).
        """
        return "".join(c if c.isalnum() or c in "._-" else "_" for c in task_id)

    def save_run(self, task: "AgentTask"):
        """Save a completed agent run: full data to its own file, summary to index."""
        run_data = task.to_history_dict()
        safe_id = self._safe_filename(task.task_id)
        # Save full run data to an individual file; if this fails, skip the
        # index update so the index never references a missing file.
        run_file = self.history_dir / f"{safe_id}.json"
        try:
            with open(run_file, "w") as f:
                json.dump(run_data, f, indent=2)
        except IOError as e:
            logger.error(f"Failed to save run {task.task_id}: {e}")
            return
        # Update index with a lightweight summary (most recent first).
        with self._index_lock:
            summary = {
                "task_id": task.task_id,
                "issue_id": task.issue_id,
                "repo": task.repo,
                "task_type": task.task_type,
                "started_at": run_data["started_at"],
                "completed_at": run_data["completed_at"],
                "returncode": task.returncode,
                "timed_out": task.timed_out,
                "message_count": len(run_data["messages"]),
            }
            self._index["runs"].insert(0, summary)
            # Bound the index; older run files stay on disk but drop out of listings.
            if len(self._index["runs"]) > self.MAX_INDEX_RUNS:
                self._index["runs"] = self._index["runs"][:self.MAX_INDEX_RUNS]
            self._save_index()
        logger.info(f"Saved history for run {task.task_id}")

    def get_runs(self, limit: int = 50, offset: int = 0) -> list[dict]:
        """Get a page of recent run summaries (most recent first)."""
        with self._index_lock:
            return self._index["runs"][offset:offset + limit]

    def get_run(self, task_id: str) -> Optional[dict]:
        """Get full run data by task ID, or None if missing/unreadable."""
        safe_id = self._safe_filename(task_id)
        run_file = self.history_dir / f"{safe_id}.json"
        if not run_file.exists():
            return None
        try:
            with open(run_file, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"Failed to load run {task_id}: {e}")
            return None

    def delete_run(self, task_id: str) -> bool:
        """Delete a run from both the index and disk.

        Returns:
            True if the run file is gone afterwards (including if it never
            existed), False if deletion failed.
        """
        safe_id = self._safe_filename(task_id)
        run_file = self.history_dir / f"{safe_id}.json"
        # Remove from index first so listings never show an undeletable run.
        with self._index_lock:
            self._index["runs"] = [r for r in self._index["runs"] if r["task_id"] != task_id]
            self._save_index()
        try:
            if run_file.exists():
                run_file.unlink()
            return True
        except OSError as e:
            logger.error(f"Failed to delete run {task_id}: {e}")
            return False

    def get_runs_filtered(
        self,
        limit: int = 50,
        offset: int = 0,
        task_type: Optional[str] = None,
        status: Optional[str] = None,
        since: Optional[str] = None,
        until: Optional[str] = None,
        repo: Optional[str] = None,
        search: Optional[str] = None,
    ) -> tuple[list[dict], int]:
        """
        Get filtered history runs.
        Args:
            limit: Maximum number of runs to return
            offset: Number of runs to skip
            task_type: Filter by task type (remediation, verification, librarian)
            status: Filter by status (success, failed, timeout)
            since: Filter runs started after this ISO date/datetime
            until: Filter runs started before this ISO date/datetime
            repo: Filter by repository name (partial match)
            search: Search in issue_id (case-insensitive)
        Returns:
            Tuple of (filtered runs list, total count before pagination)
        """
        with self._index_lock:
            filtered = self._index["runs"]
            if task_type:
                filtered = [r for r in filtered if r.get("task_type") == task_type]
            if status:
                # "success"/"failed" both exclude timeouts; "timeout" is its own bucket.
                if status == "success":
                    filtered = [r for r in filtered if r.get("returncode") == 0 and not r.get("timed_out")]
                elif status == "failed":
                    filtered = [r for r in filtered if r.get("returncode") != 0 and not r.get("timed_out")]
                elif status == "timeout":
                    filtered = [r for r in filtered if r.get("timed_out")]
            # Date range: ISO-8601 strings compare correctly lexicographically.
            if since:
                filtered = [r for r in filtered if r.get("started_at", "") >= since]
            if until:
                # Add time component if only a date was provided (inclusive end of day).
                until_cmp = until if "T" in until else until + "T23:59:59"
                filtered = [r for r in filtered if r.get("started_at", "") <= until_cmp]
            if repo:
                filtered = [r for r in filtered if repo.lower() in r.get("repo", "").lower()]
            if search:
                search_upper = search.upper()
                filtered = [r for r in filtered if search_upper in r.get("issue_id", "").upper()]
            total = len(filtered)
            return filtered[offset:offset + limit], total

    def delete_runs(self, task_ids: list[str]) -> int:
        """
        Bulk delete runs.
        Args:
            task_ids: List of task IDs to delete
        Returns:
            Number of runs successfully deleted
        """
        deleted = 0
        for task_id in task_ids:
            if self.delete_run(task_id):
                deleted += 1
        return deleted

    def clear_all(self) -> int:
        """
        Delete all history runs (index entries and their files).
        Returns:
            Number of runs deleted
        """
        with self._index_lock:
            count = len(self._index["runs"])
            # Delete all individual run files; log-and-continue on failure.
            for run in self._index["runs"]:
                safe_id = self._safe_filename(run["task_id"])
                run_file = self.history_dir / f"{safe_id}.json"
                try:
                    if run_file.exists():
                        run_file.unlink()
                except OSError as e:
                    logger.error(f"Failed to delete run file {run['task_id']}: {e}")
            self._index["runs"] = []
            self._save_index()
        logger.info(f"Cleared all history ({count} runs)")
        return count
class AgentPool:
    """Manages a pool of Claude Code agent processes.

    A background monitor thread watches active subprocesses for completion
    and timeouts; each task additionally gets a daemon reader thread that
    consumes its stream-json stdout. Shared state (the active-task dict)
    is guarded by a single lock; process operations happen outside it.
    """
    # Timeout for thread join during shutdown
    SHUTDOWN_TIMEOUT = 10  # seconds
    MONITOR_INTERVAL = 1  # seconds between monitor checks

    def __init__(
        self,
        max_agents: int = 3,
        claude_command: str = "claude",
        claude_flags: list[str] = None,
        on_complete: Callable[[AgentTask], None] = None,
        timeout_seconds: int = 1800,  # 30 minutes default
        history_dir: str = "/opt/agent_runner/history",
    ):
        """Configure the pool; no threads are started until start().

        Args:
            max_agents: Maximum number of concurrent agent subprocesses.
            claude_command: Executable used to launch Claude Code.
            claude_flags: Extra CLI flags appended to every invocation.
            on_complete: Callback invoked (outside the lock) per finished task.
            timeout_seconds: Per-task wall-clock limit; values <= 0 disable
                the timeout check in the monitor loop.
            history_dir: Directory for persisted run history.
        """
        self.max_agents = max_agents
        self.claude_command = claude_command
        self.claude_flags = claude_flags or []
        self.on_complete = on_complete
        self.timeout_seconds = timeout_seconds
        # task_id -> AgentTask for currently running agents; guarded by _lock.
        self._active: dict[str, AgentTask] = {}
        self._lock = threading.Lock()
        self._monitor_thread: Optional[threading.Thread] = None
        self._shutdown_event = threading.Event()
        # History persistence
        self.history = AgentHistory(history_dir)

    def start(self):
        """Start the agent pool monitor."""
        self._shutdown_event.clear()
        # Non-daemon so completion bookkeeping can finish before interpreter exit.
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            name="AgentPoolMonitor",
            daemon=False
        )
        self._monitor_thread.start()
        logger.info(f"Agent pool started (max_agents={self.max_agents})")

    def stop(self):
        """Stop the agent pool gracefully (signals the monitor and joins it)."""
        logger.debug("Initiating agent pool shutdown...")
        self._shutdown_event.set()
        if self._monitor_thread:
            self._monitor_thread.join(timeout=self.SHUTDOWN_TIMEOUT)
            if self._monitor_thread.is_alive():
                logger.warning(f"Agent pool monitor thread did not terminate within {self.SHUTDOWN_TIMEOUT}s")
            else:
                logger.debug("Agent pool monitor thread terminated cleanly")
            self._monitor_thread = None
        logger.info("Agent pool stopped")

    @property
    def active_count(self) -> int:
        # Number of currently running agent tasks (snapshot).
        with self._lock:
            return len(self._active)

    @property
    def has_capacity(self) -> bool:
        # True when another task could be submitted right now (racy by nature;
        # submit() re-checks under the lock).
        return self.active_count < self.max_agents

    def is_task_running(self, task_id: str) -> bool:
        # Snapshot check only; prefer submit()'s return value to avoid races.
        with self._lock:
            return task_id in self._active

    def submit(self, task: AgentTask) -> bool:
        """
        Submit a task to the pool atomically.
        This method is thread-safe and performs an atomic check-and-add operation.
        Callers should rely on the return value rather than calling is_task_running()
        separately, to avoid race conditions.
        Returns:
            True if submitted successfully
            False if pool is at capacity or task is already running
        """
        with self._lock:
            if len(self._active) >= self.max_agents:
                logger.warning(f"Pool at capacity, rejecting task {task.task_id}")
                return False
            if task.task_id in self._active:
                logger.warning(f"Task {task.task_id} already running")
                return False
            # Start the Claude Code process
            try:
                self._start_agent(task)
                self._active[task.task_id] = task
                logger.info(f"Started agent for task {task.task_id} (issue {task.issue_id})")
                return True
            except FileNotFoundError as e:
                logger.error(f"Claude command not found for {task.task_id}: {e}")
                return False
            except PermissionError as e:
                logger.error(f"Permission denied starting agent for {task.task_id}: {e}")
                return False
            except OSError as e:
                logger.error(f"OS error starting agent for {task.task_id}: {e}")
                return False

    def _start_agent(self, task: AgentTask):
        """Start a Claude Code subprocess for the task with streaming output."""
        cmd = [
            self.claude_command,
            "-p", task.prompt,  # Print mode with prompt
            "--output-format", "stream-json",  # Stream JSON for real-time output
            "--verbose",  # Required for stream-json with -p
        ] + self.claude_flags
        # Deliberately not logging the full prompt (may be large / sensitive).
        logger.debug(f"Running: {cmd[0]} -p '...' --output-format stream-json --verbose in {task.work_dir}")
        task.started_at = datetime.now()
        task.process = subprocess.Popen(
            cmd,
            cwd=task.work_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,  # Line buffered for streaming
        )
        # Start output reader thread (daemon: must not block interpreter exit)
        task._output_thread = threading.Thread(
            target=self._read_output,
            args=(task,),
            daemon=True,
            name=f"output-{task.task_id}"
        )
        task._output_thread.start()

    def _read_output(self, task: AgentTask):
        """Read streaming JSON output from Claude Code process.

        Runs on the per-task reader thread; parses each stdout line as JSON
        into a StreamMessage and also accumulates raw text in task.stdout.
        After stdout closes, drains stderr into task.stderr.
        """
        try:
            for line in task.process.stdout:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    msg_type = data.get("type", "unknown")
                    # Create StreamMessage from the JSON
                    msg = StreamMessage(
                        timestamp=datetime.now(),
                        type=msg_type,
                        content=data,
                    )
                    task.add_message(msg)
                    # Also accumulate raw output for backward compatibility
                    task.stdout += line + "\n"
                except json.JSONDecodeError:
                    # Non-JSON output (shouldn't happen with stream-json)
                    logger.warning(f"Non-JSON output from {task.task_id}: {line[:100]}")
                    task.stdout += line + "\n"
        except Exception as e:
            logger.error(f"Error reading output from {task.task_id}: {e}")
        # Read any remaining stderr (blocks until the pipe closes)
        try:
            if task.process and task.process.stderr:
                task.stderr = task.process.stderr.read()
        except Exception as e:
            logger.error(f"Error reading stderr from {task.task_id}: {e}")

    def _monitor_loop(self):
        """Monitor active agents for completion and timeouts.

        Runs until the shutdown event is set. Each pass: snapshot active
        tasks under the lock, poll/kill outside it, then remove finished
        tasks and fire history-save + on_complete callbacks outside the lock.
        """
        while not self._shutdown_event.is_set():
            completed = []
            timed_out = []
            # Get current time outside the lock
            now = datetime.now()
            # Take a snapshot of active tasks to minimize lock duration
            with self._lock:
                active_snapshot = list(self._active.items())
            # Check status outside the lock (process.poll() is thread-safe)
            for task_id, task in active_snapshot:
                # Check for completion
                if task.process and task.process.poll() is not None:
                    # Process finished - communicate() should be called outside lock
                    completed.append((task_id, task))
                # Check for timeout
                elif task.started_at and self.timeout_seconds > 0:
                    elapsed = (now - task.started_at).total_seconds()
                    if elapsed > self.timeout_seconds:
                        timed_out.append((task_id, task))
            # Process completed tasks - output is already read by _read_output thread
            for task_id, task in completed:
                if task.process:
                    # Wait for output thread to finish reading
                    if task._output_thread and task._output_thread.is_alive():
                        task._output_thread.join(timeout=2)
                    task.returncode = task.process.returncode
                    task.completed_at = now
            # Handle timeouts (kill processes) - process operations outside lock
            for task_id, task in timed_out:
                if task.process:
                    logger.warning(
                        f"Agent {task_id} timed out after {self.timeout_seconds}s - killing"
                    )
                    try:
                        task.process.terminate()
                        # Give it 5 seconds to terminate gracefully
                        try:
                            task.process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            task.process.kill()
                            task.process.wait()
                        task.returncode = -1
                        task.timed_out = True
                        task.stderr = f"Agent timed out after {self.timeout_seconds} seconds"
                        task.completed_at = datetime.now()
                    except ProcessLookupError:
                        logger.warning(f"Agent {task_id} process already terminated")
                    except PermissionError as e:
                        logger.error(f"Permission denied killing agent {task_id}: {e}")
                    except OSError as e:
                        logger.error(f"OS error killing timed out agent {task_id}: {e}")
            # Combine completed and timed_out for removal (both are now lists of tuples)
            all_completed = completed + timed_out
            all_completed_ids = [task_id for task_id, _ in all_completed]
            # Remove completed tasks from active dict (single lock acquisition)
            removed_tasks = []
            if all_completed_ids:
                with self._lock:
                    for task_id in all_completed_ids:
                        task = self._active.pop(task_id, None)
                        if task:
                            removed_tasks.append(task)
            # Handle callbacks outside the lock
            for task in removed_tasks:
                duration = (task.completed_at - task.started_at).total_seconds() if task.completed_at and task.started_at else 0
                status = "TIMEOUT" if task.timed_out else f"rc={task.returncode}"
                logger.info(
                    f"Agent completed: {task.task_id} "
                    f"({status}, duration={duration:.1f}s)"
                )
                if task.returncode != 0:
                    stderr_preview = task.stderr[:500] if task.stderr else ""
                    logger.warning(f"Agent {task.task_id} stderr: {stderr_preview}")
                # Save to history before callback
                try:
                    self.history.save_run(task)
                except Exception as e:
                    logger.error(f"Failed to save history for {task.task_id}: {e}")
                if self.on_complete:
                    try:
                        self.on_complete(task)
                    except (KeyError, ValueError, TypeError) as e:
                        logger.error(f"on_complete callback failed with data error: {e}")
                    except (IOError, OSError) as e:
                        logger.error(f"on_complete callback failed with I/O error: {e}")
                    except RuntimeError as e:
                        logger.error(f"on_complete callback failed with runtime error: {e}")
            # Use event wait instead of sleep for faster shutdown response
            self._shutdown_event.wait(timeout=self.MONITOR_INTERVAL)

    def get_status(self) -> dict:
        """Get current pool status (capacity plus per-task summaries)."""
        with self._lock:
            return {
                "max_agents": self.max_agents,
                "active": len(self._active),
                "available": self.max_agents - len(self._active),
                "tasks": [
                    {
                        "task_id": t.task_id,
                        "issue_id": t.issue_id,
                        "repo": t.repo,
                        "platform": t.platform,
                        "task_type": t.task_type,
                        "started": t.started_at.isoformat() if t.started_at else None,
                    }
                    for t in self._active.values()
                ]
            }

    def get_task(self, task_id: str) -> Optional[AgentTask]:
        """Get a task by ID. Returns None if not found."""
        with self._lock:
            return self._active.get(task_id)

    def get_task_output(self, task_id: str, since_index: int = 0) -> Optional[dict]:
        """
        Get streaming output for a task.
        Args:
            task_id: The task ID
            since_index: Only return messages after this index (for polling)
        Returns:
            Dict with messages and metadata, or None if task not found
        """
        with self._lock:
            task = self._active.get(task_id)
            if not task:
                return None
            return {
                "task_id": task_id,
                "issue_id": task.issue_id,
                "repo": task.repo,
                "task_type": task.task_type,
                "started": task.started_at.isoformat() if task.started_at else None,
                "message_count": task.message_count(),
                "messages": task.get_messages(since_index),
            }

    def kill_task(self, task_id: str, reason: str = "manual") -> bool:
        """
        Kill a running task by ID.
        Thread-safe: acquires lock and terminates the subprocess.
        Args:
            task_id: The task ID to kill
            reason: Reason for killing (for logging)
        Returns:
            True if task was found and killed, False otherwise
        """
        # NOTE(review): tasks removed here are never seen again by the monitor
        # loop, so they get no history.save_run() and no on_complete callback —
        # confirm that is intended for manual kills.
        with self._lock:
            task = self._active.get(task_id)
            if not task:
                return False
            if not task.process:
                return False
            logger.warning(f"Killing agent {task_id} (reason: {reason})")
            try:
                # Try graceful termination first
                task.process.terminate()
                try:
                    task.process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Force kill if it doesn't terminate
                    task.process.kill()
                    task.process.wait()
                # Mark as killed
                task.returncode = -1
                task.timed_out = False
                task.stderr = f"Agent killed by dashboard (reason: {reason})"
                task.completed_at = datetime.now()
                # Remove from active
                del self._active[task_id]
                return True
            except ProcessLookupError:
                logger.warning(f"Agent {task_id} process already terminated")
                del self._active[task_id]
                return True
            except (PermissionError, OSError) as e:
                logger.error(f"Failed to kill agent {task_id}: {e}")
                return False
def build_prompt(
    issue_number: int,
    issue_id: str,
    repo: str,
    platform: str,
    issue_body: str,
    comments: list[dict],
    template_path: Optional[Path] = None,
    task_type: str = "remediation",
) -> str:
    """
    Build the prompt for a Claude Code agent.
    Args:
        issue_number: Numeric issue number
        issue_id: Full issue ID (e.g., "CG-123")
        repo: Repository name
        platform: Platform identifier
        issue_body: Issue description
        comments: List of comment dicts with 'body', 'author', 'createdAt'
        template_path: Required template file with {issue_number}, {issue_id},
            {repo}, {platform}, {issue_body}, {instructions} placeholders
        task_type: "remediation", "verification", or "librarian"
    Returns:
        The filled-in prompt string.
    Raises:
        ValueError: If template_path is not provided.
        FileNotFoundError: If template_path does not exist.
    """
    # Build comments section; skip comments with an empty/missing body.
    instructions_parts = []
    for i, comment in enumerate(comments, 1):
        body = (comment.get("body") or "").strip()
        if not body:
            continue
        # "author"/"createdAt" may be present but None in API payloads;
        # the `or` guards cover both missing keys and explicit nulls.
        author = (comment.get("author") or {}).get("login", "unknown")
        created = (comment.get("createdAt") or "")[:10]  # keep YYYY-MM-DD only
        instructions_parts.append(f"### Comment {i} ({author}, {created})\n{body}")
    instructions_text = "\n\n".join(instructions_parts) if instructions_parts else "No comments."
    # Load template (required - no fallback)
    if not template_path:
        raise ValueError(f"No template path provided for task_type '{task_type}'")
    if not template_path.exists():
        raise FileNotFoundError(f"Prompt template not found: {template_path}")
    template = template_path.read_text()
    # Fill placeholders
    prompt = template.format(
        issue_number=issue_number,
        issue_id=issue_id,
        repo=repo,
        platform=platform,
        issue_body=issue_body,
        instructions=instructions_text,
    )
    return prompt