Files
agentrunner/oauth.py
CI System 41d751b678 feat: add web dashboard with Gitea OAuth authentication
Implements a full management dashboard for the Agent Runner at
https://agent.cleargrow.io with real-time monitoring and control.

Backend changes:
- Add oauth.py: Gitea OAuth2 authentication with session management
- Add api_server.py: REST API endpoints and static file serving
- Add dashboard_api.py: Data aggregation layer for dashboard
- Modify agent.py: Add kill_task() and get_task() methods
- Modify runner.py: Add event broadcasting and OAuth initialization
- Modify webhook_server.py: Integrate dashboard API handler

Frontend (SvelteKit + TypeScript):
- Dashboard overview with health status, agent pool, issue counts
- Agents page with active task list and kill functionality
- Issues page with state filtering and transitions
- Builds page with Woodpecker CI integration
- Config page for runtime settings

Features:
- Gitea OAuth2 login (same pattern as Woodpecker CI)
- Real-time status updates via polling (5s interval)
- Agent termination from dashboard
- Issue state transitions
- Service health monitoring

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 07:11:01 -07:00

418 lines
13 KiB
Python

"""
Gitea OAuth2 authentication for Agent Runner Dashboard.
Implements the same OAuth2 flow pattern as Woodpecker CI.
"""
import hashlib
import hmac
import json
import logging
import secrets
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from http.cookies import SimpleCookie
from typing import Optional
from urllib.parse import urlencode, parse_qs, urlparse
import requests
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class User:
    """Profile of a Gitea account that has signed in to the dashboard."""

    id: int
    login: str
    full_name: str
    email: str
    avatar_url: str
    is_admin: bool = False

    def to_dict(self) -> dict:
        """Return the user's fields as a plain dictionary.

        Returns:
            A dict with the keys ``id``, ``login``, ``full_name``, ``email``,
            ``avatar_url`` and ``is_admin``, in that order.
        """
        exported = (
            "id",
            "login",
            "full_name",
            "email",
            "avatar_url",
            "is_admin",
        )
        return {name: getattr(self, name) for name in exported}
@dataclass
class Session:
    """A signed-in dashboard session together with the user's OAuth tokens."""

    session_id: str
    user: User
    access_token: str
    refresh_token: Optional[str] = None
    token_expiry: Optional[datetime] = None
    # Both timestamps default to the (naive) time the Session object is built.
    created_at: datetime = field(default_factory=datetime.now)
    last_accessed: datetime = field(default_factory=datetime.now)

    def is_expired(self, max_age_hours: int = 24) -> bool:
        """Tell whether the session has outlived its allowed age.

        Only ``created_at`` is consulted; ``last_accessed`` and
        ``token_expiry`` play no part in the decision.

        Args:
            max_age_hours: Maximum age of a session, in hours.

        Returns:
            True when strictly more than ``max_age_hours`` have passed
            since the session was created.
        """
        elapsed = datetime.now() - self.created_at
        limit = timedelta(hours=max_age_hours)
        return elapsed > limit

    def touch(self):
        """Stamp ``last_accessed`` with the current time."""
        self.last_accessed = datetime.now()
