fabmodules/src/guis/rml_send_gui

525 wiersze
20 KiB
Python
Executable File

#!/usr/bin/env python
#
# rml_send_gui
# graphical interface for sending jobs to the Roland Modela
#
# Brian Mayton <bmayton@media.mit.edu>
# MIT 2011-2014
#
# (c) Massachusetts Institute of Technology 2011-2014
# Permission granted for experimental and personal use;
# license for commercial sale available from MIT.
# imports
from __future__ import with_statement
import serial
import sys
import wx
import threading
import time
import math
# global constants
RML_UNITS=40.0
SPEED_TRAVERSE = 15.0
# utility functions
def dist(x1, y1, z1, x2, y2, z2):
return math.sqrt(
pow(x1-x2, 2.0) +
pow(y1-y2, 2.0) +
pow(z1-z2, 2.0)
)
class RMLSender:
"""This class implements the parsing of RML files and sending to the
Modela."""
def __init__(self, port="/dev/ttyUSB0"):
self.serial = serial.Serial(port, baudrate=9600, rtscts=True, timeout=0)
self.cmds = []
self.xr=[0,1]; self.yr=[0,1]; self.zr=[0,1]
self.paths = []
self.segments_done = []
self.traverses = []
self.traverses_done = []
self.speed_feed = 15.0
self.speed_plunge = 5.0
self.total_distance = 1.0
self.distance_milled = 0.0
self.total_time = 1.0
self.time_remaining = 1.0
self.time_start = None
self.current_cmd = ""
self.cur_cmd_start = time.time()
self.cur_cmd_duration = 0.0
self.running = False
self.thread = threading.Thread(target=self.thread_fn)
self.should_stop = threading.Event()
self.done = threading.Event()
self.aborted = threading.Event()
self.lock = threading.Lock()
def load_file(self, filename):
self.cmds = []
f = open(filename, "r")
data = f.read()
f.close()
self.cmds = data.split(";")
self.calculate_metrics()
def calculate_metrics(self):
paths = []
traverses = []
cur_path = []
xmin, ymin, zmin = 99999999, 999999999, 999999999
xmax, ymax, zmax = 0, 0, 0
xpos, ypos, zpos = 0, 0, 0
zup, zdown = 0, 0
speeds, speedz = 0.0, 0.0
total_distance = 0.0
total_time = 0.0
in_path = False
for cmd in self.cmds:
cmd=cmd.strip()
try:
if cmd[:3] == "!PZ":
params = cmd[3:].split(',')
if len(params) < 2:
params = cmd[3:].split(' ')
zup = int(params[1])
zdown = int(params[0])
print "pen: %d up, %d down" % (zup, zdown)
elif cmd[:2] == "VS":
params = cmd[2:].split(',')
if len(params) < 2:
params = cmd[2:].split(' ')
speeds = float(params[0])
print "xy speed: %f mm/s" % (speeds)
elif cmd[:3] == "!VZ":
params = cmd[3:].split(',')
if len(params) < 2:
params = cmd[3:].split(' ')
speedz = float(params[0])
print "z speed: %f mm/s" % (speedz)
elif cmd[:2] == "PU":
params = cmd[2:].split(',')
if len(params) < 2:
params = cmd[2:].split(' ')
if len(params) < 2:
continue
x = int(params[0])
y = int(params[1])
z = zup
d = dist(xpos, ypos, zpos, x, y, z)
total_distance += d
total_time += d / RML_UNITS / SPEED_TRAVERSE
traverses.append([(xpos, ypos, zpos), (x, y, z)])
xpos = x; ypos = y; zpos = z;
xmax = max(x, xmax); ymax = max(y, ymax); zmax = max(z, zmax)
xmin = min(x, xmin); ymin = min(y, ymin); zmin = min(z, zmin)
if len(cur_path) > 0:
paths.append(cur_path)
cur_path = []
elif cmd[:1] == "Z":
params = cmd[1:].split(',')
if len(params) < 2:
params = cmd[1:].split(' ')
x = int(params[0])
y = int(params[1])
z = int(params[2])
dist_xy = math.hypot(xpos-x, ypos-y) / RML_UNITS
dist_z = float(zpos-z) / RML_UNITS
time_xy = dist_xy / speeds
time_z = dist_z / speedz
total_time += max(time_xy, time_z)
total_distance += dist(xpos, ypos, zpos, x, y, z)
xpos = x; ypos = y; zpos = z;
xmax = max(x, xmax); ymax = max(y, ymax); zmax = max(z, zmax)
xmin = min(x, xmin); ymin = min(y, ymin); zmin = min(z, zmin)
cur_path.append((x, y, z))
except:
print "ignoring: %s" % cmd
pass
self.paths = paths
self.traverses = traverses
self.speed_feed = speeds
self.speed_plunge = speedz
self.xr = (xmin, xmax)
self.yr = (ymin, ymax)
self.zr = (zmin, zmax)
self.total_distance = total_distance
if self.total_distance == 0: self.total_distance = 1.0
self.total_time = total_time
if self.total_time == 0: self.total_time = 1.0
self.time_remaining = total_time
def start(self):
self.running = True
self.time_start = time.time()
self.thread.start()
def abort(self):
if self.running and not self.done.isSet():
self.should_stop.set()
def thread_fn(self):
xmax, ymax, zmax = 0, 0, 0
xpos, ypos, zpos = 0, 0, 0
zup, zdown = 0, 0
speeds, speedz = 0.0, 0.0
with self.lock:
cmds = self.cmds
for cmd in cmds:
cmd = cmd.strip()
if self.should_stop.isSet():
cmd="PA;PA;!VZ10;!PZ0,100;PU0,0;PD0,0;!MC0;"
self.serial.write(cmd)
self.serial.close()
self.aborted.set()
return
cmd=cmd.strip()
with self.lock:
self.current_cmd = cmd
self.cur_cmd_start = time.time()
self.cur_cmd_duration = 0.0
while (self.serial.getDSR() != True):
time.sleep(0.001)
self.serial.write(cmd)
try:
if cmd[:3] == "!PZ":
params = cmd[3:].split(',')
if len(params) < 2:
params = cmd[3:].split(' ')
zup = int(params[1])
zdown = int(params[0])
elif cmd[:2] == "VS":
params = cmd[2:].split(',')
if len(params) < 2:
params = cmd[2:].split(' ')
speeds = float(params[0])
with self.lock:
self.speed_feed = speeds
elif cmd[:3] == "!VZ":
params = cmd[3:].split(',')
if len(params) < 2:
params = cmd[3:].split(' ')
speedz = float(params[0])
with self.lock:
self.speed_plunge = speedz
elif cmd[:2] == "PU":
params = cmd[2:].split(',')
if len(params) < 2:
params = cmd[2:].split(' ')
if len(params) < 2:
continue
x = int(params[0])
y = int(params[1])
z = zup
d = dist(xpos, ypos, zpos, x, y, z)
t = d / RML_UNITS / SPEED_TRAVERSE
with self.lock:
self.cur_cmd_duration = t
self.time_remaining -= t
self.distance_milled += d
self.traverses_done.append(((xpos, ypos, zpos), (x, y, z)))
xpos = x; ypos = y; zpos = z;
elif cmd[:1] == "Z":
params = cmd[1:].split(',')
if len(params) < 2:
params = cmd[1:].split(' ')
x = int(params[0])
y = int(params[1])
z = int(params[2])
dist_xy = math.hypot(xpos-x, ypos-y) / RML_UNITS
dist_z = float(zpos-z) / RML_UNITS
time_xy = dist_xy / speeds
time_z = dist_z / speedz
t = max(time_xy, time_z)
with self.lock:
self.cur_cmd_duration = t
self.time_remaining -= t
self.distance_milled += dist(xpos, ypos, zpos, x, y, z)
self.segments_done.append(((xpos, ypos, zpos), (x, y, z)))
xpos = x; ypos = y; zpos = z;
time.sleep(self.cur_cmd_duration)
except:
print "ignoring: %s" % cmd
self.done.set()
class RMLSenderGUI(RMLSender):
"""This class implements the GUI."""
def __init__(self, port="/dev/ttyUSB0"):
RMLSender.__init__(self, port)
self.lines_path = []
self.lines_traverse = []
self.lines_seg_done = []
self.lines_traverse_done = []
self.n_segs_done = 0
self.n_traverse_done = 0
self.distance = 0.0
self.frame = wx.Frame(None, -1, "Modela Output", size=(640,480))
top_vbox = wx.BoxSizer(wx.VERTICAL)
self.frame.SetSizer(top_vbox)
upper_panel = wx.Panel(self.frame)
upper_panel_sizer = wx.BoxSizer(wx.HORIZONTAL)
upper_panel.SetSizer(upper_panel_sizer)
self.panel = wx.Panel(upper_panel)
self.gui_xsize, self.gui_ysize = self.panel.GetSize()
self.gui_xoff, self.gui_yoff = 0, 0
self.gui_scale = 0.5
self.panel.Bind(wx.EVT_PAINT, self.on_paint)
self.panel.Bind(wx.EVT_SIZE, self.on_size)
upper_panel_sizer.Add(self.panel, 1, wx.EXPAND | wx.ALL, border=4)
right_panel = wx.Panel(upper_panel, size=(200, 0))
#right_panel.SetBackgroundColour('gray')
right_panel_sizer = wx.BoxSizer(wx.VERTICAL)
right_panel.SetSizer(right_panel_sizer)
upper_panel_sizer.Add(right_panel, 0, wx.EXPAND)
origin_label = wx.StaticText(right_panel, label="Origin:")
right_panel_sizer.Add(origin_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.TOP, border=4)
bf = origin_label.GetFont()
bf.SetWeight(wx.BOLD)
origin_label.SetFont(bf)
self.origin_label_x = wx.StaticText(right_panel, label="x: ")
self.origin_label_y = wx.StaticText(right_panel, label="y: ")
right_panel_sizer.Add(self.origin_label_x, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT, border=4)
right_panel_sizer.Add(self.origin_label_y, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.BOTTOM, border=4)
size_label = wx.StaticText(right_panel, label="Size:")
size_label.SetFont(bf)
right_panel_sizer.Add(size_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.TOP, border=4)
self.size_label_x = wx.StaticText(right_panel, label="x: ")
self.size_label_y = wx.StaticText(right_panel, label="y: ")
right_panel_sizer.Add(self.size_label_x, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT, border=4)
right_panel_sizer.Add(self.size_label_y, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.BOTTOM, border=4)
size_label = wx.StaticText(right_panel, label="Speeds:")
size_label.SetFont(bf)
right_panel_sizer.Add(size_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.TOP, border=4)
self.plunge_label = wx.StaticText(right_panel, label="plunge: ")
self.feed_label = wx.StaticText(right_panel, label="feed: ")
right_panel_sizer.Add(self.plunge_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT, border=4)
right_panel_sizer.Add(self.feed_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.BOTTOM, border=4)
size_label = wx.StaticText(right_panel, label="Time:")
size_label.SetFont(bf)
right_panel_sizer.Add(size_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.TOP, border=4)
self.elapsed_label = wx.StaticText(right_panel, label="elapsed: ")
self.remaining_label = wx.StaticText(right_panel, label="remaining: ")
right_panel_sizer.Add(self.elapsed_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT, border=4)
right_panel_sizer.Add(self.remaining_label, 0,
wx.EXPAND | wx.ALIGN_LEFT | wx.LEFT | wx.BOTTOM, border=4)
filler_panel = wx.Panel(right_panel)
right_panel_sizer.Add(filler_panel, 1, wx.EXPAND)
self.start_button = wx.Button(right_panel, label="Begin Milling")
right_panel_sizer.Add(self.start_button, 0, wx.EXPAND | wx.ALL, border=4)
self.start_button.Bind(wx.EVT_BUTTON, self.on_startstop)
top_vbox.Add(upper_panel, 1, wx.EXPAND)
self.gauge = wx.Gauge(self.frame, range=100, size=(0,32))
top_vbox.Add(self.gauge, 0, wx.EXPAND)
self.status_label = wx.StaticText(self.frame, label="Ready.")
top_vbox.Add(self.status_label, 0, wx.EXPAND)
self.frame.Bind(wx.EVT_CLOSE, self.on_close)
self.frame.Show()
def on_close(self, event):
self.abort()
sys.exit(0)
def transform_point(self, point):
xsize, ysize = self.gui_xsize, self.gui_ysize
xoff, yoff = self.gui_xoff, self.gui_yoff
scale = self.gui_scale
x, y, z = point
x = (x - self.xr[0]) * scale + xoff
y = ysize - (y - self.yr[0]) * scale + yoff
return (x, y, z)
def update(self):
self.lines_path = []
self.lines_traverse = []
self.lines_seg_done = []
self.lines_traverse_done = []
self.gui_xsize, self.gui_ysize = self.panel.GetSize()
path_xsize = self.xr[1] - self.xr[0]
path_ysize = self.yr[1] - self.yr[0]
scalex = float(self.gui_xsize) / path_xsize
scaley = float(self.gui_ysize) / path_ysize
if scalex < scaley:
self.gui_scale = scalex
self.gui_yoff = -1* (self.gui_ysize - (path_ysize * scalex)) / 2
self.gui_xoff = 0
else:
self.gui_scale = scaley
self.gui_xoff = (self.gui_xsize - (path_xsize * scaley)) / 2
self.gui_yoff = 0
self.origin_label_x.SetLabel("x: %02.01f mm" % (self.xr[0] / RML_UNITS))
self.origin_label_y.SetLabel("y: %02.01f mm" % (self.yr[0] / RML_UNITS))
self.size_label_x.SetLabel("x: %02.01f mm" % (path_xsize / RML_UNITS))
self.size_label_y.SetLabel("y: %02.01f mm" % (path_ysize / RML_UNITS))
self.plunge_label.SetLabel("plunge: %01.01f mm/s" % (self.speed_plunge))
self.feed_label.SetLabel("feed: %01.01f mm/s" % (self.speed_feed))
self.remaining_label.SetLabel("remaining: %02d:%02d s" % (int(self.time_remaining)/60,
int(self.time_remaining) % 60))
for path in self.paths:
for i in xrange(len(path)-1):
p1 = self.transform_point(path[i])
p2 = self.transform_point(path[i+1])
line = (p1[0], p1[1], p2[0], p2[1])
self.lines_path.append(line)
for tr in self.traverses:
p1 = self.transform_point(tr[0])
p2 = self.transform_point(tr[1])
line = (p1[0], p1[1], p2[0], p2[1])
self.lines_traverse.append(line)
for seg in self.segments_done:
p1 = self.transform_point(seg[0])
p2 = self.transform_point(seg[1])
line = (p1[0], p1[1], p2[0], p2[1])
self.lines_seg_done.append(line)
for tr in self.traverses_done:
p1 = self.transform_point(tr[0])
p2 = self.transform_point(tr[1])
line = (p1[0], p1[1], p2[0], p2[1])
self.lines_traverse_done.append(line)
self.panel.Refresh()
def on_paint(self, event):
dc = wx.PaintDC(self.panel)
dc.Clear()
dc.SetPen(wx.Pen('gray', 1))
dc.DrawLineList(self.lines_traverse)
dc.SetPen(wx.Pen('blue', 1))
dc.DrawLineList(self.lines_path)
dc.SetPen(wx.Pen('gray', 2))
dc.DrawLineList(self.lines_traverse_done)
dc.SetPen(wx.Pen('red', 2))
dc.DrawLineList(self.lines_seg_done)
def new_segs_check(self, event):
dc = wx.ClientDC(self.panel)
with self.lock:
dc.SetPen(wx.Pen('red', 2))
for i in xrange(self.n_segs_done, len(self.segments_done)):
seg = self.segments_done[i]
p1 = self.transform_point(seg[0])
p2 = self.transform_point(seg[1])
dc.DrawLine(p1[0], p1[1], p2[0], p2[1])
self.lines_seg_done.append((p1[0], p1[1], p2[0], p2[1]))
self.n_segs_done = len(self.segments_done)
dc.SetPen(wx.Pen('gray', 2))
for i in xrange(self.n_traverse_done, len(self.traverses_done)):
seg = self.traverses_done[i]
p1 = self.transform_point(seg[0])
p2 = self.transform_point(seg[1])
dc.DrawLine(p1[0], p1[1], p2[0], p2[1])
self.lines_traverse_done.append((p1[0], p1[1], p2[0], p2[1]))
self.n_traverse_done = len(self.traverses_done)
perc_done = int(round(100.0 * (self.distance_milled / self.total_distance)))
self.gauge.SetValue(perc_done)
self.plunge_label.SetLabel("plunge: %01.01f mm/s" % (self.speed_plunge))
self.feed_label.SetLabel("feed: %01.01f mm/s" % (self.speed_feed))
elapsed = time.time() - self.time_start
self.elapsed_label.SetLabel("elapsed: %02d:%02d s" %
(int(elapsed)/60, int(elapsed) % 60))
self.remaining_label.SetLabel("remaining: %02d:%02d s" %
(int(self.time_remaining)/60, int(self.time_remaining) % 60))
cmd_elapsed = time.time() - self.cur_cmd_start
cmd_rem = self.cur_cmd_duration - cmd_elapsed
self.status_label.SetLabel("Executing: %s (%d:%02d)" % (self.current_cmd,
cmd_rem / 60, cmd_rem % 60))
if self.done.isSet():
self.timer.Stop()
self.start_button.SetLabel("Exit")
self.status_label.SetLabel("Finished.")
elif self.should_stop.isSet():
self.status_label.SetLabel("Aborting; wait for Modela to stop or switch to view mode and hold up/down buttons to clear buffer")
if self.aborted.isSet():
sys.exit(0)
def on_size(self, event):
self.update()
def on_startstop(self, event):
if not self.running:
self.start_button.SetLabel("Abort")
self.start()
self.timer = wx.Timer(self.frame)
self.frame.Bind(wx.EVT_TIMER, self.new_segs_check, self.timer)
self.timer.Start(100)
elif self.done.isSet():
sys.exit(0)
else:
self.abort()
def load_file(self, filename):
RMLSender.load_file(self,filename)
self.update()
if __name__ == '__main__':
if len(sys.argv) < 2:
print "usage: %s file.rml [/dev/ttyUSB0]" % sys.argv[0]
sys.exit(1)
filename = sys.argv[1]
port = "/dev/ttyUSB0"
if len(sys.argv) > 2:
port = sys.argv[2]
app = wx.PySimpleApp()
f = open(filename)
data = f.read()
f.close()
if len(data) < 150:
sender = RMLSender(port)
sender.load_file(filename)
sender.start()
sender.thread.join()
else:
sender = RMLSenderGUI(port)
sender.load_file(filename)
app.MainLoop()