Files
agentrunner/runner.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

1544 lines
66 KiB
Python

#!/usr/bin/env python3
"""
ClearGrow Agent Runner
Monitors YouTrack for issues and spawns Claude Code agents.
Supports both polling and webhook modes.
Stack:
- YouTrack: Issue tracking and Kanban boards
- Gitea: Git repository hosting
- Woodpecker: CI/CD (build status checks)
"""
import argparse
import logging
import signal
import sys
import threading
import time
import yaml
import requests
from pathlib import Path
from datetime import datetime
from typing import Optional
from queue import Queue, Empty
from youtrack_client import YouTrackClient, YouTrackIssue, load_youtrack_config
from gitea_client import GiteaClient, load_gitea_config, git_push, git_merge_to_main, git_branch_exists, git_current_branch
from agent import AgentPool, AgentTask, build_prompt
from webhook_server import WebhookServer, WebhookEvent, load_webhook_config
from woodpecker_client import WoodpeckerClient, BuildInfo
from internal_api import InternalAPIServer
import re
# Constants
DEFAULT_WEBHOOK_QUEUE_SIZE = 1000  # Maximum queued webhook events before the bounded queue blocks
DEFAULT_POLL_INTERVAL = 300  # Seconds between polling cycles (5 minutes)
DEFAULT_AGENT_TIMEOUT = 1800  # Max seconds an agent may run before termination (30 minutes)
DEFAULT_MAX_PARALLEL_AGENTS = 3  # Concurrent agent processes in the pool
DEFAULT_HEALTH_CHECK_INTERVAL = 300  # Seconds between cached service health checks (5 minutes)
MAX_BACKOFF_INTERVAL = 1800  # 30 minutes max backoff
# Map agent task types to YouTrack work item types
AGENT_TO_WORK_TYPE = {
    "remediation": "Development",
    "verification": "Testing",
    "librarian": "Documentation",
}
# Configure logging (console handler; _setup_logging may add a file handler later)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger used throughout the runner
logger = logging.getLogger("runner")
class SystemHealth:
    """
    Monitors health of platform services.

    Performs lightweight HTTP probes against YouTrack, Gitea, and
    Woodpecker so the runner can avoid processing tasks while a required
    service is down (preventing partial failures).
    """
    def __init__(self, config: dict):
        """
        Args:
            config: Full runner configuration. Reads the optional
                'health_check_interval' key and each service section's
                'base_url' for probing.
        """
        self.config = config
        self.last_check = None  # time.time() of the most recent real check run
        self.check_interval = config.get("health_check_interval", DEFAULT_HEALTH_CHECK_INTERVAL)
        # Last observed status per service; pessimistic (False) until checked.
        self._status = {
            "youtrack": False,
            "gitea": False,
            "woodpecker": False,
        }

    def _check_endpoint(self, name: str, url: str, timeout: int = 10) -> bool:
        """
        Check if an HTTP endpoint is healthy.

        Any response below HTTP 500 counts as healthy (e.g. a 401/403
        still proves the service is up); 5xx and transport-level errors
        count as unhealthy.
        """
        try:
            response = requests.get(url, timeout=timeout)
            healthy = response.status_code < 500
            if not healthy:
                logger.warning(f"{name} unhealthy: HTTP {response.status_code}")
            return healthy
        except requests.exceptions.Timeout:
            logger.warning(f"{name} health check timed out: {url}")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.warning(f"{name} connection failed: {e}")
            return False
        except Exception as e:
            # Catch-all so a surprising client error never crashes the runner loop.
            logger.warning(f"{name} health check error: {e}")
            return False

    def check_all(self, force: bool = False) -> dict:
        """
        Check health of all services.

        Results are cached for `check_interval` seconds. Services without
        a configured base_url are not probed and keep their previous
        (initially False) status.

        Args:
            force: If True, ignore check interval and always check

        Returns:
            Dict with service names and boolean health status
        """
        now = time.time()
        # Skip if checked recently (unless forced)
        if not force and self.last_check:
            if now - self.last_check < self.check_interval:
                return self._status
        logger.debug("Running system health checks...")
        # YouTrack
        yt_config = self.config.get("youtrack", {})
        if yt_config.get("base_url"):
            yt_url = f"{yt_config['base_url'].rstrip('/')}/api/config"
            self._status["youtrack"] = self._check_endpoint("YouTrack", yt_url)
        # Gitea
        gitea_config = self.config.get("gitea", {})
        if gitea_config.get("base_url"):
            gitea_url = f"{gitea_config['base_url'].rstrip('/')}/api/v1/version"
            self._status["gitea"] = self._check_endpoint("Gitea", gitea_url)
        # Woodpecker
        wp_config = self.config.get("woodpecker", {})
        if wp_config.get("base_url"):
            wp_url = f"{wp_config['base_url'].rstrip('/')}/api/version"
            self._status["woodpecker"] = self._check_endpoint("Woodpecker", wp_url)
        self.last_check = now
        # Log summary; only services with a configured base_url count as unhealthy.
        healthy = [k for k, v in self._status.items() if v]
        unhealthy = [k for k, v in self._status.items() if not v and self.config.get(k, {}).get("base_url")]
        if unhealthy:
            logger.warning(f"System health: {len(healthy)} healthy, {len(unhealthy)} unhealthy: {unhealthy}")
        else:
            logger.debug(f"System health: all {len(healthy)} services healthy")
        return self._status

    @property
    def is_healthy(self) -> bool:
        """Check if critical services (YouTrack) are healthy."""
        return self._status.get("youtrack", False)

    @property
    def can_process(self) -> bool:
        """Check if we can process tasks (YouTrack + Gitea required)."""
        # Fix: the gitea lookup previously defaulted to True, which would
        # report "can process" for a missing status entry. Defaults are now
        # uniformly pessimistic (False), matching every other status lookup.
        return self._status.get("youtrack", False) and self._status.get("gitea", False)

    def wait_for_healthy(self, timeout: int = 300, interval: int = 30) -> bool:
        """
        Wait for services to become healthy.

        Args:
            timeout: Maximum seconds to wait
            interval: Seconds between checks

        Returns:
            True once critical services are healthy, False on timeout.
        """
        start = time.time()
        while time.time() - start < timeout:
            self.check_all(force=True)
            if self.is_healthy:
                return True
            logger.info(f"Waiting for services to become healthy... ({int(time.time() - start)}s)")
            time.sleep(interval)
        return False
class ConfigurationError(Exception):
    """Raised when the runner's configuration is invalid or incomplete."""
