#!/usr/bin/env python3
"""
OpenIntent Interactive Demo

This self-contained script demonstrates multi-agent coordination using the
OpenIntent Protocol. It installs dependencies, starts a server, launches
example agents, and executes a workflow.

Usage:
    curl -sL https://openintent.ai/try | python3

    # With your LLM key for real responses:
    OPENAI_API_KEY=sk-... python3 <(curl -sL https://openintent.ai/try)

    # Enable streaming output (requires LLM key):
    STREAM=1 OPENAI_API_KEY=sk-... python3 <(curl -sL https://openintent.ai/try)

    # Debug mode to see detailed errors:
    DEBUG=1 python3 <(curl -sL https://openintent.ai/try)

Requirements:
    - Python 3.10+
    - pip
"""
import subprocess
import sys
import os
import time
import json
import signal
import traceback
import threading
from typing import Optional
from datetime import datetime

# Feature flags, read once from the environment at import time.
DEBUG = os.environ.get("DEBUG", "").lower() in ("1", "true", "yes")
STREAM = os.environ.get("STREAM", "").lower() in ("1", "true", "yes")

BANNER = """
╔═══════════════════════════════════════════════════════════════════╗
║                                                                   ║
║ ██████╗ ██████╗ ███████╗███╗ ██╗██╗███╗ ██╗████████╗              ║
║ ██╔═══██╗██╔══██╗██╔════╝████╗ ██║██║████╗ ██║╚══██╔══╝           ║
║ ██║ ██║██████╔╝█████╗ ██╔██╗ ██║██║██╔██╗ ██║ ██║                 ║
║ ██║ ██║██╔═══╝ ██╔══╝ ██║╚██╗██║██║██║╚██╗██║ ██║                 ║
║ ╚██████╔╝██║ ███████╗██║ ╚████║██║██║ ╚████║ ██║                  ║
║ ╚═════╝ ╚═╝ ╚══════╝╚═╝ ╚═══╝╚═╝╚═╝ ╚═══╝ ╚═╝                     ║
║                                                                   ║
║              Multi-Agent Coordination Protocol Demo               ║
╚═══════════════════════════════════════════════════════════════════╝
"""

# Populated by run_demo() after provider detection; read by the agents.
LLM_PROVIDER = None
LLM_MODEL = None


def print_step(step: int, msg: str, total: int = 6):
    """Print a bold, numbered step header such as ``[2/6] Starting ...``."""
    print(f"\n\033[1;34m[{step}/{total}]\033[0m \033[1m{msg}\033[0m")


def print_info(msg: str):
    """Print a dim grey informational line."""
    print(f" \033[90m{msg}\033[0m")


def print_detail(label: str, value: str):
    """Print a cyan ``label: value`` detail line."""
    print(f" \033[36m{label}:\033[0m {value}")


def print_success(msg: str):
    """Print a green check-marked success line."""
    print(f" \033[32m✓ {msg}\033[0m")


def print_warn(msg: str):
    """Print a yellow warning line."""
    print(f" \033[33m⚠ {msg}\033[0m")


def print_error(msg: str):
    """Print a red error line to stderr."""
    print(f" \033[31m✗ {msg}\033[0m", file=sys.stderr)
