Skip to content

Instantly share code, notes, and snippets.

@bbartling
Last active December 13, 2025 19:00
Show Gist options
  • Select an option

  • Save bbartling/fe0dbae5fa4f934ad992f3d999a2153e to your computer and use it in GitHub Desktop.

Select an option

Save bbartling/fe0dbae5fa4f934ad992f3d999a2153e to your computer and use it in GitHub Desktop.
BACnet Auto-Scan to CSV with bacpypes3, plus an enhanced tester.py built on the bacpypes3 console

BACnet Diagnostic Tools

Two scripts for discovering, auditing, and manually testing BACnet networks.

1. Install Requirements

Run this once to get the required libraries:

pip install bacpypes3 ifaddr

2. Auto-Scan (bacnet_autoscan.py)

Scans a range of device IDs and saves a CSV inventory for each device found. Captures names, values, units, and priority arrays.

Run the scan:

python bacnet_autoscan.py --low-instance 1 --high-instance 3456999 --output-dir autoscan_csv

Output:

  • Check the autoscan_csv/ folder.
  • You will see files like audit_3456789.csv.

3. Interactive Shell (tester.py)

A command-line shell for manually reading, writing, and overriding points. Useful for testing if a point is commandable.

Start the shell:

python tester.py

(If you need to bind to a specific IP, add --address 192.168.1.X/24)

Shell Command Cheat Sheet

Once inside the shell (>), type these commands:

Discovery

> whois                     # Find all devices
> whois 1000 2000           # Find devices in range 1000-2000
> objects 192.168.1.20 123  # List all points on device 123

Reading Data

> read 192.168.1.20 analog-input,1 present-value
> read 192.168.1.20 binary-value,5 description

Checking Priorities: See who is controlling a point before you override it.

> priority 192.168.1.20 binary-output,1

Overriding / Writing. Format: write <IP> <Object> <Property> <Value> [Priority]

> write 192.168.1.20 binary-output,1 present-value active 8    # Turn ON (Priority 8)
> write 192.168.1.20 binary-output,1 present-value inactive 8  # Turn OFF (Priority 8)
> write 192.168.1.20 binary-output,1 present-value null 8      # Release Override (Auto)

Exit

> exit
#!/usr/bin/env python
"""
BACnet Auto-Scan
=======================================================
Scans a range of devices and builds a "Wide" CSV inventory.
Each row is one BACnet Object, containing columns for common
properties useful for network documentation and troubleshooting.
Features:
- Auto-discovery (Who-Is) with robust fallbacks.
- Falls back to index-by-index reading if bulk object-list read fails.
- Captures Present-Value, Description, Reliability, and Status-Flags.
- Compacts Priority Arrays into a single readable column.
Usage:
python bacnet_autoscan.py --low-instance 100 --high-instance 3456799 --output-dir ~/audit_reports
"""
import asyncio
import csv
import logging
import os
from typing import Any, List, Optional, Tuple, Dict
import bacpypes3
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.app import Application
from bacpypes3.apdu import AbortPDU, AbortReason, ErrorRejectAbortNack
from bacpypes3.constructeddata import AnyAtomic
from bacpypes3.pdu import Address
from bacpypes3.primitivedata import ObjectIdentifier, BitString
from bacpypes3.vendor import get_vendor_info
# Global application instance: starts as None, is assigned in main(), and is
# read by the helper coroutines in this script (each asserts it is set).
app: Optional[Application] = None
# Module-level logger used for scan progress, warnings, and debug details.
log = logging.getLogger(__name__)
async def get_device_object_list(
    device_address: Address,
    device_identifier: ObjectIdentifier,
) -> List[ObjectIdentifier]:
    """
    Fetch the object-list property of a device.

    The whole array is requested in one read first. If that request is
    answered with an abort/error/reject, the array length (index 0) is read
    and every element is then fetched individually.

    Returns the object identifiers, or an empty list when the fallback
    path fails as well.
    """
    assert app is not None
    log.info(" - Reading object-list from %s...", device_identifier)

    # Fast path: one request for the complete array.
    try:
        return await app.read_property(
            device_address, device_identifier, "object-list"
        )
    except (AbortPDU, ErrorRejectAbortNack):
        # Fall through to the element-by-element strategy below.
        pass

    # Slow path: array_index=0 yields the element count, 1..N the elements.
    collected: List[ObjectIdentifier] = []
    try:
        count = await app.read_property(
            device_address, device_identifier, "object-list", array_index=0
        )
        log.info(" * Fallback triggered: Reading %s objects one-by-one...", count)
        for position in range(count):
            collected.append(
                await app.read_property(
                    device_address,
                    device_identifier,
                    "object-list",
                    array_index=position + 1,
                )
            )
            # Progress indicator: one dot for every tenth element.
            if not position % 10:
                print(".", end="", flush=True)
        print()
    except Exception as exc:
        log.warning(" ! Failed to read object-list even with fallback: %s", exc)
        return []
    return collected
