Files
agentrunner/woodpecker_client.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

556 lines
19 KiB
Python

"""
Woodpecker CI API client for build status monitoring.
API Reference: https://woodpecker-ci.org/api
Based on: https://github.com/woodpecker-ci/woodpecker/blob/main/server/api/
Key endpoints:
- GET /api/repos/lookup/{owner}/{repo} - Get repo by full name
- GET /api/repos/{repo_id}/pipelines - List pipelines
- GET /api/repos/{repo_id}/pipelines/{number} - Get pipeline details
- GET /api/repos/{repo_id}/pipelines/latest - Get latest pipeline
- POST /api/repos/{repo_id}/pipelines - Trigger manual pipeline
- POST /api/repos/{repo_id}/pipelines/{number}/cancel - Cancel pipeline
- GET /api/repos/{repo_id}/logs/{number}/{step_id} - Get step logs
- GET /api/user - Get current user (for connection test)
"""
import base64
import requests
import logging
from typing import Optional
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class BuildInfo:
    """Normalized information about a single Woodpecker CI pipeline.

    Field names keep the generic "build" vocabulary so callers are not
    coupled to Woodpecker's raw pipeline payload shape.
    """
    build_id: int  # Pipeline number (Woodpecker numbers pipelines per repository)
    branch: str  # Branch the pipeline ran against
    status: str  # Normalized status: "SUCCESS", "FAILURE", "RUNNING", "PENDING", "UNKNOWN"
    status_text: Optional[str]  # Human-readable detail (commit message, or raw status as fallback)
    commit: str  # Commit SHA the pipeline built
    build_type: str  # Repository full name (e.g., "cleargrow/controller")
    web_url: str  # Link to this pipeline in the Woodpecker web UI
class WoodpeckerClient:
"""
Client for Woodpecker CI REST API.
Requires an API token generated from the Woodpecker UI.
"""
# Map Woodpecker pipeline status to normalized status
STATUS_MAP = {
"success": "SUCCESS",
"failure": "FAILURE",
"error": "FAILURE",
"killed": "FAILURE",
"running": "RUNNING",
"pending": "PENDING",
"blocked": "PENDING",
"declined": "FAILURE",
"skipped": "SUCCESS",
}
def __init__(self, base_url: str, token: str):
"""
Initialize Woodpecker CI client.
Args:
base_url: Woodpecker server URL (e.g., https://ci.cleargrow.io)
token: API token for authentication
"""
self.base_url = base_url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {token}',
'Accept': 'application/json',
'Content-Type': 'application/json',
})
# Cache for repo name -> repo ID mapping
self._repo_cache: dict[str, int] = {}
    def __enter__(self):
        """Enter the context manager; returns this client instance unchanged."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager: close the HTTP session, never suppress exceptions."""
        self.close()
        # Returning False propagates any exception raised inside the `with` block.
        return False
