Files
agentrunner/youtrack_client.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

592 lines
21 KiB
Python

"""
YouTrack REST API interface.
YouTrack has no documented rate limits for self-hosted instances,
making it ideal for automated agent systems.
API Documentation: https://www.jetbrains.com/help/youtrack/server/api-reference.html
"""
import logging
import requests
from dataclasses import dataclass
from typing import Optional, List
from urllib.parse import urljoin
logger = logging.getLogger(__name__)
class YouTrackError(Exception):
    """Root of the exception hierarchy for YouTrack API failures."""
class YouTrackAuthError(YouTrackError):
    """Signals that YouTrack refused the request's authentication."""
@dataclass
class YouTrackIssue:
    """Represents a YouTrack issue as plain Python values.

    ``custom_fields`` defaults to ``None`` in the signature and is swapped for
    a fresh empty dict in ``__post_init__`` so that instances never share one
    mutable default.
    """

    id: str  # e.g., "CG-123"
    id_readable: str  # Same as id for display
    summary: str
    description: str
    project_id: str  # e.g., "CG"
    state: str  # e.g., "Ready", "In Progress"
    priority: str = "Normal"  # e.g., "Critical", "Major", "Normal", "Minor"
    custom_fields: Optional[dict] = None  # Additional fields
    reporter: str = ""
    created: int = 0  # Unix timestamp ms
    updated: int = 0

    def __post_init__(self):
        """Replace a missing ``custom_fields`` with a per-instance empty dict."""
        if self.custom_fields is None:
            self.custom_fields = {}

    @property
    def issue_number(self) -> int:
        """Extract numeric part of issue ID (e.g., 'CG-123' -> 123).

        Returns:
            The integer after the last '-' in ``id``, or 0 when that segment
            is empty or not a number (e.g. '' or 'CG-abc').
        """
        # str.split never yields an empty list, so a truthiness check on the
        # parts cannot detect a missing number; convert and fall back instead.
        _, _, tail = self.id.rpartition("-")
        try:
            return int(tail)
        except ValueError:
            return 0

    @property
    def priority_order(self) -> int:
        """Return numeric priority for sorting (lower = higher priority).

        Priorities not listed in the map rank the same as "Normal" (3).
        """
        priority_map = {
            "Show-stopper": 0,
            "Critical": 1,
            "Major": 2,
            "Normal": 3,
            "Minor": 4,
        }
        return priority_map.get(self.priority, 3)
@dataclass
class YouTrackComment:
    """A single comment attached to an issue."""

    id: str
    text: str
    author: str
    # Creation time as a Unix timestamp in milliseconds.
    created: int
@dataclass
class WorkItem:
    """One time-tracking entry logged against a YouTrack issue."""

    id: str
    duration_minutes: int
    # When the work was logged for, as a Unix timestamp in milliseconds.
    date: int
    author: str
    # Work type name, e.g. "Development", "Testing", "Documentation".
    work_type: str
    text: str = ""
@dataclass
class BoardConfig:
    """Configuration of an agile board used for status tracking."""

    board_id: str
    board_name: str
    # Maps each state name to its state id.
    states: dict[str, str]
