Orchestrator: - Add orchestrator chat interface with streaming responses - MCP server integration for YouTrack queries - Quick actions for backlog review, triage analysis - Dynamic suggestions based on conversation context - Action approval/rejection workflow Dashboard improvements: - Add font preloading to prevent FOUC - CSS spinner for loading state (no icon font dependency) - Wait for fonts before showing UI - Fix workflow pipeline alignment - Fix user message contrast (dark blue background) - Auto-scroll chat, actions, suggestions panels - Add keyboard shortcuts system - Add toast notifications - Add theme toggle (dark/light mode) - New pages: orchestrator, repos, system, analytics Workflow fixes: - Skip Build state when agent determines no changes needed - Check branch exists before attempting push - Include comments in get_issues MCP response - Simplified orchestrator prompt focused on Backlog management 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
775 lines
33 KiB
Python
775 lines
33 KiB
Python
"""
|
|
Orchestrator agent for managing the ClearGrow development workflow.
|
|
|
|
Provides an AI-powered assistant that analyzes issues, makes recommendations,
|
|
and executes approved actions across the development platform.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Callable, Generator, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
from runner import Runner
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class OrchestratorMessage:
|
|
"""A single message in the orchestrator conversation."""
|
|
id: str
|
|
role: str # "user", "assistant", "system", "action_result"
|
|
content: str
|
|
timestamp: datetime
|
|
actions: list = field(default_factory=list) # Suggested actions from assistant
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"role": self.role,
|
|
"content": self.content,
|
|
"timestamp": self.timestamp.isoformat(),
|
|
"actions": self.actions,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class OrchestratorAction:
|
|
"""A proposed action awaiting user approval."""
|
|
id: str
|
|
kind: str # "transition", "batch_transition", "comment", "assign"
|
|
description: str
|
|
params: dict
|
|
created_at: datetime
|
|
status: str = "pending" # "pending", "approved", "rejected", "executed", "failed"
|
|
result: Optional[dict] = None
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"kind": self.kind,
|
|
"description": self.description,
|
|
"params": self.params,
|
|
"created_at": self.created_at.isoformat(),
|
|
"status": self.status,
|
|
"result": self.result,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SuggestedPrompt:
|
|
"""A suggested follow-up prompt for the user."""
|
|
id: str
|
|
icon: str
|
|
label: str
|
|
description: str
|
|
prompt: str
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.id,
|
|
"icon": self.icon,
|
|
"label": self.label,
|
|
"description": self.description,
|
|
"prompt": self.prompt,
|
|
}
|
|
|
|
|
|
class OrchestratorSession:
|
|
"""Manages a single orchestrator agent session."""
|
|
|
|
# Stream buffer size for reading Claude output
|
|
STREAM_BUFFER_TIMEOUT = 0.1
|
|
|
|
def __init__(self, runner: "Runner", prompt_path: Optional[Path] = None):
|
|
self.runner = runner
|
|
self.session_id = str(uuid.uuid4())
|
|
self.created_at = datetime.now()
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Creating new OrchestratorSession")
|
|
|
|
# Conversation state
|
|
self.messages: list[OrchestratorMessage] = []
|
|
self.pending_actions: dict[str, OrchestratorAction] = {}
|
|
self.suggested_prompts: list[SuggestedPrompt] = []
|
|
self._messages_lock = threading.Lock()
|
|
|
|
# Process state (per-message, not persistent)
|
|
self.process: Optional[subprocess.Popen] = None
|
|
self._output_thread: Optional[threading.Thread] = None
|
|
self._shutdown_event = threading.Event()
|
|
self._current_response = ""
|
|
self._response_complete = threading.Event()
|
|
|
|
# Claude session ID for --resume
|
|
self._claude_session_id: Optional[str] = None
|
|
|
|
# Load system prompt
|
|
self.prompt_path = prompt_path or Path(__file__).parent / "prompts" / "orchestrator.md"
|
|
self._system_prompt = self._load_system_prompt()
|
|
|
|
# Load suggestions prompt
|
|
self.suggestions_prompt_path = Path(__file__).parent / "prompts" / "orchestrator_suggestions.md"
|
|
self._suggestions_prompt = self._load_suggestions_prompt()
|
|
|
|
# Claude command configuration
|
|
self.claude_command = runner.config.get("claude_command", "claude")
|
|
|
|
# Working directory for orchestrator
|
|
self.work_dir = Path(__file__).parent
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Initialized with prompt_path={self.prompt_path}")
|
|
|
|
def _load_system_prompt(self) -> str:
|
|
"""Load the orchestrator system prompt."""
|
|
if self.prompt_path.exists():
|
|
content = self.prompt_path.read_text()
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Loaded system prompt ({len(content)} chars)")
|
|
return content
|
|
|
|
# Fallback minimal prompt if file doesn't exist
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Prompt not found at {self.prompt_path}, using fallback")
|
|
return """You are the ClearGrow Orchestrator, an AI assistant that helps manage
|
|
the development workflow. Analyze issues and suggest actions to improve workflow efficiency.
|
|
|
|
Output actions in JSON format:
|
|
{"type": "action", "action": {"id": "uuid", "kind": "transition", "description": "...", "params": {...}}}
|
|
"""
|
|
|
|
def _load_suggestions_prompt(self) -> str:
|
|
"""Load the suggestions generation prompt."""
|
|
if self.suggestions_prompt_path.exists():
|
|
content = self.suggestions_prompt_path.read_text()
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Loaded suggestions prompt ({len(content)} chars)")
|
|
return content
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Suggestions prompt not found, using fallback")
|
|
return "Generate 2-3 follow-up suggestions as a JSON array."
|
|
|
|
def start(self) -> bool:
|
|
"""
|
|
Initialize the orchestrator session (no subprocess started yet).
|
|
|
|
The actual Claude process is started per-message using -p flag.
|
|
Returns True if initialized successfully.
|
|
"""
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Session started")
|
|
return True
|
|
|
|
def _build_initial_context(self) -> str:
|
|
"""Build minimal context - orchestrator will use MCP tools to fetch data."""
|
|
return """Use the available MCP tools to fetch data about issues and agents.
|
|
|
|
Start with get_workflow_summary to see the current state of all issues."""
|
|
|
|
def _run_claude_message(self, prompt: str) -> Generator[dict, None, None]:
|
|
"""
|
|
Run a Claude command for a single message and yield response chunks.
|
|
|
|
Uses -p flag for the prompt, and --resume to continue conversation.
|
|
Uses MCP server for internal API access (YouTrack, agent pool).
|
|
"""
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Running Claude message (prompt length: {len(prompt)})")
|
|
|
|
# MCP config path
|
|
mcp_config = Path(__file__).parent / "mcp_config.json"
|
|
|
|
# Build command with MCP tools only (no file system access)
|
|
cmd = [
|
|
self.claude_command,
|
|
"-p", prompt,
|
|
"--output-format", "stream-json",
|
|
"--verbose",
|
|
"--mcp-config", str(mcp_config),
|
|
"--strict-mcp-config", # Only use our MCP server, no other tools
|
|
"--permission-mode", "bypassPermissions", # Auto-approve MCP tool calls
|
|
]
|
|
|
|
# Add resume flag if we have a previous session
|
|
if self._claude_session_id:
|
|
cmd.extend(["--resume", self._claude_session_id])
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Resuming Claude session: {self._claude_session_id}")
|
|
else:
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Starting new Claude session")
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Command: {cmd[0]} -p '...' --output-format stream-json ...")
|
|
|
|
try:
|
|
self.process = subprocess.Popen(
|
|
cmd,
|
|
cwd=str(self.work_dir),
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
)
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Subprocess started (PID: {self.process.pid})")
|
|
|
|
# Read output line by line
|
|
response_text = ""
|
|
line_count = 0
|
|
for line in self.process.stdout:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
line_count += 1
|
|
chunk_result = self._handle_output_line(line)
|
|
if chunk_result:
|
|
if chunk_result.get("type") == "text":
|
|
response_text += chunk_result.get("content", "")
|
|
yield {"type": "chunk", "content": chunk_result.get("content", "")}
|
|
elif chunk_result.get("type") == "session_id":
|
|
self._claude_session_id = chunk_result.get("session_id")
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Got Claude session ID: {self._claude_session_id}")
|
|
elif chunk_result.get("type") == "actions":
|
|
yield {"type": "actions", "actions": chunk_result.get("actions", [])}
|
|
|
|
# Wait for process to complete
|
|
return_code = self.process.wait()
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Claude process completed (return_code={return_code}, lines={line_count}, response_length={len(response_text)})")
|
|
|
|
# Check stderr for errors
|
|
stderr = self.process.stderr.read()
|
|
if stderr:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Claude stderr: {stderr[:500]}")
|
|
|
|
# Store the response
|
|
if response_text:
|
|
self._add_assistant_message(response_text)
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Added assistant message to history")
|
|
|
|
# Generate actions and follow-up suggestions with a second call
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Generating suggestions...")
|
|
generated = self._generate_suggestions()
|
|
|
|
if generated.get("actions"):
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Yielding {len(generated['actions'])} actions")
|
|
yield {
|
|
"type": "actions",
|
|
"actions": generated["actions"]
|
|
}
|
|
else:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] No actions generated")
|
|
|
|
if generated.get("suggestions"):
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Yielding {len(generated['suggestions'])} suggestions")
|
|
yield {
|
|
"type": "suggestions",
|
|
"suggestions": generated["suggestions"]
|
|
}
|
|
else:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] No suggestions generated")
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Yielding complete")
|
|
yield {"type": "complete"}
|
|
|
|
except FileNotFoundError:
|
|
logger.error(f"[ORCH:{self.session_id[:8]}] Claude command not found: {self.claude_command}")
|
|
yield {"type": "error", "message": "Claude command not found"}
|
|
except Exception as e:
|
|
logger.exception(f"[ORCH:{self.session_id[:8]}] Error running orchestrator command: {e}")
|
|
yield {"type": "error", "message": str(e)}
|
|
finally:
|
|
self.process = None
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Claude message completed")
|
|
|
|
def _handle_output_line(self, line: str) -> Optional[dict]:
|
|
"""Process a single line of output from Claude. Returns parsed data or None."""
|
|
try:
|
|
data = json.loads(line)
|
|
msg_type = data.get("type", "")
|
|
|
|
if msg_type == "assistant":
|
|
# Extract text from assistant message
|
|
message = data.get("message", {})
|
|
content_blocks = message.get("content", [])
|
|
|
|
text_content = ""
|
|
for block in content_blocks:
|
|
if block.get("type") == "text":
|
|
text_content += block.get("text", "")
|
|
|
|
if text_content:
|
|
return {"type": "text", "content": text_content}
|
|
|
|
elif msg_type == "result":
|
|
# Response complete - extract session_id for resume
|
|
session_id = data.get("session_id")
|
|
if session_id:
|
|
return {"type": "session_id", "session_id": session_id}
|
|
|
|
return None
|
|
|
|
except json.JSONDecodeError:
|
|
# Non-JSON output
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Non-JSON output: {line[:100]}")
|
|
return None
|
|
|
|
def _generate_suggestions(self) -> dict:
|
|
"""
|
|
Generate follow-up suggestions and actions using a second Claude call.
|
|
|
|
Uses --resume to continue the conversation with full context.
|
|
Returns dict with actions and suggestions lists.
|
|
"""
|
|
if not self._claude_session_id:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] No session ID for suggestions generation")
|
|
return {"actions": [], "suggestions": []}
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Generating follow-up suggestions with --resume {self._claude_session_id}")
|
|
|
|
cmd = [
|
|
self.claude_command,
|
|
"-p", self._suggestions_prompt,
|
|
"--output-format", "json",
|
|
"--resume", self._claude_session_id,
|
|
"--max-turns", "1",
|
|
]
|
|
|
|
try:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Running suggestions command...")
|
|
result = subprocess.run(
|
|
cmd,
|
|
cwd=str(self.work_dir),
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30,
|
|
)
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Suggestions command completed (return_code={result.returncode})")
|
|
|
|
if result.returncode != 0:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Suggestions generation failed (code={result.returncode}): {result.stderr[:500]}")
|
|
return {"actions": [], "suggestions": []}
|
|
|
|
# Parse JSON output
|
|
output = result.stdout.strip()
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Suggestions raw output ({len(output)} chars): {output[:200]}...")
|
|
|
|
data = json.loads(output)
|
|
|
|
# Extract the response text
|
|
response_text = ""
|
|
if isinstance(data, dict) and "result" in data:
|
|
response_text = data["result"]
|
|
elif isinstance(data, str):
|
|
response_text = data
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Extracted response text ({len(response_text)} chars): {response_text[:300]}...")
|
|
|
|
# Parse actions and suggestions from response
|
|
parsed = self._parse_actions_and_suggestions(response_text)
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Parsed {len(parsed['actions'])} actions and {len(parsed['suggestions'])} suggestions")
|
|
|
|
return parsed
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Suggestions generation timed out")
|
|
return {"actions": [], "suggestions": []}
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Failed to parse suggestions JSON: {e}")
|
|
return {"actions": [], "suggestions": []}
|
|
except Exception as e:
|
|
logger.exception(f"[ORCH:{self.session_id[:8]}] Error generating suggestions: {e}")
|
|
return {"actions": [], "suggestions": []}
|
|
|
|
def _parse_actions_and_suggestions(self, text: str) -> dict:
|
|
"""Parse actions and suggestions JSON from response text."""
|
|
import re
|
|
|
|
result = {"actions": [], "suggestions": []}
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Parsing actions/suggestions from text ({len(text)} chars)")
|
|
|
|
# Try to find a JSON object with actions and suggestions
|
|
# Look for {...} pattern containing "actions" or "suggestions"
|
|
match = re.search(r'\{[\s\S]*"(?:actions|suggestions)"[\s\S]*\}', text)
|
|
if match:
|
|
json_str = match.group()
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Found JSON object ({len(json_str)} chars)")
|
|
try:
|
|
data = json.loads(json_str)
|
|
if isinstance(data, dict):
|
|
# Parse actions
|
|
raw_actions = data.get("actions", [])
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Found {len(raw_actions)} raw actions")
|
|
|
|
for i, a in enumerate(raw_actions):
|
|
if isinstance(a, dict) and a.get("kind"):
|
|
action_id = a.get("id", f"act-{i+1}")
|
|
action = OrchestratorAction(
|
|
id=action_id,
|
|
kind=a.get("kind"),
|
|
description=a.get("description", ""),
|
|
params=a.get("params", {}),
|
|
created_at=datetime.now(),
|
|
)
|
|
with self._messages_lock:
|
|
self.pending_actions[action.id] = action
|
|
result["actions"].append(action.to_dict())
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Created action: id={action_id}, kind={action.kind}, desc={action.description[:50]}")
|
|
else:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Skipping invalid action at index {i}: {a}")
|
|
|
|
# Parse suggestions
|
|
raw_suggestions = data.get("suggestions", [])
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Found {len(raw_suggestions)} raw suggestions")
|
|
|
|
for i, s in enumerate(raw_suggestions):
|
|
if isinstance(s, dict) and s.get("label") and s.get("prompt"):
|
|
sug = {
|
|
"id": s.get("id", f"sug-{i+1}"),
|
|
"icon": s.get("icon", "lightbulb"),
|
|
"label": s.get("label", ""),
|
|
"description": s.get("description", ""),
|
|
"prompt": s.get("prompt", ""),
|
|
}
|
|
result["suggestions"].append(sug)
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Created suggestion: {sug['label']}")
|
|
else:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Skipping invalid suggestion at index {i}: {s}")
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Parsed {len(result['actions'])} actions, {len(result['suggestions'])} suggestions")
|
|
return result
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] JSON decode error: {e}")
|
|
else:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] No JSON object found, trying array fallback")
|
|
|
|
# Fallback: try to find just a suggestions array (old format)
|
|
match = re.search(r'\[[\s\S]*?\]', text)
|
|
if match:
|
|
try:
|
|
suggestions = json.loads(match.group())
|
|
if isinstance(suggestions, list):
|
|
for i, s in enumerate(suggestions):
|
|
if isinstance(s, dict) and s.get("label") and s.get("prompt"):
|
|
result["suggestions"].append({
|
|
"id": s.get("id", f"sug-{i+1}"),
|
|
"icon": s.get("icon", "lightbulb"),
|
|
"label": s.get("label", ""),
|
|
"description": s.get("description", ""),
|
|
"prompt": s.get("prompt", ""),
|
|
})
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Parsed {len(result['suggestions'])} suggestions from array fallback")
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Array fallback JSON decode error: {e}")
|
|
else:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] No JSON found in response text")
|
|
|
|
return result
|
|
|
|
def _add_assistant_message(self, content: str):
|
|
"""Add an assistant message to the conversation."""
|
|
msg = OrchestratorMessage(
|
|
id=str(uuid.uuid4()),
|
|
role="assistant",
|
|
content=content,
|
|
timestamp=datetime.now(),
|
|
actions=[a.to_dict() for a in self.pending_actions.values() if a.status == "pending"],
|
|
)
|
|
|
|
with self._messages_lock:
|
|
self.messages.append(msg)
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Added assistant message (id={msg.id[:8]}, length={len(content)})")
|
|
|
|
def send_message(self, user_message: str) -> Generator[dict, None, None]:
|
|
"""
|
|
Send a user message and yield response chunks.
|
|
|
|
Yields dicts with streaming response data.
|
|
"""
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] send_message called (length={len(user_message)})")
|
|
|
|
# Clear previous suggestions for new message
|
|
with self._messages_lock:
|
|
self.suggested_prompts.clear()
|
|
|
|
# Add user message to history
|
|
user_msg = OrchestratorMessage(
|
|
id=str(uuid.uuid4()),
|
|
role="user",
|
|
content=user_message,
|
|
timestamp=datetime.now(),
|
|
)
|
|
|
|
with self._messages_lock:
|
|
self.messages.append(user_msg)
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Added user message to history (total messages: {len(self.messages)})")
|
|
|
|
# Build the full prompt
|
|
# For first message, include system prompt and context
|
|
if self._claude_session_id is None:
|
|
initial_context = self._build_initial_context()
|
|
full_prompt = f"{self._system_prompt}\n\n---\n\n{initial_context}\n\n---\n\nUser request: {user_message}"
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Built initial prompt (length={len(full_prompt)})")
|
|
else:
|
|
# For subsequent messages, just send the user message
|
|
full_prompt = user_message
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Using user message as prompt (resuming session)")
|
|
|
|
# Run Claude and yield results
|
|
yield from self._run_claude_message(full_prompt)
|
|
|
|
def execute_action(self, action_id: str, approved: bool) -> dict:
|
|
"""
|
|
Execute or reject a pending action.
|
|
|
|
Returns result dict with success status and details.
|
|
"""
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] execute_action called: action_id={action_id}, approved={approved}")
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Available actions: {list(self.pending_actions.keys())}")
|
|
|
|
# Get action reference with lock, but don't hold lock during execution
|
|
action = None
|
|
if not self._messages_lock.acquire(timeout=5.0):
|
|
logger.error(f"[ORCH:{self.session_id[:8]}] execute_action: failed to acquire lock (timeout)")
|
|
return {"success": False, "error": "Internal error: lock timeout"}
|
|
|
|
try:
|
|
action = self.pending_actions.get(action_id)
|
|
if not action:
|
|
logger.error(f"[ORCH:{self.session_id[:8]}] Action {action_id} not found in pending_actions")
|
|
return {"success": False, "error": f"Action {action_id} not found"}
|
|
|
|
if action.status != "pending":
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Action {action_id} already {action.status}")
|
|
return {"success": False, "error": f"Action already {action.status}"}
|
|
|
|
# Mark as processing to prevent duplicate execution
|
|
action.status = "processing"
|
|
finally:
|
|
self._messages_lock.release()
|
|
|
|
if not approved:
|
|
action.status = "rejected"
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Action {action_id} rejected")
|
|
return {"success": True, "status": "rejected"}
|
|
|
|
# Execute based on action kind
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Executing action: kind={action.kind}, params={action.params}")
|
|
try:
|
|
result = self._execute_action_impl(action)
|
|
action.status = "executed"
|
|
action.result = result
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Action {action_id} executed successfully: {result}")
|
|
|
|
# Add result message to conversation
|
|
result_msg = OrchestratorMessage(
|
|
id=str(uuid.uuid4()),
|
|
role="action_result",
|
|
content=f"Action executed: {action.description}\nResult: {json.dumps(result)}",
|
|
timestamp=datetime.now(),
|
|
)
|
|
|
|
with self._messages_lock:
|
|
self.messages.append(result_msg)
|
|
|
|
return {"success": True, "status": "executed", "result": result}
|
|
|
|
except Exception as e:
|
|
logger.exception(f"[ORCH:{self.session_id[:8]}] Failed to execute action {action_id}: {e}")
|
|
action.status = "failed"
|
|
action.result = {"error": str(e)}
|
|
return {"success": False, "error": str(e)}
|
|
|
|
def _execute_action_impl(self, action: OrchestratorAction) -> dict:
|
|
"""Execute the actual action based on kind."""
|
|
kind = action.kind
|
|
params = action.params
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] _execute_action_impl: kind={kind}")
|
|
|
|
if kind == "transition":
|
|
# Transition single issue
|
|
issue_id = params.get("issue_id")
|
|
new_state = params.get("state")
|
|
|
|
if not issue_id or not new_state:
|
|
raise ValueError("Missing issue_id or state")
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Transitioning {issue_id} to {new_state}")
|
|
self.runner.youtrack.update_issue_state(issue_id, new_state)
|
|
return {"issue_id": issue_id, "new_state": new_state}
|
|
|
|
elif kind == "batch_transition":
|
|
# Transition multiple issues
|
|
issue_ids = params.get("issue_ids", [])
|
|
new_state = params.get("state")
|
|
|
|
if not issue_ids or not new_state:
|
|
raise ValueError("Missing issue_ids or state")
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Batch transitioning {len(issue_ids)} issues to {new_state}")
|
|
results = []
|
|
for issue_id in issue_ids:
|
|
try:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Transitioning {issue_id} to {new_state}")
|
|
self.runner.youtrack.update_issue_state(issue_id, new_state)
|
|
results.append({"issue_id": issue_id, "success": True})
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] {issue_id} transitioned successfully")
|
|
except Exception as e:
|
|
logger.error(f"[ORCH:{self.session_id[:8]}] Failed to transition {issue_id}: {e}")
|
|
results.append({"issue_id": issue_id, "success": False, "error": str(e)})
|
|
|
|
return {"results": results, "new_state": new_state}
|
|
|
|
elif kind == "comment":
|
|
# Add comment to issue
|
|
issue_id = params.get("issue_id")
|
|
text = params.get("text")
|
|
|
|
if not issue_id or not text:
|
|
raise ValueError("Missing issue_id or text")
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Adding comment to {issue_id}")
|
|
self.runner.youtrack.add_issue_comment(issue_id, text)
|
|
return {"issue_id": issue_id, "commented": True}
|
|
|
|
else:
|
|
raise ValueError(f"Unknown action kind: {kind}")
|
|
|
|
def get_status(self) -> dict:
|
|
"""Get current session status."""
|
|
is_processing = self.process is not None and self.process.poll() is None
|
|
|
|
with self._messages_lock:
|
|
pending_count = len([a for a in self.pending_actions.values() if a.status == "pending"])
|
|
status = {
|
|
"session_id": self.session_id,
|
|
"active": len(self.messages) > 0 or self._claude_session_id is not None,
|
|
"processing": is_processing,
|
|
"created_at": self.created_at.isoformat(),
|
|
"message_count": len(self.messages),
|
|
"pending_actions": [
|
|
a.to_dict() for a in self.pending_actions.values()
|
|
if a.status == "pending"
|
|
],
|
|
}
|
|
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] get_status: active={status['active']}, messages={status['message_count']}, pending_actions={pending_count}")
|
|
return status
|
|
|
|
def get_messages(self, since_index: int = 0) -> list[dict]:
|
|
"""Get conversation messages since an index."""
|
|
with self._messages_lock:
|
|
messages = [m.to_dict() for m in self.messages[since_index:]]
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] get_messages(since={since_index}): returning {len(messages)} messages")
|
|
return messages
|
|
|
|
def stop(self):
|
|
"""Stop the orchestrator session."""
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Stopping session...")
|
|
|
|
if self.process:
|
|
try:
|
|
logger.debug(f"[ORCH:{self.session_id[:8]}] Terminating subprocess (PID: {self.process.pid})")
|
|
self.process.terminate()
|
|
try:
|
|
self.process.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
logger.warning(f"[ORCH:{self.session_id[:8]}] Process didn't terminate, killing...")
|
|
self.process.kill()
|
|
self.process.wait()
|
|
except Exception as e:
|
|
logger.error(f"[ORCH:{self.session_id[:8]}] Error stopping process: {e}")
|
|
finally:
|
|
self.process = None
|
|
|
|
# Clear session state
|
|
self._claude_session_id = None
|
|
with self._messages_lock:
|
|
msg_count = len(self.messages)
|
|
action_count = len(self.pending_actions)
|
|
self.messages.clear()
|
|
self.pending_actions.clear()
|
|
|
|
logger.info(f"[ORCH:{self.session_id[:8]}] Session stopped (cleared {msg_count} messages, {action_count} actions)")
|
|
|
|
|
|
class OrchestratorManager:
|
|
"""Manages orchestrator sessions (singleton-like behavior)."""
|
|
|
|
def __init__(self, runner: "Runner"):
|
|
self.runner = runner
|
|
self._current_session: Optional[OrchestratorSession] = None
|
|
self._lock = threading.Lock()
|
|
logger.info("[ORCH-MGR] OrchestratorManager initialized")
|
|
|
|
def get_session(self) -> Optional[OrchestratorSession]:
|
|
"""Get the current orchestrator session if active."""
|
|
with self._lock:
|
|
# Session is active if it has messages or a Claude session ID
|
|
if self._current_session:
|
|
has_messages = bool(self._current_session.messages)
|
|
has_claude_id = bool(self._current_session._claude_session_id)
|
|
if has_messages or has_claude_id:
|
|
logger.debug(f"[ORCH-MGR] get_session: returning session {self._current_session.session_id[:8]} (messages={has_messages}, claude_id={has_claude_id})")
|
|
return self._current_session
|
|
else:
|
|
logger.debug("[ORCH-MGR] get_session: session exists but not active (no messages or claude_id)")
|
|
else:
|
|
logger.debug("[ORCH-MGR] get_session: no current session")
|
|
return None
|
|
|
|
def start_session(self) -> tuple[OrchestratorSession, bool]:
|
|
"""
|
|
Start a new orchestrator session.
|
|
|
|
Returns (session, created) tuple.
|
|
- If existing session is active, returns (existing, False)
|
|
- If new session started, returns (new, True)
|
|
"""
|
|
with self._lock:
|
|
# Check for existing active session (has messages or Claude session ID)
|
|
if self._current_session:
|
|
if self._current_session.messages or self._current_session._claude_session_id:
|
|
logger.info(f"[ORCH-MGR] start_session: returning existing session {self._current_session.session_id[:8]}")
|
|
return self._current_session, False
|
|
|
|
# Create new session
|
|
logger.info("[ORCH-MGR] start_session: creating new session")
|
|
session = OrchestratorSession(self.runner)
|
|
if session.start():
|
|
self._current_session = session
|
|
logger.info(f"[ORCH-MGR] start_session: created session {session.session_id[:8]}")
|
|
return session, True
|
|
else:
|
|
logger.error("[ORCH-MGR] start_session: failed to start session")
|
|
raise RuntimeError("Failed to start orchestrator session")
|
|
|
|
def stop_session(self) -> bool:
|
|
"""Stop the current orchestrator session."""
|
|
with self._lock:
|
|
if self._current_session:
|
|
session_id = self._current_session.session_id[:8]
|
|
logger.info(f"[ORCH-MGR] stop_session: stopping session {session_id}")
|
|
self._current_session.stop()
|
|
self._current_session = None
|
|
return True
|
|
logger.debug("[ORCH-MGR] stop_session: no session to stop")
|
|
return False
|
|
|
|
def get_status(self) -> dict:
|
|
"""Get orchestrator status."""
|
|
with self._lock:
|
|
if self._current_session:
|
|
return self._current_session.get_status()
|
|
return {
|
|
"session_id": None,
|
|
"active": False,
|
|
"message_count": 0,
|
|
"pending_actions": [],
|
|
}
|