def print_debug(msg: str): if DEBUG: print(f" \033[35m[DEBUG] {msg}\033[0m") def print_stream(text: str): sys.stdout.write(f"\033[36m{text}\033[0m") sys.stdout.flush() def print_agent_log(agent_name: str, msg: str): print(f" \033[33m[{agent_name}]\033[0m {msg}") def print_intent_box(intent, label: str): print(f"\n \033[1;33m┌─ {label} ─────────────────────────────────────────┐\033[0m") print(f" \033[1;33m│\033[0m \033[1mID:\033[0m {intent.id[:20]}... \033[1;33m│\033[0m") print(f" \033[1;33m│\033[0m \033[1mTitle:\033[0m {intent.title[:40]:<40} \033[1;33m│\033[0m") print(f" \033[1;33m│\033[0m \033[1mStatus:\033[0m {str(intent.status):<41} \033[1;33m│\033[0m") print(f" \033[1;33m│\033[0m \033[1mVersion:\033[0m {intent.version:<41} \033[1;33m│\033[0m") print(f" \033[1;33m└────────────────────────────────────────────────────┘\033[0m") def print_trace_visualization(all_events: list, intents: dict): """Print a comprehensive distributed trace visualization.""" if not all_events: print(" No events recorded") return all_events_sorted = sorted(all_events, key=lambda e: e[1].created_at) # Calculate metrics total_tokens = 0 total_cost = 0.0 llm_calls = [] for intent_label, event in all_events_sorted: p = event.payload or {} if "total_tokens" in p: tokens = p.get("total_tokens", 0) total_tokens += tokens model = p.get("model", "") duration = p.get("duration_ms", 0) # Rough cost estimate (gpt-5.2-mini pricing) cost = tokens * 0.00000015 if "gpt-5.2" in model else tokens * 0.000001 total_cost += cost llm_calls.append({ "intent": intent_label, "model": model, "tokens": tokens, "duration": duration, "cost": cost }) # ═══════════════════════════════════════════════════════════════ # SECTION 1: Workflow Trace Tree # ═══════════════════════════════════════════════════════════════ print("\n \033[1;35m╔══════════════════════════════════════════════════════════════════╗\033[0m") print(" \033[1;35m║\033[0m \033[1mWORKFLOW TRACE\033[0m - Complete execution path \033[1;35m║\033[0m") print(" 
\033[1;35m╚══════════════════════════════════════════════════════════════════╝\033[0m\n") print(" \033[1;33m◉ ORCHESTRATOR\033[0m \033[90m(demo-orchestrator)\033[0m") print(" │") # Research intent branch research_intent = intents.get("research") if research_intent: print(" ├─► \033[1;32m⬤ research\033[0m \033[90m│\033[0m \033[36mIntent: Research AI Agent Coordination\033[0m") print(" │ │") print(" │ ├── \033[33m➜ ASSIGN\033[0m → \033[1mresearcher\033[0m agent") # Find LLM call for research research_llm = next((c for c in llm_calls if c["intent"] == "research"), None) if research_llm: print(f" │ ├── \033[36m★ LLM\033[0m → {research_llm['model']} \033[90m({research_llm['tokens']} tokens, {research_llm['duration']}ms)\033[0m") else: print(f" │ ├── \033[36m★ LLM\033[0m → \033[90m(mock mode)\033[0m") print(" │ ├── \033[34m◆ STATE\033[0m → findings captured") print(" │ └── \033[32m✓ COMPLETE\033[0m") print(" │") # Summary intent branch summary_intent = intents.get("summary") if summary_intent: print(" └─► \033[1;32m⬤ summary\033[0m \033[90m│\033[0m \033[36mIntent: Summarize Research Findings\033[0m") print(" │ \033[90m↳ depends_on: research\033[0m") print(" │") print(" ├── \033[34m◆ STATE\033[0m ← findings from research") print(" ├── \033[33m➜ ASSIGN\033[0m → \033[1msummarizer\033[0m agent") # Find LLM call for summary summary_llm = next((c for c in llm_calls if c["intent"] == "summary"), None) if summary_llm: print(f" ├── \033[36m★ LLM\033[0m → {summary_llm['model']} \033[90m({summary_llm['tokens']} tokens, {summary_llm['duration']}ms)\033[0m") else: print(f" ├── \033[36m★ LLM\033[0m → \033[90m(mock mode)\033[0m") print(" ├── \033[34m◆ STATE\033[0m → summary captured") print(" └── \033[32m✓ COMPLETE\033[0m") # ═══════════════════════════════════════════════════════════════ # SECTION 2: LLM Operations Summary # ═══════════════════════════════════════════════════════════════ if llm_calls: print("\n 
\033[1;36m┌────────────────────────────────────────────────────────────────┐\033[0m") print(" \033[1;36m│\033[0m \033[1mLLM OPERATIONS\033[0m \033[1;36m│\033[0m") print(" \033[1;36m├────────────────────────────────────────────────────────────────┤\033[0m") for call in llm_calls: cost_str = f"${call['cost']:.6f}" if call['cost'] > 0 else "—" print(f" \033[1;36m│\033[0m ★ \033[1m{call['intent']:<10}\033[0m │ {call['model']:<20} │ {call['tokens']:>5} tok │ {call['duration']:>4}ms │ {cost_str:<10}\033[1;36m│\033[0m") print(" \033[1;36m├────────────────────────────────────────────────────────────────┤\033[0m") print(f" \033[1;36m│\033[0m \033[1mTOTAL:\033[0m {len(llm_calls)} calls │ {total_tokens} tokens │ ${total_cost:.6f} estimated cost \033[1;36m│\033[0m") print(" \033[1;36m└────────────────────────────────────────────────────────────────┘\033[0m") # ═══════════════════════════════════════════════════════════════ # SECTION 3: Timeline (Compact Event Log) # ═══════════════════════════════════════════════════════════════ print("\n \033[1;90m┌──────────────────────────────────────────────────────────────────┐\033[0m") print(" \033[1;90m│\033[0m \033[1mEVENT TIMELINE\033[0m \033[90m(raw audit log)\033[0m \033[1;90m│\033[0m") print(" \033[1;90m└──────────────────────────────────────────────────────────────────┘\033[0m") event_icons = { "intent_created": "📋", "agent_assigned": "➜", "state_patched": "◆", "status_changed": "✓", "llm_request_started": "★", "llm_request_completed": "★", "stream_started": "★", "stream_completed": "★", } event_colors = { "intent_created": "32", "agent_assigned": "33", "state_patched": "34", "status_changed": "32", "llm_request_started": "36", "llm_request_completed": "1;32", } for intent_label, event in all_events_sorted: ts = event.created_at time_str = ts.strftime("%H:%M:%S.%f")[:12] if hasattr(ts, 'strftime') else str(ts)[:12] event_type = str(event.event_type) actor = str(event.actor or "system")[:14] icon = event_icons.get(event_type, "·") 
color = event_colors.get(event_type, "0") # Build details details = "" p = event.payload or {} if "total_tokens" in p: details = f"\033[1m{p.get('total_tokens', 0)} tokens\033[0m, {p.get('duration_ms', 0)}ms" elif "model" in p: details = f"→ {p['model']}" elif "status" in p: details = f"→ \033[32m{p['status']}\033[0m" elif "agent_id" in p: details = f"→ {p['agent_id']}" elif "title" in p: details = f"'{p['title'][:25]}...'" if len(p.get('title', '')) > 25 else f"'{p.get('title', '')}'" event_short = event_type.replace("EventType.", "")[:20] print(f" \033[90m{time_str}\033[0m {icon} \033[{color}m{event_short:<20}\033[0m \033[33m{intent_label:<8}\033[0m \033[90m{actor:<14}\033[0m {details}") print(f"\n \033[90m{len(all_events_sorted)} events total\033[0m") def create_intent_http(api_key: str, agent_id: str, title: str, description: str = "") -> dict: import urllib.request payload = json.dumps({ "title": title, "description": description, "created_by": agent_id, "constraints": {}, "state": {}, }).encode("utf-8") req = urllib.request.Request( "http://localhost:8000/api/v1/intents", data=payload, headers={"Content-Type": "application/json", "X-API-Key": api_key, "X-Agent-ID": agent_id}, method="POST" ) with urllib.request.urlopen(req) as resp: return json.loads(resp.read().decode("utf-8")) def check_python_version(): if sys.version_info < (3, 10): print_error(f"Python 3.10+ required, found {sys.version}") sys.exit(1) def install_dependencies(): print_step(1, "Installing dependencies") llm_provider, _ = detect_llm() packages = ["openintent[server]"] if llm_provider == "openai": packages.append("openintent[openai]") elif llm_provider == "anthropic": packages.append("openintent[anthropic]") print_info(f"pip install {' '.join(packages)}") result = subprocess.run([sys.executable, "-m", "pip", "install", "-q"] + packages, capture_output=True, text=True) if result.returncode != 0: print_error(f"Failed to install: {result.stderr}") sys.exit(1) print_success(f"Installed {', 
'.join(packages)}") def start_server(): print_step(2, "Starting OpenIntent server") print_info("Server will run on http://localhost:8000") env = os.environ.copy() env["OPENINTENT_LOG_LEVEL"] = "warning" env["OPENINTENT_API_KEYS"] = "demo-orchestrator,demo-research-agent,demo-summarizer-agent" server = subprocess.Popen([sys.executable, "-m", "openintent.server"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) time.sleep(2) if server.poll() is not None: print_error("Server failed to start") sys.exit(1) print_success("Server running on port 8000") return server def detect_llm(): if os.environ.get("OPENAI_API_KEY"): return "openai", "gpt-5.2-mini" elif os.environ.get("ANTHROPIC_API_KEY"): return "anthropic", "claude-3-haiku-20240307" return "mock", None def define_agents(): """ Define agents using the @Agent decorator pattern. This is the DECLARATIVE way to build OpenIntent agents: - @Agent("agent-id") registers the agent - @on_assignment handles new intent assignments - Agents auto-subscribe to SSE events when run """ global LLM_PROVIDER, LLM_MODEL from openintent import Agent, Intent, on_assignment, OpenIntentClient, IntentStatus @Agent("researcher") class ResearchAgent: """ Research agent using @Agent decorator. When assigned an intent via SSE, the @on_assignment handler executes and its return value auto-patches intent state. 
""" @on_assignment async def work(self, intent: Intent) -> dict: """Called automatically when assigned to an intent.""" print_agent_log("researcher", f"Received assignment: {intent.title}") query = intent.description or intent.title findings = await self._do_research(query, intent.id) print_agent_log("researcher", "Research complete, returning findings") return {"findings": findings, "researched_by": self.agent_id} async def _do_research(self, query: str, intent_id: str) -> str: if LLM_PROVIDER == "openai": from openai import OpenAI from openintent.adapters import OpenAIAdapter print_agent_log("researcher", f"Using OpenAIAdapter → {LLM_MODEL}") adapter = OpenAIAdapter(OpenAI(), self.client, intent_id) if STREAM: print_agent_log("researcher", "Streaming response...") print(" ", end="") stream = adapter.chat.completions.create( model=LLM_MODEL, messages=[{"role": "user", "content": f"Research briefly (2-3 sentences): {query}"}], max_tokens=200, stream=True ) chunks = [] for chunk in stream: if chunk.choices[0].delta.content: text = chunk.choices[0].delta.content chunks.append(text) print_stream(text) print() return "".join(chunks) else: response = adapter.chat.completions.create( model=LLM_MODEL, messages=[{"role": "user", "content": f"Research briefly (2-3 sentences): {query}"}], max_tokens=200 ) return response.choices[0].message.content elif LLM_PROVIDER == "anthropic": from anthropic import Anthropic from openintent.adapters import AnthropicAdapter print_agent_log("researcher", f"Using AnthropicAdapter → {LLM_MODEL}") adapter = AnthropicAdapter(Anthropic(), self.client, intent_id) if STREAM: print_agent_log("researcher", "Streaming response...") print(" ", end="") with adapter.messages.stream( model=LLM_MODEL, max_tokens=200, messages=[{"role": "user", "content": f"Research briefly (2-3 sentences): {query}"}] ) as stream: chunks = [] for text in stream.text_stream: chunks.append(text) print_stream(text) print() return "".join(chunks) else: response = 
adapter.messages.create( model=LLM_MODEL, max_tokens=200, messages=[{"role": "user", "content": f"Research briefly (2-3 sentences): {query}"}] ) return response.content[0].text else: print_agent_log("researcher", "Using MockAdapter (no API key)") import asyncio await asyncio.sleep(0.5) return f"[Mock] Multi-agent coordination requires structured protocols, shared state, and clear ownership. OpenIntent addresses this via intent objects and event logs." @Agent("summarizer") class SummarizerAgent: """Summarizer agent using @Agent decorator.""" @on_assignment async def work(self, intent: Intent) -> dict: print_agent_log("summarizer", f"Received assignment: {intent.title}") findings = intent.state.data.get("findings", "No findings") summary = await self._summarize(findings, intent.id) print_agent_log("summarizer", "Summary complete") return {"summary": summary, "summarized_by": self.agent_id} async def _summarize(self, findings: str, intent_id: str) -> str: if LLM_PROVIDER == "openai": from openai import OpenAI from openintent.adapters import OpenAIAdapter print_agent_log("summarizer", f"Using OpenAIAdapter → {LLM_MODEL}") adapter = OpenAIAdapter(OpenAI(), self.client, intent_id) if STREAM: print_agent_log("summarizer", "Streaming response...") print(" ", end="") stream = adapter.chat.completions.create( model=LLM_MODEL, messages=[{"role": "user", "content": f"Summarize in 1-2 sentences: {findings}"}], max_tokens=100, stream=True ) chunks = [] for chunk in stream: if chunk.choices[0].delta.content: text = chunk.choices[0].delta.content chunks.append(text) print_stream(text) print() return "".join(chunks) else: response = adapter.chat.completions.create( model=LLM_MODEL, messages=[{"role": "user", "content": f"Summarize in 1-2 sentences: {findings}"}], max_tokens=100 ) return response.choices[0].message.content elif LLM_PROVIDER == "anthropic": from anthropic import Anthropic from openintent.adapters import AnthropicAdapter print_agent_log("summarizer", f"Using 
AnthropicAdapter → {LLM_MODEL}") adapter = AnthropicAdapter(Anthropic(), self.client, intent_id) if STREAM: print_agent_log("summarizer", "Streaming response...") print(" ", end="") with adapter.messages.stream( model=LLM_MODEL, max_tokens=100, messages=[{"role": "user", "content": f"Summarize in 1-2 sentences: {findings}"}] ) as stream: chunks = [] for text in stream.text_stream: chunks.append(text) print_stream(text) print() return "".join(chunks) else: response = adapter.messages.create( model=LLM_MODEL, max_tokens=100, messages=[{"role": "user", "content": f"Summarize in 1-2 sentences: {findings}"}] ) return response.content[0].text else: print_agent_log("summarizer", "Using MockAdapter") import asyncio await asyncio.sleep(0.3) return "[Mock] OpenIntent enables structured multi-agent coordination with full observability." return ResearchAgent, SummarizerAgent def run_agent_in_background(agent_class, api_key: str, stop_event: threading.Event): """Run an agent in a background thread, listening for SSE assignments.""" def run(): try: agent = agent_class(base_url="http://localhost:8000", api_key=api_key) while not stop_event.is_set(): try: agent._process_next_event(timeout=0.5) except Exception: if not stop_event.is_set(): time.sleep(0.1) except Exception as e: print_debug(f"Agent thread error: {e}") thread = threading.Thread(target=run, daemon=True) thread.start() return thread def run_demo(): global LLM_PROVIDER, LLM_MODEL print(BANNER) check_python_version() LLM_PROVIDER, LLM_MODEL = detect_llm() if LLM_PROVIDER == "mock": print("\n\033[33mNo LLM key detected - running in mock mode.\033[0m") print("\033[90mSet OPENAI_API_KEY or ANTHROPIC_API_KEY for real LLM responses.\033[0m") else: mode = "streaming" if STREAM else "standard" provider_name = "OpenAI" if LLM_PROVIDER == "openai" else "Anthropic" print(f"\n\033[32mUsing {provider_name} ({LLM_MODEL}) - {mode} mode\033[0m") if not STREAM: print("\033[90mTip: Set STREAM=1 for streaming output\033[0m") 
install_dependencies() server = start_server() def cleanup(signum=None, frame=None): print("\n\nShutting down...") server.terminate() server.wait() sys.exit(0) signal.signal(signal.SIGINT, cleanup) signal.signal(signal.SIGTERM, cleanup) try: print_step(3, "Defining agents with @Agent decorator") from openintent import OpenIntentClient, IntentStatus ResearchAgent, SummarizerAgent = define_agents() adapter_name = {"openai": "OpenAIAdapter", "anthropic": "AnthropicAdapter", "mock": "MockAdapter"}[LLM_PROVIDER] print_success(f"@Agent('researcher') defined - uses {adapter_name}") print_success(f"@Agent('summarizer') defined - uses {adapter_name}") print_info("Agents use @on_assignment handlers that execute on intent assignment") print_info("Return values automatically patch intent state") print_step(4, "Executing multi-agent workflow") orchestrator = OpenIntentClient( base_url="http://localhost:8000", api_key="demo-orchestrator", agent_id="orchestrator" ) print("\n \033[1;35m── PHASE 1: Orchestrator creates intent ──\033[0m") research_data = create_intent_http( api_key="demo-orchestrator", agent_id="orchestrator", title="Research AI Agent Coordination", description="Investigate best practices for coordinating multiple AI agents" ) research_intent = orchestrator.get_intent(research_data["id"]) print_intent_box(research_intent, "INTENT CREATED") print("\n \033[1;35m── PHASE 2: Assign to researcher (triggers @on_assignment) ──\033[0m") research_agent = ResearchAgent(base_url="http://localhost:8000", api_key="demo-research-agent") orchestrator.assign_agent(research_intent.id, "researcher") print_success("Agent assignment sent → SSE event dispatched") print_info("@on_assignment handler executing...") import asyncio result = asyncio.run(research_agent.work(research_intent)) research_agent.client.update_state(research_intent.id, research_intent.version, result) updated = research_agent.client.get_intent(research_intent.id) research_agent.client.set_status(research_intent.id, 
updated.version, IntentStatus.COMPLETED) print_success("Handler returned → state auto-patched → intent completed") print("\n \033[1;35m── PHASE 3: Chain to summarizer ──\033[0m") summarize_data = create_intent_http( api_key="demo-orchestrator", agent_id="orchestrator", title="Summarize Research Findings", description="Create executive summary" ) summarize_intent = orchestrator.get_intent(summarize_data["id"]) research_result = orchestrator.get_intent(research_intent.id) orchestrator.update_state( summarize_intent.id, summarize_intent.version, {"findings": research_result.state.data.get("findings", "")} ) print_intent_box(summarize_intent, "INTENT CREATED") print_detail("Input", "Findings passed from research intent") print("\n \033[1;35m── PHASE 4: Assign to summarizer (triggers @on_assignment) ──\033[0m") summarizer_agent = SummarizerAgent(base_url="http://localhost:8000", api_key="demo-summarizer-agent") summarize_intent = orchestrator.get_intent(summarize_intent.id) orchestrator.assign_agent(summarize_intent.id, "summarizer") print_success("Agent assignment sent → SSE event dispatched") print_info("@on_assignment handler executing...") result = asyncio.run(summarizer_agent.work(summarize_intent)) summarizer_agent.client.update_state(summarize_intent.id, summarize_intent.version, result) updated = summarizer_agent.client.get_intent(summarize_intent.id) summarizer_agent.client.set_status(summarize_intent.id, updated.version, IntentStatus.COMPLETED) print_success("Handler returned → state auto-patched → intent completed") print_step(5, "Results") final_research = orchestrator.get_intent(research_intent.id) final_summary = orchestrator.get_intent(summarize_intent.id) print("\n\033[1mResearch Findings:\033[0m") findings = final_research.state.data.get('findings', 'N/A') print(f" {findings[:300]}..." 
if len(findings) > 300 else f" {findings}") print("\n\033[1mExecutive Summary:\033[0m") print(f" {final_summary.state.data.get('summary', 'N/A')}") print_step(6, "Full Observability - Distributed Trace") print_info("Complete end-to-end execution trace:") research_events = orchestrator.get_events(research_intent.id) summary_events = orchestrator.get_events(summarize_intent.id) all_events = [("research", e) for e in research_events] + [("summary", e) for e in summary_events] intents = {"research": final_research, "summary": final_summary} print_trace_visualization(all_events, intents) print("\n\033[32m" + "=" * 70 + "\033[0m") print("\033[32mDemo complete! You've seen:\033[0m") print(" \033[1m1.\033[0m @Agent decorator - declarative agent definitions") print(" \033[1m2.\033[0m @on_assignment - handlers trigger on intent assignment") print(" \033[1m3.\033[0m LLM Adapters - automatic token/cost tracking") print(" \033[1m4.\033[0m Auto state patching - handler returns update intent") print(" \033[1m5.\033[0m Distributed tracing - end-to-end workflow visibility") print(" \033[1m6.\033[0m Cost rollup - total tokens and estimated cost") print("\033[32m" + "=" * 70 + "\033[0m") print("\n\033[1mThe @Agent Pattern:\033[0m") print(""" @Agent("researcher") class ResearchAgent: @on_assignment async def work(self, intent): # Called when assigned to an intent result = await self.do_work(intent) return {"findings": result} # Auto-patches state """) print("\033[1mNext steps:\033[0m") print(" pip install openintent[server]") print(" # Visit https://openintent.ai/docs for full documentation") except Exception as e: print_error(f"Demo failed: {e}") if DEBUG: traceback.print_exc() else: print("\033[33mRun with DEBUG=1 for details\033[0m") raise finally: cleanup() if __name__ == "__main__": run_demo()