Python SDK

Install the SDK:

pip install rebuno

The public surface is small: Agent, Client, Runner, tool, MCPServer, remote, execution.

Authentication

The kernel enables bearer token authentication through the --bearer-token flag or the REBUNO_BEARER_TOKEN environment variable. The Python SDK accepts an api_key parameter that maps to this token and sends it as an Authorization: Bearer <api_key> header on all HTTP requests and SSE connections.

agent = Agent("researcher", kernel_url="http://localhost:8080", api_key="your-secret-token")

When constructed without arguments, Agent, Client, and Runner read REBUNO_URL and REBUNO_API_KEY from the environment. In development mode (kernel started without --bearer-token), api_key can be omitted.
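The fallback described above can be sketched with the standard library alone. This is an illustration, not the SDK's code: resolve_settings and auth_headers are hypothetical helper names, and the assumption that an explicit argument takes precedence over the environment variable is mine.

```python
import os


def resolve_settings(kernel_url=None, api_key=None, env=None):
    """Hypothetical helper: use explicit arguments first, then the environment."""
    env = os.environ if env is None else env
    return {
        "kernel_url": kernel_url or env.get("REBUNO_URL"),
        "api_key": api_key or env.get("REBUNO_API_KEY"),
    }


def auth_headers(api_key):
    """Build the Authorization header; return no header when no key is set."""
    return {"Authorization": f"Bearer {api_key}"} if api_key else {}


# Simulated environment, so the example does not depend on the real one.
settings = resolve_settings(
    env={"REBUNO_URL": "http://localhost:8080", "REBUNO_API_KEY": "your-secret-token"}
)
print(auth_headers(settings["api_key"]))
```

The empty-header branch corresponds to development mode, where the kernel runs without --bearer-token and api_key is omitted.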

Writing an Agent

An agent is a function plus a one-line entry point. The handler signature is the input schema — required parameters become required input fields; defaults make them optional.

from rebuno import Agent, execution

agent = Agent("my-agent")

async def process(prompt: str) -> dict:
    return {"answer": prompt.upper(), "execution_id": execution.id}

agent.run(process)

execution is a module-level proxy resolving to the current ExecutionState via a ContextVar. Use it inside process (or any tool) to access id, session_id, input, labels, history. Outside an active execution, attribute access raises RuntimeError.
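For readers unfamiliar with the pattern, a ContextVar-backed proxy can be sketched as follows. The ExecutionState fields, the _current variable, and the handle function are assumptions made for this illustration; the SDK's internals may differ.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass, field


@dataclass
class ExecutionState:
    """Simplified stand-in for the SDK's ExecutionState."""
    id: str
    session_id: str = ""
    input: dict = field(default_factory=dict)


_current: ContextVar = ContextVar("current_execution")


class _ExecutionProxy:
    """Forwards attribute access to the state bound in the current context."""

    def __getattr__(self, name):
        try:
            state = _current.get()
        except LookupError:
            raise RuntimeError("no active execution in this context") from None
        return getattr(state, name)


execution = _ExecutionProxy()


async def handle(state):
    """Bind the state for the duration of one handler call."""
    token = _current.set(state)
    try:
        await asyncio.sleep(0)  # yield so concurrent handlers interleave
        return execution.id
    finally:
        _current.reset(token)


async def main():
    # Each task gets its own copy of the context, so the IDs do not mix.
    return await asyncio.gather(
        handle(ExecutionState(id="exec-1")),
        handle(ExecutionState(id="exec-2")),
    )
```

Because asyncio tasks copy the context when they are created, concurrent executions in one process each see their own state through the same module-level name.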

Input shapes

The framework introspects the handler signature once and routes claim.input to it in one of three ways:

# Kwargs (most common)
async def process(prompt: str, repo_url: str = ""):
    ...

# Pydantic model — validation, defaults, JSON Schema
class Input(BaseModel):
    prompt: str
    repo_url: str = ""

async def process(input: Input):
    ...

# Raw dict — opaque passthrough
async def process(input: dict):
    ...

A missing required input fails the execution with a clear error message before the handler runs.
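As a rough sketch of the kwargs case, signature introspection can decide which inputs are required and reject a payload before the handler runs. bind_input is a hypothetical helper, and dropping unknown keys is an assumption of this example; the Pydantic and raw-dict shapes are not covered here.

```python
import inspect


def bind_input(handler, payload):
    """Bind a dict payload to the handler's keyword parameters.

    Raises ValueError naming any required parameter that is missing.
    """
    params = inspect.signature(handler).parameters
    missing = [
        name
        for name, p in params.items()
        if p.default is inspect.Parameter.empty and name not in payload
    ]
    if missing:
        raise ValueError(f"missing required input: {', '.join(missing)}")
    # Keep only keys the handler declares (an assumption of this sketch).
    return {name: payload[name] for name in params if name in payload}


async def process(prompt: str, repo_url: str = ""):
    return {"prompt": prompt, "repo_url": repo_url}


print(bind_input(process, {"prompt": "hello"}))
```

Here prompt has no default, so it is required, while repo_url falls back to its default when absent.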

Local Tools

Tools are plain module-level functions. @tool registers them globally; the wrapper submits a kernel intent before running the body.

from rebuno import tool

@tool("web_search")
async def search(query: str, limit: int = 10) -> dict:
    """Search the web."""
    return {"results": await do_search(query, limit)}

Pass them to your framework as a list:

async def process(prompt: str):
    graph = create_agent(llm, [search])
    return await graph.ainvoke({"messages": [HumanMessage(prompt)]})
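To make the registration step concrete, here is a minimal sketch of how a decorator in the spirit of @tool could register a function globally and record an intent before running the body. The tool function below is a local stand-in, not the one exported by rebuno, and REGISTRY and INTENT_LOG are hypothetical names; a real wrapper would submit the intent to the kernel and wait for its decision.

```python
import asyncio
import functools

REGISTRY = {}    # hypothetical global registry: tool ID -> wrapped function
INTENT_LOG = []  # stand-in for intents submitted to the kernel


def tool(tool_id):
    """Local stand-in for a registering decorator (not the rebuno export)."""
    def decorate(fn):
        @functools.wraps(fn)
        async def wrapper(**kwargs):
            # Record the intent first, then run the original body.
            INTENT_LOG.append({"tool_id": tool_id, "arguments": kwargs})
            return await fn(**kwargs)

        REGISTRY[tool_id] = wrapper
        return wrapper

    return decorate


@tool("web_search")
async def search(query: str, limit: int = 10) -> dict:
    """Search the web."""
    return {"results": [f"{query}-{i}" for i in range(limit)]}
```

Since the decorator returns an ordinary async callable, the wrapped function can be passed around in a plain list like any other function.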

Remote Tools

@tool(remote=True) declares a tool whose body lives on a runner. The body is a stub — the kernel routes execution. Useful for type-checked imperative calls in agent code:

@tool("compute.heavy", remote=True)
async def heavy(data: str) -> str:
    """Run on a runner."""

# Inside a handler:
result = await heavy(data="x")

For LLM-driven discovery (no source-coupling), use remote.Tools(prefix) instead — see Remote tool discovery.

Mixing local, remote, and MCP

Local @tool functions, remote @tool(remote=True) stubs, MCP tools, and discovered remote tools can all coexist in one tool list:

graph = create_agent(llm, [
    search,                  # local @tool
    heavy,                   # remote @tool stub
    *github.tools,           # local MCP (MCPServer)
    *compute.tools,          # remote discovery (remote.Tools)
])

Direct invocation via execution

You can also invoke tools or wait on signals through the execution proxy:

async def process(prompt: str):
    result = await execution.invoke_tool("web_search", {"query": prompt})
    approval = await execution.wait_signal("approval")
    return {"answer": result, "approved": approval.get("approved")}

Most agents won't need this — return a value to complete, raise to fail. The proxy is for the rare case where the framework's tool plumbing isn't enough.

Framework integration

The wrappers preserve __name__, __doc__, and a synthesized inspect.Signature, so any framework that introspects callables works unmodified:

# LangGraph
from langgraph.prebuilt import create_react_agent
graph = create_react_agent(model=llm, tools=[search, *github.tools], prompt=SYSTEM_PROMPT)
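The reason introspection keeps working is standard Python behavior: functools.wraps copies the name and docstring, and inspect.signature follows the __wrapped__ attribute to the original function. The passthrough decorator and describe helper below are hypothetical and only show what a framework might read off a wrapped callable.

```python
import functools
import inspect


def passthrough(fn):
    """A wrapper that keeps the original function's metadata."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        return await fn(*args, **kwargs)

    return wrapper


@passthrough
async def search(query: str, limit: int = 10) -> dict:
    """Search the web."""
    return {}


def describe(fn):
    """Collect the metadata a framework typically inspects on a callable."""
    sig = inspect.signature(fn)
    return {
        "name": fn.__name__,
        "doc": inspect.getdoc(fn),
        "params": list(sig.parameters),
        "required": [
            name
            for name, p in sig.parameters.items()
            if p.default is inspect.Parameter.empty
        ],
    }


print(describe(search))
```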

Local MCP

Connect to MCP servers directly from the agent process. Requires pip install rebuno[mcp].

from rebuno import MCPServer

# Stdio server
fs = MCPServer(
    "filesystem",
    command="npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
)

# HTTP server
github = MCPServer(
    "github",
    url="https://api.githubcopilot.com/mcp/",
    headers={"Authorization": "Bearer xxx"},
)

Tools are namespaced by prefix (defaults to name): filesystem.read_file, github.create_pr, etc.

Splat into your tool list:

graph = create_agent(llm, [..., *fs.tools, *github.tools])

Servers connect at agent.run() startup and disconnect on shutdown. Tool calls go through the kernel for policy/audit; the MCP transport happens in the agent process.

Remote tool discovery

Use remote.Tools(prefix) to consume tools that runners host elsewhere. The agent never holds credentials and never opens an MCP transport. Schemas come from the kernel directory (GET /v0/tools).

from rebuno import remote

github  = remote.Tools("github")
compute = remote.Tools("compute")

async def process(prompt: str):
    graph = create_agent(llm, [..., *github.tools, *compute.tools])
    return await graph.ainvoke({"messages": [HumanMessage(prompt)]})

Each call submits an intent with remote=True; the kernel routes to whichever runner advertises the tool ID. Works for any source the runner publishes — @tool Python functions, MCP servers, or both.

Schemas are fetched once at agent startup. Restart the agent to pick up runner-side schema changes.

Constructor Parameters

Agent

Parameter            Type         Default              Description
agent_id             str          required             Unique identifier for this agent. Must match the agent ID used in policy and execution creation.
kernel_url           str | None   env REBUNO_URL       URL of the kernel (e.g., http://localhost:8080).
api_key              str | None   env REBUNO_API_KEY   Bearer token. Maps to the kernel's --bearer-token.
consumer_id          str          ""                   SSE connection identifier. Auto-generated as {agent_id}-{random} if empty. See Consumer ID.
reconnect_delay      float        3.0                  Initial reconnect delay (seconds).
max_reconnect_delay  float        60.0                 Cap on reconnect backoff.
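The two reconnect parameters suggest a capped backoff. The sketch below assumes the delay doubles after each failed attempt; the growth factor is not stated in this document, so treat it as an illustration of how an initial delay and a cap interact rather than as the SDK's schedule. reconnect_delays is a hypothetical helper.

```python
def reconnect_delays(reconnect_delay=3.0, max_reconnect_delay=60.0, attempts=7):
    """Return the wait before each reconnect attempt, assuming doubling with a cap."""
    delays = []
    delay = reconnect_delay
    for _ in range(attempts):
        delays.append(min(delay, max_reconnect_delay))
        delay *= 2  # assumed growth factor
    return delays


print(reconnect_delays())
```

Under these assumptions the defaults reach the 60-second cap on the sixth attempt and stay there.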

Runner

Parameter     Type                   Default              Description
runner_id     str                    required             Unique identifier.
kernel_url    str | None             env REBUNO_URL       URL of the kernel.
api_key       str | None             env REBUNO_API_KEY   Bearer token.
capabilities  Iterable[str] | None   all @tool IDs        Tool IDs to advertise. If omitted, every @tool in the registry is advertised plus any tools from MCP servers added with runner.host(...).
consumer_id   str                    ""                   Auto-generated if empty.

Client

Parameter    Type         Default              Description
base_url     str | None   env REBUNO_URL       URL of the kernel.
api_key      str | None   env REBUNO_API_KEY   Bearer token.
timeout      float        35.0                 HTTP timeout (seconds).
max_retries  int          3                    Retry attempts on retryable failures.

Consumer ID

The consumer_id identifies a specific SSE connection instance for a given agent_id (or runner_id). It serves several purposes:

  • Multiple consumers: Multiple processes can connect with the same agent_id but different consumer_id values for redundancy and load distribution.
  • Round-robin assignment: The kernel round-robins execution assignments across all connected consumers for the same agent.
  • Uniqueness: Each consumer_id must be unique per connection. If omitted, the SDK auto-generates one.

agent1 = Agent("researcher", consumer_id="researcher-instance-1")
agent2 = Agent("researcher", consumer_id="researcher-instance-2")
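The behavior in the list above can be modeled in a few lines. This is a toy model of what the kernel is described as doing, not its implementation: make_consumer_id and assign_round_robin are hypothetical names, and the 8-character random suffix is an assumption.

```python
import itertools
import uuid


def make_consumer_id(agent_id, consumer_id=""):
    """Return the given ID, or generate '{agent_id}-{random}' when it is empty."""
    return consumer_id or f"{agent_id}-{uuid.uuid4().hex[:8]}"


def assign_round_robin(execution_ids, consumer_ids):
    """Pair each execution with the next connected consumer in rotation."""
    rotation = itertools.cycle(consumer_ids)
    return {execution_id: next(rotation) for execution_id in execution_ids}


consumers = ["researcher-instance-1", "researcher-instance-2"]
print(assign_round_robin(["e1", "e2", "e3"], consumers))
```

With two consumers, the first and third executions land on the same instance and the second goes to the other.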

execution Reference

from rebuno import execution exposes the current execution. Valid only inside a handler or a @tool body running under agent.run().

Property              Description
execution.id          Current execution ID
execution.session_id  Current session ID
execution.agent_id    Agent ID
execution.input       Raw input data from the execution request
execution.labels      Execution labels
execution.history     Previous step history

Method                                           Description
await execution.invoke_tool(tool_id, arguments)  Invoke a tool and wait for the result
await execution.wait_signal(signal_type)         Wait until an external signal arrives
await execution.complete(output)                 Mark complete (usually unnecessary; return from the handler instead)
await execution.fail(error)                      Mark failed (usually unnecessary; raise from the handler instead)

Writing a Runner

A runner advertises capabilities, publishes their schemas to the kernel directory, and services job assignments. Tools are the same @tool decorator used by agents — the runner picks them up from the global registry.

from rebuno import Runner, tool

@tool("compute.heavy")
async def heavy(data: str) -> str:
    """Run on a runner."""
    return await do_work(data)

Runner("compute-1").run()

Schemas for @tool functions are auto-derived from inspect.signature via pydantic. Annotated[X, Field(description=...)] flows through to JSON Schema descriptions, so the LLM (on the agent side) gets useful per-parameter docs.
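The idea of carrying per-parameter descriptions through Annotated can be shown without pydantic. In the sketch below, Doc is a stand-in for pydantic's Field(description=...), and schema_for and JSON_TYPES are hypothetical names; the SDK itself is described as using pydantic, so this only illustrates the mechanism.

```python
import inspect
from dataclasses import dataclass
from typing import Annotated, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Doc:
    """Stand-in for pydantic's Field(description=...)."""
    description: str


JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def schema_for(fn):
    """Derive a small JSON-Schema-like dict from a function signature."""
    hints = get_type_hints(fn, include_extras=True)
    properties, required = {}, []
    for name, param in inspect.signature(fn).parameters.items():
        hint = hints.get(name, str)
        meta = ()
        if get_origin(hint) is Annotated:
            # Split Annotated[X, ...] into the base type and its metadata.
            hint, *meta = get_args(hint)
        prop = {"type": JSON_TYPES.get(hint, "object")}
        for item in meta:
            if isinstance(item, Doc):
                prop["description"] = item.description
        properties[name] = prop
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {"type": "object", "properties": properties, "required": required}


async def heavy(
    data: Annotated[str, Doc("Payload to process")],
    rounds: int = 1,
) -> str:
    return data * rounds


print(schema_for(heavy))
```

The description attached to data ends up next to its type in the schema, which is the text an LLM on the agent side would see for that parameter.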

Hosting an MCP server on a runner

Use runner.host() to back tools with an MCP transport. The runner connects to MCP, lists tools, registers their schemas with the kernel, and forwards invocations.

from rebuno import Runner, MCPServer

runner = Runner("github-runner")
runner.host(MCPServer(
    "github",
    url="https://api.githubcopilot.com/mcp/",
    headers={"Authorization": "Bearer xxx"},
))
runner.run()

Agents consume those tools via remote.Tools("github"). Credentials never leave the runner.

Mixing @tool and MCP on a runner

A runner can serve @tool functions and MCP-hosted tools together, and it publishes the union of both. Startup fails with a hard error if any tool ID collides, so pick one source per tool ID.
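A startup check of this kind might look like the following sketch. merge_tool_sources is a hypothetical helper, and the dict-of-schemas shape and the ValueError are assumptions; the point is only that the union is rejected when a tool ID appears in both sources.

```python
def merge_tool_sources(decorated, mcp):
    """Return the union of both sources; raise if a tool ID appears in both."""
    collisions = sorted(set(decorated) & set(mcp))
    if collisions:
        raise ValueError(f"duplicate tool IDs: {', '.join(collisions)}")
    return {**decorated, **mcp}


published = merge_tool_sources(
    {"compute.heavy": {"type": "object"}},
    {"github.create_pr": {"type": "object"}},
)
print(sorted(published))
```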

Client (external services)

For services that aren't agents (Discord bots, GitHub webhooks, dashboards), Client is the HTTP API:

from rebuno import Client

client = Client()  # env-configured

# Fire and forget
ex = await client.create("swe", input={"prompt": "..."})

# Stream events until terminal
async for event in client.run("swe", input={"prompt": "..."}):
    print(event.tool_id)

# Convenience: create + stream + return final state
result = await client.run_until_complete("swe", input={"prompt": "..."}, on_event=cb)

Method                                           Description
await client.create(agent_id, input=, labels=)   Create an execution
await client.get(id)                             Fetch current state
await client.list(...)                           List executions with filters
await client.cancel(id)                          Cancel a running execution
await client.send_signal(id, type, payload=)     Send a signal
async for event in client.events(id)             Stream events (terminates on terminal event)
async for event in client.run(agent_id, input=)  Create + stream
await client.run_until_complete(...)             Create + stream + return final Execution
await client.list_tools(prefix=)                 Read the kernel tool directory

Timeouts

Step timeouts are controlled by policy rules. When a policy rule includes timeout_ms in its then block, that value is used as the step deadline. Otherwise the global StepTimeout (default: 5 min) applies. See Policy for details.
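The precedence between a rule's timeout and the global default can be written as a small function. The dict shape used for a policy rule here is an assumption based only on the mention of a then block with timeout_ms; step_timeout_ms is a hypothetical helper, and the actual policy format is documented elsewhere.

```python
DEFAULT_STEP_TIMEOUT_MS = 5 * 60 * 1000  # global StepTimeout default of 5 minutes


def step_timeout_ms(rule=None, global_timeout_ms=DEFAULT_STEP_TIMEOUT_MS):
    """Use the matched rule's timeout_ms if present, else the global default."""
    then = (rule or {}).get("then", {})
    return then.get("timeout_ms", global_timeout_ms)


print(step_timeout_ms({"then": {"timeout_ms": 30_000}}))
print(step_timeout_ms({"then": {}}))
```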