#!/usr/bin/env python3
"""
ClearGrow Agent Runner

Monitors YouTrack for issues and spawns Claude Code agents.
Supports both polling and webhook modes.

Stack:
- YouTrack: Issue tracking and Kanban boards
- Gitea: Git repository hosting
- Woodpecker: CI/CD (triggered via webhooks)
"""
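
# A minimal config.yaml sketch (illustrative only -- the required fields come from
# Runner._validate_config below; hostnames, tokens, and paths are placeholders):
#
#   project:
#     name: CG
#     states: {ready: Ready, build: Build, verify: Verify, document: Document, review: Review, triage: Triage}
#   youtrack:
#     base_url: https://youtrack.example.com
#     token: <api-token>
#   repos:
#     controller:
#       path: /opt/repos/controller
#   poll_interval_seconds: 300
#   max_parallel_agents: 3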

import argparse
import logging
import re
import signal
import sys
import threading
import time

import yaml
import requests

from pathlib import Path
from datetime import datetime
from typing import Optional
from queue import Queue, Empty

from youtrack_client import YouTrackClient, YouTrackIssue, load_youtrack_config
from gitea_client import GiteaClient, load_gitea_config, git_push, git_merge_to_main, git_branch_exists, git_current_branch
from agent import AgentPool, AgentTask, build_prompt
from webhook_server import WebhookServer, WebhookEvent, load_webhook_config
from woodpecker_client import WoodpeckerClient, BuildInfo
from internal_api import InternalAPIServer

# Constants
DEFAULT_WEBHOOK_QUEUE_SIZE = 1000  # Maximum queued webhook events
DEFAULT_POLL_INTERVAL = 300  # 5 minutes
DEFAULT_AGENT_TIMEOUT = 1800  # 30 minutes
DEFAULT_MAX_PARALLEL_AGENTS = 3
DEFAULT_HEALTH_CHECK_INTERVAL = 300  # 5 minutes
MAX_BACKOFF_INTERVAL = 1800  # 30 minutes max backoff

# Map agent task types to YouTrack work item types
AGENT_TO_WORK_TYPE = {
    "remediation": "Development",
    "verification": "Testing",
    "librarian": "Documentation",
}

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("runner")

class SystemHealth:
    """
    Monitors health of platform services.

    Checks YouTrack, Gitea, and Woodpecker availability before
    processing tasks to avoid partial failures.
    """

    def __init__(self, config: dict):
        self.config = config
        self.last_check = None
        self.check_interval = config.get("health_check_interval", DEFAULT_HEALTH_CHECK_INTERVAL)
        self._status = {
            "youtrack": False,
            "gitea": False,
            "woodpecker": False,
        }

    def _check_endpoint(self, name: str, url: str, timeout: int = 10) -> bool:
        """Check if an HTTP endpoint is healthy."""
        try:
            response = requests.get(url, timeout=timeout)
            healthy = response.status_code < 500
            if not healthy:
                logger.warning(f"{name} unhealthy: HTTP {response.status_code}")
            return healthy
        except requests.exceptions.Timeout:
            logger.warning(f"{name} health check timed out: {url}")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"{name} connection failed: {e}")
            return False
        except Exception as e:
            logger.warning(f"{name} health check error: {e}")
            return False

    def check_all(self, force: bool = False) -> dict:
        """
        Check health of all services.

        Args:
            force: If True, ignore check interval and always check

        Returns:
            Dict with service names and boolean health status
        """
        now = time.time()

        # Skip if checked recently (unless forced)
        if not force and self.last_check:
            if now - self.last_check < self.check_interval:
                return self._status

        logger.debug("Running system health checks...")

        # YouTrack
        yt_config = self.config.get("youtrack", {})
        if yt_config.get("base_url"):
            yt_url = f"{yt_config['base_url'].rstrip('/')}/api/config"
            self._status["youtrack"] = self._check_endpoint("YouTrack", yt_url)

        # Gitea
        gitea_config = self.config.get("gitea", {})
        if gitea_config.get("base_url"):
            gitea_url = f"{gitea_config['base_url'].rstrip('/')}/api/v1/version"
            self._status["gitea"] = self._check_endpoint("Gitea", gitea_url)

        # Woodpecker
        wp_config = self.config.get("woodpecker", {})
        if wp_config.get("base_url"):
            wp_url = f"{wp_config['base_url'].rstrip('/')}/api/version"
            self._status["woodpecker"] = self._check_endpoint("Woodpecker", wp_url)

        self.last_check = now

        # Log summary
        healthy = [k for k, v in self._status.items() if v]
        unhealthy = [k for k, v in self._status.items() if not v and self.config.get(k, {}).get("base_url")]

        if unhealthy:
            logger.warning(f"System health: {len(healthy)} healthy, {len(unhealthy)} unhealthy: {unhealthy}")
        else:
            logger.debug(f"System health: all {len(healthy)} services healthy")

        return self._status

    @property
    def is_healthy(self) -> bool:
        """Check if critical services (YouTrack) are healthy."""
        return self._status.get("youtrack", False)

    @property
    def can_process(self) -> bool:
        """Check if we can process tasks (YouTrack required; Gitea only if configured)."""
        if not self._status.get("youtrack", False):
            return False
        # Gitea is optional: only require a healthy Gitea when a base_url is configured
        if self.config.get("gitea", {}).get("base_url"):
            return self._status.get("gitea", False)
        return True

    def wait_for_healthy(self, timeout: int = 300, interval: int = 30):
        """
        Wait for services to become healthy.

        Args:
            timeout: Maximum seconds to wait
            interval: Seconds between checks

        Returns:
            True if services became healthy within the timeout, False otherwise.
        """
        start = time.time()
        while time.time() - start < timeout:
            self.check_all(force=True)
            if self.is_healthy:
                return True
            logger.info(f"Waiting for services to become healthy... ({int(time.time() - start)}s)")
            time.sleep(interval)
        return False
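

# Usage sketch (mirrors the `--health` path in main(); output values illustrative):
#   health = SystemHealth(config)
#   status = health.check_all(force=True)  # e.g. {"youtrack": True, "gitea": True, "woodpecker": False}
#   if health.can_process:
#       ...  # safe to run a poll cycle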


class ConfigurationError(Exception):
    """Raised when configuration is invalid or incomplete."""
    pass


def _detect_verification_result(stdout: str) -> tuple[bool, str]:
    """
    Parse agent stdout for explicit verification pass/fail markers.

    The QA agent may exit with code 0 but still report verification failure
    in its output. This function detects failure markers in stdout to provide
    a secondary check.

    Args:
        stdout: The agent's stdout output

    Returns:
        Tuple of (passed: bool, reason: str) where:
        - passed: True if verification passed, False if failure markers found
        - reason: Human-readable explanation of the detection result
    """
    if not stdout:
        return True, "no stdout to analyze"

    stdout_lower = stdout.lower()

    # Check for explicit failure markers (case-insensitive)
    failure_patterns = [
        (r"verification\s+failed", "found 'verification failed'"),
        (r"requirements?\s+not\s+met", "found 'requirements not met'"),
        (r"was\s+not\s+implemented", "found 'was not implemented'"),
        (r"were\s+not\s+implemented", "found 'were not implemented'"),
        (r"not\s+actually\s+implemented", "found 'not actually implemented'"),
        (r"none\s+of\s+the\s+.*\s+changes\s+were\s+(actually\s+)?implemented", "found 'none of the changes were implemented'"),
        (r"acceptance\s+criteria\s+not\s+met", "found 'acceptance criteria not met'"),
        (r"##\s*verification\s+failed", "found 'Verification Failed' header"),
    ]

    for pattern, reason in failure_patterns:
        if re.search(pattern, stdout_lower):
            return False, reason

    # Check for explicit pass markers
    pass_patterns = [
        (r"verification\s+passed", "found 'verification passed'"),
        (r"verification\s+succeeded", "found 'verification succeeded'"),
        (r"all\s+acceptance\s+criteria\s+met", "found 'all acceptance criteria met'"),
        (r"##\s*verification\s+passed", "found 'Verification Passed' header"),
    ]

    for pattern, reason in pass_patterns:
        if re.search(pattern, stdout_lower):
            return True, reason

    # No explicit markers found - assume pass (rely on exit code)
    return True, "no explicit pass/fail markers found"
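

# Illustrative behavior (hypothetical agent outputs):
#   _detect_verification_result("## Verification Failed\n...")  -> (False, "found 'verification failed'")
#   _detect_verification_result("Verification passed.")         -> (True, "found 'verification passed'")
#   _detect_verification_result("unrelated output")             -> (True, "no explicit pass/fail markers found")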


class Runner:
    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self._validate_config()
        self.youtrack: Optional[YouTrackClient] = None
        self.youtrack_build: Optional[YouTrackClient] = None  # For build agent comments
        self.youtrack_qa: Optional[YouTrackClient] = None  # For QA agent comments
        self.youtrack_developer: Optional[YouTrackClient] = None  # For developer agent
        self.youtrack_librarian: Optional[YouTrackClient] = None  # For librarian agent
        self.gitea: Optional[GiteaClient] = None
        self.woodpecker: Optional[WoodpeckerClient] = None
        self.agent_pool: Optional[AgentPool] = None
        self.health: Optional[SystemHealth] = None
        self.webhook_server: Optional[WebhookServer] = None
        self.internal_api: Optional[InternalAPIServer] = None
        self._webhook_queue: Queue = Queue(maxsize=DEFAULT_WEBHOOK_QUEUE_SIZE)  # Bounded queue for webhook events
        self._shutdown = False
        self._shutdown_event = threading.Event()  # For interruptible sleep
        self._wake_event = threading.Event()  # To wake up main loop early (e.g., for webhook processing)
        self._seen_items: set[str] = set()  # Track items we've already processed
        self._build_checks: dict[str, dict] = {}  # Track pending build checks
        self._consecutive_failures = 0
        self._max_consecutive_failures = 5
        # Cache build_types mapping at initialization to avoid repeated config access
        self._build_types: dict[str, str] = self.config.get("build_types", {
            "controller": "Controller_Build",
            "probe": "Probe_Build",
            "docs": "Docs_BuildPdfs",
        })

    def _load_config(self, path: str) -> dict:
        with open(path) as f:
            return yaml.safe_load(f)

    def _validate_config(self):
        """
        Validate configuration structure and required fields.

        Raises:
            ConfigurationError: If configuration is invalid or missing required fields.
        """
        errors = []

        # Validate project configuration (required)
        if "project" not in self.config:
            errors.append("Missing required section: 'project'")
        else:
            project = self.config["project"]
            if not isinstance(project, dict):
                errors.append("'project' must be a dictionary")
            elif "name" not in project:
                errors.append("Missing required field: 'project.name'")
            elif not project["name"]:
                errors.append("'project.name' cannot be empty")

        # Validate YouTrack configuration (required)
        if "youtrack" not in self.config:
            errors.append("Missing required section: 'youtrack'")
        else:
            youtrack = self.config["youtrack"]
            if not isinstance(youtrack, dict):
                errors.append("'youtrack' must be a dictionary")
            else:
                if not youtrack.get("base_url"):
                    errors.append("Missing required field: 'youtrack.base_url'")
                if not youtrack.get("token"):
                    errors.append("Missing required field: 'youtrack.token'")

        # Validate repos configuration (required)
        if "repos" not in self.config:
            errors.append("Missing required section: 'repos'")
        else:
            repos = self.config["repos"]
            if not isinstance(repos, dict):
                errors.append("'repos' must be a dictionary")
            elif len(repos) == 0:
                errors.append("'repos' cannot be empty - at least one repository required")
            else:
                for repo_name, repo_config in repos.items():
                    if not isinstance(repo_config, dict):
                        errors.append(f"'repos.{repo_name}' must be a dictionary")
                    elif not repo_config.get("path"):
                        errors.append(f"Missing required field: 'repos.{repo_name}.path'")

        # Validate optional sections have correct types
        if "gitea" in self.config and not isinstance(self.config["gitea"], dict):
            errors.append("'gitea' must be a dictionary")

        if "woodpecker" in self.config and not isinstance(self.config["woodpecker"], dict):
            errors.append("'woodpecker' must be a dictionary")

        if "webhook" in self.config and not isinstance(self.config["webhook"], dict):
            errors.append("'webhook' must be a dictionary")

        if "claude" in self.config and not isinstance(self.config["claude"], dict):
            errors.append("'claude' must be a dictionary")

        # Validate numeric fields have correct types and ranges
        if "poll_interval_seconds" in self.config:
            val = self.config["poll_interval_seconds"]
            if not isinstance(val, (int, float)) or val <= 0:
                errors.append("'poll_interval_seconds' must be a positive number")

        if "agent_timeout_seconds" in self.config:
            val = self.config["agent_timeout_seconds"]
            if not isinstance(val, (int, float)) or val <= 0:
                errors.append("'agent_timeout_seconds' must be a positive number")

        if "max_parallel_agents" in self.config:
            val = self.config["max_parallel_agents"]
            if not isinstance(val, int) or val <= 0:
                errors.append("'max_parallel_agents' must be a positive integer")

        if errors:
            error_msg = "Configuration validation failed:\n - " + "\n - ".join(errors)
            logger.error(error_msg)
            raise ConfigurationError(error_msg)

    def _setup_logging(self):
        log_file = self.config.get("log_file")
        if log_file:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
            ))
            logging.getLogger().addHandler(handler)

        level = self.config.get("log_level", "INFO")
        logging.getLogger().setLevel(getattr(logging, level))

    def _broadcast_event(self, event_type: str, data: dict):
        """Broadcast an event to dashboard clients."""
        try:
            from api_server import broadcaster
            broadcaster.broadcast(event_type, data)
        except ImportError:
            pass  # Dashboard API not available
        except Exception as e:
            logger.debug(f"Failed to broadcast event: {e}")

    def _get_youtrack_client_for_task(self, task_type: str) -> "YouTrackClient":
        """Get the appropriate YouTrack client for a task type."""
        if task_type == "verification" and self.youtrack_qa:
            return self.youtrack_qa
        elif task_type == "librarian" and self.youtrack_librarian:
            return self.youtrack_librarian
        elif task_type == "remediation" and self.youtrack_developer:
            return self.youtrack_developer
        # Fallback to main admin client
        return self.youtrack

    def _log_work_item(self, task: AgentTask, duration_seconds: float):
        """Log time spent on a task to YouTrack."""
        if duration_seconds <= 0:
            return

        duration_minutes = max(1, int(duration_seconds / 60))
        work_type = AGENT_TO_WORK_TYPE.get(task.task_type, "Development")

        # Determine result text
        if task.timed_out:
            result = "timed out"
        elif task.returncode == 0:
            result = "completed"
        elif task.returncode == 2:
            result = "skipped (work already done)"
        else:
            result = f"failed (exit code {task.returncode})"

        text = f"Agent {task.task_type}: {result}"

        # Use appropriate client for the task type
        yt_client = self._get_youtrack_client_for_task(task.task_type)
        yt_client.add_work_item(
            issue_id=task.issue_id,
            duration_minutes=duration_minutes,
            work_type=work_type,
            text=text,
        )
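
    # Workflow state machine driven by this callback (summary of the logic below):
    #   remediation  rc=0, changes pushed      -> Build
    #   remediation  rc=0, no changes          -> Verify (Build skipped)
    #   verification rc=0, stdout check passes -> Document (branch merged to main)
    #   librarian    rc=0                      -> Review
    #   any task     rc=2 ("nothing to do")    -> Review
    #   timeout, failure, push/merge failure   -> Triage (human review)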
    def _on_agent_complete(self, task: AgentTask):
        """Called when an agent finishes."""
        # Broadcast completion event to dashboard
        duration = 0
        if task.started_at and task.completed_at:
            duration = (task.completed_at - task.started_at).total_seconds()

        self._broadcast_event("agent.completed", {
            "task_id": task.task_id,
            "issue_id": task.issue_id,
            "task_type": task.task_type,
            "returncode": task.returncode,
            "timed_out": task.timed_out,
            "duration_seconds": round(duration, 1),
        })

        # Log time to YouTrack work items
        self._log_work_item(task, duration)

        states = self.config["project"].get("states", {})
        triage_state = states.get("triage", "Triage")
        build_state = states.get("build", "Build")
        verify_state = states.get("verify", "Verify")
        document_state = states.get("document", "Document")
        review_state = states.get("review", "Review")

        logger.info(f"Agent completed for issue {task.issue_id} (type: {task.task_type})")

        # Handle timeout
        if task.timed_out:
            logger.warning(f"Task {task.task_id} timed out → {triage_state}")

            timeout_mins = self.config.get("agent_timeout_seconds", 1800) // 60
            comment_body = (
                f"## Agent Timed Out\n\n"
                f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                f"**Task Type:** {task.task_type}\n"
                f"**Timeout:** {timeout_mins} minutes\n\n"
                f"The agent exceeded the maximum allowed time and was terminated.\n\n"
                f"**Possible causes:**\n"
                f"- Task too complex for single agent run\n"
                f"- Agent stuck in a loop\n"
                f"- Unclear or ambiguous requirements\n\n"
                f"Please review the issue, simplify if needed, and move back to Ready."
            )
            self.youtrack.add_issue_comment(task.issue_id, comment_body)
            self.youtrack.update_issue_state(task.issue_id, triage_state)

        elif task.returncode == 0:
            # Determine next state based on task type
            if task.task_type == "remediation":
                target_state = build_state  # Developer done → Build (CI verification)
            elif task.task_type == "verification":
                target_state = document_state  # Verification done → Document
            elif task.task_type == "librarian":
                target_state = review_state  # Librarian done → Review
            else:
                target_state = build_state  # Default

            # For remediation tasks, check if there are actual changes to push
            if task.task_type == "remediation" and self.config.get("auto_push", True):
                branch_name = f"issue/{task.issue_id}"

                # Check if the feature branch exists and we're on it
                branch_exists = git_branch_exists(task.work_dir, branch_name)
                current_branch = git_current_branch(task.work_dir)
                has_changes = branch_exists and current_branch == branch_name

                if has_changes:
                    # There are changes to push - proceed to Build
                    if git_push(task.work_dir, branch=branch_name):
                        logger.info(f"Pushed changes for {task.task_id}")
                    else:
                        logger.error(f"Failed to push changes for {task.task_id}, cannot proceed to Build")
                        self.youtrack.add_issue_comment(
                            task.issue_id,
                            f"## Push Failed\n\n"
                            f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                            f"**Task:** {task.task_id}\n\n"
                            f"The agent completed successfully but failed to push changes to the remote repository. "
                            f"This prevents CI from running. Please check the branch state and push manually, "
                            f"or investigate the push failure."
                        )
                        self.youtrack.update_issue_state(task.issue_id, triage_state)
                        return  # Don't proceed to Build state
                else:
                    # No changes - agent determined work was already done.
                    # Skip Build and go directly to Verify.
                    logger.info(f"No changes for {task.task_id} (branch: {current_branch}, exists: {branch_exists}) - skipping Build, going to Verify")
                    target_state = verify_state

            logger.info(f"Task {task.task_id} ({task.task_type}) succeeded → {target_state}")

            # Update YouTrack issue state
            if not self.youtrack.update_issue_state(task.issue_id, target_state):
                logger.error(f"Failed to update status for {task.task_id}")

            # Push changes for non-remediation tasks (verification, librarian)
            if task.task_type != "remediation" and self.config.get("auto_push", True):
                branch_name = f"issue/{task.issue_id}"
                if git_push(task.work_dir, branch=branch_name):
                    logger.info(f"Pushed changes for {task.task_id}")
                else:
                    logger.warning(f"Failed to push changes for {task.task_id}")

            # Merge to main after verification succeeds
            if task.task_type == "verification":
                # Secondary check: parse stdout for explicit failure markers.
                # The QA agent may exit with code 0 but still report verification failure.
                stdout_passed, detection_reason = _detect_verification_result(task.stdout or "")
                logger.info(f"Verification result for {task.issue_id}: exit_code=0, stdout_check={stdout_passed} ({detection_reason})")

                if stdout_passed:
                    if not self._merge_feature_branch(task):
                        # Merge failed - move to Triage for human intervention
                        logger.warning(f"Merge failed for {task.issue_id}, moving to {triage_state}")
                        self.youtrack.update_issue_state(task.issue_id, triage_state)
                else:
                    logger.warning(f"Blocking merge for {task.issue_id}: stdout indicates failure despite exit code 0")
                    # Add failure comment and move to triage
                    self.youtrack_qa.add_issue_comment(
                        task.issue_id,
                        f"## Merge Blocked\n\n"
                        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                        f"**Reason:** Verification output indicates failure despite exit code 0\n"
                        f"**Detection:** {detection_reason}\n\n"
                        f"The QA agent's output contains failure markers. Please review the verification "
                        f"output and address the issues before the branch can be merged."
                    )
                    self.youtrack.update_issue_state(task.issue_id, triage_state)

        elif task.returncode == 2:
            # Exit code 2: "Nothing to do" - work already complete.
            # Skip intermediate steps and go directly to Review.
            logger.info(f"Task {task.task_id} ({task.task_type}) found work already complete → {review_state}")

            # Update YouTrack issue state - skip to Review
            if not self.youtrack.update_issue_state(task.issue_id, review_state):
                logger.error(f"Failed to update status for {task.task_id}")

            # Push any changes (unlikely but safe)
            if self.config.get("auto_push", True):
                branch_name = f"issue/{task.issue_id}"
                if git_push(task.work_dir, branch=branch_name):
                    logger.info(f"Pushed changes for {task.task_id}")

        else:
            logger.warning(f"Task {task.task_id} failed (rc={task.returncode}) → {triage_state}")

            # Add failure comment to YouTrack
            comment_body = (
                f"## Agent Run Failed\n\n"
                f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                f"**Task Type:** {task.task_type}\n"
                f"**Exit Code:** {task.returncode}\n\n"
                f"The agent encountered an error and needs human review.\n"
                f"Please add a comment with clarification and move back to Ready.\n\n"
                f"```\n{task.stderr[:1000] if task.stderr else 'No stderr'}\n```"
            )
            self.youtrack.add_issue_comment(task.issue_id, comment_body)

            # Move to Triage for human review
            self.youtrack.update_issue_state(task.issue_id, triage_state)

        # Remove from seen set so the issue can be picked up again when it
        # re-enters an actionable state (e.g., moved back to Ready)
        self._seen_items.discard(task.task_id)

    def _get_repo_for_issue(self, issue: YouTrackIssue) -> Optional[dict]:
        """
        Determine which repository an issue belongs to.

        Uses custom field 'Repository' or falls back to project mapping.
        """
        repos = self.config.get("repos", {})

        # Check for Repository custom field
        repo_field = issue.custom_fields.get("Repository", "")
        if repo_field:
            for name, info in repos.items():
                if info.get("name") == repo_field or name == repo_field.lower():
                    return info

        # Fallback: check issue ID prefix or project
        project = issue.project_id.lower()
        for name, info in repos.items():
            if name == project or info.get("project") == project:
                return info

        # Last resort: return first repo
        if repos:
            return list(repos.values())[0]

        return None
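
    # Resolution order sketch (assuming a repos entry like {"controller": {"name": "cleargrow/controller", ...}}):
    #   1. the issue's "Repository" custom field matched against info["name"] or the repo key
    #   2. the repo key or info["project"] matched against the issue's project id
    #   3. the first configured repo as a last resort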
    def _process_item(self, issue: YouTrackIssue, task_type: str = "remediation") -> bool:
        """
        Process a single issue.
        Returns True if an agent was spawned.
        """
        task_id = f"{issue.id}:{task_type}"

        # Quick check against local seen set (fast path, no lock needed).
        # Note: the actual atomic check happens in agent_pool.submit().
        if task_id in self._seen_items:
            return False

        logger.info(f"Processing ({task_type}): {issue.id} - {issue.summary}")

        # Get repository info
        repo_info = self._get_repo_for_issue(issue)
        if not repo_info:
            logger.error(f"No repository configured for issue {issue.id}")
            self.youtrack.add_issue_comment(
                issue.id,
                "## Agent Error\n\nNo repository configured for this issue."
            )
            return False

        repo_name = repo_info.get("name", "unknown")
        work_dir = repo_info.get("path", f"/opt/repos/{repo_name}")
        platform = repo_info.get("platform", repo_name)

        # Update status to In Progress (for remediation tasks)
        if task_type == "remediation":
            if not self.youtrack.update_issue_state(issue.id, "In Progress"):
                logger.error(f"Failed to set In Progress for {task_id}")
                return False

        # Check work dir exists
        if not Path(work_dir).exists():
            logger.error(f"Work directory does not exist: {work_dir}")
            self.youtrack.add_issue_comment(
                issue.id,
                f"## Agent Error\n\nWork directory not found: `{work_dir}`"
            )
            self.youtrack.update_issue_state(issue.id, "Backlog")
            return False

        # Get comments from YouTrack
        comments = self.youtrack.get_issue_comments(issue.id)
        comments_list = [
            {
                "body": c.text,
                "author": {"login": c.author},
                "createdAt": datetime.fromtimestamp(c.created / 1000).isoformat() if c.created else "",
            }
            for c in comments
        ]

        # Determine prompt template
        prompts_dir = Path(self.config.get("prompts_dir", "prompts"))
        if task_type == "librarian":
            template_path = prompts_dir / "librarian.md"
        elif task_type == "verification":
            template_path = prompts_dir / "verification.md"
        else:
            template_path = prompts_dir / "remediation.md"

        # Build prompt
        prompt = build_prompt(
            issue_number=issue.issue_number,
            issue_id=issue.id,
            repo=repo_name,
            platform=platform,
            issue_body=issue.description,
            comments=comments_list,
            template_path=template_path if template_path.exists() else None,
            task_type=task_type,
        )

        # Create task
        task = AgentTask(
            task_id=task_id,
            issue_number=issue.issue_number,
            issue_id=issue.id,
            repo=repo_name,
            platform=platform,
            work_dir=work_dir,
            prompt=prompt,
            task_type=task_type,
        )

        # Submit to pool
        if self.agent_pool.submit(task):
            self._seen_items.add(task_id)

            # Broadcast agent started event
            self._broadcast_event("agent.started", {
                "task_id": task_id,
                "issue_id": issue.id,
                "repo": repo_name,
                "platform": platform,
                "task_type": task_type,
            })

            return True

        return False

    def _get_build_type_for_repo(self, repo_name: str) -> Optional[str]:
        """Map repository name to a CI build type ID."""
        # Use cached build_types mapping (initialized in __init__)
        return self._build_types.get(repo_name.lower())
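
    # With the default mapping above, for example:
    #   _get_build_type_for_repo("Controller")   -> "Controller_Build"
    #   _get_build_type_for_repo("unknown-repo") -> None (treated as NO_CI downstream)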
    def _merge_feature_branch(self, task: AgentTask) -> bool:
        """
        Merge the feature branch to main after verification succeeds.

        Args:
            task: The completed verification task

        Returns:
            True if merge succeeded, False otherwise
        """
        feature_branch = f"issue/{task.issue_id}"

        success, message = git_merge_to_main(task.work_dir, feature_branch, delete_after=True)

        if success:
            logger.info(f"Merged {feature_branch} to main for {task.issue_id}")
            self.youtrack_qa.add_issue_comment(
                task.issue_id,
                f"## Branch Merged\n\n"
                f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                f"**Branch:** `{feature_branch}`\n"
                f"**Target:** `main`\n\n"
                f"Feature branch has been merged to main and deleted."
            )
            return True
        else:
            logger.error(f"Failed to merge {feature_branch} for {task.issue_id}: {message}")
            self.youtrack_qa.add_issue_comment(
                task.issue_id,
                f"## Merge Failed\n\n"
                f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                f"**Branch:** `{feature_branch}`\n"
                f"**Error:** {message}\n\n"
                f"Please merge manually or investigate the conflict."
            )
            return False

    def _check_build_status(self, issue: YouTrackIssue) -> Optional[str]:
        """
        Check build status for an issue's feature branch.

        Returns:
            "SUCCESS" - Build passed, ready for verification
            "FAILURE" - Build failed, needs developer fix
            "RUNNING" - Build still in progress
            "PENDING" - Build waiting in queue
            "NO_CI"   - Repository has no CI configured (skip Build state)
            None      - No build found yet (waiting for Woodpecker)
        """
        if not self.woodpecker:
            return "NO_CI"

        repo_info = self._get_repo_for_issue(issue)
        if not repo_info:
            return "NO_CI"

        repo_name = repo_info.get("name", "").split("/")[-1]
        build_type = self._get_build_type_for_repo(repo_name)
        if not build_type:
            logger.info(f"No CI configured for repo {repo_name}, skipping Build state")
            return "NO_CI"

        # Feature branch name: issue/CG-XX
        branch = f"issue/{issue.id}"

        # First check for queued/pending builds
        queued = self.woodpecker.get_queued_builds(build_type)
        for q in queued:
            if q.get('branchName') == branch:
                logger.debug(f"Build pending for {branch}")
                return "PENDING"

        # Then check for running builds
        running = self.woodpecker.get_running_builds(build_type)
        for r in running:
            if r.branch == branch:
                logger.debug(f"Build running for {branch}")
                return "RUNNING"

        # Finally check completed builds
        builds = self.woodpecker.get_builds_for_branch(build_type, branch, count=1)
        if not builds:
            logger.debug(f"No builds found for {branch} in {build_type}")
            return None

        return builds[0].status

    def _process_build_items(self):
        """Process items in Build state - check CI status."""
        project = self.config["project"]["name"]
        states = self.config["project"].get("states", {})
        build_state = states.get("build", "Build")
        verify_state = states.get("verify", "Verify")
        ready_state = states.get("ready", "Ready")

        # Note: even without Woodpecker, we still need to process Build items
        # to skip repos without CI to the Verify state.

        try:
            build_issues = self.youtrack.get_issues_by_state(project, build_state)
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to poll for Build issues - connection error: {e}")
            return
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to poll for Build issues - timeout: {e}")
            return
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to poll for Build issues: {e}")
            return

        if not build_issues:
            return

        logger.info(f"Checking {len(build_issues)} issues in Build state")

        for issue in build_issues:
            try:
                status = self._check_build_status(issue)

                if status == "SUCCESS":
                    # Build passed - move to Verify
                    logger.info(f"Build SUCCESS for {issue.id} → {verify_state}")

                    # Get build info for comment
                    repo_info = self._get_repo_for_issue(issue)
                    repo_name = repo_info.get("name", "").split("/")[-1] if repo_info else "unknown"
                    build_type = self._get_build_type_for_repo(repo_name)
                    branch = f"issue/{issue.id}"
                    builds = self.woodpecker.get_builds_for_branch(build_type, branch, count=1) if build_type else []

                    comment = (
                        f"## Build Verification Passed\n\n"
                        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                        f"**Status:** ✓ SUCCESS\n"
                        f"**Branch:** `{branch}`\n"
                    )
                    if builds:
                        comment += f"**Build URL:** {builds[0].web_url}\n"
                    comment += "\nThe feature branch build completed successfully. Moving to code verification."

                    self.youtrack_build.add_issue_comment(issue.id, comment)
                    self.youtrack.update_issue_state(issue.id, verify_state)

                elif status == "FAILURE":
                    # Build failed - move back to Ready for developer to fix
                    logger.warning(f"Build FAILURE for {issue.id} → {ready_state}")

                    # Get build log excerpt
                    repo_info = self._get_repo_for_issue(issue)
                    repo_name = repo_info.get("name", "").split("/")[-1] if repo_info else "unknown"
                    build_type = self._get_build_type_for_repo(repo_name)
                    branch = f"issue/{issue.id}"
                    builds = self.woodpecker.get_builds_for_branch(build_type, branch, count=1) if build_type else []

                    log_excerpt = ""
                    web_url = ""
                    if builds:
                        web_url = builds[0].web_url
                        log_excerpt = self.woodpecker.get_build_log_excerpt(build_type, builds[0].build_id, lines=50)

                    comment = (
                        f"## Build Verification Failed\n\n"
                        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                        f"**Status:** ✗ FAILURE\n"
                        f"**Branch:** `{branch}`\n"
                    )
                    if web_url:
                        comment += f"**Build URL:** {web_url}\n"
                    comment += (
                        f"\n### Build Log (last 50 lines)\n"
                        f"```\n{log_excerpt[:2000]}\n```\n\n"
                        f"Please fix the build errors and the issue will be automatically reprocessed."
                    )

                    self.youtrack_build.add_issue_comment(issue.id, comment)
                    self.youtrack.update_issue_state(issue.id, ready_state)

                elif status in ("RUNNING", "PENDING"):
                    logger.debug(f"Build {status} for {issue.id}, will check again")

                elif status == "NO_CI":
                    # Repository has no CI configured - skip Build state, go to Verify
                    logger.info(f"No CI for {issue.id}, skipping Build → {verify_state}")

                    comment = (
                        f"## Build Step Skipped\n\n"
                        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                        f"**Reason:** No CI/CD configured for this repository\n\n"
                        f"Moving directly to code verification."
                    )
                    self.youtrack_build.add_issue_comment(issue.id, comment)
                    self.youtrack.update_issue_state(issue.id, verify_state)

                else:
                    # No build yet - might need to wait for Woodpecker to pick it up.
                    # But if waiting too long, the push probably failed - move to Triage.
                    triage_state = states.get("triage", "Triage")

                    # Check how long the issue has been in Build state.
                    # If updated more than 10 minutes ago with no pipeline, something is wrong.
                    updated = issue.updated
                    if updated:
                        try:
                            # YouTrack timestamps are in milliseconds
                            updated_time = datetime.fromtimestamp(updated / 1000)
                            wait_time = (datetime.now() - updated_time).total_seconds()

                            if wait_time > 600:  # 10 minutes
                                logger.warning(f"No build found for {issue.id} after {wait_time/60:.1f} minutes, moving to Triage")

                                comment = (
                                    f"## Build Not Found\n\n"
                                    f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                                    f"**Wait Time:** {wait_time/60:.1f} minutes\n\n"
                                    f"No CI pipeline was found for branch `issue/{issue.id}`. This usually means:\n"
                                    f"- The branch was never pushed to the remote\n"
                                    f"- The push failed after the agent completed\n"
                                    f"- Woodpecker CI is not configured for this branch pattern\n\n"
                                    f"Please check the branch state and push manually if needed."
                                )
                                self.youtrack_build.add_issue_comment(issue.id, comment)
                                self.youtrack.update_issue_state(issue.id, triage_state)
                            else:
                                logger.debug(f"No build status for {issue.id}, waiting ({wait_time/60:.1f} min)")
                        except (ValueError, TypeError) as e:
                            logger.debug(f"No build status for {issue.id}, could not check age: {e}")
                    else:
                        logger.debug(f"No build status for {issue.id}")

            except requests.exceptions.ConnectionError as e:
                logger.error(f"Error checking build for {issue.id} - connection error: {e}")
            except requests.exceptions.Timeout as e:
                logger.error(f"Error checking build for {issue.id} - timeout: {e}")
            except requests.exceptions.RequestException as e:
                logger.error(f"Error checking build for {issue.id}: {e}")
            except (KeyError, ValueError, TypeError) as e:
                logger.error(f"Error checking build for {issue.id} - data error: {e}")

    def _poll_cycle(self):
        """Execute one polling cycle."""
        project = self.config["project"]["name"]
        states = self.config["project"].get("states", {})

        ready_state = states.get("ready", "Ready")
        verify_state = states.get("verify", "Verify")
        document_state = states.get("document", "Document")

        # Get Ready items (developer agent queue)
        try:
            ready_issues = self.youtrack.get_issues_by_state(project, ready_state)
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to poll for Ready issues - connection error: {e}")
            ready_issues = []
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to poll for Ready issues - timeout: {e}")
            ready_issues = []
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to poll for Ready issues: {e}")
            ready_issues = []

        # Get Verify items (verification agent queue)
        try:
            verify_issues = self.youtrack.get_issues_by_state(project, verify_state)
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to poll for Verify issues - connection error: {e}")
            verify_issues = []
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to poll for Verify issues - timeout: {e}")
            verify_issues = []
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to poll for Verify issues: {e}")
            verify_issues = []

        # Get Document items (librarian agent queue)
        try:
            document_issues = self.youtrack.get_issues_by_state(project, document_state)
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to poll for Document issues - connection error: {e}")
            document_issues = []
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to poll for Document issues - timeout: {e}")
            document_issues = []
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to poll for Document issues: {e}")
            document_issues = []

        total_items = len(ready_issues) + len(verify_issues) + len(document_issues)
        if total_items > 0:
            logger.info(f"Found {len(ready_issues)} Ready, {len(verify_issues)} Verify, "
                        f"{len(document_issues)} Document, "
                        f"pool has {self.agent_pool.active_count}/{self.agent_pool.max_agents} active")

        # Process developer tasks (Ready → In Progress → Build)
        for issue in ready_issues:
            if not self.agent_pool.has_capacity:
                logger.debug("Pool at capacity, waiting for next cycle")
                break
            try:
                self._process_item(issue, task_type="remediation")
            except (KeyError, ValueError, TypeError) as e:
                logger.error(f"Error processing {issue.id} - data error: {e}")
            except (IOError, OSError) as e:
                logger.error(f"Error processing {issue.id} - I/O error: {e}")
            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing {issue.id} - request error: {e}")

        # Process verification tasks (Verify → Document)
        for issue in verify_issues:
            if not self.agent_pool.has_capacity:
                logger.debug("Pool at capacity, waiting for next cycle")
                break
            try:
                self._process_item(issue, task_type="verification")
            except (KeyError, ValueError, TypeError) as e:
                logger.error(f"Error processing {issue.id} - data error: {e}")
            except (IOError, OSError) as e:
                logger.error(f"Error processing {issue.id} - I/O error: {e}")
            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing {issue.id} - request error: {e}")

        # Process librarian tasks (Document → Review)
        for issue in document_issues:
            if not self.agent_pool.has_capacity:
                logger.debug("Pool at capacity, waiting for next cycle")
                break
            try:
                self._process_item(issue, task_type="librarian")
            except (KeyError, ValueError, TypeError) as e:
                logger.error(f"Error processing {issue.id} - data error: {e}")
            except (IOError, OSError) as e:
                logger.error(f"Error processing {issue.id} - I/O error: {e}")
            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing {issue.id} - request error: {e}")

        # Check build status for items in Build state
        self._process_build_items()

    def run(self):
        """Main run loop."""
        self._setup_logging()
        logger.info("ClearGrow Agent Runner starting...")

        # Initialize health monitor
        self.health = SystemHealth(self.config)

        # Initialize YouTrack client
        self.youtrack = load_youtrack_config(self.config)
        if not self.youtrack:
            logger.error("Failed to initialize YouTrack client")
            sys.exit(1)

        # Test YouTrack connection
        yt_status = self.youtrack.test_connection()
        if yt_status.get("status") != "ok":
            logger.error(f"YouTrack connection failed: {yt_status.get('message')}")
            sys.exit(1)
        logger.info(f"YouTrack connected as: {yt_status.get('user', {}).get('login', 'unknown')}")

        # Initialize YouTrack build agent client (for build-related comments)
        agent_tokens = self.config.get("agent_tokens", {})
        build_token = agent_tokens.get("build")
        if build_token:
            yt_config = self.config.get("youtrack", {})
            self.youtrack_build = YouTrackClient(yt_config.get("base_url"), build_token)
            build_status = self.youtrack_build.test_connection()
            if build_status.get("status") == "ok":
                logger.info(f"YouTrack build agent connected as: {build_status.get('user', {}).get('login', 'build')}")
            else:
                logger.warning("YouTrack build agent connection failed, using admin token for build comments")
                self.youtrack_build = self.youtrack
        else:
            logger.info("No build agent token configured, using admin token for build comments")
            self.youtrack_build = self.youtrack

        # Initialize YouTrack QA agent client (for QA/verification-related comments)
        qa_token = agent_tokens.get("qa")
        if qa_token:
            yt_config = self.config.get("youtrack", {})
            self.youtrack_qa = YouTrackClient(yt_config.get("base_url"), qa_token)
            qa_status = self.youtrack_qa.test_connection()
            if qa_status.get("status") == "ok":
                logger.info(f"YouTrack QA agent connected as: {qa_status.get('user', {}).get('login', 'qa')}")
            else:
                logger.warning("YouTrack QA agent connection failed, using admin token for QA comments")
                self.youtrack_qa = self.youtrack
        else:
            logger.info("No QA agent token configured, using admin token for QA comments")
            self.youtrack_qa = self.youtrack

        # Initialize YouTrack developer agent client (for developer/remediation work items)
        developer_token = agent_tokens.get("developer")
        if developer_token:
            yt_config = self.config.get("youtrack", {})
            self.youtrack_developer = YouTrackClient(yt_config.get("base_url"), developer_token)
            dev_status = self.youtrack_developer.test_connection()
            if dev_status.get("status") == "ok":
                logger.info(f"YouTrack developer agent connected as: {dev_status.get('user', {}).get('login', 'developer')}")
            else:
                logger.warning("YouTrack developer agent connection failed, using admin token")
                self.youtrack_developer = self.youtrack
        else:
            logger.info("No developer agent token configured, using admin token")
            self.youtrack_developer = self.youtrack

        # Initialize YouTrack librarian agent client (for documentation work items)
        librarian_token = agent_tokens.get("librarian")
        if librarian_token:
            yt_config = self.config.get("youtrack", {})
            self.youtrack_librarian = YouTrackClient(yt_config.get("base_url"), librarian_token)
            lib_status = self.youtrack_librarian.test_connection()
            if lib_status.get("status") == "ok":
                logger.info(f"YouTrack librarian agent connected as: {lib_status.get('user', {}).get('login', 'librarian')}")
            else:
                logger.warning("YouTrack librarian agent connection failed, using admin token")
                self.youtrack_librarian = self.youtrack
        else:
            logger.info("No librarian agent token configured, using admin token")
            self.youtrack_librarian = self.youtrack

        # Initialize Gitea client (optional - for comments)
        self.gitea = load_gitea_config(self.config)
        if self.gitea:
            gitea_status = self.gitea.test_connection()
            if gitea_status.get("status") == "ok":
                logger.info(f"Gitea connected: {gitea_status.get('version')} as {gitea_status.get('user')}")
            else:
                logger.warning(f"Gitea connection failed: {gitea_status.get('message')}")
        else:
            logger.info("Gitea not configured (comments will only go to YouTrack)")

        # Initialize Woodpecker client (optional - for build verification)
        wp_config = self.config.get("woodpecker", {})
        if wp_config.get("base_url") and wp_config.get("token"):
            self.woodpecker = WoodpeckerClient(
                base_url=wp_config["base_url"],
                token=wp_config["token"]
            )
            if self.woodpecker.test_connection():
                logger.info(f"Woodpecker connected: {wp_config['base_url']}")
            else:
                logger.warning("Woodpecker connection failed - build verification disabled")
                self.woodpecker = None
        else:
            logger.info("Woodpecker not configured (build verification disabled)")

        # Initial health check
        logger.info("Running initial health check...")
        self.health.check_all(force=True)
        if not self.health.is_healthy:
            logger.warning("Some services unhealthy at startup, waiting...")
            if not self.health.wait_for_healthy(timeout=120):
                logger.error("Services did not become healthy, starting anyway...")

        # Initialize agent pool
        claude_config = self.config.get("claude", {})
        timeout = self.config.get("agent_timeout_seconds", DEFAULT_AGENT_TIMEOUT)
        self.agent_pool = AgentPool(
            max_agents=self.config.get("max_parallel_agents", DEFAULT_MAX_PARALLEL_AGENTS),
            claude_command=claude_config.get("command", "claude"),
            claude_flags=claude_config.get("flags", []),
            on_complete=self._on_agent_complete,
            timeout_seconds=timeout,
        )
        self.agent_pool.start()

        logger.info(f"Agent pool started (max={self.agent_pool.max_agents}, timeout={timeout}s)")

        # Initialize OAuth if configured
        oauth = None
        oauth_config = self.config.get("oauth")
        if oauth_config and oauth_config.get("client_id"):
            try:
                from oauth import create_oauth_from_config
                oauth = create_oauth_from_config(self.config)
                if oauth:
                    logger.info("OAuth authentication enabled")
            except ImportError as e:
                logger.warning(f"OAuth module not available: {e}")

        # Initialize webhook server with dashboard API (optional)
        webhook_config = self.config.get("webhook", {})
        if webhook_config.get("enabled", False):
            self.webhook_server = WebhookServer(
                host=webhook_config.get("host", "0.0.0.0"),
                port=webhook_config.get("port", 8765),
                secret=webhook_config.get("secret"),
                on_event=self._on_webhook_event,
                runner=self,  # Pass runner for dashboard API
                oauth=oauth,  # Pass OAuth handler
            )
            self.webhook_server.start()
            logger.info(f"Webhook server started on {webhook_config.get('host', '0.0.0.0')}:{webhook_config.get('port', 8765)}")
            if oauth:
                logger.info("Dashboard API available with Gitea OAuth authentication")

        # Start internal API server for MCP tools
        self.internal_api = InternalAPIServer(self)
        self.internal_api.start()

        poll_interval = self.config.get("poll_interval_seconds", DEFAULT_POLL_INTERVAL)
        backoff_interval = poll_interval
        max_backoff = MAX_BACKOFF_INTERVAL

        mode = "webhook + polling" if self.webhook_server else "polling"
        logger.info(f"Starting main loop (mode={mode}, poll_interval={poll_interval}s)")

        # Setup signal handlers
        def handle_shutdown(signum, frame):
            logger.info("Shutdown signal received")
            self._shutdown = True
            self._shutdown_event.set()
            self._wake_event.set()  # Wake up main loop immediately

        signal.signal(signal.SIGINT, handle_shutdown)
        signal.signal(signal.SIGTERM, handle_shutdown)

        # Main loop
        while not self._shutdown:
            try:
                # Periodic health check
                self.health.check_all()

                if not self.health.can_process:
                    logger.warning("Services unhealthy, skipping poll cycle")
                    self._consecutive_failures += 1

                    if self._consecutive_failures >= self._max_consecutive_failures:
                        logger.error(f"Too many consecutive failures ({self._consecutive_failures}), "
                                     "waiting for recovery...")
                        self.health.wait_for_healthy(timeout=300)
                        self._consecutive_failures = 0
                else:
                    # Process any webhook events first (instant response)
                    self._process_webhook_events()

                    # Then do regular poll cycle
                    self._poll_cycle()
                    self._consecutive_failures = 0
                    backoff_interval = poll_interval  # Reset on success

            except Exception as e:
                logger.exception(f"Error in poll cycle: {e}")
                self._consecutive_failures += 1
                backoff_interval = min(backoff_interval * 2, max_backoff)
                logger.warning(f"Backing off to {backoff_interval}s")

            # Use event-based wait instead of sleep loop for faster response:
            # wake up every second to process webhooks, or immediately on shutdown/wake signal
            wait_until = time.monotonic() + backoff_interval
            while not self._shutdown and time.monotonic() < wait_until:
                # Wait for up to 1 second, or until woken by webhook/shutdown
                remaining = min(1.0, wait_until - time.monotonic())
                if remaining > 0:
                    self._wake_event.wait(timeout=remaining)
                    self._wake_event.clear()
                # Process webhooks during wait
                self._process_webhook_events()

        logger.info("Shutting down...")
        if self.internal_api:
            self.internal_api.stop()
        if self.webhook_server:
            self.webhook_server.stop()
        self.agent_pool.stop()
        # Close API client sessions
        if self.youtrack:
            self.youtrack.close()
        if self.youtrack_build and self.youtrack_build is not self.youtrack:
            self.youtrack_build.close()
        if self.youtrack_qa and self.youtrack_qa is not self.youtrack:
            self.youtrack_qa.close()
        if self.youtrack_developer and self.youtrack_developer is not self.youtrack:
            self.youtrack_developer.close()
        if self.youtrack_librarian and self.youtrack_librarian is not self.youtrack:
            self.youtrack_librarian.close()
        if self.gitea:
            self.gitea.close()
        if self.woodpecker:
            self.woodpecker.close()
        logger.info("Runner stopped")
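
    # Event flow for webhook mode (summary; see run() above and the two methods below):
    #   WebhookServer thread -> _on_webhook_event() queues the event and sets
    #   _wake_event -> the main loop wakes (immediately, or within ~1s) and calls
    #   _process_webhook_events() -> _process_item() if the pool has capacity.
    # On repeated poll errors the loop backs off exponentially, e.g. with
    # poll_interval_seconds=300: 300s -> 600s -> 1200s -> 1800s (MAX_BACKOFF_INTERVAL),
    # resetting to the configured interval after the next successful cycle.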
    def _on_webhook_event(self, event: WebhookEvent):
        """Handle webhook event from YouTrack."""
        # Queue the event for processing in main loop
        try:
            self._webhook_queue.put_nowait(event)
            logger.debug(f"Queued webhook event: {event.event_type} for {event.issue_id}")
            # Wake up main loop to process webhook immediately
            self._wake_event.set()
        except Exception:
            # Queue is full - log warning but don't block webhook response
            logger.warning(f"Webhook queue full, dropping event for {event.issue_id}. "
                           f"Event will be picked up in next poll cycle.")

    def _process_webhook_events(self):
        """Process any pending webhook events."""
        states = self.config["project"].get("states", {})
        ready_state = states.get("ready", "Ready")
        verify_state = states.get("verify", "Verify")
        document_state = states.get("document", "Document")
        project = self.config["project"]["name"]

        # Process all queued events
        processed = 0
        while not self._webhook_queue.empty():
            try:
                event = self._webhook_queue.get_nowait()
            except Empty:
                break

            # Skip if not our project
            if event.project != project:
                continue

            # Only process state changes to actionable states
            if event.new_state not in [ready_state, verify_state, document_state]:
                continue

            logger.info(f"Webhook: {event.issue_id} moved to {event.new_state}")

            # Fetch the full issue from YouTrack
            try:
                issue = self.youtrack.get_issue(event.issue_id)
                if not issue:
                    logger.warning(f"Could not fetch issue {event.issue_id}")
                    continue

                # Determine task type based on state
                if event.new_state == ready_state:
                    task_type = "remediation"
                elif event.new_state == verify_state:
                    task_type = "verification"
                elif event.new_state == document_state:
                    task_type = "librarian"
                else:
                    continue

                # Process immediately if we have capacity
                if self.agent_pool.has_capacity:
                    self._process_item(issue, task_type=task_type)
                    processed += 1
                else:
                    logger.debug(f"Pool at capacity, {event.issue_id} will be picked up in next poll")

            except requests.exceptions.ConnectionError as e:
                logger.error(f"Error processing webhook for {event.issue_id} - connection error: {e}")
            except requests.exceptions.Timeout as e:
                logger.error(f"Error processing webhook for {event.issue_id} - timeout: {e}")
            except requests.exceptions.RequestException as e:
                logger.error(f"Error processing webhook for {event.issue_id}: {e}")
            except (KeyError, ValueError, TypeError) as e:
                logger.error(f"Error processing webhook for {event.issue_id} - data error: {e}")

        if processed > 0:
            logger.info(f"Processed {processed} webhook events")
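

# CLI usage (from the argparse flags defined below; assumes this file is saved as runner.py):
#   python runner.py                    # run the main loop
#   python runner.py --once             # single poll cycle, then wait for agents
#   python runner.py --status           # print current queue snapshot
#   python runner.py --test-connection  # verify YouTrack/Gitea/Woodpecker credentials
#   python runner.py --health           # platform health summary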
def main():
    parser = argparse.ArgumentParser(description="ClearGrow Agent Runner")
    parser.add_argument(
        "-c", "--config",
        default="config.yaml",
        help="Path to config file (default: config.yaml)"
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run one poll cycle and exit"
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print current project status and exit"
    )
    parser.add_argument(
        "--test-connection",
        action="store_true",
        help="Test connections to YouTrack and Gitea"
    )
    parser.add_argument(
        "--health",
        action="store_true",
        help="Check platform health (YouTrack, Gitea, Woodpecker)"
    )
    args = parser.parse_args()

    config_path = Path(args.config)
    if not config_path.exists():
        print(f"Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)

    with open(config_path) as f:
        config = yaml.safe_load(f)

    if args.health:
        print("\n=== Platform Health Check ===\n")
        health = SystemHealth(config)
        status = health.check_all(force=True)

        for service, healthy in status.items():
            svc_config = config.get(service, {})
            if svc_config.get("base_url"):
                icon = "✓" if healthy else "✗"
                state = "healthy" if healthy else "UNHEALTHY"
                print(f"{icon} {service.title()}: {state}")
                print(f"  URL: {svc_config.get('base_url')}")
            else:
                print(f"○ {service.title()}: not configured")

        print()
        if health.can_process:
            print("Status: Ready to process tasks")
        else:
            print("Status: NOT READY - critical services unhealthy")
        print()
        return

    if args.test_connection:
        print("\n=== Testing Connections ===\n")

        # Test YouTrack
        yt = load_youtrack_config(config)
        if yt:
            result = yt.test_connection()
            if result.get("status") == "ok":
                user = result.get("user", {})
                print(f"✓ YouTrack: Connected as {user.get('login', 'unknown')}")
            else:
                print(f"✗ YouTrack: {result.get('message')}")
        else:
            print("✗ YouTrack: Not configured")

        # Test Gitea
        gitea = load_gitea_config(config)
        if gitea:
            result = gitea.test_connection()
            if result.get("status") == "ok":
                print(f"✓ Gitea: Connected as {result.get('user')} (v{result.get('version')})")
            else:
                print(f"✗ Gitea: {result.get('message')}")
        else:
            print("○ Gitea: Not configured (optional)")

        # Test Woodpecker
        wp_config = config.get("woodpecker", {})
        if wp_config.get("base_url") and wp_config.get("token"):
            try:
                client = WoodpeckerClient(wp_config["base_url"], wp_config["token"])
                if client.test_connection():
                    print(f"✓ Woodpecker: Connected at {wp_config['base_url']}")
                else:
                    print("✗ Woodpecker: Connection failed")
                client.close()
            except Exception as e:
                print(f"✗ Woodpecker: {e}")
        else:
            print("○ Woodpecker: Not configured")

        print()
        return

    if args.status:
        yt = load_youtrack_config(config)
        if not yt:
            print("YouTrack not configured", file=sys.stderr)
            sys.exit(1)

        project = config["project"]["name"]
        states = config["project"].get("states", {})

        ready_state = states.get("ready", "Ready")
        in_progress_state = states.get("in_progress", "In Progress")
        verify_state = states.get("verify", "Verify")
        document_state = states.get("document", "Document")

        print(f"\n=== YouTrack Project: {project} ===\n")

        for state_name, label in [
            (ready_state, "Ready (remediation queue)"),
            (in_progress_state, "In Progress"),
            (verify_state, "Verify (verification queue)"),
            (document_state, "Document (librarian queue)"),
        ]:
            issues = yt.get_issues_by_state(project, state_name)
            print(f"{label}:")
            if issues:
                for issue in issues:
                    print(f"  {issue.id}: {issue.summary}")
            else:
                print("  (none)")
            print()

        return

    runner = Runner(str(config_path))

    if args.once:
        # Run once and wait for agents to complete
        runner._setup_logging()
        runner.youtrack = load_youtrack_config(config)
        runner.gitea = load_gitea_config(config)

        claude_config = config.get("claude", {})
        runner.agent_pool = AgentPool(
            max_agents=config.get("max_parallel_agents", 10),
            claude_command=claude_config.get("command", "claude"),
            claude_flags=claude_config.get("flags", []),
            on_complete=runner._on_agent_complete,
        )
        runner.agent_pool.start()
        runner._poll_cycle()

        # Wait for all agents to complete
        timeout = 600  # 10 minutes max
        waited = 0
        while runner.agent_pool.active_count > 0 and waited < timeout:
            logger.info(f"Waiting for {runner.agent_pool.active_count} agent(s) to complete...")
            time.sleep(10)
            waited += 10

        if runner.agent_pool.active_count > 0:
            logger.warning(f"Timeout reached with {runner.agent_pool.active_count} agents still running")

        runner.agent_pool.stop()
        return

    runner.run()


if __name__ == "__main__":
    main()