def _detect_verification_result(stdout: str) -> tuple[bool, str]:
"""
Parse agent stdout for explicit verification pass/fail markers.
The QA agent may exit with code 0 but still report verification failure
in its output. This function detects failure markers in stdout to provide
a secondary check.
Args:
stdout: The agent's stdout output
Returns:
Tuple of (passed: bool, reason: str) where:
- passed: True if verification passed, False if failure markers found
- reason: Human-readable explanation of the detection result
"""
if not stdout:
return True, "no stdout to analyze"
stdout_lower = stdout.lower()
# Check for explicit failure markers (case-insensitive)
failure_patterns = [
(r"verification\s+failed", "found 'verification failed'"),
(r"requirements?\s+not\s+met", "found 'requirements not met'"),
(r"was\s+not\s+implemented", "found 'was not implemented'"),
(r"were\s+not\s+implemented", "found 'were not implemented'"),
(r"not\s+actually\s+implemented", "found 'not actually implemented'"),
(r"none\s+of\s+the\s+.*\s+changes\s+were\s+(actually\s+)?implemented", "found 'none of the changes were implemented'"),
(r"acceptance\s+criteria\s+not\s+met", "found 'acceptance criteria not met'"),
(r"##\s*verification\s+failed", "found 'Verification Failed' header"),
]
for pattern, reason in failure_patterns:
if re.search(pattern, stdout_lower):
return False, reason
# Check for explicit pass markers
pass_patterns = [
(r"verification\s+passed", "found 'verification passed'"),
(r"verification\s+succeeded", "found 'verification succeeded'"),
(r"all\s+acceptance\s+criteria\s+met", "found 'all acceptance criteria met'"),
(r"##\s*verification\s+passed", "found 'Verification Passed' header"),
]
for pattern, reason in pass_patterns:
if re.search(pattern, stdout_lower):
return True, reason
# No explicit markers found - assume pass (rely on exit code)
return True, "no explicit pass/fail markers found"
class Runner:
def __init__(self, config_path: str):
    """
    Load and validate configuration, then initialize runner state.

    Service clients are declared here but left as None; presumably they
    are connected later during startup (outside this view).

    Args:
        config_path: Path to the runner's YAML configuration file.

    Raises:
        ConfigurationError: If the configuration fails validation.
    """
    self.config = self._load_config(config_path)
    self._validate_config()
    # One YouTrack client per agent persona (selected via
    # _get_youtrack_client_for_task) plus the main admin client.
    self.youtrack: Optional[YouTrackClient] = None
    self.youtrack_build: Optional[YouTrackClient] = None  # For build agent comments
    self.youtrack_qa: Optional[YouTrackClient] = None  # For QA agent comments
    self.youtrack_developer: Optional[YouTrackClient] = None  # For developer agent
    self.youtrack_librarian: Optional[YouTrackClient] = None  # For librarian agent
    self.gitea: Optional[GiteaClient] = None
    self.woodpecker: Optional[WoodpeckerClient] = None
    self.agent_pool: Optional[AgentPool] = None
    self.health: Optional[SystemHealth] = None
    self.webhook_server: Optional[WebhookServer] = None
    self.internal_api: Optional[InternalAPIServer] = None
    # Bounded queue so a webhook flood cannot grow memory without limit.
    self._webhook_queue: Queue = Queue(maxsize=DEFAULT_WEBHOOK_QUEUE_SIZE)  # Bounded queue for webhook events
    self._shutdown = False
    self._shutdown_event = threading.Event()  # For interruptible sleep
    self._wake_event = threading.Event()  # To wake up main loop early (e.g., for webhook processing)
    self._seen_items: set[str] = set()  # Track items we've already processed
    self._build_checks: dict[str, dict] = {}  # Track pending build checks
    # Consecutive failure counter — presumably drives poll backoff
    # (see MAX_BACKOFF_INTERVAL); confirm against the main loop.
    self._consecutive_failures = 0
    self._max_consecutive_failures = 5
    # Cache build_types mapping at initialization to avoid repeated config access
    self._build_types: dict[str, str] = self.config.get("build_types", {
        "controller": "Controller_Build",
        "probe": "Probe_Build",
        "docs": "Docs_BuildPdfs",
    })
def _load_config(self, path: str) -> dict:
    """Read and parse the YAML configuration file at *path*."""
    config_text = Path(path).read_text()
    return yaml.safe_load(config_text)
def _validate_config(self):
    """
    Validate configuration structure and required fields.

    Checks required sections ('project', 'youtrack', 'repos'), verifies
    optional sections are dictionaries, and range-checks numeric
    settings. All problems are collected and reported together in a
    single exception.

    Raises:
        ConfigurationError: If configuration is invalid or missing required fields.
    """
    errors = []
    # Validate project configuration (required)
    if "project" not in self.config:
        errors.append("Missing required section: 'project'")
    else:
        project = self.config["project"]
        if not isinstance(project, dict):
            errors.append("'project' must be a dictionary")
        elif "name" not in project:
            errors.append("Missing required field: 'project.name'")
        elif not project["name"]:
            errors.append("'project.name' cannot be empty")
    # Validate YouTrack configuration (required)
    if "youtrack" not in self.config:
        errors.append("Missing required section: 'youtrack'")
    else:
        youtrack = self.config["youtrack"]
        if not isinstance(youtrack, dict):
            errors.append("'youtrack' must be a dictionary")
        else:
            if not youtrack.get("base_url"):
                errors.append("Missing required field: 'youtrack.base_url'")
            if not youtrack.get("token"):
                errors.append("Missing required field: 'youtrack.token'")
    # Validate repos configuration (required)
    if "repos" not in self.config:
        errors.append("Missing required section: 'repos'")
    else:
        repos = self.config["repos"]
        if not isinstance(repos, dict):
            errors.append("'repos' must be a dictionary")
        elif len(repos) == 0:
            errors.append("'repos' cannot be empty - at least one repository required")
        else:
            for repo_name, repo_config in repos.items():
                if not isinstance(repo_config, dict):
                    errors.append(f"'repos.{repo_name}' must be a dictionary")
                elif not repo_config.get("path"):
                    errors.append(f"Missing required field: 'repos.{repo_name}.path'")
    # Validate optional sections have correct types.
    # 'woodpecker' added for consistency: the runner reads
    # config['woodpecker'] for health checks and CI status, but it was
    # previously never type-checked here.
    for section in ("gitea", "teamcity", "webhook", "claude", "woodpecker"):
        if section in self.config and not isinstance(self.config[section], dict):
            errors.append(f"'{section}' must be a dictionary")
    # Validate numeric fields have correct types and ranges
    for field_name in ("poll_interval_seconds", "agent_timeout_seconds"):
        if field_name in self.config:
            val = self.config[field_name]
            if not isinstance(val, (int, float)) or val <= 0:
                errors.append(f"'{field_name}' must be a positive number")
    if "max_parallel_agents" in self.config:
        val = self.config["max_parallel_agents"]
        if not isinstance(val, int) or val <= 0:
            errors.append("'max_parallel_agents' must be a positive integer")
    if errors:
        error_msg = "Configuration validation failed:\n - " + "\n - ".join(errors)
        logger.error(error_msg)
        raise ConfigurationError(error_msg)
def _setup_logging(self):
    """Attach an optional file handler and apply the configured log level.

    Reads 'log_file' (optional) and 'log_level' (default "INFO") from the
    configuration; both act on the root logger.
    """
    log_path = self.config.get("log_file")
    if log_path:
        file_handler = logging.FileHandler(log_path)
        fmt = logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
        )
        file_handler.setFormatter(fmt)
        logging.getLogger().addHandler(file_handler)
    level_name = self.config.get("log_level", "INFO")
    logging.getLogger().setLevel(getattr(logging, level_name))
