Creating Tools
Four ways to create tools for the BEAM agent: three ways to write your own, ordered from simplest to most flexible, plus ready-made built-in tools.
Choose Your Approach
| Method | Best For | Complexity |
|---|---|---|
| Factory functions | Wrapping existing scripts/executables | Easiest |
| Decorators | Simple Python functions | Easy |
| Subclassing | Complex tools with setup/cleanup | Medium |
| Built-in tools | Pre-built tools shipped with agent | Just import |
Method 1: Factory Functions
Best for: Wrapping existing Bash, Python, PowerShell scripts, or executables.
No code changes to your existing scripts. Just tell BEAM how to call them.
Wrap a Bash Script
from beam_agent import bash_script
# Your script receives: --input <file> --output <file>
tool = bash_script(
    name="file_hasher",
    script_path="/opt/tools/hash_files.sh",
)
# `agent` is a BeamAgent instance (see "Complete Agent Example").
agent.register_tool("file_hasher", tool)
Wrap a Python Script
from beam_agent import python_script
# Wrap an existing Python script; no changes to the script itself are needed.
tool = python_script(
    name="analyzer",
    script_path="/opt/tools/analyze.py",
    interpreter="/opt/tools/venv/bin/python",  # Custom virtualenv
)
Wrap a PowerShell Script
from beam_agent import powershell_script
# Wrap an existing PowerShell script; no changes to the script itself are needed.
tool = powershell_script(
    name="audit_tool",
    script_path="C:/Scripts/Audit-System.ps1",
)
Wrap an Executable
from beam_agent import executable
# Wrap a compiled executable that takes its file paths as positional arguments.
tool = executable(
    name="scanner",
    exe_path="/opt/bin/scanner",
    args_template=["{input}", "{output}"],  # Positional args: input path, then output path
)
Custom Argument Formats
Scripts don't need to use --input/--output. Configure any format:
from beam_agent import BeamScriptTool, ScriptConfig
# Windows-style flags: each placeholder is embedded in a /name:value argument.
tool = BeamScriptTool(
    name="legacy_tool",
    config=ScriptConfig(
        command=["C:/Tools/scanner.exe"],
        args_template=["/in:{input}", "/out:{output}", "/format:json"],
    ),
)
# Environment variables instead of arguments: the input and output paths are
# passed through INPUT_FILE / OUTPUT_FILE rather than on the command line.
tool = BeamScriptTool(
    name="env_tool",
    config=ScriptConfig(
        command=["/opt/tools/processor"],
        args_template=[],  # No CLI args
        env_mapping={
            "INPUT_FILE": "{input}",
            "OUTPUT_FILE": "{output}",
        },
    ),
)
# Workflow parameters in arguments: {param.<name>} placeholders are filled
# from the parameters carried by the workflow message.
tool = BeamScriptTool(
    name="configurable_tool",
    config=ScriptConfig(
        command=["python3", "/opt/scanner.py"],
        args_template=[
            "--input", "{input}",
            "--output", "{output}",
            "--target", "{param.target_host}",  # From workflow message
            "--port", "{param.port}",
        ],
    ),
)
Available Placeholders
| Placeholder | Description |
|---|---|
| {input} | First input file path (from input_paths) |
| {output} | Output file path |
| {param.name} | Parameter from workflow message |
| {env.NAME} | Environment variable |
| {exec_id} | Execution ID |
Method 2: Decorators
Best for: Simple Python functions that don't need setup/cleanup.
from beam_agent import beam_tool
from pathlib import Path
from typing import Dict
@beam_tool(name="port_scanner", timeout=600)
async def scan_ports(
    input_paths: list,
    output_path: Path,
    parameters: Dict,
) -> Dict:
    """Scan TCP ports on a target host.

    Args:
        input_paths: Input files from previous workflow nodes (unused here).
        output_path: Where results are written; the returned dict is
            written there as JSON automatically.
        parameters: Configuration from the workflow message. ``target`` is
            required; ``ports`` is optional and defaults to
            ``[80, 443, 22]``.

    Returns:
        A dict with the scanned ``target`` and the list of ``open_ports``.

    Raises:
        ValueError: If ``target`` is missing or empty.
    """
    target = parameters.get('target')
    if not target:
        # Fail fast instead of probing a ``None`` host.
        raise ValueError("Parameter 'target' is required")
    ports = parameters.get('ports', [80, 443, 22])

    # check_port is an async helper you provide; ports are probed one at a
    # time, in the order given.
    open_ports = [port for port in ports if await check_port(target, port)]

    return {
        "target": target,
        "open_ports": open_ports,
    }
Tools are auto-registered when imported:
# In your agent startup
import my_tools # Tools are now registered
Tool Interface
Every decorated tool receives:
| Parameter | Type | Description |
|---|---|---|
| input_paths | list | Input files from previous workflow nodes (can be empty) |
| output_path | Path | Where to write results |
| parameters | Dict | Configuration from workflow message |
Return a dict and it's automatically written as JSON to output_path.
Method 3: Subclassing
Best for: Tools that need initialization, cleanup, or maintain state.
BeamTool (Full Control)
from beam_agent import BeamTool
from beam_agent.core.base import ExecutionContext, ExecutionResult
import json
class DNSResolver(BeamTool):
    """Resolve each domain in the ``domains`` parameter and write the answers as JSON."""

    def __init__(self):
        super().__init__(name="dns_resolver", timeout=60)
        # Created in setup(). Starting as None keeps cleanup() safe even if
        # setup() never completed.
        self.resolver = None

    async def setup(self, context: ExecutionContext):
        """Initialize resources before execution."""
        self.resolver = await create_resolver()

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        """Query every requested domain and write the results to ``context.output_path``.

        Reads the ``domains`` list from ``context.parameters`` (default: empty
        list) and returns a successful ``ExecutionResult`` pointing at the
        written file.
        """
        domains = context.parameters.get('domains', [])

        results = {}
        for domain in domains:
            results[domain] = await self.resolver.query(domain)

        with open(context.output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f)

        return ExecutionResult(
            status="success",
            exit_code=0,
            output_path=context.output_path,
        )

    async def cleanup(self, context: ExecutionContext):
        """Cleanup resources after execution."""
        # Guard against a resolver that was never created.
        if self.resolver is not None:
            await self.resolver.close()
            self.resolver = None
