Skip to main content

Example Tools

Working examples for each tool creation method.


Factory Functions (Wrapping Scripts)

Bash Script

from beam_agent import bash_script

# Wrap the shell script at /opt/tools/hash_files.sh as a tool named "file_hasher",
# then register it on the agent under the same name.
# NOTE(review): `agent` is assumed to be an already-constructed BeamAgent — it is
# not created in this snippet.
tool = bash_script("file_hasher", "/opt/tools/hash_files.sh")
agent.register_tool("file_hasher", tool)

Executable with Custom Args

from beam_agent import executable

# Wrap the binary at /opt/bin/scanner as a tool named "scanner".
# args_template lists the command-line arguments in order; "{input}" and
# "{output}" are placeholders (presumably replaced with the input and output
# paths at run time — confirm against the beam_agent documentation).
tool = executable(
name="scanner",
exe_path="/opt/bin/scanner",
args_template=["{input}", "{output}"],
)
# `agent` is assumed to be an existing BeamAgent instance.
agent.register_tool("scanner", tool)

PowerShell Script

from beam_agent import powershell_script

# Wrap the PowerShell script Audit-System.ps1 as a tool named "system_audit".
# The template passes named PowerShell parameters (-InputPath / -OutputPath)
# plus a fixed -Verbose switch; "{input}" and "{output}" are placeholders.
# NOTE(review): timeout=600 is presumably in seconds — confirm.
tool = powershell_script(
name="system_audit",
script_path="C:/Scripts/Audit-System.ps1",
args_template=["-InputPath", "{input}", "-OutputPath", "{output}", "-Verbose"],
timeout=600,
)
# `agent` is assumed to be an existing BeamAgent instance.
agent.register_tool("system_audit", tool)

Windows Executable

from beam_agent import executable

# Wrap a Windows executable as a tool named "vulnerability_scanner".
# The path is a raw string so the backslashes are not treated as escapes.
# Placeholders can be embedded inside an argument ("/config:{input}"), and
# fixed arguments ("/format:json") are passed through as written.
tool = executable(
name="vulnerability_scanner",
exe_path=r"C:\Tools\scanner.exe",
args_template=["/config:{input}", "/report:{output}", "/format:json"],
)
# `agent` is assumed to be an existing BeamAgent instance.
agent.register_tool("vulnerability_scanner", tool)

Environment Variables Instead of Args

from beam_agent import BeamScriptTool, ScriptConfig

# Build a script tool that receives its inputs through environment variables
# instead of command-line arguments: the command itself takes no arguments.
tool = BeamScriptTool(
name="secure_scanner",
config=ScriptConfig(
command=["/opt/tools/scanner"],
# Maps environment variable name -> template string.
# NOTE(review): "{param.api_key}" presumably resolves to the "api_key" entry of
# the tool's parameters — confirm against the beam_agent documentation.
env_mapping={
"INPUT_FILE": "{input}",
"OUTPUT_FILE": "{output}",
"API_KEY": "{param.api_key}",
},
),
)

Decorator Tools

Port Scanner

import asyncio
from beam_agent import beam_tool
from pathlib import Path
from typing import Dict

@beam_tool(name="port_scanner", timeout=600)
async def scan_ports(input_paths: list, output_path: Path, parameters: Dict) -> Dict:
    """Probe TCP ports on one host and report which ones accept a connection.

    Args:
        input_paths: Optional input files. When ``parameters`` has no
            ``target``, the stripped text of the first file is used as the host.
        output_path: Not used by this function.
        parameters: Reads ``target`` (host to probe), ``ports`` (defaults to
            ``[80, 443, 22, 3306]``) and ``timeout`` (per-connection timeout
            passed to ``asyncio.wait_for``, default ``2``).

    Returns:
        ``{"target": <host>, "open_ports": [<port>, ...]}`` with the open
        ports sorted ascending.

    Raises:
        ValueError: If no target is given in ``parameters`` or the input file.
    """
    target = parameters.get('target')
    ports = parameters.get('ports', [80, 443, 22, 3306])
    timeout = parameters.get('timeout', 2)

    if not target and input_paths:
        target = input_paths[0].read_text().strip()

    if not target:
        raise ValueError("Target required")

    async def check(host, port):
        """Return True if a TCP connection to host:port succeeds within the timeout."""
        try:
            _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
            writer.close()
            await writer.wait_closed()
            return True
        # Only connection-level failures mean "not open". A bare `except:`
        # would also swallow asyncio.CancelledError and KeyboardInterrupt,
        # making the scan impossible to cancel cleanly.
        # OverflowError covers port numbers the socket layer rejects as out of range.
        except (OSError, asyncio.TimeoutError, OverflowError):
            return False

    # Probe all ports concurrently; results come back in the same order as `ports`.
    results = await asyncio.gather(*[check(target, p) for p in ports])
    open_ports = [p for p, ok in zip(ports, results) if ok]

    return {"target": target, "open_ports": sorted(open_ports)}

