Files
agentrunner/webhook_server.py
CI System 1aee8779c7 feat: orchestrator UI, dashboard improvements, and workflow fixes
Orchestrator:
- Add orchestrator chat interface with streaming responses
- MCP server integration for YouTrack queries
- Quick actions for backlog review, triage analysis
- Dynamic suggestions based on conversation context
- Action approval/rejection workflow

Dashboard improvements:
- Add font preloading to prevent FOUC
- CSS spinner for loading state (no icon font dependency)
- Wait for fonts before showing UI
- Fix workflow pipeline alignment
- Fix user message contrast (dark blue background)
- Auto-scroll chat, actions, suggestions panels
- Add keyboard shortcuts system
- Add toast notifications
- Add theme toggle (dark/light mode)
- New pages: orchestrator, repos, system, analytics

Workflow fixes:
- Skip Build state when agent determines no changes needed
- Check branch exists before attempting push
- Include comments in get_issues MCP response
- Simplified orchestrator prompt focused on Backlog management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 15:37:49 -07:00

501 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Webhook server for instant issue detection.
Receives webhooks from YouTrack when issues change state,
eliminating polling latency for faster agent response.
Usage:
# Standalone
python webhook_server.py --config config.yaml
# Or import and use with runner
from webhook_server import WebhookServer
"""
import argparse
import hashlib
import hmac
import json
import logging
import threading
from dataclasses import dataclass
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
from typing import Callable, Optional
from urllib.parse import parse_qs, urlparse
import yaml
logger = logging.getLogger(__name__)
@dataclass
class WebhookEvent:
"""Parsed webhook event from YouTrack."""
event_type: str # "issue_updated", "issue_created", etc.
issue_id: str # "CG-123"
project: str # "CG"
old_state: Optional[str] # Previous state (for state changes)
new_state: Optional[str] # New state (for state changes)
raw_payload: dict # Full webhook payload
class WebhookHandler(BaseHTTPRequestHandler):
"""HTTP request handler for YouTrack webhooks."""
# Class-level references set by WebhookServer
secret: Optional[str] = None
on_event: Optional[Callable[[WebhookEvent], None]] = None
def log_message(self, format, *args):
"""Override to use our logger."""
logger.debug(f"Webhook HTTP: {format % args}")
def do_POST(self):
"""Handle POST requests (webhooks)."""
# Parse path
parsed = urlparse(self.path)
if parsed.path != "/webhook/youtrack":
self.send_error(404, "Not Found")
return
# Read body
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self.send_error(400, "Empty body")
return
body = self.rfile.read(content_length)
# Verify signature if secret configured
if self.secret:
signature = self.headers.get("X-Hub-Signature-256", "")
if not self._verify_signature(body, signature):
logger.warning("Webhook signature verification failed")
self.send_error(403, "Invalid signature")
return
# Parse JSON
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in webhook: {e}")
self.send_error(400, "Invalid JSON")
return
# Parse event
event = self._parse_event(payload)
if not event:
logger.debug("Ignoring unrecognized webhook event")
self.send_response(200)
self.end_headers()
return
logger.info(f"Webhook received: {event.event_type} for {event.issue_id}")
# Dispatch event
if self.on_event:
try:
# Run in thread to not block webhook response
threading.Thread(
target=self.on_event,
args=(event,),
daemon=True
).start()
except Exception as e:
logger.error(f"Error dispatching webhook event: {e}")
# Always respond 200 quickly
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "ok"}')
def do_GET(self):
"""Handle GET requests (health check)."""
if self.path == "/health":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "healthy"}')
else:
self.send_error(404, "Not Found")
def _verify_signature(self, body: bytes, signature: str) -> bool:
"""Verify HMAC-SHA256 signature."""
if not signature.startswith("sha256="):
return False
expected = hmac.new(
self.secret.encode("utf-8"),
body,
hashlib.sha256
).hexdigest()
provided = signature[7:] # Remove "sha256=" prefix
return hmac.compare_digest(expected, provided)
def _parse_event(self, payload: dict) -> Optional[WebhookEvent]:
"""Parse YouTrack webhook payload into WebhookEvent."""
# YouTrack webhook format varies by configuration
# This handles the common "issue updated" format
# Try to extract issue info
issue = payload.get("issue", {})
if not issue:
# Some webhooks put issue at top level
if "idReadable" in payload:
issue = payload
else:
return None
issue_id = issue.get("idReadable", "")
if not issue_id:
return None
# Extract project from issue ID
project = issue_id.split("-")[0] if "-" in issue_id else ""
# Determine event type and state changes
event_type = payload.get("type", "issue_updated")
# Look for state change in field changes
old_state = None
new_state = None
# YouTrack sends field changes in different formats
changes = payload.get("fieldChanges", [])
for change in changes:
if change.get("name") == "State":
old_state = change.get("oldValue", {}).get("name")
new_state = change.get("newValue", {}).get("name")
break
# Also check issue fields for current state
if not new_state:
fields = issue.get("customFields", [])
for field in fields:
if field.get("name") == "State":
value = field.get("value", {})
new_state = value.get("name") if isinstance(value, dict) else value
break
return WebhookEvent(
event_type=event_type,
issue_id=issue_id,
project=project,
old_state=old_state,
new_state=new_state,
raw_payload=payload,
)
class WebhookServer:
"""
HTTP server for receiving YouTrack webhooks and serving dashboard API.
Usage:
server = WebhookServer(
host="0.0.0.0",
port=8765,
secret="your-secret",
on_event=handle_event
)
server.start() # Non-blocking
# ... later ...
server.stop()
"""
# Timeout for thread join during shutdown
SHUTDOWN_TIMEOUT = 10 # seconds
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
secret: Optional[str] = None,
on_event: Optional[Callable[[WebhookEvent], None]] = None,
runner=None, # Optional runner reference for dashboard API
oauth=None, # Optional OAuth handler for authentication
):
self.host = host
self.port = port
self.secret = secret
self.on_event = on_event
self.runner = runner
self.oauth = oauth
self._server: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
self._shutdown_event = threading.Event()
self._handler_class = None
def start(self):
"""Start the webhook server in a background thread."""
# Reset shutdown event
self._shutdown_event.clear()
# Determine which handler to use
if self.runner:
# Use dashboard API handler (includes webhook support)
try:
from api_server import DashboardAPIHandler, setup_api_handler
setup_api_handler(self.runner, self.oauth)
self._handler_class = self._create_combined_handler(DashboardAPIHandler)
logger.info("Dashboard API enabled")
except ImportError as e:
logger.warning(f"Dashboard API not available: {e}")
self._handler_class = WebhookHandler
else:
# Use basic webhook handler
self._handler_class = WebhookHandler
# Configure handler class
self._handler_class.secret = self.secret
self._handler_class.on_event = self.on_event
# Create threaded server to handle concurrent requests (needed for SSE + polling)
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True # Don't block shutdown waiting for threads
self._server = ThreadedHTTPServer((self.host, self.port), self._handler_class)
# Start in thread (non-daemon so it can be properly joined)
self._thread = threading.Thread(
target=self._serve_loop,
name="WebhookServer",
daemon=False
)
self._thread.start()
logger.info(f"Webhook server started on {self.host}:{self.port}")
def _create_combined_handler(self, api_handler_class):
"""Create a handler that combines webhook and API functionality."""
webhook_secret = self.secret
webhook_callback = self.on_event
class CombinedHandler(api_handler_class):
"""Combined webhook + dashboard API handler."""
secret = webhook_secret
on_event = webhook_callback
def handle_webhook(self):
"""Handle YouTrack webhook POST."""
# Read body
content_length = int(self.headers.get("Content-Length", 0))
if content_length == 0:
self.send_error(400, "Empty body")
return
body = self.rfile.read(content_length)
# Verify signature if secret configured
if self.secret:
signature = self.headers.get("X-Hub-Signature-256", "")
if not self._verify_signature(body, signature):
logger.warning("Webhook signature verification failed")
self.send_error(403, "Invalid signature")
return
# Parse JSON
try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as e:
logger.warning(f"Invalid JSON in webhook: {e}")
self.send_error(400, "Invalid JSON")
return
# Parse event
event = self._parse_webhook_event(payload)
if not event:
logger.debug("Ignoring unrecognized webhook event")
self.send_response(200)
self.end_headers()
return
logger.info(f"Webhook received: {event.event_type} for {event.issue_id}")
# Dispatch event
if self.on_event:
try:
threading.Thread(
target=self.on_event,
args=(event,),
daemon=True
).start()
except Exception as e:
logger.error(f"Error dispatching webhook event: {e}")
# Respond quickly
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(b'{"status": "ok"}')
def _verify_signature(self, body: bytes, signature: str) -> bool:
"""Verify HMAC-SHA256 signature."""
if not signature.startswith("sha256="):
return False
expected = hmac.new(
self.secret.encode("utf-8"),
body,
hashlib.sha256
).hexdigest()
provided = signature[7:]
return hmac.compare_digest(expected, provided)
def _parse_webhook_event(self, payload: dict) -> Optional[WebhookEvent]:
"""Parse YouTrack webhook payload."""
issue = payload.get("issue", {})
if not issue:
if "idReadable" in payload:
issue = payload
else:
return None
issue_id = issue.get("idReadable", "")
if not issue_id:
return None
project = issue_id.split("-")[0] if "-" in issue_id else ""
event_type = payload.get("type", "issue_updated")
old_state = None
new_state = None
changes = payload.get("fieldChanges", [])
for change in changes:
if change.get("name") == "State":
old_state = change.get("oldValue", {}).get("name")
new_state = change.get("newValue", {}).get("name")
break
if not new_state:
fields = issue.get("customFields", [])
for field in fields:
if field.get("name") == "State":
value = field.get("value", {})
new_state = value.get("name") if isinstance(value, dict) else value
break
return WebhookEvent(
event_type=event_type,
issue_id=issue_id,
project=project,
old_state=old_state,
new_state=new_state,
raw_payload=payload,
)
return CombinedHandler
def _serve_loop(self):
"""Server loop that checks for shutdown signal."""
if self._server:
self._server.serve_forever()
def stop(self):
"""Stop the webhook server gracefully."""
logger.debug("Initiating webhook server shutdown...")
# Signal shutdown
self._shutdown_event.set()
# Shutdown the HTTP server (this will cause serve_forever to return)
if self._server:
try:
self._server.shutdown()
except OSError as e:
logger.warning(f"Error shutting down HTTP server: {e}")
finally:
try:
self._server.server_close()
except OSError as e:
logger.warning(f"Error closing server socket: {e}")
self._server = None
# Wait for thread to finish
if self._thread:
self._thread.join(timeout=self.SHUTDOWN_TIMEOUT)
if self._thread.is_alive():
logger.warning(f"Webhook server thread did not terminate within {self.SHUTDOWN_TIMEOUT}s")
else:
logger.debug("Webhook server thread terminated cleanly")
self._thread = None
logger.info("Webhook server stopped")
@property
def is_running(self) -> bool:
return self._thread is not None and self._thread.is_alive()
def load_webhook_config(config: dict, runner=None, oauth=None) -> Optional[WebhookServer]:
"""Create WebhookServer from config dict."""
webhook_config = config.get("webhook", {})
if not webhook_config.get("enabled", False):
return None
return WebhookServer(
host=webhook_config.get("host", "0.0.0.0"),
port=webhook_config.get("port", 8765),
secret=webhook_config.get("secret"),
runner=runner,
oauth=oauth,
)
def main():
"""Standalone webhook server for testing."""
parser = argparse.ArgumentParser(description="YouTrack Webhook Server")
parser.add_argument("-c", "--config", default="config.yaml", help="Config file")
parser.add_argument("--host", default="0.0.0.0", help="Bind host")
parser.add_argument("--port", type=int, default=8765, help="Bind port")
parser.add_argument("--secret", help="Webhook secret")
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
def handle_event(event: WebhookEvent):
print(f"\n{'='*60}")
print(f"Event: {event.event_type}")
print(f"Issue: {event.issue_id}")
print(f"Project: {event.project}")
print(f"State: {event.old_state}{event.new_state}")
print(f"{'='*60}\n")
server = WebhookServer(
host=args.host,
port=args.port,
secret=args.secret,
on_event=handle_event,
)
print(f"Starting webhook server on {args.host}:{args.port}")
print("Waiting for webhooks... (Ctrl+C to stop)")
print(f"\nConfigure YouTrack webhook URL:")
print(f" http://YOUR_SERVER:{args.port}/webhook/youtrack\n")
server.start()
try:
while True:
import time
time.sleep(1)
except KeyboardInterrupt:
print("\nShutting down...")
server.stop()
if __name__ == "__main__":
main()