async def read_prop_safe(
    dev_addr: Address,
    obj_id: ObjectIdentifier,
    prop_id: str
) -> str:
    """
    Read a single property and return it as text, never raising.

    Args:
        dev_addr: Address of the device to query.
        obj_id: Object whose property is read.
        prop_id: Property identifier, e.g. "present-value".

    Returns:
        The string form of the value, or "" when the property cannot be
        read or the read yields no value.
    """
    assert app is not None
    try:
        val = await app.read_property(dev_addr, obj_id, prop_id)
        if isinstance(val, AnyAtomic):
            val = val.get_value()
        # A missing value must become "" rather than the literal text "None",
        # otherwise it pollutes the CSV and defeats callers' emptiness checks.
        if val is None:
            return ""
        # Bit strings are rendered directly, before the generic ".attr" probe.
        if isinstance(val, BitString):
            return str(val)
        if hasattr(val, "attr"):
            return str(val.attr)
        return str(val)
    except (ErrorRejectAbortNack, AbortPDU, AttributeError, ValueError):
        # Property doesn't exist or isn't supported
        return ""
    except Exception as e:
        # Best-effort reader: anything unexpected is logged and reported as "".
        log.debug("Error reading %s on %s: %s", prop_id, obj_id, e)
        return ""
async def get_priority_array_compact(
    dev_addr: Address,
    obj_id: ObjectIdentifier
) -> str:
    """
    Read the priority array and return a compact string of ONLY active slots.

    Slots are numbered from 1. A slot whose choice is "null" (relinquished)
    is omitted, matching how the interactive tester displays priorities.
    Example: "{8: 72.0, 16: 70.0}"

    Returns:
        The string form of a {slot: value} dict, or "" when the property
        cannot be read, is empty, or has no active slot.
    """
    assert app is not None
    try:
        pa = await app.read_property(dev_addr, obj_id, "priority-array")
    except (ErrorRejectAbortNack, AbortPDU):
        # Typically "unknown property": the object has no priority-array
        # (e.g. device or input objects).
        return ""
    except Exception as e:
        # Catch-all to prevent script crash
        log.debug("Could not read priority-array: %s", e)
        return ""
    if not pa:
        return ""

    active_slots = {}
    for slot, item in enumerate(pa, start=1):
        if item is None:
            continue
        val_type = getattr(item, "_choice", None)
        # An unset choice or an explicit "null" choice means nothing is
        # commanding this slot; the null member itself is a non-None object,
        # so it has to be filtered by choice name rather than by value.
        if not val_type or val_type == "null":
            continue
        val = getattr(item, val_type, None)
        if isinstance(val, AnyAtomic):
            val = val.get_value()
        if val is not None:
            active_slots[slot] = val

    if not active_slots:
        return ""
    return str(active_slots)
