
Agents module

Agent

An agent is a wrapper around an LLM that can generate responses to input text.

Attributes:

Name Type Description
name str

The name of the agent.

model str

The LLM model that the agent will use. Models follow the format of "<provider>/<model>" and are loaded using litellm. Examples: "openai/gpt-4o", "gemini/gemini-pro", "bedrock/bedrock-claude-v1"

response_format type[BaseModel] | None

The Pydantic model that the response will be validated against.

parameters dict[str, Any]

A dictionary of parameters that the model will use. These parameters are specific to the model. Examples: {"temperature": 0.5}, {"max_tokens": 100}

tools list[Tool]

The tools exposed to the agent.

mcp list[MCPEndpoint]

The MCP endpoints that the agent can invoke.

hooks AgentHooks

Callbacks to invoke during various points of the agent runtime.
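For example (a minimal construction sketch, assuming Agent is re-exported from redpanda.agents per the source paths below; model name and settings are illustrative):

from redpanda.agents import Agent

# Extra keyword arguments become model parameters (agent.parameters).
agent = Agent(
    name="summarizer",
    model="openai/gpt-4o",  # "<provider>/<model>", resolved by litellm
    instructions="Summarize the user's text in one sentence.",
    temperature=0.5,
    max_tokens=100,
)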

Source code in src/redpanda/agents/_agent.py
class Agent:
    """
    An agent is a wrapper around an LLM that can generate responses to input text.

    Attributes:
        name: The name of the agent.
        model: The LLM model that the agent will use. Models follow the format of
            "<provider>/<model>" and are loaded using [litellm](https://docs.litellm.ai/docs/providers).
            Examples:
                "openai/gpt-4o"
                "gemini/gemini-pro"
                "bedrock/bedrock-claude-v1"
        response_format: The Pydantic model that the response will be validated against.
        parameters: A dictionary of parameters that the model will use.
            These parameters are specific to the model.
            Examples:
                {"temperature": 0.5}
                {"max_tokens": 100}
                {"temperature": 0.5, "max_tokens": 100}
        tools: The tools exposed to the agent.
        mcp: The MCP endpoints that the agent can invoke.
        hooks: Callbacks to invoke during various points of the agent runtime.
    """

    name: str
    model: str
    instructions: str | None
    response_format: type[BaseModel] | None
    parameters: dict[str, Any]
    tools: list[Tool]
    mcp: list[MCPEndpoint]
    hooks: AgentHooks

    def __init__(
        self,
        name: str,
        model: str,
        instructions: str | None = None,
        response_type: type[BaseModel] | None = None,
        tools: list[Tool] | None = None,
        mcp: list[MCPEndpoint] | None = None,
        hooks: AgentHooks | None = None,
        **kwargs: Any,
    ):
        """
        Args:
            name: The name of the agent.
            model: The LLM model that the agent will use. Models follow the format of
                "<provider>/<model>" and are loaded using [litellm](https://docs.litellm.ai/docs/providers).
                Examples:
                    "openai/gpt-4o"
                    "gemini/gemini-pro"
                    "bedrock/bedrock-claude-v1"
            instructions: The system prompt for the agent.
            response_type: The Pydantic model that the response will be validated against.
                If None, the response will be a string.
            tools: The tools exposed to the agent.
            mcp: The MCP endpoints that the agent can invoke.
            hooks: Callbacks to invoke during various points of the agent runtime.
            **kwargs: A dictionary of parameters that the model will use.
                These parameters are specific to the model.
                Examples:
                    {"temperature": 0.5}
                    {"max_tokens": 100}
                    {"temperature": 0.5, "max_tokens": 100}
        """
        self.name = name
        self.model = model
        self.instructions = instructions
        self.response_format = response_type
        self.parameters = kwargs
        self.tools = tools or []
        self.mcp = mcp or []
        self.hooks = hooks or AgentHooks()

    async def run(self, input: str) -> Any:
        """
        Generate a response from the model given an input text.

        Args:
            input: The input text that the model will use to generate a response.
        Returns:
            The generated response from the model.
        """
        await self.hooks.on_start(self)
        async with AsyncExitStack() as stack:
            mcp_clients: list[MCPClient] = []
            for server in self.mcp:
                mcp_clients.append(await stack.enter_async_context(mcp_client(server)))

            tools = {tool.name: tool for tool in self.tools}

            for client in mcp_clients:
                await client.initialize()
                for tool in await client.list_tools():
                    if tool.name not in tools:
                        tools[tool.name] = tool
                    else:
                        # TODO: Warn on conflicting tools?
                        pass

            tool_defs = [
                {
                    "type": "function",
                    "function": {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": tool.parameters,
                    },
                }
                for tool in tools.values()
            ]
            messages: list[dict[str, str] | Message] = [{"role": "user", "content": input}]
            if self.instructions:
                messages = [{"role": "system", "content": self.instructions}] + messages
            while True:
                model_resp = await self._invoke_llm(messages, tool_defs)
                choice_resp = model_resp.choices[-1]
                if isinstance(choice_resp, StreamingChoices):
                    raise Exception("unexpected streaming response type")
                if choice_resp.message.tool_calls:
                    messages.append(choice_resp.message)
                    messages.extend(await self._call_tools(choice_resp.message.tool_calls, tools))
                    continue
                output = choice_resp.message.content
                if output is None:
                    raise Exception("unexpected response type of None")
                if self.response_format is not None:
                    output = self.response_format.model_validate_json(output)
                await self.hooks.on_end(self, output)
                return output

    async def _invoke_llm(
        self, messages: list[dict[str, str] | Message], tool_defs: list[dict[str, Any]]
    ):
        with trace.get_tracer("redpanda.agent").start_as_current_span("invoke_llm") as span:
            span.set_attribute("model", self.model)
            span.set_attribute(
                "messages",
                json.dumps([m.model_dump() if isinstance(m, BaseModel) else m for m in messages]),
            )
            resp = await acompletion(
                model=self.model,
                response_format=self.response_format,
                messages=messages,
                tools=tool_defs,
                **self.parameters,
            )
            if isinstance(resp, CustomStreamWrapper):
                raise Exception("unexpected response type of CustomStreamWrapper")
            if hasattr(resp, "usage") and isinstance(resp.usage, Usage):  # pyright: ignore[reportAttributeAccessIssue,reportUnknownMemberType]
                usage = resp.usage  # pyright: ignore[reportAttributeAccessIssue]
                span.set_attribute("completion_tokens", usage.completion_tokens)
                span.set_attribute("prompt_tokens", usage.prompt_tokens)
                span.set_attribute("total_tokens", usage.total_tokens)
                if usage.completion_tokens_details:
                    ctd = usage.completion_tokens_details
                    if ctd.accepted_prediction_tokens:
                        span.set_attribute(
                            "accepted_prediction_tokens", ctd.accepted_prediction_tokens
                        )
                    if ctd.rejected_prediction_tokens:
                        span.set_attribute(
                            "rejected_reasoning_tokens", ctd.rejected_prediction_tokens
                        )
                    if ctd.reasoning_tokens:
                        span.set_attribute("reasoning_tokens", ctd.reasoning_tokens)
                if usage.prompt_tokens_details:
                    ptd = usage.prompt_tokens_details
                    if ptd.cached_tokens:
                        span.set_attribute("cached_tokens", ptd.cached_tokens)
            span.set_attribute("response", json.dumps([m.model_dump() for m in resp.choices]))
            return resp

    async def _call_tools(
        self, tool_calls: list[ChatCompletionMessageToolCall], tools: dict[str, Tool]
    ) -> list[dict[str, Any]]:
        messages: list[dict[str, Any]] = []
        for tool_call in tool_calls:
            func = tool_call.function
            selected = func.name and tools.get(func.name)
            if not selected:
                raise Exception(f"tool {func.name} not found")
            await self.hooks.on_tool_start(self, selected, func.arguments)
            resp = await selected(json.loads(func.arguments))
            if isinstance(resp, ToolResponse):
                output: Any = []
                for content in resp.content:
                    if content.type == "text":
                        output.append(
                            {
                                "type": "text",
                                "text": content.data,
                            }
                        )
                    elif content.type == "image":
                        output.append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:{content.mime_type};base64,{content.data}",
                                },
                            }
                        )
                    else:
                        raise NotImplementedError(f"Unknown content type: {content.type}")
            elif isinstance(resp, BaseModel):
                output = resp.model_dump_json()
            else:
                output = json.dumps(resp)
            await self.hooks.on_tool_end(
                self, selected, output if isinstance(output, str) else json.dumps(output)
            )
            messages.append(
                {
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": selected.name,
                    "content": output,
                }
            )
        return messages

    def as_tool(self) -> Tool:
        # TODO(rockwood): support handoffs and passing more context
        return AgentTool(self)

__init__

__init__(
    name: str,
    model: str,
    instructions: str | None = None,
    response_type: type[BaseModel] | None = None,
    tools: list[Tool] | None = None,
    mcp: list[MCPEndpoint] | None = None,
    hooks: AgentHooks | None = None,
    **kwargs: Any,
)

Parameters:

Name Type Description Default
name str

The name of the agent.

required
model str

The LLM model that the agent will use. Models follow the format of "<provider>/<model>" and are loaded using litellm. Examples: "openai/gpt-4o", "gemini/gemini-pro", "bedrock/bedrock-claude-v1"

required
instructions str | None

The system prompt for the agent.

None
response_type type[BaseModel] | None

The Pydantic model that the response will be validated against. If None, the response will be a string.

None
tools list[Tool] | None

The tools exposed to the agent.

None
mcp list[MCPEndpoint] | None

The MCP endpoints that the agent can invoke.

None
hooks AgentHooks | None

Callbacks to invoke during various points of the agent runtime.

None
**kwargs Any

A dictionary of parameters that the model will use. These parameters are specific to the model. Examples: {"temperature": 0.5}, {"max_tokens": 100}

{}
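A structured-output sketch, assuming Agent is importable from redpanda.agents; the Pydantic class is illustrative. The response_type argument becomes the agent's response_format, so run() validates the final message content against it:

from pydantic import BaseModel

from redpanda.agents import Agent


class Sentiment(BaseModel):  # illustrative response schema
    label: str
    confidence: float


agent = Agent(
    name="sentiment",
    model="openai/gpt-4o",
    instructions="Classify the sentiment of the user's message.",
    response_type=Sentiment,  # run() returns a validated Sentiment instance
    temperature=0.0,          # forwarded to the model via **kwargs
)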
Source code in src/redpanda/agents/_agent.py
def __init__(
    self,
    name: str,
    model: str,
    instructions: str | None = None,
    response_type: type[BaseModel] | None = None,
    tools: list[Tool] | None = None,
    mcp: list[MCPEndpoint] | None = None,
    hooks: AgentHooks | None = None,
    **kwargs: Any,
):
    """
    Args:
        name: The name of the agent.
        model: The LLM model that the agent will use. Models follow the format of
            "<provider>/<model>" and are loaded using [litellm](https://docs.litellm.ai/docs/providers).
            Examples:
                "openai/gpt-4o"
                "gemini/gemini-pro"
                "bedrock/bedrock-claude-v1"
        instructions: The system prompt for the agent.
        response_type: The Pydantic model that the response will be validated against.
            If None, the response will be a string.
        tools: The tools exposed to the agent.
        mcp: The MCP endpoints that the agent can invoke.
        hooks: Callbacks to invoke during various points of the agent runtime.
        **kwargs: A dictionary of parameters that the model will use.
            These parameters are specific to the model.
            Examples:
                {"temperature": 0.5}
                {"max_tokens": 100}
                {"temperature": 0.5, "max_tokens": 100}
    """
    self.name = name
    self.model = model
    self.instructions = instructions
    self.response_format = response_type
    self.parameters = kwargs
    self.tools = tools or []
    self.mcp = mcp or []
    self.hooks = hooks or AgentHooks()

run async

run(input: str) -> Any

Generate a response from the model given an input text.

Parameters:

Name Type Description Default
input str

The input text that the model will use to generate a response.

required

Returns: The generated response from the model.
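run is a coroutine, so it must be awaited; a minimal sketch using asyncio (import path and model name assumed):

import asyncio

from redpanda.agents import Agent


async def main() -> None:
    agent = Agent(name="echo", model="openai/gpt-4o")
    # Without a response_type, run() returns the model's text content.
    reply = await agent.run("Say hello in one short sentence.")
    print(reply)


asyncio.run(main())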

Source code in src/redpanda/agents/_agent.py
async def run(self, input: str) -> Any:
    """
    Generate a response from the model given an input text.

    Args:
        input: The input text that the model will use to generate a response.
    Returns:
        The generated response from the model.
    """
    await self.hooks.on_start(self)
    async with AsyncExitStack() as stack:
        mcp_clients: list[MCPClient] = []
        for server in self.mcp:
            mcp_clients.append(await stack.enter_async_context(mcp_client(server)))

        tools = {tool.name: tool for tool in self.tools}

        for client in mcp_clients:
            await client.initialize()
            for tool in await client.list_tools():
                if tool.name not in tools:
                    tools[tool.name] = tool
                else:
                    # TODO: Warn on conflicting tools?
                    pass

        tool_defs = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                },
            }
            for tool in tools.values()
        ]
        messages: list[dict[str, str] | Message] = [{"role": "user", "content": input}]
        if self.instructions:
            messages = [{"role": "system", "content": self.instructions}] + messages
        while True:
            model_resp = await self._invoke_llm(messages, tool_defs)
            choice_resp = model_resp.choices[-1]
            if isinstance(choice_resp, StreamingChoices):
                raise Exception("unexpected streaming response type")
            if choice_resp.message.tool_calls:
                messages.append(choice_resp.message)
                messages.extend(await self._call_tools(choice_resp.message.tool_calls, tools))
                continue
            output = choice_resp.message.content
            if output is None:
                raise Exception("unexpected response type of None")
            if self.response_format is not None:
                output = self.response_format.model_validate_json(output)
            await self.hooks.on_end(self, output)
            return output

AgentHooks

A class that receives callbacks on various lifecycle events for a specific agent.
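The base implementations are no-ops, so a subclass only overrides the events it needs; a logging sketch (import path, logger name, and agent wiring are illustrative):

import logging
from typing import Any

from redpanda.agents import Agent, AgentHooks, Tool

log = logging.getLogger("agent.hooks")


class LoggingHooks(AgentHooks):
    async def on_tool_start(self, agent: "Agent", tool: Tool, args: str) -> None:
        log.info("%s calling tool %s with args %s", agent.name, tool.name, args)

    async def on_end(self, agent: "Agent", output: Any) -> None:
        log.info("%s finished with output %r", agent.name, output)


agent = Agent(name="traced", model="openai/gpt-4o", hooks=LoggingHooks())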

Source code in src/redpanda/agents/_agent.py
class AgentHooks:
    """
    A class that receives callbacks on various lifecycle events for a specific agent.
    """

    async def on_start(
        self,
        agent: "Agent",
    ) -> None:
        """Called before the agent is invoked."""
        _ = agent

    async def on_end(
        self,
        agent: "Agent",
        output: Any,
    ) -> None:
        """Called when the agent produces a final output."""
        _ = agent, output

    async def on_tool_start(
        self,
        agent: "Agent",
        tool: Tool,
        args: str,
    ) -> None:
        """Called before a tool is invoked."""
        _ = agent, tool, args

    async def on_tool_end(
        self,
        agent: "Agent",
        tool: Tool,
        result: str,
    ) -> None:
        """Called after a tool is invoked."""
        _ = agent, tool, result

on_start async

on_start(agent: Agent) -> None

Called before the agent is invoked.

Source code in src/redpanda/agents/_agent.py
async def on_start(
    self,
    agent: "Agent",
) -> None:
    """Called before the agent is invoked."""
    _ = agent

on_end async

on_end(agent: Agent, output: Any) -> None

Called when the agent produces a final output.

Source code in src/redpanda/agents/_agent.py
async def on_end(
    self,
    agent: "Agent",
    output: Any,
) -> None:
    """Called when the agent produces a final output."""
    _ = agent, output

on_tool_start async

on_tool_start(agent: Agent, tool: Tool, args: str) -> None

Called before a tool is invoked.

Source code in src/redpanda/agents/_agent.py
async def on_tool_start(
    self,
    agent: "Agent",
    tool: Tool,
    args: str,
) -> None:
    """Called before a tool is invoked."""
    _ = agent, tool, args

on_tool_end async

on_tool_end(agent: Agent, tool: Tool, result: str) -> None

Called after a tool is invoked.

Source code in src/redpanda/agents/_agent.py
async def on_tool_end(
    self,
    agent: "Agent",
    tool: Tool,
    result: str,
) -> None:
    """Called after a tool is invoked."""
    _ = agent, tool, result

MCPEndpoint

A base class for all endpoints (ways to connect) to MCP servers.

Source code in src/redpanda/agents/_mcp.py
class MCPEndpoint:
    """
    A base class for all endpoints (ways to connect) to MCP servers.
    """

    # TODO(rockwood): support list change notifications
    _cache_enabled: bool
    _cached_tool_list: list[MCPToolDef] | None = None

    def __init__(self, cache_enabled: bool) -> None:
        self._cache_enabled = cache_enabled

SSEMCPEndpoint

Bases: MCPEndpoint

An MCP endpoint that communicates with an MCP server over Server-Sent Events.

Source code in src/redpanda/agents/_mcp.py
class SSEMCPEndpoint(MCPEndpoint):
    """
    An MCP endpoint that communicates with an MCP server over Server-Sent Events.
    """

    url: str

    def __init__(self, url: str, cache_enabled: bool = True):
        """
        Create a new SSEMCPEndpoint instance.

        Args:
            url: The URL of the SSE server.
            cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
        """
        super().__init__(cache_enabled)
        self.url = url

    @property
    def headers(self) -> dict[str, Any]:
        return {}

__init__

__init__(url: str, cache_enabled: bool = True)

Create a new SSEMCPEndpoint instance.

Parameters:

Name Type Description Default
url str

The URL of the SSE server.

required
cache_enabled bool

Whether to cache the list of {tools,resources,prompts} from the server.

True
Source code in src/redpanda/agents/_mcp.py
def __init__(self, url: str, cache_enabled: bool = True):
    """
    Create a new SSEMCPEndpoint instance.

    Args:
        url: The URL of the SSE server.
        cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
    """
    super().__init__(cache_enabled)
    self.url = url
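A sketch of handing an SSE endpoint to an agent (import path and URL are illustrative); tools advertised by the server are merged with the agent's own tools when run() executes:

from redpanda.agents import Agent, SSEMCPEndpoint

endpoint = SSEMCPEndpoint("http://localhost:8000/sse")

agent = Agent(
    name="sse-agent",
    model="openai/gpt-4o",
    mcp=[endpoint],  # tools from this server become callable by the model
)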

StdioMCPEndpoint

Bases: MCPEndpoint

An MCP endpoint that invokes a local process and communicates over stdin/stdout.

Source code in src/redpanda/agents/_mcp.py
class StdioMCPEndpoint(MCPEndpoint):
    """
    An MCP endpoint that invokes a local process and communicates over stdin/stdout.
    """

    params: StdioServerParameters

    def __init__(self, params: StdioServerParameters, cache_enabled: bool = True):
        """
        Create a new StdioMCPEndpoint instance.

        Args:
            params: The parameters for the server.
            cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
        """
        super().__init__(cache_enabled)
        self.params = params

__init__

__init__(
    params: StdioServerParameters,
    cache_enabled: bool = True,
)

Create a new StdioMCPEndpoint instance.

Parameters:

Name Type Description Default
params StdioServerParameters

The parameters for the server.

required
cache_enabled bool

Whether to cache the list of {tools,resources,prompts} from the server.

True
Source code in src/redpanda/agents/_mcp.py
def __init__(self, params: StdioServerParameters, cache_enabled: bool = True):
    """
    Create a new StdioMCPEndpoint instance.

    Args:
        params: The parameters for the server.
        cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
    """
    super().__init__(cache_enabled)
    self.params = params
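A sketch of launching a local MCP server over stdio; StdioServerParameters comes from the MCP Python SDK, and the command and import path shown are illustrative:

from mcp import StdioServerParameters

from redpanda.agents import Agent, StdioMCPEndpoint

params = StdioServerParameters(
    command="python",
    args=["-m", "my_mcp_server"],  # hypothetical server module
)

agent = Agent(
    name="stdio-agent",
    model="openai/gpt-4o",
    mcp=[StdioMCPEndpoint(params)],
)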

StreamableHTTPMCPEndpoint

Bases: MCPEndpoint

An MCP endpoint that communicates with an MCP server over HTTP streaming.

Source code in src/redpanda/agents/_mcp.py
class StreamableHTTPMCPEndpoint(MCPEndpoint):
    """
    An MCP endpoint that communicates with an MCP server over HTTP streaming.
    """

    url: str

    def __init__(self, url: str, cache_enabled: bool = True):
        """
        Create a new StreamableHTTPMCPEndpoint instance.

        Args:
            url: The URL of the HTTP server.
            cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
        """
        super().__init__(cache_enabled)
        self.url = url

    @property
    def headers(self) -> dict[str, Any]:
        return {}

__init__

__init__(url: str, cache_enabled: bool = True)

Create a new StreamableHTTPMCPEndpoint instance.

Parameters:

Name Type Description Default
url str

The URL of the HTTP server.

required
cache_enabled bool

Whether to cache the list of {tools,resources,prompts} from the server.

True
Source code in src/redpanda/agents/_mcp.py
def __init__(self, url: str, cache_enabled: bool = True):
    """
    Create a new StreamableHTTPMCPEndpoint instance.

    Args:
        url: The URL of the HTTP server.
        cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
    """
    super().__init__(cache_enabled)
    self.url = url

WebsocketMCPEndpoint

Bases: MCPEndpoint

An MCP endpoint that communicates with an MCP server over a WebSocket.

Source code in src/redpanda/agents/_mcp.py
class WebsocketMCPEndpoint(MCPEndpoint):
    """
    An MCP endpoint that communicates with an MCP server over a WebSocket.
    """

    url: str

    def __init__(self, url: str, cache_enabled: bool = True):
        """
        Create a new WebsocketMCPEndpoint instance.

        Args:
            url: The URL of the WebSocket server.
            cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
        """
        super().__init__(cache_enabled)
        self.url = url

__init__

__init__(url: str, cache_enabled: bool = True)

Create a new WebsocketMCPEndpoint instance.

Parameters:

Name Type Description Default
url str

The URL of the WebSocket server.

required
cache_enabled bool

Whether to cache the list of {tools,resources,prompts} from the server.

True
Source code in src/redpanda/agents/_mcp.py
def __init__(self, url: str, cache_enabled: bool = True):
    """
    Create a new WebsocketMCPEndpoint instance.

    Args:
        url: The URL of the WebSocket server.
        cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server.
    """
    super().__init__(cache_enabled)
    self.url = url
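The streamable-HTTP and WebSocket endpoints follow the same shape as the SSE endpoint; a combined sketch with illustrative URLs, disabling the tool-list cache on one of them:

from redpanda.agents import Agent, StreamableHTTPMCPEndpoint, WebsocketMCPEndpoint

agent = Agent(
    name="multi-endpoint-agent",
    model="openai/gpt-4o",
    mcp=[
        StreamableHTTPMCPEndpoint("http://localhost:8080/mcp"),
        WebsocketMCPEndpoint("ws://localhost:9090/mcp", cache_enabled=False),
    ],
)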

Tool

A tool is a function that can be called by an LLM with a set of parameters.

Source code in src/redpanda/agents/_tools.py
class Tool:
    """
    A tool is a function that can be called by an LLM with a set of parameters.
    """

    name: str
    """
    The name of the tool.
    """
    description: str | None
    """
    An optional description of the tool.
    """
    parameters: dict[str, Any]
    """
    A dictionary of parameters (json_schema) that the tool requires.
    """

    def __init__(self, name: str, description: str | None, parameters: dict[str, Any]):
        """
        Initialize a new tool.
        """
        self.name = name
        self.description = description
        self.parameters = parameters

    async def __call__(self, args: dict[str, Any]) -> Any:
        """
        Call the tool with the given arguments (should match the provided schema).

        The return result can be:
        - Pydantic model, which will be serialized to JSON and passed back to the model as text.
        - string, which will be passed back to the model as text.
        - ToolResponse, which allows for more structured content to be passed back to the model.
        - Anything else will be serialized using `json.dumps` and passed back to the model as text.
        """
        raise NotImplementedError()
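A concrete tool subclasses Tool and implements __call__; a sketch of a hypothetical addition tool, where parameters is a JSON Schema object as expected by function calling (import path assumed):

from typing import Any

from redpanda.agents import Tool


class AddTool(Tool):  # hypothetical example tool
    def __init__(self) -> None:
        super().__init__(
            name="add",
            description="Add two numbers and return the sum.",
            parameters={
                "type": "object",
                "properties": {
                    "a": {"type": "number"},
                    "b": {"type": "number"},
                },
                "required": ["a", "b"],
            },
        )

    async def __call__(self, args: dict[str, Any]) -> Any:
        # A plain dict return value is passed through json.dumps and
        # handed back to the model as the tool message content.
        return {"sum": args["a"] + args["b"]}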

name instance-attribute

name: str = name

The name of the tool.

description instance-attribute

description: str | None = description

An optional description of the tool.

parameters instance-attribute

parameters: dict[str, Any] = parameters

A dictionary of parameters (json_schema) that the tool requires.

__init__

__init__(
    name: str,
    description: str | None,
    parameters: dict[str, Any],
)

Initialize a new tool.

Source code in src/redpanda/agents/_tools.py
def __init__(self, name: str, description: str | None, parameters: dict[str, Any]):
    """
    Initialize a new tool.
    """
    self.name = name
    self.description = description
    self.parameters = parameters

__call__ async

__call__(args: dict[str, Any]) -> Any

Call the tool with the given arguments (should match the provided schema).

The return result can be:

- Pydantic model, which will be serialized to JSON and passed back to the model as text.
- string, which will be passed back to the model as text.
- ToolResponse, which allows for more structured content to be passed back to the model.
- Anything else, which will be serialized using json.dumps and passed back to the model as text.
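A tool can also return a Pydantic model for structured results; a sketch of a hypothetical tool doing so (names and import path are illustrative):

from typing import Any

from pydantic import BaseModel

from redpanda.agents import Tool


class EchoResult(BaseModel):  # illustrative result schema
    text: str


class EchoTool(Tool):  # hypothetical tool returning a Pydantic model
    def __init__(self) -> None:
        super().__init__(
            name="echo",
            description="Echo the given text back.",
            parameters={
                "type": "object",
                "properties": {"text": {"type": "string"}},
                "required": ["text"],
            },
        )

    async def __call__(self, args: dict[str, Any]) -> Any:
        # A BaseModel return value is serialized with model_dump_json
        # and sent back to the model as text.
        return EchoResult(text=args["text"])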

Source code in src/redpanda/agents/_tools.py
async def __call__(self, args: dict[str, Any]) -> Any:
    """
    Call the tool with the given arguments (should match the provided schema).

    The return result can be:
    - Pydantic model, which will be serialized to JSON and passed back to the model as text.
    - string, which will be passed back to the model as text.
    - ToolResponse, which allows for more structured content to be passed back to the model.
    - Anything else will be serialized using `json.dumps` and passed back to the model as text.
    """
    raise NotImplementedError()