
Creating Tools

Four ways to create tools for the BEAM agent, from simplest to most flexible.

Choose Your Approach

| Method | Best For | Complexity |
|---|---|---|
| Factory functions | Wrapping existing scripts/executables | Easiest |
| Decorators | Simple Python functions | Easy |
| Subclassing | Complex tools with setup/cleanup | Medium |
| Built-in tools | Pre-built tools shipped with agent | Just import |

Method 1: Factory Functions

Best for: Wrapping existing Bash, Python, PowerShell scripts, or executables.

No code changes to your existing scripts. Just tell BEAM how to call them.

Wrap a Bash Script

from beam_agent import bash_script

# Your script receives: --input <file> --output <file>
tool = bash_script(
    name="file_hasher",
    script_path="/opt/tools/hash_files.sh",
)

agent.register_tool("file_hasher", tool)

Wrap a Python Script

from beam_agent import python_script

tool = python_script(
    name="analyzer",
    script_path="/opt/tools/analyze.py",
    interpreter="/opt/tools/venv/bin/python",  # Custom virtualenv
)
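
A wrapped script only has to accept the --input and --output flags the wrapper passes. A minimal sketch of what a compatible analyze.py could look like (the line-counting "analysis" is a placeholder for illustration, not part of BEAM):

```python
#!/usr/bin/env python3
"""Hypothetical analyze.py, compatible with the python_script wrapper."""
import argparse
import json
from pathlib import Path


def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Input file from BEAM")
    parser.add_argument("--output", required=True, help="Where to write results")
    args = parser.parse_args(argv)

    text = Path(args.input).read_text()
    # Placeholder analysis: count lines and characters
    result = {"lines": text.count("\n"), "chars": len(text)}
    Path(args.output).write_text(json.dumps(result))
```

Invoke `main()` with an explicit argv list for testing, or add an `if __name__ == "__main__"` guard to run it as a script.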

Wrap a PowerShell Script

from beam_agent import powershell_script

tool = powershell_script(
    name="audit_tool",
    script_path="C:/Scripts/Audit-System.ps1",
)

Wrap an Executable

from beam_agent import executable

tool = executable(
    name="scanner",
    exe_path="/opt/bin/scanner",
    args_template=["{input}", "{output}"],  # Positional args
)

Custom Argument Formats

Scripts don't need to use --input/--output. Configure any format:

from beam_agent import BeamScriptTool, ScriptConfig

# Windows-style flags
tool = BeamScriptTool(
    name="legacy_tool",
    config=ScriptConfig(
        command=["C:/Tools/scanner.exe"],
        args_template=["/in:{input}", "/out:{output}", "/format:json"],
    ),
)

# Environment variables instead of arguments
tool = BeamScriptTool(
    name="env_tool",
    config=ScriptConfig(
        command=["/opt/tools/processor"],
        args_template=[],  # No CLI args
        env_mapping={
            "INPUT_FILE": "{input}",
            "OUTPUT_FILE": "{output}",
        },
    ),
)

# Workflow parameters in arguments
tool = BeamScriptTool(
    name="configurable_tool",
    config=ScriptConfig(
        command=["python3", "/opt/scanner.py"],
        args_template=[
            "--input", "{input}",
            "--output", "{output}",
            "--target", "{param.target_host}",  # From workflow message
            "--port", "{param.port}",
        ],
    ),
)

Available Placeholders

| Placeholder | Description |
|---|---|
| {input} | First input file path (from input_paths) |
| {output} | Output file path |
| {param.name} | Parameter from workflow message |
| {env.NAME} | Environment variable |
| {exec_id} | Execution ID |
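
The substitution itself is plain string templating. A simplified sketch of how these placeholders could be resolved (illustrative only; ScriptConfig's actual logic may differ):

```python
import os


def resolve(template: str, input_paths, output_path, params: dict, exec_id: str) -> str:
    """Illustrative placeholder resolver, not BEAM's implementation."""
    values = {
        "input": str(input_paths[0]) if input_paths else "",
        "output": str(output_path),
        "exec_id": exec_id,
    }
    # {param.name} and {env.NAME} are looked up dynamically
    for name, val in params.items():
        values[f"param.{name}"] = str(val)
    for name, val in os.environ.items():
        values[f"env.{name}"] = val

    for key, val in values.items():
        template = template.replace("{%s}" % key, val)
    return template
```

For example, `resolve("--target={param.target_host}", [], "/tmp/out", {"target_host": "10.0.0.5"}, "e1")` would yield `--target=10.0.0.5`.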

Method 2: Decorators

Best for: Simple Python functions that don't need setup/cleanup.

from beam_agent import beam_tool
from pathlib import Path
from typing import Dict

@beam_tool(name="port_scanner", timeout=600)
async def scan_ports(
    input_paths: list,
    output_path: Path,
    parameters: Dict,
) -> Dict:
    """Scan TCP ports on a target host."""
    target = parameters.get('target')
    ports = parameters.get('ports', [80, 443, 22])

    open_ports = []
    for port in ports:
        if await check_port(target, port):
            open_ports.append(port)

    return {
        "target": target,
        "open_ports": open_ports,
    }

Tools are auto-registered when imported:

# In your agent startup
import my_tools # Tools are now registered

Tool Interface

Every decorated tool receives:

| Parameter | Type | Description |
|---|---|---|
| input_paths | list | Input files from previous workflow nodes (can be empty) |
| output_path | Path | Where to write results |
| parameters | Dict | Configuration from workflow message |

Return a dict and it's automatically written as JSON to output_path.
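
The auto-write step is equivalent to serializing the returned dict yourself; a sketch of that assumed behaviour:

```python
import json
from pathlib import Path


def write_result(result, output_path):
    """Sketch of the auto-write step: a returned dict becomes JSON on disk.

    A tool that returns None is assumed to have written its output itself.
    """
    if result is not None:
        Path(output_path).write_text(json.dumps(result))
```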


Method 3: Subclassing

Best for: Tools that need initialization or cleanup, or that maintain state.

BeamTool (Full Control)

from beam_agent import BeamTool
from beam_agent.core.base import ExecutionContext, ExecutionResult
import json

class DNSResolver(BeamTool):
    def __init__(self):
        super().__init__(name="dns_resolver", timeout=60)

    async def setup(self, context: ExecutionContext):
        """Initialize resources before execution."""
        self.resolver = await create_resolver()

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        """Main execution logic."""
        domains = context.parameters.get('domains', [])

        results = {}
        for domain in domains:
            results[domain] = await self.resolver.query(domain)

        with open(context.output_path, 'w') as f:
            json.dump(results, f)

        return ExecutionResult(
            status="success",
            exit_code=0,
            output_path=context.output_path,
        )

    async def cleanup(self, context: ExecutionContext):
        """Cleanup resources after execution."""
        await self.resolver.close()

Register manually:

agent.register_tool("dns_resolver", DNSResolver())

BeamAPITool (HTTP Requests)

For tools that make HTTP API calls; connection pooling is managed automatically.

from beam_agent import BeamAPITool
from beam_agent.core.base import ExecutionContext, ExecutionResult

class VirusTotalChecker(BeamAPITool):
    def __init__(self, api_key: str):
        super().__init__(
            name="virustotal_checker",
            base_url="https://www.virustotal.com/api/v3",
            timeout=30,
        )
        self.api_key = api_key

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        file_hash = context.parameters.get('hash')

        # self.session is automatically available (aiohttp)
        async with self.session.get(
            f"{self.base_url}/files/{file_hash}",
            headers={"x-apikey": self.api_key},
        ) as resp:
            data = await resp.json()

        # ... process and return result

Method 4: Built-in Tools

Pre-built tools shipped with BEAM. Import and register.

IntegrationProxyTool

Proxies HTTP requests to internal services unreachable from the cloud.

from beam_agent.tools import IntegrationProxyTool

agent.register_tool("integration_proxy", IntegrationProxyTool())

Use case: Call internal APIs, on-premise services, or VPN-only endpoints from NINA workflows. Credentials can be encrypted end-to-end when encryption_key is configured.

When body_only=True (default), only the API response body is returned as output — matching the behaviour of NINA's integrations service. Set body_only=False to include status code, headers, and other metadata.
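
The switch can be pictured as a filter over the full response envelope. A hypothetical illustration (the field names here are assumptions for the sketch, not the tool's actual schema):

```python
def shape_output(response: dict, body_only: bool = True):
    """Illustrative only: return just the body, or the full response envelope."""
    if body_only:
        # Default: mirror NINA's integrations service and return only the body
        return response["body"]
    # body_only=False: include status code and headers alongside the body
    return {
        "status": response["status"],
        "headers": response["headers"],
        "body": response["body"],
    }
```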


Complete Agent Example

Putting it all together:

#!/usr/bin/env python3
import asyncio
from pathlib import Path

from beam_agent import BeamAgent, BeamConfig, beam_tool, bash_script
from beam_agent.tools import IntegrationProxyTool

# Method 2: Decorator
@beam_tool(name="ip_lookup", timeout=60)
async def lookup_ip(input_paths, output_path, parameters):
    ip = parameters.get('ip')
    # ... lookup logic
    return {"ip": ip, "info": "..."}

# Method 3: Subclass (imported from another file)
from my_tools.dns_resolver import DNSResolver

async def main():
    config = BeamConfig.from_yaml(Path("config.yaml"))
    agent = BeamAgent(config)

    # Method 4: Built-in tool
    agent.register_tool("integration_proxy", IntegrationProxyTool())

    # Method 3: Subclassed tool
    agent.register_tool("dns_resolver", DNSResolver())

    # Method 1: Factory function
    if Path("/opt/tools/scanner.sh").exists():
        agent.register_tool("scanner", bash_script(
            name="scanner",
            script_path="/opt/tools/scanner.sh",
        ))

    # Method 2: Decorator tools are auto-registered on import

    await agent.start()

if __name__ == "__main__":
    asyncio.run(main())

Input and Output

Reading Input

# From previous workflow nodes (files — any format, not just JSON)
if input_paths:
    content = input_paths[0].read_text()

# Or iterate over all input files
for path in input_paths:
    process(path.read_text())

# From workflow message (parameters)
target = parameters.get('target')

Writing Output

# Option 1: Return dict (auto-written as JSON)
return {"status": "success", "results": [...]}

# Option 2: Write directly
with open(output_path, 'w') as f:
    json.dump(results, f)
return None

Error Handling

Return Errors (Workflow Continues)

@beam_tool()
async def safe_tool(input_paths, output_path, parameters):
    try:
        result = await risky_operation()
        return {"status": "success", "data": result}
    except Exception as e:
        return {"status": "error", "message": str(e)}

Raise Exceptions (Workflow Stops)

@beam_tool()
async def strict_tool(input_paths, output_path, parameters):
    if 'target' not in parameters:
        raise ValueError("Parameter 'target' is required")
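
When several parameters are mandatory, the validation can be factored into a small helper (a sketch, not a BEAM API):

```python
def require_params(parameters: dict, *names: str) -> None:
    """Raise ValueError listing every missing required parameter."""
    missing = [n for n in names if n not in parameters]
    if missing:
        raise ValueError(f"Missing required parameters: {', '.join(missing)}")
```

Calling `require_params(parameters, "target", "port")` at the top of a strict tool fails fast with a single message naming all missing keys, instead of one error per retry.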

Next Steps