def _broadcast_event(self, event_type: str, data: dict):
    """Best-effort broadcast of an event to dashboard clients.

    Imports the broadcaster lazily so the runner still works when the
    dashboard API module is absent; any other broadcast failure is only
    debug-logged because dashboard delivery must never break task
    processing.
    """
    try:
        from api_server import broadcaster
        broadcaster.broadcast(event_type, data)
    except ImportError:
        pass  # Dashboard API not available
    except Exception as e:
        logger.debug(f"Failed to broadcast event: {e}")
def _get_youtrack_client_for_task(self, task_type: str) -> "YouTrackClient":
    """Select the persona-specific YouTrack client for *task_type*.

    Falls back to the main admin client when no persona client has been
    configured (or the task type is unknown).
    """
    persona_clients = {
        "verification": self.youtrack_qa,
        "librarian": self.youtrack_librarian,
        "remediation": self.youtrack_developer,
    }
    client = persona_clients.get(task_type)
    # A persona entry may be None when that client was never set up.
    return client if client else self.youtrack
def _log_work_item(self, task: AgentTask, duration_seconds: float):
    """Record time spent by an agent as a YouTrack work item.

    Skips zero/negative durations; otherwise rounds up to at least one
    minute and posts through the persona-specific client so the entry is
    attributed to the right agent type.
    """
    if duration_seconds <= 0:
        return
    duration_minutes = max(1, int(duration_seconds / 60))
    work_type = AGENT_TO_WORK_TYPE.get(task.task_type, "Development")
    # Summarize the outcome as a short result string.
    if task.timed_out:
        result = "timed out"
    else:
        known_outcomes = {0: "completed", 2: "skipped (work already done)"}
        result = known_outcomes.get(
            task.returncode, f"failed (exit code {task.returncode})"
        )
    text = f"Agent {task.task_type}: {result}"
    yt_client = self._get_youtrack_client_for_task(task.task_type)
    yt_client.add_work_item(
        issue_id=task.issue_id,
        duration_minutes=duration_minutes,
        work_type=work_type,
        text=text,
    )
def _on_agent_complete(self, task: AgentTask):
    """
    Completion callback invoked when an agent process finishes.

    Responsibilities, in order:
      1. Broadcast a completion event to the dashboard.
      2. Log time spent as a YouTrack work item.
      3. Route the issue to its next workflow state based on outcome:
         - timeout        -> Triage (with explanatory comment)
         - exit code 0    -> Build / Document / Review by task type
                             (remediation with nothing pushed skips to Verify)
         - exit code 2    -> Review ("nothing to do")
         - anything else  -> Triage (failure comment; task id removed from
                             the seen set so it can be picked up again)
    """
    # Broadcast completion event to dashboard
    duration = 0
    if task.started_at and task.completed_at:
        duration = (task.completed_at - task.started_at).total_seconds()
    self._broadcast_event("agent.completed", {
        "task_id": task.task_id,
        "issue_id": task.issue_id,
        "task_type": task.task_type,
        "returncode": task.returncode,
        "timed_out": task.timed_out,
        "duration_seconds": round(duration, 1),
    })
    # Log time to YouTrack work items
    self._log_work_item(task, duration)
    # Resolve configurable workflow state names (with defaults).
    states = self.config["project"].get("states", {})
    triage_state = states.get("triage", "Triage")
    build_state = states.get("build", "Build")
    verify_state = states.get("verify", "Verify")
    document_state = states.get("document", "Document")
    review_state = states.get("review", "Review")
    logger.info(f"Agent completed for issue {task.issue_id} (type: {task.task_type})")
    # Handle timeout
    if task.timed_out:
        logger.warning(f"Task {task.task_id} timed out → {triage_state}")
        timeout_mins = self.config.get("agent_timeout_seconds", 1800) // 60
        comment_body = (
            f"## Agent Timed Out\n\n"
            f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
            f"**Task Type:** {task.task_type}\n"
            f"**Timeout:** {timeout_mins} minutes\n\n"
            f"The agent exceeded the maximum allowed time and was terminated.\n\n"
            f"**Possible causes:**\n"
            f"- Task too complex for single agent run\n"
            f"- Agent stuck in a loop\n"
            f"- Unclear or ambiguous requirements\n\n"
            f"Please review the issue, simplify if needed, and move back to Ready."
        )
        self.youtrack.add_issue_comment(task.issue_id, comment_body)
        self.youtrack.update_issue_state(task.issue_id, triage_state)
    elif task.returncode == 0:
        # Determine next state based on task type
        if task.task_type == "remediation":
            target_state = build_state  # Developer done → Build (CI verification)
        elif task.task_type == "verification":
            target_state = document_state  # Verification done → Document
        elif task.task_type == "librarian":
            target_state = review_state  # Librarian done → Review
        else:
            target_state = build_state  # Default
        # For remediation tasks, check if there are actual changes to push
        if task.task_type == "remediation" and self.config.get("auto_push", True):
            branch_name = f"issue/{task.issue_id}"
            # Check if the feature branch exists and we're on it
            branch_exists = git_branch_exists(task.work_dir, branch_name)
            current_branch = git_current_branch(task.work_dir)
            has_changes = branch_exists and current_branch == branch_name
            if has_changes:
                # There are changes to push - proceed to Build
                if git_push(task.work_dir, branch=branch_name):
                    logger.info(f"Pushed changes for {task.task_id}")
                else:
                    # Push failed: CI can never run, so hand off to a human.
                    logger.error(f"Failed to push changes for {task.task_id}, cannot proceed to Build")
                    self.youtrack.add_issue_comment(
                        task.issue_id,
                        f"## Push Failed\n\n"
                        f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                        f"**Task:** {task.task_id}\n\n"
                        f"The agent completed successfully but failed to push changes to the remote repository. "
                        f"This prevents CI from running. Please check the branch state and push manually, "
                        f"or investigate the push failure."
                    )
                    self.youtrack.update_issue_state(task.issue_id, triage_state)
                    return  # Don't proceed to Build state
            else:
                # No changes - agent determined work was already done
                # Skip Build and go directly to Verify
                logger.info(f"No changes for {task.task_id} (branch: {current_branch}, exists: {branch_exists}) - skipping Build, going to Verify")
                target_state = verify_state
        logger.info(f"Task {task.task_id} ({task.task_type}) succeeded → {target_state}")
        # Update YouTrack issue state
        if not self.youtrack.update_issue_state(task.issue_id, target_state):
            logger.error(f"Failed to update status for {task.task_id}")
        # Push changes for non-remediation tasks (verification, librarian)
        if task.task_type != "remediation" and self.config.get("auto_push", True):
            branch_name = f"issue/{task.issue_id}"
            if git_push(task.work_dir, branch=branch_name):
                logger.info(f"Pushed changes for {task.task_id}")
            else:
                logger.warning(f"Failed to push changes for {task.task_id}")
        # Merge to main after verification succeeds
        if task.task_type == "verification":
            # Secondary check: parse stdout for explicit failure markers
            # The QA agent may exit with code 0 but still report verification failure
            stdout_passed, detection_reason = _detect_verification_result(task.stdout or "")
            logger.info(f"Verification result for {task.issue_id}: exit_code=0, stdout_check={stdout_passed} ({detection_reason})")
            if stdout_passed:
                if not self._merge_feature_branch(task):
                    # Merge failed - move to Triage for human intervention
                    logger.warning(f"Merge failed for {task.issue_id}, moving to {triage_state}")
                    self.youtrack.update_issue_state(task.issue_id, triage_state)
            else:
                logger.warning(f"Blocking merge for {task.issue_id}: stdout indicates failure despite exit code 0")
                # Add failure comment and move to triage
                # NOTE(review): self.youtrack_qa is used directly here and may
                # be None if the QA client was never configured; other paths go
                # through _get_youtrack_client_for_task which falls back to the
                # admin client — confirm this cannot be None at this point.
                self.youtrack_qa.add_issue_comment(
                    task.issue_id,
                    f"## Merge Blocked\n\n"
                    f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                    f"**Reason:** Verification output indicates failure despite exit code 0\n"
                    f"**Detection:** {detection_reason}\n\n"
                    f"The QA agent's output contains failure markers. Please review the verification "
                    f"output and address the issues before the branch can be merged."
                )
                self.youtrack.update_issue_state(task.issue_id, triage_state)
    elif task.returncode == 2:
        # Exit code 2: "Nothing to do" - work already complete
        # Skip intermediate steps and go directly to Review
        logger.info(f"Task {task.task_id} ({task.task_type}) found work already complete → {review_state}")
        # Update YouTrack issue state - skip to Review
        if not self.youtrack.update_issue_state(task.issue_id, review_state):
            logger.error(f"Failed to update status for {task.task_id}")
        # Push any changes (unlikely but safe)
        if self.config.get("auto_push", True):
            branch_name = f"issue/{task.issue_id}"
            if git_push(task.work_dir, branch=branch_name):
                logger.info(f"Pushed changes for {task.task_id}")
    else:
        logger.warning(f"Task {task.task_id} failed (rc={task.returncode}) → {triage_state}")
        # Add failure comment to YouTrack
        comment_body = (
            f"## Agent Run Failed\n\n"
            f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
            f"**Task Type:** {task.task_type}\n"
            f"**Exit Code:** {task.returncode}\n\n"
            f"The agent encountered an error and needs human review.\n"
            f"Please add a comment with clarification and move back to Ready.\n\n"
            f"```\n{task.stderr[:1000] if task.stderr else 'No stderr'}\n```"
        )
        self.youtrack.add_issue_comment(task.issue_id, comment_body)
        # Move to Triage for human review
        self.youtrack.update_issue_state(task.issue_id, triage_state)
        # Remove from seen set so it can be picked up again if moved back to Ready
        self._seen_items.discard(task.task_id)
