From b08e57cfce2f929593ea9a044940befa5c96083b Mon Sep 17 00:00:00 2001 From: EmbeddedMan Date: Sat, 27 Jan 2024 08:35:12 -0600 Subject: [PATCH] Continuing HIL development. --- .../Analysis/firmware tests/saleae_capture.py | 296 ++++++++++++------ 1 file changed, 207 insertions(+), 89 deletions(-) diff --git a/EBB_firmware/Analysis/firmware tests/saleae_capture.py b/EBB_firmware/Analysis/firmware tests/saleae_capture.py index 20697e8..3f179b0 100644 --- a/EBB_firmware/Analysis/firmware tests/saleae_capture.py +++ b/EBB_firmware/Analysis/firmware tests/saleae_capture.py @@ -16,6 +16,8 @@ # This code requires version 3.0.0-a39 or above on the EBB # This code requires that the Saleae Logic2 software have the Automation feature turned on and # software installed as called out here : https://saleae.github.io/logic2-automation/ +# This code requires a 16 channel Saleae - either the "Logic 16" or the "Logic Pro 16" as we +# need more than 8 simultaneous input channels. # # Parameters: # saleae_capture.py < @@ -32,6 +34,7 @@ import os.path from datetime import datetime import sys import time +import csv from pyaxidraw import axidraw from plotink import ebb_motion @@ -90,6 +93,202 @@ def block(ad_ref, timeout_ms=None): if timeout_ms is not None: time_left -= 10 +# Pass in a filepath and a string of the 5 parameters to check +# This function will open the .csv file with the debug serial data, +# pull out the five parameters, and check them. +# Prints out either pass or fail +def check_debug_serial(serial_filepath, parameter_str): + output_str = '' + + with open(serial_filepath, newline='') as f: + reader = csv.reader(f) + try: + for row in reader: + if reader.line_num > 1: + output_str = output_str + row[4] + + except csv.Error as e: + sys.exit('file {}, line {}: {}'.format(serial_filepath, reader.line_num, e)) + + # we now have a string like "T,000061A9,S,000004D2,C,06511930,R,06516DB0,P,000004D2" + # Convert that into separate variables and decimal values + + #print(int.from_bytes(bytes(output_str[0:1],"utf-8"), "big", signed=False)) + #print(int.from_bytes(bytes(output_str[1:2],"utf-8"), "big", signed=False)) + #print(int.from_bytes(bytes(output_str[2:3],"utf-8"), "big", signed=False)) + #print(int.from_bytes(bytes(output_str[3:4],"utf-8"), "big", signed=False)) + + + #move_ticks = int.from_bytes(bytes(output_str[0:4],"utf-8"), "big", signed=False) + #move_steps = int.from_bytes(bytes(output_str[4:8],"utf-8"), "big", signed=False) + #move_accumulator1 = int.from_bytes(bytes(output_str[8:12],"utf-8"), "big", signed=False) + #move_rate1 = int.from_bytes(bytes(output_str[12:16],"utf-8"), "big", signed=False) + #move_position1 = int.from_bytes(bytes(output_str[16:20],"utf-8"), "big", signed=False) + + # We now have a string that looks like "0x000x000x610xA80x000x000x040xD20x060x500xC40xB00x060x510x6D0xB00x000x000x040xD20x0A" + + # I know this is super inefficient and inelegant. When you have time, rewrite to be cleaner. But this works. + move_ticks = int(output_str[2:4], 16) + move_ticks = (move_ticks * 256) + int(output_str[6:8], 16) + move_ticks = (move_ticks * 256) + int(output_str[10:12], 16) + move_ticks = (move_ticks * 256) + int(output_str[14:16], 16) + move_steps = int(output_str[18:20], 16) + move_steps = (move_steps * 256) + int(output_str[22:24], 16) + move_steps = (move_steps * 256) + int(output_str[26:28], 16) + move_steps = (move_steps * 256) + int(output_str[30:32], 16) + move_accumulator1 = int(output_str[34:36], 16) + move_accumulator1 = (move_accumulator1 * 256) + int(output_str[38:40], 16) + move_accumulator1 = (move_accumulator1 * 256) + int(output_str[42:44], 16) + move_accumulator1 = (move_accumulator1 * 256) + int(output_str[46:48], 16) + move_rate1 = int(output_str[50:52], 16) + move_rate1 = (move_rate1 * 256) + int(output_str[54:56], 16) + move_rate1 = (move_rate1 * 256) + int(output_str[58:60], 16) + move_rate1 = (move_rate1 * 256) + int(output_str[62:64], 16) + move_position1 = int(output_str[66:68], 16) + move_position1 = (move_position1 * 256) + int(output_str[70:72], 16) + move_position1 = (move_position1 * 256) + int(output_str[74:76], 16) + move_position1 = (move_position1 * 256) + int(output_str[78:80], 16) + + #print("ticks=" + str(move_ticks)) + #print("steps=" + str(move_steps)) + #print("acc1=" + str(move_accumulator1)) + #print("rate1=" + str(move_rate1)) + #print("pos1=" + str(move_position1)) + + # Parse apart the correct parameter string + para_list = parameter_str.split(",") + para_ticks = int(para_list[0]) + para_steps = int(para_list[1]) + para_accumulator1 = int(para_list[2]) + para_rate1 = int(para_list[3]) + para_position1 = int(para_list[4]) + + if (move_ticks == para_ticks) & (move_steps == para_steps) & (move_accumulator1 == para_accumulator1) & (move_rate1 == para_rate1) & (move_position1 == para_position1): + print ("Pass") + + if (move_ticks != para_ticks): + print("Fail: measured ticks " + str(move_ticks) + " != expected ticks " + str(para_ticks)) + + if (move_steps != para_steps): + print("Fail: measured steps " + str(move_steps) + " != expected steps " + str(para_steps)) + + if (move_accumulator1 != para_accumulator1): + print("Fail: measured accumulator1 " + str(move_accumulator1) + " != expected accumulator1 " + str(para_accumulator1)) + + if (move_rate1 != para_rate1): + print("Fail: measured rate1 " + str(move_rate1) + " != expected rate1 " + str(para_rate1)) + + if (move_position1 != para_position1): + print("Fail: measured position1 " + str(move_position1) + " != expected position1 " + str(para_position1)) + + + + +# Pass in an EBB command a file path name and a time and this function +# will start the capture (with length of the time), send the EBB command, +# and then save off the resulting Saleae capture file (.sal) as well as the +# .csv file of the bytes on the debug serial port into the capture_dir_name +# directory + +def capture_command(EBB_command, capture_dir_name, capture_time, expected_params): + # Connect to the running Logic 2 Application on port `10430`. + # Alternatively you can use automation.Manager.launch() to launch a new Logic 2 process - see + # the API documentation for more details. + # Using the `with` statement will automatically call manager.close() when exiting the scope. If you + # want to use `automation.Manager` outside of a `with` block, you will need to call `manager.close()` manually. + with automation.Manager.connect(port=10430) as manager: + + # Configure the capturing device to record on digital channels 0, 1, 2, and 3, + # with a sampling rate of 10 MSa/s, and a logic level of 3.3V. + # The settings chosen here will depend on your device's capabilities and what + # you can configure in the Logic 2 UI. + device_configuration = automation.LogicDeviceConfiguration( + enabled_digital_channels=[0, 1, 2, 3, 4, 5, 6, 7, 8], + digital_sample_rate=32_000_000, + ) + + # Record 5 seconds of data before stopping the capture + capture_configuration = automation.CaptureConfiguration( + capture_mode=automation.TimedCaptureMode(duration_seconds=capture_time) + ) + + # Start a capture - the capture will be automatically closed when leaving the `with` block + # Note: The serial number 'F4241' is for the Logic Pro 16 demo device. + # To use a real device, you can: + # 1. Omit the `device_id` argument. Logic 2 will choose the first real (non-simulated) device. + # 2. Use the serial number for your device. See the "Finding the Serial Number + # of a Device" section for information on finding your device's serial number. + with manager.start_capture( + device_configuration=device_configuration, + capture_configuration=capture_configuration) as capture: + + # The capture has started. We now need to send our EBB command: + response = str(query(the_port, EBB_command + '\r')) + print(last_command + " :: " + response.strip()) + + # Wait until the capture has finished + capture.wait() + + analyzers = [] + + # Add an analyzer to the capture + async_analyzer = capture.add_analyzer('Async Serial', label=f'ISR Serial', settings={ + 'Input Channel': 7, + 'Bit Rate (Bits/s)': 4000000, + }) + analyzers.append(automation.DataTableExportConfiguration(async_analyzer, automation.RadixType.HEXADECIMAL)) + + # Create a directory name to store our output files in + # output_dir = os.path.join(os.getcwd(), f'output-{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}') + output_dir = os.path.join(os.getcwd(), capture_dir_name) + os.makedirs(output_dir, exist_ok = True) + + # Export analyzer data to a CSV file + analyzer_export_filepath = os.path.join(output_dir, 'debug_serial.csv') + capture.export_data_table( + filepath=analyzer_export_filepath, + analyzers=analyzers + ) + + # Export raw digital data to a CSV file + capture.export_raw_data_csv(directory=output_dir, digital_channels=[0, 1, 2, 3, 4, 5, 6, 7, 8]) + + # Save the capture to a file + capture_filepath = os.path.join(output_dir, 'capture.sal') + capture.save_capture(filepath=capture_filepath) + + # Gather up the debug serial output + check_debug_serial(analyzer_export_filepath, expected_params) + + # Save off text file with EBB command and parameters + with open(os.path.join(output_dir, 'param.csv'), 'w') as param_file: + param_file.write(EBB_version.decode() + '\n') + param_file.write(EBB_command + '\n') + param_file.write(output_dir + '\n') + param_file.write(capture_dir_name + '\n') + param_file.write(str(capture_time) + '\n') + + +# Temporary list of commands to send (will get from file eventually) +param_list = [ +["SM,100,100,0,3", "test1", 1, "2500,100,20054816,85899344,100"], +["SM,100,-100,0,3", "test2", 1, "2500,100,20035616,85899344,4294967196"], +["SM,100,0,100,3", "test3", 1, "-1,-1,-1,-1,-1"], +#["SM,100,0,-100,3", "test4" ,1, ""], +#["SM,100,100,100,3", "test5" ,1, ""], +#["SM,100,-100,-100,3", "test6" ,1, ""], +#["XM,100,100,0,3", "test1" ,1, ""], +#["XM,100,-100,0,3", "test1" ,1, ""], +#["XM,100,0,100,3", "test1" ,1, ""], +#["XM,100,0,-100,3", "test1" ,1, ""], +#["XM,100,100,100,3", "test1" ,1, ""], +#["XM,100,-100,-100,3", "test1" ,1, ""], +] + + +# +# Connect to EBB +# ad = axidraw.AxiDraw() # Initialize class @@ -106,27 +305,9 @@ if the_port is None: the_port.reset_input_buffer() - -# Note: -command_list = [ -"SM,100,100,0", -"SM,100,-100,0", -"SM,100,0,100", -"SM,100,0,-100", -"SM,100,100,100", -"SM,100,-100,-100", -"XM,100,100,0", -"XM,100,-100,0", -"XM,100,0,100", -"XM,100,0,-100", -"XM,100,100,100", -"XM,100,-100,-100", -] - print("connected") -response = query(the_port, 'V\r') -print(response) +EBB_version = query(the_port, 'V\r') last_command = "" @@ -134,86 +315,23 @@ response = str(query(the_port, "CU,250,1" + '\r')) print(last_command + " :: " + response.strip()) response = str(query(the_port, "CU,251,1" + '\r')) print(last_command + " :: " + response.strip()) +response = str(query(the_port, "CU,257,1" + '\r')) +print(last_command + " :: " + response.strip()) -# Connect to the running Logic 2 Application on port `10430`. -# Alternatively you can use automation.Manager.launch() to launch a new Logic 2 process - see -# the API documentation for more details. -# Using the `with` statement will automatically call manager.close() when exiting the scope. If you -# want to use `automation.Manager` outside of a `with` block, you will need to call `manager.close()` manually. -with automation.Manager.connect(port=10430) as manager: - - # Configure the capturing device to record on digital channels 0, 1, 2, and 3, - # with a sampling rate of 10 MSa/s, and a logic level of 3.3V. - # The settings chosen here will depend on your device's capabilities and what - # you can configure in the Logic 2 UI. - device_configuration = automation.LogicDeviceConfiguration( - enabled_digital_channels=[0, 1, 2, 3, 4, 5, 6, 7, 8], - digital_sample_rate=32_000_000, - ) - - # Record 5 seconds of data before stopping the capture - capture_configuration = automation.CaptureConfiguration( - capture_mode=automation.TimedCaptureMode(duration_seconds=2.0) - ) - - # Start a capture - the capture will be automatically closed when leaving the `with` block - # Note: The serial number 'F4241' is for the Logic Pro 16 demo device. - # To use a real device, you can: - # 1. Omit the `device_id` argument. Logic 2 will choose the first real (non-simulated) device. - # 2. Use the serial number for your device. See the "Finding the Serial Number - # of a Device" section for information on finding your device's serial number. - with manager.start_capture( - device_configuration=device_configuration, - capture_configuration=capture_configuration) as capture: - - # The capture has started. We now need to send our EBB command: - response = str(query(the_port, "SM,1000,1234,567" + '\r')) - print(last_command + " :: " + response.strip()) - - # Wait until the capture has finished - # This will take about 5 seconds because we are using a timed capture mode - capture.wait() - - analyzers = [] - - # Add an analyzer to the capture - async_analyzer = capture.add_analyzer('Async Serial', label=f'ISR Serial', settings={ - 'Input Channel': 7, - 'Bit Rate (Bits/s)': 3000000, - }) - analyzers.append(automation.DataTableExportConfiguration(async_analyzer, automation.RadixType.HEXADECIMAL)) - - # Store output in a timestamped directory - output_dir = os.path.join(os.getcwd(), f'output-{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}') - os.makedirs(output_dir) - - # Export analyzer data to a CSV file - analyzer_export_filepath = os.path.join(output_dir, 'async_serial_export.csv') - capture.export_data_table( - filepath=analyzer_export_filepath, - analyzers=analyzers - ) - - # Export raw digital data to a CSV file - capture.export_raw_data_csv(directory=output_dir, digital_channels=[0, 1, 2, 3, 4, 5, 6, 7]) - - # Finally, save the capture to a file - capture_filepath = os.path.join(output_dir, 'example_capture.sal') - capture.save_capture(filepath=capture_filepath) +for param in param_list: + print(param[1], end='') + capture_command(param[0], param[1], param[2], param[3]) - - - -#for command in command_list: # #block(ad) # response = str(query(the_port, command + '\r')) # print(last_command + " :: " + response.strip()) # last_command = command -print(last_command + " :: ") +#print(last_command + " :: ") +print("Complete") ad.disconnect() # Close serial port to AxiDraw