WIP: Refactoring a bit of the saleae capture, and fleshing out the complete shortest move test (almost complete)

feature/EBB_Regression_Tests
EmbeddedMan 2024-03-23 13:46:29 -05:00
rodzic 87834d560e
commit 08f6fbca6e
5 zmienionych plików z 210 dodań i 53 usunięć

Wyświetl plik

@ -105,7 +105,7 @@ time.sleep(15)
# can then be used to see which command caused the un-full FIFO.
# First for SM command
"""
for x in range(100):
query(the_port, "SM,10,250,250" + '\r')
for x in range(100):
@ -176,46 +176,6 @@ for x in range(100):
query(the_port, "LT,25,2147483647,-1,2147483647,-1" + '\r')
time.sleep(0.5)
"""
# Test LM command- specifically looking for step position errors
# Clear the step position at the beginning, and then check it after
# each test.
#
# First just straight non-accelerating moves in a square
query(the_port, "CS" + '\r')
for x in range(10):
query(the_port, "LM,85899350,100,0,85899350,100,0" + '\r')
for x in range(10):
query(the_port, "LM,85899350,-100,0,85899350,100,0" + '\r')
for x in range(10):
query(the_port, "LM,85899350,-100,0,85899350,-100,0" + '\r')
for x in range(10):
query(the_port, "LM,85899350,100,0,85899350,-100,0" + '\r')
time.sleep(1.0)
response = query(the_port, 'QS\r')
print(response)
# Now each set of moves is an accel down to zero and then back up again as separate commands
for x in range(10):
query(the_port, "LM,85899350,100,-34360,85899350,100,-34360" + '\r')
query(the_port, "LM,0,100,34360,0,100,34360" + '\r')
for x in range(10):
query(the_port, "LM,85899350,-100,-34360,85899350,100,-34360" + '\r')
query(the_port, "LM,0,-100,34360,0,100,34360" + '\r')
for x in range(10):
query(the_port, "LM,85899350,-100,-34360,85899350,-100,-34360" + '\r')
query(the_port, "LM,0,-100,34360,0,100,34360" + '\r')
for x in range(10):
query(the_port, "LM,85899350,100,-34360,85899350,-100,-34360" + '\r')
query(the_port, "LM,0,100,34360,0,100,34360" + '\r')
time.sleep(1.0)
response = query(the_port, 'QS\r')
print(response)
print("Test complete")

Wyświetl plik

@ -141,24 +141,41 @@ def analyze_digital_csv(file_pathname : str):
sys.exit('file {}, line {}: {}'.format(filename, reader.line_num, e))
# Output all statistics we just collected as a dict to the caller
if step1_count == 0:
step1_aveHighTimeUS = 0
step1_aveLowTimeUS = 0
step1_aveFreqHZ = 0
else:
step1_aveHighTimeUS = (step1_ave_high_time_acc/step1_count)*1000000
step1_aveLowTimeUS = (step1_ave_low_time_acc/step1_count)*1000000
step1_aveFreqHZ = 1.0/(step1_freq_acc/step1_count)
if step2_count == 0:
step2_aveHighTimeUS = 0
step2_aveLowTimeUS = 0
step2_aveFreqHZ = 0
else:
step2_aveHighTimeUS = (step2_ave_high_time_acc/step2_count)*1000000
step2_aveLowTimeUS = (step2_ave_low_time_acc/step2_count)*1000000
step2_aveFreqHZ = 1.0/(step2_freq_acc/step2_count)
return_dict = {
"Step1_Count": step1_count,
"Step1_MaxHighTimeUS" : step1_max_high_time*1000000,
"Step1_MinHighTimeUS" : step1_min_high_time*1000000,
"Step1_AveHigTimeUS" : (step1_ave_high_time_acc/step1_count)*1000000,
"Step1_AveHigTimeUS" : step1_aveHighTimeUS,
"Step1_MaxLowTimeUS" : step1_max_low_time*1000000,
"Step1_MinLowTimeUS" : step1_min_low_time*1000000,
"Step1_AveLowTimeUS" : (step1_ave_low_time_acc/step1_count)*1000000,
"Step1_AveFreqHZ" : 1.0/(step1_freq_acc/step1_count),
"Step1_AveLowTimeUS" : step1_aveLowTimeUS,
"Step1_AveFreqHZ" : step1_aveFreqHZ,
"Step2_Count": step2_count,
"Step2_MaxHighTimeUS" : step2_max_high_time*1000000,
"Step2_MinHighTimeUS" : step2_min_high_time*1000000,
"Step2_AveHigTimeUS" : (step2_ave_high_time_acc/step1_count)*1000000,
"Step2_AveHigTimeUS" : step2_aveHighTimeUS,
"Step2_MaxLowTimeUS" : step2_max_low_time*1000000,
"Step2_MinLowTimeUS" : step2_min_low_time*1000000,
"Step2_AveLowTimeUS" : (step2_ave_low_time_acc/step1_count)*1000000,
"Step2_AveFreqHZ" : 1.0/(step2_freq_acc/step1_count),
"Step2_AveLowTimeUS" : step2_aveLowTimeUS,
"Step2_AveFreqHZ" : step2_aveFreqHZ,
}
return return_dict