def _get_repo_for_issue(self, issue: YouTrackIssue) -> Optional[dict]:
    """
    Determine which configured repository an issue belongs to.

    Resolution order: the issue's 'Repository' custom field, then the
    issue's project id, then the first configured repo as a last resort.
    Returns None only when no repos are configured at all.
    """
    repos = self.config.get("repos", {})
    # 1) Explicit 'Repository' custom field on the issue.
    repo_field = issue.custom_fields.get("Repository", "")
    if repo_field:
        for key, info in repos.items():
            if info.get("name") == repo_field or key == repo_field.lower():
                return info
    # 2) Match on the issue's project id (repo key or 'project' field).
    project_key = issue.project_id.lower()
    for key, info in repos.items():
        if key == project_key or info.get("project") == project_key:
            return info
    # 3) Last resort: first configured repo, or None when there is none.
    return next(iter(repos.values()), None)
def _process_item(self, issue: YouTrackIssue, task_type: str = "remediation") -> bool:
    """
    Process a single issue: build a prompt and submit an agent task.

    Args:
        issue: The YouTrack issue to work on.
        task_type: "remediation", "verification", or "librarian"; selects
            the prompt template and downstream handling.

    Returns:
        True if an agent was spawned for this issue, False otherwise.
    """
    task_id = f"{issue.id}:{task_type}"
    # Quick check against local seen set (fast path, no lock needed)
    # Note: The actual atomic check happens in agent_pool.submit()
    if task_id in self._seen_items:
        return False
    logger.info(f"Processing ({task_type}): {issue.id} - {issue.summary}")
    # Get repository info
    repo_info = self._get_repo_for_issue(issue)
    if not repo_info:
        logger.error(f"No repository configured for issue {issue.id}")
        self.youtrack.add_issue_comment(
            issue.id,
            "## Agent Error\n\nNo repository configured for this issue."
        )
        return False
    repo_name = repo_info.get("name", "unknown")
    work_dir = repo_info.get("path", f"/opt/repos/{repo_name}")
    platform = repo_info.get("platform", repo_name)
    # Update status to In Progress (for remediation tasks)
    if task_type == "remediation":
        if not self.youtrack.update_issue_state(issue.id, "In Progress"):
            logger.error(f"Failed to set In Progress for {task_id}")
            return False
    # Check work dir exists
    if not Path(work_dir).exists():
        logger.error(f"Work directory does not exist: {work_dir}")
        self.youtrack.add_issue_comment(
            issue.id,
            f"## Agent Error\n\nWork directory not found: `{work_dir}`"
        )
        self.youtrack.update_issue_state(issue.id, "Backlog")
        return False
    # Get comments from YouTrack and normalize to the dict shape that
    # build_prompt consumes; YouTrack 'created' timestamps are milliseconds.
    comments = self.youtrack.get_issue_comments(issue.id)
    comments_list = [
        {
            "body": c.text,
            "author": {"login": c.author},
            "createdAt": datetime.fromtimestamp(c.created / 1000).isoformat() if c.created else "",
        }
        for c in comments
    ]
    # Determine prompt template
    prompts_dir = Path(self.config.get("prompts_dir", "prompts"))
    if task_type == "librarian":
        template_path = prompts_dir / "librarian.md"
    elif task_type == "verification":
        template_path = prompts_dir / "verification.md"
    else:
        template_path = prompts_dir / "remediation.md"
    # Build prompt; the template path is passed only when the file exists.
    prompt = build_prompt(
        issue_number=issue.issue_number,
        issue_id=issue.id,
        repo=repo_name,
        platform=platform,
        issue_body=issue.description,
        comments=comments_list,
        template_path=template_path if template_path.exists() else None,
        task_type=task_type,
    )
    # Create task
    task = AgentTask(
        task_id=task_id,
        issue_number=issue.issue_number,
        issue_id=issue.id,
        repo=repo_name,
        platform=platform,
        work_dir=work_dir,
        prompt=prompt,
        task_type=task_type,
    )
    # Submit to pool; only mark as seen once submission succeeds.
    if self.agent_pool.submit(task):
        self._seen_items.add(task_id)
        # Broadcast agent started event
        self._broadcast_event("agent.started", {
            "task_id": task_id,
            "issue_id": issue.id,
            "repo": repo_name,
            "platform": platform,
            "task_type": task_type,
        })
        return True
    return False
def _get_build_type_for_repo(self, repo_name: str) -> Optional[str]:
    """Look up the CI build-type ID for *repo_name* (case-insensitive).

    Uses the mapping cached in __init__; returns None when the repo has
    no CI build type configured.
    """
    normalized = repo_name.lower()
    return self._build_types.get(normalized)
