update for payload

picamera2
Mark Jessop 2023-02-18 16:15:54 +10:30
rodzic 75d3c6268f
commit a79777efd0
2 zmienionych plików z 271 dodań i 270 usunięć

Wyświetl plik

@ -24,369 +24,370 @@ from threading import Thread
class WenetPiCamera2(object): class WenetPiCamera2(object):
""" PiCamera2 Wrapper Class """ PiCamera2 Wrapper Class
Raspberry Pi Camera 2 Image source for Wenet. Raspberry Pi Camera 2 Image source for Wenet.
Uses the new libcamera-based PiCamera2 library. Uses the new libcamera-based PiCamera2 library.
Captures multiple images, picks the best, then Captures multiple images, picks the best, then
transmits it via a PacketTX object. transmits it via a PacketTX object.
""" """
def __init__(self, def __init__(self,
callsign = "N0CALL", callsign = "N0CALL",
tx_resolution=(1936,1088), tx_resolution=(1936,1088),
num_images=1, num_images=1,
image_delay=0.5, image_delay=0.0,
vertical_flip = False, vertical_flip = False,
horizontal_flip = False, horizontal_flip = False,
greyworld = False, greyworld = False,
lens_position = 0.0, lens_position = 0.0,
temp_filename_prefix = 'picam_temp', temp_filename_prefix = 'picam_temp',
debug_ptr = None debug_ptr = None
): ):
""" Instantiate a WenetPiCam Object """ Instantiate a WenetPiCam Object
used to capture images from a PiCam using 'optimal' capture techniques. used to capture images from a PiCam using 'optimal' capture techniques.
Keyword Arguments: Keyword Arguments:
callsign: The callsign to be used when converting images to SSDV. Must be <=6 characters in length. callsign: The callsign to be used when converting images to SSDV. Must be <=6 characters in length.
tx_resolution: Tuple (x,y) containing desired image *transmit* resolution. tx_resolution: Tuple (x,y) containing desired image *transmit* resolution.
NOTE: both x and y need to be multiples of 16 to be used with SSDV. NOTE: both x and y need to be multiples of 16 to be used with SSDV.
NOTE: This will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right. NOTE: This will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right.
num_images: Number of images to capture in sequence when the 'capture' function is called. num_images: Number of images to capture in sequence when the 'capture' function is called.
The 'best' (largest filesize) image is selected and saved. The 'best' (largest filesize) image is selected and saved.
image_delay: Delay time (seconds) between each captured image. image_delay: Delay time (seconds) between each captured image.
vertical_flip: Flip captured images vertically. vertical_flip: Flip captured images vertically.
horizontal_flip: Flip captured images horizontally. horizontal_flip: Flip captured images horizontally.
Used to correct for picam orientation. Used to correct for picam orientation.
greyworld: Use Greyworld AWB setting, for IR-filtered images. greyworld: Use Greyworld AWB setting, for IR-filtered images.
lens_position: Lens Position setting (float), 0.0 = Infinity, 10 = very close. lens_position: Lens Position setting (float), 0.0 = Infinity, 10 = very close.
Only usable on Pi Camera v3 modules. Only usable on Pi Camera v3 modules.
temp_filename_prefix: prefix used for temporary files. temp_filename_prefix: prefix used for temporary files.
debug_ptr: 'pointer' to a function which can handle debug messages. debug_ptr: 'pointer' to a function which can handle debug messages.
This function needs to be able to accept a string. This function needs to be able to accept a string.
Used to get status messages into the downlink. Used to get status messages into the downlink.
""" """
self.debug_ptr = debug_ptr self.debug_ptr = debug_ptr
self.temp_filename_prefix = temp_filename_prefix self.temp_filename_prefix = temp_filename_prefix
self.num_images = num_images self.num_images = num_images
self.image_delay = image_delay self.image_delay = image_delay
self.callsign = callsign self.callsign = callsign
self.tx_resolution = tx_resolution self.tx_resolution = tx_resolution
self.src_resolution = src_resolution self.horizontal_flip = horizontal_flip
self.horizontal_flip = horizontal_flip self.vertical_flip = vertical_flip
self.vertical_flip = vertical_flip self.greyworld = greyworld
self.greyworld = greyworld
self.lens_position = lens_position self.lens_position = lens_position
self.init_camera() self.init_camera()
def init_camera(self): def init_camera(self):
# Attempt to start picam. # Attempt to start picam.
self.cam = Picamera2() self.cam = Picamera2()
self.camera_properties = self.cam.camera_properties self.camera_properties = self.cam.camera_properties
self.src_resolution = self.camera_properties['PixelArraySize'] self.debug_ptr("Camera Resolution: " + str(self.camera_properties['PixelArraySize']))
# Configure camera. # Configure camera.
capture_config = picam2.create_still_configuration( capture_config = self.cam.create_still_configuration(
transform=Transform(hflip=self.horizontal_flip, vflip=self.vertical_flip) transform=Transform(hflip=self.horizontal_flip, vflip=self.vertical_flip)
) )
self.cam.configure(capture_config) self.cam.configure(capture_config)
self.cam.set_controls( self.cam.set_controls(
{'AwbMode': controls.AwbModeEnum.Daylight, {'AwbMode': controls.AwbModeEnum.Daylight,
'AeMeteringMode', controls.AeMeteringModeEnum.Matrix} 'AeMeteringMode': controls.AeMeteringModeEnum.Matrix,
'NoiseReductionMode': controls.draft.NoiseReductionModeEnum.Off}
) )
# Set Pi Camera 3 lens position # Set Pi Camera 3 lens position
if 'LensPostion' in self.cam.camera_controls: if 'LensPosition' in self.cam.camera_controls:
self.debug_ptr("Configured lens position to " + str(self.lens_position))
self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position}) self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position})
# Start the 'preview' mode, effectively opening the 'shutter'. # Start the 'preview' mode, effectively opening the 'shutter'.
# This lets the camera gain control algs start to settle. # This lets the camera gain control algs start to settle.
self.cam.start() self.cam.start()
def debug_message(self, message): def debug_message(self, message):
""" Write a debug message. """ Write a debug message.
If debug_ptr was set to a function during init, this will If debug_ptr was set to a function during init, this will
pass the message to that function, else it will just print it. pass the message to that function, else it will just print it.
This is used mainly to get updates on image capture into the Wenet downlink. This is used mainly to get updates on image capture into the Wenet downlink.
""" """
message = "PiCam Debug: " + message message = "PiCam Debug: " + message
if self.debug_ptr != None: if self.debug_ptr != None:
self.debug_ptr(message) self.debug_ptr(message)
else: else:
print(message) print(message)
def close(self): def close(self):
self.cam.stop() self.cam.stop()
self.cam.close() self.cam.close()
def capture(self, filename='picam.jpg', quality=90, bayer=False): def capture(self, filename='picam.jpg', quality=90, bayer=False):
""" Capture an image using the PiCam """ Capture an image using the PiCam
Keyword Arguments: Keyword Arguments:
filename: destination filename. filename: destination filename.
""" """
self.cam.options['quality'] = quality self.cam.options['quality'] = quality
# Attempt to capture a set of images. # Attempt to capture a set of images.
for i in range(self.num_images): for i in range(self.num_images):
self.debug_message("Capturing Image %d of %d" % (i+1,self.num_images)) self.debug_message("Capturing Image %d of %d" % (i+1,self.num_images))
# Wrap this in error handling in case we lose the camera for some reason. # Wrap this in error handling in case we lose the camera for some reason.
try: try:
self.cam.capture_file("%s_%d.jpg" % (self.temp_filename_prefix,i)) self.cam.capture_file("%s_%d.jpg" % (self.temp_filename_prefix,i))
print(f"Image captured: {time.time()}") print(f"Image captured: {time.time()}")
if self.image_delay > 0: if self.image_delay > 0:
sleep(self.image_delay) sleep(self.image_delay)
except Exception as e: # TODO: Narrow this down... except Exception as e: # TODO: Narrow this down...
self.debug_message("Capture Error: %s" % str(e)) self.debug_message("Capture Error: %s" % str(e))
# Immediately return false. Not much point continuing to try and capture images. # Immediately return false. Not much point continuing to try and capture images.
return False return False
# Otherwise, continue to pick the 'best' image based on filesize. # Otherwise, continue to pick the 'best' image based on filesize.
self.debug_message("Choosing Best Image.") self.debug_message("Choosing Best Image.")
pic_list = glob.glob("%s_*.jpg" % self.temp_filename_prefix) pic_list = glob.glob("%s_*.jpg" % self.temp_filename_prefix)
pic_sizes = [] pic_sizes = []
# Iterate through list of images and get the file sizes. # Iterate through list of images and get the file sizes.
for pic in pic_list: for pic in pic_list:
pic_sizes.append(os.path.getsize(pic)) pic_sizes.append(os.path.getsize(pic))
largest_pic = pic_list[pic_sizes.index(max(pic_sizes))] largest_pic = pic_list[pic_sizes.index(max(pic_sizes))]
# Copy best image to target filename. # Copy best image to target filename.
self.debug_message("Copying image to storage with filename %s" % filename) self.debug_message("Copying image to storage with filename %s" % filename)
os.system("cp %s %s" % (largest_pic, filename)) os.system("cp %s %s" % (largest_pic, filename))
# Clean up temporary images. # Clean up temporary images.
os.system("rm %s_*.jpg" % self.temp_filename_prefix) os.system("rm %s_*.jpg" % self.temp_filename_prefix)
return True return True
def ssdvify(self, filename="output.jpg", image_id=0, quality=6): def ssdvify(self, filename="output.jpg", image_id=0, quality=6):
""" Convert a supplied JPEG image to SSDV. """ Convert a supplied JPEG image to SSDV.
Returns the filename of the converted SSDV image. Returns the filename of the converted SSDV image.
Keyword Arguments: Keyword Arguments:
filename: Source JPEG filename. filename: Source JPEG filename.
Output SSDV image will be saved to to a temporary file (webcam_temp.jpg) which should be Output SSDV image will be saved to to a temporary file (webcam_temp.jpg) which should be
transmitted immediately. transmitted immediately.
image_id: Image ID number. Must be incremented between images. image_id: Image ID number. Must be incremented between images.
quality: JPEG quality level: 4 - 7, where 7 is 'lossless' (not recommended). quality: JPEG quality level: 4 - 7, where 7 is 'lossless' (not recommended).
6 provides good quality at decent file-sizes. 6 provides good quality at decent file-sizes.
""" """
# Wrap image ID field if it's >255. # Wrap image ID field if it's >255.
image_id = image_id % 256 image_id = image_id % 256
# Resize image to the desired resolution. # Resize image to the desired resolution.
self.debug_message("Resizing image.") self.debug_message("Resizing image.")
return_code = os.system("convert %s -resize %dx%d\! picam_temp.jpg" % (filename, self.tx_resolution[0], self.tx_resolution[1])) return_code = os.system("convert %s -resize %dx%d\! picam_temp.jpg" % (filename, self.tx_resolution[0], self.tx_resolution[1]))
if return_code != 0: if return_code != 0:
self.debug_message("Resize operation failed!") self.debug_message("Resize operation failed!")
return "FAIL" return "FAIL"
# Get non-extension part of filename. # Get non-extension part of filename.
file_basename = filename[:-4] file_basename = filename[:-4]
# Construct SSDV command-line. # Construct SSDV command-line.
ssdv_command = "ssdv -e -n -q %d -c %s -i %d picam_temp.jpg picam_temp.ssdv" % (quality, self.callsign, image_id) ssdv_command = "ssdv -e -n -q %d -c %s -i %d picam_temp.jpg picam_temp.ssdv" % (quality, self.callsign, image_id)
print(ssdv_command) print(ssdv_command)
# Update debug message. # Update debug message.
self.debug_message("Converting image to SSDV.") self.debug_message("Converting image to SSDV.")
# Run SSDV converter. # Run SSDV converter.
return_code = os.system(ssdv_command) return_code = os.system(ssdv_command)
if return_code != 0: if return_code != 0:
self.debug_message("ERROR: Could not perform SSDV Conversion.") self.debug_message("ERROR: Could not perform SSDV Conversion.")
return "FAIL" return "FAIL"
else: else:
return "picam_temp.ssdv" return "picam_temp.ssdv"
auto_capture_running = False auto_capture_running = False
def auto_capture(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): def auto_capture(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0):
""" Automatically capture and transmit images in a loop. """ Automatically capture and transmit images in a loop.
Images are automatically saved to a supplied directory, with file-names Images are automatically saved to a supplied directory, with file-names
defined using a timestamp. defined using a timestamp.
Use the run() and stop() functions to start/stop this running. Use the run() and stop() functions to start/stop this running.
Keyword Arguments: Keyword Arguments:
destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here.
tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue.
post_process_ptr: An optional function which is called after the image is captured. This function post_process_ptr: An optional function which is called after the image is captured. This function
will be passed the path/filename of the captured image. will be passed the path/filename of the captured image.
This can be used to add overlays, etc to the image before it is SSDVified and transmitted. This can be used to add overlays, etc to the image before it is SSDVified and transmitted.
NOTE: This function need to modify the image in-place. NOTE: This function need to modify the image in-place.
delay: An optional delay in seconds between capturing images. Defaults to 0. delay: An optional delay in seconds between capturing images. Defaults to 0.
This delay is added on top of any delays caused while waiting for the transmit queue to empty. This delay is added on top of any delays caused while waiting for the transmit queue to empty.
start_id: Starting image ID. Defaults to 0. start_id: Starting image ID. Defaults to 0.
""" """
image_id = start_id image_id = start_id
while self.auto_capture_running: while self.auto_capture_running:
# Sleep before capturing next image. # Sleep before capturing next image.
sleep(delay) sleep(delay)
# Grab current timestamp. # Grab current timestamp.
capture_time = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%SZ") capture_time = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%SZ")
capture_filename = destination_directory + "/%s_picam.jpg" % capture_time capture_filename = destination_directory + "/%s_picam.jpg" % capture_time
# Attempt to capture. # Attempt to capture.
capture_successful = self.capture(capture_filename) capture_successful = self.capture(capture_filename)
# If capture was unsuccessful, try again in a little bit # If capture was unsuccessful, try again in a little bit
if not capture_successful: if not capture_successful:
sleep(5) sleep(5)
self.debug_message("Capture failed! Attempting to reset camera...") self.debug_message("Capture failed! Attempting to reset camera...")
try: try:
self.cam.stop() self.cam.stop()
self.cam.close() self.cam.close()
except: except:
self.debug_message("Closing camera object failed.") self.debug_message("Closing camera object failed.")
try: try:
self.init_camera() self.init_camera()
except: except:
self.debug_message("Error initializing camera!") self.debug_message("Error initializing camera!")
sleep(1) sleep(1)
continue continue
# Otherwise, proceed to post-processing step. # Otherwise, proceed to post-processing step.
if post_process_ptr != None: if post_process_ptr != None:
try: try:
self.debug_message("Running Image Post-Processing") self.debug_message("Running Image Post-Processing")
post_process_ptr(capture_filename) post_process_ptr(capture_filename)
except: except:
error_str = traceback.format_exc() error_str = traceback.format_exc()
self.debug_message("Image Post-Processing Failed: %s" % error_str) self.debug_message("Image Post-Processing Failed: %s" % error_str)
# SSDV'ify the image. # SSDV'ify the image.
ssdv_filename = self.ssdvify(capture_filename, image_id=image_id) ssdv_filename = self.ssdvify(capture_filename, image_id=image_id)
# Check the SSDV Conversion has completed properly. If not, continue # Check the SSDV Conversion has completed properly. If not, continue
if ssdv_filename == "FAIL": if ssdv_filename == "FAIL":
sleep(1) sleep(1)
continue continue
# Otherwise, read in the file and push into the TX buffer. # Otherwise, read in the file and push into the TX buffer.
file_size = os.path.getsize(ssdv_filename) file_size = os.path.getsize(ssdv_filename)
# Wait until the transmit queue is empty before pushing in packets. # Wait until the transmit queue is empty before pushing in packets.
self.debug_message("Waiting for SSDV TX queue to empty.") self.debug_message("Waiting for SSDV TX queue to empty.")
while tx.image_queue_empty() == False: while tx.image_queue_empty() == False:
sleep(0.05) # Sleep for a short amount of time. sleep(0.05) # Sleep for a short amount of time.
if self.auto_capture_running == False: if self.auto_capture_running == False:
return return
# Inform ground station we are about to send an image. # Inform ground station we are about to send an image.
self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256)) self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256))
# Push SSDV file into transmit queue. # Push SSDV file into transmit queue.
tx.queue_image_file(ssdv_filename) tx.queue_image_file(ssdv_filename)
# Increment image ID. # Increment image ID.
image_id = (image_id + 1) % 256 image_id = (image_id + 1) % 256
# Loop! # Loop!
self.debug_message("Uh oh, we broke out of the main thread. This is not good!") self.debug_message("Uh oh, we broke out of the main thread. This is not good!")
def run(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): def run(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0):
""" Start auto-capturing images in a thread. """ Start auto-capturing images in a thread.
Refer auto_capture function above. Refer auto_capture function above.
Keyword Arguments: Keyword Arguments:
destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here.
tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue.
post_process_ptr: An optional function which is called after the image is captured. This function post_process_ptr: An optional function which is called after the image is captured. This function
will be passed the path/filename of the captured image. will be passed the path/filename of the captured image.
This can be used to add overlays, etc to the image before it is SSDVified and transmitted. This can be used to add overlays, etc to the image before it is SSDVified and transmitted.
NOTE: This function need to modify the image in-place. NOTE: This function need to modify the image in-place.
delay: An optional delay in seconds between capturing images. Defaults to 0. delay: An optional delay in seconds between capturing images. Defaults to 0.
This delay is added on top of any delays caused while waiting for the transmit queue to empty. This delay is added on top of any delays caused while waiting for the transmit queue to empty.
start_id: Starting image ID. Defaults to 0. start_id: Starting image ID. Defaults to 0.
""" """
self.auto_capture_running = True self.auto_capture_running = True
capture_thread = Thread(target=self.auto_capture, kwargs=dict( capture_thread = Thread(target=self.auto_capture, kwargs=dict(
destination_directory=destination_directory, destination_directory=destination_directory,
tx = tx, tx = tx,
post_process_ptr=post_process_ptr, post_process_ptr=post_process_ptr,
delay=delay, delay=delay,
start_id=start_id)) start_id=start_id))
capture_thread.start() capture_thread.start()
def stop(self): def stop(self):
self.auto_capture_running = False self.auto_capture_running = False
# TODO: Non-blocking image capture. # TODO: Non-blocking image capture.
capture_finished = False capture_finished = False
def trigger_capture(): def trigger_capture():
pass pass
# Basic transmission test script. # Basic transmission test script.
if __name__ == "__main__": if __name__ == "__main__":
import PacketTX import PacketTX
import argparse import argparse
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("callsign", default="N0CALL", help="Payload Callsign") parser.add_argument("callsign", default="N0CALL", help="Payload Callsign")
parser.add_argument("--txport", default="/dev/ttyAMA0", type=str, help="Transmitter serial port. Defaults to /dev/ttyAMA0") parser.add_argument("--txport", default="/dev/ttyAMA0", type=str, help="Transmitter serial port. Defaults to /dev/ttyAMA0")
parser.add_argument("--baudrate", default=115200, type=int, help="Transmitter baud rate. Defaults to 115200 baud.") parser.add_argument("--baudrate", default=115200, type=int, help="Transmitter baud rate. Defaults to 115200 baud.")
args = parser.parse_args() args = parser.parse_args()
callsign = args.callsign callsign = args.callsign
print("Using Callsign: %s" % callsign) print("Using Callsign: %s" % callsign)
def post_process(filename): def post_process(filename):
print("Doing nothing with %s" % filename) print("Doing nothing with %s" % filename)
tx = PacketTX.PacketTX(serial_port=args.txport, serial_baud=args.baudrate, callsign=callsign) tx = PacketTX.PacketTX(serial_port=args.txport, serial_baud=args.baudrate, callsign=callsign)
tx.start_tx() tx.start_tx()
picam = WenetPiCam(src_resolution=(1920,1088), picam = WenetPiCam(src_resolution=(1920,1088),
tx_resolution=(1920,1088), tx_resolution=(1920,1088),
callsign=callsign, callsign=callsign,
num_images=5, num_images=5,
debug_ptr=tx.transmit_text_message, debug_ptr=tx.transmit_text_message,
vertical_flip=False, vertical_flip=False,
horizontal_flip=False) horizontal_flip=False)
picam.run(destination_directory="./tx_images/", picam.run(destination_directory="./tx_images/",
tx = tx, tx = tx,
post_process_ptr = post_process post_process_ptr = post_process
) )
try: try:
while True: while True:
tx.transmit_text_message("Waiting...") tx.transmit_text_message("Waiting...")
sleep(5) sleep(5)
except KeyboardInterrupt: except KeyboardInterrupt:
print("Closing") print("Closing")
picam.stop() picam.stop()
tx.close() tx.close()

Wyświetl plik

@ -135,7 +135,7 @@ def post_process_image(filename):
gps_string = "" gps_string = ""
# Build up our imagemagick 'convert' command line # Build up our imagemagick 'convert' command line
overlay_str = "convert %s -gamma 0.8 -font Helvetica -pointsize 30 -gravity North " % filename overlay_str = "convert %s -gamma 0.8 -font Helvetica -pointsize 40 -gravity North " % filename
overlay_str += "-strokewidth 2 -stroke '#000C' -annotate +0+5 \"%s\" " % gps_string overlay_str += "-strokewidth 2 -stroke '#000C' -annotate +0+5 \"%s\" " % gps_string
overlay_str += "-stroke none -fill white -annotate +0+5 \"%s\" " % gps_string overlay_str += "-stroke none -fill white -annotate +0+5 \"%s\" " % gps_string
# Add on logo overlay argument if we have been given one. # Add on logo overlay argument if we have been given one.