class SessionStore:
    """Process-local session registry that prunes expired entries lazily.

    Sessions live in a plain dict keyed by session ID. Expired sessions are
    swept at most once per ``cleanup_interval`` seconds, and the sweep is
    triggered from ``create`` and ``get`` rather than from a timer.
    """

    def __init__(self, max_age_hours: int = 24, cleanup_interval: int = 3600):
        """Set up an empty store.

        Args:
            max_age_hours: Age (in hours) after which a session is expired.
            cleanup_interval: Minimum number of seconds between sweeps.
        """
        self._max_age_hours = max_age_hours
        self._cleanup_interval = cleanup_interval
        self._sessions: dict[str, Session] = {}
        self._last_cleanup = time.time()

    def create(self, user: User, access_token: str,
               refresh_token: Optional[str] = None,
               token_expiry: Optional[datetime] = None) -> Session:
        """Register and return a fresh session for ``user``.

        The session ID is a random URL-safe token from ``secrets``.
        """
        self._maybe_cleanup()
        new_session = Session(
            session_id=secrets.token_urlsafe(32),
            user=user,
            access_token=access_token,
            refresh_token=refresh_token,
            token_expiry=token_expiry,
        )
        self._sessions[new_session.session_id] = new_session
        logger.info(f"Created session for user {user.login}")
        return new_session

    def get(self, session_id: str) -> Optional[Session]:
        """Look up a live session.

        Returns:
            The session (with its last-access time refreshed), or None when
            the ID is unknown or the session has expired. An expired session
            is removed from the store as a side effect.
        """
        self._maybe_cleanup()
        found = self._sessions.get(session_id)
        if found is None:
            return None
        if not found.is_expired(self._max_age_hours):
            found.touch()
            return found
        self.delete(session_id)
        return None

    def delete(self, session_id: str) -> bool:
        """Remove a session; return True if it existed, False otherwise."""
        try:
            removed = self._sessions.pop(session_id)
        except KeyError:
            return False
        login = removed.user.login
        logger.info(f"Deleted session for user {login}")
        return True

    def _maybe_cleanup(self):
        """Sweep expired sessions if the cleanup interval has elapsed."""
        now = time.time()
        if now - self._last_cleanup < self._cleanup_interval:
            return
        self._last_cleanup = now

        # Collect IDs first so the dict is not mutated while being iterated.
        stale_ids = []
        for sid, candidate in self._sessions.items():
            if candidate.is_expired(self._max_age_hours):
                stale_ids.append(sid)
        for sid in stale_ids:
            self._sessions.pop(sid)

        removed_count = len(stale_ids)
        if removed_count:
            logger.info(f"Cleaned up {removed_count} expired sessions")

    @property
    def active_count(self) -> int:
        """Number of sessions currently held (expired-but-unswept included)."""
        return len(self._sessions)
