Move a lot of files into archive. Picam3 autofocus tweaks, add gps wait option in startup_tx.sh

pull/16/head
Mark Jessop 2024-09-15 14:06:22 +09:30
rodzic 80810f0008
commit e404294075
16 zmienionych plików z 211 dodań i 246 usunięć

Wyświetl plik

@ -1,11 +1,12 @@
#!/bin/bash
#
# Wenet TX-side Initialisation Script - Systemd Unit Version
# 2024-07-21 Mark Jessop <vk5qi@rfhead.net>
# Wenet TX-side Initialisation Script
# 2024-09-14 Mark Jessop <vk5qi@rfhead.net>
#
# Run this to set up an attached RFM98W and start transmitting!
# Replace the transmit frequency and callsign with your own.
#
#
# A callsign which will be included in the Wenet Packets.
# This MUST be <= 6 characters long.
@ -78,6 +79,13 @@ done
echo "Waiting another 10 seconds before startup."
sleep 10
# OPTIONAL - Wait for the GNSS receiver to obtain lock before starting up the camera and transmitter.
# This may help with getting first GNSS lock after boot.
# --waitforlock 10 Wait for up to 10 minutes before timing out and continuing anyway
# --lockcount 60 Wait for 60 sequential valid 3D fixed before exiting (2 Hz update rate, so 60 -> 30 seconds)
# --locksats 6 Only consider a fix as valid if it has more than 6 SVs in use.
#python3 ublox.py --waitforlock 10 --lockcount 60 --locksats 6 $GPSPORT
# Start the main TX Script.
#
@ -87,7 +95,8 @@ sleep 10
# Add a logo overlay in the bottom right of the image. This must be a transparent PNG file.
# --logo yourlogo.png \
# Set a fixed focus position on a PiCam v3 (NOTE: The Picamv3 focus drifts with temperature - beware!!!)
# --lensposition 0.5 \
# 0.0 = Infinity
# --lensposition 0.0 \
python3 tx_picamera2_gps.py \
--rfm98w $SPIDEVICE \

Wyświetl plik

