#!/usr/bin/env python3 # /// script # requires-python = ">=3.13" # dependencies = [ # "meshcore", # "prometheus-client", # "pyyaml", # ] # /// import asyncio import json import yaml import time from meshcore import MeshCore, EventType, packets from prometheus_client import CollectorRegistry, Gauge, push_to_gateway def parse_public_key_from_contact_data(contact_data): public_key_length = 64 packet_type_length = len(str(packets.PacketType.CONTACT_URI.value)) packet_path_specifier_length = 2 if contact_data and contact_data.startswith(str(packets.PacketType.CONTACT_URI.value)): path_length = int(contact_data[packet_type_length:packet_type_length+packet_path_specifier_length]) offset = packet_type_length + packet_path_specifier_length + path_length * 2 return contact_data[offset:offset+public_key_length] return False async def setup(meshcore, config): await meshcore.commands.set_time(int(time.time())) result = await meshcore.commands.set_radio(config["radio_settings"]["freq"], config["radio_settings"]["bw"], config["radio_settings"]["sf"], config["radio_settings"]["cr"]) if result.type is EventType.ERROR: print("Failed to setup radio") exit(1) await meshcore.commands.set_manual_add_contacts(False) for repeater in config["repeaters"]: public_key = parse_public_key_from_contact_data(repeater["contact_data"]) if not public_key: print("Failed to parse public key from contact data: %s" % repeater["contact_data"]) continue repeater["public_key"] = public_key try: result = await meshcore.commands.import_contact(bytes.fromhex(repeater["contact_data"])) if result.type is EventType.ERROR: raise Exception("Failed to import contact") except: print("Failed add contact from contact data: %s\nAborting." % repeater["contact_data"]) exit(1) await meshcore.commands.set_manual_add_contacts(True) await meshcore.ensure_contacts() async def main(): with open("config.yaml", "r") as file: config = yaml.safe_load(file) meshcore = await MeshCore.create_serial(config["serial_device_path"]) await setup(meshcore, config) for repeater in config["repeaters"]: contact = meshcore.get_contact_by_key_prefix(repeater["public_key"]) await meshcore.commands.change_contact_path(contact, "".join(repeater["path"])) print(contact) tries = 0 while(tries <= config["retries"]): tries += 1 await meshcore.commands.send_login(contact, repeater["password"]) result = await meshcore.wait_for_event(EventType.LOGIN_SUCCESS, timeout=config["timeout"]) if result is None: print("Timeout waiting on login for %s" % repeater["public_key"]) else: break if tries == config["retries"] + 1: print("Maximum login retries exceeded for %s" % repeater["public_key"]) break result = None tries = 0 while(tries <= config["retries"]): tries += 1 result = await meshcore.commands.send_telemetry_req(repeater["public_key"]) if result.type == EventType.ERROR: print("Error sending telemetry request for %s" % repeater["public_key"]) continue result = await meshcore.wait_for_event(EventType.TELEMETRY_RESPONSE, timeout=config["timeout"]) if result is None: print("Timeout waiting on telemetry for %s" % repeater["public_key"]) continue else: break if tries == config["retries"] + 1: print("Maximum telemetry request retries exceeded for %s" % repeater["public_key"]) else: telemetry_data_list = result.payload["lpp"] registry = CollectorRegistry() for telemetry_data in telemetry_data_list: if telemetry_data["type"] == "voltage": gauge = Gauge("voltage", "Battery Voltage", registry=registry) gauge.set(telemetry_data["value"]) push_to_gateway(config["prometheus"]["pushgateway_address"], job=config["prometheus"]["job"], grouping_key={"instance": contact["adv_name"]}, registry=registry) if __name__ == "__main__": asyncio.run(main())