def _merge_feature_branch(self, task: AgentTask) -> bool:
    """
    Merge the feature branch to main after verification succeeds.

    Posts an outcome comment to the issue via the QA client either way.

    Args:
        task: The completed verification task

    Returns:
        True if merge succeeded, False otherwise
    """
    feature_branch = f"issue/{task.issue_id}"
    merged, message = git_merge_to_main(task.work_dir, feature_branch, delete_after=True)
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M')
    if not merged:
        logger.error(f"Failed to merge {feature_branch} for {task.issue_id}: {message}")
        self.youtrack_qa.add_issue_comment(
            task.issue_id,
            f"## Merge Failed\n\n"
            f"**Date:** {timestamp}\n"
            f"**Branch:** `{feature_branch}`\n"
            f"**Error:** {message}\n\n"
            f"Please merge manually or investigate the conflict."
        )
        return False
    logger.info(f"Merged {feature_branch} to main for {task.issue_id}")
    self.youtrack_qa.add_issue_comment(
        task.issue_id,
        f"## Branch Merged\n\n"
        f"**Date:** {timestamp}\n"
        f"**Branch:** `{feature_branch}`\n"
        f"**Target:** `main`\n\n"
        f"Feature branch has been merged to main and deleted."
    )
    return True
def _check_build_status(self, issue: YouTrackIssue) -> Optional[str]:
    """
    Check CI build status for an issue's feature branch.

    Checks the queue first, then running builds, then the most recent
    completed build for the branch.

    Returns:
        "SUCCESS" - Build passed, ready for verification
        "FAILURE" - Build failed, needs developer fix
        "RUNNING" - Build still in progress
        "PENDING" - Build waiting in queue
        "NO_CI"   - Repository has no CI configured (skip Build state)
        None      - No build found yet (waiting for Woodpecker)
    """
    # Without a Woodpecker client there is nothing to query.
    if not self.woodpecker:
        return "NO_CI"
    repo_info = self._get_repo_for_issue(issue)
    if not repo_info:
        return "NO_CI"
    short_name = repo_info.get("name", "").split("/")[-1]
    build_type = self._get_build_type_for_repo(short_name)
    if not build_type:
        logger.info(f"No CI configured for repo {short_name}, skipping Build state")
        return "NO_CI"
    # Feature branches are named issue/<issue-id>.
    branch = f"issue/{issue.id}"
    # Queued builds take precedence over running and completed ones.
    if any(q.get('branchName') == branch for q in self.woodpecker.get_queued_builds(build_type)):
        logger.debug(f"Build pending for {branch}")
        return "PENDING"
    if any(r.branch == branch for r in self.woodpecker.get_running_builds(build_type)):
        logger.debug(f"Build running for {branch}")
        return "RUNNING"
    latest = self.woodpecker.get_builds_for_branch(build_type, branch, count=1)
    if not latest:
        logger.debug(f"No builds found for {branch} in {build_type}")
        return None
    return latest[0].status
def _process_build_items(self):
    """
    Poll issues in the Build state and act on their CI status.

    Routing: SUCCESS -> Verify, FAILURE -> Ready (with log excerpt),
    NO_CI -> Verify (skip), RUNNING/PENDING -> wait, and no pipeline
    after ~10 minutes -> Triage (the push probably failed).
    Errors on one issue are logged and do not stop the loop.
    """
    project = self.config["project"]["name"]
    states = self.config["project"].get("states", {})
    build_state = states.get("build", "Build")
    verify_state = states.get("verify", "Verify")
    ready_state = states.get("ready", "Ready")
    # Note: Even without Woodpecker, we still need to process Build items
    # to skip repos without CI to Verify state
    try:
        build_issues = self.youtrack.get_issues_by_state(project, build_state)
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Failed to poll for Build issues - connection error: {e}")
        return
    except requests.exceptions.Timeout as e:
        logger.error(f"Failed to poll for Build issues - timeout: {e}")
        return
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to poll for Build issues: {e}")
        return
    if not build_issues:
        return
    logger.info(f"Checking {len(build_issues)} issues in Build state")
    for issue in build_issues:
        try:
            status = self._check_build_status(issue)
            if status == "SUCCESS":
                # Build passed - move to Verify
                # NOTE(review): this log message (and the FAILURE one below)
                # appears to be missing a separator/arrow between the issue id
                # and the state name — cosmetic only; confirm intended format.
                logger.info(f"Build SUCCESS for {issue.id}{verify_state}")
                # Get build info for comment
                repo_info = self._get_repo_for_issue(issue)
                repo_name = repo_info.get("name", "").split("/")[-1] if repo_info else "unknown"
                build_type = self._get_build_type_for_repo(repo_name)
                branch = f"issue/{issue.id}"
                builds = self.woodpecker.get_builds_for_branch(build_type, branch, count=1) if build_type else []
                comment = (
                    f"## Build Verification Passed\n\n"
                    f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                    f"**Status:** ✓ SUCCESS\n"
                    f"**Branch:** `{branch}`\n"
                )
                if builds:
                    comment += f"**Build URL:** {builds[0].web_url}\n"
                comment += "\nThe feature branch build completed successfully. Moving to code verification."
                self.youtrack_build.add_issue_comment(issue.id, comment)
                self.youtrack.update_issue_state(issue.id, verify_state)
            elif status == "FAILURE":
                # Build failed - move back to Ready for developer to fix
                logger.warning(f"Build FAILURE for {issue.id}{ready_state}")
                # Get build log excerpt
                repo_info = self._get_repo_for_issue(issue)
                repo_name = repo_info.get("name", "").split("/")[-1] if repo_info else "unknown"
                build_type = self._get_build_type_for_repo(repo_name)
                branch = f"issue/{issue.id}"
                builds = self.woodpecker.get_builds_for_branch(build_type, branch, count=1) if build_type else []
                log_excerpt = ""
                web_url = ""
                if builds:
                    web_url = builds[0].web_url
                    log_excerpt = self.woodpecker.get_build_log_excerpt(build_type, builds[0].build_id, lines=50)
                comment = (
                    f"## Build Verification Failed\n\n"
                    f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                    f"**Status:** ✗ FAILURE\n"
                    f"**Branch:** `{branch}`\n"
                )
                if web_url:
                    comment += f"**Build URL:** {web_url}\n"
                comment += (
                    f"\n### Build Log (last 50 lines)\n"
                    f"```\n{log_excerpt[:2000]}\n```\n\n"
                    f"Please fix the build errors and the issue will be automatically reprocessed."
                )
                self.youtrack_build.add_issue_comment(issue.id, comment)
                self.youtrack.update_issue_state(issue.id, ready_state)
            elif status in ("RUNNING", "PENDING"):
                logger.debug(f"Build {status} for {issue.id}, will check again")
            elif status == "NO_CI":
                # Repository has no CI configured - skip Build state, go to Verify
                logger.info(f"No CI for {issue.id}, skipping Build → {verify_state}")
                comment = (
                    f"## Build Step Skipped\n\n"
                    f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                    f"**Reason:** No CI/CD configured for this repository\n\n"
                    f"Moving directly to code verification."
                )
                self.youtrack_build.add_issue_comment(issue.id, comment)
                self.youtrack.update_issue_state(issue.id, verify_state)
            else:
                # No build yet - might need to wait for Woodpecker to pick it up
                # But if waiting too long, the push probably failed - move to Triage
                triage_state = states.get("triage", "Triage")
                # Check how long issue has been in Build state
                # If updated more than 10 minutes ago with no pipeline, something is wrong
                updated = issue.updated
                if updated:
                    try:
                        # YouTrack timestamps are in milliseconds
                        updated_time = datetime.fromtimestamp(updated / 1000)
                        wait_time = (datetime.now() - updated_time).total_seconds()
                        if wait_time > 600:  # 10 minutes
                            logger.warning(f"No build found for {issue.id} after {wait_time/60:.1f} minutes, moving to Triage")
                            comment = (
                                f"## Build Not Found\n\n"
                                f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}\n"
                                f"**Wait Time:** {wait_time/60:.1f} minutes\n\n"
                                f"No CI pipeline was found for branch `issue/{issue.id}`. This usually means:\n"
                                f"- The branch was never pushed to the remote\n"
                                f"- The push failed after the agent completed\n"
                                f"- Woodpecker CI is not configured for this branch pattern\n\n"
                                f"Please check the branch state and push manually if needed."
                            )
                            self.youtrack_build.add_issue_comment(issue.id, comment)
                            self.youtrack.update_issue_state(issue.id, triage_state)
                        else:
                            logger.debug(f"No build status for {issue.id}, waiting ({wait_time/60:.1f} min)")
                    except (ValueError, TypeError) as e:
                        logger.debug(f"No build status for {issue.id}, could not check age: {e}")
                else:
                    logger.debug(f"No build status for {issue.id}")
        # Per-issue error handling: log and continue with the next issue.
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Error checking build for {issue.id} - connection error: {e}")
        except requests.exceptions.Timeout as e:
            logger.error(f"Error checking build for {issue.id} - timeout: {e}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking build for {issue.id}: {e}")
        except (KeyError, ValueError, TypeError) as e:
            logger.error(f"Error checking build for {issue.id} - data error: {e}")
