Orchestrator: - Add orchestrator chat interface with streaming responses - MCP server integration for YouTrack queries - Quick actions for backlog review, triage analysis - Dynamic suggestions based on conversation context - Action approval/rejection workflow Dashboard improvements: - Add font preloading to prevent FOUC - CSS spinner for loading state (no icon font dependency) - Wait for fonts before showing UI - Fix workflow pipeline alignment - Fix user message contrast (dark blue background) - Auto-scroll chat, actions, suggestions panels - Add keyboard shortcuts system - Add toast notifications - Add theme toggle (dark/light mode) - New pages: orchestrator, repos, system, analytics Workflow fixes: - Skip Build state when agent determines no changes needed - Check branch exists before attempting push - Include comments in get_issues MCP response - Simplified orchestrator prompt focused on Backlog management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
592 lines
21 KiB
Python
592 lines
21 KiB
Python
"""
|
|
YouTrack REST API interface.
|
|
|
|
YouTrack has no documented rate limits for self-hosted instances,
|
|
making it ideal for automated agent systems.
|
|
|
|
API Documentation: https://www.jetbrains.com/help/youtrack/server/api-reference.html
|
|
"""
|
|
|
|
import logging
|
|
import requests
|
|
from dataclasses import dataclass
|
|
from typing import Optional, List
|
|
from urllib.parse import urljoin
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class YouTrackError(Exception):
|
|
"""Base exception for YouTrack API errors."""
|
|
pass
|
|
|
|
|
|
class YouTrackAuthError(YouTrackError):
|
|
"""Authentication failed."""
|
|
pass
|
|
|
|
|
|
@dataclass
|
|
class YouTrackIssue:
|
|
"""Represents a YouTrack issue."""
|
|
id: str # e.g., "CG-123"
|
|
id_readable: str # Same as id for display
|
|
summary: str
|
|
description: str
|
|
project_id: str # e.g., "CG"
|
|
state: str # e.g., "Ready", "In Progress"
|
|
priority: str = "Normal" # e.g., "Critical", "Major", "Normal", "Minor"
|
|
custom_fields: dict = None # Additional fields
|
|
reporter: str = ""
|
|
created: int = 0 # Unix timestamp ms
|
|
updated: int = 0
|
|
|
|
def __post_init__(self):
|
|
if self.custom_fields is None:
|
|
self.custom_fields = {}
|
|
|
|
@property
|
|
def issue_number(self) -> int:
|
|
"""Extract numeric part of issue ID (e.g., 'CG-123' -> 123)."""
|
|
parts = self.id.split('-')
|
|
return int(parts[-1]) if parts else 0
|
|
|
|
@property
|
|
def priority_order(self) -> int:
|
|
"""Return numeric priority for sorting (lower = higher priority)."""
|
|
priority_map = {
|
|
"Show-stopper": 0,
|
|
"Critical": 1,
|
|
"Major": 2,
|
|
"Normal": 3,
|
|
"Minor": 4,
|
|
}
|
|
return priority_map.get(self.priority, 3)
|
|
|
|
|
|
@dataclass
|
|
class YouTrackComment:
|
|
"""Represents a comment on an issue."""
|
|
id: str
|
|
text: str
|
|
author: str
|
|
created: int # Unix timestamp ms
|
|
|
|
|
|
@dataclass
|
|
class WorkItem:
|
|
"""Represents a YouTrack time tracking work item."""
|
|
id: str
|
|
duration_minutes: int
|
|
date: int # Unix timestamp ms
|
|
author: str
|
|
work_type: str # "Development", "Testing", "Documentation"
|
|
text: str = ""
|
|
|
|
|
|
@dataclass
|
|
class BoardConfig:
|
|
"""Agile board configuration for status tracking."""
|
|
board_id: str
|
|
board_name: str
|
|
states: dict[str, str] # name -> state_id mapping
|
|
|
|
|
|
class YouTrackClient:
|
|
"""
|
|
YouTrack REST API client.
|
|
|
|
Uses permanent token authentication.
|
|
Supports context manager protocol for proper resource cleanup.
|
|
"""
|
|
|
|
def __init__(self, base_url: str, token: str):
|
|
"""
|
|
Initialize YouTrack client.
|
|
|
|
Args:
|
|
base_url: YouTrack instance URL (e.g., https://track.yourdomain.com)
|
|
token: Permanent token from YouTrack (Profile -> Account Security -> Tokens)
|
|
"""
|
|
self.base_url = base_url.rstrip('/')
|
|
self.api_url = f"{self.base_url}/api"
|
|
self.token = token
|
|
self.session = requests.Session()
|
|
self.session.headers.update({
|
|
"Authorization": f"Bearer {token}",
|
|
"Accept": "application/json",
|
|
"Content-Type": "application/json",
|
|
})
|
|
|
|
def __enter__(self):
|
|
"""Enter context manager."""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Exit context manager, closing the session."""
|
|
self.close()
|
|
return False
|
|
|
|
def close(self):
|
|
"""Close the HTTP session and release resources."""
|
|
if self.session:
|
|
self.session.close()
|
|
logger.debug("YouTrack session closed")
|
|
|
|
def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
|
|
"""Make an API request."""
|
|
url = f"{self.api_url}/{endpoint.lstrip('/')}"
|
|
logger.debug(f"YouTrack API: {method} {url}")
|
|
|
|
response = self.session.request(method, url, **kwargs)
|
|
|
|
if response.status_code == 401:
|
|
raise YouTrackAuthError("Invalid or expired token")
|
|
|
|
if response.status_code == 403:
|
|
raise YouTrackAuthError(f"Permission denied: {response.text}")
|
|
|
|
if not response.ok:
|
|
logger.error(f"YouTrack API error: {response.status_code} - {response.text[:500]}")
|
|
response.raise_for_status()
|
|
|
|
return response
|
|
|
|
def _get(self, endpoint: str, params: dict = None) -> dict:
|
|
"""GET request returning JSON."""
|
|
response = self._request("GET", endpoint, params=params)
|
|
return response.json() if response.text else {}
|
|
|
|
def _post(self, endpoint: str, data: dict = None) -> dict:
|
|
"""POST request returning JSON."""
|
|
response = self._request("POST", endpoint, json=data)
|
|
return response.json() if response.text else {}
|
|
|
|
# =========================================================================
|
|
# Connection Test
|
|
# =========================================================================
|
|
|
|
def test_connection(self) -> dict:
|
|
"""Test connection and return server info."""
|
|
try:
|
|
# Get current user to verify auth
|
|
user = self._get("users/me", params={"fields": "login,name,email"})
|
|
logger.info(f"Connected to YouTrack as: {user.get('login', 'unknown')}")
|
|
return {"status": "ok", "user": user}
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Connection test failed - authentication error: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"Connection test failed - connection error: {e}")
|
|
return {"status": "error", "message": f"Connection error: {e}"}
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"Connection test failed - timeout: {e}")
|
|
return {"status": "error", "message": f"Timeout: {e}"}
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Connection test failed - request error: {e}")
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# =========================================================================
|
|
# Projects
|
|
# =========================================================================
|
|
|
|
def get_projects(self) -> List[dict]:
|
|
"""Get all accessible projects."""
|
|
return self._get("admin/projects", params={
|
|
"fields": "id,name,shortName,description"
|
|
})
|
|
|
|
def get_project(self, project_id: str) -> dict:
|
|
"""Get a specific project by short name (e.g., 'CG')."""
|
|
return self._get(f"admin/projects/{project_id}", params={
|
|
"fields": "id,name,shortName,description,customFields(field(name),bundle(values(name)))"
|
|
})
|
|
|
|
# =========================================================================
|
|
# Issues
|
|
# =========================================================================
|
|
|
|
def get_issue(self, issue_id: str) -> YouTrackIssue:
|
|
"""
|
|
Get a single issue by ID.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
"""
|
|
data = self._get(f"issues/{issue_id}", params={
|
|
"fields": "id,idReadable,summary,description,project(id,shortName),"
|
|
"reporter(login),created,updated,customFields(name,value(name))"
|
|
})
|
|
return self._parse_issue(data)
|
|
|
|
def get_issues_by_query(self, query: str, limit: int = 100) -> List[YouTrackIssue]:
|
|
"""
|
|
Search issues using YouTrack query language.
|
|
|
|
Example queries:
|
|
- "project: CG State: Ready"
|
|
- "project: CG #Unresolved"
|
|
- "project: CG State: {In Progress}"
|
|
- "project: CG updated: Today"
|
|
|
|
Args:
|
|
query: YouTrack search query
|
|
limit: Maximum results (default 100)
|
|
"""
|
|
data = self._get("issues", params={
|
|
"query": query,
|
|
"$top": limit,
|
|
"fields": "id,idReadable,summary,description,project(id,shortName),"
|
|
"reporter(login),created,updated,customFields(name,value(name))"
|
|
})
|
|
return [self._parse_issue(item) for item in data]
|
|
|
|
def get_issues_by_state(self, project: str, state: str, sort_by_priority: bool = True) -> List[YouTrackIssue]:
|
|
"""
|
|
Get issues in a specific state, sorted by priority.
|
|
|
|
Args:
|
|
project: Project short name (e.g., 'CG')
|
|
state: State name (e.g., 'Ready', 'In Progress')
|
|
sort_by_priority: If True, return highest priority first
|
|
|
|
Returns:
|
|
List of issues sorted by priority (Critical > Major > Normal > Minor)
|
|
"""
|
|
# Escape state names with spaces
|
|
state_query = f"{{{state}}}" if " " in state else state
|
|
# Sort by Priority in YouTrack query (asc = Show-stopper first)
|
|
query = f"project: {project} State: {state_query} sort by: Priority asc"
|
|
issues = self.get_issues_by_query(query)
|
|
|
|
# Also sort in Python as backup (in case YouTrack sorting differs)
|
|
if sort_by_priority:
|
|
issues.sort(key=lambda i: i.priority_order)
|
|
|
|
return issues
|
|
|
|
def update_issue_state(self, issue_id: str, state: str) -> bool:
|
|
"""
|
|
Update an issue's state.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
state: New state name (e.g., 'In Progress', 'Done')
|
|
"""
|
|
try:
|
|
# YouTrack custom field update format
|
|
self._post(f"issues/{issue_id}", data={
|
|
"customFields": [
|
|
{
|
|
"name": "State",
|
|
"$type": "StateIssueCustomField",
|
|
"value": {"name": state}
|
|
}
|
|
]
|
|
})
|
|
logger.info(f"Updated {issue_id} state to '{state}'")
|
|
return True
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Failed to update {issue_id} state - auth error: {e}")
|
|
return False
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"Failed to update {issue_id} state - connection error: {e}")
|
|
return False
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"Failed to update {issue_id} state - timeout: {e}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to update {issue_id} state: {e}")
|
|
return False
|
|
|
|
def update_issue_field(self, issue_id: str, field_name: str, value: str) -> bool:
|
|
"""
|
|
Update a custom field on an issue.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
field_name: Custom field name
|
|
value: New value
|
|
"""
|
|
try:
|
|
self._post(f"issues/{issue_id}", data={
|
|
"customFields": [
|
|
{
|
|
"name": field_name,
|
|
"value": {"name": value}
|
|
}
|
|
]
|
|
})
|
|
logger.info(f"Updated {issue_id} {field_name} to '{value}'")
|
|
return True
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Failed to update {issue_id} {field_name} - auth error: {e}")
|
|
return False
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"Failed to update {issue_id} {field_name} - connection error: {e}")
|
|
return False
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"Failed to update {issue_id} {field_name} - timeout: {e}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to update {issue_id} {field_name}: {e}")
|
|
return False
|
|
|
|
def _parse_issue(self, data: dict) -> YouTrackIssue:
|
|
"""Parse issue data into YouTrackIssue object."""
|
|
# Extract state and priority from custom fields
|
|
state = ""
|
|
priority = "Normal"
|
|
custom_fields = {}
|
|
for field in data.get("customFields", []):
|
|
name = field.get("name", "")
|
|
value = field.get("value")
|
|
if isinstance(value, dict):
|
|
value = value.get("name", "")
|
|
elif isinstance(value, list):
|
|
value = ", ".join(v.get("name", str(v)) for v in value)
|
|
|
|
custom_fields[name] = value
|
|
if name.lower() == "state":
|
|
state = value or ""
|
|
elif name.lower() == "priority":
|
|
priority = value or "Normal"
|
|
|
|
project = data.get("project", {})
|
|
|
|
return YouTrackIssue(
|
|
id=data.get("idReadable", data.get("id", "")),
|
|
id_readable=data.get("idReadable", ""),
|
|
summary=data.get("summary", ""),
|
|
description=data.get("description", ""),
|
|
project_id=project.get("shortName", project.get("id", "")),
|
|
state=state,
|
|
priority=priority,
|
|
custom_fields=custom_fields,
|
|
reporter=data.get("reporter", {}).get("login", ""),
|
|
created=data.get("created", 0),
|
|
updated=data.get("updated", 0),
|
|
)
|
|
|
|
# =========================================================================
|
|
# Comments
|
|
# =========================================================================
|
|
|
|
def get_issue_comments(self, issue_id: str) -> List[YouTrackComment]:
|
|
"""Get all comments on an issue."""
|
|
data = self._get(f"issues/{issue_id}/comments", params={
|
|
"fields": "id,text,author(login),created"
|
|
})
|
|
return [
|
|
YouTrackComment(
|
|
id=c.get("id", ""),
|
|
text=c.get("text", ""),
|
|
author=c.get("author", {}).get("login", "unknown"),
|
|
created=c.get("created", 0),
|
|
)
|
|
for c in data
|
|
]
|
|
|
|
def add_issue_comment(self, issue_id: str, text: str) -> bool:
|
|
"""
|
|
Add a comment to an issue.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
text: Comment text (Markdown supported)
|
|
"""
|
|
try:
|
|
self._post(f"issues/{issue_id}/comments", data={"text": text})
|
|
logger.info(f"Added comment to {issue_id}")
|
|
return True
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Failed to add comment to {issue_id} - auth error: {e}")
|
|
return False
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"Failed to add comment to {issue_id} - connection error: {e}")
|
|
return False
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"Failed to add comment to {issue_id} - timeout: {e}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to add comment to {issue_id}: {e}")
|
|
return False
|
|
|
|
# =========================================================================
|
|
# Time Tracking (Work Items)
|
|
# =========================================================================
|
|
|
|
def _get_work_type_id(self, issue_id: str, work_type_name: str) -> Optional[str]:
|
|
"""
|
|
Get the ID of a work item type by name for a given issue's project.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123' (used to determine project)
|
|
work_type_name: Name of the work type (e.g., 'Development')
|
|
|
|
Returns:
|
|
Work type ID if found, None otherwise
|
|
"""
|
|
try:
|
|
# Get project ID from issue
|
|
project_id = issue_id.split("-")[0]
|
|
|
|
# Fetch work item types for the project
|
|
data = self._get(f"admin/projects/{project_id}/timeTrackingSettings/workItemTypes", params={
|
|
"fields": "id,name"
|
|
})
|
|
|
|
for wt in data:
|
|
if wt.get("name", "").lower() == work_type_name.lower():
|
|
return wt.get("id")
|
|
|
|
logger.debug(f"Work type '{work_type_name}' not found in project {project_id}")
|
|
return None
|
|
except Exception as e:
|
|
logger.debug(f"Could not fetch work types for {issue_id}: {e}")
|
|
return None
|
|
|
|
def add_work_item(self, issue_id: str, duration_minutes: int,
|
|
work_type: str = None, text: str = "") -> bool:
|
|
"""
|
|
Log time spent on an issue.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
duration_minutes: Time spent in minutes (minimum 1)
|
|
work_type: Work type name (e.g., 'Development', 'Testing', 'Documentation')
|
|
If the type doesn't exist in the project, it will be omitted.
|
|
text: Optional description of the work done
|
|
|
|
Returns:
|
|
True if work item was created successfully
|
|
"""
|
|
try:
|
|
import time as time_module
|
|
|
|
data = {
|
|
"duration": {"minutes": max(1, duration_minutes)},
|
|
"date": int(time_module.time() * 1000), # Current time in ms
|
|
"text": text,
|
|
}
|
|
|
|
# Try to find work type ID if specified
|
|
if work_type:
|
|
work_type_id = self._get_work_type_id(issue_id, work_type)
|
|
if work_type_id:
|
|
data["type"] = {"id": work_type_id}
|
|
|
|
self._post(f"issues/{issue_id}/timeTracking/workItems", data=data)
|
|
logger.info(f"Added {duration_minutes}m work item to {issue_id} (type: {work_type})")
|
|
return True
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Failed to add work item to {issue_id} - auth error: {e}")
|
|
return False
|
|
except requests.exceptions.ConnectionError as e:
|
|
logger.error(f"Failed to add work item to {issue_id} - connection error: {e}")
|
|
return False
|
|
except requests.exceptions.Timeout as e:
|
|
logger.error(f"Failed to add work item to {issue_id} - timeout: {e}")
|
|
return False
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to add work item to {issue_id}: {e}")
|
|
return False
|
|
|
|
def get_work_items(self, issue_id: str) -> List[WorkItem]:
|
|
"""
|
|
Get all work items for an issue.
|
|
|
|
Args:
|
|
issue_id: Issue ID like 'CG-123'
|
|
|
|
Returns:
|
|
List of WorkItem objects
|
|
"""
|
|
try:
|
|
data = self._get(f"issues/{issue_id}/timeTracking/workItems", params={
|
|
"fields": "id,duration(minutes),date,author(login),type(name),text"
|
|
})
|
|
return [
|
|
WorkItem(
|
|
id=item.get("id", ""),
|
|
duration_minutes=item.get("duration", {}).get("minutes", 0),
|
|
date=item.get("date", 0),
|
|
author=item.get("author", {}).get("login", "unknown"),
|
|
work_type=item.get("type", {}).get("name", "") if item.get("type") else "",
|
|
text=item.get("text", ""),
|
|
)
|
|
for item in data
|
|
]
|
|
except YouTrackAuthError as e:
|
|
logger.error(f"Failed to get work items for {issue_id} - auth error: {e}")
|
|
return []
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Failed to get work items for {issue_id}: {e}")
|
|
return []
|
|
|
|
# =========================================================================
|
|
# Agile Boards (Kanban)
|
|
# =========================================================================
|
|
|
|
def get_agile_boards(self) -> List[dict]:
|
|
"""Get all agile boards."""
|
|
return self._get("agiles", params={
|
|
"fields": "id,name,projects(shortName)"
|
|
})
|
|
|
|
def get_board_sprints(self, board_id: str) -> List[dict]:
|
|
"""Get sprints for an agile board."""
|
|
return self._get(f"agiles/{board_id}/sprints", params={
|
|
"fields": "id,name,start,finish,isDefault"
|
|
})
|
|
|
|
# =========================================================================
|
|
# Webhooks
|
|
# =========================================================================
|
|
|
|
def get_webhooks(self) -> List[dict]:
|
|
"""Get all configured webhooks."""
|
|
return self._get("admin/globalSettings/webhooks", params={
|
|
"fields": "id,name,url,enabled,events"
|
|
})
|
|
|
|
def create_webhook(self, name: str, url: str, events: List[str] = None) -> dict:
|
|
"""
|
|
Create a webhook for issue events.
|
|
|
|
Args:
|
|
name: Webhook name
|
|
url: Callback URL
|
|
events: List of event types (default: issue changes)
|
|
Options: 'IssueCreated', 'IssueChanged', 'IssueCommentCreated'
|
|
"""
|
|
if events is None:
|
|
events = ["IssueChanged"]
|
|
|
|
return self._post("admin/globalSettings/webhooks", data={
|
|
"name": name,
|
|
"url": url,
|
|
"enabled": True,
|
|
"events": events,
|
|
})
|
|
|
|
|
|
def load_youtrack_config(config: dict) -> Optional[YouTrackClient]:
|
|
"""
|
|
Load YouTrack client from configuration.
|
|
|
|
Expected config structure:
|
|
youtrack:
|
|
base_url: https://track.yourdomain.com
|
|
token: perm:xxx
|
|
"""
|
|
yt_config = config.get("youtrack", {})
|
|
base_url = yt_config.get("base_url")
|
|
token = yt_config.get("token")
|
|
|
|
if not base_url or not token:
|
|
logger.warning("YouTrack configuration incomplete (missing base_url or token)")
|
|
return None
|
|
|
|
return YouTrackClient(base_url, token)
|