Orchestrator: - Add orchestrator chat interface with streaming responses - MCP server integration for YouTrack queries - Quick actions for backlog review, triage analysis - Dynamic suggestions based on conversation context - Action approval/rejection workflow Dashboard improvements: - Add font preloading to prevent FOUC - CSS spinner for loading state (no icon font dependency) - Wait for fonts before showing UI - Fix workflow pipeline alignment - Fix user message contrast (dark blue background) - Auto-scroll chat, actions, suggestions panels - Add keyboard shortcuts system - Add toast notifications - Add theme toggle (dark/light mode) - New pages: orchestrator, repos, system, analytics Workflow fixes: - Skip Build state when agent determines no changes needed - Check branch exists before attempting push - Include comments in get_issues MCP response - Simplified orchestrator prompt focused on Backlog management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
747 lines
27 KiB
Python
747 lines
27 KiB
Python
"""
|
|
Claude Code agent subprocess management.
|
|
|
|
Adapted for YouTrack + Gitea stack.
|
|
"""
|
|
|
|
import subprocess
|
|
import threading
|
|
import logging
|
|
import time
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Callable, Optional
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class StreamMessage:
    """One JSON message emitted on Claude Code's streaming stdout."""

    timestamp: datetime         # wall-clock time the line was read
    type: str                   # "system", "user", "assistant", "result"
    content: dict               # raw decoded JSON object from the stream

    def to_dict(self) -> dict:
        """Serialize for JSON transport; timestamp becomes an ISO-8601 string."""
        return dict(
            timestamp=self.timestamp.isoformat(),
            type=self.type,
            content=self.content,
        )
|
|
|
|
|
|
@dataclass
class AgentTask:
    """A unit of work executed by a Claude Code agent subprocess."""

    task_id: str
    issue_number: int
    issue_id: str  # YouTrack issue ID (e.g., "CG-123")
    repo: str  # Repository name
    platform: str  # Platform identifier (e.g., "controller", "probe")
    work_dir: str
    prompt: str
    task_type: str = "remediation"  # "remediation", "verification", or "librarian"

    # Runtime state
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    process: Optional[subprocess.Popen] = None
    returncode: Optional[int] = None
    stdout: str = ""
    stderr: str = ""
    timed_out: bool = False  # True if killed due to timeout

    # Streaming output
    messages: list = field(default_factory=list)  # List of StreamMessage
    _messages_lock: threading.Lock = field(default_factory=threading.Lock)
    _output_thread: Optional[threading.Thread] = None

    def add_message(self, msg: "StreamMessage"):
        """Append one streamed message; guarded by the messages lock."""
        with self._messages_lock:
            self.messages.append(msg)

    def get_messages(self, since_index: int = 0) -> list[dict]:
        """Return serialized messages from ``since_index`` onward (polling API)."""
        with self._messages_lock:
            tail = self.messages[since_index:]
            return [item.to_dict() for item in tail]

    def message_count(self) -> int:
        """Return how many messages have been captured so far."""
        with self._messages_lock:
            return len(self.messages)

    def to_history_dict(self) -> dict:
        """Serialize the task (including its full message stream) for history."""
        started = self.started_at.isoformat() if self.started_at else None
        completed = self.completed_at.isoformat() if self.completed_at else None
        return {
            "task_id": self.task_id,
            "issue_id": self.issue_id,
            "issue_number": self.issue_number,
            "repo": self.repo,
            "platform": self.platform,
            "task_type": self.task_type,
            "started_at": started,
            "completed_at": completed,
            "returncode": self.returncode,
            "timed_out": self.timed_out,
            "messages": self.get_messages(),
        }
