Files
agentrunner/orchestrator.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

775 lines
33 KiB
Python

"""
Orchestrator agent for managing the ClearGrow development workflow.
Provides an AI-powered assistant that analyzes issues, makes recommendations,
and executes approved actions across the development platform.
"""
import json
import logging
import subprocess
import threading
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, Generator, Optional
if TYPE_CHECKING:
from runner import Runner
logger = logging.getLogger(__name__)
@dataclass
class OrchestratorMessage:
    """One entry in the orchestrator conversation history."""

    id: str
    role: str  # One of "user", "assistant", "system", "action_result"
    content: str
    timestamp: datetime
    # Actions the assistant suggested alongside this message
    actions: list = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the message as a plain dict with an ISO-8601 timestamp string."""
        serialized = dict(
            id=self.id,
            role=self.role,
            content=self.content,
            timestamp=self.timestamp.isoformat(),
            actions=self.actions,
        )
        return serialized
@dataclass
class OrchestratorAction:
    """An action proposed by the assistant that needs user approval before it runs."""

    id: str
    kind: str  # "transition", "batch_transition", "comment", "assign"
    description: str
    params: dict
    created_at: datetime
    # Lifecycle marker: "pending", "approved", "rejected", "executed", "failed"
    status: str = "pending"
    # Outcome payload; stays None until something is recorded for the action
    result: Optional[dict] = None

    def to_dict(self) -> dict:
        """Return the action as a plain dict with an ISO-8601 creation time."""
        created = self.created_at.isoformat()
        payload = {"id": self.id, "kind": self.kind}
        payload["description"] = self.description
        payload["params"] = self.params
        payload["created_at"] = created
        payload["status"] = self.status
        payload["result"] = self.result
        return payload
@dataclass
class SuggestedPrompt:
    """A follow-up prompt offered to the user as a suggestion."""

    id: str
    icon: str
    label: str
    description: str
    prompt: str

    def to_dict(self) -> dict:
        """Return the suggestion as a plain dict, keyed by field name."""
        exported = ("id", "icon", "label", "description", "prompt")
        return {name: getattr(self, name) for name in exported}
class OrchestratorSession:
    """Manages a single orchestrator agent session."""

    # NOTE(review): named as a timeout although the original comment called it
    # a buffer size; presumably seconds - confirm before relying on it.
    STREAM_BUFFER_TIMEOUT = 0.1

    def __init__(self, runner: "Runner", prompt_path: Optional[Path] = None):
        """
        Set up an empty session bound to ``runner``.

        ``prompt_path`` overrides the system prompt location; when omitted,
        ``prompts/orchestrator.md`` next to this module is used. No Claude
        process is started here.
        """
        self.runner = runner
        self.session_id = str(uuid.uuid4())
        self.created_at = datetime.now()
        short_id = self.session_id[:8]
        logger.info(f"[ORCH:{short_id}] Creating new OrchestratorSession")

        module_dir = Path(__file__).parent
        prompts_dir = module_dir / "prompts"

        # --- conversation state (guarded by _messages_lock) ---
        self.messages: list[OrchestratorMessage] = []
        self.pending_actions: dict[str, OrchestratorAction] = {}
        self.suggested_prompts: list[SuggestedPrompt] = []
        self._messages_lock = threading.Lock()

        # --- per-message subprocess state ---
        self.process: Optional[subprocess.Popen] = None
        self._output_thread: Optional[threading.Thread] = None
        self._shutdown_event = threading.Event()
        self._current_response = ""
        self._response_complete = threading.Event()

        # Claude's own session identifier, passed back via --resume
        self._claude_session_id: Optional[str] = None

        # System prompt
        self.prompt_path = prompt_path or prompts_dir / "orchestrator.md"
        self._system_prompt = self._load_system_prompt()

        # Prompt used to ask for follow-up suggestions
        self.suggestions_prompt_path = prompts_dir / "orchestrator_suggestions.md"
        self._suggestions_prompt = self._load_suggestions_prompt()

        # Executable used to invoke Claude
        self.claude_command = runner.config.get("claude_command", "claude")

        # Directory the Claude subprocess runs in
        self.work_dir = module_dir

        logger.debug(f"[ORCH:{short_id}] Initialized with prompt_path={self.prompt_path}")
def _load_system_prompt(self) -> str:
    """
    Return the orchestrator system prompt.

    Reads ``self.prompt_path`` as UTF-8 when the file exists. Otherwise a
    warning is logged and a minimal built-in prompt is returned.
    """
    if self.prompt_path.exists():
        # Decode explicitly as UTF-8 so the prompt text does not depend on
        # the locale encoding of the machine the runner happens to be on.
        content = self.prompt_path.read_text(encoding="utf-8")
        logger.debug(f"[ORCH:{self.session_id[:8]}] Loaded system prompt ({len(content)} chars)")
        return content
    # Fallback minimal prompt if file doesn't exist
    logger.warning(f"[ORCH:{self.session_id[:8]}] Prompt not found at {self.prompt_path}, using fallback")
    return """You are the ClearGrow Orchestrator, an AI assistant that helps manage
the development workflow. Analyze issues and suggest actions to improve workflow efficiency.
Output actions in JSON format:
{"type": "action", "action": {"id": "uuid", "kind": "transition", "description": "...", "params": {...}}}
"""
def _load_suggestions_prompt(self) -> str:
    """
    Return the prompt used to request follow-up suggestions.

    Reads ``self.suggestions_prompt_path`` as UTF-8 when the file exists.
    Otherwise a warning is logged and a one-line built-in prompt is returned.
    """
    if self.suggestions_prompt_path.exists():
        # Explicit UTF-8: avoid locale-dependent decoding of the prompt file.
        content = self.suggestions_prompt_path.read_text(encoding="utf-8")
        logger.debug(f"[ORCH:{self.session_id[:8]}] Loaded suggestions prompt ({len(content)} chars)")
        return content
    logger.warning(f"[ORCH:{self.session_id[:8]}] Suggestions prompt not found, using fallback")
    return "Generate 2-3 follow-up suggestions as a JSON array."
def start(self) -> bool:
    """
    Mark the session as started.

    Nothing is spawned here: a Claude process is launched separately for
    each message (using the -p flag). Always returns True.
    """
    short_id = self.session_id[:8]
    logger.info(f"[ORCH:{short_id}] Session started")
    return True
def _build_initial_context(self) -> str:
"""Build minimal context - orchestrator will use MCP tools to fetch data."""
return """Use the available MCP tools to fetch data about issues and agents.
Start with get_workflow_summary to see the current state of all issues."""
def _run_claude_message(self, prompt: str) -> Generator[dict, None, None]:
    """
    Run one Claude CLI invocation for ``prompt`` and stream its output.

    The CLI is started with ``-p`` (plus ``--resume`` when a previous Claude
    session ID is known) and is restricted to the MCP server described by
    ``mcp_config.json`` next to this module.

    Yields dicts of these shapes:
        {"type": "chunk", "content": str}             assistant text as it arrives
        {"type": "actions", "actions": list}          proposed actions
        {"type": "suggestions", "suggestions": list}  follow-up prompts
        {"type": "complete"}                          normal end of the response
        {"type": "error", "message": str}             failure; nothing follows

    The child process is always reaped: if the consumer stops iterating
    early or an error occurs, the still-running process is terminated.
    """
    tag = f"[ORCH:{self.session_id[:8]}]"
    logger.info(f"{tag} Running Claude message (prompt length: {len(prompt)})")

    # MCP config path
    mcp_config = Path(__file__).parent / "mcp_config.json"

    # Build command with MCP tools only (no file system access)
    cmd = [
        self.claude_command,
        "-p", prompt,
        "--output-format", "stream-json",
        "--verbose",
        "--mcp-config", str(mcp_config),
        "--strict-mcp-config",  # Only use our MCP server, no other tools
        "--permission-mode", "bypassPermissions",  # Auto-approve MCP tool calls
    ]

    # Add resume flag if we have a previous session
    if self._claude_session_id:
        cmd.extend(["--resume", self._claude_session_id])
        logger.info(f"{tag} Resuming Claude session: {self._claude_session_id}")
    else:
        logger.info(f"{tag} Starting new Claude session")

    logger.debug(f"{tag} Command: {cmd[0]} -p '...' --output-format stream-json ...")

    def _drain(stream, sink: list) -> None:
        """Collect everything written to ``stream`` until EOF."""
        try:
            for err_line in stream:
                sink.append(err_line)
        except (OSError, ValueError):
            # The pipe was closed underneath us during shutdown.
            pass

    # Work on a local handle: stop() may reset self.process to None from
    # another thread while this generator is still running.
    proc: Optional[subprocess.Popen] = None
    stderr_thread: Optional[threading.Thread] = None
    stderr_lines: list = []

    try:
        proc = subprocess.Popen(
            cmd,
            cwd=str(self.work_dir),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",  # decode independently of the locale
            errors="replace",
            bufsize=1,
        )
        self.process = proc
        logger.debug(f"{tag} Subprocess started (PID: {proc.pid})")

        # Drain stderr concurrently. Reading only stdout while stderr is a
        # pipe can deadlock once the child fills the stderr pipe buffer.
        stderr_thread = threading.Thread(
            target=_drain, args=(proc.stderr, stderr_lines), daemon=True
        )
        stderr_thread.start()

        # Read output line by line
        response_parts = []
        line_count = 0
        for raw_line in proc.stdout:
            line = raw_line.strip()
            if not line:
                continue
            line_count += 1
            chunk_result = self._handle_output_line(line)
            if not chunk_result:
                continue
            chunk_type = chunk_result.get("type")
            if chunk_type == "text":
                content = chunk_result.get("content", "")
                response_parts.append(content)
                yield {"type": "chunk", "content": content}
            elif chunk_type == "session_id":
                self._claude_session_id = chunk_result.get("session_id")
                logger.info(f"{tag} Got Claude session ID: {self._claude_session_id}")
            elif chunk_type == "actions":
                yield {"type": "actions", "actions": chunk_result.get("actions", [])}

        # Wait for process to complete
        return_code = proc.wait()
        stderr_thread.join(timeout=5)
        response_text = "".join(response_parts)
        logger.info(f"{tag} Claude process completed (return_code={return_code}, lines={line_count}, response_length={len(response_text)})")

        # Check stderr for errors
        stderr = "".join(stderr_lines)
        if stderr:
            logger.warning(f"{tag} Claude stderr: {stderr[:500]}")

        # A failed run that produced no text would otherwise look like an
        # empty but successful reply; report it as an error instead.
        if return_code != 0 and not response_text:
            yield {"type": "error", "message": f"Claude exited with code {return_code}"}
            return

        # Store the response
        if response_text:
            self._add_assistant_message(response_text)
            logger.debug(f"{tag} Added assistant message to history")

        # Generate actions and follow-up suggestions with a second call
        logger.info(f"{tag} Generating suggestions...")
        generated = self._generate_suggestions()

        if generated.get("actions"):
            logger.info(f"{tag} Yielding {len(generated['actions'])} actions")
            yield {
                "type": "actions",
                "actions": generated["actions"],
            }
        else:
            logger.debug(f"{tag} No actions generated")

        if generated.get("suggestions"):
            logger.info(f"{tag} Yielding {len(generated['suggestions'])} suggestions")
            yield {
                "type": "suggestions",
                "suggestions": generated["suggestions"],
            }
        else:
            logger.debug(f"{tag} No suggestions generated")

        logger.info(f"{tag} Yielding complete")
        yield {"type": "complete"}

    except FileNotFoundError:
        logger.error(f"{tag} Claude command not found: {self.claude_command}")
        yield {"type": "error", "message": "Claude command not found"}
    except Exception as e:
        logger.exception(f"{tag} Error running orchestrator command: {e}")
        yield {"type": "error", "message": str(e)}
    finally:
        if proc is not None:
            if proc.poll() is None:
                # Reached on early generator close or on an error: do not
                # leave the child running in the background.
                proc.terminate()
                try:
                    proc.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    proc.kill()
                    proc.wait()
            if stderr_thread is not None:
                stderr_thread.join(timeout=1)
            if proc.stdout:
                proc.stdout.close()
            # Only close stderr once the reader thread is done with it.
            if proc.stderr and (stderr_thread is None or not stderr_thread.is_alive()):
                proc.stderr.close()
        self.process = None
        logger.debug(f"{tag} Claude message completed")
def _handle_output_line(self, line: str) -> Optional[dict]:
"""Process a single line of output from Claude. Returns parsed data or None."""
try:
data = json.loads(line)
msg_type = data.get("type", "")
if msg_type == "assistant":
# Extract text from assistant message
message = data.get("message", {})
content_blocks = message.get("content", [])
text_content = ""
for block in content_blocks:
if block.get("type") == "text":
text_content += block.get("text", "")
if text_content:
return {"type": "text", "content": text_content}
elif msg_type == "result":
# Response complete - extract session_id for resume
session_id = data.get("session_id")
if session_id:
return {"type": "session_id", "session_id": session_id}
return None
except json.JSONDecodeError:
# Non-JSON output
logger.debug(f"[ORCH:{self.session_id[:8]}] Non-JSON output: {line[:100]}")
return None
def _generate_suggestions(self) -> dict:
    """
    Ask Claude for follow-up suggestions and actions with a second call.

    The call resumes the current Claude session (``--resume``) so the model
    sees the conversation so far, and is limited to one turn and 30 seconds.

    Returns:
        {"actions": [...], "suggestions": [...]} as produced by
        ``_parse_actions_and_suggestions``. Both lists are empty when there
        is no session to resume, or when the command fails, times out, or
        returns output that cannot be parsed.
    """
    tag = f"[ORCH:{self.session_id[:8]}]"
    # Created per call, so returning it never shares state between calls.
    empty = {"actions": [], "suggestions": []}

    if not self._claude_session_id:
        logger.warning(f"{tag} No session ID for suggestions generation")
        return empty

    logger.info(f"{tag} Generating follow-up suggestions with --resume {self._claude_session_id}")

    cmd = [
        self.claude_command,
        "-p", self._suggestions_prompt,
        "--output-format", "json",
        "--resume", self._claude_session_id,
        "--max-turns", "1",
    ]

    try:
        logger.debug(f"{tag} Running suggestions command...")
        result = subprocess.run(
            cmd,
            cwd=str(self.work_dir),
            capture_output=True,
            text=True,
            encoding="utf-8",  # decode independently of the locale
            errors="replace",
            timeout=30,
        )
        logger.debug(f"{tag} Suggestions command completed (return_code={result.returncode})")

        if result.returncode != 0:
            logger.warning(f"{tag} Suggestions generation failed (code={result.returncode}): {result.stderr[:500]}")
            return empty

        # Parse JSON output
        output = result.stdout.strip()
        logger.debug(f"{tag} Suggestions raw output ({len(output)} chars): {output[:200]}...")
        data = json.loads(output)

        # Extract the response text
        response_text = ""
        if isinstance(data, dict) and "result" in data:
            response_text = data["result"]
        elif isinstance(data, str):
            response_text = data
        # "result" may be null or structured; only text can be parsed below.
        if not isinstance(response_text, str):
            response_text = ""

        logger.debug(f"{tag} Extracted response text ({len(response_text)} chars): {response_text[:300]}...")

        # Parse actions and suggestions from response
        parsed = self._parse_actions_and_suggestions(response_text)
        logger.info(f"{tag} Parsed {len(parsed['actions'])} actions and {len(parsed['suggestions'])} suggestions")
        return parsed

    except subprocess.TimeoutExpired:
        logger.warning(f"{tag} Suggestions generation timed out")
        return empty
    except json.JSONDecodeError as e:
        logger.warning(f"{tag} Failed to parse suggestions JSON: {e}")
        return empty
    except Exception as e:
        logger.exception(f"{tag} Error generating suggestions: {e}")
        return empty
def _parse_actions_and_suggestions(self, text: str) -> dict:
    """
    Extract proposed actions and follow-up suggestions from model output.

    ``text`` may contain a JSON object with an "actions" and/or
    "suggestions" list, possibly surrounded by prose or code fences. The
    first decodable object that has either key is used. If there is none,
    the first decodable JSON array containing objects is treated as a bare
    suggestions list (old format).

    Every valid action (a dict with a truthy "kind") is registered in
    ``self.pending_actions``. An ID that is already registered gets a random
    suffix so an earlier action is never silently replaced.

    Returns:
        {"actions": [action dicts], "suggestions": [suggestion dicts]};
        both lists are empty when nothing usable is found.
    """
    tag = f"[ORCH:{self.session_id[:8]}]"
    result = {"actions": [], "suggestions": []}
    if not isinstance(text, str):
        text = ""

    logger.debug(f"{tag} Parsing actions/suggestions from text ({len(text)} chars)")

    decoder = json.JSONDecoder()

    def first_json_value(opener, accept):
        """Return (value, length) of the first JSON value at ``opener`` passing ``accept``."""
        pos = text.find(opener)
        while pos != -1:
            try:
                value, end = decoder.raw_decode(text, pos)
            except json.JSONDecodeError:
                value, end = None, pos
            if value is not None and accept(value):
                return value, end - pos
            pos = text.find(opener, pos + 1)
        return None, 0

    def build_suggestions(raw_items) -> list:
        """Keep only items that are dicts with a label and a prompt."""
        kept = []
        for i, s in enumerate(raw_items):
            if isinstance(s, dict) and s.get("label") and s.get("prompt"):
                sug = {
                    "id": s.get("id", f"sug-{i+1}"),
                    "icon": s.get("icon", "lightbulb"),
                    "label": s.get("label", ""),
                    "description": s.get("description", ""),
                    "prompt": s.get("prompt", ""),
                }
                kept.append(sug)
                logger.debug(f"{tag} Created suggestion: {sug['label']}")
            else:
                logger.warning(f"{tag} Skipping invalid suggestion at index {i}: {s}")
        return kept

    # Decode candidates with the JSON parser instead of a regex: braces or
    # brackets in surrounding prose then cannot corrupt the extracted span.
    payload, payload_len = first_json_value(
        "{",
        lambda v: isinstance(v, dict) and ("actions" in v or "suggestions" in v),
    )

    if payload is not None:
        logger.debug(f"{tag} Found JSON object ({payload_len} chars)")

        # Parse actions
        raw_actions = payload.get("actions")
        if not isinstance(raw_actions, list):
            raw_actions = []
        logger.debug(f"{tag} Found {len(raw_actions)} raw actions")
        for i, a in enumerate(raw_actions):
            if not (isinstance(a, dict) and a.get("kind")):
                logger.warning(f"{tag} Skipping invalid action at index {i}: {a}")
                continue
            params = a.get("params")
            action = OrchestratorAction(
                id=str(a.get("id") or f"act-{i+1}"),
                kind=a.get("kind"),
                description=str(a.get("description") or ""),
                params=params if isinstance(params, dict) else {},
                created_at=datetime.now(),
            )
            with self._messages_lock:
                if action.id in self.pending_actions:
                    # Never overwrite a different action under a reused ID.
                    action.id = f"{action.id}-{uuid.uuid4().hex[:8]}"
                self.pending_actions[action.id] = action
            result["actions"].append(action.to_dict())
            logger.info(f"{tag} Created action: id={action.id}, kind={action.kind}, desc={action.description[:50]}")

        # Parse suggestions
        raw_suggestions = payload.get("suggestions")
        if not isinstance(raw_suggestions, list):
            raw_suggestions = []
        logger.debug(f"{tag} Found {len(raw_suggestions)} raw suggestions")
        result["suggestions"].extend(build_suggestions(raw_suggestions))

        logger.info(f"{tag} Parsed {len(result['actions'])} actions, {len(result['suggestions'])} suggestions")
        return result

    logger.debug(f"{tag} No JSON object found, trying array fallback")

    # Fallback: try to find just a suggestions array (old format)
    suggestions, _ = first_json_value(
        "[",
        lambda v: isinstance(v, list) and any(isinstance(item, dict) for item in v),
    )
    if suggestions is not None:
        result["suggestions"].extend(build_suggestions(suggestions))
        logger.info(f"{tag} Parsed {len(result['suggestions'])} suggestions from array fallback")
    else:
        logger.warning(f"{tag} No JSON found in response text")

    return result
def _add_assistant_message(self, content: str):
    """
    Append an assistant message to the conversation history.

    The message carries a snapshot of every action that is still "pending"
    at the time of the call.

    Must not be called while ``self._messages_lock`` is already held by the
    calling thread: the lock is a plain (non-reentrant) ``threading.Lock``.
    """
    with self._messages_lock:
        # Snapshot pending_actions under the same lock that guards its
        # mutation, so a concurrent insert cannot break the iteration.
        msg = OrchestratorMessage(
            id=str(uuid.uuid4()),
            role="assistant",
            content=content,
            timestamp=datetime.now(),
            actions=[a.to_dict() for a in self.pending_actions.values() if a.status == "pending"],
        )
        self.messages.append(msg)
    logger.debug(f"[ORCH:{self.session_id[:8]}] Added assistant message (id={msg.id[:8]}, length={len(content)})")
def send_message(self, user_message: str) -> Generator[dict, None, None]:
    """
    Record ``user_message`` in the history and stream Claude's reply.

    The first message of a session is wrapped with the system prompt and
    the initial context; later messages are sent as-is because the Claude
    session is resumed. Yields whatever ``_run_claude_message`` yields.
    """
    short_id = self.session_id[:8]
    logger.info(f"[ORCH:{short_id}] send_message called (length={len(user_message)})")

    # Suggestions from the previous turn no longer apply
    with self._messages_lock:
        self.suggested_prompts.clear()

    # Record the user's message
    entry = OrchestratorMessage(
        id=str(uuid.uuid4()),
        role="user",
        content=user_message,
        timestamp=datetime.now(),
    )
    with self._messages_lock:
        self.messages.append(entry)
    logger.debug(f"[ORCH:{short_id}] Added user message to history (total messages: {len(self.messages)})")

    # Decide what to send to Claude
    resuming = self._claude_session_id is not None
    if resuming:
        # Claude already has the earlier context; send only the new message
        full_prompt = user_message
        logger.debug(f"[ORCH:{short_id}] Using user message as prompt (resuming session)")
    else:
        # First message: prepend system prompt and initial context
        sections = [
            self._system_prompt,
            self._build_initial_context(),
            f"User request: {user_message}",
        ]
        full_prompt = "\n\n---\n\n".join(sections)
        logger.debug(f"[ORCH:{short_id}] Built initial prompt (length={len(full_prompt)})")

    # Stream the reply through to the caller
    yield from self._run_claude_message(full_prompt)
def execute_action(self, action_id: str, approved: bool) -> dict:
    """
    Approve-and-run or reject a pending action.

    Returns a dict with a boolean "success" key. On success it also has
    "status" ("rejected" or "executed") and, for executed actions,
    "result". On failure it has an "error" message instead.
    """
    short_id = self.session_id[:8]
    logger.info(f"[ORCH:{short_id}] execute_action called: action_id={action_id}, approved={approved}")
    logger.info(f"[ORCH:{short_id}] Available actions: {list(self.pending_actions.keys())}")

    # Look the action up under the lock, but release it before executing
    got_lock = self._messages_lock.acquire(timeout=5.0)
    if not got_lock:
        logger.error(f"[ORCH:{short_id}] execute_action: failed to acquire lock (timeout)")
        return {"success": False, "error": "Internal error: lock timeout"}
    try:
        action = self.pending_actions.get(action_id)
        if action is None:
            logger.error(f"[ORCH:{short_id}] Action {action_id} not found in pending_actions")
            return {"success": False, "error": f"Action {action_id} not found"}
        if action.status != "pending":
            logger.warning(f"[ORCH:{short_id}] Action {action_id} already {action.status}")
            return {"success": False, "error": f"Action already {action.status}"}
        # Claim the action so a second caller cannot run it again
        action.status = "processing"
    finally:
        self._messages_lock.release()

    if not approved:
        action.status = "rejected"
        logger.info(f"[ORCH:{short_id}] Action {action_id} rejected")
        return {"success": True, "status": "rejected"}

    # Dispatch on the action kind
    logger.info(f"[ORCH:{short_id}] Executing action: kind={action.kind}, params={action.params}")
    try:
        outcome = self._execute_action_impl(action)
        action.status = "executed"
        action.result = outcome
        logger.info(f"[ORCH:{short_id}] Action {action_id} executed successfully: {outcome}")

        # Record the outcome in the conversation
        note = OrchestratorMessage(
            id=str(uuid.uuid4()),
            role="action_result",
            content=f"Action executed: {action.description}\nResult: {json.dumps(outcome)}",
            timestamp=datetime.now(),
        )
        with self._messages_lock:
            self.messages.append(note)

        return {"success": True, "status": "executed", "result": outcome}
    except Exception as exc:
        logger.exception(f"[ORCH:{short_id}] Failed to execute action {action_id}: {exc}")
        action.status = "failed"
        action.result = {"error": str(exc)}
        return {"success": False, "error": str(exc)}
def _execute_action_impl(self, action: OrchestratorAction) -> dict:
    """
    Carry out ``action`` against YouTrack via ``self.runner.youtrack``.

    Supported kinds and their params:
        "transition":       issue_id, state
        "batch_transition": issue_ids (list, or a single ID string), state
        "comment":          issue_id, text

    Returns a dict describing what was done. For "batch_transition" each
    issue is attempted independently and the per-issue outcome is reported
    in "results"; one failing issue does not stop the others.

    Raises:
        ValueError: for an unknown kind, missing params, or an
            ``issue_ids`` value that is not a list of IDs.
    """
    kind = action.kind
    params = action.params
    tag = f"[ORCH:{self.session_id[:8]}]"
    logger.debug(f"{tag} _execute_action_impl: kind={kind}")

    if kind == "transition":
        # Transition single issue
        issue_id = params.get("issue_id")
        new_state = params.get("state")
        if not issue_id or not new_state:
            raise ValueError("Missing issue_id or state")

        logger.info(f"{tag} Transitioning {issue_id} to {new_state}")
        self.runner.youtrack.update_issue_state(issue_id, new_state)
        return {"issue_id": issue_id, "new_state": new_state}

    if kind == "batch_transition":
        # Transition multiple issues
        issue_ids = params.get("issue_ids", [])
        new_state = params.get("state")
        if not issue_ids or not new_state:
            raise ValueError("Missing issue_ids or state")
        if isinstance(issue_ids, str):
            # A bare string would otherwise be iterated character by
            # character, issuing one bogus transition per letter.
            issue_ids = [issue_ids]
        if not isinstance(issue_ids, (list, tuple)):
            raise ValueError("issue_ids must be a list of issue IDs")

        logger.info(f"{tag} Batch transitioning {len(issue_ids)} issues to {new_state}")
        results = []
        for issue_id in issue_ids:
            try:
                logger.debug(f"{tag} Transitioning {issue_id} to {new_state}")
                self.runner.youtrack.update_issue_state(issue_id, new_state)
                results.append({"issue_id": issue_id, "success": True})
                logger.debug(f"{tag} {issue_id} transitioned successfully")
            except Exception as e:
                logger.error(f"{tag} Failed to transition {issue_id}: {e}")
                results.append({"issue_id": issue_id, "success": False, "error": str(e)})
        return {"results": results, "new_state": new_state}

    if kind == "comment":
        # Add comment to issue
        issue_id = params.get("issue_id")
        text = params.get("text")
        if not issue_id or not text:
            raise ValueError("Missing issue_id or text")

        logger.info(f"{tag} Adding comment to {issue_id}")
        self.runner.youtrack.add_issue_comment(issue_id, text)
        return {"issue_id": issue_id, "commented": True}

    raise ValueError(f"Unknown action kind: {kind}")
def get_status(self) -> dict:
    """
    Return a snapshot of the session state.

    Keys: "session_id", "active" (any messages or a Claude session ID),
    "processing" (a Claude subprocess is currently running), "created_at"
    (ISO-8601 string), "message_count" and "pending_actions" (dicts of the
    actions whose status is "pending").
    """
    # Read self.process once: another thread may reset it to None between
    # a None-check and the poll() call.
    proc = self.process
    is_processing = proc is not None and proc.poll() is None

    with self._messages_lock:
        pending = [
            a.to_dict() for a in self.pending_actions.values()
            if a.status == "pending"
        ]
        status = {
            "session_id": self.session_id,
            "active": len(self.messages) > 0 or self._claude_session_id is not None,
            "processing": is_processing,
            "created_at": self.created_at.isoformat(),
            "message_count": len(self.messages),
            "pending_actions": pending,
        }
    logger.debug(f"[ORCH:{self.session_id[:8]}] get_status: active={status['active']}, messages={status['message_count']}, pending_actions={len(pending)}")
    return status
def get_messages(self, since_index: int = 0) -> list[dict]:
    """Return the messages from position ``since_index`` onward, each as a dict."""
    with self._messages_lock:
        tail = self.messages[since_index:]
        messages = [entry.to_dict() for entry in tail]
    short_id = self.session_id[:8]
    logger.debug(f"[ORCH:{short_id}] get_messages(since={since_index}): returning {len(messages)} messages")
    return messages
def stop(self):
    """
    Stop the session and discard its state.

    A running Claude subprocess is asked to terminate and is killed if it
    has not exited after 5 seconds. Afterwards the Claude session ID, the
    message history and all registered actions are cleared.
    """
    tag = f"[ORCH:{self.session_id[:8]}]"
    logger.info(f"{tag} Stopping session...")

    # Work on a local handle: the streaming generator resets self.process
    # to None when it finishes, which can happen while we are in here.
    proc = self.process
    if proc is not None:
        try:
            logger.debug(f"{tag} Terminating subprocess (PID: {proc.pid})")
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                logger.warning(f"{tag} Process didn't terminate, killing...")
                proc.kill()
                proc.wait()
        except Exception as e:
            logger.error(f"{tag} Error stopping process: {e}")
        finally:
            self.process = None

    # Clear session state
    self._claude_session_id = None
    with self._messages_lock:
        msg_count = len(self.messages)
        action_count = len(self.pending_actions)
        self.messages.clear()
        self.pending_actions.clear()

    logger.info(f"{tag} Session stopped (cleared {msg_count} messages, {action_count} actions)")
class OrchestratorManager:
    """
    Manages orchestrator sessions (singleton-like behavior).

    At most one ``OrchestratorSession`` is held at a time. A session counts
    as active once it has at least one message or a Claude session ID.
    All methods serialize on an internal lock.
    """

    def __init__(self, runner: "Runner"):
        """Create a manager with no session; ``runner`` is handed to new sessions."""
        self.runner = runner
        self._current_session: Optional[OrchestratorSession] = None
        self._lock = threading.Lock()
        logger.info("[ORCH-MGR] OrchestratorManager initialized")

    @staticmethod
    def _is_active(session) -> bool:
        """Return True when ``session`` has messages or a Claude session ID."""
        return bool(session.messages) or bool(session._claude_session_id)

    def get_session(self) -> Optional["OrchestratorSession"]:
        """Return the current session if it is active, otherwise None."""
        with self._lock:
            session = self._current_session
            if session is None:
                logger.debug("[ORCH-MGR] get_session: no current session")
                return None

            has_messages = bool(session.messages)
            has_claude_id = bool(session._claude_session_id)
            if has_messages or has_claude_id:
                logger.debug(f"[ORCH-MGR] get_session: returning session {session.session_id[:8]} (messages={has_messages}, claude_id={has_claude_id})")
                return session

            logger.debug("[ORCH-MGR] get_session: session exists but not active (no messages or claude_id)")
            return None

    def start_session(self) -> "tuple[OrchestratorSession, bool]":
        """
        Return an active session, creating one if necessary.

        Returns:
            (session, created): ``(existing, False)`` when the current
            session is active, otherwise ``(new, True)``. A current session
            that is not active is replaced by the new one.

        Raises:
            RuntimeError: if the new session reports that it failed to start.
        """
        with self._lock:
            existing = self._current_session
            if existing is not None and self._is_active(existing):
                logger.info(f"[ORCH-MGR] start_session: returning existing session {existing.session_id[:8]}")
                return existing, False

            # Create new session
            logger.info("[ORCH-MGR] start_session: creating new session")
            session = OrchestratorSession(self.runner)
            if not session.start():
                logger.error("[ORCH-MGR] start_session: failed to start session")
                raise RuntimeError("Failed to start orchestrator session")

            self._current_session = session
            logger.info(f"[ORCH-MGR] start_session: created session {session.session_id[:8]}")
            return session, True

    def stop_session(self) -> bool:
        """Stop and drop the current session. Returns False if there was none."""
        with self._lock:
            session = self._current_session
            if session is None:
                logger.debug("[ORCH-MGR] stop_session: no session to stop")
                return False

            session_id = session.session_id[:8]
            logger.info(f"[ORCH-MGR] stop_session: stopping session {session_id}")
            session.stop()
            self._current_session = None
            return True

    def get_status(self) -> dict:
        """
        Return the current session's status, or an idle placeholder.

        The placeholder has the same keys as a session status, so callers
        can read "processing" and "created_at" whether or not a session
        exists.
        """
        with self._lock:
            if self._current_session is not None:
                return self._current_session.get_status()
            return {
                "session_id": None,
                "active": False,
                "processing": False,
                "created_at": None,
                "message_count": 0,
                "pending_actions": [],
            }