Orchestrator: - Add orchestrator chat interface with streaming responses - MCP server integration for YouTrack queries - Quick actions for backlog review, triage analysis - Dynamic suggestions based on conversation context - Action approval/rejection workflow Dashboard improvements: - Add font preloading to prevent FOUC - CSS spinner for loading state (no icon font dependency) - Wait for fonts before showing UI - Fix workflow pipeline alignment - Fix user message contrast (dark blue background) - Auto-scroll chat, actions, suggestions panels - Add keyboard shortcuts system - Add toast notifications - Add theme toggle (dark/light mode) - New pages: orchestrator, repos, system, analytics Workflow fixes: - Skip Build state when agent determines no changes needed - Check branch exists before attempting push - Include comments in get_issues MCP response - Simplified orchestrator prompt focused on Backlog management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
556 lines
19 KiB
Python
556 lines
19 KiB
Python
"""
|
|
Woodpecker CI API client for build status monitoring.

API Reference: https://woodpecker-ci.org/api
Based on: https://github.com/woodpecker-ci/woodpecker/blob/main/server/api/

Key endpoints:
- GET /api/repos/lookup/{owner}/{repo} - Get repo by full name
- GET /api/repos/{repo_id}/pipelines - List pipelines
- GET /api/repos/{repo_id}/pipelines/{number} - Get pipeline details
- GET /api/repos/{repo_id}/pipelines/latest - Get latest pipeline
- POST /api/repos/{repo_id}/pipelines - Trigger manual pipeline
- POST /api/repos/{repo_id}/pipelines/{number}/cancel - Cancel pipeline
- GET /api/repos/{repo_id}/logs/{number}/{step_id} - Get step logs
- GET /api/user - Get current user (for connection test)
|
|
"""
|
|
|
|
import base64
|
|
import requests
|
|
import logging
|
|
from typing import Optional
|
|
from dataclasses import dataclass
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class BuildInfo:
    """
    Information about a Woodpecker CI pipeline.

    Field order is part of the interface: instances may be constructed
    positionally.
    """

    # Pipeline number
    build_id: int
    # Branch the pipeline ran for
    branch: str
    # Normalized status: "SUCCESS", "FAILURE", "RUNNING", "PENDING", "UNKNOWN"
    status: str
    # Free-form status text; may be None
    status_text: Optional[str]
    # Commit identifier as reported by the API ('' when absent)
    commit: str
    # Repository full name (e.g., "cleargrow/controller")
    build_type: str
    # Link to the pipeline in the Woodpecker web UI
    web_url: str