def close(self):
"""Close the HTTP session."""
if self.session:
self.session.close()
logger.debug("Woodpecker session closed")
def _normalize_status(self, status: str) -> str:
"""Convert Woodpecker status to normalized status."""
return self.STATUS_MAP.get(status.lower(), "UNKNOWN")
def _get_repo_id(self, repo_name: str) -> Optional[int]:
"""
Get Woodpecker repo ID from repo full name.
Args:
repo_name: Repository full name (e.g., "cleargrow/controller")
Returns:
Repo ID or None if not found
"""
if repo_name in self._repo_cache:
return self._repo_cache[repo_name]
# API: GET /api/repos/lookup/{owner}/{repo}
url = f"{self.base_url}/api/repos/lookup/{repo_name}"
try:
resp = self.session.get(url, timeout=30)
if resp.status_code == 404:
logger.warning(f"Repository not found: {repo_name}")
return None
resp.raise_for_status()
repo_data = resp.json()
repo_id = repo_data.get('id')
if repo_id:
self._repo_cache[repo_name] = repo_id
return repo_id
except requests.exceptions.RequestException as e:
logger.error(f"Failed to lookup repo {repo_name}: {e}")
return None
def _build_web_url(self, repo_id: int, pipeline_number: int) -> str:
"""Build the web URL for a pipeline."""
return f"{self.base_url}/repos/{repo_id}/pipeline/{pipeline_number}"
def get_builds_for_branch(
self,
build_type: str,
branch: str,
count: int = 1
) -> list[BuildInfo]:
"""
Get recent pipelines for a specific branch.
Args:
build_type: Repository full name (e.g., "cleargrow/controller")
branch: Branch name (e.g., "issue/CG-34")
count: Maximum number of builds to return
Returns:
List of BuildInfo objects, most recent first
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
# API: GET /api/repos/{repo_id}/pipelines
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': count * 5} # Fetch extra to filter by branch
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
if p.get('branch') != branch:
continue
builds.append(BuildInfo(
build_id=p['number'],
branch=p.get('branch', branch),
status=self._normalize_status(p.get('status', 'unknown')),
status_text=p.get('message', p.get('status')),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
))
if len(builds) >= count:
break
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipelines for {branch}: {e}")
return []
def get_build_by_id(self, build_type: str, build_id: int) -> Optional[BuildInfo]:
"""
Get pipeline by number.
Args:
build_type: Repository full name
build_id: Pipeline number
Returns:
BuildInfo or None if not found
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return None
# API: GET /api/repos/{repo_id}/pipelines/{number}
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"
try:
resp = self.session.get(url, timeout=30)
if resp.status_code == 404:
return None
resp.raise_for_status()
p = resp.json()
return BuildInfo(
build_id=p['number'],
branch=p.get('branch', ''),
status=self._normalize_status(p.get('status', 'unknown')),
status_text=p.get('message', p.get('status')),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
)
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipeline {build_id}: {e}")
return None
def get_build_log_excerpt(
self,
build_type: str,
build_id: int,
lines: int = 100
) -> str:
"""
Get last N lines of build log.
Args:
build_type: Repository full name
build_id: Pipeline number
lines: Number of lines from end to return
Returns:
Build log excerpt as string
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return "(Repository not found)"
# First get pipeline to find step IDs
# API: GET /api/repos/{repo_id}/pipelines/{number}
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"
try:
resp = self.session.get(url, timeout=30)
resp.raise_for_status()
pipeline = resp.json()
all_logs = []
for workflow in pipeline.get('workflows', []):
for step in workflow.get('children', []):
step_id = step.get('id')
if not step_id:
continue
# API: GET /api/repos/{repo_id}/logs/{number}/{step_id}
log_url = f"{self.base_url}/api/repos/{repo_id}/logs/{build_id}/{step_id}"
try:
log_resp = self.session.get(log_url, timeout=60)
if log_resp.status_code == 200:
for entry in log_resp.json():
if isinstance(entry, dict):
data = entry.get('data')
if data:
try:
decoded = base64.b64decode(data).decode('utf-8', errors='replace')
all_logs.append(decoded)
except Exception:
all_logs.append(str(data))
except requests.exceptions.RequestException:
continue
if all_logs:
return '\n'.join(all_logs[-lines:])
return "(No logs available)"
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get build log for pipeline {build_id}: {e}")
return f"(Failed to retrieve build log: {e})"
def get_running_builds(self, build_type: str = None) -> list[BuildInfo]:
"""
Get currently running pipelines.
Args:
build_type: Optional repository full name to filter by
Returns:
List of BuildInfo for running pipelines
"""
if not build_type:
return []
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
# Get recent pipelines and filter for running
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': 20}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
status = p.get('status', '').lower()
if status != 'running':
continue
builds.append(BuildInfo(
build_id=p['number'],
branch=p.get('branch', ''),
status='RUNNING',
status_text=p.get('message'),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
))
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get running pipelines: {e}")
return []
def get_queued_builds(self, build_type: str = None) -> list[dict]:
"""
Get pending/blocked pipelines.
Args:
build_type: Optional repository full name to filter by
Returns:
List of queued build info dicts
"""
if not build_type:
return []
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': 20}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
queued = []
for p in resp.json():
status = p.get('status', '').lower()
if status not in ('pending', 'blocked'):
continue
queued.append({
'id': p.get('number'),
'branchName': p.get('branch'),
'buildType': build_type,
})
return queued
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get queued pipelines: {e}")
return []
def cancel_build(self, build_type: str, build_id: int) -> bool:
"""
Cancel a running pipeline.
Args:
build_type: Repository full name
build_id: Pipeline number
Returns:
True if cancelled successfully
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return False
# API: POST /api/repos/{repo_id}/pipelines/{number}/cancel
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}/cancel"
try:
resp = self.session.post(url, timeout=30)
resp.raise_for_status()
logger.info(f"Cancelled pipeline {build_id} for {build_type}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Failed to cancel pipeline {build_id}: {e}")
return False
def test_connection(self) -> bool:
"""
Test connection to Woodpecker CI server.
Returns:
True if connection successful
"""
# API: GET /api/user - returns current user info
url = f"{self.base_url}/api/user"
try:
resp = self.session.get(url, timeout=10)
resp.raise_for_status()
user_info = resp.json()
logger.info(f"Connected to Woodpecker CI as: {user_info.get('login', 'unknown')}")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Failed to connect to Woodpecker CI: {e}")
return False
def get_pipelines(
self,
build_type: str,
limit: int = 50,
branch: Optional[str] = None,
status: Optional[str] = None,
) -> list[BuildInfo]:
"""
Get ALL pipelines for a repository (not just running).
Args:
build_type: Repository full name (e.g., "cleargrow/controller")
limit: Maximum number of pipelines to return
branch: Optional branch name to filter by
status: Optional status to filter by (success, failure, running, pending)
Returns:
List of BuildInfo objects, most recent first
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
# API: GET /api/repos/{repo_id}/pipelines
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': limit}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
# Filter by branch if specified
if branch and p.get('branch') != branch:
continue
# Filter by status if specified
normalized_status = self._normalize_status(p.get('status', 'unknown'))
if status:
if status.upper() != normalized_status:
continue
builds.append(BuildInfo(
build_id=p['number'],
branch=p.get('branch', ''),
status=normalized_status,
status_text=p.get('message', p.get('status')),
commit=p.get('commit', ''),
build_type=build_type,
web_url=self._build_web_url(repo_id, p['number']),
))
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipelines for {build_type}: {e}")
return []
def get_pipelines_extended(
self,
build_type: str,
limit: int = 50,
) -> list[dict]:
"""
Get pipelines with extended information (including timing data).
Args:
build_type: Repository full name (e.g., "cleargrow/controller")
limit: Maximum number of pipelines to return
Returns:
List of pipeline dicts with full data
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return []
url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
params = {'per_page': limit}
try:
resp = self.session.get(url, params=params, timeout=30)
resp.raise_for_status()
builds = []
for p in resp.json():
builds.append({
'id': p.get('number'),
'number': p.get('number'),
'repo': build_type.split('/')[-1] if '/' in build_type else build_type,
'repo_full': build_type,
'status': self._normalize_status(p.get('status', 'unknown')).lower(),
'event': p.get('event', 'push'),
'branch': p.get('branch', ''),
'message': p.get('message', ''),
'author': p.get('author', ''),
'author_avatar': p.get('author_avatar', ''),
'commit': p.get('commit', ''),
'started': p.get('started'),
'finished': p.get('finished'),
'created': p.get('created'),
'web_url': self._build_web_url(repo_id, p.get('number', 0)),
})
return builds
except requests.exceptions.RequestException as e:
logger.error(f"Failed to get pipelines for {build_type}: {e}")
return []
def retry_pipeline(self, build_type: str, pipeline_number: int) -> dict:
"""
Restart/retry a pipeline by creating a new one.
Args:
build_type: Repository full name
pipeline_number: Pipeline number to restart
Returns:
Dict with success status and new pipeline info
"""
repo_id = self._get_repo_id(build_type)
if not repo_id:
return {"success": False, "message": f"Repository not found: {build_type}"}
# Woodpecker uses POST to the pipeline endpoint to restart it
# API: POST /api/repos/{repo_id}/pipelines/{number}
url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{pipeline_number}"
try:
resp = self.session.post(url, timeout=30)
resp.raise_for_status()
new_pipeline = resp.json()
logger.info(f"Restarted pipeline {pipeline_number} -> {new_pipeline.get('number')} for {build_type}")
return {
"success": True,
"build_id": new_pipeline.get('number'),
"message": f"Pipeline restarted as #{new_pipeline.get('number')}",
"web_url": self._build_web_url(repo_id, new_pipeline.get('number', 0)),
}
except requests.exceptions.RequestException as e:
logger.error(f"Failed to retry pipeline {pipeline_number}: {e}")
return {"success": False, "message": str(e)}