Files
agentrunner/gitea_client.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

551 lines
18 KiB
Python

"""
Gitea REST API interface.
Gitea has no documented rate limits for self-hosted instances.
API Documentation: https://docs.gitea.com/api/1.20/
Used for:
- Repository operations
- Issue comments (linked to YouTrack issues)
- Webhooks
"""
import logging
import subprocess
import requests
from dataclasses import dataclass
from typing import Optional, List
logger = logging.getLogger(__name__)
class GiteaError(Exception):
    """Root of the exception hierarchy raised by this Gitea client module."""
class GiteaAuthError(GiteaError):
    """Signals that Gitea rejected the request's credentials or permissions."""
@dataclass
class GiteaRepo:
    """Plain value object holding the repository fields this module uses."""

    # Numeric repository id as reported by the API.
    id: int
    # Short repository name (without the owner part).
    name: str
    # Qualified name in "owner/repo" form.
    full_name: str
    # Free-text repository description.
    description: str
    # URL used to clone the repository over HTTP(S).
    clone_url: str
    # URL used to clone the repository over SSH.
    ssh_url: str
    # Name of the branch the repository treats as its default.
    default_branch: str
@dataclass
class GiteaComment:
    """Plain value object for a single comment attached to an issue or PR."""

    # Numeric comment id as reported by the API.
    id: int
    # Comment text.
    body: str
    # Login name of the comment author.
    user: str
    # Creation timestamp, kept as the string the API returned.
    created_at: str
    # Last-modification timestamp, kept as the string the API returned.
    updated_at: str