def _fetch_issues_safe(self, project: str, state: str) -> list:
    """Fetch all issues in `state` for `project`, returning [] on request failure.

    Any requests-level failure (connection, timeout, or other HTTP error) is
    logged and swallowed so one unreachable queue never aborts a poll cycle.
    """
    try:
        return self.youtrack.get_issues_by_state(project, state)
    except requests.exceptions.ConnectionError as e:
        logger.error(f"Failed to poll for {state} issues - connection error: {e}")
    except requests.exceptions.Timeout as e:
        logger.error(f"Failed to poll for {state} issues - timeout: {e}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to poll for {state} issues: {e}")
    return []

def _dispatch_issues(self, issues: list, task_type: str) -> None:
    """Dispatch each issue to the agent pool as `task_type`, until the pool fills.

    Per-issue errors are logged and skipped so one bad issue does not block
    the rest of the queue.
    """
    for issue in issues:
        if not self.agent_pool.has_capacity:
            logger.debug("Pool at capacity, waiting for next cycle")
            break
        try:
            self._process_item(issue, task_type=task_type)
        except (KeyError, ValueError, TypeError) as e:
            logger.error(f"Error processing {issue.id} - data error: {e}")
        except (IOError, OSError) as e:
            logger.error(f"Error processing {issue.id} - I/O error: {e}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error processing {issue.id} - request error: {e}")

def _poll_cycle(self):
    """Execute one polling cycle.

    Fetches the three agent queues (Ready/Verify/Document), dispatches each
    to the matching agent type while the pool has capacity, then checks the
    build status of items sitting in the Build state.
    """
    project = self.config["project"]["name"]
    states = self.config["project"].get("states", {})
    ready_state = states.get("ready", "Ready")
    verify_state = states.get("verify", "Verify")
    document_state = states.get("document", "Document")

    ready_issues = self._fetch_issues_safe(project, ready_state)       # developer agent queue
    verify_issues = self._fetch_issues_safe(project, verify_state)     # verification agent queue
    document_issues = self._fetch_issues_safe(project, document_state)  # librarian agent queue

    total_items = len(ready_issues) + len(verify_issues) + len(document_issues)
    if total_items > 0:
        logger.info(f"Found {len(ready_issues)} Ready, {len(verify_issues)} Verify, "
                    f"{len(document_issues)} Document, "
                    f"pool has {self.agent_pool.active_count}/{self.agent_pool.max_agents} active")

    # Ready → In Progress → Verify
    self._dispatch_issues(ready_issues, "remediation")
    # Verify → Document
    self._dispatch_issues(verify_issues, "verification")
    # Document → Review
    self._dispatch_issues(document_issues, "librarian")

    # Check build status for items in Build state
    self._process_build_items()
def _init_agent_client(self, token, label: str, login_default: str, suffix: str = ""):
    """Build a YouTrack client for a per-agent token, falling back to admin.

    Returns a dedicated YouTrackClient when `token` is set and its
    connection test succeeds; otherwise returns the shared admin client
    (self.youtrack). `suffix` is appended to the fallback log messages
    (e.g. " for build comments").
    """
    if not token:
        logger.info(f"No {label} agent token configured, using admin token{suffix}")
        return self.youtrack
    yt_config = self.config.get("youtrack", {})
    client = YouTrackClient(yt_config.get("base_url"), token)
    status = client.test_connection()
    if status.get("status") == "ok":
        logger.info(f"YouTrack {label} agent connected as: "
                    f"{status.get('user', {}).get('login', login_default)}")
        return client
    logger.warning(f"YouTrack {label} agent connection failed, using admin token{suffix}")
    client.close()  # don't leak the failed client's HTTP session
    return self.youtrack

