Added test images and some initial code.
|
@ -0,0 +1,101 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# Serial Packet Transmitter Class
|
||||
#
|
||||
# Frames packets (preamble, unique word, checksum)
|
||||
# and transmits them out of a serial port.
|
||||
#
|
||||
# Mark Jessop <vk5qi@rfhead.net>
|
||||
#
|
||||
|
||||
|
||||
import serial,Queue,sys,crcmod,struct
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
import numpy as np
|
||||
|
||||
class BinaryDebug(object):
|
||||
def __init__(self):
|
||||
self.f = open("debug.bin",'wb')
|
||||
|
||||
def write(self,data):
|
||||
# TODO: Add in RS232 framing
|
||||
raw_data = np.array([],dtype=np.uint8)
|
||||
for d in data:
|
||||
d_array = np.unpackbits(np.fromstring(d,dtype=np.uint8))
|
||||
raw_data = np.concatenate((raw_data,[0],d_array[::-1],[1]))
|
||||
|
||||
self.f.write(raw_data.astype(np.uint8).tostring())
|
||||
|
||||
def close(self):
|
||||
self.f.close()
|
||||
|
||||
|
||||
class PacketTX(object):
|
||||
txqueue = Queue.Queue(4096) # Up to 1MB of 256 byte packets
|
||||
transmit_active = False
|
||||
debug = False
|
||||
|
||||
unique_word = "\xab\xcd"
|
||||
preamble = "\x55"*16
|
||||
idle_sequence = "\x55"*256
|
||||
|
||||
|
||||
def __init__(self,serial_port="/dev/ttyAMA0", serial_baud=115200, payload_length=256, debug = False):
|
||||
|
||||
if debug == True:
|
||||
self.s = BinaryDebug()
|
||||
self.debug = True
|
||||
else:
|
||||
self.s = serial.Serial(serial_port,serial_baud)
|
||||
self.payload_length = payload_length
|
||||
|
||||
self.crc16 = crcmod.predefined.mkCrcFun('crc-ccitt-false')
|
||||
|
||||
def start_tx(self):
|
||||
self.transmit_active = True
|
||||
txthread = Thread(target=self.tx_thread)
|
||||
txthread.start()
|
||||
|
||||
def frame_packet(self,packet):
|
||||
# Ensure payload size is equal to the desired payload length
|
||||
if len(packet) > self.payload_length:
|
||||
packet = packet[:self.payload_length]
|
||||
|
||||
if len(packet) < self.payload_length:
|
||||
packet = packet + "\x00"*(self.payload_length - len(packet))
|
||||
|
||||
crc = struct.pack("<H",self.crc16(packet))
|
||||
return self.preamble + self.unique_word + packet + crc
|
||||
|
||||
|
||||
def tx_thread(self):
|
||||
while self.transmit_active:
|
||||
if self.txqueue.qsize()>0:
|
||||
packet = self.txqueue.get_nowait()
|
||||
self.s.write(packet)
|
||||
else:
|
||||
if not self.debug:
|
||||
self.s.write(self.idle_sequence)
|
||||
else:
|
||||
sleep(0.05)
|
||||
|
||||
print("Closing Thread")
|
||||
self.s.close()
|
||||
|
||||
def close(self):
|
||||
self.transmit_active = False
|
||||
|
||||
def wait(self):
|
||||
while not self.txqueue.empty():
|
||||
sleep(0.01)
|
||||
|
||||
def tx_packet(self,packet,blocking = False):
|
||||
self.txqueue.put(self.frame_packet(packet))
|
||||
|
||||
if blocking:
|
||||
while not self.txqueue.empty():
|
||||
sleep(0.01)
|
||||
|
||||
|
||||
|
Po Szerokość: | Wysokość: | Rozmiar: 426 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 377 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 472 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 563 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 382 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 411 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 372 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 458 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 346 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 510 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 528 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 502 KiB |
Po Szerokość: | Wysokość: | Rozmiar: 325 KiB |
|
@ -0,0 +1,34 @@
|
|||
#!/usr/bin/env python
|
||||
#
|
||||
# Quick script to resize and SSDV-compress a selection of test images.
|
||||
# Original resolution: 1488x1120
|
||||
# 800x600
|
||||
# 640x480
|
||||
# 320x240
|
||||
#
|
||||
# Requires:
|
||||
# Imagemagick
|
||||
# ssdv (https://github.com/fsphil/ssdv)
|
||||
|
||||
import os, glob
|
||||
|
||||
# images should be named 1.jpg, 2.jpg, etc.
|
||||
image_numbers = xrange(1,14)
|
||||
new_sizes = ["800x608","640x480","320x240"]
|
||||
callsign = "VK5QI"
|
||||
|
||||
# Resize images.
|
||||
for x in image_numbers:
|
||||
# make a copy of the raw image.
|
||||
os.system("cp %d.jpg %d_raw.jpg" % (x,x))
|
||||
# Produce resized images.
|
||||
|
||||
for size in new_sizes:
|
||||
os.system("convert %d.jpg -resize %s\! %d_%s.jpg" % (x,size,x,size))
|
||||
|
||||
# Compress with SSDV.
|
||||
new_sizes.append("raw")
|
||||
|
||||
for x in image_numbers:
|
||||
for size in new_sizes:
|
||||
os.system("ssdv -e -c %s -i %d %d_%s.jpg %d_%s.ssdv" % (callsign,x,x,size,x,size))
|
|
@ -0,0 +1,42 @@
|
|||
#
|
||||
# Test Transmitter Script
|
||||
# Transmit a set of images from the test_images directory
|
||||
#
|
||||
# Mark Jessop <vk5qi@rfhead.net>
|
||||
#
|
||||
|
||||
import PacketTX, sys, os
|
||||
|
||||
# Set to whatever resolution you want to test.
|
||||
file_path = "./test_images/%d_800x608.ssdv" # _raw, _800x608, _640x480, _320x240
|
||||
image_numbers = xrange(1,14)
|
||||
|
||||
def transmit_file(filename, tx_object):
|
||||
file_size = os.path.getsize(filename)
|
||||
|
||||
if file_size % 256 > 0:
|
||||
print("File size not a multiple of 256 bytes!")
|
||||
return
|
||||
|
||||
print("Transmitting %d Packets." % (file_size/256))
|
||||
|
||||
f = open(filename,'rb')
|
||||
|
||||
for x in range(file_size/256):
|
||||
data = f.read(256)
|
||||
tx_object.tx_packet(data)
|
||||
|
||||
f.close()
|
||||
print("Waiting for tx queue to empty...")
|
||||
tx_object.wait()
|
||||
|
||||
|
||||
tx = PacketTX.PacketTX()
|
||||
tx.start_tx()
|
||||
|
||||
for img in image_numbers:
|
||||
filename = file_path % img
|
||||
print("\nTXing: %s" % filename)
|
||||
transmit_file(filename,tx)
|
||||
|
||||
tx.close()
|