class GiteaClient:
    """
    Gitea REST API client.

    Uses access token authentication: the token is sent in the
    ``Authorization`` header of a shared ``requests.Session``.
    Supports the context manager protocol for proper resource cleanup.
    """

    def __init__(self, base_url: str, token: str, timeout: float = 30.0):
        """
        Initialize Gitea client.

        Args:
            base_url: Gitea instance URL (e.g., https://git.yourdomain.com)
            token: Access token from Gitea (Settings -> Applications -> Generate Token)
            timeout: Default timeout in seconds applied to every request that
                does not pass its own ``timeout``. Without one, ``requests``
                waits indefinitely on an unresponsive server.
        """
        self.base_url = base_url.rstrip('/')
        self.api_url = f"{self.base_url}/api/v1"
        self.token = token
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    def __enter__(self):
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, closing the session."""
        self.close()
        # Returning False lets any exception from the ``with`` body propagate.
        return False

    def close(self):
        """Close the HTTP session and release resources."""
        if self.session:
            self.session.close()
            logger.debug("Gitea session closed")

    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """
        Make an API request.

        Args:
            method: HTTP verb (e.g. "GET").
            endpoint: Path relative to ``/api/v1``; a leading slash is ignored.
            **kwargs: Passed through to ``Session.request``. The client's
                default timeout is applied unless ``timeout`` is given.

        Returns:
            The successful response.

        Raises:
            GiteaAuthError: On HTTP 401 or 403.
            requests.exceptions.HTTPError: On any other non-2xx/3xx status.
        """
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        logger.debug(f"Gitea API: {method} {url}")
        # Bound every call so a stalled server cannot hang the caller forever;
        # this is also what makes the Timeout handlers below reachable.
        kwargs.setdefault("timeout", self.timeout)
        response = self.session.request(method, url, **kwargs)
        if response.status_code == 401:
            raise GiteaAuthError("Invalid or expired token")
        if response.status_code == 403:
            raise GiteaAuthError(f"Permission denied: {response.text}")
        if not response.ok:
            logger.error(f"Gitea API error: {response.status_code} - {response.text[:500]}")
            response.raise_for_status()
        return response

    def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """GET request returning JSON (``{}`` when the body is empty)."""
        response = self._request("GET", endpoint, params=params)
        return response.json() if response.text else {}

    def _post(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """POST request returning JSON (``{}`` when the body is empty)."""
        response = self._request("POST", endpoint, json=data)
        return response.json() if response.text else {}

    def _patch(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """PATCH request returning JSON (``{}`` when the body is empty)."""
        response = self._request("PATCH", endpoint, json=data)
        return response.json() if response.text else {}

    # =========================================================================
    # Connection Test
    # =========================================================================
    def test_connection(self) -> dict:
        """
        Test connection and return server info.

        Returns:
            ``{"status": "ok", "user": ..., "version": ...}`` on success, or
            ``{"status": "error", "message": ...}`` when authentication or the
            HTTP request fails. Request errors are logged, not raised.
        """
        try:
            # Get current user to verify auth
            user = self._get("user")
            version = self._get("version")
            logger.info(f"Connected to Gitea {version.get('version', 'unknown')} as: {user.get('login', 'unknown')}")
            return {
                "status": "ok",
                "user": user.get("login"),
                "version": version.get("version"),
            }
        except GiteaAuthError as e:
            logger.error(f"Connection test failed - authentication error: {e}")
            return {"status": "error", "message": str(e)}
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection test failed - connection error: {e}")
            return {"status": "error", "message": f"Connection error: {e}"}
        except requests.exceptions.Timeout as e:
            logger.error(f"Connection test failed - timeout: {e}")
            return {"status": "error", "message": f"Timeout: {e}"}
        except requests.exceptions.RequestException as e:
            logger.error(f"Connection test failed - request error: {e}")
            return {"status": "error", "message": str(e)}

    # =========================================================================
    # Repositories
    # =========================================================================
    def get_repos(self, owner: Optional[str] = None) -> List[GiteaRepo]:
        """
        Get repositories.

        Args:
            owner: Filter by owner (optional). If None, returns user's repos.
        """
        if owner:
            data = self._get(f"users/{owner}/repos")
        else:
            data = self._get("user/repos")
        return [self._parse_repo(r) for r in data]

    def get_repo(self, owner: str, repo: str) -> GiteaRepo:
        """Get a specific repository."""
        data = self._get(f"repos/{owner}/{repo}")
        return self._parse_repo(data)

    def _parse_repo(self, data: dict) -> GiteaRepo:
        """Parse repo data into GiteaRepo object, defaulting missing keys."""
        return GiteaRepo(
            id=data.get("id", 0),
            name=data.get("name", ""),
            full_name=data.get("full_name", ""),
            description=data.get("description", ""),
            clone_url=data.get("clone_url", ""),
            ssh_url=data.get("ssh_url", ""),
            default_branch=data.get("default_branch", "main"),
        )

    # =========================================================================
    # Issues (for comments - main tracking is in YouTrack)
    # =========================================================================
    def get_issue(self, owner: str, repo: str, issue_number: int) -> dict:
        """Get issue details (if using Gitea issues as secondary)."""
        return self._get(f"repos/{owner}/{repo}/issues/{issue_number}")

    def get_issue_comments(self, owner: str, repo: str, issue_number: int) -> List[GiteaComment]:
        """Get comments on an issue."""
        data = self._get(f"repos/{owner}/{repo}/issues/{issue_number}/comments")
        return [
            GiteaComment(
                id=c.get("id", 0),
                body=c.get("body", ""),
                # ``or {}`` also covers a JSON null ``user`` value, which
                # ``.get("user", {})`` alone would pass through as None.
                user=(c.get("user") or {}).get("login", "unknown"),
                created_at=c.get("created_at", ""),
                updated_at=c.get("updated_at", ""),
            )
            for c in data
        ]

    def add_issue_comment(self, owner: str, repo: str, issue_number: int, body: str) -> bool:
        """
        Add a comment to an issue.

        Args:
            owner: Repository owner
            repo: Repository name
            issue_number: Issue number
            body: Comment text (Markdown supported)

        Returns:
            True when the comment was posted; False when authentication or the
            HTTP request failed (the failure is logged, not raised).
        """
        try:
            self._post(f"repos/{owner}/{repo}/issues/{issue_number}/comments", data={
                "body": body
            })
            logger.info(f"Added comment to {owner}/{repo}#{issue_number}")
            return True
        except GiteaAuthError as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - auth error: {e}")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - connection error: {e}")
            return False
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - timeout: {e}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number}: {e}")
            return False

    # =========================================================================
    # Commits
    # =========================================================================
    def get_commits(self, owner: str, repo: str, branch: Optional[str] = None, limit: int = 10) -> List[dict]:
        """Get recent commits, optionally restricted to ``branch`` (sent as ``sha``)."""
        params = {"limit": limit}
        if branch:
            params["sha"] = branch
        return self._get(f"repos/{owner}/{repo}/commits", params=params)

    def get_commit(self, owner: str, repo: str, sha: str) -> dict:
        """Get a specific commit."""
        return self._get(f"repos/{owner}/{repo}/git/commits/{sha}")

    # =========================================================================
    # Branches
    # =========================================================================
    def get_branches(self, owner: str, repo: str) -> List[dict]:
        """Get all branches."""
        return self._get(f"repos/{owner}/{repo}/branches")

    def get_branch(self, owner: str, repo: str, branch: str) -> dict:
        """Get a specific branch."""
        return self._get(f"repos/{owner}/{repo}/branches/{branch}")

    # =========================================================================
    # Pull Requests
    # =========================================================================
    def get_pull_requests(self, owner: str, repo: str, state: str = "open") -> List[dict]:
        """Get pull requests filtered by ``state``."""
        return self._get(f"repos/{owner}/{repo}/pulls", params={"state": state})

    def create_pull_request(
        self,
        owner: str,
        repo: str,
        title: str,
        head: str,
        base: str,
        body: str = ""
    ) -> dict:
        """Create a pull request from ``head`` into ``base``."""
        return self._post(f"repos/{owner}/{repo}/pulls", data={
            "title": title,
            "head": head,
            "base": base,
            "body": body,
        })

    # =========================================================================
    # Webhooks
    # =========================================================================
    def get_webhooks(self, owner: str, repo: str) -> List[dict]:
        """Get webhooks for a repository."""
        return self._get(f"repos/{owner}/{repo}/hooks")

    def create_webhook(
        self,
        owner: str,
        repo: str,
        url: str,
        events: Optional[List[str]] = None,
        secret: str = ""
    ) -> dict:
        """
        Create a webhook for repository events.

        Args:
            owner: Repository owner
            repo: Repository name
            url: Callback URL
            events: List of event types (default: push)
                Options: 'create', 'delete', 'fork', 'push', 'issues',
                'issue_comment', 'pull_request', 'release'
            secret: Webhook secret for signature verification
        """
        if events is None:
            events = ["push"]
        return self._post(f"repos/{owner}/{repo}/hooks", data={
            "type": "gitea",
            "active": True,
            "events": events,
            "config": {
                "url": url,
                "content_type": "json",
                "secret": secret,
            }
        })
def load_gitea_config(config: dict) -> Optional[GiteaClient]:
    """
    Load Gitea client from configuration.

    Expected config structure:
        gitea:
            base_url: https://git.yourdomain.com
            token: xxx

    Args:
        config: Parsed configuration mapping.

    Returns:
        A ``GiteaClient`` when both ``base_url`` and ``token`` are set,
        otherwise ``None`` (a warning is logged).
    """
    # ``or {}`` covers a missing section as well as one that is present but
    # null (e.g. an empty ``gitea:`` key in YAML), which ``.get(key, {})``
    # would hand back as None and crash on the next lookup.
    gitea_config = (config or {}).get("gitea") or {}
    base_url = gitea_config.get("base_url")
    token = gitea_config.get("token")
    if not base_url or not token:
        logger.warning("Gitea configuration incomplete (missing base_url or token)")
        return None
    return GiteaClient(base_url, token)
# =============================================================================
# Git CLI Operations (for local repo management)
# =============================================================================
def git_clone(url: str, path: str, branch: str = None) -> bool:
    """
    Run ``git clone`` of ``url`` into ``path``.

    When ``branch`` is given it is passed via ``-b``. Returns True if git
    exits with status 0; otherwise logs stderr and returns False.
    """
    branch_args = ["-b", branch] if branch else []
    completed = subprocess.run(
        ["git", "clone", *branch_args, url, path],
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        return True
    logger.error(f"git clone failed: {completed.stderr}")
    return False
def git_pull(path: str) -> bool:
    """
    Run ``git pull`` inside the repository at ``path``.

    Returns True if git exits with status 0; otherwise logs stderr and
    returns False.
    """
    completed = subprocess.run(["git", "pull"], cwd=path, capture_output=True, text=True)
    if completed.returncode == 0:
        return True
    logger.error(f"git pull failed: {completed.stderr}")
    return False
def git_branch_exists(path: str, branch: str) -> bool:
    """
    Check if a local branch exists.

    Args:
        path: Path to the repository.
        branch: Short branch name (e.g. "issue/CG-30"), without "refs/heads/".

    Returns:
        True only when ``refs/heads/<branch>`` exists in the repository.
    """
    # Verify the fully-qualified ref: a bare ``rev-parse --verify <name>``
    # also succeeds for tags, "HEAD", commit SHAs and remote-tracking refs,
    # which would wrongly report those as local branches.
    result = subprocess.run(
        ["git", "show-ref", "--verify", "--quiet", f"refs/heads/{branch}"],
        cwd=path,
        capture_output=True,
        text=True
    )
    return result.returncode == 0
def git_current_branch(path: str) -> str:
    """
    Report what ``git rev-parse --abbrev-ref HEAD`` prints for ``path``.

    Returns the whitespace-stripped output on success, or an empty string
    when the git command exits non-zero.
    """
    completed = subprocess.run(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"],
        cwd=path,
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        return ""
    return completed.stdout.strip()
def git_push(path: str, remote: str = "origin", branch: str = None) -> bool:
    """
    Push to ``remote`` with upstream tracking (``git push -u``).

    When ``branch`` is given, the push is skipped -- and True is returned --
    if that branch does not exist locally or is not the branch currently
    checked out; both cases are treated as "nothing to push" rather than as
    errors. Returns False only when the push command itself fails.
    """
    push_cmd = ["git", "push", "-u", remote]

    if branch:
        # A missing branch is not an error - the agent may have determined
        # that no changes were needed.
        if not git_branch_exists(path, branch):
            logger.info(f"Branch {branch} does not exist locally, nothing to push")
            return True

        # Only push when the requested branch is the one checked out.
        current = git_current_branch(path)
        if current != branch:
            logger.info(f"Not on branch {branch} (on {current}), nothing to push")
            return True

        push_cmd.append(branch)

    outcome = subprocess.run(push_cmd, cwd=path, capture_output=True, text=True)
    if outcome.returncode == 0:
        return True
    logger.error(f"git push failed: {outcome.stderr}")
    return False
def git_status(path: str) -> dict:
    """
    Get repository status.

    Parses ``git status --porcelain`` (``XY PATH`` per line, or
    ``XY ORIG_PATH -> PATH`` for renames/copies).

    Args:
        path: Path to the repository.

    Returns:
        Dict with keys:
            "modified": paths whose status contains ``M``
            "added": paths that are added/untracked, plus the destination of
                a rename or copy
            "deleted": paths that are deleted, plus the origin of a rename
            "clean": True when git reported no entries at all

        If the git command fails, the failure is logged and an empty
        (``clean``) result is returned.
    """
    result = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=path,
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        logger.error(f"git status failed: {result.stderr}")

    modified = []
    added = []
    deleted = []
    entries = 0
    # Do NOT strip() the whole output: the first status column is a space for
    # unstaged changes (" M file"), and stripping it shifts the first line so
    # that its filename loses its first character.
    for line in result.stdout.splitlines():
        if len(line) < 4:
            continue
        entries += 1
        status = line[:2]
        filename = line[3:]

        # Renames/copies are reported as "ORIG_PATH -> PATH".
        origin = None
        if ('R' in status or 'C' in status) and ' -> ' in filename:
            origin, filename = filename.split(' -> ', 1)

        if 'M' in status:
            modified.append(filename)
        elif 'A' in status or '?' in status:
            added.append(filename)
        elif 'D' in status:
            deleted.append(filename)
        elif 'R' in status or 'C' in status:
            # A rename is recorded as "new path added, old path deleted";
            # a copy only adds the new path.
            added.append(filename)
            if 'R' in status and origin:
                deleted.append(origin)

    return {
        "modified": modified,
        "added": added,
        "deleted": deleted,
        # Based on the raw entry count so that statuses not classified above
        # still mark the tree as dirty.
        "clean": entries == 0,
    }
def git_merge_to_main(
    path: str,
    feature_branch: str,
    delete_after: bool = True,
    target_branch: str = "main",
) -> tuple[bool, str]:
    """
    Merge a feature branch to main and optionally delete it.

    Sequence: fetch origin, check out and pull the target branch, merge
    ``origin/<feature_branch>`` with ``--no-ff``, push the target branch,
    then (optionally) delete the feature branch remotely and locally.

    Args:
        path: Path to the repository
        feature_branch: Name of the feature branch (e.g., "issue/CG-30")
        delete_after: Whether to delete the feature branch after merge
        target_branch: Branch to merge into. Defaults to "main"; pass the
            repository's default branch when it is named differently.

    Returns:
        Tuple of (success: bool, message: str). Failures of the branch
        clean-up after a successful push do not affect the result.
    """
    try:
        # Steps that must all succeed before the merge is attempted.
        preparation = (
            (("fetch", "origin"), "Failed to fetch"),
            (("checkout", target_branch), f"Failed to checkout {target_branch}"),
            (("pull", "origin", target_branch), f"Failed to pull {target_branch}"),
        )
        for args, failure in preparation:
            result = _run_git(path, *args)
            if result.returncode != 0:
                return False, f"{failure}: {result.stderr}"

        # Merge feature branch with no-ff to preserve history
        result = _run_git(
            path, "merge", "--no-ff", f"origin/{feature_branch}",
            "-m", f"Merge {feature_branch} to {target_branch}",
        )
        if result.returncode != 0:
            # Abort merge if it failed so the work tree is not left mid-merge
            _run_git(path, "merge", "--abort")
            return False, f"Failed to merge: {result.stderr}"

        # Push merged target branch
        result = _run_git(path, "push", "origin", target_branch)
        if result.returncode != 0:
            return False, f"Failed to push {target_branch}: {result.stderr}"
        logger.info(f"Merged {feature_branch} to {target_branch} in {path}")

        # Delete remote feature branch if requested
        if delete_after:
            result = _run_git(path, "push", "origin", "--delete", feature_branch)
            if result.returncode != 0:
                logger.warning(f"Failed to delete remote branch {feature_branch}: {result.stderr}")
            else:
                logger.info(f"Deleted remote branch {feature_branch}")
            # Delete local feature branch (best effort: it may not exist locally)
            result = _run_git(path, "branch", "-d", feature_branch)
            if result.returncode != 0:
                logger.debug(f"Local branch {feature_branch} not deleted: {result.stderr}")

        return True, f"Successfully merged {feature_branch} to {target_branch}"
    except Exception as e:
        # Boundary handler: this function reports failure through its return
        # value, so anything unexpected (e.g. git not installed) is logged
        # and converted instead of propagating.
        logger.error(f"Error merging {feature_branch}: {e}")
        return False, str(e)


def _run_git(path: str, *args: str) -> subprocess.CompletedProcess:
    """Run ``git <args>`` in ``path``, capturing stdout/stderr as text."""
    return subprocess.run(["git", *args], cwd=path, capture_output=True, text=True)