Data Filter (Process Previous Node Output)

@beam_tool(name="filter_high_severity")
async def filter_results(input_paths, output_path, parameters):
    """Keep only the entries of a JSON result list at or above a severity threshold.

    The first input file is parsed as JSON and iterated; each entry's
    ``severity`` (treated as 0 when absent) is compared against
    ``parameters["min_severity"]`` (default 7).

    Returns:
        ``{"filtered_count": <int>, "results": [<entry>, ...]}``.

    Raises:
        ValueError: If no input file was supplied.
    """
    import json

    if not input_paths:
        raise ValueError("Input file required")

    entries = json.loads(input_paths[0].read_text())
    threshold = parameters.get('min_severity', 7)

    kept = []
    for entry in entries:
        if entry.get('severity', 0) >= threshold:
            kept.append(entry)

    return {"filtered_count": len(kept), "results": kept}

Claude Code Prompt

Run a Claude Code prompt read from the input file, falling back to the prompt parameter when no input file is provided or the file is empty:

@beam_tool(name="claude-prompt", timeout=60)
async def claude_prompt(input_paths: list, output_path: Path, parameters: Dict) -> Dict:
    """Run the ``claude`` CLI with a prompt and return its output.

    The prompt is the stripped text of the first input file when that file
    exists and is non-empty; otherwise ``parameters["prompt"]`` is used,
    defaulting to ``"Summarize the input"``.

    Args:
        input_paths: Optional input files; only the first is read.
        output_path: Not used by this function.
        parameters: Reads the optional ``prompt`` key.

    Returns:
        The parsed JSON output of the CLI; ``{"response": <text>}`` when the
        output is not valid JSON; or ``{"error": <message>}`` when the CLI is
        missing or does not finish within 45 seconds.
    """
    import json

    # Read prompt from input file or parameters
    prompt = None
    if input_paths and input_paths[0].exists():
        prompt = input_paths[0].read_text().strip()
    if not prompt:
        prompt = parameters.get("prompt", "Summarize the input")

    try:
        proc = await asyncio.create_subprocess_exec(
            "claude", "-p", prompt, "--output-format", "json",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError:
        return {"error": "claude CLI not found"}

    try:
        # 45 s stays below the 60 s tool timeout declared in the decorator.
        stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=45)
    except asyncio.TimeoutError:
        # wait_for only cancels communicate(); the child keeps running unless
        # it is killed and reaped explicitly.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited between the timeout and the kill
        await proc.wait()
        return {"error": "claude CLI timed out"}

    # Decode once, tolerantly: a strict decode raises UnicodeDecodeError, which
    # is not a JSONDecodeError and would escape the fallback below.
    text = stdout.decode("utf-8", errors="replace")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return {"response": text}

Nmap Scanner (Parse Targets from Input)

@beam_tool(name="nmap-scan", timeout=300)
async def nmap_scan(input_paths: list, output_path: Path, parameters: Dict) -> Dict:
    """Run nmap against a list of targets and return the open ports per host.

    Targets are the non-blank lines of the first input file followed by any
    entries in ``parameters["targets"]``. nmap is invoked with
    ``parameters["flags"]`` (default ``"-sV -T4 --open"``, split on
    whitespace) and ``-oX -`` so the XML report is written to stdout.

    Args:
        input_paths: Optional input files; only the first is read.
        output_path: Not used by this function.
        parameters: Reads the optional ``targets`` and ``flags`` keys.

    Returns:
        ``{"targets": [...], "hosts": [{"ip": ..., "ports": [...]}, ...],
        "total_open_ports": <int>}`` on success, or ``{"error": <message>}``
        when no targets were given, nmap is not installed, the scan exceeds
        280 seconds, or the output is not parseable XML.
    """
    import xml.etree.ElementTree as ET

    # Read targets from input file (one per line)
    targets = []
    if input_paths:
        targets = [line.strip() for line in input_paths[0].read_text().splitlines() if line.strip()]
    targets.extend(parameters.get("targets", []))

    if not targets:
        return {"error": "No targets provided"}

    flags = parameters.get("flags", "-sV -T4 --open")
    cmd = ["nmap"] + flags.split() + ["-oX", "-"] + targets

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError:
        return {"error": "nmap not found"}

    try:
        # 280 s stays below the 300 s tool timeout declared in the decorator.
        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=280)
    except asyncio.TimeoutError:
        # wait_for only cancels communicate(); kill and reap the child so the
        # scan does not keep running in the background.
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # the process exited between the timeout and the kill
        await proc.wait()
        return {"error": "nmap timed out"}

    # nmap writes nothing (or partial XML) to stdout when it fails early,
    # e.g. on invalid flags; report that instead of raising ParseError.
    try:
        root = ET.fromstring(stdout.decode("utf-8", errors="replace"))
    except ET.ParseError:
        return {
            "error": "nmap produced no parseable XML",
            "stderr": stderr.decode("utf-8", errors="replace").strip(),
        }

    # Parse XML — only open ports
    hosts = []
    for host_el in root.findall("host"):
        addr = host_el.find("address")
        ports = []
        for port_el in host_el.findall("ports/port"):
            state = port_el.find("state")
            if state is not None and state.get("state") == "open":
                svc = port_el.find("service")
                info = {"port": int(port_el.get("portid")), "protocol": port_el.get("protocol")}
                if svc is not None:
                    info["service"] = svc.get("name", "")
                    if svc.get("product"):
                        info["product"] = svc.get("product")
                ports.append(info)
        hosts.append({
            "ip": addr.get("addr") if addr is not None else "unknown",
            "ports": ports,
        })

    return {"targets": targets, "hosts": hosts, "total_open_ports": sum(len(h["ports"]) for h in hosts)}

Subclassed Tools

DNS Resolver (BeamTool)

from beam_agent import BeamTool
from beam_agent.core.base import ExecutionContext, ExecutionResult
import json, aiodns

class DNSResolver(BeamTool):
    """Tool that looks up A records for a list of domains and writes them as JSON."""

    def __init__(self):
        super().__init__(name="dns_resolver", timeout=60)

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        """Resolve each domain and write ``{domain: [hosts] | {"error": msg}}`` to the output path.

        Domains come from ``context.parameters["domains"]``; when that is
        empty, the non-blank lines of the first input file are used instead.
        Returns a failure result when no domains are available.
        """
        names = context.parameters.get('domains', [])
        if not names and context.input_paths:
            lines = context.input_paths[0].read_text().splitlines()
            names = [stripped for stripped in map(str.strip, lines) if stripped]

        if not names:
            return ExecutionResult(status="failure", exit_code=1, error="No domains provided")

        lookup = aiodns.DNSResolver()
        answers = {}
        for name in names:
            # A failed lookup is recorded per domain and does not abort the run.
            try:
                found = await lookup.query(name, 'A')
                answers[name] = [record.host for record in found]
            except Exception as exc:
                answers[name] = {"error": str(exc)}

        with open(context.output_path, 'w') as out_file:
            json.dump(answers, out_file)

        return ExecutionResult(status="success", exit_code=0, output_path=context.output_path)

VirusTotal Checker (BeamAPITool)

from beam_agent import BeamAPITool
from beam_agent.core.base import ExecutionContext, ExecutionResult
import json

