kopia lustrzana https://github.com/mate-dev/meshtastic-matrix-relay
302 wiersze
9.2 KiB
Python
302 wiersze
9.2 KiB
Python
import staticmaps
|
|
import s2sphere
|
|
import math
|
|
import random
|
|
import io
|
|
import re
|
|
from PIL import Image
|
|
from nio import AsyncClient, UploadResponse
|
|
from plugins.base_plugin import BasePlugin
|
|
|
|
|
|
class TextLabel(staticmaps.Object):
    """Map object that draws a text label in a call-out bubble at a coordinate.

    The bubble is a white box with a red outline and black text, sitting on
    top of a small arrow whose tip is placed at the label's lat/lng. One
    ``render_*`` method is provided per staticmaps renderer type (Pillow,
    cairo, SVG).
    """

    def __init__(self, latlng: s2sphere.LatLng, text: str, fontSize: int = 12) -> None:
        """Store the anchor position, label text and font size.

        Args:
            latlng: position the arrow tip points at.
            text: label text.
            fontSize: font size used by the cairo and SVG renderers and by
                the size estimate in ``extra_pixel_bounds``.
        """
        staticmaps.Object.__init__(self)
        self._latlng = latlng
        self._text = text
        self._margin = 4  # padding between text and bubble edge, in pixels
        self._arrow = 16  # height (and base width) of the pointer arrow, in pixels
        self._font_size = fontSize

    def latlng(self) -> s2sphere.LatLng:
        """Return the coordinate the label points at."""
        return self._latlng

    def bounds(self) -> s2sphere.LatLngRect:
        """Return the degenerate lat/lng rectangle containing only the anchor point."""
        return s2sphere.LatLngRect.from_point(self._latlng)

    def _guess_text_extents(self):
        """Estimate (width, height) of the text from its length and the font size."""
        return len(self._text) * self._font_size * 0.5, self._font_size * 1.2

    def _bubble_size(self, tw, th):
        """Return (width, height) of the bubble box for a text of size tw x th."""
        return max(self._arrow, tw + 2 * self._margin), th + 2 * self._margin

    def _bubble_outline(self, x, y, w, h):
        """Return the bubble polygon vertices, starting at the arrow tip (x, y)."""
        half_arrow = self._arrow / 2
        half_width = w / 2
        box_bottom = y - self._arrow
        box_top = box_bottom - h
        return [
            (x, y),
            (x + half_arrow, box_bottom),
            (x + half_width, box_bottom),
            (x + half_width, box_top),
            (x - half_width, box_top),
            (x - half_width, box_bottom),
            (x - half_arrow, box_bottom),
        ]

    @staticmethod
    def _measure_pillow_text(draw, text):
        """Return (left, top, width, height) of *text* as drawn by *draw*.

        Uses ``textbbox`` where it works and falls back to the legacy
        ``textsize`` otherwise (``textsize`` no longer exists in recent
        Pillow releases; older releases may lack ``textbbox`` or reject the
        default font in it).
        """
        try:
            left, top, right, bottom = draw.textbbox((0, 0), text)
        except (AttributeError, ValueError):
            width, height = draw.textsize(text)
            return 0, 0, width, height
        return left, top, right - left, bottom - top

    def extra_pixel_bounds(self) -> staticmaps.PixelBoundsT:
        """Return the extra pixels (left, top, right, bottom) the bubble needs.

        The text size is only estimated here, so the result is approximate.
        Nothing is reserved below the anchor because the bubble is drawn
        entirely above the arrow tip.
        """
        tw, th = self._guess_text_extents()
        w, h = self._bubble_size(tw, th)
        return (int(w / 2.0), int(h + self._arrow), int(w / 2), 0)

    def render_pillow(self, renderer: staticmaps.PillowRenderer) -> None:
        """Draw the label with the Pillow renderer."""
        x, y = renderer.transformer().ll2pixel(self.latlng())
        x = x + renderer.offset_x()

        draw = renderer.draw()
        left, top, tw, th = self._measure_pillow_text(draw, self._text)
        w, h = self._bubble_size(tw, th)
        outline = self._bubble_outline(x, y, w, h)

        draw.polygon(outline, fill=(255, 255, 255, 255))
        # Repeat the first vertex so the red outline is closed, matching the
        # closed paths drawn by the cairo and SVG renderers.
        draw.line(outline + [outline[0]], fill=(255, 0, 0, 255))
        draw.text(
            # Compensate for the bounding-box origin so the glyphs are centred.
            (x - tw / 2 - left, y - self._arrow - h / 2 - th / 2 - top),
            self._text,
            fill=(0, 0, 0, 255),
        )

    def render_cairo(self, renderer: staticmaps.CairoRenderer) -> None:
        """Draw the label with the cairo renderer."""
        # Imported here because the module does not import cairo at file
        # level; only this renderer needs it.
        import cairo

        x, y = renderer.transformer().ll2pixel(self.latlng())

        ctx = renderer.context()
        ctx.select_font_face("Sans", cairo.FONT_SLANT_NORMAL, cairo.FONT_WEIGHT_NORMAL)

        ctx.set_font_size(self._font_size)
        x_bearing, y_bearing, tw, th, _, _ = ctx.text_extents(self._text)

        w, h = self._bubble_size(tw, th)
        outline = self._bubble_outline(x, y, w, h)

        # White bubble background.
        ctx.set_source_rgb(1, 1, 1)
        ctx.new_path()
        for point in outline:
            ctx.line_to(*point)
        ctx.close_path()
        ctx.fill()

        # Red bubble outline.
        ctx.set_source_rgb(1, 0, 0)
        ctx.set_line_width(1)
        ctx.new_path()
        for point in outline:
            ctx.line_to(*point)
        ctx.close_path()
        ctx.stroke()

        # Black text, centred in the box using the reported bearings.
        ctx.set_source_rgb(0, 0, 0)
        ctx.set_line_width(1)
        ctx.move_to(
            x - tw / 2 - x_bearing, y - self._arrow - h / 2 - y_bearing - th / 2
        )
        ctx.show_text(self._text)
        ctx.stroke()

    def render_svg(self, renderer: staticmaps.SvgRenderer) -> None:
        """Draw the label with the SVG renderer (text size is estimated)."""
        x, y = renderer.transformer().ll2pixel(self.latlng())

        tw, th = self._guess_text_extents()
        w, h = self._bubble_size(tw, th)

        path = renderer.drawing().path(
            fill="#ffffff",
            stroke="#ff0000",
            stroke_width=1,
            opacity=1.0,
        )
        # Relative moves tracing the bubble clockwise from the arrow tip.
        path.push(f"M {x} {y}")
        path.push(f" l {self._arrow / 2} {-self._arrow}")
        path.push(f" l {w / 2 - self._arrow / 2} 0")
        path.push(f" l 0 {-h}")
        path.push(f" l {-w} 0")
        path.push(f" l 0 {h}")
        path.push(f" l {w / 2 - self._arrow / 2} 0")
        path.push("Z")
        renderer.group().add(path)

        renderer.group().add(
            renderer.drawing().text(
                self._text,
                text_anchor="middle",
                dominant_baseline="central",
                insert=(x, y - self._arrow - h / 2),
                font_family="sans-serif",
                font_size=f"{self._font_size}px",
                fill="#000000",
            )
        )
