Custom MCP Client Integration

This guide shows you how to build a custom MCP (Model Context Protocol) client that connects to the Orchex MCP server. This is useful if you're building your own AI-powered tools or integrating Orchex into custom workflows.

Overview

The Model Context Protocol (MCP) is an open protocol that standardizes how AI applications provide context to large language models. Orchex implements an MCP server that exposes its manifest execution capabilities through this protocol.

What You'll Build

A custom client that:

  • Connects to the Orchex MCP server
  • Lists available tools (orchestration operations)
  • Executes orchestrations programmatically
  • Handles streaming responses and artifacts

Prerequisites

  • Node.js 18+ or Python 3.10+
  • Orchex installed (npm install -g @wundam/orchex)
  • An API key for your preferred LLM provider (Anthropic, OpenAI, or Gemini), or a local Ollama instance
  • Understanding of async/await or async programming
  • Familiarity with TypeScript or Python

MCP Protocol Basics

Architecture

┌───────────────────┐         ┌──────────────────┐
│  Your Client      │ ◄─────► │  Orchex Server   │
│                   │   MCP   │                  │
│ - Tool listing    │         │ - init           │
│ - Tool calling    │         │ - execute        │
│ - Result handling │         │ - status / learn │
└───────────────────┘         └──────────────────┘

Communication Flow

  1. Initialize: Client starts the Orchex MCP server process
  2. List Tools: Client requests available tools from server
  3. Call Tool: Client invokes a tool with parameters
  4. Stream Response: Server streams execution progress
  5. Handle Artifacts: Client processes generated files

Transport

This guide uses MCP's stdio transport: the client launches the server as a subprocess and exchanges newline-delimited JSON-RPC 2.0 messages with it over stdin/stdout.
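
To make the wire format concrete, here is a minimal sketch of how a client could frame requests for the stdio transport. The method names (tools/list, tools/call) come from the MCP specification; the helper names and the request ID counter are illustrative only, and the SDKs used later in this guide handle this framing for you.

```python
import json
from itertools import count
from typing import Optional

_ids = count(1)  # illustrative request ID generator


def make_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request object."""
    message = {"jsonrpc": "2.0", "id": next(_ids), "method": method}
    if params is not None:
        message["params"] = params
    return message


def encode(message: dict) -> bytes:
    """Serialize one message as a single newline-terminated line."""
    return (json.dumps(message, separators=(",", ":")) + "\n").encode("utf-8")


def decode(line: bytes) -> dict:
    """Parse one line read from the server's stdout."""
    return json.loads(line.decode("utf-8"))


list_tools = make_request("tools/list")
call_execute = make_request(
    "tools/call", {"name": "execute", "arguments": {"mode": "auto"}}
)
print(encode(list_tools))
print(encode(call_execute))
```

Each encoded message occupies exactly one line, which is how the reader on the other end of the pipe knows where a message ends.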

Provider Configuration

Before starting the Orchex MCP server, configure your LLM provider via environment variables:

# Anthropic (Claude)
export ANTHROPIC_API_KEY="sk-ant-..."

# OpenAI (GPT-4, GPT-4.5)
export OPENAI_API_KEY="sk-..."

# Google Gemini
export GEMINI_API_KEY="AI..."

# Ollama (local models)
export OLLAMA_BASE_URL="http://localhost:11434"

# Optional: Force specific provider
export ORCHEX_PROVIDER="openai"

Orchex auto-detects the provider based on which API key is set. Priority: Anthropic > OpenAI > Gemini > Ollama.
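
The priority rule above can be sketched as a small lookup. This is an illustration of the documented behavior, not Orchex's actual implementation; it assumes that ORCHEX_PROVIDER, when set, takes precedence over detection and that Ollama is detected through OLLAMA_BASE_URL. The function name detect_provider is hypothetical.

```python
from typing import Mapping, Optional

# Order mirrors the stated priority: Anthropic > OpenAI > Gemini > Ollama.
_PROVIDER_VARS = [
    ("anthropic", "ANTHROPIC_API_KEY"),
    ("openai", "OPENAI_API_KEY"),
    ("gemini", "GEMINI_API_KEY"),
    ("ollama", "OLLAMA_BASE_URL"),  # assumption: the base URL signals Ollama
]


def detect_provider(env: Mapping[str, str]) -> Optional[str]:
    """Return the provider a server following the stated rule would pick."""
    forced = env.get("ORCHEX_PROVIDER")
    if forced:
        return forced  # assumption: an explicit override wins
    for provider, variable in _PROVIDER_VARS:
        if env.get(variable):
            return provider
    return None


print(detect_provider({"OPENAI_API_KEY": "sk-...", "GEMINI_API_KEY": "AI..."}))
```

A check like this can be useful in a client before spawning the server, for example to fail early with a clear message when no provider is configured.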

TypeScript/Node.js Implementation

Installation

npm install @modelcontextprotocol/sdk

Basic Client

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

interface OrchexClientOptions {
  provider?: 'anthropic' | 'openai' | 'gemini' | 'ollama';
  apiKey?: string;
  ollamaBaseUrl?: string;
}

class OrchexMCPClient {
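  // protected so subclasses can call the SDK client directly;
  // both fields are assigned in connect()
  protected client!: Client;
  protected transport!: StdioClientTransport;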
  private options: OrchexClientOptions;

  constructor(options: OrchexClientOptions = {}) {
    this.options = options;
  }

  async connect() {
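    // Build environment variables for the provider.
    // process.env values may be undefined, so copy only the defined ones.
    const env: Record<string, string> = {};
    for (const [key, value] of Object.entries(process.env)) {
      if (value !== undefined) env[key] = value;
    }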

    if (this.options.provider) {
      env.ORCHEX_PROVIDER = this.options.provider;
    }

    if (this.options.apiKey) {
      switch (this.options.provider) {
        case 'anthropic':
          env.ANTHROPIC_API_KEY = this.options.apiKey;
          break;
        case 'openai':
          env.OPENAI_API_KEY = this.options.apiKey;
          break;
        case 'gemini':
          env.GEMINI_API_KEY = this.options.apiKey;
          break;
      }
    }

    if (this.options.ollamaBaseUrl) {
      env.OLLAMA_BASE_URL = this.options.ollamaBaseUrl;
    }

    // Create transport using server's stdin/stdout with provider config
    this.transport = new StdioClientTransport({
      command: "npx",
      args: ["-y", "@wundam/orchex"],
      env,
    });

    // Initialize MCP client
    this.client = new Client(
      {
        name: "my-orchex-client",
        version: "1.0.0",
      },
      {
        capabilities: {
          tools: {},
        },
      }
    );

    // Connect to server
    await this.client.connect(this.transport);
    console.log("Connected to Orchex MCP server");
  }

  async listTools() {
    const response = await this.client.listTools();
    return response.tools;
  }

  async executeOrchestration(mode: string = "auto") {
    const result = await this.client.callTool({
      name: "execute",
      arguments: {
        mode,
      },
    });

    return result;
  }

  async close() {
    await this.client.close();
  }
}

// Usage with different providers
// OpenAI
const openaiClient = new OrchexMCPClient({
  provider: 'openai',
  apiKey: process.env.OPENAI_API_KEY,
});
await openaiClient.connect();

// Anthropic
const claudeClient = new OrchexMCPClient({
  provider: 'anthropic',
  apiKey: process.env.ANTHROPIC_API_KEY,
});

// Ollama (local)
const ollamaClient = new OrchexMCPClient({
  provider: 'ollama',
  ollamaBaseUrl: 'http://localhost:11434',
});

// Execute with chosen provider
const tools = await openaiClient.listTools();
console.log("Available tools:", tools);

const result = await openaiClient.executeOrchestration("auto");
console.log("Execution result:", result);

await openaiClient.close();

Advanced: Streaming Progress

import { EventEmitter } from "events";

class StreamingOrchexClient extends OrchexMCPClient {
  private events = new EventEmitter();

  async executeWithProgress(mode: string = "auto") {
    // Execute the active orchestration; the execute tool takes a mode, not a manifest path
    const result = await this.client.callTool({
      name: "execute",
      arguments: {
        mode,
      },
    });

    // Parse result for artifacts and status
    if (result.content && Array.isArray(result.content)) {
      for (const item of result.content) {
        if (item.type === "text") {
          // Parse orchestration artifact from JSON
          try {
            const artifact = JSON.parse(item.text);
            this.events.emit("artifact", artifact);
          } catch {
            // Plain text response
            this.events.emit("message", item.text);
          }
        }
      }
    }

    return result;
  }

  on(event: string, handler: (...args: any[]) => void) {
    this.events.on(event, handler);
  }
}

// Usage with progress tracking
const client = new StreamingOrchexClient();
await client.connect();

client.on("artifact", (artifact) => {
  console.log(`Wave ${artifact.currentWave}/${artifact.totalWaves}`);
  console.log(`Stream: ${artifact.currentStream?.id}`);
  console.log(`Status: ${artifact.currentStream?.status}`);
});

client.on("message", (text) => {
  console.log("Server message:", text);
});

await client.executeWithProgress("auto");

Error Handling

class RobustOrchexClient extends OrchexMCPClient {
  async executeOrchestrationSafely(mode: string = "auto") {
    try {
      const result = await this.executeOrchestration(mode);

      // Check if execution succeeded
      if (result.isError) {
        throw new Error(`Execution failed: ${JSON.stringify(result.content)}`);
      }

      // Parse and validate artifact
      const content = Array.isArray(result.content) ? result.content[0] : undefined;
      if (!content || content.type !== "text") {
        throw new Error("Unexpected response format");
      }

      const artifact = JSON.parse(content.text);

      // Check for failed streams
      const failedStreams = artifact.streams.filter(
        (s: any) => s.status === "failed"
      );

      if (failedStreams.length > 0) {
        console.warn(`${failedStreams.length} streams failed`);
        for (const stream of failedStreams) {
          console.error(`- ${stream.id}: ${stream.error}`);
        }
      }

      return artifact;
    } catch (error) {
      console.error("Execution error:", error);
      throw error;
    }
  }
}

Python Implementation

Installation

pip install mcp

Basic Client

import asyncio
import json
import os
from contextlib import AsyncExitStack
from typing import Optional, Literal

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class OrchexMCPClient:
    def __init__(
        self,
        provider: Optional[Literal['anthropic', 'openai', 'gemini', 'ollama']] = None,
        api_key: Optional[str] = None,
        ollama_base_url: Optional[str] = None
    ):
        self.session: Optional[ClientSession] = None
        # Keeps the transport and session context managers open until close()
        self.exit_stack = AsyncExitStack()
        self.provider = provider
        self.api_key = api_key
        self.ollama_base_url = ollama_base_url

    async def connect(self):
        """Connect to Orchex MCP server with provider configuration"""
        # Build environment for provider
        env = dict(os.environ)

        if self.provider:
            env['ORCHEX_PROVIDER'] = self.provider

        if self.api_key:
            if self.provider == 'anthropic':
                env['ANTHROPIC_API_KEY'] = self.api_key
            elif self.provider == 'openai':
                env['OPENAI_API_KEY'] = self.api_key
            elif self.provider == 'gemini':
                env['GEMINI_API_KEY'] = self.api_key

        if self.ollama_base_url:
            env['OLLAMA_BASE_URL'] = self.ollama_base_url

        server_params = StdioServerParameters(
            command="npx",
            args=["-y", "@wundam/orchex"],
            env=env,
        )

        # stdio_client yields the (read, write) streams of the spawned server;
        # both it and ClientSession are async context managers
        read, write = await self.exit_stack.enter_async_context(
            stdio_client(server_params)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read, write)
        )

        # Initialize connection
        await self.session.initialize()
        print("Connected to Orchex MCP server")

    async def list_tools(self):
        """List available tools"""
        response = await self.session.list_tools()
        return response.tools

    async def execute_orchestration(self, mode: str = "auto"):
        """Execute the active orchestration"""
        result = await self.session.call_tool(
            "execute",
            arguments={"mode": mode}
        )
        return result

    async def close(self):
        """Close the session and stop the server process"""
        await self.exit_stack.aclose()
        self.session = None

# Usage with different providers
async def main():
    # OpenAI
    openai_client = OrchexMCPClient(
        provider='openai',
        api_key=os.environ.get('OPENAI_API_KEY')
    )
    await openai_client.connect()

    # Or Anthropic
    # claude_client = OrchexMCPClient(
    #     provider='anthropic',
    #     api_key=os.environ.get('ANTHROPIC_API_KEY')
    # )

    # Or Ollama (local)
    # ollama_client = OrchexMCPClient(
    #     provider='ollama',
    #     ollama_base_url='http://localhost:11434'
    # )

    # List available tools
    tools = await openai_client.list_tools()
    print("Available tools:")
    for tool in tools:
        print(f"- {tool.name}: {tool.description}")

    # Execute orchestration
    result = await openai_client.execute_orchestration("auto")
    print("Execution result:", result)

    await openai_client.close()

if __name__ == "__main__":
    asyncio.run(main())

With Progress Tracking

import json
from typing import Callable, Optional

class StreamingOrchexClient(OrchexMCPClient):
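    def __init__(self, **kwargs):
        # Forward provider settings (provider, api_key, ollama_base_url) to the base client
        super().__init__(**kwargs)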
        self.on_artifact: Optional[Callable] = None
        self.on_message: Optional[Callable] = None

    async def execute_with_progress(self, mode: str = "auto"):
        """Execute orchestration with progress callbacks"""
        result = await self.execute_orchestration(mode)

        # Process result content
        if result.content:
            for item in result.content:
                if item.type == "text":
                    try:
                        # Try to parse as artifact JSON
                        artifact = json.loads(item.text)
                        if self.on_artifact:
                            self.on_artifact(artifact)
                    except json.JSONDecodeError:
                        # Plain text message
                        if self.on_message:
                            self.on_message(item.text)

        return result

# Usage
async def main():
    client = StreamingOrchexClient()
    await client.connect()

    # Set up callbacks
    def handle_artifact(artifact):
        print(f"Wave {artifact['currentWave']}/{artifact['totalWaves']}")
        if artifact.get('currentStream'):
            stream = artifact['currentStream']
            print(f"Stream: {stream['id']} - {stream['status']}")

    def handle_message(text):
        print(f"Message: {text}")

    client.on_artifact = handle_artifact
    client.on_message = handle_message

    # Execute with progress
    await client.execute_with_progress("auto")
    await client.close()

asyncio.run(main())
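
The TypeScript section includes an error-handling example that reports failed streams; a Python counterpart might look like the sketch below. It assumes the artifact shape used in that example (a streams list whose entries carry id, status, and error); the helper names and the sample data are hypothetical.

```python
from typing import Any, Dict, List


def failed_streams(artifact: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the stream entries whose status is "failed"."""
    streams = artifact.get("streams") or []
    return [s for s in streams if s.get("status") == "failed"]


def describe_failures(artifact: Dict[str, Any]) -> List[str]:
    """Format one line per failed stream, tolerating missing fields."""
    return [
        f"- {s.get('id', '<unknown>')}: {s.get('error', 'no error message')}"
        for s in failed_streams(artifact)
    ]


# Illustrative artifact; real field values come from the server.
sample = {
    "streams": [
        {"id": "api", "status": "in_progress"},
        {"id": "ui", "status": "failed", "error": "verify step exited with code 1"},
    ]
}

for line in describe_failures(sample):
    print(line)
```

A function like describe_failures can be called from the on_artifact callback shown above, so failures are reported as soon as the parsed artifact is available.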

Available MCP Tools

The Orchex MCP server exposes these tools:

`init`

Initialize a new orchestration with a feature name and stream definitions.

Parameters:

  • feature (string, required): Feature name for the orchestration
  • streams (object, required): Map of stream ID to stream definition (name, owns, reads, deps, plan, setup, verify)
  • project_dir (string, optional): Project directory (defaults to cwd)
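
As an illustration, the arguments for init could be assembled and sanity-checked on the client before the call. The field names come from the parameter list above, but the value types used here (lists of strings for owns, reads, and deps; a string for plan) are assumptions, as are the helper name and the example stream IDs.

```python
from typing import Any, Dict


def build_init_arguments(
    feature: str,
    streams: Dict[str, Dict[str, Any]],
    project_dir: str = "",
) -> Dict[str, Any]:
    """Assemble arguments for the init tool, rejecting obviously bad input."""
    if not feature:
        raise ValueError("feature is required")
    if not streams:
        raise ValueError("at least one stream is required")
    for stream_id, definition in streams.items():
        for dep in definition.get("deps", []):
            if dep == stream_id:
                raise ValueError(f"stream {stream_id!r} depends on itself")
            if dep not in streams:
                raise ValueError(
                    f"stream {stream_id!r} depends on unknown stream {dep!r}"
                )
    arguments: Dict[str, Any] = {"feature": feature, "streams": streams}
    if project_dir:
        arguments["project_dir"] = project_dir  # otherwise the server uses cwd
    return arguments


# Hypothetical stream definitions for illustration only.
example_streams = {
    "api": {"name": "API layer", "owns": ["src/api/"], "deps": []},
    "ui": {"name": "UI layer", "owns": ["src/ui/"], "deps": ["api"]},
}

print(build_init_arguments("checkout", example_streams))
```

The resulting dictionary would be passed as the arguments of a call to the init tool, for example through session.call_tool in the Python client.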

`execute`

Run the active orchestration: call the LLM API, apply artifacts, and run verification.

Parameters:

  • mode (string, optional): "auto" (all waves) or "wave" (one wave at a time). Default: "wave"
  • dry_run (boolean, optional): Preview without executing. Default: false
  • model (string, optional): LLM model override

Returns: Orchestration progress including stream statuses, artifacts, and token usage.
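
Because execute accepts only the parameters listed above, a small client-side builder can catch mistakes such as passing a file path where a mode is expected. This is a sketch under that assumption; the helper name is hypothetical, and the server remains the authority on what it accepts.

```python
from typing import Any, Dict, Optional

VALID_MODES = ("auto", "wave")


def build_execute_arguments(
    mode: str = "wave",  # matches the documented server default
    dry_run: bool = False,
    model: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble arguments for the execute tool."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    arguments: Dict[str, Any] = {"mode": mode, "dry_run": dry_run}
    if model is not None:
        arguments["model"] = model
    return arguments


print(build_execute_arguments("auto", dry_run=True))
```

Note that the documented default is "wave", while the client classes in this guide default to "auto"; passing the mode explicitly avoids surprises.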

`status`

Get the current orchestration status and progress.

`complete`

Mark a stream as complete, or archive the entire orchestration.

Parameters:

  • stream_id (string, optional): Stream ID to mark complete
  • archive (boolean, optional): Archive the orchestration

`learn`

Parse a planning document and generate parallel stream definitions.

Parameters:

  • document_path (string, required): Path to markdown planning document

`init-plan`

Generate an annotated plan template optimized for orchex learn.

`recover`

Detect and recover streams stuck in in_progress or failed state.

`reload`

Restart the MCP server to pick up code or config changes.

Common Use Cases

1. CI/CD Pipeline Integration

// Run Orchex in GitHub Actions or GitLab CI
import { OrchexMCPClient } from "./orchex-client";

async function runInCI() {
  const client = new OrchexMCPClient();
  await client.connect();

  try {
    // Run all waves of the active orchestration
    const result = await client.executeOrchestration("auto");

    // Parse artifact
    const artifact = JSON.parse(result.content[0].text);

    // Check for failures
    const failed = artifact.streams.filter(
      (s: any) => s.status === "failed"
    );

    if (failed.length > 0) {
      console.error("Execution failed");
      process.exit(1);
    }

    console.log("✓ All streams completed successfully");
  } finally {
    await client.close();
  }
}

2. Interactive CLI Tool

import asyncio
import sys
from orchex_client import StreamingOrchexClient

async def interactive_execution():
    client = StreamingOrchexClient()
    await client.connect()

    # Show progress bar
    def show_progress(artifact):
        current = artifact['currentWave']
        total = artifact['totalWaves']
        percent = (current / total) * 100
        bar = '█' * int(percent / 5) + '░' * (20 - int(percent / 5))
        print(f"\r[{bar}] {percent:.0f}% ({current}/{total} waves)", end='')

    client.on_artifact = show_progress

    # Optional first argument selects the mode: "auto" (default) or "wave"
    mode = sys.argv[1] if len(sys.argv) > 1 else "auto"
    await client.execute_with_progress(mode)

    print("\n✓ Execution complete")
    await client.close()

asyncio.run(interactive_execution())

3. Web Service Integration

import express from "express";
import { OrchexMCPClient } from "./orchex-client";

const app = express();
app.use(express.json());

// Endpoint to execute the active orchestration
app.post("/api/execute", async (req, res) => {
  // "auto" runs all waves; "wave" runs one wave at a time
  const mode = req.body?.mode === "wave" ? "wave" : "auto";

  const client = new OrchexMCPClient();
  try {
    await client.connect();
    const result = await client.executeOrchestration(mode);

    res.json({
      success: true,
      result: JSON.parse(result.content[0].text),
    });
  } catch (error) {
    res.status(500).json({
      success: false,
      // error is typed as unknown in strict TypeScript
      error: error instanceof Error ? error.message : String(error),
    });
  } finally {
    await client.close();
  }
});

app.listen(3000, () => {
  console.log("Orchex API server running on port 3000");
});

Best Practices

Connection Management

// ✓ Good: Reuse client for multiple operations
const client = new OrchexMCPClient();
await client.connect();

try {
  await client.executeOrchestration("wave"); // first wave
  await client.executeOrchestration("wave"); // next wave
} finally {
  await client.close();
}

// ✗ Bad: Create new client for each operation
for (let wave = 0; wave < 2; wave++) {
  const client = new OrchexMCPClient();
  await client.connect();
  await client.executeOrchestration("wave");
  await client.close(); // Wasteful: spawns a new server process every time
}

Error Handling

// Always handle both connection and execution errors
try {
  await client.connect();
} catch (error) {
  console.error("Failed to connect to Orchex server:", error);
  // Check if orchex is installed
  // Check if server is accessible
  throw error;
}

try {
  const result = await client.executeOrchestration("auto");
} catch (error) {
  console.error("Orchestration execution failed:", error);
  // Check that an orchestration has been initialized (init tool)
  // Check the provider API key and configuration
  throw error;
}

Resource Cleanup

# Use context managers for automatic cleanup
from contextlib import asynccontextmanager

@asynccontextmanager
async def orchex_client():
    client = OrchexMCPClient()
    try:
        await client.connect()
        yield client
    finally:
        await client.close()

# Usage
async def main():
    async with orchex_client() as client:
        await client.execute_orchestration("auto")
    # Client automatically closed

Debugging

Enable Verbose Logging

// TypeScript: Log all MCP messages
const transport = new StdioClientTransport({
  command: "npx",
  args: ["-y", "@wundam/orchex"],
});

transport.onmessage = (message) => {
  console.log("→", JSON.stringify(message, null, 2));
};

transport.onerror = (error) => {
  console.error("✗", error);
};

# Python: Enable debug logging
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("mcp")
logger.setLevel(logging.DEBUG)

Common Issues

Server not starting:

# Check if orchex is installed
npx @wundam/orchex --version

# Test server manually
npx -y @wundam/orchex

Connection timeout:

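// Raise the timeout for a long-running call. In the TypeScript SDK the timeout
// is a per-request option in milliseconds, not a Client constructor argument.
const result = await client.callTool(
  { name: "execute", arguments: { mode: "auto" } },
  undefined, // use the default result schema
  { timeout: 300000 } // 5 minutes
);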
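
In Python, one way to bound a long-running call is to wrap the awaitable in asyncio.wait_for. The sketch below is generic rather than Orchex-specific: the helper name is hypothetical, and whether cancelling the client-side wait also stops work already started on the server is not something this guide can confirm.

```python
import asyncio
from typing import Any, Awaitable


async def call_with_timeout(awaitable: Awaitable[Any], seconds: float) -> Any:
    """Await a call, raising TimeoutError if it takes longer than `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        raise TimeoutError(
            f"Orchex call did not finish within {seconds} seconds"
        ) from None


async def _demo() -> None:
    async def quick_call() -> str:
        # Stand-in for an awaitable such as client.execute_orchestration("auto")
        await asyncio.sleep(0.01)
        return "ok"

    print(await call_with_timeout(quick_call(), seconds=1.0))


asyncio.run(_demo())
```

With the Python client from this guide, the awaitable would be something like client.execute_orchestration("auto").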

Invalid responses:

// Validate response structure
if (!result.content || !Array.isArray(result.content)) {
  throw new Error("Invalid response format");
}
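
A Python version of the same check might look like the following sketch. It assumes, as the examples in this guide do, that the first content item is a text item holding JSON; the helper name is hypothetical, and SimpleNamespace stands in for the SDK's result objects so the snippet runs on its own.

```python
import json
from types import SimpleNamespace
from typing import Any


def extract_json_payload(result: Any) -> Any:
    """Validate a tool result and return the JSON carried by its first text item."""
    content = getattr(result, "content", None)
    if not isinstance(content, list) or not content:
        raise ValueError("Invalid response format: missing content list")
    first = content[0]
    if getattr(first, "type", None) != "text":
        raise ValueError("Unexpected response format: first item is not text")
    try:
        return json.loads(first.text)
    except json.JSONDecodeError as exc:
        raise ValueError("Response text is not valid JSON") from exc


# Stand-in for a result returned by session.call_tool(...)
fake_result = SimpleNamespace(
    content=[SimpleNamespace(type="text", text='{"streams": []}')]
)
print(extract_json_payload(fake_result))
```

Raising a single exception type for every malformed case keeps the calling code simple: one except clause covers missing content, non-text items, and invalid JSON.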

Community

Have questions or want to share your MCP client implementation?