|
|
|
|
|
|
class AgentHistory:
    """Persistent, disk-backed history of completed agent runs.

    Storage layout (under ``history_dir``):
      - ``<safe_task_id>.json``: full run data, including the message stream.
      - ``index.json``: summaries of the most recent runs (newest first,
        capped at 100) used for fast listing and filtering.

    All access to the in-memory index is serialized with ``_index_lock``;
    individual run files are written outside the lock.
    """

    def __init__(self, history_dir: str = "/opt/agent_runner/history"):
        """Create/open the history directory and load (or create) the index."""
        self.history_dir = Path(history_dir)
        self.history_dir.mkdir(parents=True, exist_ok=True)
        self._index_file = self.history_dir / "index.json"
        self._index_lock = threading.Lock()
        self._load_index()

    def _load_index(self):
        """Load the history index, falling back to an empty one on any error."""
        self._index = {"runs": []}
        if self._index_file.exists():
            try:
                with open(self._index_file, "r") as f:
                    loaded = json.load(f)
                # Guard against a parseable-but-wrong-shape file (e.g. a list,
                # or a dict missing "runs") so later code can assume the shape.
                if isinstance(loaded, dict) and isinstance(loaded.get("runs"), list):
                    self._index = loaded
            except (json.JSONDecodeError, IOError):
                self._index = {"runs": []}

    def _save_index(self):
        """Persist the index to disk. I/O errors are logged, not raised."""
        try:
            with open(self._index_file, "w") as f:
                json.dump(self._index, f, indent=2)
        except IOError as e:
            logger.error(f"Failed to save history index: {e}")

    def _safe_filename(self, task_id: str) -> str:
        """Convert task_id to a filename that cannot escape history_dir.

        Historically only ":" was replaced with "_"; path separators and
        parent references are now neutralized too, so a crafted task_id
        (e.g. "../../etc/passwd") cannot address files outside the history
        directory. IDs made of letters, digits, "_", "-", and ":" map to
        the same filename as before.
        """
        return (
            task_id.replace(":", "_")
            .replace("/", "_")
            .replace("\\", "_")
            .replace("..", "_")
        )

    def save_run(self, task: "AgentTask"):
        """Save a completed run: full data to its own file, summary to index."""
        run_data = task.to_history_dict()
        safe_id = self._safe_filename(task.task_id)

        # Save full run data to individual file; if that fails, skip the
        # index update so the index never points at a missing file.
        run_file = self.history_dir / f"{safe_id}.json"
        try:
            with open(run_file, "w") as f:
                json.dump(run_data, f, indent=2)
        except IOError as e:
            logger.error(f"Failed to save run {task.task_id}: {e}")
            return

        # Update index with summary
        with self._index_lock:
            summary = {
                "task_id": task.task_id,
                "issue_id": task.issue_id,
                "repo": task.repo,
                "task_type": task.task_type,
                "started_at": run_data["started_at"],
                "completed_at": run_data["completed_at"],
                "returncode": task.returncode,
                "timed_out": task.timed_out,
                "message_count": len(run_data["messages"]),
            }

            # Add to front of list (most recent first)
            self._index["runs"].insert(0, summary)

            # Keep only last 100 runs in index (full run files are untouched)
            if len(self._index["runs"]) > 100:
                self._index["runs"] = self._index["runs"][:100]

            self._save_index()

        logger.info(f"Saved history for run {task.task_id}")

    def get_runs(self, limit: int = 50, offset: int = 0) -> list[dict]:
        """Return summaries of recent runs, newest first (paginated slice)."""
        with self._index_lock:
            return self._index["runs"][offset:offset + limit]

    def get_run(self, task_id: str) -> Optional[dict]:
        """Return full run data by task ID, or None if missing/unreadable."""
        safe_id = self._safe_filename(task_id)
        run_file = self.history_dir / f"{safe_id}.json"
        if not run_file.exists():
            return None

        try:
            with open(run_file, "r") as f:
                return json.load(f)
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"Failed to load run {task_id}: {e}")
            return None

    def delete_run(self, task_id: str) -> bool:
        """Remove a run from the index and delete its file.

        Returns True unless deleting an existing file fails (an absent file
        still counts as success, since the index entry is removed).
        """
        safe_id = self._safe_filename(task_id)
        run_file = self.history_dir / f"{safe_id}.json"

        # Remove from index
        with self._index_lock:
            self._index["runs"] = [r for r in self._index["runs"] if r["task_id"] != task_id]
            self._save_index()

        # Delete file
        try:
            if run_file.exists():
                run_file.unlink()
            return True
        except IOError as e:
            logger.error(f"Failed to delete run {task_id}: {e}")
            return False

    def get_runs_filtered(
        self,
        limit: int = 50,
        offset: int = 0,
        task_type: Optional[str] = None,
        status: Optional[str] = None,
        since: Optional[str] = None,
        until: Optional[str] = None,
        repo: Optional[str] = None,
        search: Optional[str] = None,
    ) -> tuple[list[dict], int]:
        """
        Get filtered history runs.

        Args:
            limit: Maximum number of runs to return
            offset: Number of runs to skip
            task_type: Filter by task type (remediation, verification, librarian)
            status: Filter by status (success, failed, timeout)
            since: Filter runs started after this ISO date/datetime
            until: Filter runs started before this ISO date/datetime
            repo: Filter by repository name (partial match)
            search: Search in issue_id (case-insensitive)

        Returns:
            Tuple of (filtered runs list, total count before pagination)
        """
        with self._index_lock:
            filtered = self._index["runs"]

            # Filter by task type
            if task_type:
                filtered = [r for r in filtered if r.get("task_type") == task_type]

            # Filter by status: "success"/"failed" both exclude timeouts,
            # which are reported only under status="timeout".
            if status:
                if status == "success":
                    filtered = [r for r in filtered if r.get("returncode") == 0 and not r.get("timed_out")]
                elif status == "failed":
                    filtered = [r for r in filtered if r.get("returncode") != 0 and not r.get("timed_out")]
                elif status == "timeout":
                    filtered = [r for r in filtered if r.get("timed_out")]

            # Filter by date range (ISO-8601 strings compare lexicographically)
            if since:
                filtered = [r for r in filtered if r.get("started_at", "") >= since]
            if until:
                # Add time component if only date provided, so the whole day
                # is included.
                until_cmp = until if "T" in until else until + "T23:59:59"
                filtered = [r for r in filtered if r.get("started_at", "") <= until_cmp]

            # Filter by repo (partial match, case-insensitive)
            if repo:
                filtered = [r for r in filtered if repo.lower() in r.get("repo", "").lower()]

            # Search in issue_id (case-insensitive substring)
            if search:
                search_upper = search.upper()
                filtered = [r for r in filtered if search_upper in r.get("issue_id", "").upper()]

            total = len(filtered)
            return filtered[offset:offset + limit], total

    def delete_runs(self, task_ids: list[str]) -> int:
        """
        Bulk delete runs.

        Args:
            task_ids: List of task IDs to delete

        Returns:
            Number of runs successfully deleted
        """
        deleted = 0
        for task_id in task_ids:
            if self.delete_run(task_id):
                deleted += 1
        return deleted

    def clear_all(self) -> int:
        """
        Delete all history runs (files and index entries).

        Returns:
            Number of runs deleted
        """
        with self._index_lock:
            count = len(self._index["runs"])

            # Delete all individual run files; per-file failures are logged
            # and skipped so one bad file doesn't abort the sweep.
            for run in self._index["runs"]:
                safe_id = self._safe_filename(run["task_id"])
                run_file = self.history_dir / f"{safe_id}.json"
                try:
                    if run_file.exists():
                        run_file.unlink()
                except IOError as e:
                    logger.error(f"Failed to delete run file {run['task_id']}: {e}")

            # Clear the index
            self._index["runs"] = []
            self._save_index()

        logger.info(f"Cleared all history ({count} runs)")
        return count
