Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
551 lines
18 KiB
Python
551 lines
18 KiB
Python
"""
|
|
Gitea REST API interface.
|
|
|
|
Gitea has no documented rate limits for self-hosted instances.
|
|
API Documentation: https://docs.gitea.com/api/1.20/
|
|
|
|
Used for:
|
|
- Repository operations
|
|
- Issue comments (linked to YouTrack issues)
|
|
- Webhooks
|
|
"""
|
|
|
|
import logging
|
|
import subprocess
|
|
import requests
|
|
from dataclasses import dataclass
|
|
from typing import Optional, List
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class GiteaError(Exception):
    """Root exception type for Gitea API errors (GiteaAuthError derives from it)."""
|
|
|
|
|
|
class GiteaAuthError(GiteaError):
    """Raised when Gitea rejects the request's credentials or permissions."""
|
|
|
|
|
|
@dataclass
class GiteaRepo:
    """Plain record holding the repository fields this module reads from Gitea."""

    # Numeric repository id.
    id: int
    # Short repository name.
    name: str
    # Qualified name in "owner/repo" form.
    full_name: str
    # Free-text repository description.
    description: str
    # URL used for cloning.
    clone_url: str
    # SSH URL of the repository.
    ssh_url: str
    # Name of the repository's default branch.
    default_branch: str
|
|
|
|
|
|
@dataclass
class GiteaComment:
    """Plain record for a single comment attached to an issue or pull request."""

    # Numeric comment id.
    id: int
    # Comment text.
    body: str
    # Login name of the comment's author.
    user: str
    # Creation timestamp, kept as the string the API returned.
    created_at: str
    # Last-update timestamp, kept as the string the API returned.
    updated_at: str