@ -31,8 +31,6 @@ class WenetPiCamera2(object):
Uses the new libcamera-based PiCamera2 library.
Captures multiple images, picks the best, then
transmits it via a PacketTX object.
"""
# White balance text to enum lookup
@ -48,13 +46,13 @@ class WenetPiCamera2(object):
def __init__(self,
callsign = "N0CALL",
tx_resolution=(1936,1088),
tx_resolution=0.5,
num_images=1,
image_delay=0.0,
vertical_flip = False,
horizontal_flip = False,
whitebalance = 'auto',
lens_position = 0.0,
lens_position = -1,
temp_filename_prefix = 'picam_temp',
debug_ptr = None
):
@ -64,9 +62,9 @@ class WenetPiCamera2(object):
Keyword Arguments:
callsign: The callsign to be used when converting images to SSDV. Must be <=6 characters in length.
tx_resolution: Tuple (x,y) containing desired image *transmit* resolution.
tx_resolution: Either a tuple (x,y) containing desired image *transmit* resolution, OR a scaling factor from full size.
NOTE: both x and y need to be multiples of 16 to be used with SSDV.
NOTE: This will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right.
NOTE: If you manually specify a transmit resolution, this will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right.
num_images: Number of images to capture in sequence when the 'capture' function is called.
The 'best' (largest filesize) image is selected and saved.
@ -78,6 +76,7 @@ class WenetPiCamera2(object):
whitebalance: White balance mode - allowed values: Auto, Incandescent, Tungesten, Fluorescent, Indoor, Daylight, Cloudy
lens_position: Lens Position setting (float), 0.0 = Infinity, 10 = very close.
Only usable on Pi Camera v3 modules.
Set to -1 to use continuous autofocus mode.
temp_filename_prefix: prefix used for temporary files.
@ -96,6 +95,7 @@ class WenetPiCamera2(object):
self.horizontal_flip = horizontal_flip
self.vertical_flip = vertical_flip
self.lens_position = lens_position
self.autofocus_mode = False
if whitebalance.lower() in self.wb_lookup:
self.whitebalance = self.wb_lookup[whitebalance.lower()]
@ -103,6 +103,7 @@ class WenetPiCamera2(object):
self.whitebalance = self.wb_lookup['auto']
self.init_camera()
@ -140,15 +141,19 @@ class WenetPiCamera2(object):
)
# Set Pi Camera 3 lens position
if 'LensPosition' in self.cam.camera_controls and self.lens_position>0:
self.debug_ptr("Configured lens position to " + str(self.lens_position))
self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position})
if 'LensPosition' in self.cam.camera_controls:
if self.lens_position>=0.0:
self.debug_ptr("Configured lens position to " + str(self.lens_position))
self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position})
else:
self.cam.set_controls({"AfMode": controls.AfModeEnum.Continuous})
# Enable the camera, effectively opening the 'shutter'.
# This lets the camera gain control algs start to settle.
#self.cam.start()
# In autofocus mode, we need to start the camera now, so it can start figuring out its focus.
if 'LensPosition' in self.cam.camera_controls and self.lens_position<0.0:
self.debug_message("Enabling camera for image capture")
self.cam.start()
# NOTE - Trying out starting and stopping the camera just around image capture time.
# If we are not in autofocus mode, we start the camera only when we need it.
# This may help deal with crashes after the camera is running for a long time, and also
# may help decrease CPU usage a little.
@ -169,7 +174,7 @@ class WenetPiCamera2(object):
self.cam.stop()
self.cam.close()
def capture(self, filename='picam.jpg', quality=90, bayer=False):
def capture(self, filename='picam.jpg', quality=90):
""" Capture an image using the PiCam
Keyword Arguments:
@ -180,25 +185,32 @@ class WenetPiCamera2(object):
self.cam.options['quality'] = quality
# Set other settings, White Balance, exposure metering, etc.
# TODO - Maybe expose some of these settings?
self.cam.set_controls(
{'AwbMode': self.whitebalance,
'AeMeteringMode': controls.AeMeteringModeEnum.Matrix,
'NoiseReductionMode': controls.draft.NoiseReductionModeEnum.Off}
)
# Set Pi Camera 3 lens position
if 'LensPosition' in self.cam.camera_controls and self.lens_position>0:
self.debug_ptr("Configured lens position to " + str(self.lens_position))
self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position})
# Set Pi Camera 3 lens position, or ensure we are in continuous autofocus mode.
if 'LensPosition' in self.cam.camera_controls:
if self.lens_position>=0.0:
self.debug_ptr("Configured lens position to " + str(self.lens_position))
self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position})
else:
self.cam.set_controls({"AfMode": controls.AfModeEnum.Continuous})
self.debug_message("Enabling camera for image capture")
try:
self.cam.start()
except Exception as e:
self.debug_message("Could not enable camera! - " + str(e))
sleep(1)
return False
# If we're not using autofocus, then camera would not have been started yet.
# Start it now.
if 'LensPosition' not in self.cam.camera_controls or self.lens_position>=0.0:
try:
self.debug_message("Enabling camera for image capture")
self.cam.start()
except Exception as e:
self.debug_message("Could not enable camera! - " + str(e))
sleep(1)
return False
sleep(3)
@ -206,6 +218,7 @@ class WenetPiCamera2(object):
for i in range(self.num_images):
self.debug_message("Capturing Image %d of %d" % (i+1,self.num_images))
# Wrap this in error handling in case we lose the camera for some reason.
try:
self.cam.capture_file("%s_%d.jpg" % (self.temp_filename_prefix,i))
print(f"Image captured: {time.time()}")
@ -216,8 +229,9 @@ class WenetPiCamera2(object):
# Immediately return false. Not much point continuing to try and capture images.
return False
self.debug_message("Disabling camera.")
self.cam.stop()
if 'LensPosition' not in self.cam.camera_controls or self.lens_position>=0.0:
self.debug_message("Disabling camera.")
self.cam.stop()
# Otherwise, continue to pick the 'best' image based on filesize.
self.debug_message("Choosing Best Image.")

Wyświetl plik

@ -0,0 +1,4 @@
# WARNING
These scripts are from previous versions of the Wenet code, with some being specific to particular launches. They have been moved here for future reference use.
As the wenet codebase has moved on to libcamera and other changes, these scripts will likely not function correctly, and will need significant modification to be usable.

Wyświetl plik