|
|
|
|
|
|
class AgentPool:
    """Manages a pool of Claude Code agent processes.

    Responsibilities:
      - Atomically admit up to ``max_agents`` concurrent tasks (``submit``).
      - Spawn one ``claude`` subprocess per task with streaming JSON output
        (``_start_agent``), plus a per-task reader thread (``_read_output``).
      - Run a single monitor thread (``_monitor_loop``) that detects
        completion and timeouts, persists runs via ``AgentHistory``, and
        fires the ``on_complete`` callback.

    Locking: ``_lock`` guards only the ``_active`` dict; subprocess
    operations and callbacks are deliberately performed outside the lock.
    """

    # Timeout for thread join during shutdown
    SHUTDOWN_TIMEOUT = 10  # seconds
    MONITOR_INTERVAL = 1  # seconds between monitor checks

    def __init__(
        self,
        max_agents: int = 3,
        claude_command: str = "claude",
        claude_flags: list[str] = None,
        on_complete: Callable[[AgentTask], None] = None,
        timeout_seconds: int = 1800,  # 30 minutes default
        history_dir: str = "/opt/agent_runner/history",
    ):
        # claude_flags=None is normalized to a fresh list below, so the
        # default is never shared or mutated.
        self.max_agents = max_agents
        self.claude_command = claude_command
        self.claude_flags = claude_flags or []
        self.on_complete = on_complete
        self.timeout_seconds = timeout_seconds

        # task_id -> AgentTask for currently running agents; guarded by _lock.
        self._active: dict[str, AgentTask] = {}
        self._lock = threading.Lock()
        self._monitor_thread: Optional[threading.Thread] = None
        self._shutdown_event = threading.Event()

        # History persistence
        self.history = AgentHistory(history_dir)

    def start(self):
        """Start the agent pool monitor thread."""
        self._shutdown_event.clear()
        # Non-daemon: interpreter shutdown waits for the monitor, so
        # completion bookkeeping in _monitor_loop is not cut short.
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            name="AgentPoolMonitor",
            daemon=False
        )
        self._monitor_thread.start()
        logger.info(f"Agent pool started (max_agents={self.max_agents})")

    def stop(self):
        """Stop the agent pool gracefully.

        Signals the monitor thread and joins it for up to SHUTDOWN_TIMEOUT.
        NOTE(review): running agent subprocesses are NOT terminated here —
        confirm that leaving them running on shutdown is intended.
        """
        logger.debug("Initiating agent pool shutdown...")
        self._shutdown_event.set()

        if self._monitor_thread:
            self._monitor_thread.join(timeout=self.SHUTDOWN_TIMEOUT)
            if self._monitor_thread.is_alive():
                logger.warning(f"Agent pool monitor thread did not terminate within {self.SHUTDOWN_TIMEOUT}s")
            else:
                logger.debug("Agent pool monitor thread terminated cleanly")
            self._monitor_thread = None

        logger.info("Agent pool stopped")

    @property
    def active_count(self) -> int:
        """Number of currently running agent tasks."""
        with self._lock:
            return len(self._active)

    @property
    def has_capacity(self) -> bool:
        """True if another task could be admitted right now (advisory only —
        may race with a concurrent submit; rely on submit()'s return value)."""
        return self.active_count < self.max_agents

    def is_task_running(self, task_id: str) -> bool:
        """True if the given task is currently active in the pool."""
        with self._lock:
            return task_id in self._active

    def submit(self, task: AgentTask) -> bool:
        """
        Submit a task to the pool atomically.

        This method is thread-safe and performs an atomic check-and-add operation.
        Callers should rely on the return value rather than calling is_task_running()
        separately, to avoid race conditions.

        Returns:
            True if submitted successfully
            False if pool is at capacity or task is already running
        """
        with self._lock:
            if len(self._active) >= self.max_agents:
                logger.warning(f"Pool at capacity, rejecting task {task.task_id}")
                return False

            if task.task_id in self._active:
                logger.warning(f"Task {task.task_id} already running")
                return False

            # Start the Claude Code process while still holding the lock so
            # the capacity check and the insert are one atomic step.
            try:
                self._start_agent(task)
                self._active[task.task_id] = task
                logger.info(f"Started agent for task {task.task_id} (issue {task.issue_id})")
                return True
            except FileNotFoundError as e:
                logger.error(f"Claude command not found for {task.task_id}: {e}")
                return False
            except PermissionError as e:
                logger.error(f"Permission denied starting agent for {task.task_id}: {e}")
                return False
            except OSError as e:
                logger.error(f"OS error starting agent for {task.task_id}: {e}")
                return False

    def _start_agent(self, task: AgentTask):
        """Start a Claude Code subprocess for the task with streaming output."""
        cmd = [
            self.claude_command,
            "-p", task.prompt,  # Print mode with prompt
            "--output-format", "stream-json",  # Stream JSON for real-time output
            "--verbose",  # Required for stream-json with -p
        ] + self.claude_flags

        # Prompt text is elided from the log line on purpose.
        logger.debug(f"Running: {cmd[0]} -p '...' --output-format stream-json --verbose in {task.work_dir}")

        task.started_at = datetime.now()
        task.process = subprocess.Popen(
            cmd,
            cwd=task.work_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,  # Line buffered for streaming
        )

        # Start output reader thread (daemon: must not block interpreter exit)
        task._output_thread = threading.Thread(
            target=self._read_output,
            args=(task,),
            daemon=True,
            name=f"output-{task.task_id}"
        )
        task._output_thread.start()

    def _read_output(self, task: AgentTask):
        """Read streaming JSON output from Claude Code process.

        Runs in a per-task thread; parses each stdout line as JSON into a
        StreamMessage, then drains stderr once stdout is exhausted.
        """
        try:
            for line in task.process.stdout:
                line = line.strip()
                if not line:
                    continue

                try:
                    data = json.loads(line)
                    msg_type = data.get("type", "unknown")

                    # Create StreamMessage from the JSON
                    msg = StreamMessage(
                        timestamp=datetime.now(),
                        type=msg_type,
                        content=data,
                    )
                    task.add_message(msg)

                    # Also accumulate raw output for backward compatibility
                    task.stdout += line + "\n"

                except json.JSONDecodeError:
                    # Non-JSON output (shouldn't happen with stream-json)
                    logger.warning(f"Non-JSON output from {task.task_id}: {line[:100]}")
                    task.stdout += line + "\n"

        except Exception as e:
            logger.error(f"Error reading output from {task.task_id}: {e}")

        # Read any remaining stderr (blocking read is fine: stdout is closed
        # by this point, so the process is finishing or finished)
        try:
            if task.process and task.process.stderr:
                task.stderr = task.process.stderr.read()
        except Exception as e:
            logger.error(f"Error reading stderr from {task.task_id}: {e}")

    def _monitor_loop(self):
        """Monitor active agents for completion and timeouts.

        Runs until stop() sets the shutdown event, waking every
        MONITOR_INTERVAL seconds. Each pass:
          1. snapshot active tasks under the lock,
          2. classify finished vs. timed-out tasks outside the lock,
          3. finalize return codes / kill timed-out processes,
          4. remove finished tasks from _active (one lock acquisition),
          5. save history and fire on_complete outside the lock.
        """
        while not self._shutdown_event.is_set():
            completed = []
            timed_out = []

            # Get current time outside the lock
            now = datetime.now()

            # Take a snapshot of active tasks to minimize lock duration
            with self._lock:
                active_snapshot = list(self._active.items())

            # Check status outside the lock (process.poll() is thread-safe)
            for task_id, task in active_snapshot:
                # Check for completion
                if task.process and task.process.poll() is not None:
                    # Process finished - communicate() should be called outside lock
                    completed.append((task_id, task))

                # Check for timeout (timeout_seconds <= 0 disables the check)
                elif task.started_at and self.timeout_seconds > 0:
                    elapsed = (now - task.started_at).total_seconds()
                    if elapsed > self.timeout_seconds:
                        timed_out.append((task_id, task))

            # Process completed tasks - output is already read by _read_output thread
            for task_id, task in completed:
                if task.process:
                    # Wait for output thread to finish reading
                    if task._output_thread and task._output_thread.is_alive():
                        task._output_thread.join(timeout=2)
                    task.returncode = task.process.returncode
                    task.completed_at = now

            # Handle timeouts (kill processes) - process operations outside lock
            for task_id, task in timed_out:
                if task.process:
                    logger.warning(
                        f"Agent {task_id} timed out after {self.timeout_seconds}s - killing"
                    )
                    try:
                        task.process.terminate()
                        # Give it 5 seconds to terminate gracefully
                        try:
                            task.process.wait(timeout=5)
                        except subprocess.TimeoutExpired:
                            task.process.kill()
                            task.process.wait()

                        # -1 is a sentinel return code for killed tasks
                        task.returncode = -1
                        task.timed_out = True
                        task.stderr = f"Agent timed out after {self.timeout_seconds} seconds"
                        task.completed_at = datetime.now()
                    except ProcessLookupError:
                        logger.warning(f"Agent {task_id} process already terminated")
                    except PermissionError as e:
                        logger.error(f"Permission denied killing agent {task_id}: {e}")
                    except OSError as e:
                        logger.error(f"OS error killing timed out agent {task_id}: {e}")

            # Combine completed and timed_out for removal (both are now lists of tuples)
            all_completed = completed + timed_out
            all_completed_ids = [task_id for task_id, _ in all_completed]

            # Remove completed tasks from active dict (single lock acquisition)
            removed_tasks = []
            if all_completed_ids:
                with self._lock:
                    for task_id in all_completed_ids:
                        # pop with default: kill_task() may have removed it already
                        task = self._active.pop(task_id, None)
                        if task:
                            removed_tasks.append(task)

            # Handle callbacks outside the lock
            for task in removed_tasks:
                duration = (task.completed_at - task.started_at).total_seconds() if task.completed_at and task.started_at else 0
                status = "TIMEOUT" if task.timed_out else f"rc={task.returncode}"
                logger.info(
                    f"Agent completed: {task.task_id} "
                    f"({status}, duration={duration:.1f}s)"
                )

                if task.returncode != 0:
                    stderr_preview = task.stderr[:500] if task.stderr else ""
                    logger.warning(f"Agent {task.task_id} stderr: {stderr_preview}")

                # Save to history before callback
                try:
                    self.history.save_run(task)
                except Exception as e:
                    logger.error(f"Failed to save history for {task.task_id}: {e}")

                # Callback failures are logged per category but never kill
                # the monitor loop.
                if self.on_complete:
                    try:
                        self.on_complete(task)
                    except (KeyError, ValueError, TypeError) as e:
                        logger.error(f"on_complete callback failed with data error: {e}")
                    except (IOError, OSError) as e:
                        logger.error(f"on_complete callback failed with I/O error: {e}")
                    except RuntimeError as e:
                        logger.error(f"on_complete callback failed with runtime error: {e}")

            # Use event wait instead of sleep for faster shutdown response
            self._shutdown_event.wait(timeout=self.MONITOR_INTERVAL)

    def get_status(self) -> dict:
        """Get current pool status: capacity counters plus per-task summaries."""
        with self._lock:
            return {
                "max_agents": self.max_agents,
                "active": len(self._active),
                "available": self.max_agents - len(self._active),
                "tasks": [
                    {
                        "task_id": t.task_id,
                        "issue_id": t.issue_id,
                        "repo": t.repo,
                        "platform": t.platform,
                        "task_type": t.task_type,
                        "started": t.started_at.isoformat() if t.started_at else None,
                    }
                    for t in self._active.values()
                ]
            }

    def get_task(self, task_id: str) -> Optional[AgentTask]:
        """Get a task by ID. Returns None if not found."""
        with self._lock:
            return self._active.get(task_id)

    def get_task_output(self, task_id: str, since_index: int = 0) -> Optional[dict]:
        """
        Get streaming output for a task.

        Args:
            task_id: The task ID
            since_index: Only return messages after this index (for polling)

        Returns:
            Dict with messages and metadata, or None if task not found
        """
        with self._lock:
            task = self._active.get(task_id)
            if not task:
                return None

            return {
                "task_id": task_id,
                "issue_id": task.issue_id,
                "repo": task.repo,
                "task_type": task.task_type,
                "started": task.started_at.isoformat() if task.started_at else None,
                "message_count": task.message_count(),
                "messages": task.get_messages(since_index),
            }

    def kill_task(self, task_id: str, reason: str = "manual") -> bool:
        """
        Kill a running task by ID.

        Thread-safe: acquires lock and terminates the subprocess.

        NOTE(review): killed tasks are removed from _active here, so they
        never reach _monitor_loop's history-save / on_complete path —
        confirm that skipping those for manual kills is intended.

        Args:
            task_id: The task ID to kill
            reason: Reason for killing (for logging)

        Returns:
            True if task was found and killed, False otherwise
        """
        with self._lock:
            task = self._active.get(task_id)
            if not task:
                return False

            if not task.process:
                return False

            logger.warning(f"Killing agent {task_id} (reason: {reason})")

            try:
                # Try graceful termination first
                task.process.terminate()
                try:
                    task.process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Force kill if it doesn't terminate
                    task.process.kill()
                    task.process.wait()

                # Mark as killed (timed_out stays False: this is a manual kill)
                task.returncode = -1
                task.timed_out = False
                task.stderr = f"Agent killed by dashboard (reason: {reason})"
                task.completed_at = datetime.now()

                # Remove from active
                del self._active[task_id]

                return True

            except ProcessLookupError:
                logger.warning(f"Agent {task_id} process already terminated")
                del self._active[task_id]
                return True
            except (PermissionError, OSError) as e:
                logger.error(f"Failed to kill agent {task_id}: {e}")
                return False
|
|
|
|
|
|
def build_prompt(
    issue_number: int,
    issue_id: str,
    repo: str,
    platform: str,
    issue_body: str,
    comments: list[dict],
    template_path: Optional[Path] = None,
    task_type: str = "remediation",
) -> str:
    """
    Build the prompt for a Claude Code agent from a template file.

    Args:
        issue_number: Numeric issue number
        issue_id: Full issue ID (e.g., "CG-123")
        repo: Repository name
        platform: Platform identifier
        issue_body: Issue description
        comments: List of comment dicts with 'body', 'author', 'createdAt'
        template_path: Template file with {issue_number}, {issue_id}, {repo},
            {platform}, {issue_body}, {instructions} placeholders (required)
        task_type: "remediation", "verification", or "librarian"

    Returns:
        The filled-in prompt text.

    Raises:
        ValueError: If no template_path is provided.
        FileNotFoundError: If the template file does not exist.
    """

    # Build comments section, skipping comments with empty/whitespace bodies.
    instructions_parts = []
    for i, comment in enumerate(comments, 1):
        body = (comment.get("body") or "").strip()
        if not body:
            continue
        # "author" can be present but None in the API payload; the previous
        # comment.get("author", {}) only defaulted for a *missing* key and
        # raised AttributeError on an explicit null.
        author = (comment.get("author") or {}).get("login", "unknown")
        # Keep only the date part (YYYY-MM-DD) of the ISO timestamp.
        created = (comment.get("createdAt") or "")[:10]
        instructions_parts.append(f"### Comment {i} ({author}, {created})\n{body}")

    instructions_text = "\n\n".join(instructions_parts) if instructions_parts else "No comments."

    # Load template (required - no fallback): fail loudly rather than
    # silently emitting a generic prompt.
    if not template_path:
        raise ValueError(f"No template path provided for task_type '{task_type}'")
    if not template_path.exists():
        raise FileNotFoundError(f"Prompt template not found: {template_path}")
    template = template_path.read_text()

    # Fill placeholders
    prompt = template.format(
        issue_number=issue_number,
        issue_id=issue_id,
        repo=repo,
        platform=platform,
        issue_body=issue_body,
        instructions=instructions_text,
    )

    return prompt
|