Wyświetl plik

@ -14,7 +14,7 @@ from ebb_serial_utility import query
# .csv file of the bytes on the debug serial port into the capture_dir_name
# directory
def capture_command(EBB_command : str, capture_dir_path : str, capture_time : float, the_port):
def capture_command(EBB_command_function : callable, capture_dir_path : str, capture_time : float, the_port):
# Connect to the running Logic 2 Application on port `10430`.
# Alternatively you can use automation.Manager.launch() to launch a new Logic 2 process - see
# the API documentation for more details.
@ -39,7 +39,7 @@ def capture_command(EBB_command : str, capture_dir_path : str, capture_time : fl
# digital_sample_rate=50_000_000,
#)
# Record 5 seconds of data before stopping the capture
# Record capture_time seconds of data before stopping the capture
capture_configuration = automation.CaptureConfiguration(
capture_mode=automation.TimedCaptureMode(duration_seconds=capture_time)
)
@ -56,8 +56,10 @@ def capture_command(EBB_command : str, capture_dir_path : str, capture_time : fl
# The capture has started. We now need to send our EBB command:
# But wait a bit first in case there is a delay starting up the capture
time.sleep(0.2)
response = str(query(the_port, EBB_command + '\r'))
#time.sleep(0.2)
EBB_command_function()
#response = str(query(the_port, EBB_command + '\r'))
# TODO: Confirm that response was "OK\r\n", print error and actual response if not
#print(last_command + " :: " + response.strip())

Wyświetl plik

@ -29,6 +29,7 @@ import test_log
# Import the test function from each test file
from test_ISR_math import test_ISR_math_run
from test_global_step_counter import test_global_step_counter_run
from test_shortest_move import test_shortest_move_run
# Start off assuming all will pass. Any that fail will flip this
all_tests_pass = True
@ -36,18 +37,23 @@ all_tests_pass = True
# Generate the test run output directory and init the test log file
test_log.tl_init()
# Test
# Test ISR math
if sys.argv[1] == "" or sys.argv[1] == "test_ISR_math":
test_log.tl_print("Run test: test_ISR_math")
if test_ISR_math_run("..\\test input data\\test_inputs_simple.csv") == False:
all_tests_pass = False
# Test
# Test global step counter
if sys.argv[1] == "" or sys.argv[1] == "test_global_step_counter":
test_log.tl_print("Run test: test_global_step_counter")
if test_global_step_counter_run() == False:
all_tests_pass = False
# Test shortest move
if sys.argv[1] == "" or sys.argv[1] == "test_shortest_move":
test_log.tl_print("Run test: test_shortest_move")
if test_shortest_move_run() == False:
all_tests_pass = False
# Done running all tests
if all_tests_pass == True:

Wyświetl plik

@ -0,0 +1,172 @@
# A test for measuring the shortest move time EBB firmware
#
# Connect to EBB, configure it for Saleae output capture
# There are basically three code paths (for stepper moves) through the ISR
# in the EBB firmware. We pick a representative command for each of them.
# In this case SM, LM and L3.
# For each command, we pick a move duration and a stepper step speed, and
# send enough of that set of command parameters to get to a steady state.
# Then we keep the step speed and command the same, and send a slightly
# shorter move, sending enough to get to a steady state. We go from maybe
# about 15ms/move to 2ms/move, and save all of that as one capture.
# Then we choose a different stepper speed, and do it again. Because stepper
# speed changes how much CPU time is spent in the ISR, it also changes how
# fast we can parse/process move commands. So we will get different behavior
# with 25 KHz steps compared to 1 KHz steps for example.
# For each capture, we need to look through the step pulses looking for
# gaps larger than say 3 step times. The first move duration (15 to 2ms) where
# these gaps occur is 'too fast', so one ms slower is the fastest move for
# that combination of command and step rate. So for each step rate, and each
# of the three commands, we will have a 'fastest allowed without FIFO underrun'
# move duration. This can then get summarized in the EBB documentation.
#
# Note that because we only use the step1 pulses for this test, we can
# run it on any version of EBB firmware. Now, some versions don't have
# all commands (i.e. v3.0.0 has L3) so we have to query the EBB at the
# start of the run and turn on/off various commands based on which version
# we see.
import os
import os.path
import csv
import time
from pyaxidraw import axidraw
import test_log
from ebb_serial_utility import query
from ebb_serial_utility import EBB_version_less_than
from saleae_capture_one import capture_command
from analyze_digital_csv import analyze_digital_csv
def test_shortest_move_run():
all_tests_pass = True
output_filepath = test_log.artifact_dir + f'test_shortest_move/'
# Connect to EBB
ad = axidraw.AxiDraw() # Initialize class
ad.interactive()
if not ad.connect(): # Open serial port to AxiDraw;
test_log.tl_print("test_shortest_move: failed to connect")
return False
the_port = ad.plot_status.port
if the_port is None:
test_log.tl_print("test_shortest_move: failed to connect")
return False
the_port.reset_input_buffer()
test_log.tl_print("test_shortest_move: connected")
EBB_version = query(the_port, 'V\r')
# Put the version string of the currently attached EBB into the test log
test_log.tl_print(EBB_version.decode("utf-8"), False)
if EBB_version_less_than(EBB_version.decode("utf-8"), "3.0.0"):
version_3_or_above = False
else:
version_3_or_above = True
# Turn on the debug features we need in order to run our tests
# Note that for this test, we turn all debug outputs off, to give as accurate a result
# as possible. This means there is a lot less data in the captures, but we can still
# get a good 'signal' by looking for overly long gaps in the step signals
response = str(query(the_port, "CU,250,0" + '\r'))
#print(last_command + " :: " + response.strip())
response = str(query(the_port, "CU,251,0" + '\r'))
#print(last_command + " :: " + response.strip())
response = str(query(the_port, "CU,257,0" + '\r'))
#print(last_command + " :: " + response.strip())
if version_3_or_above:
move_commands = ["SM", "LM", "L3"]
else:
move_commands = ["SM", "LM"]
move_duration_max = 15 # in milliseconds
stepper_speed_max = 25000 # in steps/second
stepper_speed_min = 2000 # in steps/second. Since we go down to 1ms move durations, we need at least 2 steps per move
command_repeats = 20
for move_command in move_commands:
speed_decrement = int(stepper_speed_max/2)
stepper_speed = stepper_speed_max
while stepper_speed >= stepper_speed_min:
# For each run through the various move durations at a given stepper speed, there is a threshold
# for the max low period of 2.5 * the expected time between steps pulses. The first command length which gives
# a max low period of more than this threshold is the final result - i.e. the move duration which, at this
# step rate, is no longer able to sustain smooth steps. So the previous move duration is the shortest move
# duration for this step rate which can sustain smooth motion, and thus is the value that gets entered in
# our final output table.
maximum_allowed_time_between_steps_us = 2.5 * (1.0/(stepper_speed / 1000.0) * 1000)
# This records the latest move duration which does NOT go above the threshold
shortest_smooth_move_ms = move_duration_max + 1 # Start with an 'invalid' value (16) to alert if we never get a duration lower
#print("Speed = " + str(stepper_speed) + " decrement = " + str(speed_decrement))
for move_duration in range(move_duration_max, 0, -1):
# build up the full command SM,1000,25000,250000
# SM, 100,2500,2500
if move_command == "SM":
sm_steps = int(stepper_speed*(move_duration/1000.0))
full_command = move_command + "," + str(move_duration) + "," + str(sm_steps) + "," + str(sm_steps)
if move_command == "LM":
lm_rate = int(85899.35 * stepper_speed)
if lm_rate > 2147483647:
lm_rate = 2147483647
lm_steps = int(stepper_speed*(move_duration/1000.0))
lm_accel = 1 # Hard coding 1 as our accel, so that the math in the ISR has something to do, but don't actually change the rate much
full_command = move_command + "," + str(lm_rate) + "," + str(lm_steps) + "," + str(lm_accel) + "," + str(lm_rate) + "," + str(lm_steps) + "," + str(lm_accel)
if move_command == "L3":
l3_rate = int(85899.35 * stepper_speed)
if l3_rate > 2147483647:
l3_rate = 2147483647
l3_steps = int(stepper_speed*(move_duration/1000.0))
l3_accel = 1 # Hard coding 1 as our accel, so that the math in the ISR has something to do, but don't actually change the rate much
l3_jerk = 1 # Hard coding 1 as our jerk, so that the math in the ISR has something to do, but don't actually change the rate much
full_command = move_command + "," + str(l3_rate) + "," + str(l3_steps) + "," + str(l3_accel) + "," + str(l3_jerk) + "," + str(l3_rate) + "," + str(l3_steps) + "," + str(l3_accel) + "," + str(l3_jerk)
#print(full_command)
test_log.tl_print("test_shortest_move: " + full_command + " (" + str(stepper_speed) + "): ", False)
test_dir = os.path.join(output_filepath, move_command + "-" + str(stepper_speed) + "-" + str(move_duration))
def ebb_command_function():
# Send all the commands
for i in range(command_repeats):
query(the_port, full_command + '\r')
# Give the commands some time to run and be captured
time.sleep(1.0)
# For this length of command, figure out how long to capture for (in seconds)
capture_length_S = command_repeats * (move_duration / 1000) + 1
capture_command(ebb_command_function, test_dir, capture_length_S, the_port)
# Now perform the type of analysis appropriate for this test
# In this case what we want is the longest period where the step 1 signal was low
# (other than before or after the moves)
res_dict = analyze_digital_csv(test_dir + "/digital.csv")
measured_max_low_time = (int(res_dict["Step1_MaxLowTimeUS"]))
test_log.tl_print(str(measured_max_low_time))
if measured_max_low_time < maximum_allowed_time_between_steps_us and measured_max_low_time > 0:
# Update the new shortest smooth move to the one we just analyzed
shortest_smooth_move_ms = move_duration
stepper_speed = stepper_speed - speed_decrement
speed_decrement = int(stepper_speed/2)
test_log.tl_print("Shortest smooth move in ms was : " + str(shortest_smooth_move_ms))
#if compare_debug_uart(extract_debug_uart(test_dir), param[4]):
# test_log.tl_print("Pass")
#else:
# test_log.tl_print("Fail")
# all_tests_pass = False
ad.disconnect() # Close serial port to AxiDraw
test_log.tl_print("test_shortest_move: Complete")
return all_tests_pass