def run(self):
    """Main run loop.

    Connects to all configured services (YouTrack required; Gitea and
    Woodpecker optional), starts the agent pool and optional webhook /
    internal API servers, then polls until a shutdown signal arrives.
    On shutdown, stops servers and closes every API client session.
    """
    self._setup_logging()
    logger.info("ClearGrow Agent Runner starting...")

    # Initialize health monitor
    self.health = SystemHealth(self.config)

    # Initialize YouTrack client (required - abort startup if unavailable)
    self.youtrack = load_youtrack_config(self.config)
    if not self.youtrack:
        logger.error("Failed to initialize YouTrack client")
        sys.exit(1)

    # Test YouTrack connection
    yt_status = self.youtrack.test_connection()
    if yt_status.get("status") != "ok":
        logger.error(f"YouTrack connection failed: {yt_status.get('message')}")
        sys.exit(1)
    logger.info(f"YouTrack connected as: {yt_status.get('user', {}).get('login', 'unknown')}")

    # Per-agent YouTrack clients so comments are attributed to the right
    # bot account; each falls back to the admin client when unconfigured
    # or unreachable.
    agent_tokens = self.config.get("agent_tokens", {})
    self.youtrack_build = self._init_agent_client(
        agent_tokens.get("build"), "build", "build", " for build comments")
    self.youtrack_qa = self._init_agent_client(
        agent_tokens.get("qa"), "QA", "qa", " for QA comments")
    self.youtrack_developer = self._init_agent_client(
        agent_tokens.get("developer"), "developer", "developer")
    self.youtrack_librarian = self._init_agent_client(
        agent_tokens.get("librarian"), "librarian", "librarian")

    # Initialize Gitea client (optional - for comments)
    self.gitea = load_gitea_config(self.config)
    if self.gitea:
        gitea_status = self.gitea.test_connection()
        if gitea_status.get("status") == "ok":
            logger.info(f"Gitea connected: {gitea_status.get('version')} as {gitea_status.get('user')}")
        else:
            logger.warning(f"Gitea connection failed: {gitea_status.get('message')}")
    else:
        logger.info("Gitea not configured (comments will only go to YouTrack)")

    # Initialize Woodpecker client (optional - for build verification)
    wp_config = self.config.get("woodpecker", {})
    if wp_config.get("base_url") and wp_config.get("token"):
        self.woodpecker = WoodpeckerClient(
            base_url=wp_config["base_url"],
            token=wp_config["token"]
        )
        if self.woodpecker.test_connection():
            logger.info(f"Woodpecker connected: {wp_config['base_url']}")
        else:
            logger.warning("Woodpecker connection failed - build verification disabled")
            self.woodpecker = None
    else:
        logger.info("Woodpecker not configured (build verification disabled)")

    # Initial health check; start anyway after the grace period so a
    # transient outage doesn't prevent the runner from coming up.
    logger.info("Running initial health check...")
    self.health.check_all(force=True)
    if not self.health.is_healthy:
        logger.warning("Some services unhealthy at startup, waiting...")
        if not self.health.wait_for_healthy(timeout=120):
            logger.error("Services did not become healthy, starting anyway...")

    # Initialize agent pool
    claude_config = self.config.get("claude", {})
    timeout = self.config.get("agent_timeout_seconds", DEFAULT_AGENT_TIMEOUT)
    self.agent_pool = AgentPool(
        max_agents=self.config.get("max_parallel_agents", DEFAULT_MAX_PARALLEL_AGENTS),
        claude_command=claude_config.get("command", "claude"),
        claude_flags=claude_config.get("flags", []),
        on_complete=self._on_agent_complete,
        timeout_seconds=timeout,
    )
    self.agent_pool.start()
    logger.info(f"Agent pool started (max={self.agent_pool.max_agents}, timeout={timeout}s)")

    # Initialize OAuth if configured (optional module - import lazily)
    oauth = None
    oauth_config = self.config.get("oauth")
    if oauth_config and oauth_config.get("client_id"):
        try:
            from oauth import create_oauth_from_config
            oauth = create_oauth_from_config(self.config)
            if oauth:
                logger.info("OAuth authentication enabled")
        except ImportError as e:
            logger.warning(f"OAuth module not available: {e}")

    # Initialize webhook server with dashboard API (optional)
    webhook_config = self.config.get("webhook", {})
    if webhook_config.get("enabled", False):
        self.webhook_server = WebhookServer(
            host=webhook_config.get("host", "0.0.0.0"),
            port=webhook_config.get("port", 8765),
            secret=webhook_config.get("secret"),
            on_event=self._on_webhook_event,
            runner=self,  # Pass runner for dashboard API
            oauth=oauth,  # Pass OAuth handler
        )
        self.webhook_server.start()
        logger.info(f"Webhook server started on {webhook_config.get('host', '0.0.0.0')}:{webhook_config.get('port', 8765)}")
        if oauth:
            logger.info("Dashboard API available with Gitea OAuth authentication")

    # Start internal API server for MCP tools
    self.internal_api = InternalAPIServer(self)
    self.internal_api.start()

    poll_interval = self.config.get("poll_interval_seconds", DEFAULT_POLL_INTERVAL)
    backoff_interval = poll_interval
    max_backoff = MAX_BACKOFF_INTERVAL
    mode = "webhook + polling" if self.webhook_server else "polling"
    logger.info(f"Starting main loop (mode={mode}, poll_interval={poll_interval}s)")

    # Setup signal handlers
    def handle_shutdown(signum, frame):
        logger.info("Shutdown signal received")
        self._shutdown = True
        self._shutdown_event.set()
        self._wake_event.set()  # Wake up main loop immediately

    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    # Main loop
    while not self._shutdown:
        try:
            # Periodic health check; skip polling while services are down.
            self.health.check_all()
            if not self.health.can_process:
                logger.warning("Services unhealthy, skipping poll cycle")
                self._consecutive_failures += 1
                if self._consecutive_failures >= self._max_consecutive_failures:
                    logger.error(f"Too many consecutive failures ({self._consecutive_failures}), "
                                 "waiting for recovery...")
                    self.health.wait_for_healthy(timeout=300)
                    self._consecutive_failures = 0
            else:
                # Process any webhook events first (instant response),
                # then do the regular poll cycle.
                self._process_webhook_events()
                self._poll_cycle()
                self._consecutive_failures = 0
                backoff_interval = poll_interval  # Reset on success
        except Exception as e:
            # Top-level loop boundary: log and back off exponentially.
            logger.exception(f"Error in poll cycle: {e}")
            self._consecutive_failures += 1
            backoff_interval = min(backoff_interval * 2, max_backoff)
            logger.warning(f"Backing off to {backoff_interval}s")

        # Event-based wait instead of a plain sleep: wake up every second
        # to process webhooks, or immediately on shutdown/wake signal.
        wait_until = time.monotonic() + backoff_interval
        while not self._shutdown and time.monotonic() < wait_until:
            remaining = min(1.0, wait_until - time.monotonic())
            if remaining > 0:
                self._wake_event.wait(timeout=remaining)
                self._wake_event.clear()
            # Process webhooks during wait
            self._process_webhook_events()

    logger.info("Shutting down...")
    if self.internal_api:
        self.internal_api.stop()
    if self.webhook_server:
        self.webhook_server.stop()
    self.agent_pool.stop()

    # Close API client sessions. Per-agent clients may alias the admin
    # client (fallback path), so close each dedicated client exactly once.
    if self.youtrack:
        self.youtrack.close()
    for client in (self.youtrack_build, self.youtrack_qa,
                   self.youtrack_developer, self.youtrack_librarian):
        if client and client is not self.youtrack:
            client.close()
    if self.gitea:
        self.gitea.close()
    if self.woodpecker:
        self.woodpecker.close()
    logger.info("Runner stopped")
def _on_webhook_event(self, event: WebhookEvent):
    """Handle a webhook event from YouTrack (called on the webhook thread).

    Events are queued for the main loop rather than processed inline so
    the webhook HTTP response returns immediately.
    """
    from queue import Full  # top-of-file import only brings in Queue/Empty
    try:
        self._webhook_queue.put_nowait(event)
    except Full:
        # Queue is full - log a warning but don't block the webhook
        # response; polling will pick the issue up anyway.
        logger.warning(f"Webhook queue full, dropping event for {event.issue_id}. "
                       f"Event will be picked up in next poll cycle.")
        return
    logger.debug(f"Queued webhook event: {event.event_type} for {event.issue_id}")
    # Wake up main loop to process the webhook immediately
    self._wake_event.set()