class YouTrackClient:
    """
    YouTrack REST API client.

    Uses permanent token authentication: every request goes through one shared
    ``requests.Session`` carrying a Bearer header.
    Supports context manager protocol for proper resource cleanup.
    """

    # Field projection requested for every issue fetch; _parse_issue reads
    # exactly these keys, so both issue endpoints share one definition.
    _ISSUE_FIELDS = (
        "id,idReadable,summary,description,project(id,shortName),"
        "reporter(login),created,updated,customFields(name,value(name))"
    )

    def __init__(self, base_url: str, token: str, timeout: float = 30.0):
        """
        Initialize YouTrack client.

        Args:
            base_url: YouTrack instance URL (e.g., https://track.yourdomain.com)
            token: Permanent token from YouTrack (Profile -> Account Security -> Tokens)
            timeout: Default timeout in seconds applied to every request that
                does not pass its own ``timeout``.
        """
        self.base_url = base_url.rstrip('/')
        self.api_url = f"{self.base_url}/api"
        self.token = token
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/json",
        })

    def __enter__(self):
        """Enter context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager, closing the session; exceptions propagate."""
        self.close()
        return False

    def close(self):
        """Close the HTTP session and release resources."""
        if self.session:
            self.session.close()
            logger.debug("YouTrack session closed")

    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """
        Make an API request.

        Args:
            method: HTTP verb, e.g. "GET" or "POST"
            endpoint: Path below ``/api`` (a leading slash is ignored)
            **kwargs: Passed through to ``Session.request``

        Raises:
            YouTrackAuthError: On HTTP 401 or 403.
            requests.exceptions.RequestException: On transport failures and
                on any other non-2xx status (via ``raise_for_status``).
        """
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        # requests waits indefinitely unless a timeout is given, which would
        # hang the caller on a stalled server and make the Timeout handlers
        # in this class unreachable.
        kwargs.setdefault("timeout", self.timeout)
        logger.debug(f"YouTrack API: {method} {url}")
        response = self.session.request(method, url, **kwargs)
        if response.status_code == 401:
            raise YouTrackAuthError("Invalid or expired token")
        if response.status_code == 403:
            raise YouTrackAuthError(f"Permission denied: {response.text}")
        if not response.ok:
            logger.error(f"YouTrack API error: {response.status_code} - {response.text[:500]}")
            response.raise_for_status()
        return response

    def _get(self, endpoint: str, params: dict = None) -> dict:
        """GET request returning decoded JSON ({} when the body is empty).

        Collection endpoints decode to a list rather than a dict.
        """
        response = self._request("GET", endpoint, params=params)
        return response.json() if response.text else {}

    def _post(self, endpoint: str, data: dict = None) -> dict:
        """POST ``data`` as JSON and return decoded JSON ({} when the body is empty)."""
        response = self._request("POST", endpoint, json=data)
        return response.json() if response.text else {}

    @staticmethod
    def _error_label(exc: Exception) -> str:
        """Return the log-message suffix that names the kind of failure."""
        if isinstance(exc, YouTrackAuthError):
            return " - auth error"
        # ConnectionError is tested before Timeout so that a connect timeout
        # (which is both) is reported as a connection error.
        if isinstance(exc, requests.exceptions.ConnectionError):
            return " - connection error"
        if isinstance(exc, requests.exceptions.Timeout):
            return " - timeout"
        return ""

    @staticmethod
    def _flatten_field_value(value):
        """Collapse a custom field value to its display name(s).

        A dict becomes its "name", a list becomes a comma-separated string of
        entry names, and anything else is returned unchanged.
        """
        if isinstance(value, dict):
            return value.get("name", "")
        if isinstance(value, list):
            names = []
            for entry in value:
                # Entries without a usable name fall back to their string form.
                name = entry.get("name") if isinstance(entry, dict) else None
                names.append(str(entry) if name is None else str(name))
            return ", ".join(names)
        return value

    # =========================================================================
    # Connection Test
    # =========================================================================

    def test_connection(self) -> dict:
        """Test connection and return server info.

        Returns:
            {"status": "ok", "user": ...} on success, otherwise
            {"status": "error", "message": ...}. Request failures are
            reported in the result rather than raised.
        """
        try:
            # Get current user to verify auth
            user = self._get("users/me", params={"fields": "login,name,email"})
            logger.info(f"Connected to YouTrack as: {user.get('login', 'unknown')}")
            return {"status": "ok", "user": user}
        except YouTrackAuthError as e:
            logger.error(f"Connection test failed - authentication error: {e}")
            return {"status": "error", "message": str(e)}
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection test failed - connection error: {e}")
            return {"status": "error", "message": f"Connection error: {e}"}
        except requests.exceptions.Timeout as e:
            logger.error(f"Connection test failed - timeout: {e}")
            return {"status": "error", "message": f"Timeout: {e}"}
        except requests.exceptions.RequestException as e:
            logger.error(f"Connection test failed - request error: {e}")
            return {"status": "error", "message": str(e)}

    # =========================================================================
    # Projects
    # =========================================================================

    def get_projects(self) -> List[dict]:
        """Get all accessible projects."""
        return self._get("admin/projects", params={
            "fields": "id,name,shortName,description"
        })

    def get_project(self, project_id: str) -> dict:
        """Get a specific project by short name (e.g., 'CG')."""
        return self._get(f"admin/projects/{project_id}", params={
            "fields": "id,name,shortName,description,customFields(field(name),bundle(values(name)))"
        })

    # =========================================================================
    # Issues
    # =========================================================================

    def get_issue(self, issue_id: str) -> YouTrackIssue:
        """
        Get a single issue by ID.

        Args:
            issue_id: Issue ID like 'CG-123'
        """
        data = self._get(f"issues/{issue_id}", params={"fields": self._ISSUE_FIELDS})
        return self._parse_issue(data)

    def get_issues_by_query(self, query: str, limit: int = 100) -> List[YouTrackIssue]:
        """
        Search issues using YouTrack query language.

        Example queries:
            - "project: CG State: Ready"
            - "project: CG #Unresolved"
            - "project: CG State: {In Progress}"
            - "project: CG updated: Today"

        Args:
            query: YouTrack search query
            limit: Maximum results (default 100)
        """
        data = self._get("issues", params={
            "query": query,
            "$top": limit,
            "fields": self._ISSUE_FIELDS,
        })
        return [self._parse_issue(item) for item in data]

    def get_issues_by_state(self, project: str, state: str, sort_by_priority: bool = True) -> List[YouTrackIssue]:
        """
        Get issues in a specific state, sorted by priority.

        Args:
            project: Project short name (e.g., 'CG')
            state: State name (e.g., 'Ready', 'In Progress')
            sort_by_priority: If True, return highest priority first

        Returns:
            List of issues sorted by priority (Critical > Major > Normal > Minor)
        """
        # Escape state names with spaces
        state_query = f"{{{state}}}" if " " in state else state
        # Sort by Priority in YouTrack query (asc = Show-stopper first)
        query = f"project: {project} State: {state_query} sort by: Priority asc"
        issues = self.get_issues_by_query(query)
        # Also sort in Python as backup (in case YouTrack sorting differs)
        if sort_by_priority:
            issues.sort(key=lambda i: i.priority_order)
        return issues

    def update_issue_state(self, issue_id: str, state: str) -> bool:
        """
        Update an issue's state.

        Args:
            issue_id: Issue ID like 'CG-123'
            state: New state name (e.g., 'In Progress', 'Done')

        Returns:
            True on success; False (after logging) on auth or request errors.
        """
        try:
            # YouTrack custom field update format
            self._post(f"issues/{issue_id}", data={
                "customFields": [
                    {
                        "name": "State",
                        "$type": "StateIssueCustomField",
                        "value": {"name": state},
                    }
                ]
            })
        except (YouTrackAuthError, requests.exceptions.RequestException) as e:
            logger.error(f"Failed to update {issue_id} state{self._error_label(e)}: {e}")
            return False
        logger.info(f"Updated {issue_id} state to '{state}'")
        return True

    def update_issue_field(self, issue_id: str, field_name: str, value: str) -> bool:
        """
        Update a custom field on an issue.

        Args:
            issue_id: Issue ID like 'CG-123'
            field_name: Custom field name
            value: New value

        Returns:
            True on success; False (after logging) on auth or request errors.
        """
        try:
            self._post(f"issues/{issue_id}", data={
                "customFields": [
                    {
                        "name": field_name,
                        "value": {"name": value},
                    }
                ]
            })
        except (YouTrackAuthError, requests.exceptions.RequestException) as e:
            logger.error(f"Failed to update {issue_id} {field_name}{self._error_label(e)}: {e}")
            return False
        logger.info(f"Updated {issue_id} {field_name} to '{value}'")
        return True

    def _parse_issue(self, data: dict) -> YouTrackIssue:
        """Parse issue data into YouTrackIssue object.

        Keys that are absent or JSON null are replaced with empty defaults so
        a sparse payload does not raise.
        """
        # Extract state and priority from custom fields
        state = ""
        priority = "Normal"
        custom_fields = {}
        for field in data.get("customFields") or []:
            name = field.get("name") or ""
            value = self._flatten_field_value(field.get("value"))
            custom_fields[name] = value
            if name.lower() == "state":
                state = value or ""
            elif name.lower() == "priority":
                priority = value or "Normal"

        # dict.get's default only covers a missing key; `or` also covers an
        # explicit null for nested objects and text fields.
        project = data.get("project") or {}
        reporter = data.get("reporter") or {}
        return YouTrackIssue(
            id=data.get("idReadable", data.get("id", "")),
            id_readable=data.get("idReadable", ""),
            summary=data.get("summary") or "",
            description=data.get("description") or "",
            project_id=project.get("shortName", project.get("id", "")),
            state=state,
            priority=priority,
            custom_fields=custom_fields,
            reporter=reporter.get("login") or "",
            created=data.get("created") or 0,
            updated=data.get("updated") or 0,
        )

    # =========================================================================
    # Comments
    # =========================================================================

    def get_issue_comments(self, issue_id: str) -> List[YouTrackComment]:
        """Get all comments on an issue."""
        data = self._get(f"issues/{issue_id}/comments", params={
            "fields": "id,text,author(login),created"
        })
        return [
            YouTrackComment(
                id=c.get("id", ""),
                text=c.get("text") or "",
                # A missing or null author is reported as "unknown".
                author=(c.get("author") or {}).get("login", "unknown"),
                created=c.get("created", 0),
            )
            for c in data
        ]

    def add_issue_comment(self, issue_id: str, text: str) -> bool:
        """
        Add a comment to an issue.

        Args:
            issue_id: Issue ID like 'CG-123'
            text: Comment text (Markdown supported)

        Returns:
            True on success; False (after logging) on auth or request errors.
        """
        try:
            self._post(f"issues/{issue_id}/comments", data={"text": text})
        except (YouTrackAuthError, requests.exceptions.RequestException) as e:
            logger.error(f"Failed to add comment to {issue_id}{self._error_label(e)}: {e}")
            return False
        logger.info(f"Added comment to {issue_id}")
        return True

    # =========================================================================
    # Time Tracking (Work Items)
    # =========================================================================

    def _get_work_type_id(self, issue_id: str, work_type_name: str) -> Optional[str]:
        """
        Get the ID of a work item type by name for a given issue's project.

        The lookup is best-effort: API, transport and decode failures are
        logged at debug level and reported as "not found".

        Args:
            issue_id: Issue ID like 'CG-123' (used to determine project)
            work_type_name: Name of the work type (e.g., 'Development')

        Returns:
            Work type ID if found, None otherwise
        """
        # Everything before the last '-' is the project; an ID without '-' is
        # used whole.
        project_id, sep, _ = issue_id.rpartition("-")
        if not sep:
            project_id = issue_id
        wanted = work_type_name.lower()
        try:
            # Fetch work item types for the project
            data = self._get(f"admin/projects/{project_id}/timeTrackingSettings/workItemTypes", params={
                "fields": "id,name"
            })
        except (YouTrackError, requests.exceptions.RequestException, ValueError) as e:
            logger.debug(f"Could not fetch work types for {issue_id}: {e}")
            return None
        for wt in data:
            # Skip anything that is not a work-type object (e.g. the keys of
            # an unexpected dict payload).
            if isinstance(wt, dict) and (wt.get("name") or "").lower() == wanted:
                return wt.get("id")
        logger.debug(f"Work type '{work_type_name}' not found in project {project_id}")
        return None

    def add_work_item(self, issue_id: str, duration_minutes: int,
                      work_type: str = None, text: str = "") -> bool:
        """
        Log time spent on an issue.

        Args:
            issue_id: Issue ID like 'CG-123'
            duration_minutes: Time spent in minutes (minimum 1)
            work_type: Work type name (e.g., 'Development', 'Testing', 'Documentation')
                If the type doesn't exist in the project, it will be omitted.
            text: Optional description of the work done

        Returns:
            True if work item was created successfully
        """
        import time as time_module

        # Durations below one minute are raised to 1 before sending.
        minutes = max(1, duration_minutes)
        data = {
            "duration": {"minutes": minutes},
            "date": int(time_module.time() * 1000),  # Current time in ms
            "text": text,
        }
        try:
            # Try to find work type ID if specified
            if work_type:
                work_type_id = self._get_work_type_id(issue_id, work_type)
                if work_type_id:
                    data["type"] = {"id": work_type_id}
            self._post(f"issues/{issue_id}/timeTracking/workItems", data=data)
        except (YouTrackAuthError, requests.exceptions.RequestException) as e:
            logger.error(f"Failed to add work item to {issue_id}{self._error_label(e)}: {e}")
            return False
        # Log the value actually sent, not the unclamped argument.
        logger.info(f"Added {minutes}m work item to {issue_id} (type: {work_type})")
        return True

    def get_work_items(self, issue_id: str) -> List[WorkItem]:
        """
        Get all work items for an issue.

        Args:
            issue_id: Issue ID like 'CG-123'

        Returns:
            List of WorkItem objects; an empty list (after logging) on auth
            or request errors.
        """
        try:
            data = self._get(f"issues/{issue_id}/timeTracking/workItems", params={
                "fields": "id,duration(minutes),date,author(login),type(name),text"
            })
        except (YouTrackAuthError, requests.exceptions.RequestException) as e:
            logger.error(f"Failed to get work items for {issue_id}{self._error_label(e)}: {e}")
            return []
        return [
            WorkItem(
                id=item.get("id", ""),
                # Nested objects may be absent or null; treat both as empty.
                duration_minutes=(item.get("duration") or {}).get("minutes", 0),
                date=item.get("date", 0),
                author=(item.get("author") or {}).get("login", "unknown"),
                work_type=(item.get("type") or {}).get("name", ""),
                text=item.get("text") or "",
            )
            for item in data
        ]

    # =========================================================================
    # Agile Boards (Kanban)
    # =========================================================================

    def get_agile_boards(self) -> List[dict]:
        """Get all agile boards."""
        return self._get("agiles", params={
            "fields": "id,name,projects(shortName)"
        })

    def get_board_sprints(self, board_id: str) -> List[dict]:
        """Get sprints for an agile board."""
        return self._get(f"agiles/{board_id}/sprints", params={
            "fields": "id,name,start,finish,isDefault"
        })

    # =========================================================================
    # Webhooks
    # =========================================================================

    def get_webhooks(self) -> List[dict]:
        """Get all configured webhooks."""
        return self._get("admin/globalSettings/webhooks", params={
            "fields": "id,name,url,enabled,events"
        })

    def create_webhook(self, name: str, url: str, events: List[str] = None) -> dict:
        """
        Create a webhook for issue events.

        Args:
            name: Webhook name
            url: Callback URL
            events: List of event types (default: issue changes)
                Options: 'IssueCreated', 'IssueChanged', 'IssueCommentCreated'
        """
        if events is None:
            events = ["IssueChanged"]
        return self._post("admin/globalSettings/webhooks", data={
            "name": name,
            "url": url,
            "enabled": True,
            "events": events,
        })
def load_youtrack_config(config: dict) -> Optional[YouTrackClient]:
    """
    Load YouTrack client from configuration.

    Expected config structure:
        youtrack:
            base_url: https://track.yourdomain.com
            token: perm:xxx

    Args:
        config: Parsed configuration mapping.

    Returns:
        A YouTrackClient, or None (after logging a warning) when the
        ``youtrack`` section, ``base_url`` or ``token`` is missing or empty.
    """
    # A section that is present but empty (e.g. a bare `youtrack:` key) loads
    # as None, which dict.get's default does not cover; `or {}` handles both.
    yt_config = (config or {}).get("youtrack") or {}
    base_url = yt_config.get("base_url")
    token = yt_config.get("token")
    if not base_url or not token:
        logger.warning("YouTrack configuration incomplete (missing base_url or token)")
        return None
    return YouTrackClient(base_url, token)