|
|
|
|
|
|
class WoodpeckerClient:
    """
    Client for Woodpecker CI REST API.

    Requires an API token generated from the Woodpecker UI.

    Query methods are best-effort: transport errors, HTTP error statuses and
    non-JSON response bodies are logged and reported through an "empty"
    return value ([], None, False or an explanatory string) rather than
    raised to the caller.

    The client can be used as a context manager; the HTTP session is closed
    on exit.
    """

    # Map Woodpecker pipeline status to normalized status
    STATUS_MAP = {
        "success": "SUCCESS",
        "failure": "FAILURE",
        "error": "FAILURE",
        "killed": "FAILURE",
        "running": "RUNNING",
        "pending": "PENDING",
        "blocked": "PENDING",
        "declined": "FAILURE",
        "skipped": "SUCCESS",
    }

    def __init__(self, base_url: str, token: str):
        """
        Initialize Woodpecker CI client.

        Args:
            base_url: Woodpecker server URL (e.g., https://ci.cleargrow.io)
            token: API token for authentication
        """
        # Trailing slashes are stripped so URLs can be built with "/api/...".
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {token}',
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        })
        # Cache for repo name -> repo ID mapping
        self._repo_cache: dict[str, int] = {}

    def __enter__(self) -> "WoodpeckerClient":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never suppress exceptions raised inside the ``with`` body.
        return False

    def close(self):
        """Close the HTTP session."""
        if self.session:
            self.session.close()
            logger.debug("Woodpecker session closed")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _normalize_status(self, status: str) -> str:
        """
        Convert Woodpecker status to normalized status.

        Matching is case-insensitive. Unrecognized, empty or missing (None)
        statuses map to "UNKNOWN".
        """
        # ``status or ''`` guards against a JSON null status.
        return self.STATUS_MAP.get(str(status or '').lower(), "UNKNOWN")

    def _get_repo_id(self, repo_name: str) -> Optional[int]:
        """
        Get Woodpecker repo ID from repo full name.

        Successful lookups are cached for the lifetime of the client.

        Args:
            repo_name: Repository full name (e.g., "cleargrow/controller")

        Returns:
            Repo ID or None if not found
        """
        cached = self._repo_cache.get(repo_name)
        if cached is not None:
            return cached

        # API: GET /api/repos/lookup/{owner}/{repo}
        url = f"{self.base_url}/api/repos/lookup/{repo_name}"
        try:
            resp = self.session.get(url, timeout=30)
            if resp.status_code == 404:
                logger.warning("Repository not found: %s", repo_name)
                return None
            resp.raise_for_status()
            repo_data = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            logger.error("Failed to lookup repo %s: %s", repo_name, e)
            return None

        repo_id = repo_data.get('id') if isinstance(repo_data, dict) else None
        if repo_id:
            self._repo_cache[repo_name] = repo_id
        return repo_id

    def _build_web_url(self, repo_id: int, pipeline_number: int) -> str:
        """Build the web URL for a pipeline."""
        return f"{self.base_url}/repos/{repo_id}/pipeline/{pipeline_number}"

    def _list_pipelines(self, repo_id: int, per_page: int) -> list[dict]:
        """
        Fetch one page of pipelines for a repository.

        Args:
            repo_id: Woodpecker repo ID
            per_page: Page size requested from the server

        Returns:
            List of raw pipeline dicts in the order the server returned them
            (an empty list if the body is not a JSON array)

        Raises:
            requests.exceptions.RequestException: transport or HTTP error
            ValueError: response body is not valid JSON
        """
        # API: GET /api/repos/{repo_id}/pipelines
        url = f"{self.base_url}/api/repos/{repo_id}/pipelines"
        # NOTE(review): Woodpecker's API docs appear to name the page-size
        # query parameter `perPage`; confirm that `per_page` is honored by
        # the deployed server, otherwise the server default page size applies.
        resp = self.session.get(url, params={'per_page': per_page}, timeout=30)
        resp.raise_for_status()
        payload = resp.json()
        return payload if isinstance(payload, list) else []

    def _to_build_info(self, repo_id: int, build_type: str, pipeline: dict) -> "BuildInfo":
        """Convert a raw pipeline dict from the API into a BuildInfo."""
        number = pipeline['number']
        return BuildInfo(
            build_id=number,
            branch=pipeline.get('branch', ''),
            status=self._normalize_status(pipeline.get('status', 'unknown')),
            # Prefer the pipeline message; fall back to the raw status.
            status_text=pipeline.get('message', pipeline.get('status')),
            commit=pipeline.get('commit', ''),
            build_type=build_type,
            web_url=self._build_web_url(repo_id, number),
        )

    @staticmethod
    def _decode_log_entry(entry) -> Optional[str]:
        """
        Decode one log entry returned by the step-log endpoint.

        Returns:
            The decoded text, the raw data as a string if it cannot be
            base64-decoded, or None if the entry carries no data.
        """
        if not isinstance(entry, dict):
            return None
        data = entry.get('data')
        if not data:
            return None
        try:
            return base64.b64decode(data).decode('utf-8', errors='replace')
        except (ValueError, TypeError):
            # binascii.Error is a ValueError; TypeError covers non-string data.
            return str(data)

    def _fetch_step_log(self, repo_id: int, build_id: int, step_id) -> list[str]:
        """
        Fetch and decode the log entries of a single pipeline step.

        Best-effort: any failure yields an empty list so that one broken
        step does not hide the logs of the others.
        """
        # API: GET /api/repos/{repo_id}/logs/{number}/{step_id}
        log_url = f"{self.base_url}/api/repos/{repo_id}/logs/{build_id}/{step_id}"
        try:
            log_resp = self.session.get(log_url, timeout=60)
            if log_resp.status_code != 200:
                return []
            entries = log_resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.debug(
                "Skipping logs for step %s of pipeline %s: %s", step_id, build_id, e
            )
            return []

        if not isinstance(entries, list):
            return []
        decoded = (self._decode_log_entry(entry) for entry in entries)
        return [text for text in decoded if text is not None]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_builds_for_branch(
        self,
        build_type: str,
        branch: str,
        count: int = 1
    ) -> list["BuildInfo"]:
        """
        Get recent pipelines for a specific branch.

        Filtering happens client-side on a single page of recent pipelines,
        so older pipelines of the branch may not be found.

        Args:
            build_type: Repository full name (e.g., "cleargrow/controller")
            branch: Branch name (e.g., "issue/CG-34")
            count: Maximum number of builds to return

        Returns:
            List of BuildInfo objects in server order (expected most recent
            first); empty if count <= 0, the repository is unknown or the
            request fails
        """
        if count <= 0:
            return []

        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return []

        try:
            # Fetch extra to filter by branch
            pipelines = self._list_pipelines(repo_id, per_page=count * 5)
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get pipelines for %s: %s", branch, e)
            return []

        matching = [p for p in pipelines if p.get('branch') == branch]
        return [self._to_build_info(repo_id, build_type, p) for p in matching[:count]]

    def get_build_by_id(self, build_type: str, build_id: int) -> Optional["BuildInfo"]:
        """
        Get pipeline by number.

        Args:
            build_type: Repository full name
            build_id: Pipeline number

        Returns:
            BuildInfo or None if not found (or if the request fails)
        """
        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return None

        # API: GET /api/repos/{repo_id}/pipelines/{number}
        url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"

        try:
            resp = self.session.get(url, timeout=30)
            if resp.status_code == 404:
                return None
            resp.raise_for_status()
            pipeline = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get pipeline %s: %s", build_id, e)
            return None

        return self._to_build_info(repo_id, build_type, pipeline)

    def get_build_log_excerpt(
        self,
        build_type: str,
        build_id: int,
        lines: int = 100
    ) -> str:
        """
        Get last N log entries of a build, across all steps.

        Args:
            build_type: Repository full name
            build_id: Pipeline number
            lines: Number of entries from end to return

        Returns:
            Build log excerpt as string. An empty string if lines <= 0;
            a parenthesized explanatory message if the repository is
            unknown, no logs exist or the pipeline cannot be fetched.
        """
        # A plain ``all_logs[-lines:]`` would return everything for 0 and
        # drop the head for negative values, so handle those explicitly.
        if lines <= 0:
            return ""

        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return "(Repository not found)"

        # First get pipeline to find step IDs
        # API: GET /api/repos/{repo_id}/pipelines/{number}
        url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}"
        try:
            resp = self.session.get(url, timeout=30)
            resp.raise_for_status()
            pipeline = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get build log for pipeline %s: %s", build_id, e)
            return f"(Failed to retrieve build log: {e})"

        all_logs = []
        # ``or []`` tolerates keys that are present but null.
        for workflow in pipeline.get('workflows') or []:
            for step in workflow.get('children') or []:
                step_id = step.get('id')
                if not step_id:
                    continue
                all_logs.extend(self._fetch_step_log(repo_id, build_id, step_id))

        if all_logs:
            return '\n'.join(all_logs[-lines:])
        return "(No logs available)"

    def get_running_builds(self, build_type: Optional[str] = None) -> list["BuildInfo"]:
        """
        Get currently running pipelines.

        Args:
            build_type: Repository full name. When omitted, no query is
                made and an empty list is returned.

        Returns:
            List of BuildInfo for running pipelines found among the most
            recent pipelines of the repository
        """
        if not build_type:
            return []

        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return []

        # Get recent pipelines and filter for running
        try:
            pipelines = self._list_pipelines(repo_id, per_page=20)
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get running pipelines: %s", e)
            return []

        return [
            self._to_build_info(repo_id, build_type, p)
            for p in pipelines
            if str(p.get('status') or '').lower() == 'running'
        ]

    def get_queued_builds(self, build_type: Optional[str] = None) -> list[dict]:
        """
        Get pending/blocked pipelines.

        Args:
            build_type: Repository full name. When omitted, no query is
                made and an empty list is returned.

        Returns:
            List of queued build info dicts with the keys 'id',
            'branchName' and 'buildType'
        """
        if not build_type:
            return []

        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return []

        try:
            pipelines = self._list_pipelines(repo_id, per_page=20)
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get queued pipelines: %s", e)
            return []

        return [
            {
                'id': p.get('number'),
                'branchName': p.get('branch'),
                'buildType': build_type,
            }
            for p in pipelines
            if str(p.get('status') or '').lower() in ('pending', 'blocked')
        ]

    def cancel_build(self, build_type: str, build_id: int) -> bool:
        """
        Cancel a running pipeline.

        Args:
            build_type: Repository full name
            build_id: Pipeline number

        Returns:
            True if cancelled successfully
        """
        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return False

        # API: POST /api/repos/{repo_id}/pipelines/{number}/cancel
        url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{build_id}/cancel"

        try:
            resp = self.session.post(url, timeout=30)
            resp.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error("Failed to cancel pipeline %s: %s", build_id, e)
            return False

        logger.info("Cancelled pipeline %s for %s", build_id, build_type)
        return True

    def test_connection(self) -> bool:
        """
        Test connection to Woodpecker CI server.

        Returns:
            True if connection successful
        """
        # API: GET /api/user - returns current user info
        url = f"{self.base_url}/api/user"

        try:
            resp = self.session.get(url, timeout=10)
            resp.raise_for_status()
            user_info = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to connect to Woodpecker CI: %s", e)
            return False

        logger.info("Connected to Woodpecker CI as: %s", user_info.get('login', 'unknown'))
        return True

    def get_pipelines(
        self,
        build_type: str,
        limit: int = 50,
        branch: Optional[str] = None,
        status: Optional[str] = None,
    ) -> list["BuildInfo"]:
        """
        Get ALL pipelines for a repository (not just running).

        Filters are applied client-side to a single page of results, so
        fewer than ``limit`` items may be returned when filtering.

        Args:
            build_type: Repository full name (e.g., "cleargrow/controller")
            limit: Maximum number of pipelines to request
            branch: Optional branch name to filter by
            status: Optional status to filter by (success, failure, running,
                pending); compared case-insensitively against the
                normalized status

        Returns:
            List of BuildInfo objects in server order (expected most recent
            first)
        """
        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return []

        try:
            pipelines = self._list_pipelines(repo_id, per_page=limit)
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get pipelines for %s: %s", build_type, e)
            return []

        wanted_status = status.upper() if status else None
        builds = []
        for p in pipelines:
            # Filter by branch if specified
            if branch and p.get('branch') != branch:
                continue

            info = self._to_build_info(repo_id, build_type, p)
            # Filter by status if specified
            if wanted_status and info.status != wanted_status:
                continue
            builds.append(info)
        return builds

    def get_pipelines_extended(
        self,
        build_type: str,
        limit: int = 50,
    ) -> list[dict]:
        """
        Get pipelines with extended information (including timing data).

        Args:
            build_type: Repository full name (e.g., "cleargrow/controller")
            limit: Maximum number of pipelines to return

        Returns:
            List of pipeline dicts with full data; 'status' is the
            normalized status in lower case
        """
        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return []

        try:
            pipelines = self._list_pipelines(repo_id, per_page=limit)
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to get pipelines for %s: %s", build_type, e)
            return []

        # Short repo name: the part after the last "/" (or the whole name).
        repo_short = build_type.split('/')[-1]
        return [
            {
                'id': p.get('number'),
                'number': p.get('number'),
                'repo': repo_short,
                'repo_full': build_type,
                'status': self._normalize_status(p.get('status', 'unknown')).lower(),
                'event': p.get('event', 'push'),
                'branch': p.get('branch', ''),
                'message': p.get('message', ''),
                'author': p.get('author', ''),
                'author_avatar': p.get('author_avatar', ''),
                'commit': p.get('commit', ''),
                'started': p.get('started'),
                'finished': p.get('finished'),
                'created': p.get('created'),
                'web_url': self._build_web_url(repo_id, p.get('number', 0)),
            }
            for p in pipelines
        ]

    def retry_pipeline(self, build_type: str, pipeline_number: int) -> dict:
        """
        Restart/retry a pipeline by creating a new one.

        Args:
            build_type: Repository full name
            pipeline_number: Pipeline number to restart

        Returns:
            Dict with success status and new pipeline info. On success the
            keys are 'success', 'build_id', 'message' and 'web_url'; on
            failure only 'success' and 'message'.
        """
        repo_id = self._get_repo_id(build_type)
        if not repo_id:
            return {"success": False, "message": f"Repository not found: {build_type}"}

        # Woodpecker uses POST to the pipeline endpoint to restart it
        # API: POST /api/repos/{repo_id}/pipelines/{number}
        url = f"{self.base_url}/api/repos/{repo_id}/pipelines/{pipeline_number}"

        try:
            resp = self.session.post(url, timeout=30)
            resp.raise_for_status()
            new_pipeline = resp.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            logger.error("Failed to retry pipeline %s: %s", pipeline_number, e)
            return {"success": False, "message": str(e)}

        new_number = new_pipeline.get('number')
        logger.info(
            "Restarted pipeline %s -> %s for %s", pipeline_number, new_number, build_type
        )
        return {
            "success": True,
            "build_id": new_number,
            "message": f"Pipeline restarted as #{new_number}",
            "web_url": self._build_web_url(repo_id, new_pipeline.get('number', 0)),
        }
|