def _process_webhook_events(self):
    """Drain the webhook queue and dispatch actionable state changes.

    Only events for this runner's project that move an issue into one of
    the agent queue states (Ready/Verify/Document) are acted on; others
    are silently dropped. Issues are dispatched immediately when the
    agent pool has capacity, otherwise left for the next poll cycle.
    """
    states = self.config["project"].get("states", {})
    project = self.config["project"]["name"]
    # Map each actionable board state to the agent task type it triggers.
    task_types = {
        states.get("ready", "Ready"): "remediation",
        states.get("verify", "Verify"): "verification",
        states.get("document", "Document"): "librarian",
    }

    processed = 0
    while True:
        # get_nowait + Empty avoids the empty()/get race of the usual
        # check-then-act pattern.
        try:
            event = self._webhook_queue.get_nowait()
        except Empty:
            break

        # Skip events for other projects or non-actionable states.
        if event.project != project:
            continue
        task_type = task_types.get(event.new_state)
        if task_type is None:
            continue

        logger.info(f"Webhook: {event.issue_id} moved to {event.new_state}")

        # Fetch the full issue from YouTrack and dispatch it.
        try:
            issue = self.youtrack.get_issue(event.issue_id)
            if not issue:
                logger.warning(f"Could not fetch issue {event.issue_id}")
                continue
            if self.agent_pool.has_capacity:
                self._process_item(issue, task_type=task_type)
                processed += 1
            else:
                logger.debug(f"Pool at capacity, {event.issue_id} will be picked up in next poll")
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Error processing webhook for {event.issue_id} - connection error: {e}")
        except requests.exceptions.Timeout as e:
            logger.error(f"Error processing webhook for {event.issue_id} - timeout: {e}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Error processing webhook for {event.issue_id}: {e}")
        except (KeyError, ValueError, TypeError) as e:
            logger.error(f"Error processing webhook for {event.issue_id} - data error: {e}")

    if processed > 0:
        logger.info(f"Processed {processed} webhook events")
def _cmd_health(config: dict) -> None:
    """Print a health report for each configured platform service."""
    print("\n=== Platform Health Check ===\n")
    health = SystemHealth(config)
    status = health.check_all(force=True)
    for service, healthy in status.items():
        svc_config = config.get(service, {})
        if svc_config.get("base_url"):
            # Bug fix: previously both branches produced "" so no status
            # icon was ever printed; use the same marks as --test-connection.
            icon = "✓" if healthy else "✗"
            state = "healthy" if healthy else "UNHEALTHY"
            print(f"{icon} {service.title()}: {state}")
            print(f" URL: {svc_config.get('base_url')}")
        else:
            print(f"{service.title()}: not configured")
    print()
    if health.can_process:
        print("Status: Ready to process tasks")
    else:
        print("Status: NOT READY - critical services unhealthy")
    print()

def _cmd_test_connection(config: dict) -> None:
    """Test connectivity to YouTrack, Gitea and Woodpecker and print results."""
    print("\n=== Testing Connections ===\n")
    # Test YouTrack
    yt = load_youtrack_config(config)
    if yt:
        result = yt.test_connection()
        if result.get("status") == "ok":
            user = result.get("user", {})
            print(f"✓ YouTrack: Connected as {user.get('login', 'unknown')}")
        else:
            print(f"✗ YouTrack: {result.get('message')}")
    else:
        print("✗ YouTrack: Not configured")
    # Test Gitea
    gitea = load_gitea_config(config)
    if gitea:
        result = gitea.test_connection()
        if result.get("status") == "ok":
            print(f"✓ Gitea: Connected as {result.get('user')} (v{result.get('version')})")
        else:
            print(f"✗ Gitea: {result.get('message')}")
    else:
        print("○ Gitea: Not configured (optional)")
    # Test Woodpecker
    wp_config = config.get("woodpecker", {})
    if wp_config.get("base_url") and wp_config.get("token"):
        try:
            client = WoodpeckerClient(wp_config["base_url"], wp_config["token"])
            if client.test_connection():
                print(f"✓ Woodpecker: Connected at {wp_config['base_url']}")
            else:
                print("✗ Woodpecker: Connection failed")
            client.close()
        except Exception as e:
            # CLI boundary: show any failure rather than crash.
            print(f"✗ Woodpecker: {e}")
    else:
        print("○ Woodpecker: Not configured")
    print()

def _cmd_status(config: dict) -> None:
    """Print issues in each agent queue state for the configured project."""
    yt = load_youtrack_config(config)
    if not yt:
        print("YouTrack not configured", file=sys.stderr)
        sys.exit(1)
    project = config["project"]["name"]
    states = config["project"].get("states", {})
    ready_state = states.get("ready", "Ready")
    # Default aligned with the rest of the runner (previously "In review",
    # which showed a different column than the one actually polled).
    verify_state = states.get("verify", "Verify")
    in_progress_state = states.get("in_progress", "In Progress")
    print(f"\n=== YouTrack Project: {project} ===\n")
    for state_name, label in [
        (ready_state, "Ready (remediation queue)"),
        (in_progress_state, "In Progress"),
        (verify_state, "Verify (verification queue)"),
    ]:
        issues = yt.get_issues_by_state(project, state_name)
        print(f"{label}:")
        if issues:
            for issue in issues:
                print(f" {issue.id}: {issue.summary}")
        else:
            print(" (none)")
        print()

def _run_once(runner, config: dict) -> None:
    """Run a single poll cycle, then wait (bounded) for all agents to finish."""
    runner._setup_logging()
    runner.youtrack = load_youtrack_config(config)
    runner.gitea = load_gitea_config(config)
    claude_config = config.get("claude", {})
    runner.agent_pool = AgentPool(
        # Use the shared default instead of a hard-coded 10 so --once and
        # the long-running mode agree.
        max_agents=config.get("max_parallel_agents", DEFAULT_MAX_PARALLEL_AGENTS),
        claude_command=claude_config.get("command", "claude"),
        claude_flags=claude_config.get("flags", []),
        on_complete=runner._on_agent_complete,
    )
    runner.agent_pool.start()
    runner._poll_cycle()
    # Wait for all agents to complete (10 minutes max)
    timeout = 600
    waited = 0
    while runner.agent_pool.active_count > 0 and waited < timeout:
        logger.info(f"Waiting for {runner.agent_pool.active_count} agent(s) to complete...")
        time.sleep(10)
        waited += 10
    if runner.agent_pool.active_count > 0:
        logger.warning(f"Timeout reached with {runner.agent_pool.active_count} agents still running")
    runner.agent_pool.stop()

def main():
    """CLI entry point: parse arguments, load config, dispatch to a subcommand."""
    parser = argparse.ArgumentParser(description="ClearGrow Agent Runner")
    parser.add_argument(
        "-c", "--config",
        default="config.yaml",
        help="Path to config file (default: config.yaml)"
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Run one poll cycle and exit"
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print current project status and exit"
    )
    parser.add_argument(
        "--test-connection",
        action="store_true",
        help="Test connections to YouTrack and Gitea"
    )
    parser.add_argument(
        "--health",
        action="store_true",
        help="Check platform health (YouTrack, Gitea, TeamCity)"
    )
    args = parser.parse_args()

    config_path = Path(args.config)
    if not config_path.exists():
        print(f"Config file not found: {config_path}", file=sys.stderr)
        sys.exit(1)
    with open(config_path) as f:
        config = yaml.safe_load(f)

    if args.health:
        _cmd_health(config)
        return
    if args.test_connection:
        _cmd_test_connection(config)
        return
    if args.status:
        _cmd_status(config)
        return

    runner = Runner(str(config_path))
    if args.once:
        _run_once(runner, config)
        return
    runner.run()
# Script entry point: run only when executed directly, not on import.
if __name__ == "__main__":
    main()