Register manually:
# Register an instance of the subclass under the name "dns_resolver".
agent.register_tool("dns_resolver", DNSResolver())
BeamAPITool (HTTP Requests)
For tools that make HTTP API calls. Manages connection pooling automatically.
from beam_agent import BeamAPITool
from beam_agent.core.base import ExecutionContext, ExecutionResult
class VirusTotalChecker(BeamAPITool):
    """Look up a file hash through the VirusTotal v3 API."""

    def __init__(self, api_key: str):
        super().__init__(
            name="virustotal_checker",
            base_url="https://www.virustotal.com/api/v3",
            timeout=30,
        )
        self.api_key = api_key

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        """Fetch the report for ``parameters['hash']`` and write it as JSON.

        Raises:
            ValueError: If the ``hash`` parameter is missing or empty.
        """
        # Local import: this example does not import json at module level.
        import json

        file_hash = context.parameters.get('hash')
        if not file_hash:
            # Without this check the request would go to ".../files/None".
            raise ValueError("Parameter 'hash' is required")

        # self.session is automatically available (aiohttp)
        async with self.session.get(
            f"{self.base_url}/files/{file_hash}",
            headers={"x-apikey": self.api_key},
        ) as resp:
            data = await resp.json()

        # ... process `data` as needed before writing it out
        with open(context.output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f)

        return ExecutionResult(
            status="success",
            exit_code=0,
            output_path=context.output_path,
        )
Method 4: Built-in Tools
Pre-built tools shipped with BEAM. Import and register.
IntegrationProxyTool
Proxies HTTP requests to internal services unreachable from the cloud.
from beam_agent.tools import IntegrationProxyTool
# Register the built-in proxy tool under the name "integration_proxy".
agent.register_tool("integration_proxy", IntegrationProxyTool())
Use case: Call internal APIs, on-premise services, or VPN-only endpoints from NINA workflows. Credentials can be encrypted end-to-end when encryption_key is configured.
When body_only=True (default), only the API response body is returned as output — matching the behaviour of NINA's integrations service. Set body_only=False to include status code, headers, and other metadata.
Complete Agent Example
Putting it all together:
#!/usr/bin/env python3
import asyncio
import os
from beam_agent import BeamAgent, BeamConfig, beam_tool, bash_script
from beam_agent.tools import IntegrationProxyTool
from pathlib import Path
# Method 2: Decorator
@beam_tool(name="ip_lookup", timeout=60)
async def lookup_ip(input_paths, output_path, parameters):
    """Look up information about the address given in ``parameters['ip']``.

    The returned dict is written to ``output_path`` as JSON automatically.
    """
    ip = parameters.get('ip')
    # ... lookup logic
    return {"ip": ip, "info": "..."}
# Method 3: Subclass (imported from another file)
from my_tools.dns_resolver import DNSResolver
async def main():
    """Load the configuration, register one tool of each kind, and start the agent."""
    config = BeamConfig.from_yaml(Path("config.yaml"))
    agent = BeamAgent(config)

    # Method 4: Built-in tool
    agent.register_tool("integration_proxy", IntegrationProxyTool())

    # Method 3: Subclassed tool
    agent.register_tool("dns_resolver", DNSResolver())

    # Method 1: Factory function (only registered when the script is present)
    scanner_path = "/opt/tools/scanner.sh"
    if Path(scanner_path).exists():
        agent.register_tool("scanner", bash_script(
            name="scanner",
            script_path=scanner_path,
        ))

    # Method 2: Decorator tools are auto-registered on import
    await agent.start()


if __name__ == "__main__":
    asyncio.run(main())
Input and Output
Reading Input
# From previous workflow nodes (files — any format, not just JSON)
if input_paths:
    content = input_paths[0].read_text()

# Or iterate over all input files
for path in input_paths:
    process(path.read_text())

# From workflow message (parameters)
target = parameters.get('target')
Writing Output
# Option 1: Return dict (auto-written as JSON)
return {"status": "success", "results": [...]}
# Option 2: Write directly
with open(output_path, 'w') as f:
json.dump(results, f)
return None
Error Handling
Return Errors (Workflow Continues)
@beam_tool()
async def safe_tool(input_paths, output_path, parameters):
    """Run a risky operation and report failure in the result instead of raising.

    Any exception is caught on purpose and turned into an error dict, so the
    workflow continues.
    """
    try:
        result = await risky_operation()
    except Exception as e:
        # Deliberately broad: every failure becomes an error result.
        return {"status": "error", "message": str(e)}
    return {"status": "success", "data": result}
Raise Exceptions (Workflow Stops)
@beam_tool()
async def strict_tool(input_paths, output_path, parameters):
    """Reject a workflow message that lacks the required ``target`` parameter.

    Raises:
        ValueError: If ``target`` is not present in ``parameters``; raising
            stops the workflow.
    """
    if 'target' not in parameters:
        raise ValueError("Parameter 'target' is required")
Next Steps
- Tool Interface Reference - Detailed interface docs
- Example Tools - Working examples
- Secrets Management - Handle API keys securely