|
|
|
|
|
|
class GiteaClient:
    """
    Gitea REST API client.

    Uses access token authentication: every request goes through one
    ``requests.Session`` whose headers carry the token.
    Supports context manager protocol for proper resource cleanup.
    """

    def __init__(self, base_url: str, token: str, timeout: float = 30.0):
        """
        Initialize Gitea client.

        Args:
            base_url: Gitea instance URL (e.g., https://git.yourdomain.com).
                Trailing slashes are stripped before ``/api/v1`` is appended.
            token: Access token from Gitea (Settings -> Applications -> Generate Token)
            timeout: Default timeout in seconds applied to every request that
                does not pass its own ``timeout``. ``None`` disables the
                default and restores the wait-forever behaviour of requests.
        """
        self.base_url = base_url.rstrip('/')
        self.api_url = f"{self.base_url}/api/v1"
        self.token = token
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"token {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    def __enter__(self):
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, closing the session."""
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False

    def close(self):
        """Close the HTTP session and release resources."""
        if self.session:
            self.session.close()
            logger.debug("Gitea session closed")

    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """
        Make an API request.

        Args:
            method: HTTP verb passed to the session (e.g. "GET").
            endpoint: Path relative to ``/api/v1``; a leading slash is ignored.
            **kwargs: Forwarded to ``Session.request`` (params, json, timeout, ...).

        Returns:
            The successful response.

        Raises:
            GiteaAuthError: On HTTP 401 or 403.
            requests.exceptions.RequestException: For other non-OK statuses
                (via ``raise_for_status``) and for transport failures,
                including timeouts.
        """
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        logger.debug(f"Gitea API: {method} {url}")

        # requests never times out unless told to, so without a default a
        # stalled server would block the caller forever and the Timeout
        # handlers in this class could never run. An explicit timeout=
        # supplied by the caller still takes precedence.
        kwargs.setdefault("timeout", self.timeout)

        response = self.session.request(method, url, **kwargs)

        if response.status_code == 401:
            raise GiteaAuthError("Invalid or expired token")

        if response.status_code == 403:
            raise GiteaAuthError(f"Permission denied: {response.text}")

        if not response.ok:
            # Log a bounded slice of the body before raising.
            logger.error(f"Gitea API error: {response.status_code} - {response.text[:500]}")
            response.raise_for_status()

        return response

    def _get(self, endpoint: str, params: Optional[dict] = None) -> dict:
        """
        GET request returning JSON.

        Returns the decoded body (collection endpoints yield a list rather
        than a dict), or ``{}`` when the response body is empty.
        """
        response = self._request("GET", endpoint, params=params)
        return response.json() if response.text else {}

    def _post(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """POST ``data`` as a JSON body; returns decoded JSON, or ``{}`` for an empty body."""
        response = self._request("POST", endpoint, json=data)
        return response.json() if response.text else {}

    def _patch(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """PATCH ``data`` as a JSON body; returns decoded JSON, or ``{}`` for an empty body."""
        response = self._request("PATCH", endpoint, json=data)
        return response.json() if response.text else {}

    # =========================================================================
    # Connection Test
    # =========================================================================

    def test_connection(self) -> dict:
        """
        Test connection and return server info.

        Returns:
            ``{"status": "ok", "user": ..., "version": ...}`` on success, or
            ``{"status": "error", "message": ...}`` when authentication or
            the HTTP request fails. Those failures are logged, not raised.
        """
        try:
            # Get current user to verify auth
            user = self._get("user")
            version = self._get("version")
            logger.info(f"Connected to Gitea {version.get('version', 'unknown')} as: {user.get('login', 'unknown')}")
            return {
                "status": "ok",
                "user": user.get("login"),
                "version": version.get("version"),
            }
        except GiteaAuthError as e:
            logger.error(f"Connection test failed - authentication error: {e}")
            return {"status": "error", "message": str(e)}
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection test failed - connection error: {e}")
            return {"status": "error", "message": f"Connection error: {e}"}
        except requests.exceptions.Timeout as e:
            logger.error(f"Connection test failed - timeout: {e}")
            return {"status": "error", "message": f"Timeout: {e}"}
        except requests.exceptions.RequestException as e:
            logger.error(f"Connection test failed - request error: {e}")
            return {"status": "error", "message": str(e)}

    # =========================================================================
    # Repositories
    # =========================================================================

    def get_repos(self, owner: Optional[str] = None) -> List[GiteaRepo]:
        """
        Get repositories.

        Args:
            owner: Filter by owner (optional). If None, returns user's repos.

        Returns:
            One GiteaRepo per entry of the API response. Only a single
            request is made; no pagination parameters are sent.
        """
        if owner:
            data = self._get(f"users/{owner}/repos")
        else:
            data = self._get("user/repos")

        return [self._parse_repo(r) for r in data]

    def get_repo(self, owner: str, repo: str) -> GiteaRepo:
        """Get a specific repository."""
        data = self._get(f"repos/{owner}/{repo}")
        return self._parse_repo(data)

    def _parse_repo(self, data: dict) -> GiteaRepo:
        """Parse repo data into GiteaRepo object, defaulting any missing key."""
        return GiteaRepo(
            id=data.get("id", 0),
            name=data.get("name", ""),
            full_name=data.get("full_name", ""),
            description=data.get("description", ""),
            clone_url=data.get("clone_url", ""),
            ssh_url=data.get("ssh_url", ""),
            default_branch=data.get("default_branch", "main"),
        )

    # =========================================================================
    # Issues (for comments - main tracking is in YouTrack)
    # =========================================================================

    def get_issue(self, owner: str, repo: str, issue_number: int) -> dict:
        """Get issue details (if using Gitea issues as secondary)."""
        return self._get(f"repos/{owner}/{repo}/issues/{issue_number}")

    def get_issue_comments(self, owner: str, repo: str, issue_number: int) -> List[GiteaComment]:
        """
        Get comments on an issue.

        A comment whose ``user`` field is missing or null is reported with
        the author "unknown" instead of raising.
        """
        data = self._get(f"repos/{owner}/{repo}/issues/{issue_number}/comments")
        return [
            GiteaComment(
                id=c.get("id", 0),
                body=c.get("body", ""),
                # "user" may be present but null; `or {}` keeps .get() safe.
                user=(c.get("user") or {}).get("login", "unknown"),
                created_at=c.get("created_at", ""),
                updated_at=c.get("updated_at", ""),
            )
            for c in data
        ]

    def add_issue_comment(self, owner: str, repo: str, issue_number: int, body: str) -> bool:
        """
        Add a comment to an issue.

        Args:
            owner: Repository owner
            repo: Repository name
            issue_number: Issue number
            body: Comment text (Markdown supported)

        Returns:
            True when the comment was posted; False when authentication or
            the HTTP request failed (the failure is logged, not raised).
        """
        try:
            self._post(f"repos/{owner}/{repo}/issues/{issue_number}/comments", data={
                "body": body
            })
            logger.info(f"Added comment to {owner}/{repo}#{issue_number}")
            return True
        except GiteaAuthError as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - auth error: {e}")
            return False
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - connection error: {e}")
            return False
        except requests.exceptions.Timeout as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number} - timeout: {e}")
            return False
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to add comment to {owner}/{repo}#{issue_number}: {e}")
            return False

    # =========================================================================
    # Commits
    # =========================================================================

    def get_commits(self, owner: str, repo: str, branch: Optional[str] = None, limit: int = 10) -> List[dict]:
        """
        Get recent commits.

        Args:
            owner: Repository owner
            repo: Repository name
            branch: Sent as the ``sha`` query parameter when given.
            limit: Sent as the ``limit`` query parameter.
        """
        params = {"limit": limit}
        if branch:
            params["sha"] = branch
        return self._get(f"repos/{owner}/{repo}/commits", params=params)

    def get_commit(self, owner: str, repo: str, sha: str) -> dict:
        """Get a specific commit."""
        return self._get(f"repos/{owner}/{repo}/git/commits/{sha}")

    # =========================================================================
    # Branches
    # =========================================================================

    def get_branches(self, owner: str, repo: str) -> List[dict]:
        """Get all branches."""
        return self._get(f"repos/{owner}/{repo}/branches")

    def get_branch(self, owner: str, repo: str, branch: str) -> dict:
        """Get a specific branch."""
        return self._get(f"repos/{owner}/{repo}/branches/{branch}")

    # =========================================================================
    # Pull Requests
    # =========================================================================

    def get_pull_requests(self, owner: str, repo: str, state: str = "open") -> List[dict]:
        """Get pull requests, filtered by the ``state`` query parameter."""
        return self._get(f"repos/{owner}/{repo}/pulls", params={"state": state})

    def create_pull_request(
        self,
        owner: str,
        repo: str,
        title: str,
        head: str,
        base: str,
        body: str = ""
    ) -> dict:
        """
        Create a pull request.

        Args:
            owner: Repository owner
            repo: Repository name
            title: Pull request title
            head: Sent as the request's ``head`` field
            base: Sent as the request's ``base`` field
            body: Pull request description

        Returns:
            The decoded API response.
        """
        return self._post(f"repos/{owner}/{repo}/pulls", data={
            "title": title,
            "head": head,
            "base": base,
            "body": body,
        })

    # =========================================================================
    # Webhooks
    # =========================================================================

    def get_webhooks(self, owner: str, repo: str) -> List[dict]:
        """Get webhooks for a repository."""
        return self._get(f"repos/{owner}/{repo}/hooks")

    def create_webhook(
        self,
        owner: str,
        repo: str,
        url: str,
        events: Optional[List[str]] = None,
        secret: str = ""
    ) -> dict:
        """
        Create a webhook for repository events.

        Args:
            owner: Repository owner
            repo: Repository name
            url: Callback URL
            events: List of event types (default: push)
                Options: 'create', 'delete', 'fork', 'push', 'issues',
                'issue_comment', 'pull_request', 'release'
            secret: Webhook secret for signature verification

        Returns:
            The decoded API response.
        """
        if events is None:
            events = ["push"]

        return self._post(f"repos/{owner}/{repo}/hooks", data={
            "type": "gitea",
            "active": True,
            "events": events,
            "config": {
                "url": url,
                "content_type": "json",
                "secret": secret,
            }
        })
|
|
|
|
|
|
def load_gitea_config(config: dict) -> Optional[GiteaClient]:
    """
    Load Gitea client from configuration.

    Expected config structure:
        gitea:
            base_url: https://git.yourdomain.com
            token: xxx

    Args:
        config: Parsed application configuration.

    Returns:
        A GiteaClient, or None (after logging a warning) when ``base_url``
        or ``token`` is missing or empty.
    """
    # `or {}` also covers a config that is None and a "gitea" key that is
    # present but empty (None); `.get("gitea", {})` only covers a missing key
    # and would then fail with AttributeError on the next lookup.
    gitea_config = (config or {}).get("gitea") or {}
    base_url = gitea_config.get("base_url")
    token = gitea_config.get("token")

    if not base_url or not token:
        logger.warning("Gitea configuration incomplete (missing base_url or token)")
        return None

    return GiteaClient(base_url, token)
|
|
|
|
|
|
# =============================================================================
|
|
# Git CLI Operations (for local repo management)
|
|
# =============================================================================
|
|
|
|
def git_clone(url: str, path: str, branch: str = None) -> bool:
    """
    Run ``git clone`` for ``url`` into ``path``.

    When ``branch`` is given it is passed to git with ``-b``. Returns True
    if git exits with status 0; otherwise logs git's stderr and returns False.
    """
    branch_args = ["-b", branch] if branch else []
    completed = subprocess.run(
        ["git", "clone", *branch_args, url, path],
        capture_output=True,
        text=True,
    )
    if completed.returncode == 0:
        return True

    logger.error(f"git clone failed: {completed.stderr}")
    return False
|
|
|
|
|
|
def git_pull(path: str) -> bool:
    """
    Run ``git pull`` inside the repository at ``path``.

    Returns True if git exits with status 0; otherwise logs git's stderr
    and returns False.
    """
    outcome = subprocess.run(["git", "pull"], cwd=path, capture_output=True, text=True)
    succeeded = outcome.returncode == 0
    if not succeeded:
        logger.error(f"git pull failed: {outcome.stderr}")
    return succeeded
|
|
|
|
|
|
def git_branch_exists(path: str, branch: str) -> bool:
    """
    Report whether ``branch`` resolves in the repository at ``path``.

    Runs ``git rev-parse --verify <branch>`` and returns True when git
    exits with status 0. Nothing is logged on failure.
    """
    verify_cmd = ["git", "rev-parse", "--verify", branch]
    exit_code = subprocess.run(
        verify_cmd,
        cwd=path,
        capture_output=True,
        text=True,
    ).returncode
    return exit_code == 0
|
|
|
|
|
|
def git_current_branch(path: str) -> str:
    """
    Return what ``git rev-parse --abbrev-ref HEAD`` prints for ``path``.

    The output is stripped of surrounding whitespace. An empty string is
    returned when git exits with a non-zero status.
    """
    proc = subprocess.run(
        ["git", "rev-parse", "--abbrev-ref", "HEAD"],
        cwd=path,
        capture_output=True,
        text=True,
    )
    if proc.returncode != 0:
        return ""
    return proc.stdout.strip()
|
|
|
|
|
|
def git_push(path: str, remote: str = "origin", branch: str = None) -> bool:
    """
    Run ``git push -u <remote> [<branch>]`` in the repository at ``path``.

    When ``branch`` is given, the push is skipped (and True is returned)
    if that branch does not exist locally or is not the branch currently
    checked out; both cases are treated as "nothing to push", not as errors.

    Returns True when the push succeeds or is skipped; otherwise logs git's
    stderr and returns False.
    """
    push_cmd = ["git", "push", "-u", remote]

    if branch:
        # A missing branch is expected when the agent decided no changes were needed.
        if not git_branch_exists(path, branch):
            logger.info(f"Branch {branch} does not exist locally, nothing to push")
            return True

        # Only push the requested branch while it is the one checked out.
        current = git_current_branch(path)
        if current != branch:
            logger.info(f"Not on branch {branch} (on {current}), nothing to push")
            return True

        push_cmd.append(branch)

    pushed = subprocess.run(push_cmd, cwd=path, capture_output=True, text=True)
    if pushed.returncode == 0:
        return True

    logger.error(f"git push failed: {pushed.stderr}")
    return False
|
|
|
|
|
|
def git_status(path: str) -> dict:
    """
    Get repository status from ``git status --porcelain``.

    Each output line is ``XY <path>``: a two-character status code, one
    separator character, then the path. Entries are classified by the first
    matching rule:

    - ``M`` in the code          -> "modified"
    - ``A`` or ``?`` in the code -> "added" (staged additions and untracked files)
    - ``D`` in the code          -> "deleted"

    Entries with any other code (for example renames) are not listed in a
    bucket but still make the result not clean.

    Args:
        path: Directory in which git is run.

    Returns:
        Dict with the lists "modified", "added" and "deleted", plus the
        boolean "clean", which is True only when git printed no entries.
        If git itself fails, the failure is logged and whatever output it
        produced (normally none) is parsed, so the result reports clean.
    """
    result = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=path,
        capture_output=True,
        text=True
    )
    if result.returncode != 0:
        logger.error(f"git status failed: {result.stderr}")

    modified = []
    added = []
    deleted = []
    has_entries = False

    # Do not strip() the whole output: the status code of an entry may start
    # with a space (" M file"), and stripping would shift the first line so
    # that its filename loses its first character.
    for line in result.stdout.splitlines():
        if not line.strip():
            continue
        has_entries = True
        status = line[:2]
        filename = line[3:]

        if 'M' in status:
            modified.append(filename)
        elif 'A' in status or '?' in status:
            added.append(filename)
        elif 'D' in status:
            deleted.append(filename)

    return {
        "modified": modified,
        "added": added,
        "deleted": deleted,
        "clean": not has_entries,
    }
|
|
|
|
|
|
def _run_git(path: str, *args: str) -> subprocess.CompletedProcess:
    """Run ``git <args>`` in ``path``, capturing stdout and stderr as text."""
    return subprocess.run(["git", *args], cwd=path, capture_output=True, text=True)


def git_merge_to_main(
    path: str,
    feature_branch: str,
    delete_after: bool = True,
    target_branch: str = "main",
) -> tuple[bool, str]:
    """
    Merge a feature branch to main and optionally delete it.

    Steps, stopping at the first failure: fetch ``origin``, check out and
    pull the target branch, merge ``origin/<feature_branch>`` with
    ``--no-ff``, push the target branch. A failed merge is aborted before
    returning. Branch deletion afterwards is best effort and never turns a
    successful merge into a failure.

    Args:
        path: Path to the repository
        feature_branch: Name of the feature branch (e.g., "issue/CG-30")
        delete_after: Whether to delete the feature branch (remote and
            local) after merge
        target_branch: Branch to merge into. Defaults to "main"; pass the
            repository's default branch when it is named differently.

    Returns:
        Tuple of (success: bool, message: str). Exceptions are caught,
        logged and reported as ``(False, str(exception))``.
    """
    try:
        # Bring the local target branch up to date before merging into it.
        preparation = (
            (("fetch", "origin"), "Failed to fetch"),
            (("checkout", target_branch), f"Failed to checkout {target_branch}"),
            (("pull", "origin", target_branch), f"Failed to pull {target_branch}"),
        )
        for git_args, failure in preparation:
            result = _run_git(path, *git_args)
            if result.returncode != 0:
                return False, f"{failure}: {result.stderr}"

        # Merge feature branch with no-ff to preserve history
        result = _run_git(
            path,
            "merge", "--no-ff", f"origin/{feature_branch}",
            "-m", f"Merge {feature_branch} to {target_branch}",
        )
        if result.returncode != 0:
            # Leave the working tree usable; the abort itself is best effort.
            _run_git(path, "merge", "--abort")
            return False, f"Failed to merge: {result.stderr}"

        # Push the merged target branch
        result = _run_git(path, "push", "origin", target_branch)
        if result.returncode != 0:
            return False, f"Failed to push {target_branch}: {result.stderr}"

        logger.info(f"Merged {feature_branch} to {target_branch} in {path}")

        if delete_after:
            # Delete remote feature branch; a failure here is only a warning.
            result = _run_git(path, "push", "origin", "--delete", feature_branch)
            if result.returncode != 0:
                logger.warning(f"Failed to delete remote branch {feature_branch}: {result.stderr}")
            else:
                logger.info(f"Deleted remote branch {feature_branch}")

            # Delete local feature branch (it may not exist locally at all).
            result = _run_git(path, "branch", "-d", feature_branch)
            if result.returncode != 0:
                logger.debug(f"Local branch {feature_branch} not deleted: {result.stderr}")

        return True, f"Successfully merged {feature_branch} to {target_branch}"

    except Exception as e:
        # Callers rely on the (success, message) tuple rather than exceptions.
        logger.error(f"Error merging {feature_branch}: {e}")
        return False, str(e)
|