async def _read_object_row(
    instance: int,
    dev_addr: Address,
    obj_id: ObjectIdentifier,
) -> List[Any]:
    """Read the audited properties of one object and return them as a CSV row."""
    obj_type, obj_inst = obj_id
    name = await read_prop_safe(dev_addr, obj_id, "object-name")
    desc = await read_prop_safe(dev_addr, obj_id, "description")
    pv = await read_prop_safe(dev_addr, obj_id, "present-value")
    units = await read_prop_safe(dev_addr, obj_id, "units")
    if not units:
        # No engineering units: fall back to the active-text label, if any.
        active_txt = await read_prop_safe(dev_addr, obj_id, "active-text")
        if active_txt:
            units = f"Active: {active_txt}"
    rel = await read_prop_safe(dev_addr, obj_id, "reliability")
    oos = await read_prop_safe(dev_addr, obj_id, "out-of-service")
    flags = await read_prop_safe(dev_addr, obj_id, "status-flags")
    pa_str = await get_priority_array_compact(dev_addr, obj_id)
    print(f" > Found {obj_type}:{obj_inst} | {name} | {pv} {units}")
    return [
        instance, str(dev_addr), str(obj_type), obj_inst,
        name, desc,
        pv, units,
        rel, oos, flags,
        pa_str,
    ]