class VirusTotalChecker(BeamAPITool):
    """API tool that looks up a file hash on VirusTotal and writes a summary as JSON."""

    def __init__(self, api_key: str):
        """Store the API key sent in the ``x-apikey`` header of every request."""
        super().__init__(name="virustotal_checker", base_url="https://www.virustotal.com/api/v3", timeout=30)
        self.api_key = api_key

    async def execute(self, context: ExecutionContext) -> ExecutionResult:
        """Query ``/files/<hash>`` and write the result to ``context.output_path``.

        Reads the hash from ``context.parameters["hash"]``. The written JSON is
        ``{"hash": ..., "status": "not_found"}`` for HTTP 404, otherwise
        ``{"hash": ..., "malicious": ..., "harmless": ...}`` taken from the
        response's ``last_analysis_stats``.

        Returns:
            A success result pointing at the output file, or a failure result
            when the hash is missing or the API answers with a status other
            than 200 or 404.
        """
        file_hash = context.parameters.get('hash')
        if not file_hash:
            return ExecutionResult(status="failure", exit_code=1, error="Hash required")

        # NOTE(review): self.session and self.base_url are presumably provided
        # by BeamAPITool — they are not assigned in this class.
        async with self.session.get(
            f"{self.base_url}/files/{file_hash}",
            headers={"x-apikey": self.api_key}
        ) as resp:
            if resp.status == 404:
                result = {"hash": file_hash, "status": "not_found"}
            elif resp.status != 200:
                # Error responses (bad API key, rate limit, server error) do not
                # carry the analysis payload; indexing into them would raise
                # KeyError, so report the failure explicitly instead.
                return ExecutionResult(
                    status="failure",
                    exit_code=1,
                    error=f"VirusTotal API returned HTTP {resp.status}",
                )
            else:
                data = await resp.json()
                stats = data['data']['attributes']['last_analysis_stats']
                result = {"hash": file_hash, "malicious": stats['malicious'], "harmless": stats['harmless']}

        with open(context.output_path, 'w') as f:
            json.dump(result, f)

        return ExecutionResult(status="success", exit_code=0, output_path=context.output_path)

Registration:

import os
# The API key is read from the VIRUSTOTAL_API_KEY environment variable;
# os.environ[...] raises KeyError if the variable is not set.
# `agent` is assumed to be an existing BeamAgent instance.
agent.register_tool("virustotal_checker", VirusTotalChecker(api_key=os.environ['VIRUSTOTAL_API_KEY']))

Built-in: Integration Proxy

from beam_agent.tools import IntegrationProxyTool
# Register the built-in IntegrationProxyTool under the name "integration_proxy".
# It is constructed with no arguments; `agent` is assumed to be an existing
# BeamAgent instance.
agent.register_tool("integration_proxy", IntegrationProxyTool())

Encryption is auto-configured from the encryption_key in your config. By default, the tool returns only the API response body.


Complete Agent

All methods together:

#!/usr/bin/env python3
import asyncio
import os
from pathlib import Path

from beam_agent import BeamAgent, BeamConfig, beam_tool, bash_script
from beam_agent.tools import IntegrationProxyTool

@beam_tool(name="ip_lookup", timeout=60)
async def lookup_ip(input_paths, output_path, parameters):
    """Reverse-resolve an IP address to its primary hostname.

    Args:
        input_paths: Not used by this function.
        output_path: Not used by this function.
        parameters: Must contain ``ip``, the address to look up.

    Returns:
        ``{"ip": <ip>, "hostname": <name>}``; ``hostname`` is ``None`` when the
        address has no reverse record or cannot be resolved.

    Raises:
        ValueError: If ``parameters`` has no ``ip``.
    """
    import socket

    ip = parameters.get('ip')
    if not ip:
        # Without this check gethostbyaddr(None) fails with an opaque TypeError.
        raise ValueError("IP required")

    # gethostbyaddr blocks while it waits for DNS, so run it in the default
    # executor instead of stalling the event loop.
    loop = asyncio.get_running_loop()
    try:
        hostname, _aliases, _addresses = await loop.run_in_executor(None, socket.gethostbyaddr, ip)
    except (socket.herror, socket.gaierror):
        # herror: no reverse record; gaierror: the address itself is not resolvable.
        hostname = None
    return {"ip": ip, "hostname": hostname}

async def main():
    """Build the agent from config.yaml, register its tools, and start it."""
    agent = BeamAgent(BeamConfig.from_yaml(Path("config.yaml")))

    agent.register_tool("integration_proxy", IntegrationProxyTool())

    # The scanner script is optional: register it only when it is present on disk.
    scanner_script = Path("/opt/tools/scanner.sh")
    if scanner_script.exists():
        agent.register_tool("scanner", bash_script("scanner", "/opt/tools/scanner.sh"))

    await agent.start()


if __name__ == "__main__":
    asyncio.run(main())

Next Steps