From a79777efd0e9721a89d371ae3f8090a97433d95c Mon Sep 17 00:00:00 2001 From: Mark Jessop Date: Sat, 18 Feb 2023 16:15:54 +1030 Subject: [PATCH] update for payload --- tx/WenetPiCamera2.py | 539 +++++++++++++++++++++-------------------- tx/tx_picamera2_gps.py | 2 +- 2 files changed, 271 insertions(+), 270 deletions(-) diff --git a/tx/WenetPiCamera2.py b/tx/WenetPiCamera2.py index e70d113..27a8670 100644 --- a/tx/WenetPiCamera2.py +++ b/tx/WenetPiCamera2.py @@ -24,369 +24,370 @@ from threading import Thread class WenetPiCamera2(object): - """ PiCamera2 Wrapper Class + """ PiCamera2 Wrapper Class - Raspberry Pi Camera 2 Image source for Wenet. + Raspberry Pi Camera 2 Image source for Wenet. Uses the new libcamera-based PiCamera2 library. - Captures multiple images, picks the best, then - transmits it via a PacketTX object. + Captures multiple images, picks the best, then + transmits it via a PacketTX object. - """ + """ - def __init__(self, - callsign = "N0CALL", - tx_resolution=(1936,1088), - num_images=1, - image_delay=0.5, - vertical_flip = False, - horizontal_flip = False, - greyworld = False, + def __init__(self, + callsign = "N0CALL", + tx_resolution=(1936,1088), + num_images=1, + image_delay=0.0, + vertical_flip = False, + horizontal_flip = False, + greyworld = False, lens_position = 0.0, - temp_filename_prefix = 'picam_temp', - debug_ptr = None - ): + temp_filename_prefix = 'picam_temp', + debug_ptr = None + ): - """ Instantiate a WenetPiCam Object - used to capture images from a PiCam using 'optimal' capture techniques. + """ Instantiate a WenetPiCam Object + used to capture images from a PiCam using 'optimal' capture techniques. - Keyword Arguments: - callsign: The callsign to be used when converting images to SSDV. Must be <=6 characters in length. - tx_resolution: Tuple (x,y) containing desired image *transmit* resolution. - NOTE: both x and y need to be multiples of 16 to be used with SSDV. - NOTE: This will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right. + Keyword Arguments: + callsign: The callsign to be used when converting images to SSDV. Must be <=6 characters in length. + tx_resolution: Tuple (x,y) containing desired image *transmit* resolution. + NOTE: both x and y need to be multiples of 16 to be used with SSDV. + NOTE: This will resize with NO REGARD FOR ASPECT RATIO - it's up to you to get that right. - num_images: Number of images to capture in sequence when the 'capture' function is called. - The 'best' (largest filesize) image is selected and saved. - image_delay: Delay time (seconds) between each captured image. + num_images: Number of images to capture in sequence when the 'capture' function is called. + The 'best' (largest filesize) image is selected and saved. + image_delay: Delay time (seconds) between each captured image. - vertical_flip: Flip captured images vertically. - horizontal_flip: Flip captured images horizontally. - Used to correct for picam orientation. + vertical_flip: Flip captured images vertically. + horizontal_flip: Flip captured images horizontally. + Used to correct for picam orientation. greyworld: Use Greyworld AWB setting, for IR-filtered images. lens_position: Lens Position setting (float), 0.0 = Infinity, 10 = very close. Only usable on Pi Camera v3 modules. - temp_filename_prefix: prefix used for temporary files. + temp_filename_prefix: prefix used for temporary files. - debug_ptr: 'pointer' to a function which can handle debug messages. - This function needs to be able to accept a string. - Used to get status messages into the downlink. + debug_ptr: 'pointer' to a function which can handle debug messages. + This function needs to be able to accept a string. + Used to get status messages into the downlink. - """ + """ - self.debug_ptr = debug_ptr - self.temp_filename_prefix = temp_filename_prefix - self.num_images = num_images - self.image_delay = image_delay - self.callsign = callsign - self.tx_resolution = tx_resolution - self.src_resolution = src_resolution - self.horizontal_flip = horizontal_flip - self.vertical_flip = vertical_flip - self.greyworld = greyworld + self.debug_ptr = debug_ptr + self.temp_filename_prefix = temp_filename_prefix + self.num_images = num_images + self.image_delay = image_delay + self.callsign = callsign + self.tx_resolution = tx_resolution + self.horizontal_flip = horizontal_flip + self.vertical_flip = vertical_flip + self.greyworld = greyworld self.lens_position = lens_position - self.init_camera() + self.init_camera() - def init_camera(self): - # Attempt to start picam. - self.cam = Picamera2() + def init_camera(self): + # Attempt to start picam. + self.cam = Picamera2() self.camera_properties = self.cam.camera_properties - self.src_resolution = self.camera_properties['PixelArraySize'] + self.debug_ptr("Camera Resolution: " + str(self.camera_properties['PixelArraySize'])) - # Configure camera. - capture_config = picam2.create_still_configuration( + # Configure camera. + capture_config = self.cam.create_still_configuration( transform=Transform(hflip=self.horizontal_flip, vflip=self.vertical_flip) ) self.cam.configure(capture_config) self.cam.set_controls( {'AwbMode': controls.AwbModeEnum.Daylight, - 'AeMeteringMode', controls.AeMeteringModeEnum.Matrix} + 'AeMeteringMode': controls.AeMeteringModeEnum.Matrix, + 'NoiseReductionMode': controls.draft.NoiseReductionModeEnum.Off} ) # Set Pi Camera 3 lens position - if 'LensPostion' in self.cam.camera_controls: + if 'LensPosition' in self.cam.camera_controls: + self.debug_ptr("Configured lens position to " + str(self.lens_position)) self.cam.set_controls({"AfMode": controls.AfModeEnum.Manual, "LensPosition": self.lens_position}) - # Start the 'preview' mode, effectively opening the 'shutter'. - # This lets the camera gain control algs start to settle. - self.cam.start() + # Start the 'preview' mode, effectively opening the 'shutter'. + # This lets the camera gain control algs start to settle. + self.cam.start() - def debug_message(self, message): - """ Write a debug message. - If debug_ptr was set to a function during init, this will - pass the message to that function, else it will just print it. - This is used mainly to get updates on image capture into the Wenet downlink. + def debug_message(self, message): + """ Write a debug message. + If debug_ptr was set to a function during init, this will + pass the message to that function, else it will just print it. + This is used mainly to get updates on image capture into the Wenet downlink. - """ - message = "PiCam Debug: " + message - if self.debug_ptr != None: - self.debug_ptr(message) - else: - print(message) + """ + message = "PiCam Debug: " + message + if self.debug_ptr != None: + self.debug_ptr(message) + else: + print(message) - def close(self): - self.cam.stop() - self.cam.close() + def close(self): + self.cam.stop() + self.cam.close() - def capture(self, filename='picam.jpg', quality=90, bayer=False): - """ Capture an image using the PiCam - - Keyword Arguments: - filename: destination filename. - """ + def capture(self, filename='picam.jpg', quality=90, bayer=False): + """ Capture an image using the PiCam + + Keyword Arguments: + filename: destination filename. + """ self.cam.options['quality'] = quality - # Attempt to capture a set of images. - for i in range(self.num_images): - self.debug_message("Capturing Image %d of %d" % (i+1,self.num_images)) - # Wrap this in error handling in case we lose the camera for some reason. - try: + # Attempt to capture a set of images. + for i in range(self.num_images): + self.debug_message("Capturing Image %d of %d" % (i+1,self.num_images)) + # Wrap this in error handling in case we lose the camera for some reason. + try: self.cam.capture_file("%s_%d.jpg" % (self.temp_filename_prefix,i)) - print(f"Image captured: {time.time()}") - if self.image_delay > 0: - sleep(self.image_delay) - except Exception as e: # TODO: Narrow this down... - self.debug_message("Capture Error: %s" % str(e)) - # Immediately return false. Not much point continuing to try and capture images. - return False + print(f"Image captured: {time.time()}") + if self.image_delay > 0: + sleep(self.image_delay) + except Exception as e: # TODO: Narrow this down... + self.debug_message("Capture Error: %s" % str(e)) + # Immediately return false. Not much point continuing to try and capture images. + return False - - # Otherwise, continue to pick the 'best' image based on filesize. - self.debug_message("Choosing Best Image.") - pic_list = glob.glob("%s_*.jpg" % self.temp_filename_prefix) - pic_sizes = [] - # Iterate through list of images and get the file sizes. - for pic in pic_list: - pic_sizes.append(os.path.getsize(pic)) - largest_pic = pic_list[pic_sizes.index(max(pic_sizes))] + + # Otherwise, continue to pick the 'best' image based on filesize. + self.debug_message("Choosing Best Image.") + pic_list = glob.glob("%s_*.jpg" % self.temp_filename_prefix) + pic_sizes = [] + # Iterate through list of images and get the file sizes. + for pic in pic_list: + pic_sizes.append(os.path.getsize(pic)) + largest_pic = pic_list[pic_sizes.index(max(pic_sizes))] - # Copy best image to target filename. - self.debug_message("Copying image to storage with filename %s" % filename) - os.system("cp %s %s" % (largest_pic, filename)) - # Clean up temporary images. - os.system("rm %s_*.jpg" % self.temp_filename_prefix) + # Copy best image to target filename. + self.debug_message("Copying image to storage with filename %s" % filename) + os.system("cp %s %s" % (largest_pic, filename)) + # Clean up temporary images. + os.system("rm %s_*.jpg" % self.temp_filename_prefix) - return True + return True - def ssdvify(self, filename="output.jpg", image_id=0, quality=6): - """ Convert a supplied JPEG image to SSDV. - Returns the filename of the converted SSDV image. + def ssdvify(self, filename="output.jpg", image_id=0, quality=6): + """ Convert a supplied JPEG image to SSDV. + Returns the filename of the converted SSDV image. - Keyword Arguments: - filename: Source JPEG filename. - Output SSDV image will be saved to to a temporary file (webcam_temp.jpg) which should be - transmitted immediately. - image_id: Image ID number. Must be incremented between images. - quality: JPEG quality level: 4 - 7, where 7 is 'lossless' (not recommended). - 6 provides good quality at decent file-sizes. + Keyword Arguments: + filename: Source JPEG filename. + Output SSDV image will be saved to to a temporary file (webcam_temp.jpg) which should be + transmitted immediately. + image_id: Image ID number. Must be incremented between images. + quality: JPEG quality level: 4 - 7, where 7 is 'lossless' (not recommended). + 6 provides good quality at decent file-sizes. - """ + """ - # Wrap image ID field if it's >255. - image_id = image_id % 256 + # Wrap image ID field if it's >255. + image_id = image_id % 256 - # Resize image to the desired resolution. - self.debug_message("Resizing image.") - return_code = os.system("convert %s -resize %dx%d\! picam_temp.jpg" % (filename, self.tx_resolution[0], self.tx_resolution[1])) - if return_code != 0: - self.debug_message("Resize operation failed!") - return "FAIL" + # Resize image to the desired resolution. + self.debug_message("Resizing image.") + return_code = os.system("convert %s -resize %dx%d\! picam_temp.jpg" % (filename, self.tx_resolution[0], self.tx_resolution[1])) + if return_code != 0: + self.debug_message("Resize operation failed!") + return "FAIL" - # Get non-extension part of filename. - file_basename = filename[:-4] + # Get non-extension part of filename. + file_basename = filename[:-4] - # Construct SSDV command-line. - ssdv_command = "ssdv -e -n -q %d -c %s -i %d picam_temp.jpg picam_temp.ssdv" % (quality, self.callsign, image_id) - print(ssdv_command) - # Update debug message. - self.debug_message("Converting image to SSDV.") + # Construct SSDV command-line. + ssdv_command = "ssdv -e -n -q %d -c %s -i %d picam_temp.jpg picam_temp.ssdv" % (quality, self.callsign, image_id) + print(ssdv_command) + # Update debug message. + self.debug_message("Converting image to SSDV.") - # Run SSDV converter. - return_code = os.system(ssdv_command) + # Run SSDV converter. + return_code = os.system(ssdv_command) - if return_code != 0: - self.debug_message("ERROR: Could not perform SSDV Conversion.") - return "FAIL" - else: - return "picam_temp.ssdv" + if return_code != 0: + self.debug_message("ERROR: Could not perform SSDV Conversion.") + return "FAIL" + else: + return "picam_temp.ssdv" - auto_capture_running = False - def auto_capture(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): - """ Automatically capture and transmit images in a loop. - Images are automatically saved to a supplied directory, with file-names - defined using a timestamp. + auto_capture_running = False + def auto_capture(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): + """ Automatically capture and transmit images in a loop. + Images are automatically saved to a supplied directory, with file-names + defined using a timestamp. - Use the run() and stop() functions to start/stop this running. - - Keyword Arguments: - destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. - tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. - post_process_ptr: An optional function which is called after the image is captured. This function - will be passed the path/filename of the captured image. - This can be used to add overlays, etc to the image before it is SSDVified and transmitted. - NOTE: This function need to modify the image in-place. - delay: An optional delay in seconds between capturing images. Defaults to 0. - This delay is added on top of any delays caused while waiting for the transmit queue to empty. - start_id: Starting image ID. Defaults to 0. - """ + Use the run() and stop() functions to start/stop this running. + + Keyword Arguments: + destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. + tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. + post_process_ptr: An optional function which is called after the image is captured. This function + will be passed the path/filename of the captured image. + This can be used to add overlays, etc to the image before it is SSDVified and transmitted. + NOTE: This function need to modify the image in-place. + delay: An optional delay in seconds between capturing images. Defaults to 0. + This delay is added on top of any delays caused while waiting for the transmit queue to empty. + start_id: Starting image ID. Defaults to 0. + """ - image_id = start_id + image_id = start_id - while self.auto_capture_running: - # Sleep before capturing next image. - sleep(delay) + while self.auto_capture_running: + # Sleep before capturing next image. + sleep(delay) - # Grab current timestamp. - capture_time = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%SZ") - capture_filename = destination_directory + "/%s_picam.jpg" % capture_time + # Grab current timestamp. + capture_time = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%SZ") + capture_filename = destination_directory + "/%s_picam.jpg" % capture_time - # Attempt to capture. - capture_successful = self.capture(capture_filename) + # Attempt to capture. + capture_successful = self.capture(capture_filename) - # If capture was unsuccessful, try again in a little bit - if not capture_successful: - sleep(5) + # If capture was unsuccessful, try again in a little bit + if not capture_successful: + sleep(5) - self.debug_message("Capture failed! Attempting to reset camera...") + self.debug_message("Capture failed! Attempting to reset camera...") - try: - self.cam.stop() - self.cam.close() - except: - self.debug_message("Closing camera object failed.") + try: + self.cam.stop() + self.cam.close() + except: + self.debug_message("Closing camera object failed.") - try: - self.init_camera() - except: - self.debug_message("Error initializing camera!") - sleep(1) + try: + self.init_camera() + except: + self.debug_message("Error initializing camera!") + sleep(1) - continue + continue - # Otherwise, proceed to post-processing step. - if post_process_ptr != None: - try: - self.debug_message("Running Image Post-Processing") - post_process_ptr(capture_filename) - except: - error_str = traceback.format_exc() - self.debug_message("Image Post-Processing Failed: %s" % error_str) + # Otherwise, proceed to post-processing step. + if post_process_ptr != None: + try: + self.debug_message("Running Image Post-Processing") + post_process_ptr(capture_filename) + except: + error_str = traceback.format_exc() + self.debug_message("Image Post-Processing Failed: %s" % error_str) - # SSDV'ify the image. - ssdv_filename = self.ssdvify(capture_filename, image_id=image_id) + # SSDV'ify the image. + ssdv_filename = self.ssdvify(capture_filename, image_id=image_id) - # Check the SSDV Conversion has completed properly. If not, continue - if ssdv_filename == "FAIL": - sleep(1) - continue + # Check the SSDV Conversion has completed properly. If not, continue + if ssdv_filename == "FAIL": + sleep(1) + continue - # Otherwise, read in the file and push into the TX buffer. - file_size = os.path.getsize(ssdv_filename) + # Otherwise, read in the file and push into the TX buffer. + file_size = os.path.getsize(ssdv_filename) - # Wait until the transmit queue is empty before pushing in packets. - self.debug_message("Waiting for SSDV TX queue to empty.") - while tx.image_queue_empty() == False: - sleep(0.05) # Sleep for a short amount of time. - if self.auto_capture_running == False: - return + # Wait until the transmit queue is empty before pushing in packets. + self.debug_message("Waiting for SSDV TX queue to empty.") + while tx.image_queue_empty() == False: + sleep(0.05) # Sleep for a short amount of time. + if self.auto_capture_running == False: + return - # Inform ground station we are about to send an image. - self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256)) + # Inform ground station we are about to send an image. + self.debug_message("Transmitting %d PiCam SSDV Packets." % (file_size//256)) - # Push SSDV file into transmit queue. - tx.queue_image_file(ssdv_filename) + # Push SSDV file into transmit queue. + tx.queue_image_file(ssdv_filename) - # Increment image ID. - image_id = (image_id + 1) % 256 - # Loop! + # Increment image ID. + image_id = (image_id + 1) % 256 + # Loop! - self.debug_message("Uh oh, we broke out of the main thread. This is not good!") + self.debug_message("Uh oh, we broke out of the main thread. This is not good!") - def run(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): - """ Start auto-capturing images in a thread. + def run(self, destination_directory, tx, post_process_ptr=None, delay = 0, start_id = 0): + """ Start auto-capturing images in a thread. - Refer auto_capture function above. - - Keyword Arguments: - destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. - tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. - post_process_ptr: An optional function which is called after the image is captured. This function - will be passed the path/filename of the captured image. - This can be used to add overlays, etc to the image before it is SSDVified and transmitted. - NOTE: This function need to modify the image in-place. - delay: An optional delay in seconds between capturing images. Defaults to 0. - This delay is added on top of any delays caused while waiting for the transmit queue to empty. - start_id: Starting image ID. Defaults to 0. - """ + Refer auto_capture function above. + + Keyword Arguments: + destination_directory: Folder to save images to. Both raw JPEG and SSDV images are saved here. + tx: A reference to a PacketTX Object, which is used to transmit packets, and interrogate the TX queue. + post_process_ptr: An optional function which is called after the image is captured. This function + will be passed the path/filename of the captured image. + This can be used to add overlays, etc to the image before it is SSDVified and transmitted. + NOTE: This function need to modify the image in-place. + delay: An optional delay in seconds between capturing images. Defaults to 0. + This delay is added on top of any delays caused while waiting for the transmit queue to empty. + start_id: Starting image ID. Defaults to 0. + """ - self.auto_capture_running = True + self.auto_capture_running = True - capture_thread = Thread(target=self.auto_capture, kwargs=dict( - destination_directory=destination_directory, - tx = tx, - post_process_ptr=post_process_ptr, - delay=delay, - start_id=start_id)) + capture_thread = Thread(target=self.auto_capture, kwargs=dict( + destination_directory=destination_directory, + tx = tx, + post_process_ptr=post_process_ptr, + delay=delay, + start_id=start_id)) - capture_thread.start() + capture_thread.start() - def stop(self): - self.auto_capture_running = False + def stop(self): + self.auto_capture_running = False - # TODO: Non-blocking image capture. - capture_finished = False - def trigger_capture(): - pass + # TODO: Non-blocking image capture. + capture_finished = False + def trigger_capture(): + pass # Basic transmission test script. if __name__ == "__main__": - import PacketTX - import argparse + import PacketTX + import argparse - parser = argparse.ArgumentParser() - parser.add_argument("callsign", default="N0CALL", help="Payload Callsign") - parser.add_argument("--txport", default="/dev/ttyAMA0", type=str, help="Transmitter serial port. Defaults to /dev/ttyAMA0") - parser.add_argument("--baudrate", default=115200, type=int, help="Transmitter baud rate. Defaults to 115200 baud.") - args = parser.parse_args() + parser = argparse.ArgumentParser() + parser.add_argument("callsign", default="N0CALL", help="Payload Callsign") + parser.add_argument("--txport", default="/dev/ttyAMA0", type=str, help="Transmitter serial port. Defaults to /dev/ttyAMA0") + parser.add_argument("--baudrate", default=115200, type=int, help="Transmitter baud rate. Defaults to 115200 baud.") + args = parser.parse_args() - callsign = args.callsign - print("Using Callsign: %s" % callsign) + callsign = args.callsign + print("Using Callsign: %s" % callsign) - def post_process(filename): - print("Doing nothing with %s" % filename) + def post_process(filename): + print("Doing nothing with %s" % filename) - tx = PacketTX.PacketTX(serial_port=args.txport, serial_baud=args.baudrate, callsign=callsign) - tx.start_tx() + tx = PacketTX.PacketTX(serial_port=args.txport, serial_baud=args.baudrate, callsign=callsign) + tx.start_tx() - picam = WenetPiCam(src_resolution=(1920,1088), - tx_resolution=(1920,1088), - callsign=callsign, - num_images=5, - debug_ptr=tx.transmit_text_message, - vertical_flip=False, - horizontal_flip=False) + picam = WenetPiCam(src_resolution=(1920,1088), + tx_resolution=(1920,1088), + callsign=callsign, + num_images=5, + debug_ptr=tx.transmit_text_message, + vertical_flip=False, + horizontal_flip=False) - picam.run(destination_directory="./tx_images/", - tx = tx, - post_process_ptr = post_process - ) - try: - while True: - tx.transmit_text_message("Waiting...") - sleep(5) - except KeyboardInterrupt: - print("Closing") - picam.stop() - tx.close() + picam.run(destination_directory="./tx_images/", + tx = tx, + post_process_ptr = post_process + ) + try: + while True: + tx.transmit_text_message("Waiting...") + sleep(5) + except KeyboardInterrupt: + print("Closing") + picam.stop() + tx.close() diff --git a/tx/tx_picamera2_gps.py b/tx/tx_picamera2_gps.py index 1870f3e..bb37ffb 100644 --- a/tx/tx_picamera2_gps.py +++ b/tx/tx_picamera2_gps.py @@ -135,7 +135,7 @@ def post_process_image(filename): gps_string = "" # Build up our imagemagick 'convert' command line - overlay_str = "convert %s -gamma 0.8 -font Helvetica -pointsize 30 -gravity North " % filename + overlay_str = "convert %s -gamma 0.8 -font Helvetica -pointsize 40 -gravity North " % filename overlay_str += "-strokewidth 2 -stroke '#000C' -annotate +0+5 \"%s\" " % gps_string overlay_str += "-stroke none -fill white -annotate +0+5 \"%s\" " % gps_string # Add on logo overlay argument if we have been given one.