class GiteaOAuth:
    """Gitea OAuth2 client used to sign users in to the dashboard.

    Builds the authorization redirect, exchanges the callback code for
    tokens, loads the Gitea user, applies the optional user/organization
    allow-lists and keeps the resulting sessions in an in-memory
    ``SessionStore``.
    """

    def __init__(
        self,
        gitea_url: str,
        client_id: str,
        client_secret: str,
        redirect_uri: str,
        allowed_users: Optional[list[str]] = None,
        allowed_orgs: Optional[list[str]] = None,
        session_max_age_hours: int = 24,
    ):
        """Configure the client.

        Args:
            gitea_url: Base URL of the Gitea instance; a trailing slash is
                stripped before the endpoint URLs are derived.
            client_id: OAuth2 client ID sent to Gitea.
            client_secret: OAuth2 client secret sent with the token request.
            redirect_uri: Callback URL sent with the authorization and
                token requests.
            allowed_users: Logins that may sign in. Empty/None means no
                per-user restriction.
            allowed_orgs: Organizations whose members may sign in.
                Empty/None means no organization restriction.
            session_max_age_hours: Session lifetime in hours; also used for
                the session cookie's ``Max-Age``.
        """
        self.gitea_url = gitea_url.rstrip("/")
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.allowed_users = allowed_users
        self.allowed_orgs = allowed_orgs
        # Remembered so the cookie lifetime can follow the session lifetime.
        self.session_max_age_hours = session_max_age_hours

        # OAuth endpoints
        self.authorize_url = f"{self.gitea_url}/login/oauth/authorize"
        self.token_url = f"{self.gitea_url}/login/oauth/access_token"
        self.user_api_url = f"{self.gitea_url}/api/v1/user"
        self.orgs_api_url = f"{self.gitea_url}/api/v1/user/orgs"

        # Session management
        self.sessions = SessionStore(max_age_hours=session_max_age_hours)

        # CSRF state tokens (short-lived)
        self._pending_states: dict[str, float] = {}  # state -> timestamp
        self._state_max_age = 600  # 10 minutes

    def get_authorize_url(self, next_url: Optional[str] = None) -> tuple[str, str]:
        """Generate the OAuth2 authorization URL and a fresh state token.

        Args:
            next_url: Accepted for interface compatibility; it is not used
                when building the URL.

        Returns:
            Tuple of (authorize_url, state_token). The state token is
            remembered so ``handle_callback`` can verify it later.
        """
        state = secrets.token_urlsafe(32)
        self._pending_states[state] = time.time()
        self._cleanup_states()
        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "state": state,
        }
        url = f"{self.authorize_url}?{urlencode(params)}"
        return url, state

    def handle_callback(self, code: str, state: str) -> Optional[Session]:
        """Handle the OAuth2 callback and create a session on success.

        Args:
            code: Authorization code from Gitea.
            state: State token for CSRF verification.

        Returns:
            Session if successful, None when the state is invalid, the
            token exchange fails or returns no access token, the user
            cannot be loaded, or the user is not authorized.
        """
        # Verify state
        if not self._verify_state(state):
            logger.warning("Invalid or expired OAuth state token")
            return None

        # Exchange code for token
        token_data = self._exchange_code(code)
        if not token_data:
            return None

        access_token = token_data.get("access_token")
        if not access_token:
            # Without this guard the API calls below would be sent with
            # "Bearer None" when the token endpoint answers with an error
            # payload instead of a token.
            logger.error("OAuth token response did not contain an access_token")
            return None
        refresh_token = token_data.get("refresh_token")

        token_expiry = None
        expires_in = token_data.get("expires_in")
        if expires_in:
            try:
                token_expiry = datetime.now() + timedelta(seconds=int(expires_in))
            except (TypeError, ValueError, OverflowError):
                # The expiry is informational here; do not fail the login.
                logger.warning("Ignoring malformed expires_in in OAuth token response")

        # Fetch user info
        user = self._fetch_user(access_token)
        if not user:
            return None

        # Check authorization
        if not self._is_authorized(user, access_token):
            logger.warning(f"User {user.login} is not authorized")
            return None

        # Create session
        return self.sessions.create(
            user=user,
            access_token=access_token,
            refresh_token=refresh_token,
            token_expiry=token_expiry,
        )

    def get_session_from_cookie(self, cookie_header: str) -> Optional[Session]:
        """Extract and validate the session named by a ``Cookie`` header.

        Returns:
            The live session referenced by the ``agent_session`` cookie, or
            None when the header is empty, cannot be parsed, lacks the
            cookie, or the session is unknown/expired.
        """
        if not cookie_header:
            return None
        cookie = SimpleCookie()
        try:
            cookie.load(cookie_header)
        except Exception:
            # Deliberate best-effort: an unparsable header means "no session".
            return None
        session_cookie = cookie.get("agent_session")
        if not session_cookie:
            return None
        return self.sessions.get(session_cookie.value)

    def create_session_cookie(self, session: Session, secure: bool = True) -> str:
        """Create the ``Set-Cookie`` header value for a session.

        Args:
            session: Session whose ID is stored in the cookie.
            secure: When True, the ``Secure`` attribute is added.

        Returns:
            The cookie string, with ``Max-Age`` matching the configured
            session lifetime.
        """
        parts = [
            f"agent_session={session.session_id}",
            "HttpOnly",
            "Path=/",
            "SameSite=Lax",
        ]
        if secure:
            parts.append("Secure")
        # Keep the browser-side lifetime in step with the server-side one
        # instead of a fixed 24 hours.
        max_age = int(self.session_max_age_hours * 60 * 60)
        parts.append(f"Max-Age={max_age}")
        return "; ".join(parts)

    def create_logout_cookie(self) -> str:
        """Create the ``Set-Cookie`` header value that clears the session."""
        return "agent_session=; HttpOnly; Path=/; Max-Age=0"

    def logout(self, session_id: str) -> bool:
        """Delete a session; return True if it existed."""
        return self.sessions.delete(session_id)

    def _verify_state(self, state: str) -> bool:
        """Verify and consume a state token.

        The token is removed whether or not it is still fresh, so each
        state can be used at most once.
        """
        self._cleanup_states()
        timestamp = self._pending_states.pop(state, None)
        if timestamp is None:
            return False
        age = time.time() - timestamp
        return age < self._state_max_age

    def _cleanup_states(self):
        """Remove state tokens older than the allowed maximum age."""
        now = time.time()
        expired = [
            s for s, t in self._pending_states.items()
            if now - t > self._state_max_age
        ]
        for s in expired:
            del self._pending_states[s]

    def _exchange_code(self, code: str) -> Optional[dict]:
        """Exchange an authorization code for the token response.

        Returns:
            The decoded JSON object from the token endpoint, or None when
            the request fails or the body is not a JSON object.
        """
        try:
            response = requests.post(
                self.token_url,
                data={
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "code": code,
                    "grant_type": "authorization_code",
                    "redirect_uri": self.redirect_uri,
                },
                headers={"Accept": "application/json"},
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers requests versions where a non-JSON body is
            # reported as a plain JSON decode error.
            logger.error(f"Failed to exchange OAuth code: {e}")
            return None
        if not isinstance(payload, dict):
            logger.error("Failed to exchange OAuth code: unexpected response format")
            return None
        return payload

    def _fetch_user(self, access_token: str) -> Optional[User]:
        """Fetch the authenticated user's profile from the Gitea API.

        Returns:
            A ``User`` built from the response, or None when the request
            fails or the payload lacks the required ``id``/``login`` fields.
        """
        try:
            response = requests.get(
                self.user_api_url,
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Accept": "application/json",
                },
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Failed to fetch user info: {e}")
            return None

        try:
            return User(
                id=data["id"],
                login=data["login"],
                full_name=data.get("full_name", ""),
                email=data.get("email", ""),
                avatar_url=data.get("avatar_url", ""),
                is_admin=data.get("is_admin", False),
            )
        except (KeyError, TypeError, AttributeError) as e:
            # A payload that is not an object, or lacks id/login, must not
            # crash the callback handler.
            logger.error(f"Failed to fetch user info: malformed response ({e!r})")
            return None

    def _is_authorized(self, user: User, access_token: str) -> bool:
        """Check whether a user may access the dashboard.

        A user is authorized when no restrictions are configured, when the
        login is in ``allowed_users``, or when the user belongs to at least
        one organization in ``allowed_orgs``. Any failure while loading the
        organizations denies access.
        """
        # No restrictions configured - allow all authenticated users
        if not self.allowed_users and not self.allowed_orgs:
            return True

        # Check username whitelist
        if self.allowed_users and user.login in self.allowed_users:
            return True

        # Check organization membership
        if not self.allowed_orgs:
            return False
        try:
            # NOTE(review): only the first response page is inspected; if the
            # endpoint paginates, members of many orgs may be missed - confirm.
            response = requests.get(
                self.orgs_api_url,
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Accept": "application/json",
                },
                timeout=30,
            )
            response.raise_for_status()
            orgs = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.error(f"Failed to fetch user orgs: {e}")
            return False

        if not isinstance(orgs, list):
            logger.error("Failed to fetch user orgs: unexpected response format")
            return False
        # Skip malformed entries instead of raising KeyError on them.
        user_orgs = {
            org["username"]
            for org in orgs
            if isinstance(org, dict) and "username" in org
        }
        return bool(user_orgs & set(self.allowed_orgs))