@ -204,6 +204,86 @@ class RFM98W_Serial(object):
self.start()
class SerialOnly(object):
"""
Transmitter Wrapper that does not initialise any radios.
"""
def __init__(
self,
baudrate=115200,
serial_port=None,
reinit_count=5000
):
self.baudrate = baudrate
self.serial_port = serial_port
self.reinit_count = reinit_count
self.tx_packet_count = 0
self.start()
def start(self):
"""
Initialise (or re-initialise) the Serial connection.
"""
# Initialise the Serial port for modulation
if self.serial_port:
try:
self.serial = serial.Serial(self.serial_port, self.baudrate)
logging.info(f"SerialOnly - Opened Serial port {self.serial_port} for modulation.")
except Exception as e:
logging.critical(f"SerialOnly - Could not open serial port! Error: {str(e)}")
self.serial = None
else:
# If no serial port info provided, write out to a binary debug file.
self.serial = BinaryDebug()
logging.info("SerialOnly - No serial port provided - using Binary Debug output (binary_debug.bin)")
def shutdown(self):
"""
Shutdown the Serial connection.
"""
try:
# Close the serial connection
self.serial.close()
logging.info("SerialOnly - Closed Serial Port")
self.serial = None
except:
pass
return
def comms_ok(self):
"""
Dummy function, no radio comms to test.
"""
return True
def transmit_packet(self, packet):
"""
Modulate serial data, using a UART.
"""
if self.serial:
self.serial.write(packet)
# Increment transmit packet counter
self.tx_packet_count += 1
# If we have a reinitialisation count set, reinitialise the radio.
if self.reinit_count:
if self.tx_packet_count % self.reinit_count == 0:
logging.info(f"SerialOnly - Reinitialising Serial at {self.tx_packet_count} packets.")
self.start()
class BinaryDebug(object):
""" Debug binary 'transmitter' Class

Wyświetl plik

@ -1,211 +0,0 @@
#!/usr/bin/env python
#
# PiCam Transmitter Script - with GPS Data and Logo Overlay.
# Capture images from the PiCam, and transmit them.
#
# Copyright (C) 2018 Mark Jessop <vk5qi@rfhead.net>
# Released under GNU GPL v3 or later
#
import PacketTX
import WenetPiCam
import ublox
import argparse
import time
import os
import subprocess
import traceback
parser = argparse.ArgumentParser()
parser.add_argument("callsign", default="N0CALL", help="Payload Callsign")
parser.add_argument("--gps", default="/dev/ttyACM0", help="uBlox GPS Serial port. Defaults to /dev/ttyACM0")
parser.add_argument("--logo", default="none", help="Optional logo to overlay on image.")
parser.add_argument("--txport", default="/dev/ttyAMA0", type=str, help="Transmitter serial port. Defaults to /dev/ttyAMA0")
parser.add_argument("--baudrate", default=115200, type=int, help="Transmitter baud rate. Defaults to 115200 baud.")
parser.add_argument("--picamhq", default=False, action="store_true", help="Use PiCamera HQ image resolutions.")
args = parser.parse_args()
callsign = args.callsign
# Truncate callsign if it's too long.
if len(callsign) > 6:
callsign = callsign[:6]
print("Using Callsign: %s" % callsign)
# Start up Wenet TX.
tx = PacketTX.PacketTX(serial_port=args.txport, serial_baud=args.baudrate, callsign=callsign, log_file="debug.log", udp_listener=55674)
tx.start_tx()
# Sleep for a second to let the transmitter fire up.
time.sleep(1)
# Initialise a couple of global variables.
max_altitude = 0
system_time_set = False
# Disable Systemctl NTP synchronization so that we can set the system time on first GPS lock.
# This is necessary as NTP will refuse to sync the system time to the information we feed it via ntpshm unless
# the system clock is already within a few seconds.
ret_code = os.system("timedatectl set-ntp 0")
if ret_code == 0:
tx.transmit_text_message("GPS Debug: Disabled NTP Sync until GPS lock.")
else:
tx.transmit_text_message("GPS Debug: Could not disable NTP sync.")
def handle_gps_data(gps_data):
""" Handle GPS data passed to us from the UBloxGPS instance """
global max_altitude, tx, system_time_set
# Immediately generate and transmit a GPS packet.
tx.transmit_gps_telemetry(gps_data)
# If we have GPS fix, update the max altitude field.
if (gps_data['altitude'] > max_altitude) and (gps_data['gpsFix'] == 3):
max_altitude = gps_data['altitude']
# If we have GPS lock, set the system clock to it. (Only do this once.)
if (gps_data['gpsFix'] == 3) and not system_time_set:
dt = gps_data['datetime']
try:
new_time = dt.strftime('%Y-%m-%d %H:%M:%S')
ret_code = os.system("timedatectl set-time \"%s\"" % new_time)
if ret_code == 0:
tx.transmit_text_message("GPS Debug: System clock set to GPS time %s" % new_time)
else:
tx.transmit_text_message("GPS Debug: Attempt to set system clock failed!")
system_time_set = True
# Re-enable NTP synchronisation
ret_code = os.system("timedatectl set-ntp 1")
if ret_code == 0:
tx.transmit_text_message("GPS Debug: Re-enabled NTP sync.")
else:
tx.transmit_text_message("GPS Debug: Could not enable NTP sync.")
except:
tx.transmit_text_message("GPS Debug: Attempt to set system clock failed!")
# Try and start up the GPS rx thread.
try:
gps = ublox.UBloxGPS(port=args.gps,
dynamic_model = ublox.DYNAMIC_MODEL_AIRBORNE1G,
update_rate_ms = 1000,
debug_ptr = tx.transmit_text_message,
callback = handle_gps_data,
log_file = 'gps_data.log'
)
except Exception as e:
tx.transmit_text_message("ERROR: Could not Open GPS - %s" % str(e), repeats=5)
gps = None
# Define our post-processing callback function, which gets called by WenetPiCam
# after an image has been captured.
def post_process_image(filename):
""" Post-process the image, adding on Logo overlay and GPS data if requested. """
global gps, max_altitude, args, tx
# Try and grab current GPS data snapshot
try:
if gps != None:
gps_state = gps.read_state()
# Format time
short_time = gps_state['datetime'].strftime("%Y-%m-%d %H:%M:%S")
# Construct string which we will add onto the image.
if gps_state['numSV'] < 3:
# If we don't have enough sats for a lock, don't display any data.
# TODO: Use the GPS fix status values here instead.
gps_string = "No GPS Lock"
else:
gps_string = "%s Lat: %.5f Lon: %.5f Alt: %dm (%dm) Speed: H %03.1f kph V %02.1f m/s" % (
short_time,
gps_state['latitude'],
gps_state['longitude'],
int(gps_state['altitude']),
int(max_altitude),
gps_state['ground_speed'],
gps_state['ascent_rate'])
else:
gps_string = ""
except:
error_str = traceback.format_exc()
tx.transmit_text_message("GPS Data Access Failed: %s" % error_str)
gps_string = ""
# Build up our imagemagick 'convert' command line
overlay_str = "convert %s -gamma 0.8 -font Helvetica -pointsize 30 -gravity North " % filename
overlay_str += "-strokewidth 2 -stroke '#000C' -annotate +0+5 \"%s\" " % gps_string
overlay_str += "-stroke none -fill white -annotate +0+5 \"%s\" " % gps_string
# Add on logo overlay argument if we have been given one.
if args.logo != "none":
overlay_str += "%s -gravity SouthEast -composite " % args.logo
overlay_str += filename
tx.transmit_text_message("Adding overlays to image.")
os.system(overlay_str)
return
# Finally, initialise the PiCam capture object.
# Set the source and transmit image resolutions.
# For the PiCam HQ, we have a higher source resolution that we want to make use of!
# Note the transmit resolutions *must* be a multiple of 16.
if args.picamhq:
# Picam HQ Resolutions
_src_res = (4056,3040)
_tx_res = (1520,1136)
else:
# Picam V2 resolutions.
_src_res = (3280,2464)
_tx_res = (1488,1120)
picam = WenetPiCam.WenetPiCam(src_resolution=_src_res,
tx_resolution=_tx_res,
callsign=callsign,
num_images=5,
debug_ptr=tx.transmit_text_message,
vertical_flip=False,
horizontal_flip=False)
# .. and start it capturing continuously.
picam.run(destination_directory="./tx_images/",
tx = tx,
post_process_ptr = post_process_image
)
# Main 'loop'.
try:
while True:
# Do nothing!
# Sleep to avoid chewing up CPU cycles in this loop.
time.sleep(1)
# Catch CTRL-C, and exit cleanly.
# Only really used during debugging.
except KeyboardInterrupt:
print("Closing")
picam.stop()
tx.close()
gps.close()

Wyświetl plik

@ -32,7 +32,7 @@ parser.add_argument("--vflip", action='store_true', default=False, help="Flip ca
parser.add_argument("--hflip", action='store_true', default=False, help="Flip captured image horizontally.")
parser.add_argument("--resize", type=float, default=0.5, help="Resize raw image from camera by this factor before transmit (in both X/Y, to nearest multiple of 16 pixels). Default=0.5")
parser.add_argument("--whitebalance", type=str, default='daylight', help="White Balance setting: Auto, Daylight, Cloudy, Incandescent, Tungesten, Fluorescent, Indoor")
parser.add_argument("--lensposition", type=float, default=-1, help="For PiCam v3, set the lens position. Default: -1 = Autofocus")
parser.add_argument("--lensposition", type=float, default=-1.0, help="For PiCam v3, set the lens position. Default: -1 = Autofocus")
parser.add_argument("-v", "--verbose", action='store_true', default=False, help="Show additional debug info.")
args = parser.parse_args()

Wyświetl plik

@ -1214,10 +1214,13 @@ class UBloxGPS(object):
else:
pass
print("GPS RX thread closed.")
def close(self):
""" Close GPS Connection """
self.rx_running = False
time.sleep(0.5)
time.sleep(2)
self.gps.close()
if self.log_file != None:
self.log_file.close()
@ -1226,18 +1229,84 @@ if __name__ == "__main__":
""" Basic test script for the above UBloxGPS class.
Sets up GPS and prints out basic position information.
"""
import argparse
import logging
import sys
import time
def gps_test(state):
print(state)
parser = argparse.ArgumentParser()
parser.add_argument("serial_port", type=str, help="uBlox GPS Serial Port. For USB-attached uBlox chipsets, this is usually /dev/ttyACM0")
parser.add_argument("--baudrate", default=115200, type=int, help="uBlox GPS baud rate. for USB-attached chipsets this doesn't matter. Defaults to 115200 baud.")
parser.add_argument("--waitforlock", default=None, type=int, help="If set, exit after the GPS has obtained lock, or a timeout (in minutes) is reached.")
parser.add_argument("--lockcount", default=60, type=int, help="If waitforlock is specified, wait until the GPS reports lock for this many sequential fixes. Default: 60")
parser.add_argument("--locksats", default=None, type=int, help="If waitforlock is specified, also wait until the GPS reports this many SVs used in its solution. Default: None")
parser.add_argument("-v", "--verbose", action='store_true', default=False, help="Show additional debug info.")
args = parser.parse_args()
if args.verbose:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO
# Set up logging
logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=logging_level)
gps = None
lock_counter = 0
start_time = time.time()
lock_acheived = False
def gps_callback(state):
global args, lock_counter, start_time, gps, lock_acheived
if args.waitforlock:
# If we have a GPS fix
if state['gpsFix'] == 3:
if args.locksats:
if state['numSV'] >= args.locksats:
lock_counter += 1
else:
lock_counter = 0
else:
lock_counter += 1
else:
lock_counter = 0
logging.info(f"Lock Counter: {lock_counter}/{args.lockcount}")
if lock_counter >= args.lockcount:
logging.info(f"GNSS lock maintained for {lock_counter} fixes! Exiting..")
lock_acheived = True
logging.info(f"GPS State: {state}")
gps = UBloxGPS(port=sys.argv[1], callback=gps_test, update_rate_ms=500, dynamic_model=DYNAMIC_MODEL_AIRBORNE1G, ntpd_update=True)
gps = UBloxGPS(
port=args.serial_port,
baudrate=args.baudrate,
callback=gps_callback,
update_rate_ms=500,
dynamic_model=DYNAMIC_MODEL_AIRBORNE1G,
ntpd_update=False
)
try:
while True:
time.sleep(1)
if args.waitforlock:
if ((time.time() - start_time) > (args.waitforlock*60)):
logging.critical("Reached lock timeout! Exiting...")
gps.close()
sys.exit(0)
if lock_acheived:
logging.info("Acheived GNSS lock!")
gps.close()
time.sleep(2)
sys.exit(0)
except KeyboardInterrupt:
gps.close()