|
|
|
|
|
|
def anonymize_location(lat, lon, radius=1000):
    """Return a randomly shifted copy of a position.

    Latitude and longitude are each offset by an independent uniform random
    amount of at most ``radius`` metres along their own axis.

    Args:
        lat: latitude in degrees.
        lon: longitude in degrees.
        radius: maximum offset per axis, in metres.

    Returns:
        A ``(new_lat, new_lon)`` tuple in degrees.
    """
    metres_per_degree = 111320  # approximate length of one degree of latitude

    # The trig functions take radians, so the latitude must be converted
    # before scaling the longitude span. The cosine is floored so the span
    # stays finite at (or beyond) the poles.
    cos_lat = max(math.cos(math.radians(lat)), 1e-6)

    lat_span = radius / metres_per_degree
    # Near the poles the span would exceed the whole globe; cap it at 180°.
    lon_span = min(radius / (metres_per_degree * cos_lat), 180.0)

    lat_offset = random.uniform(-lat_span, lat_span)
    lon_offset = random.uniform(-lon_span, lon_span)

    return lat + lat_offset, lon + lon_offset
|
|
|
|
|
|
def get_map(locations, zoom=None, image_size=None, anonymize=True, radius=10000):
    """Render a static OpenStreetMap image with one text label per location.

    Args:
        locations: iterable of mappings with ``"lat"`` and ``"lon"`` (any
            value ``float()`` accepts) and ``"label"`` entries.
        zoom: zoom level for the map; ``None`` leaves the context's zoom
            unset.
        image_size: optional ``(width, height)`` in pixels; 1000x1000 is used
            when it is falsy.
        anonymize: when true, every position is randomly shifted with
            ``anonymize_location`` before being drawn.
        radius: maximum anonymization offset in metres (10 km by default).

    Returns:
        Whatever ``context.render_pillow`` returns for the requested size.
    """
    context = staticmaps.Context()
    context.set_tile_provider(staticmaps.tile_provider_OSM)
    # Only forward an explicit zoom: None is this function's default and is
    # not a valid zoom level for the context.
    if zoom is not None:
        context.set_zoom(zoom)

    for location in locations:
        lat = float(location["lat"])
        lon = float(location["lon"])
        if anonymize:
            lat, lon = anonymize_location(lat=lat, lon=lon, radius=radius)
        radio = staticmaps.create_latlng(lat, lon)
        context.add_object(TextLabel(radio, location["label"], fontSize=50))

    # render non-anti-aliased png
    width, height = (image_size[0], image_size[1]) if image_size else (1000, 1000)
    return context.render_pillow(width, height)
|
|
|
|
|
|
async def upload_image(client: AsyncClient, image: Image.Image) -> UploadResponse:
    """Encode *image* as PNG in memory and upload it through *client*.

    Returns the first element of the pair produced by ``client.upload``; the
    second element is discarded.
    """
    encoded = io.BytesIO()
    image.save(encoded, format="PNG")
    png_bytes = encoded.getvalue()

    # Upload from a fresh stream positioned at the start of the PNG data.
    upload_result = await client.upload(
        io.BytesIO(png_bytes),
        content_type="image/png",
        filename="location.png",
        filesize=len(png_bytes),
    )
    response, _discarded = upload_result
    return response
|
|
|
|
|
|
async def send_room_image(
    client: AsyncClient, room_id: str, upload_response: UploadResponse
):
    """Post an ``m.image`` message to *room_id* pointing at an uploaded image.

    The image URL is taken from ``upload_response.content_uri`` and the
    message body is left empty. Nothing is returned.
    """
    image_content = {
        "msgtype": "m.image",
        "url": upload_response.content_uri,
        "body": "",
    }
    await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=image_content,
    )
|
|
|
|
|
|
async def send_image(client: AsyncClient, room_id: str, image: Image.Image):
    """Upload *image* and then post it to *room_id* as an image message."""
    uploaded = await upload_image(client=client, image=image)
    await send_room_image(client, room_id, upload_response=uploaded)
|
|
|
|
|
|
class Plugin(BasePlugin):
    """Matrix ``!map`` command: posts a map image of the known mesh nodes.

    Optional ``zoom=N`` and ``size=W,H`` arguments in the message override
    the values read from the plugin configuration.
    """

    plugin_name = "map"

    # Matches "<prefix>: !map [zoom=N] [size=W,H]"; groups are zoom, width, height.
    _COMMAND_RE = re.compile(r"^.*:(?: !map(?: zoom=(\d+))?(?: size=(\d+),(\d+))?)?$")

    @property
    def description(self):
        """Short help text for this plugin."""
        return "Map of mesh radio nodes. Supports `zoom` and `size` options to customize"

    async def handle_meshtastic_message(
        self, packet, formatted_message, longname, meshnet_name
    ):
        """Ignore mesh traffic; this plugin only reacts to Matrix messages."""
        return False

    def get_matrix_commands(self):
        """Return the Matrix command names handled by this plugin."""
        return [self.plugin_name]

    def get_mesh_commands(self):
        """Return the mesh command names handled by this plugin (none)."""
        return []

    async def handle_room_message(self, room, event, full_message):
        """Handle a Matrix room message and, for a map request, post the map.

        Args:
            room: room object; its ``room_id`` receives the image.
            event: the Matrix event (unused here).
            full_message: full text of the message.

        Returns:
            ``True`` when a map image was sent, ``False`` when the message
            is not a map request.
        """
        full_message = full_message.strip()
        if not self.matches(full_message):
            return False

        # Parse before connecting so unrelated messages open no connections.
        match = self._COMMAND_RE.match(full_message)

        # Indicate this message is not meant for this plugin
        if not match:
            return False

        from matrix_utils import connect_matrix
        from meshtastic_utils import connect_meshtastic

        matrix_client = await connect_matrix()
        meshtastic_client = connect_meshtastic()

        # An absent option leaves its group as None, which int() rejects with
        # TypeError; in that case fall back to the configured value.
        try:
            zoom = int(match.group(1))
        except (TypeError, ValueError):
            zoom = self.config["zoom"] if "zoom" in self.config else 8

        if zoom < 0 or zoom > 30:
            zoom = 8

        width, height = match.group(2, 3)
        try:
            image_size = (int(width), int(height))
        except (TypeError, ValueError):
            image_size = (
                self.config["image_width"] if "image_width" in self.config else 1000,
                self.config["image_height"] if "image_height" in self.config else 1000,
            )

        # Reject oversized and non-positive dimensions alike.
        if not all(1 <= side <= 1000 for side in image_size):
            image_size = (1000, 1000)

        locations = []
        for info in meshtastic_client.nodes.values():
            position = info["position"] if "position" in info else {}
            # Skip nodes without a complete fix; a position may carry only
            # one of the two coordinates.
            if "latitude" in position and "longitude" in position:
                locations.append(
                    {
                        "lat": position["latitude"],
                        "lon": position["longitude"],
                        "label": info["user"]["shortName"],
                    }
                )

        anonymize = self.config["anonymize"] if "anonymize" in self.config else True
        radius = self.config["radius"] if "radius" in self.config else 1000

        pillow_image = get_map(
            locations=locations,
            zoom=zoom,
            image_size=image_size,
            anonymize=anonymize,
            radius=radius,
        )

        await send_image(matrix_client, room.room_id, pillow_image)

        return True
|