async def scan_range(low: int, high: int, output_dir: Optional[str]):
    """
    Discover devices in an instance range and audit every object on each.

    A Who-Is is sent for [low, high]. For each responding device the object
    list is read, a fixed set of properties is read per object, and (when
    output_dir is given) the rows are written to audit_<instance>.csv.

    Args:
        low: Lowest device instance to include.
        high: Highest device instance to include.
        output_dir: Directory for the CSV reports; created if missing. When
            falsy, objects are only printed and nothing is written.
    """
    assert app is not None
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    log.info("Broadcasting Who-Is %s - %s...", low, high)
    i_ams = await app.who_is(low, high)
    if not i_ams:
        log.info("No devices found.")
        return

    headers = [
        "DeviceID", "IP", "ObjType", "ObjInst",
        "Name", "Description",
        "PresentValue", "Units/StateText",
        "Reliability", "OutOfService", "StatusFlags",
        "PriorityArray(Active)"
    ]

    # The same device may answer more than once; scan each
    # (instance, address) pair a single time.
    seen = set()
    for i_am in i_ams:
        dev_id_obj: ObjectIdentifier = i_am.iAmDeviceIdentifier
        dev_addr: Address = i_am.pduSource
        instance = dev_id_obj[1]
        # Ignore responders outside the requested range.
        if not (low <= instance <= high):
            continue
        device_key = (instance, str(dev_addr))
        if device_key in seen:
            continue
        seen.add(device_key)

        log.info("Scanning Device %s @ %s...", instance, dev_addr)
        obj_list = await get_device_object_list(dev_addr, dev_id_obj)
        if not obj_list:
            continue

        rows = [
            await _read_object_row(instance, dev_addr, obj_id)
            for obj_id in obj_list
        ]

        if output_dir:
            fname = os.path.join(output_dir, f"audit_{instance}.csv")
            # Explicit UTF-8 so names/descriptions with non-ASCII characters
            # do not depend on the platform's default codec.
            with open(fname, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(headers)
                writer.writerows(rows)
            log.info("Saved audit report to %s", fname)
async def main():
    """
    Entry point of the auto-scan script.

    Parses the command line, configures logging, creates the module-level
    BACnet application, runs the scan, and always closes the application.
    """
    global app

    cli = SimpleArgumentParser()
    # Both range bounds are mandatory integers.
    for flag in ("--low-instance", "--high-instance"):
        cli.add_argument(flag, type=int, required=True)
    cli.add_argument("--output-dir", type=str, default="bacnet_audit")
    args = cli.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    app = Application.from_args(args)
    try:
        await scan_range(args.low_instance, args.high_instance, args.output_dir)
    finally:
        # Close the application whether the scan finished or raised.
        app.close()


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
import re
from typing import List, Optional, Tuple
from bacpypes3.pdu import Address
from bacpypes3.comm import bind
from bacpypes3.debugging import bacpypes_debugging
from bacpypes3.argparse import SimpleArgumentParser
from bacpypes3.app import Application
from bacpypes3.console import Console
from bacpypes3.cmd import Cmd
from bacpypes3.primitivedata import Null, ObjectIdentifier
from bacpypes3.npdu import IAmRouterToNetwork
from bacpypes3.constructeddata import AnyAtomic
from bacpypes3.apdu import (
ErrorRejectAbortNack,
PropertyReference,
PropertyIdentifier,
ErrorType,
AbortPDU,
AbortReason
)
from bacpypes3.vendor import get_vendor_info
from bacpypes3.netservice import NetworkAdapter
# 'property[index]' matching: group 1 is the property name (letters and
# hyphens), group 2 is the optional decimal array index inside brackets
# (None when no "[n]" suffix is present).
property_index_re = re.compile(r"^([A-Za-z-]+)(?:\[([0-9]+)\])?$")
# globals
# Application instance shared by all shell commands; assigned in main().
app: Optional[Application] = None
@bacpypes_debugging
class InteractiveCmd(Cmd):
    """
    Interactive BACnet Console
    """

    async def do_whois(
        self, low_limit: Optional[int] = None, high_limit: Optional[int] = None
    ) -> None:
        """
        Send a Who-Is request and print responses.
        usage: whois [ low_limit high_limit ]
        """
        # Compare against None so that a legitimate limit of 0 is still shown.
        low_text = low_limit if low_limit is not None else ''
        high_text = high_limit if high_limit is not None else ''
        print(f"Broadcasting Who-Is {low_text} {high_text}...")

        i_ams = await app.who_is(low_limit, high_limit)
        if not i_ams:
            print("No response(s) received")
            return

        for i_am in i_ams:
            dev_addr: Address = i_am.pduSource
            dev_id: ObjectIdentifier = i_am.iAmDeviceIdentifier
            vendor_id = i_am.vendorID
            print(f"Device {dev_id} @ {dev_addr} (Vendor: {vendor_id})")

    async def do_objects(self, address: Address, instance_id: int) -> None:
        """
        List all objects in a specific device.
        Includes fallback logic if the device does not support bulk object-list reads.
        usage: objects <ip_address> <device_instance_id>
        example: objects 192.168.1.10 1001
        """
        device_identifier = ObjectIdentifier(f"device,{instance_id}")
        print(f"Reading object-list from {device_identifier} @ {address}...")
        object_list = []

        # 1. Try reading entire array at once (Fastest)
        try:
            object_list = await app.read_property(
                address, device_identifier, "object-list"
            )
        except (AbortPDU, ErrorRejectAbortNack) as e:
            print(f"Standard read failed ({e}), attempting fallback method...")
            # 2. FALLBACK: Read Length (index 0), then read index-by-index
            try:
                list_len = await app.read_property(
                    address, device_identifier, "object-list", array_index=0
                )
                print(f"Device contains {list_len} objects. Reading one by one...")
                for i in range(list_len):
                    obj_id = await app.read_property(
                        address, device_identifier, "object-list", array_index=i + 1
                    )
                    object_list.append(obj_id)
                    # Progress indicator: one dot for every tenth element.
                    if i % 10 == 0:
                        print(".", end="", flush=True)
                print()  # Newline
            except Exception as err:
                print(f"Failed to read object list: {err}")
                return

        print(f"Found {len(object_list)} objects:")
        for obj in object_list:
            # Optional: Try to get the name for a nicer display
            try:
                name = await app.read_property(address, obj, "object-name")
            except Exception:
                # Not a bare except: task cancellation and Ctrl-C must still
                # propagate instead of being rendered as an unknown name.
                name = "???"
            print(f" - {obj} : {name}")

    async def do_read(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
        property_identifier: str,
    ) -> None:
        """
        Read a single property.
        usage: read <address> <objid> <prop>
        example: read 192.168.1.10 analog-value,1 present-value
        """
        # Split the property identifier and its index
        property_index_match = property_index_re.match(property_identifier)
        if not property_index_match:
            print("Property specification incorrect")
            return
        prop_id, array_index = property_index_match.groups()
        if array_index is not None:
            array_index = int(array_index)

        print(f"Reading {object_identifier} {property_identifier} from {address}...")
        try:
            value = await app.read_property(
                address, object_identifier, prop_id, array_index
            )
            if isinstance(value, AnyAtomic):
                value = value.get_value()
            print(f" = {value}")
        except ErrorRejectAbortNack as err:
            print(f" ! Error: {err}")

    async def do_write(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
        property_identifier: str,
        value: str,
        priority: int = -1,
    ) -> None:
        """
        Write a property value.
        usage: write <address> <objid> <prop> <value> [priority]
        example: write 192.168.1.10 analog-value,1 present-value 50.0 8
        """
        # Parse property index
        property_index_match = property_index_re.match(property_identifier)
        if not property_index_match:
            print("Property specification incorrect")
            return
        prop_id, array_index = property_index_match.groups()
        if array_index is not None:
            array_index = int(array_index)

        # Handle 'null' for releasing overrides; -1 is this command's
        # "no priority given" default and is rejected for null writes.
        if value.lower() == "null":
            if priority == -1:
                print("Error: 'null' can only be used with a specific priority level.")
                return
            value = Null(())

        try:
            print(f"Writing to {object_identifier}...")
            await app.write_property(
                address,
                object_identifier,
                prop_id,
                value,
                array_index,
                priority,
            )
            print(" Write successful (Ack received).")
        except ErrorRejectAbortNack as err:
            print(f" ! Write failed: {err}")

    async def do_priority(
        self,
        address: Address,
        object_identifier: ObjectIdentifier,
    ) -> None:
        """
        Display the Priority Array of an object.
        usage: priority <address> <objid>
        """
        try:
            response = await app.read_property(
                address, object_identifier, "priority-array"
            )
        except ErrorRejectAbortNack as err:
            print(f"Error reading priority-array: {err}")
            return

        if not response:
            print("Priority array is empty or None.")
            return

        print(f"Priority Array for {object_identifier}:")
        has_entries = False
        for index, priority_value in enumerate(response):
            # A missing slot or a slot without a selected choice cannot be
            # displayed; treat it like a relinquished slot instead of crashing.
            val_type = getattr(priority_value, "_choice", None)
            # Only print slots that are NOT null
            if not val_type or val_type == "null":
                continue
            has_entries = True
            val = getattr(priority_value, val_type, None)
            if isinstance(val, AnyAtomic):
                val = val.get_value()
            print(f" [{index + 1}] : {val} ({val_type})")

        if not has_entries:
            print(" (All slots are NULL/Relinquished)")

    async def do_rpm(self, address: Address, *args: str) -> None:
        """
        Read Property Multiple (Advanced Debugging).
        usage: rpm <address> ( <objid> ( <prop[indx]> )... )...
        """
        args_list = list(args)

        # Get device info for correct datatype parsing; vendor 0 is used when
        # nothing is cached for this address.
        device_info = await app.device_info_cache.get_device_info(address)
        vendor_info = get_vendor_info(
            device_info.vendor_identifier if device_info else 0
        )

        # parameter_list alternates: object identifier, then the list of
        # property references requested for that object.
        parameter_list = []
        while args_list:
            obj_id = vendor_info.object_identifier(args_list.pop(0))
            obj_class = vendor_info.get_object_class(obj_id[0])
            if not obj_class:
                print(f"Unknown object type: {obj_id}")
                return
            parameter_list.append(obj_id)

            property_reference_list = []
            while args_list:
                prop_ref = PropertyReference(args_list.pop(0), vendor_info=vendor_info)
                property_reference_list.append(prop_ref)
                # Crude check: a ":" or "," in the next token means it is the
                # next object identifier rather than another property.
                if args_list and ((":" in args_list[0]) or ("," in args_list[0])):
                    break
            parameter_list.append(property_reference_list)

        if not parameter_list:
            print("Object identifier expected")
            return

        try:
            response = await app.read_property_multiple(address, parameter_list)
            for (obj_id, prop_id, arr_index, prop_value) in response:
                print(f"{obj_id} {prop_id}{f'[{arr_index}]' if arr_index is not None else ''} = {prop_value}")
                if isinstance(prop_value, ErrorType):
                    print(f" Error: {prop_value}")
        except ErrorRejectAbortNack as err:
            print(f"RPM Failed: {err}")

    async def do_whohas(self, *args: str) -> None:
        """
        Find devices containing a specific object ID or Name.
        usage: whohas [ low_limit high_limit ] [ objid ] [ objname ]
        """
        args_list = list(args)

        # Leading all-digit tokens are taken as the instance limits.
        low_limit = int(args_list.pop(0)) if args_list and args_list[0].isdigit() else None
        high_limit = int(args_list.pop(0)) if args_list and args_list[0].isdigit() else None

        obj_id = None
        obj_name = None
        if args_list:
            # The next token is consumed as an object identifier only if it
            # parses as one; otherwise it is left to be used as the name.
            try:
                obj_id = ObjectIdentifier(args_list[0])
                args_list.pop(0)
            except ValueError:
                pass
        if args_list:
            obj_name = args_list[0]

        if obj_id is None and obj_name is None:
            print("Usage: whohas [limits] <objid> OR <objname>")
            return

        print(f"Searching for {obj_id if obj_id else ''} {obj_name if obj_name else ''}...")
        i_haves = await app.who_has(low_limit, high_limit, obj_id, obj_name)
        if not i_haves:
            print("No response(s)")
            return

        for i_have in i_haves:
            print(f"Device {i_have.deviceIdentifier} @ {i_have.pduSource} has {i_have.objectIdentifier} '{i_have.objectName}'")

    async def do_router(self, address: Optional[Address] = None, network: Optional[int] = None) -> None:
        """
        Discover BACnet routers.
        usage: router [address] [network]
        """
        print("Sending Who-Is-Router-To-Network...")
        if not app.nse:
            print("Network Service Element not enabled.")
            return

        result = await app.nse.who_is_router_to_network(destination=address, network=network)
        if not result:
            print("No routers found.")
            return

        # Each result pairs the receiving adapter with the router's reply;
        # only the reply is displayed.
        for _adapter, i_am_router in result:
            print(f"Router @ {i_am_router.pduSource} serves networks: {i_am_router.iartnNetworkList}")
async def main() -> None:
    """
    Entry point of the interactive shell.

    Parses the command line, wires the console to the command handler,
    creates the module-level BACnet application, prints a banner, and then
    waits until the console finishes. The application is closed on exit.
    """
    global app

    args = SimpleArgumentParser().parse_args()

    # Console input is delivered to the InteractiveCmd handlers.
    console = Console()
    bind(console, InteractiveCmd())

    app = Application.from_args(args)

    for banner_line in (
        "\n--- Interactive BACnet Shell ---",
        "Type 'help' for commands (whois, read, write, objects, priority, etc.)",
        "--------------------------------\n",
    ):
        print(banner_line)

    try:
        # Block until the console signals that it is done.
        await console.fini.wait()
    except KeyboardInterrupt:
        pass
    finally:
        if app:
            app.close()


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment