Initial commit: ClearGrow Agent Runner

Automated task orchestration system for YouTrack + Gitea + Woodpecker CI:

- runner.py: Main orchestration engine with state machine workflow
- agent.py: Claude Code subprocess pool management
- youtrack_client.py: YouTrack API wrapper
- gitea_client.py: Gitea API + git CLI operations
- woodpecker_client.py: CI build monitoring
- webhook_server.py: Real-time event handling
- prompts/: Agent prompt templates (developer, qa, librarian)

Workflow: Ready → In Progress → Build → Verify → Document → Review → Done

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
CI System
2025-12-10 21:05:31 -07:00
commit 5903b69b2d
20 changed files with 4695 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
ENV/
env/
.venv/
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# Logs
*.log
# Config with secrets
config.yaml
# Environment
.env
.env.*
# OS
.DS_Store
Thumbs.db

104
README.md Normal file
View File

@@ -0,0 +1,104 @@
# ClearGrow Agent Runner
Automated task orchestration system that monitors YouTrack issues and coordinates Claude Code agents to resolve them across Gitea repositories, with Woodpecker CI verification.
## Overview
The Agent Runner automates the software development workflow:
```
Ready → In Progress → Build → Verify → Document → Review → Done
(Developer) (CI) (QA) (Librarian) ↓
↑ │ Merge to main
└──────────┘ (on build failure)
```
## Components
| File | Purpose |
|------|---------|
| `runner.py` | Main orchestration engine |
| `agent.py` | Claude Code subprocess pool management |
| `youtrack_client.py` | YouTrack API wrapper |
| `gitea_client.py` | Gitea API wrapper + git CLI operations |
| `woodpecker_client.py` | Woodpecker CI build monitoring |
| `webhook_server.py` | HTTP webhook receiver for real-time events |
| `prompts/` | Prompt templates for Claude agents |
## Agent Types
| Agent | Token | Purpose |
|-------|-------|---------|
| Developer | `agent_tokens.developer` | Code remediation and bug fixes |
| QA | `agent_tokens.qa` | Code verification and review |
| Librarian | `agent_tokens.librarian` | Documentation updates |
| Build | `agent_tokens.build` | CI build status comments |
## Configuration
Copy the example config and fill in your credentials:
```bash
cp config.yaml.example config.yaml
chmod 600 config.yaml
```
Required configuration:
- `youtrack.base_url` and `youtrack.token`
- `gitea.base_url` and `gitea.token`
- `woodpecker.base_url` and `woodpecker.token`
- `agent_tokens.*` for each agent type
- `repos.*` mapping repositories to local paths
## Installation
```bash
# Create virtual environment
python3 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Configure
cp config.yaml.example config.yaml
vim config.yaml
# Run
python runner.py -c config.yaml
```
## Systemd Service
```bash
sudo cp cleargrow-agent-runner.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable cleargrow-agent-runner
sudo systemctl start cleargrow-agent-runner
```
## Workflow
1. **Ready**: Issue waiting for remediation
2. **In Progress**: Developer agent working on fix
3. **Build**: Woodpecker CI building feature branch
4. **Verify**: QA agent reviewing changes
5. **Document**: Librarian agent updating docs
6. **Review**: Human review (manual)
7. **Done**: Complete, branch merged to main
## Branch Convention
Feature branches follow the pattern: `issue/{ISSUE_ID}`
Example: Issue `CG-47` → branch `issue/CG-47`
## API Integrations
- **YouTrack**: Issue tracking and state management
- **Gitea**: Repository hosting and git operations
- **Woodpecker CI**: Automated builds on push
## License
Proprietary - ClearGrow

330
agent.py Normal file
View File