def create_oauth_from_config(config: dict) -> Optional[GiteaOAuth]:
    """Create a GiteaOAuth instance from config.yaml settings.

    Reads the ``oauth`` section (``client_id``, ``client_secret``,
    ``redirect_uri``, ``allowed_users``, ``allowed_orgs``,
    ``session_max_age_hours``) and ``gitea.base_url``.

    Args:
        config: Parsed configuration mapping.

    Returns:
        A configured ``GiteaOAuth``, or None (after logging why) when the
        ``oauth`` section is missing/empty, ``gitea.base_url`` is missing,
        or the client credentials are incomplete.
    """
    oauth_config = config.get("oauth")
    if not oauth_config:
        logger.warning("No OAuth configuration found - authentication disabled")
        return None

    # A key that is present but has a null value bypasses .get()'s default,
    # so fall back explicitly instead of calling .get() on None.
    gitea_config = config.get("gitea") or {}
    gitea_url = gitea_config.get("base_url")
    if not gitea_url:
        logger.error("Gitea base_url required for OAuth")
        return None

    client_id = oauth_config.get("client_id")
    client_secret = oauth_config.get("client_secret")
    if not client_id or not client_secret:
        logger.error("OAuth client_id and client_secret required")
        return None

    # Same null-value concern as above: an empty redirect_uri or a null
    # session_max_age_hours falls back to the default.
    redirect_uri = (
        oauth_config.get("redirect_uri")
        or "https://agent.cleargrow.io/oauth/callback"
    )
    session_max_age_hours = oauth_config.get("session_max_age_hours")
    if session_max_age_hours is None:
        session_max_age_hours = 24

    return GiteaOAuth(
        gitea_url=gitea_url,
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        allowed_users=oauth_config.get("allowed_users"),
        allowed_orgs=oauth_config.get("allowed_orgs"),
        session_max_age_hours=session_max_age_hours,
    )