Merge branch 'longnames-sqlite'

feature/plugins-parameters
Jeremiah K 2023-04-18 12:31:03 -05:00
commit db993a0ba8
1 zmienionych plików z 82 dodań i 117 usunięć

189
main.py
Wyświetl plik

@ -1,51 +1,78 @@
import asyncio import asyncio
import threading import time
import json
import sqlite3
import logging import logging
import yaml
import re import re
import sqlite3
import yaml
import meshtastic.tcp_interface import meshtastic.tcp_interface
import meshtastic.serial_interface import meshtastic.serial_interface
from nio import AsyncClient, AsyncClientConfig, MatrixRoom, RoomMessageText, RoomMessage from nio import AsyncClient, AsyncClientConfig, MatrixRoom, RoomMessageText
from pubsub import pub from pubsub import pub
from meshtastic import mesh_pb2
from yaml.loader import SafeLoader from yaml.loader import SafeLoader
bot_start_time = int(time.time() * 1000)
logging.basicConfig() logging.basicConfig()
logger = logging.getLogger(name="meshtastic.matrix.relay") logger = logging.getLogger(name="meshtastic.matrix.relay")
# Collect configuration # Load configuration
relay_config = None
with open("config.yaml", "r") as f: with open("config.yaml", "r") as f:
relay_config = yaml.load(f, Loader=SafeLoader) relay_config = yaml.load(f, Loader=SafeLoader)
if relay_config["logging"]["level"] == "debug": logger.setLevel(getattr(logging, relay_config["logging"]["level"].upper()))
logger.setLevel(logging.DEBUG)
elif relay_config["logging"]["level"] == "info": # Initialize SQLite database
logger.setLevel(logging.INFO) def initialize_database():
elif relay_config["logging"]["level"] == "warn": conn = sqlite3.connect("meshtastic.sqlite")
logger.setLevel(logging.WARN) cursor = conn.cursor()
elif relay_config["logging"]["level"] == "error": cursor.execute(
logger.setLevel(logging.ERROR) "CREATE TABLE IF NOT EXISTS longnames (meshtastic_id TEXT PRIMARY KEY, longname TEXT)"
)
conn.commit()
conn.close()
# Get the longname for a given Meshtastic ID
def get_longname(meshtastic_id):
conn = sqlite3.connect("meshtastic.sqlite")
cursor = conn.cursor()
cursor.execute(
"SELECT longname FROM longnames WHERE meshtastic_id=?", (meshtastic_id,)
)
result = cursor.fetchone()
conn.close()
return result[0] if result else None
# Save the longname for a given Meshtastic ID
def save_longname(meshtastic_id, longname):
conn = sqlite3.connect("meshtastic.sqlite")
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO longnames (meshtastic_id, longname) VALUES (?, ?)",
(meshtastic_id, longname),
)
conn.commit()
conn.close()
def update_longnames():
if meshtastic_interface.nodes:
for node in meshtastic_interface.nodes.values():
user = node.get("user")
if user:
meshtastic_id = user["id"]
longname = user.get("longName", "N/A")
save_longname(meshtastic_id, longname)
# Connect to the Meshtastic device # Initialize Meshtastic interface
logger.info(f"Starting Meshtastic <==> Matrix Relay...")
# Add a new configuration option to select between serial and network connections
connection_type = relay_config["meshtastic"]["connection_type"] connection_type = relay_config["meshtastic"]["connection_type"]
if connection_type == "serial": if connection_type == "serial":
serial_port = relay_config["meshtastic"]["serial_port"] serial_port = relay_config["meshtastic"]["serial_port"]
logger.info(f"Connecting to radio using serial port {serial_port} ...") logger.info(f"Connecting to radio using serial port {serial_port} ...")
meshtastic_interface = meshtastic.serial_interface.SerialInterface(serial_port) meshtastic_interface = meshtastic.serial_interface.SerialInterface(serial_port)
logger.info(f"Connected to radio using serial port {serial_port}.")
else: else:
target_host = relay_config["meshtastic"]["host"] target_host = relay_config["meshtastic"]["host"]
logger.info(f"Connecting to radio at {target_host} ...") logger.info(f"Connecting to radio at {target_host} ...")
meshtastic_interface = meshtastic.tcp_interface.TCPInterface(hostname=target_host) meshtastic_interface = meshtastic.tcp_interface.TCPInterface(hostname=target_host)
logger.info(f"Connected to radio at {target_host}.")
matrix_client = None matrix_client = None
@ -55,43 +82,7 @@ matrix_access_token = relay_config["matrix"]["access_token"]
bot_user_id = relay_config["matrix"]["bot_user_id"] bot_user_id = relay_config["matrix"]["bot_user_id"]
matrix_room_id = relay_config["matrix"]["room_id"] matrix_room_id = relay_config["matrix"]["room_id"]
# SQLite configuration # Send message to the Matrix room
db_file = "meshtastic.sqlite"
db = sqlite3.connect(db_file)
# Initialize the database
db.execute("CREATE TABLE IF NOT EXISTS nodes (id TEXT PRIMARY KEY, longname TEXT)")
db.execute(
"CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, sender_id TEXT, text TEXT, timestamp INTEGER)"
)
db.commit()
# Function to insert or update a node
def upsert_node(id, longname):
db.execute(
"INSERT OR REPLACE INTO nodes (id, longname) VALUES (?, ?)", (id, longname)
)
db.commit()
# Function to insert a message
def insert_message(sender_id, text, timestamp):
db.execute(
"INSERT INTO messages (sender_id, text, timestamp) VALUES (?, ?, ?)",
(sender_id, text, timestamp),
)
db.commit()
# Function to get the node's longname
def get_node_longname(sender):
cursor = db.cursor()
cursor.execute("SELECT longname FROM nodes WHERE id = ?", (sender,))
row = cursor.fetchone()
return row[0] if row else sender
async def matrix_relay(matrix_client, message): async def matrix_relay(matrix_client, message):
try: try:
await asyncio.wait_for( await asyncio.wait_for(
@ -105,91 +96,61 @@ async def matrix_relay(matrix_client, message):
logger.info(f"Sent inbound radio message to matrix room: {matrix_room_id}") logger.info(f"Sent inbound radio message to matrix room: {matrix_room_id}")
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.error( logger.error(f"Timed out while waiting for Matrix response")
f"Timed out while waiting for Matrix response - room {matrix_room_id}: {e}"
)
except Exception as e: except Exception as e:
logger.error( logger.error(f"Error sending radio message to matrix room {matrix_room_id}: {e}")
f"Error sending radio message to matrix room {matrix_room_id}: {e}"
)
# Callback for new messages from Meshtastic # Callback for new messages from Meshtastic
def on_meshtastic_message(packet, loop=None): def on_meshtastic_message(packet, loop=None):
global matrix_client
sender = packet["fromId"] sender = packet["fromId"]
if "text" in packet["decoded"] and packet["decoded"]["text"]: if "text" in packet["decoded"] and packet["decoded"]["text"]:
text = packet["decoded"]["text"] text = packet["decoded"]["text"]
# timestamp = packet["received"]
if logger.level == logging.DEBUG:
logger.debug(f"Processing radio message from {sender}: {text}")
elif logger.level == logging.INFO:
logger.info(f"Processing inbound radio message from {sender}") logger.info(f"Processing inbound radio message from {sender}")
formatted_message = f"{sender}: {text}" longname = get_longname(sender) or sender
# create an event loop formatted_message = f"{longname}: {text}"
asyncio.run_coroutine_threadsafe( asyncio.run_coroutine_threadsafe(
matrix_relay(matrix_client, formatted_message), matrix_relay(matrix_client, formatted_message),
loop=loop, loop=loop,
) )
# insert_message(sender, text, timestamp)
# Callback for new messages in Matrix room # Callback for new messages in Matrix room
async def on_room_message(room: MatrixRoom, event: RoomMessageText) -> None: async def on_room_message(room: MatrixRoom, event: RoomMessageText) -> None:
logger.info(
f"Detected inbound matrix message from {event.sender} in room {room.room_id}"
)
if room.room_id == matrix_room_id and event.sender != bot_user_id: if room.room_id == matrix_room_id and event.sender != bot_user_id:
target_node = None message_timestamp = event.server_timestamp
if event.formatted_body: # Only process messages with a timestamp greater than the bot's start time
text = event.formatted_body.strip() if message_timestamp > bot_start_time:
else:
text = event.body.strip() text = event.body.strip()
logger.debug(f"Processing matrix message from {event.sender}: {text}") logger.info(f"Processing matrix message from {event.sender}: {text}")
# Opportunistically detect node in message text !124abcd: display_name_response = await matrix_client.get_displayname(event.sender)
match = re.search(r"(![\da-z]+):", text) display_name = (display_name_response.displayname or event.sender)[:8]
if match:
target_node = match.group(1)
text = re.sub(r"<mx-reply>.*?</mx-reply>", "", text) text = f"{display_name}: {text}"
text = f"{event.source['sender']}: {text}" text = text[0:218] # 218 = 228 (max message length) - 8 (max display name length) - 1 (colon + space)
text = text[0:80]
if target_node: if relay_config["meshtastic"]["broadcast_enabled"]:
logger.debug( logger.info(f"Sending radio message from {display_name} to radio broadcast")
f"Sending radio message from {event.sender} to {target_node} ..."
)
meshtastic_interface.sendText(
text=text,
channelIndex=relay_config["meshtastic"]["channel"],
destinationId=target_node,
)
logger.info(f"Sent radio message from {event.sender} to {target_node}")
elif relay_config["meshtastic"]["broadcast_enabled"]:
logger.debug(
f"Sending radio message from {event.sender} to radio broadcast ..."
)
meshtastic_interface.sendText( meshtastic_interface.sendText(
text=text, channelIndex=relay_config["meshtastic"]["channel"] text=text, channelIndex=relay_config["meshtastic"]["channel"]
) )
logger.info(f"Sent radio message from {event.sender} to radio broadcast") else:
elif not relay_config["meshtastic"]["broadcast_enabled"]: logger.debug(f"Broadcast not supported: Message from {display_name} dropped.")
logger.debug(
f"Broadcast not supported: Message from {event.sender} dropped."
)
async def main(): async def main():
global matrix_client global matrix_client
# Initialize the SQLite database
initialize_database()
config = AsyncClientConfig(encryption_enabled=False) config = AsyncClientConfig(encryption_enabled=False)
matrix_client = AsyncClient(matrix_homeserver, bot_user_id, config=config) matrix_client = AsyncClient(matrix_homeserver, bot_user_id, config=config)
matrix_client.access_token = matrix_access_token matrix_client.access_token = matrix_access_token
@ -205,7 +166,11 @@ async def main():
matrix_client.add_event_callback(on_room_message, RoomMessageText) matrix_client.add_event_callback(on_room_message, RoomMessageText)
# Start the Matrix client # Start the Matrix client
await matrix_client.sync_forever(timeout=30000) while True:
# Update longnames
update_longnames()
await matrix_client.sync_forever(timeout=30000)
await asyncio.sleep(60) # Update longnames every 60 seconds
asyncio.run(main()) asyncio.run(main())