Files
agentrunner/ssh_metrics.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

295 lines
8.7 KiB
Python

"""
SSH-based metrics collector for remote VPS servers.
Periodically collects CPU, memory, and disk metrics from remote servers via SSH.
"""
import logging
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)

# SSH key path for ClearGrow servers
# Use cleargrow user's key since service runs as cleargrow
SSH_KEY_PATH = "/home/cleargrow/.ssh/cleargrow_ed25519"

# Server configuration
# Maps a short server id (the cache key used throughout this module and by
# callers of get_server_metrics) to its SSH connection details.
VPS_SERVERS = {
    "vps-git": {
        "ip": "139.144.29.179",
        "user": "root",
    },
    "vps-track": {
        "ip": "170.187.143.45",
        "user": "root",
    },
    "vps-ci": {
        "ip": "173.230.138.66",
        "user": "root",
    },
}

# Remote command to collect metrics (single command for efficiency).
# Emits one "LABEL:values" line per metric, consumed by
# SSHMetricsCollector._parse_metrics.
# NOTE(review): the CPU figure is computed from cumulative /proc/stat
# counters (user+system over user+system+idle), so it reflects average
# utilization since boot rather than an instantaneous reading — confirm
# that is the intended semantics for the dashboard.
METRICS_COMMAND = """
echo "CPU:$(grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage}')"
echo "MEM:$(free -m | awk 'NR==2{printf "%d %d %.1f", $3, $2, $3*100/$2}')"
echo "DISK:$(df -BG / | awk 'NR==2{gsub("G",""); printf "%d %d %.1f", $3, $2, $5}')"
echo "UPTIME:$(cat /proc/uptime | awk '{print int($1)}')"
"""
@dataclass
class ServerMetrics:
    """Metrics collected from a server.

    One snapshot per collection cycle. When collection fails, the numeric
    fields are zeroed and ``error`` carries the failure message.
    """
    cpu_percent: float          # CPU utilization, percent (see METRICS_COMMAND caveat)
    memory_used_mb: int         # used RAM in MiB (from `free -m`)
    memory_total_mb: int        # total RAM in MiB
    memory_percent: float       # used/total RAM, percent
    disk_used_gb: float         # used space on / in GB (from `df -BG /`)
    disk_total_gb: float        # total space on / in GB
    disk_percent: float         # used space on /, percent
    uptime: int                 # seconds since boot (from /proc/uptime)
    collected_at: datetime      # local (naive) timestamp of collection/parse
    error: Optional[str] = None # failure message, or None on success
