#!/usr/bin/env python3 """PromRFC MCP Proxy — local stdio-to-HTTP bridge with Okta OIDC login. This script acts as a local MCP server (stdio transport) that proxies all MCP JSON-RPC calls to the remote PromRFC Lambda endpoint. On first use, it opens a browser for Okta login (PKCE flow) and caches the token locally. Subsequent uses reuse the cached token. Usage in mcp.json: { "mcpServers": { "promrfc": { "command": "python3", "args": ["/path/to/promrfc_proxy.py"] } } } Or force re-login: python3 promrfc_proxy.py --login """ import hashlib import json import os import secrets import sys import time import webbrowser from base64 import urlsafe_b64encode from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path from threading import Event from typing import Any from urllib.parse import urlencode, parse_qs, urlparse from urllib.request import urlopen, Request from urllib.error import URLError # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- MCP_ENDPOINT = os.environ.get( "PROMRFC_MCP_URL", "https://promrfc.prometeiasaas.it/api/mcp" ) OKTA_ISSUER = "https://prometeia.okta.com/oauth2" OKTA_CLIENT_ID = "0oavxq0jr2dg8zsIL417" REDIRECT_PORT = 18923 REDIRECT_URI = f"http://localhost:{REDIRECT_PORT}/callback" SCOPES = "openid profile email offline_access" TOKEN_FILE = Path.home() / ".config" / "promrfc" / "token.json" # --------------------------------------------------------------------------- # Logging (stderr only — stdout is the MCP stdio channel) # --------------------------------------------------------------------------- def _log(msg: str) -> None: print(msg, file=sys.stderr, flush=True) # --------------------------------------------------------------------------- # Token management # --------------------------------------------------------------------------- def _load_token() -> dict | None: if not TOKEN_FILE.exists(): return None try: return json.loads(TOKEN_FILE.read_text()) except (json.JSONDecodeError, KeyError): return None def _save_token(data: dict) -> None: TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True) TOKEN_FILE.write_text(json.dumps(data, indent=2)) TOKEN_FILE.chmod(0o600) def _get_token() -> str: cached = _load_token() if cached: # Check if still valid (with 60s buffer) if cached.get("expires_at", 0) > time.time() + 60: return cached["access_token"] # Try silent refresh using refresh_token refreshed = _try_refresh(cached) if refreshed: _save_token(refreshed) return refreshed["access_token"] # Last resort: browser login token_data = _okta_login() _save_token(token_data) return token_data["access_token"] def _try_refresh(cached: dict) -> dict | None: """Try to refresh the Okta token without opening a browser.""" refresh_token = cached.get("refresh_token") if not refresh_token: return None _log("Token expired, refreshing silently...") try: body = urlencode({ "grant_type": "refresh_token", "client_id": OKTA_CLIENT_ID, "refresh_token": refresh_token, }).encode() req = Request( f"{OKTA_ISSUER}/v1/token", data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) with urlopen(req, timeout=10) as resp: tokens = json.loads(resp.read()) result = { "access_token": tokens["access_token"], "expires_at": time.time() + tokens.get("expires_in", 3600), } if tokens.get("refresh_token"): result["refresh_token"] = tokens["refresh_token"] else: result["refresh_token"] = refresh_token # keep the old one _log("Token refreshed silently!") return result except Exception as e: _log(f"Silent refresh failed: {e} — will open browser") # --------------------------------------------------------------------------- # Okta OIDC PKCE login # --------------------------------------------------------------------------- def _okta_login() -> dict: verifier = secrets.token_urlsafe(64) digest = hashlib.sha256(verifier.encode("ascii")).digest() challenge = urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") state = secrets.token_hex(16) auth_url = ( f"{OKTA_ISSUER}/v1/authorize?" + urlencode({ "client_id": OKTA_CLIENT_ID, "response_type": "code", "scope": SCOPES, "redirect_uri": REDIRECT_URI, "state": state, "code_challenge": challenge, "code_challenge_method": "S256", }) ) result: dict[str, Any] = {} done = Event() class H(BaseHTTPRequestHandler): def do_GET(self): p = parse_qs(urlparse(self.path).query) if p.get("error"): result["error"] = p["error"][0] elif p.get("state", [None])[0] != state: result["error"] = "state_mismatch" else: result["code"] = p.get("code", [None])[0] self.send_response(200) self.send_header("Content-Type", "text/html") self.end_headers() msg = "Login failed" if "error" in result else "✓ Login successful — you can close this tab" self.wfile.write(f"

{msg}

".encode()) done.set() def log_message(self, *a): pass srv = HTTPServer(("127.0.0.1", REDIRECT_PORT), H) srv.timeout = 120 _log("Opening browser for Okta login...") webbrowser.open(auth_url) while not done.is_set(): srv.handle_request() srv.server_close() if "error" in result: raise RuntimeError(f"Okta login failed: {result['error']}") # Exchange code for tokens token_body = urlencode({ "grant_type": "authorization_code", "client_id": OKTA_CLIENT_ID, "code": result["code"], "redirect_uri": REDIRECT_URI, "code_verifier": verifier, }).encode() req = Request( f"{OKTA_ISSUER}/v1/token", data=token_body, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) with urlopen(req, timeout=10) as resp: tokens = json.loads(resp.read()) _log("Login successful!") result_data = { "access_token": tokens["access_token"], "expires_at": time.time() + tokens.get("expires_in", 3600), } if tokens.get("refresh_token"): result_data["refresh_token"] = tokens["refresh_token"] return result_data # --------------------------------------------------------------------------- # HTTP call to remote MCP endpoint # --------------------------------------------------------------------------- def _call_mcp(payload: dict, token: str) -> dict: body = json.dumps(payload).encode("utf-8") req = Request( MCP_ENDPOINT, data=body, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {token}", "Accept": "application/json", }, method="POST", ) try: with urlopen(req, timeout=30) as resp: return json.loads(resp.read()) except URLError as e: if hasattr(e, "code") and e.code == 401: # Token expired — force re-login TOKEN_FILE.unlink(missing_ok=True) new_token = _get_token() req.remove_header("Authorization") req.add_header("Authorization", f"Bearer {new_token}") with urlopen(req, timeout=30) as resp: return json.loads(resp.read()) raise # --------------------------------------------------------------------------- # stdio MCP loop # --------------------------------------------------------------------------- def run_stdio(): token = _get_token() _log(f"PromRFC MCP proxy ready (endpoint: {MCP_ENDPOINT})") for line in sys.stdin: line = line.strip() if not line: continue try: msg = json.loads(line) except json.JSONDecodeError: continue # Forward to remote try: resp = _call_mcp(msg, token) except Exception as e: _log(f"Error calling MCP endpoint: {e}") if msg.get("id") is not None: resp = { "jsonrpc": "2.0", "error": {"code": -32603, "message": str(e)}, "id": msg.get("id"), } else: continue # Notifications (no id) may return empty if not resp or resp == "": continue sys.stdout.write(json.dumps(resp) + "\n") sys.stdout.flush() # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": if "--login" in sys.argv: TOKEN_FILE.unlink(missing_ok=True) _okta_login() _log("Token saved. You can now use the MCP server.") sys.exit(0) run_stdio()