@@ -0,0 +1,330 @@
"""
Claude Code agent subprocess management.
Adapted for YouTrack + Gitea stack.
"""
import subprocess
import threading
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
@dataclass
class AgentTask:
"""A task to be executed by a Claude Code agent."""
task_id: str
issue_number: int
issue_id: str # YouTrack issue ID (e.g., "CG-123")
repo: str # Repository name
platform: str # Platform identifier (e.g., "controller", "probe")
work_dir: str
prompt: str
task_type: str = "remediation" # "remediation", "verification", or "librarian"
# Runtime state
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
process: Optional[subprocess.Popen] = None
returncode: Optional[int] = None
stdout: str = ""
stderr: str = ""
timed_out: bool = False # True if killed due to timeout
class AgentPool:
"""Manages a pool of Claude Code agent processes."""
# Timeout for thread join during shutdown
SHUTDOWN_TIMEOUT = 10 # seconds
MONITOR_INTERVAL = 1 # seconds between monitor checks
def __init__(
self,
max_agents: int = 3,
claude_command: str = "claude",
claude_flags: list[str] = None,
on_complete: Callable[[AgentTask], None] = None,
timeout_seconds: int = 1800, # 30 minutes default
):
self.max_agents = max_agents
self.claude_command = claude_command
self.claude_flags = claude_flags or []
self.on_complete = on_complete
self.timeout_seconds = timeout_seconds
self._active: dict[str, AgentTask] = {}
self._lock = threading.Lock()
self._monitor_thread: Optional[threading.Thread] = None
self._shutdown_event = threading.Event()
def start(self):
"""Start the agent pool monitor."""
self._shutdown_event.clear()
self._monitor_thread = threading.Thread(
target=self._monitor_loop,
name="AgentPoolMonitor",
daemon=False
)
self._monitor_thread.start()
logger.info(f"Agent pool started (max_agents={self.max_agents})")
def stop(self):
"""Stop the agent pool gracefully."""
logger.debug("Initiating agent pool shutdown...")
self._shutdown_event.set()
if self._monitor_thread:
self._monitor_thread.join(timeout=self.SHUTDOWN_TIMEOUT)
if self._monitor_thread.is_alive():
logger.warning(f"Agent pool monitor thread did not terminate within {self.SHUTDOWN_TIMEOUT}s")
else:
logger.debug("Agent pool monitor thread terminated cleanly")
self._monitor_thread = None
logger.info("Agent pool stopped")
@property
def active_count(self) -> int:
with self._lock:
return len(self._active)
@property
def has_capacity(self) -> bool:
return self.active_count < self.max_agents
def is_task_running(self, task_id: str) -> bool:
with self._lock:
return task_id in self._active
def submit(self, task: AgentTask) -> bool:
"""
Submit a task to the pool atomically.
This method is thread-safe and performs an atomic check-and-add operation.
Callers should rely on the return value rather than calling is_task_running()
separately, to avoid race conditions.
Returns:
True if submitted successfully
False if pool is at capacity or task is already running
"""
with self._lock:
if len(self._active) >= self.max_agents:
logger.warning(f"Pool at capacity, rejecting task {task.task_id}")
return False
if task.task_id in self._active:
logger.warning(f"Task {task.task_id} already running")
return False
# Start the Claude Code process
try:
self._start_agent(task)
self._active[task.task_id] = task
logger.info(f"Started agent for task {task.task_id} (issue {task.issue_id})")
return True
except FileNotFoundError as e:
logger.error(f"Claude command not found for {task.task_id}: {e}")
return False
except PermissionError as e:
logger.error(f"Permission denied starting agent for {task.task_id}: {e}")
return False
except OSError as e:
logger.error(f"OS error starting agent for {task.task_id}: {e}")
return False
def _start_agent(self, task: AgentTask):
"""Start a Claude Code subprocess for the task."""
cmd = [
self.claude_command,
"-p", task.prompt, # Print mode with prompt
] + self.claude_flags
logger.debug(f"Running: {cmd[0]} -p '...' in {task.work_dir}")
task.started_at = datetime.now()
task.process = subprocess.Popen(
cmd,
cwd=task.work_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
def _monitor_loop(self):
"""Monitor active agents for completion and timeouts."""
while not self._shutdown_event.is_set():
completed = []
timed_out = []
# Get current time outside the lock
now = datetime.now()
# Take a snapshot of active tasks to minimize lock duration
with self._lock:
active_snapshot = list(self._active.items())
# Check status outside the lock (process.poll() is thread-safe)
for task_id, task in active_snapshot:
# Check for completion
if task.process and task.process.poll() is not None:
# Process finished - communicate() should be called outside lock
completed.append((task_id, task))
# Check for timeout
elif task.started_at and self.timeout_seconds > 0:
elapsed = (now - task.started_at).total_seconds()
if elapsed > self.timeout_seconds:
timed_out.append((task_id, task))
# Process completed tasks - get output outside lock
for task_id, task in completed:
if task.process:
task.stdout, task.stderr = task.process.communicate()
task.returncode = task.process.returncode
task.completed_at = now
# Handle timeouts (kill processes) - process operations outside lock
for task_id, task in timed_out:
if task.process:
logger.warning(
f"Agent {task_id} timed out after {self.timeout_seconds}s - killing"
)
try:
task.process.terminate()
# Give it 5 seconds to terminate gracefully
try:
task.process.wait(timeout=5)
except subprocess.TimeoutExpired:
task.process.kill()
task.process.wait()
task.returncode = -1
task.timed_out = True
task.stderr = f"Agent timed out after {self.timeout_seconds} seconds"
task.completed_at = datetime.now()
except ProcessLookupError:
logger.warning(f"Agent {task_id} process already terminated")
except PermissionError as e:
logger.error(f"Permission denied killing agent {task_id}: {e}")
except OSError as e:
logger.error(f"OS error killing timed out agent {task_id}: {e}")
# Combine completed and timed_out for removal (both are now lists of tuples)
all_completed = completed + timed_out
all_completed_ids = [task_id for task_id, _ in all_completed]
# Remove completed tasks from active dict (single lock acquisition)
removed_tasks = []
if all_completed_ids:
with self._lock:
for task_id in all_completed_ids:
task = self._active.pop(task_id, None)
if task:
removed_tasks.append(task)
# Handle callbacks outside the lock
for task in removed_tasks:
duration = (task.completed_at - task.started_at).total_seconds() if task.completed_at and task.started_at else 0
status = "TIMEOUT" if task.timed_out else f"rc={task.returncode}"
logger.info(
f"Agent completed: {task.task_id} "
f"({status}, duration={duration:.1f}s)"
)
if task.returncode != 0:
stderr_preview = task.stderr[:500] if task.stderr else ""
logger.warning(f"Agent {task.task_id} stderr: {stderr_preview}")
if self.on_complete:
try:
self.on_complete(task)
except (KeyError, ValueError, TypeError) as e:
logger.error(f"on_complete callback failed with data error: {e}")
except (IOError, OSError) as e:
logger.error(f"on_complete callback failed with I/O error: {e}")
except RuntimeError as e:
logger.error(f"on_complete callback failed with runtime error: {e}")
# Use event wait instead of sleep for faster shutdown response
self._shutdown_event.wait(timeout=self.MONITOR_INTERVAL)
def get_status(self) -> dict:
"""Get current pool status."""
with self._lock:
return {
"max_agents": self.max_agents,
"active": self.active_count,
"available": self.max_agents - len(self._active),
"tasks": [
{
"task_id": t.task_id,
"issue_id": t.issue_id,
"repo": t.repo,
"started": t.started_at.isoformat() if t.started_at else None,
}
for t in self._active.values()
]
}
def build_prompt(
issue_number: int,
issue_id: str,
repo: str,
platform: str,
issue_body: str,
comments: list[dict],
template_path: Optional[Path] = None,
task_type: str = "remediation",
) -> str:
"""
Build the prompt for a Claude Code agent.
Args:
issue_number: Numeric issue number
issue_id: Full issue ID (e.g., "CG-123")
repo: Repository name
platform: Platform identifier
issue_body: Issue description
comments: List of comment dicts with 'body', 'author', 'createdAt'
template_path: Optional custom template file
task_type: "remediation", "verification", or "librarian"
"""
# Build comments section
instructions_parts = []
for i, comment in enumerate(comments, 1):
body = comment.get("body", "").strip()
if body:
author = comment.get("author", {}).get("login", "unknown")
created = comment.get("createdAt", "")[:10] if comment.get("createdAt") else ""
instructions_parts.append(f"### Comment {i} ({author}, {created})\n{body}")
instructions_text = "\n\n".join(instructions_parts) if instructions_parts else "No comments."
# Load template (required - no fallback)
if not template_path:
raise ValueError(f"No template path provided for task_type '{task_type}'")
if not template_path.exists():
raise FileNotFoundError(f"Prompt template not found: {template_path}")
template = template_path.read_text()
# Fill placeholders
prompt = template.format(
issue_number=issue_number,
issue_id=issue_id,
repo=repo,
platform=platform,
issue_body=issue_body,
instructions=instructions_text,
)
return prompt

View File

@@ -0,0 +1,53 @@
[Unit]
Description=ClearGrow Agent Runner
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
# Run as dedicated user (not root!)
User=cleargrow
Group=cleargrow
# Working directory
WorkingDirectory=/opt/agent_runner
# Main process
ExecStart=/opt/agent_runner/venv/bin/python /opt/agent_runner/runner.py -c /opt/agent_runner/config.yaml
# Restart policy
Restart=always
RestartSec=30
# Stop timeout (allow agents to finish)
TimeoutStopSec=120
# Logging
StandardOutput=journal
StandardError=journal
SyslogIdentifier=cleargrow-agent-runner
# Environment
Environment="PYTHONUNBUFFERED=1"
Environment="HOME=/home/cleargrow"
Environment="YOUTRACK_URL=https://track.yourdomain.com"
# Token should be in a separate env file for security
EnvironmentFile=-/opt/agent_runner/.env
# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=read-only
PrivateTmp=true
# Allow write access to required directories
ReadWritePaths=/opt/repos
ReadWritePaths=/var/log/cleargrow_agent_runner.log
# Allow network access
PrivateNetwork=false
[Install]
WantedBy=multi-user.target

121
config.yaml.example Normal file
View File

@@ -0,0 +1,121 @@
# =============================================================================
# ClearGrow Agent Runner Configuration
# Generated: Wed Dec 10 08:08:27 AM MST 2025
# =============================================================================
# -----------------------------------------------------------------------------
# Polling Configuration
# -----------------------------------------------------------------------------
poll_interval_seconds: 10
max_parallel_agents: 10
agent_timeout_seconds: 1800
auto_push: true
health_check_interval: 300
# -----------------------------------------------------------------------------
# YouTrack Configuration
# -----------------------------------------------------------------------------
youtrack:
base_url: https://track.cleargrow.io
# Admin token (for state transitions)
token: YOUR_TOKEN_HERE:YOUR_TOKEN_HERE
# -----------------------------------------------------------------------------
# Agent-Specific YouTrack Tokens (for comments)
# -----------------------------------------------------------------------------
agent_tokens:
developer: perm:YOUR_TOKEN_HERE
qa: perm:YOUR_TOKEN_HERE
librarian: perm:YOUR_TOKEN_HERE
build: perm:YOUR_TOKEN_HERE
# -----------------------------------------------------------------------------
# Gitea Configuration
# -----------------------------------------------------------------------------
# IMPORTANT: Add your Gitea token below after creating it!
gitea:
base_url: https://git.cleargrow.io
token: YOUR_TOKEN_HERE
# -----------------------------------------------------------------------------
# Woodpecker CI Configuration (for build verification)
# -----------------------------------------------------------------------------
woodpecker:
base_url: https://ci.cleargrow.io
token: YOUR_TOKEN_HERE.YOUR_JWT_TOKEN_HERE
# -----------------------------------------------------------------------------
# Build Type Mapping (Woodpecker repo full names)
# -----------------------------------------------------------------------------
build_types:
controller: cleargrow/controller
probe: cleargrow/probe
docs: cleargrow/docs
# -----------------------------------------------------------------------------
# Project Configuration
# -----------------------------------------------------------------------------
project:
name: CG
states:
triage: Triage
backlog: Backlog
ready: Ready
in_progress: In Progress
build: Build
verify: Verify
document: Document
review: Review
done: Done
# -----------------------------------------------------------------------------
# Repository Mapping
# -----------------------------------------------------------------------------
repos:
controller:
name: cleargrow/controller
path: /opt/repos/controller
platform: controller
project: CG
probe:
name: cleargrow/probe
path: /opt/repos/probe
platform: probe
project: CG
docs:
name: cleargrow/docs
path: /opt/repos/docs
platform: docs
project: CG
# -----------------------------------------------------------------------------
# Claude Code Configuration
# -----------------------------------------------------------------------------
claude:
command: claude
flags:
- "--allowedTools"
- "Bash,Read,Write,Edit,MultiEdit"
- "--permission-mode"
- "acceptEdits"
- "--disallowedTools"
- "Bash(idf.py build),Bash(west build),Bash(make),Bash(cmake)"
# -----------------------------------------------------------------------------
# Prompt Templates
# -----------------------------------------------------------------------------
prompts_dir: ./prompts
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
log_file: /var/log/cleargrow_agent_runner.log
log_level: INFO
# -----------------------------------------------------------------------------
# Webhook Server (Optional)
# -----------------------------------------------------------------------------
webhook:
enabled: true
host: 0.0.0.0
port: 8765

516
gitea_client.py Normal file
View File

@@ -0,0 +1,516 @@
"""
Gitea REST API interface.
Gitea has no documented rate limits for self-hosted instances.
API Documentation: https://docs.gitea.com/api/1.20/
Used for:
- Repository operations
- Issue comments (linked to YouTrack issues)
- Webhooks
"""
import logging
import subprocess
import requests
from dataclasses import dataclass
from typing import Optional, List
logger = logging.getLogger(__name__)
class GiteaError(Exception):
"""Base exception for Gitea API errors."""
pass
class GiteaAuthError(GiteaError):
"""Authentication failed."""
pass
@dataclass
class GiteaRepo:
"""Represents a Gitea repository."""
id: int
name: str
full_name: str # owner/repo
description: str
clone_url: str
ssh_url: str
default_branch: str
@dataclass
class GiteaComment:
"""Represents a comment on an issue/PR."""
id: int
body: str
user: str
created_at: str
updated_at: str
class GiteaClient:
"""
Gitea REST API client.
Uses access token authentication.
Supports context manager protocol for proper resource cleanup.
"""
def __init__(self, base_url: str, token: str):
"""
Initialize Gitea client.
Args:
base_url: Gitea instance URL (e.g., https://git.yourdomain.com)
token: Access token from Gitea (Settings -> Applications -> Generate Token)
"""
self.base_url = base_url.rstrip('/')
self.api_url = f"{self.base_url}/api/v1"
self.token = token
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"token {token}",
"Accept": "application/json",
"Content-Type": "application/json",
})
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager, closing the session."""
self.close()
return False
def close(self):
"""Close the HTTP session and release resources."""
if self.session:
self.session.close()
logger.debug("Gitea session closed")
def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""Make an API request."""
url = f"{self.api_url}/{endpoint.lstrip('/')}"
logger.debug(f"Gitea API: {method} {url}")
response = self.session.request(method, url, **kwargs)
if response.status_code == 401:
raise GiteaAuthError("Invalid or expired token")
if response.status_code == 403:
raise GiteaAuthError(f"Permission denied: {response.text}")
if not response.ok:
logger.error(f"Gitea API error: {response.status_code} - {response.text[:500]}")
response.raise_for_status()
return response
def _get(self, endpoint: str, params: dict = None) -> dict:
"""GET request returning JSON."""
response = self._request("GET", endpoint, params=params)
return response.json() if response.text else {}
def _post(self, endpoint: str, data: dict = None) -> dict:
"""POST request returning JSON."""
response = self._request("POST", endpoint, json=data)
return response.json() if response.text else {}
def _patch(self, endpoint: str, data: dict = None) -> dict:
"""PATCH request returning JSON."""
response = self._request("PATCH", endpoint, json=data)
return response.json() if response.text else {}
# =========================================================================
# Connection Test
# =========================================================================
def test_connection(self) -> dict:
"""Test connection and return server info."""
try:
# Get current user to verify auth
user = self._get("user")
version = self._get("version")
logger.info(f"Connected to Gitea {version.get('version', 'unknown')} as: {user.get('login', 'unknown')}")
return {
"status": "ok",
"user": user.get("login"),
"version": version.get("version"),
}
except GiteaAuthError as e:
logger.error(f"Connection test failed - authentication error: {e}")
return {"status": "error", "message": str(e)}
except requests.exceptions.ConnectionError as e:
logger.error(f"Connection test failed - connection error: {e}")
return {"status": "error", "message": f"Connection error: {e}"}
except requests.exceptions.Timeout as e:
logger.error(f"Connection test failed - timeout: {e}")
return {"status": "error", "message": f"Timeout: {e}"}
except requests.exceptions.RequestException as e:
logger.error(f"Connection test failed - request error: {e}")
return {"status": "error", "message": str(e)}
# =========================================================================
# Repositories
# =========================================================================
def get_repos(self, owner: str = None) -> List[GiteaRepo]:
"""
Get repositories.
Args:
owner: Filter by owner (optional). If None, returns user's repos.
"""
if owner:
data = self._get(f"users/{owner}/repos")
else:
data = self._get("user/repos")
return [self._parse_repo(r) for r in data]
def get_repo(self, owner: str, repo: str) -> GiteaRepo:
"""Get a specific repository."""
data = self._get(f"repos/{owner}/{repo}")
return self._parse_repo(data)
def _parse_repo(self, data: dict) -> GiteaRepo:
"""Parse repo data into GiteaRepo object."""
return GiteaRepo(
id=data.get("id", 0),
name=data.get("name", ""),
full_name=data.get("full_name", ""),
description=data.get("description", ""),
clone_url=data.get("clone_url", ""),
ssh_url=data.get("ssh_url", ""),
default_branch=data.get("default_branch", "main"),
)
# =========================================================================
# Issues (for comments - main tracking is in YouTrack)
# =========================================================================
def get_issue(self, owner: str, repo: str, issue_number: int) -> dict:
"""Get issue details (if using Gitea issues as secondary)."""
return self._get(f"repos/{owner}/{repo}/issues/{issue_number}")
def get_issue_comments(self, owner: str, repo: str, issue_number: int) -> List[GiteaComment]:
"""Get comments on an issue."""
data = self._get(f"repos/{owner}/{repo}/issues/{issue_number}/comments")
return [
GiteaComment(
id=c.get("id", 0),
body=c.get("body", ""),
user=c.get("user", {}).get("login", "unknown"),
created_at=c.get("created_at", ""),
updated_at=c.get("updated_at", ""),
)
for c in data
]
def add_issue_comment(self, owner: str, repo: str, issue_number: int, body: str) -> bool:
"""
Add a comment to an issue.
Args:
owner: Repository owner
repo: Repository name
issue_number: Issue number
body: Comment text (Markdown supported)
"""
try:
self._post(f"repos/{owner}/{repo}/issues/{issue_number}/comments", data={
"body": body
})
logger.info(f"Added comment to {owner}/{repo}#{issue_number}")
return True
except GiteaAuthError as e:
logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - auth error: {e}")
return False
except requests.exceptions.ConnectionError as e:
logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - connection error: {e}")
return False
except requests.exceptions.Timeout as e:
logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - timeout: {e}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number}: {e}")
return False
# =========================================================================
# Commits
# =========================================================================
def get_commits(self, owner: str, repo: str, branch: str = None, limit: int = 10) -> List[dict]:
"""Get recent commits."""
params = {"limit": limit}
if branch:
params["sha"] = branch
return self._get(f"repos/{owner}/{repo}/commits", params=params)
def get_commit(self, owner: str, repo: str, sha: str) -> dict:
"""Get a specific commit."""
return self._get(f"repos/{owner}/{repo}/git/commits/{sha}")
# =========================================================================
# Branches
# =========================================================================
def get_branches(self, owner: str, repo: str) -> List[dict]:
"""Get all branches."""
return self._get(f"repos/{owner}/{repo}/branches")
def get_branch(self, owner: str, repo: str, branch: str) -> dict:
"""Get a specific branch."""
return self._get(f"repos/{owner}/{repo}/branches/{branch}")
# =========================================================================
# Pull Requests
# =========================================================================
def get_pull_requests(self, owner: str, repo: str, state: str = "open") -> List[dict]:
"""Get pull requests."""
return self._get(f"repos/{owner}/{repo}/pulls", params={"state": state})
def create_pull_request(
self,
owner: str,
repo: str,
title: str,
head: str,
base: str,
body: str = ""
) -> dict:
"""Create a pull request."""
return self._post(f"repos/{owner}/{repo}/pulls", data={
"title": title,
"head": head,
"base": base,
"body": body,
})
# =========================================================================
# Webhooks
# =========================================================================
def get_webhooks(self, owner: str, repo: str) -> List[dict]:
"""Get webhooks for a repository."""
return self._get(f"repos/{owner}/{repo}/hooks")
def create_webhook(
self,
owner: str,
repo: str,
url: str,
events: List[str] = None,
secret: str = ""
) -> dict:
"""
Create a webhook for repository events.
Args:
owner: Repository owner
repo: Repository name
url: Callback URL
events: List of event types (default: push)
Options: 'create', 'delete', 'fork', 'push', 'issues',
'issue_comment', 'pull_request', 'release'
secret: Webhook secret for signature verification
"""
if events is None:
events = ["push"]
return self._post(f"repos/{owner}/{repo}/hooks", data={
"type": "gitea",
"active": True,
"events": events,
"config": {
"url": url,
"content_type": "json",
"secret": secret,
}
})
def load_gitea_config(config: dict) -> Optional[GiteaClient]:
"""
Load Gitea client from configuration.
Expected config structure:
gitea:
base_url: https://git.yourdomain.com
token: xxx
"""
gitea_config = config.get("gitea", {})
base_url = gitea_config.get("base_url")
token = gitea_config.get("token")
if not base_url or not token:
logger.warning("Gitea configuration incomplete (missing base_url or token)")
return None
return GiteaClient(base_url, token)
# =============================================================================
# Git CLI Operations (for local repo management)
# =============================================================================
def git_clone(url: str, path: str, branch: str = None) -> bool:
"""Clone a repository."""
cmd = ["git", "clone"]
if branch:
cmd.extend(["-b", branch])
cmd.extend([url, path])
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"git clone failed: {result.stderr}")
return False
return True
def git_pull(path: str) -> bool:
"""Pull latest changes."""
result = subprocess.run(
["git", "pull"],
cwd=path,
capture_output=True,
text=True
)
if result.returncode != 0:
logger.error(f"git pull failed: {result.stderr}")
return False
return True
def git_push(path: str, remote: str = "origin", branch: str = None) -> bool:
"""Push changes."""
cmd = ["git", "push", remote]
if branch:
cmd.append(branch)
result = subprocess.run(cmd, cwd=path, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"git push failed: {result.stderr}")
return False
return True
def git_status(path: str) -> dict:
"""Get repository status."""
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=path,
capture_output=True,
text=True
)
modified = []
added = []
deleted = []
for line in result.stdout.strip().split('\n'):
if not line:
continue
status = line[:2]
filename = line[3:]
if 'M' in status:
modified.append(filename)
elif 'A' in status or '?' in status:
added.append(filename)
elif 'D' in status:
deleted.append(filename)
return {
"modified": modified,
"added": added,
"deleted": deleted,
"clean": len(modified) + len(added) + len(deleted) == 0,
}
def git_merge_to_main(path: str, feature_branch: str, delete_after: bool = True) -> tuple[bool, str]:
"""
Merge a feature branch to main and optionally delete it.
Args:
path: Path to the repository
feature_branch: Name of the feature branch (e.g., "issue/CG-30")
delete_after: Whether to delete the feature branch after merge
Returns:
Tuple of (success: bool, message: str)
"""
try:
# Fetch latest from origin
result = subprocess.run(
["git", "fetch", "origin"],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
return False, f"Failed to fetch: {result.stderr}"
# Checkout main
result = subprocess.run(
["git", "checkout", "main"],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
return False, f"Failed to checkout main: {result.stderr}"
# Pull latest main
result = subprocess.run(
["git", "pull", "origin", "main"],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
return False, f"Failed to pull main: {result.stderr}"
# Merge feature branch with no-ff to preserve history
result = subprocess.run(
["git", "merge", "--no-ff", f"origin/{feature_branch}", "-m", f"Merge {feature_branch} to main"],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
# Abort merge if it failed
subprocess.run(["git", "merge", "--abort"], cwd=path, capture_output=True)
return False, f"Failed to merge: {result.stderr}"
# Push merged main
result = subprocess.run(
["git", "push", "origin", "main"],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
return False, f"Failed to push main: {result.stderr}"
logger.info(f"Merged {feature_branch} to main in {path}")
# Delete remote feature branch if requested
if delete_after:
result = subprocess.run(
["git", "push", "origin", "--delete", feature_branch],
cwd=path, capture_output=True, text=True
)
if result.returncode != 0:
logger.warning(f"Failed to delete remote branch {feature_branch}: {result.stderr}")
else:
logger.info(f"Deleted remote branch {feature_branch}")
# Delete local feature branch
subprocess.run(
["git", "branch", "-d", feature_branch],
cwd=path, capture_output=True, text=True
)
return True, f"Successfully merged {feature_branch} to main"
except Exception as e:
logger.error(f"Error merging {feature_branch}: {e}")
return False, str(e)

198
prompts/librarian.md Normal file
View File

@@ -0,0 +1,198 @@
# ClearGrow Librarian Agent Prompt
You are a ClearGrow Librarian Agent - responsible for **WRITING** product documentation based on completed code changes.
## CRITICAL: Your Job is to WRITE DOCUMENTATION
**DO NOT:**
- Re-verify the code (Verification Agent already did this)
- Just confirm code exists
- Write "verification" style comments
- Simply list what files exist
**DO:**
- Write NEW documentation in `/opt/repos/docs/`
- Update EXISTING documentation to reflect the changes
- Create user guides, API references, or architecture docs as needed
- If truly no docs needed, explain WHY with specific reasoning
---
## Assignment
- **Issue:** #{issue_number}
- **Repository:** {repo}
- **Platform:** {platform}
---
## Original Issue (Requirements)
{issue_body}
---
## Comments (Implementation & Verification Notes)
{instructions}
---
## Your Task: WRITE Documentation
The implementation has been verified. Now **write or update** product documentation.
### Documentation Structure
```
/opt/repos/docs/
├── guides/
│ ├── user/ # End user docs (HOW to use features)
│ └── developer/ # Developer docs (HOW to integrate/extend)
├── reference/
│ ├── architecture/ # System design (WHAT the system is)
│ ├── api/ # REST, CoAP, MQTT specs (WHAT the APIs are)
│ ├── firmware/ # Component specs (WHAT components do)
│ ├── hardware/ # PCB, pinouts
│ ├── ui/ # Screens, styles
│ └── errors/ # Error codes
└── project/
└── decisions/ # ADRs (WHY decisions were made)
```
### Documentation Mapping
| Code Change | Documentation Needed |
|-------------|---------------------|
| New feature | guides/user/ AND reference/ |
| New API endpoint | reference/api/ |
| CoAP/Thread changes | reference/api/, reference/architecture/ |
| New component | reference/firmware/ |
| UI changes | reference/ui/, guides/user/ |
| Configuration options | guides/user/, reference/firmware/ |
| Architecture changes | reference/architecture/, project/decisions/ |
---
## Workflow
### Step 1: Understand What Changed
Read the issue and comments to understand:
- What features/fixes were implemented
- What files were modified
- What user-facing behavior changed
### Step 2: Read the Implementation
```bash
cd /opt/repos/{platform}
# Read the actual code to understand what was implemented
```
### Step 3: Check Existing Documentation
```bash
cd /opt/repos/docs
# Search for related content that may need updates
ls -la guides/ reference/
```
### Step 4: WRITE or UPDATE Documentation
This is your **primary job**. You MUST do one of:
**A) Create new documentation** if the feature is new:
```bash
cd /opt/repos/docs
# Create appropriate doc file
# Write clear, helpful documentation
```
**B) Update existing documentation** if behavior changed:
```bash
cd /opt/repos/docs
# Find and update relevant docs
```
**C) Explain why no docs needed** (rare cases only):
- Pure internal refactor with zero user/developer impact
- Bug fix that doesn't change documented behavior
- Test-only changes
### Step 5: Commit Changes
```bash
cd /opt/repos/docs
git add -A
git commit -m "docs: update for #{issue_number}"
```
**Do NOT push** - the orchestrator handles that.
### Step 6: Comment on Issue
Use the `youtrack-comment-librarian` command:
```bash
youtrack-comment-librarian {issue_id} "## Documentation Update
**Issue:** #{issue_number}
**Date:** $(date +%Y-%m-%d)
### Documentation Changes
**Files Created:**
- docs/reference/firmware/probe-discovery.md - New reference doc for probe discovery API
**Files Updated:**
- docs/guides/user/pairing-probes.md - Added section on active discovery
### Summary
[What documentation was written and why it helps users/developers]
### Notes for Reviewer
[Any areas that may need human review or future expansion]"
```
If no documentation was needed:
```bash
youtrack-comment-librarian {issue_id} "## Documentation Review
**Issue:** #{issue_number}
**Date:** $(date +%Y-%m-%d)
### No Documentation Updates Required
**Reason:** [Specific explanation - e.g., 'Internal refactor of X that does not change any public API or user-facing behavior']
**Verified:** Checked guides/user/, reference/firmware/, reference/api/ - no existing docs reference this internal implementation."
```
### Step 7: Exit
Exit cleanly when complete.
---
## Rules
- You are running **headless** - no human interaction
- **DO NOT re-verify code** - Verification Agent already did that
- **DO write documentation** - that's your entire purpose
- Focus **ONLY on documentation** - do not modify source code in {platform} repo
- Work in `/opt/repos/docs` for all doc changes
- **DO NOT modify CLAUDE.md files** - those are AI configuration, not product docs
### CRITICAL: DO NOT RUN BUILDS
- **NEVER** run `idf.py build`, `west build`, `make`, or any build commands
- CI/CD handles all builds - just write documentation
### What Makes Good Documentation
- **User guides**: Step-by-step instructions with examples
- **Reference docs**: Complete API/component specifications
- **Architecture docs**: System design with diagrams (Mermaid)
- Keep it concise but complete
- Include code examples where helpful

260
prompts/remediation.md Normal file
View File

@@ -0,0 +1,260 @@
# ClearGrow Remediation Agent Prompt
You are a ClearGrow Remediation Agent - an expert embedded systems developer working autonomously on the ClearGrow agricultural monitoring system.
## Assignment
- **Issue:** #{issue_number}
- **Repository:** {repo}
- **Platform:** {platform}
---
## Original Issue
{issue_body}
---
## Comments (THESE TAKE PRECEDENCE)
{instructions}
---
## IMPORTANT: Comment Precedence
The comments above may **change, narrow, or redirect** the original issue.
**Always follow the most recent comments** even if they conflict with the original issue body.
Examples of comment instructions:
- "Skip part A, only do B"
- "Changed approach - use X instead of Y"
- "Only implement the header parsing for now"
- "Ignore acceptance criteria #3, we'll handle that separately"
Read ALL comments chronologically. The **last relevant comment** defines your current task.
---
## Project Structure
| Component | Location | Technology |
|-----------|----------|------------|
| Controller | /opt/repos/controller | ESP32-S3, ESP-IDF v5.2+, LVGL 8.x |
| Probe | /opt/repos/probe | nRF52840, Zephyr/nRF Connect SDK |
| Docs | /opt/repos/docs | Specifications and design docs |
---
## Platform-Specific Guidelines
### ESP32-S3 Controller (ESP-IDF v5.2+)
**Memory Constraints**:
- Internal SRAM: ~512KB (reserve 32KB for critical ops)
- PSRAM: 8MB (use `heap_caps_malloc(size, MALLOC_CAP_SPIRAM)` for buffers >4KB)
- Stack per task: 4KB min, 8KB for WiFi/BT callbacks
- NVS namespace: 15 char max
**Critical Patterns**:
```c
// Entry point - scheduler already running
void app_main(void) {{ }} // Never call vTaskStartScheduler()
// Error handling
ESP_ERROR_CHECK(fatal_function()); // Aborts on failure
ESP_RETURN_ON_ERROR(ret, TAG, "msg"); // Recoverable
// Task creation - pin to core
xTaskCreatePinnedToCore(func, "name", 8192, param, prio, &handle, core);
// Timing - never raw ticks
vTaskDelay(pdMS_TO_TICKS(100));
```
### LVGL UI (Version 8.x - NOT 9.x!)
**CRITICAL**: We use LVGL 8.x. APIs differ significantly from 9.x!
**Display**: 800x480 RGB LCD, capacitive touch (GT911)
| Metric | Value |
|--------|-------|
| Screen size | 800x480 pixels |
| Safe content area | 704x384 (48px margins) |
| Min touch target | 132x132px |
| Component spacing | 24px |
```c
// Touch targets - minimum 132px
lv_obj_set_ext_click_area(btn, 36); // Expand hit area
// Timer handler - ONLY from UI task on Core 1
lv_timer_handler(); // Never from multiple tasks
```
### nRF52840 Probe (Zephyr/nRF Connect SDK)
**Target**: nRF52840 (1MB flash, 256KB RAM)
**Networking**: OpenThread MTD-SED (Sleepy End Device)
```c
// Logging
LOG_MODULE_REGISTER(module_name, CONFIG_LOG_DEFAULT_LEVEL);
LOG_INF("msg"); LOG_WRN("msg"); LOG_ERR("msg");
// Kernel primitives
k_sleep(K_MSEC(100));
K_SEM_DEFINE(my_sem, 0, 1);
```
**POWER MANAGEMENT WARNING**: nRF52840 does NOT support Zephyr's generic `CONFIG_PM`!
```c
// Use Nordic HAL for deep sleep
#include <hal/nrf_power.h>
nrf_power_system_off(NRF_POWER); // <1µA, does NOT return
// Do NOT use: CONFIG_PM=y, pm_state_force(), pm_device_action_run()
```
### Code Style (C11)
- **Naming**: `snake_case` functions/variables, `SCREAMING_SNAKE` constants
- **Headers**: Include guards (`#ifndef`), not `#pragma once`
- **Logging**: `ESP_LOGI(TAG, "msg")` for ESP32, `LOG_INF("msg")` for Zephyr
- **Errors**: Check and handle ALL return values
---
## Workflow
### Step 1: Create Feature Branch
**CRITICAL**: Before making ANY changes, create a feature branch for this issue:
```bash
cd /opt/repos/{platform}
git fetch origin
git checkout -b issue/{issue_id} origin/main
```
All work MUST be done on this feature branch (`issue/{issue_id}`), NOT on main.
### Step 2: Understand the Current Task
1. Read the original issue body
2. Read ALL comments in order
3. Determine what you should actually implement based on the latest direction
### Step 3: Analyze the Code
```bash
cd /opt/repos/{platform}
cat [path/to/file]
```
### Step 4: Implement the Fix
- Make minimal, focused changes following the current instructions
- Use platform-appropriate patterns (ESP-IDF for controller, Zephyr for probe)
- Use LVGL 8.x APIs (NOT 9.x)
- Include proper error handling
### Step 5: Commit and Push to Feature Branch
```bash
cd /opt/repos/{platform}
git add -A
git commit -m "fix: {issue_id} [brief description]"
git push -u origin issue/{issue_id}
```
**IMPORTANT**:
- Push to your feature branch `issue/{issue_id}`, NOT to main
- The CI system will automatically build your branch
- Do NOT merge to main - that happens after review is complete
### Step 6: Report Progress
Use the `youtrack-comment-developer` command to add a comment to the issue:
```bash
youtrack-comment-developer {issue_id} "## Agent Progress
**Date:** $(date +%Y-%m-%d %H:%M)
**Status:** Fix applied
### Instructions Followed
[Which comments/instructions guided this work]
### Changes Made
- File: \`[path]\`
- Summary: [what was changed]
\`\`\`c
// Key snippet
\`\`\`
### Notes
[Any observations, blockers, or follow-up needed]"
```
### Step 7: Exit
Exit with the appropriate code:
- **Exit 0**: Work completed successfully - changes were made and committed
- **Exit 2**: Work already complete - no changes needed (code already implements the requirements)
- **Exit 1**: Failed - could not complete the task
```bash
# If you made changes and committed:
exit 0
# If you found the work was already done (no changes needed):
exit 2
# If you encountered an error or blocker:
exit 1
```
**IMPORTANT**: If you find that the required implementation already exists in the codebase:
1. Verify the existing code meets all acceptance criteria
2. Add a comment using this format (note the distinct header):
```bash
youtrack-comment-developer {issue_id} "## Remediation Skipped - Already Implemented
**Date:** $(date +%Y-%m-%d %H:%M)
**Status:** No changes needed
### Verification
The required implementation already exists in the codebase:
- [List files that already contain the implementation]
- [Confirm each acceptance criterion is met]
### Notes
[Any observations about the existing implementation]"
```
3. Exit with code 2 (this skips verification and goes directly to Review)
---
## Rules
- You are running **headless** - no human interaction
- **Comments OVERRIDE the original issue body**
- If blocked, comment explaining why and exit
- Keep changes **minimal and focused** on this single issue
- Use **LVGL 8.x** APIs (NOT 9.x)
- Follow **ESP-IDF** patterns for controller, **Zephyr** patterns for probe
- Always include proper **error handling**
### CRITICAL: DO NOT RUN BUILDS
- **NEVER** run `idf.py build`, `west build`, `make`, or any build commands
- **NEVER** run platformio, cmake, or compilation commands
- CI/CD handles all builds - parallel agents running builds will conflict and hang
- Just make the code changes and commit - the build will be verified by CI

126
prompts/verification.md Normal file
View File

@@ -0,0 +1,126 @@
# Verification Agent Prompt Template
You are a ClearGrow Verification Agent - responsible for verifying that remediation work was implemented correctly.
## Assignment
- **Issue:** {issue_id}
- **Repository:** {repo}
- **Platform:** {platform}
---
## Issue Details
{issue_body}
---
## Developer Comments
{instructions}
---
## Your Task: Verification
Verify the implementation meets the requirements:
### 1. Review the Original Issue
- What were the acceptance criteria?
- What specific changes were required?
### 2. Review the Developer's Comments
- What did the developer agent claim to do?
- What files were modified?
### 3. Examine the Actual Code
**CRITICAL:** You MUST read the actual file contents, not just check that files exist.
```bash
cd /opt/repos/{platform}
# Read each file mentioned in the developer's comments
# Example: cat components/common/src/system_status.c
```
For each changed file:
- Read the implementation code
- Verify it matches what the developer claimed
- Check the logic is correct
- Look for edge cases and error handling
### 4. Verify Implementation
For each acceptance criterion:
- [ ] Requirement met? (Y/N)
- [ ] Implementation correct?
- [ ] Edge cases handled?
---
## Verification Outcome
After verification, you MUST comment on the issue with your findings.
### If Verification PASSES
Use the `youtrack-comment-qa` command to add your findings:
```bash
youtrack-comment-qa {issue_id} "## Verification Passed ✓
**Issue:** {issue_id}
**Date:** $(date +%Y-%m-%d)
### Requirements Check
- [x] [Requirement 1] - Implemented correctly
- [x] [Requirement 2] - Implemented correctly
### Code Review
- Files reviewed: [list files you actually read]
- Key implementation details: [summarize what the code does]
- Implementation quality: [assessment]
### Notes
[Any observations for the librarian agent]"
```
Then exit successfully (exit code 0).
### If Verification FAILS
Use `youtrack-comment-qa` to add a comment explaining:
- Which requirements were NOT met
- What is wrong or missing
- Specific code locations with issues
```bash
youtrack-comment-qa {issue_id} "## Verification Failed ✗
**Issue:** {issue_id}
**Date:** $(date +%Y-%m-%d)
### Requirements NOT Met
- [ ] [Requirement] - [What is wrong or missing]
### Issues Found
[Specific code locations with issues]
### Required Changes
[What needs to be fixed]"
```
Then exit with error (exit code 1) to send the issue back for triage.
---
## Rules
- You are a **REVIEWER**, not an implementer - do NOT make code changes
- Be thorough but fair in your assessment
- If requirements are ambiguous, pass with notes
- If implementation is close but not perfect, pass with suggestions
**CRITICAL: DO NOT MAKE CHANGES**
- You are verifying, not fixing
- If something is wrong, fail the verification

12
requirements.txt Normal file
View File

@@ -0,0 +1,12 @@
# ClearGrow Agent Runner (YouTrack + Gitea Edition)
# Python 3.10+
# HTTP client
requests>=2.31.0
# Configuration
PyYAML>=6.0.1
# Optional: Webhook server
# flask>=3.0.0
# gunicorn>=21.0.0

1221
runner.py Normal file

File diff suppressed because it is too large Load Diff

299
teamcity_client.py Normal file
View File

@@ -0,0 +1,299 @@
"""
TeamCity API client for build status monitoring.
Provides functionality to:
- Query build status for feature branches
- Retrieve build logs for error reporting
- Check for running/queued builds
"""
import requests
import logging
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class BuildInfo:
"""Information about a TeamCity build."""
build_id: int
branch: str
status: str # "SUCCESS", "FAILURE", "RUNNING", "QUEUED", "UNKNOWN"
status_text: Optional[str]
commit: str
build_type: str
web_url: str
class TeamCityClient:
"""
Client for TeamCity REST API.
Supports context manager protocol for proper resource cleanup.
"""
def __init__(self, base_url: str, token: str):
"""
Initialize TeamCity client.
Args:
base_url: TeamCity server URL (e.g., https://ci.cleargrow.io)
token: API token for authentication
"""
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {token}',
'Accept': 'application/json'
})
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager, closing the session."""
self.close()
return False
def close(self):
"""Close the HTTP session and release resources."""
if self.session:
self.session.close()
logger.debug("TeamCity session closed")
def get_builds_for_branch(
self,
build_type: str,
branch: str,
count: int = 1
) -> list[BuildInfo]:
"""
Get recent builds for a specific branch.
Args:
build_type: TeamCity build configuration ID (e.g., "Controller_Build")
branch: Branch name (e.g., "issue/CG-34")
count: Maximum number of builds to return
Returns:
List of BuildInfo objects, most recent first
"""
url = f"{self.base_url}/app/rest/builds"
params = {
'locator': f'buildType:{build_type},branch:{branch},count:{count}',
'fields': 'build(id,branchName,status,statusText,revisions(revision(version)),webUrl)'
}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for b in resp.json().get('build', []):
revision = b.get('revisions', {}).get('revision', [{}])[0]
builds.append(BuildInfo(
build_id=b['id'],
branch=b.get('branchName', branch),
status=b.get('status', 'UNKNOWN'),
status_text=b.get('statusText'),
commit=revision.get('version', ''),
build_type=build_type,
web_url=b.get('webUrl', '')
))
return builds
except requests.exceptions.Timeout:
logger.error(f"Timeout getting builds for {branch}")
return []
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get builds for {branch}: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error getting builds for {branch}: {e}")
return []
def get_build_by_id(self, build_id: int) -> Optional[BuildInfo]:
"""
Get build information by ID.
Args:
build_id: TeamCity build ID
Returns:
BuildInfo or None if not found
"""
url = f"{self.base_url}/app/rest/builds/id:{build_id}"
params = {
'fields': 'id,branchName,status,statusText,revisions(revision(version)),webUrl,buildType(id)'
}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
b = resp.json()
revision = b.get('revisions', {}).get('revision', [{}])[0]
return BuildInfo(
build_id=b['id'],
branch=b.get('branchName', ''),
status=b.get('status', 'UNKNOWN'),
status_text=b.get('statusText'),
commit=revision.get('version', ''),
build_type=b.get('buildType', {}).get('id', ''),
web_url=b.get('webUrl', '')
)
except Exception as e:
logger.error(f"Failed to get build {build_id}: {e}")
return None
def get_build_log_excerpt(self, build_id: int, lines: int = 100) -> str:
"""
Get last N lines of build log (for error reporting).
Args:
build_id: TeamCity build ID
lines: Number of lines from end to return
Returns:
Build log excerpt as string
"""
url = f"{self.base_url}/app/rest/builds/id:{build_id}/log"
try:
# Build log endpoint returns plain text, override the default JSON Accept header
resp = self.session.get(
url,
timeout=60,
headers={'Accept': 'text/plain'}
)
resp.raise_for_status()
log_lines = resp.text.split('\n')
return '\n'.join(log_lines[-lines:])
except requests.exceptions.Timeout:
logger.error(f"Timeout getting build log for {build_id}")
return "(Build log retrieval timed out)"
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get build log for {build_id}: {e}")
return f"(Failed to retrieve build log: {e})"
def get_running_builds(self, build_type: str = None) -> list[BuildInfo]:
"""
Get currently running builds.
Args:
build_type: Optional filter by build configuration ID
Returns:
List of BuildInfo for running builds
"""
url = f"{self.base_url}/app/rest/builds"
locator = 'running:true'
if build_type:
locator = f'buildType:{build_type},running:true'
params = {
'locator': locator,
'fields': 'build(id,branchName,status,statusText,revisions(revision(version)),webUrl,buildType(id))'
}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for b in resp.json().get('build', []):
revision = b.get('revisions', {}).get('revision', [{}])[0]
builds.append(BuildInfo(
build_id=b['id'],
branch=b.get('branchName', ''),
status='RUNNING',
status_text=b.get('statusText'),
commit=revision.get('version', ''),
build_type=b.get('buildType', {}).get('id', ''),
web_url=b.get('webUrl', '')
))
return builds
except Exception as e:
logger.error(f"Failed to get running builds: {e}")
return []
def get_queued_builds(self, build_type: str = None) -> list[dict]:
"""
Get builds in queue.
Args:
build_type: Optional filter by build configuration ID
Returns:
List of queued build info dicts
"""
url = f"{self.base_url}/app/rest/buildQueue"
params = {'fields': 'build(id,branchName,buildType(id))'}
if build_type:
params['locator'] = f'buildType:{build_type}'
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
return resp.json().get('build', [])
except Exception as e:
logger.error(f"Failed to get queued builds: {e}")
return []
def trigger_build(self, build_type: str, branch: str) -> Optional[int]:
"""
Trigger a build for a specific branch.
Args:
build_type: TeamCity build configuration ID (e.g., "Controller_Build")
branch: Branch name (e.g., "issue/CG-30")
Returns:
Build ID if triggered successfully, None otherwise
"""
url = f"{self.base_url}/app/rest/buildQueue"
# XML payload for build trigger
payload = f'''<build branchName="{branch}">
<buildType id="{build_type}"/>
</build>'''
try:
resp = self.session.post(
url,
data=payload,
headers={'Content-Type': 'application/xml'},
timeout=30
)
resp.raise_for_status()
result = resp.json()
build_id = result.get('id')
logger.info(f"Triggered build {build_id} for {branch} on {build_type}")
return build_id
except requests.exceptions.Timeout:
logger.error(f"Timeout triggering build for {branch}")
return None
except requests.exceptions.RequestException as e:
logger.error(f"Failed to trigger build for {branch}: {e}")
return None
def test_connection(self) -> bool:
"""
Test connection to TeamCity server.
Returns:
True if connection successful
"""
url = f"{self.base_url}/app/rest/server"
try:
resp = self.session.get(url, timeout=10)
resp.raise_for_status()
server_info = resp.json()
logger.info(f"Connected to TeamCity {server_info.get('version', 'unknown')}")
return True
except Exception as e:
logger.error(f"Failed to connect to TeamCity: {e}")
return False

352
webhook_server.py Normal file
View File

@@ -0,0 +1,352 @@
#!/usr/bin/env python3
"""
Webhook server for instant issue detection.
Receives webhooks from YouTrack when issues change state,
eliminating polling latency for faster agent response.
Usage:
# Standalone
python webhook_server.py --config config.yaml
# Or import and use with runner
from webhook_server import WebhookServer
"""
import argparse
import hashlib
import hmac
import json
import logging
import threading
from dataclasses import dataclass
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Callable, Optional
from urllib.parse import parse_qs, urlparse
import yaml
logger = logging.getLogger(__name__)
@dataclass
class WebhookEvent:
"""Parsed webhook event from YouTrack."""
event_type: str # "issue_updated", "issue_created", etc.
issue_id: str # "CG-123"
project: str # "CG"
old_state: Optional[str] # Previous state (for state changes)
new_state: Optional[str] # New state (for state changes)
raw_payload: dict # Full webhook payload
class WebhookHandler(BaseHTTPRequestHandler):
"""HTTP request handler for YouTrack webhooks."""
# Class-level references set by WebhookServer
secret: Optional[str] = None
on_event: Optional[Callable[[WebhookEvent], None]] = None
def log_message(self, format, *args):
"""Override to use our logger."""
logger.debug(f"Webhook HTTP: {format % args}")
def do_POST(self):
"""Handle POST requests (webhooks)."""
# Parse path
parsed = urlparse(self.path)
if parsed.path != "/webhook/youtrack":
self.send_error(404, "Not Found")
return
# Read body
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self.send_error(400, "Empty body")
return
body = self.rfile.read(content_length)
# Verify signature if secret configured
if self.secret:
signature = self.headers.get("X-Hub-Signature-256", "")
if not self._verify_signature(body, signature):
logger.warning("Webhook signature verification failed")
self.send_error(403, "Invalid signature")
return
# Parse JSON
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in webhook: {e}")
self.send_error(400, "Invalid JSON")
return
# Parse event
event = self._parse_event(payload)
if not event:
logger.debug("Ignoring unrecognized webhook event")
self.send_response(200)
self.end_headers()
return
logger.info(f"Webhook received: {event.event_type} for {event.issue_id}")
# Dispatch event
if self.on_event:
try:
# Run in thread to not block webhook response
threading.Thread(
target=self.on_event,
args=(event,),
daemon=True
).start()
except Exception as e:
logger.error(f"Error dispatching webhook event: {e}")
# Always respond 200 quickly
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "ok"}')
def do_GET(self):
"""Handle GET requests (health check)."""
if self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "healthy"}')
else:
self.send_error(404, "Not Found")
def _verify_signature(self, body: bytes, signature: str) -> bool:
"""Verify HMAC-SHA256 signature."""
if not signature.startswith("sha256="):
return False
expected = hmac.new(
self.secret.encode("utf-8"),
body,
hashlib.sha256
).hexdigest()
provided = signature[7:] # Remove "sha256=" prefix
return hmac.compare_digest(expected, provided)
def _parse_event(self, payload: dict) -> Optional[WebhookEvent]:
"""Parse YouTrack webhook payload into WebhookEvent."""
# YouTrack webhook format varies by configuration
# This handles the common "issue updated" format
# Try to extract issue info
issue = payload.get("issue", {})
if not issue:
# Some webhooks put issue at top level
if "idReadable" in payload:
issue = payload
else:
return None
issue_id = issue.get("idReadable", "")
if not issue_id:
return None
# Extract project from issue ID
project = issue_id.split("-")[0] if "-" in issue_id else ""
# Determine event type and state changes
event_type = payload.get("type", "issue_updated")
# Look for state change in field changes
old_state = None
new_state = None
# YouTrack sends field changes in different formats
changes = payload.get("fieldChanges", [])
for change in changes:
if change.get("name") == "State":
old_state = change.get("oldValue", {}).get("name")
new_state = change.get("newValue", {}).get("name")
break
# Also check issue fields for current state
if not new_state:
fields = issue.get("customFields", [])
for field in fields:
if field.get("name") == "State":
value = field.get("value", {})
new_state = value.get("name") if isinstance(value, dict) else value
break
return WebhookEvent(
event_type=event_type,
issue_id=issue_id,
project=project,
old_state=old_state,
new_state=new_state,
raw_payload=payload,
)
class WebhookServer:
"""
HTTP server for receiving YouTrack webhooks.
Usage:
server = WebhookServer(
host="0.0.0.0",
port=8765,
secret="your-secret",
on_event=handle_event
)
server.start() # Non-blocking
# ... later ...
server.stop()
"""
# Timeout for thread join during shutdown
SHUTDOWN_TIMEOUT = 10 # seconds
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
secret: Optional[str] = None,
on_event: Optional[Callable[[WebhookEvent], None]] = None,
):
self.host = host
self.port = port
self.secret = secret
self.on_event = on_event
self._server: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
self._shutdown_event = threading.Event()
def start(self):
"""Start the webhook server in a background thread."""
# Reset shutdown event
self._shutdown_event.clear()
# Configure handler class
WebhookHandler.secret = self.secret
WebhookHandler.on_event = self.on_event
# Create server
self._server = HTTPServer((self.host, self.port), WebhookHandler)
# Start in thread (non-daemon so it can be properly joined)
self._thread = threading.Thread(
target=self._serve_loop,
name="WebhookServer",
daemon=False
)
self._thread.start()
logger.info(f"Webhook server started on {self.host}:{self.port}")
def _serve_loop(self):
"""Server loop that checks for shutdown signal."""
if self._server:
self._server.serve_forever()
def stop(self):
"""Stop the webhook server gracefully."""
logger.debug("Initiating webhook server shutdown...")
# Signal shutdown
self._shutdown_event.set()
# Shutdown the HTTP server (this will cause serve_forever to return)
if self._server:
try:
self._server.shutdown()
except OSError as e:
logger.warning(f"Error shutting down HTTP server: {e}")
finally:
try:
self._server.server_close()
except OSError as e:
logger.warning(f"Error closing server socket: {e}")
self._server = None
# Wait for thread to finish
if self._thread:
self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)
if self._thread.is_alive():
logger.warning(f"Webhook server thread did not terminate within {self.SHUTDOWN_TIMEOUT}s")
else:
logger.debug("Webhook server thread terminated cleanly")
self._thread = None
logger.info("Webhook server stopped")
@property
def is_running(self) -> bool:
return self._thread is not None and self._thread.is_alive()
def load_webhook_config(config: dict) -> Optional[WebhookServer]:
"""Create WebhookServer from config dict."""
webhook_config = config.get("webhook", {})
if not webhook_config.get("enabled", False):
return None
return WebhookServer(
host=webhook_config.get("host", "0.0.0.0"),
port=webhook_config.get("port", 8765),
secret=webhook_config.get("secret"),
)
def main():
"""Standalone webhook server for testing."""
parser = argparse.ArgumentParser(description="YouTrack Webhook Server")
parser.add_argument("-c", "--config", default="config.yaml", help="Config file")
parser.add_argument("--host", default="0.0.0.0", help="Bind host")
parser.add_argument("--port", type=int, default=8765, help="Bind port")
parser.add_argument("--secret", help="Webhook secret")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
def handle_event(event: WebhookEvent):
print(f"\n{'='*60}")
print(f"Event: {event.event_type}")
print(f"Issue: {event.issue_id}")
print(f"Project: {event.project}")
print(f"State: {event.old_state}{event.new_state}")
print(f"{'='*60}\n")
server = WebhookServer(
host=args.host,
port=args.port,
secret=args.secret,
on_event=handle_event,
)
print(f"Starting webhook server on {args.host}:{args.port}")
print("Waiting for webhooks... (Ctrl+C to stop)")
print(f"\nConfigure YouTrack webhook URL:")
print(f" http://YOUR_SERVER:{args.port}/webhook/youtrack\n")
server.start()
try:
while True:
import time
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
server.stop()
if __name__ == "__main__":
main()

411
woodpecker_client.py Normal file
View File

@@ -0,0 +1,411 @@
"""
Woodpecker CI API client for build status monitoring.
API Reference: https://woodpecker-ci.org/api
Based on: https://github.com/woodpecker-ci/woodpecker/blob/main/server/api/
Key endpoints:
- GET /api/repos/lookup/{owner}/{repo} - Get repo by full name
- GET /api/repos/{repo_id}/pipelines - List pipelines
- GET /api/repos/{repo_id}/pipelines/{number} - Get pipeline details
- GET /api/repos/{repo_id}/pipelines/latest - Get latest pipeline
- POST /api/repos/{repo_id}/pipelines - Trigger manual pipeline
- POST /api/repos/{repo_id}/pipelines/{number}/cancel - Cancel pipeline
- GET /api/repos/{repo_id}/logs/{number}/{step_id} - Get step logs
- GET /api/user - Get current user (for connection test)
"""
import base64
import requests
import logging
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class BuildInfo:
"""Information about a Woodpecker CI pipeline."""
build_id: int # Pipeline number
branch: str
status: str # "SUCCESS", "FAILURE", "RUNNING", "PENDING", "UNKNOWN"
status_text: Optional[str]
commit: str
build_type: str # Repository full name (e.g., "cleargrow/controller")
web_url: str
class WoodpeckerClient:
"""
Client for Woodpecker CI REST API.
Requires an API token generated from the Woodpecker UI.
"""
# Map Woodpecker pipeline status to normalized status
STATUS_MAP = {
"success": "SUCCESS",
"failure": "FAILURE",
"error": "FAILURE",
"killed": "FAILURE",
"running": "RUNNING",
"pending": "PENDING",
"blocked": "PENDING",
"declined": "FAILURE",
"skipped": "SUCCESS",
}
def __init__(self, base_url: str, token: str):
"""
Initialize Woodpecker CI client.
Args:
base_url: Woodpecker server URL (e.g., https://ci.cleargrow.io)
token: API token for authentication
"""
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {token}',
'Accept': 'application/json',
'Content-Type': 'application/json',
})
# Cache for repo name -> repo ID mapping
self._repo_cache: dict[str, int] = {}
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False
def close(self):
"""Close the HTTP session."""
if self.session:
self.session.close()
logger.debug("Woodpecker session closed")
def _normalize_status(self, status: str) -> str:
"""Convert Woodpecker status to normalized status."""
return self.STATUS_MAP.get(status.lower(), "UNKNOWN")
def _get_repo_id(self, repo_name: str) -> Optional[int]:
"""
Get Woodpecker repo ID from repo full name.
Args:
repo_name: Repository full name (e.g., "cleargrow/controller")
Returns:
Repo ID or None if not found
"""
if repo_name in self._repo_cache:
return self._repo_cache[repo_name]
# API: GET /api/repos/lookup/{owner}/{repo}
url = f"{self.base_url}/api/repos/lookup/{repo_name}"
try:
resp = self.session.get(url, timeout=30)
if resp.status_code == 404:
logger.warning(f"Repository not found: {repo_name}")
return None
resp.raise_for_status()
repo_data = resp.json()
repo_id = repo_data.get('id')
if repo_id:
self._repo_cache[repo_name] = repo_id
return repo_id
except requests.exceptions.RequestException as e:
logger.error(f"Failed to lookup repo {repo_name}: {e}")
return None
def _build_web_url(self, repo_id: int, pipeline_number: int) -> str:
"""Build the web URL for a pipeline."""
return f"{self.base_url}/repos/{repo_id}/pipeline/{pipeline_number}"
def get_builds_for_branch(
self,
build_type: str,
branch: str,
count: int = 1
) -> list[BuildInfo]:
"""
Get recent pipelines for a specific branch.
Args:
build_type: Repository full name (e.g., "cleargrow/controller")
branch: Branch name (e.g., "issue/CG-34")
count: Maximum number of builds to return
Returns:
List of BuildInfo objects, most recent first
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
# API: GET /api/repos/{repo_id}/pipelines
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': count * 5} # Fetch extra to filter by branch
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
if p.get('branch') != branch:
continue
builds.append(BuildInfo(
build_id=p['number'],
branch=p.get('branch', branch),
status=self._normalize_status(p.get('status', 'unknown')),
status_text=p.get('message', p.get('status')),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
))
if len(builds) >= count:
break
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipelines for {branch}: {e}")
return []
def get_build_by_id(self, build_type: str, build_id: int) -> Optional[BuildInfo]:
"""
Get pipeline by number.
Args:
build_type: Repository full name
build_id: Pipeline number
Returns:
BuildInfo or None if not found
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return None
# API: GET /api/repos/{repo_id}/pipelines/{number}
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"
try:
resp = self.session.get(url, timeout=30)
if resp.status_code == 404:
return None
resp.raise_for_status()
p = resp.json()
return BuildInfo(
build_id=p['number'],
branch=p.get('branch', ''),
status=self._normalize_status(p.get('status', 'unknown')),
status_text=p.get('message', p.get('status')),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
)
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipeline {build_id}: {e}")
return None
def get_build_log_excerpt(
self,
build_type: str,
build_id: int,
lines: int = 100
) -> str:
"""
Get last N lines of build log.
Args:
build_type: Repository full name
build_id: Pipeline number
lines: Number of lines from end to return
Returns:
Build log excerpt as string
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return "(Repository not found)"
# First get pipeline to find step IDs
# API: GET /api/repos/{repo_id}/pipelines/{number}
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"
try:
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
pipeline = resp.json()
all_logs = []
for workflow in pipeline.get('workflows', []):
for step in workflow.get('children', []):
step_id = step.get('id')
if not step_id:
continue
# API: GET /api/repos/{repo_id}/logs/{number}/{step_id}
log_url = f"{self.base_url}/api/repos/{repo_id}/logs/{build_id}/{step_id}"
try:
log_resp = self.session.get(log_url, timeout=60)
if log_resp.status_code == 200:
for entry in log_resp.json():
if isinstance(entry, dict):
data = entry.get('data')
if data:
try:
decoded = base64.b64decode(data).decode('utf-8', errors='replace')
all_logs.append(decoded)
except Exception:
all_logs.append(str(data))
except requests.exceptions.RequestException:
continue
if all_logs:
return '\n'.join(all_logs[-lines:])
return "(No logs available)"
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get build log for pipeline {build_id}: {e}")
return f"(Failed to retrieve build log: {e})"
def get_running_builds(self, build_type: str = None) -> list[BuildInfo]:
"""
Get currently running pipelines.
Args:
build_type: Optional repository full name to filter by
Returns:
List of BuildInfo for running pipelines
"""
if not build_type:
return []
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
# Get recent pipelines and filter for running
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': 20}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
status = p.get('status', '').lower()
if status != 'running':
continue
builds.append(BuildInfo(
build_id=p['number'],
branch=p.get('branch', ''),
status='RUNNING',
status_text=p.get('message'),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
))
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get running pipelines: {e}")
return []
def get_queued_builds(self, build_type: str = None) -> list[dict]:
"""
Get pending/blocked pipelines.
Args:
build_type: Optional repository full name to filter by
Returns:
List of queued build info dicts
"""
if not build_type:
return []
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': 20}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
queued = []
for p in resp.json():
status = p.get('status', '').lower()
if status not in ('pending', 'blocked'):
continue
queued.append({
'id': p.get('number'),
'branchName': p.get('branch'),
'buildType': build_type,
})
return queued
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get queued pipelines: {e}")
return []
def cancel_build(self, build_type: str, build_id: int) -> bool:
"""
Cancel a running pipeline.
Args:
build_type: Repository full name
build_id: Pipeline number
Returns:
True if cancelled successfully
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return False
# API: POST /api/repos/{repo_id}/pipelines/{number}/cancel
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}/cancel"
try:
resp = self.session.post(url, timeout=30)
resp.raise_for_status()
logger.info(f"Cancelled pipeline {build_id} for {build_type}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Failed to cancel pipeline {build_id}: {e}")
return False
def test_connection(self) -> bool:
"""
Test connection to Woodpecker CI server.
Returns:
True if connection successful
"""
# API: GET /api/user - returns current user info
url = f"{self.base_url}/api/user"
try:
resp = self.session.get(url, timeout=10)
resp.raise_for_status()
user_info = resp.json()
logger.info(f"Connected to Woodpecker CI as: {user_info.get('login', 'unknown')}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Failed to connect to Woodpecker CI: {e}")
return False

53
youtrack-comment Executable file
View File

@@ -0,0 +1,53 @@
#!/bin/bash
# YouTrack comment helper for Claude agents
# Usage: youtrack-comment <issue-id> <comment-text>
# e.g.: youtrack-comment CG-48 "## Agent Progress\n\nWork completed."
set -e
ISSUE_ID="$1"
COMMENT_TEXT="$2"
if [ -z "$ISSUE_ID" ] || [ -z "$COMMENT_TEXT" ]; then
echo "Usage: youtrack-comment <issue-id> <comment-text>" >&2
echo "Example: youtrack-comment CG-48 'Work completed successfully'" >&2
exit 1
fi
# Load config from yaml
CONFIG_FILE="/opt/agent_runner/config.yaml"
if [ ! -f "$CONFIG_FILE" ]; then
echo "Error: Config file not found: $CONFIG_FILE" >&2
exit 1
fi
# Extract YouTrack settings (simple grep/sed approach)
YOUTRACK_URL=$(grep -A2 "^youtrack:" "$CONFIG_FILE" | grep "base_url:" | sed 's/.*base_url: *//' | tr -d ' ')
YOUTRACK_TOKEN=$(grep -A3 "^youtrack:" "$CONFIG_FILE" | grep "token:" | sed 's/.*token: *//' | tr -d ' ')
if [ -z "$YOUTRACK_URL" ] || [ -z "$YOUTRACK_TOKEN" ]; then
echo "Error: YouTrack URL or token not configured in $CONFIG_FILE" >&2
exit 1
fi
# Post comment via YouTrack API
# Note: Comment text needs to be JSON-escaped
JSON_PAYLOAD=$(jq -n --arg text "$COMMENT_TEXT" '{"text": $text}')
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "Authorization: Bearer $YOUTRACK_TOKEN" \
-H "Content-Type: application/json" \
-d "$JSON_PAYLOAD" \
"${YOUTRACK_URL}/api/issues/${ISSUE_ID}/comments")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" -ge 200 ] && [ "$HTTP_CODE" -lt 300 ]; then
echo "Comment added to $ISSUE_ID"
else
echo "Error adding comment to $ISSUE_ID: HTTP $HTTP_CODE" >&2
echo "$BODY" >&2
exit 1
fi

35
youtrack-comment-build Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/bash
# YouTrack comment helper for Build (CI) agent
# Usage: youtrack-comment-build <issue-id> <comment-text>
set -e
ISSUE_ID="$1"
COMMENT_TEXT="$2"
if [ -z "$ISSUE_ID" ] || [ -z "$COMMENT_TEXT" ]; then
echo "Usage: youtrack-comment-build <issue-id> <comment-text>" >&2
exit 1
fi
YOUTRACK_URL="https://track.cleargrow.io"
YOUTRACK_TOKEN="perm:YnVpbGQ=.NDQtNg==.NF4e5zB0s0wnjxRFJJvill0w0f6YxH"
JSON_PAYLOAD=$(jq -n --arg text "$COMMENT_TEXT" '{"text": $text}')
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "Authorization: Bearer $YOUTRACK_TOKEN" \
-H "Content-Type: application/json" \
-d "$JSON_PAYLOAD" \
"${YOUTRACK_URL}/api/issues/${ISSUE_ID}/comments")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" -ge 200 ] && [ "$HTTP_CODE" -lt 300 ]; then
echo "Comment added to $ISSUE_ID (as Build Agent)"
else
echo "Error adding comment to $ISSUE_ID: HTTP $HTTP_CODE" >&2
echo "$BODY" >&2
exit 1
fi

35
youtrack-comment-developer Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/bash
# YouTrack comment helper for Developer (remediation) agent
# Usage: youtrack-comment-developer <issue-id> <comment-text>
set -e
ISSUE_ID="$1"
COMMENT_TEXT="$2"
if [ -z "$ISSUE_ID" ] || [ -z "$COMMENT_TEXT" ]; then
echo "Usage: youtrack-comment-developer <issue-id> <comment-text>" >&2
exit 1
fi
YOUTRACK_URL="https://track.cleargrow.io"
YOUTRACK_TOKEN="perm:ZGV2ZWxvcGVy.NDQtMQ==.ys1fbsDAmF2SnnE4lrkPZ6cYKeFJ7x"
JSON_PAYLOAD=$(jq -n --arg text "$COMMENT_TEXT" '{"text": $text}')
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "Authorization: Bearer $YOUTRACK_TOKEN" \
-H "Content-Type: application/json" \
-d "$JSON_PAYLOAD" \
"${YOUTRACK_URL}/api/issues/${ISSUE_ID}/comments")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" -ge 200 ] && [ "$HTTP_CODE" -lt 300 ]; then
echo "Comment added to $ISSUE_ID (as Developer Agent)"
else
echo "Error adding comment to $ISSUE_ID: HTTP $HTTP_CODE" >&2
echo "$BODY" >&2
exit 1
fi

35
youtrack-comment-librarian Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/bash
# YouTrack comment helper for Librarian (documentation) agent
# Usage: youtrack-comment-librarian <issue-id> <comment-text>
set -e
ISSUE_ID="$1"
COMMENT_TEXT="$2"
if [ -z "$ISSUE_ID" ] || [ -z "$COMMENT_TEXT" ]; then
echo "Usage: youtrack-comment-librarian <issue-id> <comment-text>" >&2
exit 1
fi
YOUTRACK_URL="https://track.cleargrow.io"
YOUTRACK_TOKEN="perm:bGlicmFyaWFu.NDQtMw==.NWUhRcAvfABYNV1GX10Tv7nmj36fj4"
JSON_PAYLOAD=$(jq -n --arg text "$COMMENT_TEXT" '{"text": $text}')
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "Authorization: Bearer $YOUTRACK_TOKEN" \
-H "Content-Type: application/json" \
-d "$JSON_PAYLOAD" \
"${YOUTRACK_URL}/api/issues/${ISSUE_ID}/comments")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" -ge 200 ] && [ "$HTTP_CODE" -lt 300 ]; then
echo "Comment added to $ISSUE_ID (as Librarian Agent)"
else
echo "Error adding comment to $ISSUE_ID: HTTP $HTTP_CODE" >&2
echo "$BODY" >&2
exit 1
fi

35
youtrack-comment-qa Executable file
View File

@@ -0,0 +1,35 @@
#!/bin/bash
# YouTrack comment helper for QA (verification) agent
# Usage: youtrack-comment-qa <issue-id> <comment-text>
set -e
ISSUE_ID="$1"
COMMENT_TEXT="$2"
if [ -z "$ISSUE_ID" ] || [ -z "$COMMENT_TEXT" ]; then
echo "Usage: youtrack-comment-qa <issue-id> <comment-text>" >&2
exit 1
fi
YOUTRACK_URL="https://track.cleargrow.io"
YOUTRACK_TOKEN="perm:cWE=.NDQtMg==.qUjgH4cBgoVmDV1uKLSxgChy7AyVN2"
JSON_PAYLOAD=$(jq -n --arg text "$COMMENT_TEXT" '{"text": $text}')
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST \
-H "Authorization: Bearer $YOUTRACK_TOKEN" \
-H "Content-Type: application/json" \
-d "$JSON_PAYLOAD" \
"${YOUTRACK_URL}/api/issues/${ISSUE_ID}/comments")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" -ge 200 ] && [ "$HTTP_CODE" -lt 300 ]; then
echo "Comment added to $ISSUE_ID (as QA Agent)"
else
echo "Error adding comment to $ISSUE_ID: HTTP $HTTP_CODE" >&2
echo "$BODY" >&2
exit 1
fi

468
youtrack_client.py Normal file
View File

@@ -0,0 +1,468 @@
"""
YouTrack REST API interface.
YouTrack has no documented rate limits for self-hosted instances,
making it ideal for automated agent systems.
API Documentation: https://www.jetbrains.com/help/youtrack/server/api-reference.html
"""
import logging
import requests
from dataclasses import dataclass
from typing import Optional, List
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
class YouTrackError(Exception):
"""Base exception for YouTrack API errors."""
pass
class YouTrackAuthError(YouTrackError):
"""Authentication failed."""
pass
@dataclass
class YouTrackIssue:
"""Represents a YouTrack issue."""
id: str # e.g., "CG-123"
id_readable: str # Same as id for display
summary: str
description: str
project_id: str # e.g., "CG"
state: str # e.g., "Ready", "In Progress"
priority: str = "Normal" # e.g., "Critical", "Major", "Normal", "Minor"
custom_fields: dict = None # Additional fields
reporter: str = ""
created: int = 0 # Unix timestamp ms
updated: int = 0
def __post_init__(self):
if self.custom_fields is None:
self.custom_fields = {}
@property
def issue_number(self) -> int:
"""Extract numeric part of issue ID (e.g., 'CG-123' -> 123)."""
parts = self.id.split('-')
return int(parts[-1]) if parts else 0
@property
def priority_order(self) -> int:
"""Return numeric priority for sorting (lower = higher priority)."""
priority_map = {
"Show-stopper": 0,
"Critical": 1,
"Major": 2,
"Normal": 3,
"Minor": 4,
}
return priority_map.get(self.priority, 3)
@dataclass
class YouTrackComment:
"""Represents a comment on an issue."""
id: str
text: str
author: str
created: int # Unix timestamp ms
@dataclass
class BoardConfig:
"""Agile board configuration for status tracking."""
board_id: str
board_name: str
states: dict[str, str] # name -> state_id mapping
class YouTrackClient:
"""
YouTrack REST API client.
Uses permanent token authentication.
Supports context manager protocol for proper resource cleanup.
"""
def __init__(self, base_url: str, token: str):
"""
Initialize YouTrack client.
Args:
base_url: YouTrack instance URL (e.g., https://track.yourdomain.com)
token: Permanent token from YouTrack (Profile -> Account Security -> Tokens)
"""
self.base_url = base_url.rstrip('/')
self.api_url = f"{self.base_url}/api"
self.token = token
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {token}",
"Accept": "application/json",
"Content-Type": "application/json",
})
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit context manager, closing the session."""
self.close()
return False
def close(self):
"""Close the HTTP session and release resources."""
if self.session:
self.session.close()
logger.debug("YouTrack session closed")
def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
"""Make an API request."""
url = f"{self.api_url}/{endpoint.lstrip('/')}"
logger.debug(f"YouTrack API: {method} {url}")
response = self.session.request(method, url, **kwargs)
if response.status_code == 401:
raise YouTrackAuthError("Invalid or expired token")
if response.status_code == 403:
raise YouTrackAuthError(f"Permission denied: {response.text}")
if not response.ok:
logger.error(f"YouTrack API error: {response.status_code} - {response.text[:500]}")
response.raise_for_status()
return response
def _get(self, endpoint: str, params: dict = None) -> dict:
"""GET request returning JSON."""
response = self._request("GET", endpoint, params=params)
return response.json() if response.text else {}
def _post(self, endpoint: str, data: dict = None) -> dict:
"""POST request returning JSON."""
response = self._request("POST", endpoint, json=data)
return response.json() if response.text else {}
# =========================================================================
# Connection Test
# =========================================================================
def test_connection(self) -> dict:
"""Test connection and return server info."""
try:
# Get current user to verify auth
user = self._get("users/me", params={"fields": "login,name,email"})
logger.info(f"Connected to YouTrack as: {user.get('login', 'unknown')}")
return {"status": "ok", "user": user}
except YouTrackAuthError as e:
logger.error(f"Connection test failed - authentication error: {e}")
return {"status": "error", "message": str(e)}
except requests.exceptions.ConnectionError as e:
logger.error(f"Connection test failed - connection error: {e}")
return {"status": "error", "message": f"Connection error: {e}"}
except requests.exceptions.Timeout as e:
logger.error(f"Connection test failed - timeout: {e}")
return {"status": "error", "message": f"Timeout: {e}"}
except requests.exceptions.RequestException as e:
logger.error(f"Connection test failed - request error: {e}")
return {"status": "error", "message": str(e)}
# =========================================================================
# Projects
# =========================================================================
def get_projects(self) -> List[dict]:
"""Get all accessible projects."""
return self._get("admin/projects", params={
"fields": "id,name,shortName,description"
})
def get_project(self, project_id: str) -> dict:
"""Get a specific project by short name (e.g., 'CG')."""
return self._get(f"admin/projects/{project_id}", params={
"fields": "id,name,shortName,description,customFields(field(name),bundle(values(name)))"
})
# =========================================================================
# Issues
# =========================================================================
def get_issue(self, issue_id: str) -> YouTrackIssue:
"""
Get a single issue by ID.
Args:
issue_id: Issue ID like 'CG-123'
"""
data = self._get(f"issues/{issue_id}", params={
"fields": "id,idReadable,summary,description,project(id,shortName),"
"reporter(login),created,updated,customFields(name,value(name))"
})
return self._parse_issue(data)
def get_issues_by_query(self, query: str, limit: int = 100) -> List[YouTrackIssue]:
"""
Search issues using YouTrack query language.
Example queries:
- "project: CG State: Ready"
- "project: CG #Unresolved"
- "project: CG State: {In Progress}"
- "project: CG updated: Today"
Args:
query: YouTrack search query
limit: Maximum results (default 100)
"""
data = self._get("issues", params={
"query": query,
"$top": limit,
"fields": "id,idReadable,summary,description,project(id,shortName),"
"reporter(login),created,updated,customFields(name,value(name))"
})
return [self._parse_issue(item) for item in data]
def get_issues_by_state(self, project: str, state: str, sort_by_priority: bool = True) -> List[YouTrackIssue]:
"""
Get issues in a specific state, sorted by priority.
Args:
project: Project short name (e.g., 'CG')
state: State name (e.g., 'Ready', 'In Progress')
sort_by_priority: If True, return highest priority first
Returns:
List of issues sorted by priority (Critical > Major > Normal > Minor)
"""
# Escape state names with spaces
state_query = f"{{{state}}}" if " " in state else state
# Sort by Priority in YouTrack query (asc = Show-stopper first)
query = f"project: {project} State: {state_query} sort by: Priority asc"
issues = self.get_issues_by_query(query)
# Also sort in Python as backup (in case YouTrack sorting differs)
if sort_by_priority:
issues.sort(key=lambda i: i.priority_order)
return issues
def update_issue_state(self, issue_id: str, state: str) -> bool:
"""
Update an issue's state.
Args:
issue_id: Issue ID like 'CG-123'
state: New state name (e.g., 'In Progress', 'Done')
"""
try:
# YouTrack custom field update format
self._post(f"issues/{issue_id}", data={
"customFields": [
{
"name": "State",
"$type": "StateIssueCustomField",
"value": {"name": state}
}
]
})
logger.info(f"Updated {issue_id} state to '{state}'")
return True
except YouTrackAuthError as e:
logger.error(f"Failed to update {issue_id} state - auth error: {e}")
return False
except requests.exceptions.ConnectionError as e:
logger.error(f"Failed to update {issue_id} state - connection error: {e}")
return False
except requests.exceptions.Timeout as e:
logger.error(f"Failed to update {issue_id} state - timeout: {e}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to update {issue_id} state: {e}")
return False
def update_issue_field(self, issue_id: str, field_name: str, value: str) -> bool:
"""
Update a custom field on an issue.
Args:
issue_id: Issue ID like 'CG-123'
field_name: Custom field name
value: New value
"""
try:
self._post(f"issues/{issue_id}", data={
"customFields": [
{
"name": field_name,
"value": {"name": value}
}
]
})
logger.info(f"Updated {issue_id} {field_name} to '{value}'")
return True
except YouTrackAuthError as e:
logger.error(f"Failed to update {issue_id} {field_name} - auth error: {e}")
return False
except requests.exceptions.ConnectionError as e:
logger.error(f"Failed to update {issue_id} {field_name} - connection error: {e}")
return False
except requests.exceptions.Timeout as e:
logger.error(f"Failed to update {issue_id} {field_name} - timeout: {e}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to update {issue_id} {field_name}: {e}")
return False
def _parse_issue(self, data: dict) -> YouTrackIssue:
"""Parse issue data into YouTrackIssue object."""
# Extract state and priority from custom fields
state = ""
priority = "Normal"
custom_fields = {}
for field in data.get("customFields", []):
name = field.get("name", "")
value = field.get("value")
if isinstance(value, dict):
value = value.get("name", "")
elif isinstance(value, list):
value = ", ".join(v.get("name", str(v)) for v in value)
custom_fields[name] = value
if name.lower() == "state":
state = value or ""
elif name.lower() == "priority":
priority = value or "Normal"
project = data.get("project", {})
return YouTrackIssue(
id=data.get("idReadable", data.get("id", "")),
id_readable=data.get("idReadable", ""),
summary=data.get("summary", ""),
description=data.get("description", ""),
project_id=project.get("shortName", project.get("id", "")),
state=state,
priority=priority,
custom_fields=custom_fields,
reporter=data.get("reporter", {}).get("login", ""),
created=data.get("created", 0),
updated=data.get("updated", 0),
)
# =========================================================================
# Comments
# =========================================================================
def get_issue_comments(self, issue_id: str) -> List[YouTrackComment]:
"""Get all comments on an issue."""
data = self._get(f"issues/{issue_id}/comments", params={
"fields": "id,text,author(login),created"
})
return [
YouTrackComment(
id=c.get("id", ""),
text=c.get("text", ""),
author=c.get("author", {}).get("login", "unknown"),
created=c.get("created", 0),
)
for c in data
]
def add_issue_comment(self, issue_id: str, text: str) -> bool:
"""
Add a comment to an issue.
Args:
issue_id: Issue ID like 'CG-123'
text: Comment text (Markdown supported)
"""
try:
self._post(f"issues/{issue_id}/comments", data={"text": text})
logger.info(f"Added comment to {issue_id}")
return True
except YouTrackAuthError as e:
logger.error(f"Failed to add comment to {issue_id} - auth error: {e}")
return False
except requests.exceptions.ConnectionError as e:
logger.error(f"Failed to add comment to {issue_id} - connection error: {e}")
return False
except requests.exceptions.Timeout as e:
logger.error(f"Failed to add comment to {issue_id} - timeout: {e}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Failed to add comment to {issue_id}: {e}")
return False
# =========================================================================
# Agile Boards (Kanban)
# =========================================================================
def get_agile_boards(self) -> List[dict]:
"""Get all agile boards."""
return self._get("agiles", params={
"fields": "id,name,projects(shortName)"
})
def get_board_sprints(self, board_id: str) -> List[dict]:
"""Get sprints for an agile board."""
return self._get(f"agiles/{board_id}/sprints", params={
"fields": "id,name,start,finish,isDefault"
})
# =========================================================================
# Webhooks
# =========================================================================
def get_webhooks(self) -> List[dict]:
"""Get all configured webhooks."""
return self._get("admin/globalSettings/webhooks", params={
"fields": "id,name,url,enabled,events"
})
def create_webhook(self, name: str, url: str, events: List[str] = None) -> dict:
"""
Create a webhook for issue events.
Args:
name: Webhook name
url: Callback URL
events: List of event types (default: issue changes)
Options: 'IssueCreated', 'IssueChanged', 'IssueCommentCreated'
"""
if events is None:
events = ["IssueChanged"]
return self._post("admin/globalSettings/webhooks", data={
"name": name,
"url": url,
"enabled": True,
"events": events,
})
def load_youtrack_config(config: dict) -> Optional[YouTrackClient]:
"""
Load YouTrack client from configuration.
Expected config structure:
youtrack:
base_url: https://track.yourdomain.com
token: perm:xxx
"""
yt_config = config.get("youtrack", {})
base_url = yt_config.get("base_url")
token = yt_config.get("token")
if not base_url or not token:
logger.warning("YouTrack configuration incomplete (missing base_url or token)")
return None
return YouTrackClient(base_url, token)