class SSHMetricsCollector:
    """
    Collects system metrics from remote servers via SSH.

    A background daemon thread periodically refreshes metrics into an
    in-memory cache so that API requests never block on SSH. The cache is
    guarded by a lock; ``start()``/``stop()`` are idempotent.
    """

    def __init__(self, refresh_interval: int = 60):
        """
        Initialize the collector.

        Args:
            refresh_interval: How often to refresh metrics (seconds)
        """
        # server_id -> last collected metrics (or an error placeholder)
        self._metrics: Dict[str, "ServerMetrics"] = {}
        self._lock = threading.Lock()
        self._refresh_interval = refresh_interval
        self._running = False
        # Set to request shutdown; also interrupts the inter-cycle wait so
        # stop() takes effect promptly instead of after up to
        # refresh_interval seconds of sleep.
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background metrics collection thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._collection_loop, daemon=True)
        self._thread.start()
        logger.info("SSH metrics collector started")

    def stop(self):
        """Stop the background collection thread."""
        self._running = False
        self._stop_event.set()  # wake the loop immediately if it is waiting
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("SSH metrics collector stopped")

    def get_metrics(self, server_id: str) -> Optional["ServerMetrics"]:
        """
        Get cached metrics for a server.

        Args:
            server_id: Server identifier (e.g., "vps-git")

        Returns:
            Cached metrics or None if not available
        """
        with self._lock:
            return self._metrics.get(server_id)

    def get_all_metrics(self) -> Dict[str, "ServerMetrics"]:
        """Get a snapshot copy of cached metrics for all servers."""
        with self._lock:
            return dict(self._metrics)

    def _collection_loop(self):
        """Background loop that periodically collects metrics."""
        while self._running:
            self._collect_all()
            # Event.wait returns True immediately when stop() sets the
            # event, making shutdown prompt; otherwise it times out after
            # the refresh interval and we collect again.
            if self._stop_event.wait(self._refresh_interval):
                break

    def _collect_all(self):
        """Collect metrics from all configured servers.

        A failure on one server is cached as an error placeholder (numeric
        fields zeroed, ``error`` set) and does not prevent collection from
        the remaining servers.
        """
        for server_id, config in VPS_SERVERS.items():
            try:
                metrics = self._collect_from_server(
                    server_id,
                    config["ip"],
                    config["user"]
                )
            except Exception as e:
                logger.warning(f"Failed to collect metrics from {server_id}: {e}")
                metrics = ServerMetrics(
                    cpu_percent=0,
                    memory_used_mb=0,
                    memory_total_mb=0,
                    memory_percent=0,
                    disk_used_gb=0,
                    disk_total_gb=0,
                    disk_percent=0,
                    uptime=0,
                    collected_at=datetime.now(),
                    error=str(e)
                )
            with self._lock:
                self._metrics[server_id] = metrics

    def _collect_from_server(
        self,
        server_id: str,
        ip: str,
        user: str
    ) -> "ServerMetrics":
        """
        Collect metrics from a single server via SSH.

        Args:
            server_id: Server identifier for logging
            ip: Server IP address
            user: SSH username

        Returns:
            ServerMetrics object with collected data

        Raises:
            RuntimeError: on SSH timeout, launch failure, or non-zero exit.
        """
        try:
            result = subprocess.run(
                [
                    "ssh",
                    "-i", SSH_KEY_PATH,
                    "-o", "StrictHostKeyChecking=no",
                    "-o", "ConnectTimeout=5",
                    "-o", "BatchMode=yes",
                    f"{user}@{ip}",
                    METRICS_COMMAND
                ],
                capture_output=True,
                text=True,
                timeout=10
            )
        except subprocess.TimeoutExpired:
            raise RuntimeError("SSH connection timed out")
        except OSError as e:
            # e.g. the ssh binary itself is missing or not executable
            raise RuntimeError(f"SSH error: {e}")
        # Checked outside the try so a non-zero exit is not re-wrapped into
        # a nested "SSH error: SSH failed: ..." message.
        if result.returncode != 0:
            raise RuntimeError(f"SSH failed: {result.stderr.strip()}")
        return self._parse_metrics(result.stdout)

    def _parse_metrics(self, output: str) -> "ServerMetrics":
        """
        Parse the output from the metrics command.

        Each line has the form "LABEL:values" (see METRICS_COMMAND).
        Malformed lines are ignored, leaving the corresponding fields at
        their zero defaults.

        Args:
            output: Raw output from the SSH command

        Returns:
            Parsed ServerMetrics object
        """
        cpu_percent = 0.0
        memory_used_mb = 0
        memory_total_mb = 0
        memory_percent = 0.0
        disk_used_gb = 0.0
        disk_total_gb = 0.0
        disk_percent = 0.0
        uptime = 0

        for line in output.strip().split("\n"):
            if line.startswith("CPU:"):
                try:
                    cpu_percent = float(line.split(":", 1)[1])
                except (ValueError, IndexError):
                    pass
            elif line.startswith("MEM:"):
                try:
                    # "used_mb total_mb percent"
                    parts = line.split(":", 1)[1].strip().split()
                    memory_used_mb = int(parts[0])
                    memory_total_mb = int(parts[1])
                    memory_percent = float(parts[2])
                except (ValueError, IndexError):
                    pass
            elif line.startswith("DISK:"):
                try:
                    # "used_gb total_gb percent"
                    parts = line.split(":", 1)[1].strip().split()
                    disk_used_gb = float(parts[0])
                    disk_total_gb = float(parts[1])
                    disk_percent = float(parts[2])
                except (ValueError, IndexError):
                    pass
            elif line.startswith("UPTIME:"):
                try:
                    uptime = int(line.split(":", 1)[1])
                except (ValueError, IndexError):
                    pass

        return ServerMetrics(
            cpu_percent=round(cpu_percent, 1),
            memory_used_mb=memory_used_mb,
            memory_total_mb=memory_total_mb,
            memory_percent=round(memory_percent, 1),
            disk_used_gb=round(disk_used_gb, 1),
            disk_total_gb=round(disk_total_gb, 1),
            disk_percent=round(disk_percent, 1),
            uptime=uptime,
            collected_at=datetime.now()
        )
# Global collector instance, created lazily by get_collector().
_collector: Optional[SSHMetricsCollector] = None
# Guards lazy creation so concurrent first callers cannot create (and
# start) two collectors, which would leak a background thread.
_collector_lock = threading.Lock()


def get_collector() -> SSHMetricsCollector:
    """Get or create the global SSH metrics collector (thread-safe)."""
    global _collector
    if _collector is None:
        with _collector_lock:
            # Double-checked: another thread may have won the race while
            # we waited for the lock.
            if _collector is None:
                collector = SSHMetricsCollector(refresh_interval=60)
                collector.start()
                # Publish only after start() so callers never see a
                # not-yet-started collector.
                _collector = collector
    return _collector
def get_server_metrics(server_id: str) -> Optional[dict]:
    """
    Get metrics for a server as a dictionary.

    Args:
        server_id: Server identifier (e.g., "vps-git")

    Returns:
        Dictionary with metrics or None if not available
    """
    snapshot = get_collector().get_metrics(server_id)
    # Treat "never collected" and "collection failed" the same way.
    if snapshot is None or snapshot.error:
        return None
    return {
        "cpuPercent": snapshot.cpu_percent,
        "memoryPercent": snapshot.memory_percent,
        "memoryUsedMB": snapshot.memory_used_mb,
        "memoryTotalMB": snapshot.memory_total_mb,
        "diskPercent": snapshot.disk_percent,
        "diskUsedGB": snapshot.disk_used_gb,
        "diskTotalGB": snapshot.disk_total_gb,
        "uptime": snapshot.uptime,
    }