kopia lustrzana https://github.com/projecthorus/wenet
Added Orientation Telemetry support.
rodzic
9b1f5a78f7
commit
a3f6cabc6c
|
@ -21,6 +21,7 @@ class WENET_PACKET_TYPES:
|
|||
|
||||
class WENET_PACKET_LENGTHS:
|
||||
GPS_TELEMETRY = 35
|
||||
ORIENTATION_TELEMETRY = 43
|
||||
|
||||
def decode_packet_type(packet):
|
||||
# Convert packet to a list of integers before parsing.
|
||||
|
@ -282,14 +283,85 @@ def orientation_telemetry_decoder(packet):
|
|||
# which occurs when we are decoding a packet that has arrived via a UDP-broadcast JSON blob.
|
||||
packet = str(bytearray(packet))
|
||||
|
||||
# SHSSP Code goes here.
|
||||
# Some basic sanity checking of the packet before we attempt to decode.
|
||||
if len(packet) < WENET_PACKET_LENGTHS.ORIENTATION_TELEMETRY:
|
||||
return {'error': 'Orientation Telemetry Packet has invalid length.'}
|
||||
elif len(packet) > WENET_PACKET_LENGTHS.ORIENTATION_TELEMETRY:
|
||||
# If the packet is too big (which it will be, as it's padded with 0x55's), clip it.
|
||||
packet = packet[:WENET_PACKET_LENGTHS.ORIENTATION_TELEMETRY]
|
||||
else:
|
||||
pass
|
||||
|
||||
orientation_data = {}
|
||||
|
||||
# Wrap the next bit in exception handling.
|
||||
try:
|
||||
# Unpack the packet into a list.
|
||||
data = struct.unpack('>BHIBBBBBBBbfffffff', packet)
|
||||
|
||||
orientation_data['week'] = data[1]
|
||||
orientation_data['iTOW'] = data[2]/1000.0 # iTOW provided as milliseconds, convert to seconds.
|
||||
orientation_data['leapS'] = data[3]
|
||||
|
||||
# Produce human readable timestamp.
|
||||
orientation_data['timestamp'] = gps_weeksecondstoutc(orientation_data['week'], orientation_data['iTOW'], orientation_data['leapS'])
|
||||
|
||||
orientation_data['sys_status'] = data[4]
|
||||
orientation_data['sys_error'] = data[5]
|
||||
orientation_data['sys_cal'] = data[6]
|
||||
orientation_data['gyro_cal'] = data[7]
|
||||
orientation_data['accel_cal'] = data[8]
|
||||
orientation_data['magnet_cal'] = data[9]
|
||||
orientation_data['temp'] = data[10]
|
||||
|
||||
orientation_data['euler_heading'] = data[11]
|
||||
orientation_data['euler_roll'] = data[12]
|
||||
orientation_data['euler_pitch'] = data[13]
|
||||
|
||||
orientation_data['quaternion_x'] = data[14]
|
||||
orientation_data['quaternion_y'] = data[15]
|
||||
orientation_data['quaternion_z'] = data[16]
|
||||
orientation_data['quaternion_w'] = data[17]
|
||||
|
||||
orientation_data['error'] = 'None'
|
||||
|
||||
return orientation_data
|
||||
|
||||
except:
|
||||
traceback.print_exc()
|
||||
print(packet)
|
||||
return {'error': 'Could not decode Orientation telemetry packet.'}
|
||||
|
||||
return {'error': "Orientation: Not Implemented."}
|
||||
|
||||
def orientation_telemetry_string(packet):
|
||||
""" Produce a String representation of an Orientation Telemetry packet"""
|
||||
|
||||
return "Orientation: Not Implemented Yet."
|
||||
orientation_data = orientation_telemetry_decoder(packet)
|
||||
|
||||
# Check if there was a decode error. If not, produce a string.
|
||||
if orientation_data['error'] != 'None':
|
||||
return "Orientation: ERROR Could not decode."
|
||||
else:
|
||||
orientation_data_string = "Orientation: %s Status: %d Error: %d Cal: %d %d %d %d Temp: %d Euler: (%.1f,%.1f,%.1f) Quaternion: (%.1f, %.1f, %.1f, %.1f)" % (
|
||||
orientation_data['timestamp'],
|
||||
orientation_data['sys_status'],
|
||||
orientation_data['sys_error'],
|
||||
orientation_data['sys_cal'],
|
||||
orientation_data['gyro_cal'],
|
||||
orientation_data['accel_cal'],
|
||||
orientation_data['magnet_cal'],
|
||||
orientation_data['temp'],
|
||||
orientation_data['euler_heading'],
|
||||
orientation_data['euler_roll'],
|
||||
orientation_data['euler_pitch'],
|
||||
orientation_data['quaternion_x'],
|
||||
orientation_data['quaternion_y'],
|
||||
orientation_data['quaternion_z'],
|
||||
orientation_data['quaternion_w']
|
||||
)
|
||||
|
||||
return orientation_data_string
|
||||
|
||||
|
||||
|
||||
#
|
||||
|
@ -321,5 +393,5 @@ def image_telemetry_decoder(packet):
|
|||
|
||||
def image_telemetry_string(packet):
|
||||
""" Produce a String representation of an Image Telemetry packet"""
|
||||
|
||||
|
||||
return "Image Telemetry: Not Implemented Yet."
|
|
@ -290,10 +290,31 @@ class PacketTX(object):
|
|||
orientation_telemetry_decoder
|
||||
|
||||
"""
|
||||
try:
|
||||
orientation_packet = struct.pack(">BHIBBBBBBBbfffffff",
|
||||
2, # Packet ID for the Orientation Telemetry Packet.
|
||||
week,
|
||||
int(iTOW*1000), # Convert the GPS week value to milliseconds, and cast to an int.
|
||||
leapS,
|
||||
orientation_data['sys_status'],
|
||||
orientation_data['sys_error'],
|
||||
orientation_data['sys_cal'],
|
||||
orientation_data['gyro_cal'],
|
||||
orientation_data['accel_cal'],
|
||||
orientation_data['magnet_cal'],
|
||||
orientation_data['temp'],
|
||||
orientation_data['euler_heading'],
|
||||
orientation_data['euler_roll'],
|
||||
orientation_data['euler_pitch'],
|
||||
orientation_data['quaternion_x'],
|
||||
orientation_data['quaternion_y'],
|
||||
orientation_data['quaternion_z'],
|
||||
orientation_data['quaternion_w']
|
||||
)
|
||||
|
||||
# SHSSP Code goes here...
|
||||
|
||||
self.transmit_text_message("Orientation Telemetry Not Implemented.")
|
||||
self.queue_telemetry_packet(orientation_packet)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
|
||||
return
|
||||
|
||||
|
|
Ładowanie…
Reference in New Issue