From d9ebc7c54321eb5fbe8ed5e1f7381e2298d1b314 Mon Sep 17 00:00:00 2001 From: Christian Rodriguez Jacobs Date: Fri, 7 Mar 2025 13:44:35 +0000 Subject: [PATCH] Replace telnetlib with telnetlib3. --- pyqso/dx_cluster.py | 247 +++++++++++++++++++++----------------------- requirements.txt | 1 + 2 files changed, 118 insertions(+), 130 deletions(-) diff --git a/pyqso/dx_cluster.py b/pyqso/dx_cluster.py index 6e60717..b8c34ac 100644 --- a/pyqso/dx_cluster.py +++ b/pyqso/dx_cluster.py @@ -17,13 +17,12 @@ # You should have received a copy of the GNU General Public License # along with PyQSO. If not, see . -from gi.repository import Gtk, GObject, Gdk +from gi.repository import Gtk, GObject, Gdk, GLib import logging -import telnetlib -try: - import configparser -except ImportError: - import ConfigParser as configparser +import telnetlib3 +import asyncio +from threading import Thread +import configparser import os.path from pyqso.telnet_connection_dialog import TelnetConnectionDialog @@ -46,44 +45,43 @@ class DXCluster: self.application = application self.builder = self.application.builder - self.connection = None + self.thread = None + self.telnet_reader = None + self.telnet_writer = None # Connect signals. - self.builder.get_object("mitem_new").connect("activate", self.new_server) - self.builder.get_object("mitem_disconnect").connect("activate", self.telnet_disconnect) - self.builder.get_object("send").connect("clicked", self.telnet_send_command) - self.builder.get_object("command").connect("key-press-event", self.on_command_key_press) + self.builder.get_object('mitem_new').connect('activate', self.connect_to_new_server) + self.builder.get_object('mitem_disconnect').connect('activate', self.disconnect) + self.builder.get_object('send').connect('clicked', self.send_command) + self.builder.get_object('command').connect('key-press-event', self.on_command_key_press) # Get the text renderer and its buffer. - self.renderer = self.builder.get_object("renderer") + self.renderer = self.builder.get_object('renderer') self.buffer = self.renderer.get_buffer() # Items whose sensitivity may change. self.items = {} - self.items["CONNECT"] = self.builder.get_object("mitem_connect") - self.items["DISCONNECT"] = self.builder.get_object("mitem_disconnect") - self.items["SEND"] = self.builder.get_object("send") + self.items['CONNECT'] = self.builder.get_object('mitem_connect') + self.items['DISCONNECT'] = self.builder.get_object('mitem_disconnect') + self.items['SEND'] = self.builder.get_object('send') self.set_items_sensitive(True) self.populate_bookmarks() - - logging.debug("DX cluster ready!") - - return + + logging.debug("DX cluster ready.") def on_command_key_press(self, widget, event, data=None): """ If the Return key is pressed when the focus is on the command box, then send whatever command the user has entered. """ - if(event.keyval == Gdk.KEY_Return): - self.telnet_send_command() - return + if event.keyval == Gdk.KEY_Return: + self.send_command() - def new_server(self, widget=None): + def connect_to_new_server(self, widget=None): """ Get Telnet server host and login details specified in the Gtk.Entry boxes in the Telnet connection dialog and attempt a connection. """ # Get connection details. tcd = TelnetConnectionDialog(self.application) response = tcd.dialog.run() - if(response == Gtk.ResponseType.OK): + if response == Gtk.ResponseType.OK: host = tcd.host port = tcd.port username = tcd.username @@ -92,12 +90,12 @@ class DXCluster: tcd.dialog.destroy() # Handle empty hostname. - if(not host): + if not host: logging.error("No hostname specified.") return # Handle empty port number. - if(not port): + if not port: logging.warning("No port specified. Assuming default port 23...") port = 23 else: @@ -110,13 +108,13 @@ class DXCluster: return # Save the server details in a new bookmark, if desired. - if(bookmark): + if bookmark: try: config = configparser.ConfigParser() config.read(BOOKMARKS_FILE) # Use the host name as the bookmark's identifier. - if(username): + if username: bookmark_identifier = "%s@%s:%d" % (username, host, port) else: bookmark_identifier = "%s:%d" % (host, port) @@ -128,10 +126,10 @@ class DXCluster: except configparser.DuplicateSectionError: # If the hostname already exists, assume the user wants to update the port number, username and/or password. logging.warning("Bookmark '%s' already exists. Over-writing existing details..." % (bookmark_identifier)) - config.set(bookmark_identifier, "host", host) - config.set(bookmark_identifier, "port", str(port)) - config.set(bookmark_identifier, "username", username) - config.set(bookmark_identifier, "password", password) + config.set(bookmark_identifier, 'host', host) + config.set(bookmark_identifier, 'port', str(port)) + config.set(bookmark_identifier, 'username', username) + config.set(bookmark_identifier, 'password', password) # Write the bookmarks to file. if not os.path.exists(os.path.expanduser('~/.config/pyqso')): @@ -140,28 +138,26 @@ class DXCluster: config.write(f) self.populate_bookmarks() - except IOError: # Maybe the bookmarks file could not be written to? - logging.error("Bookmark could not be saved. Check bookmarks file permissions? Going ahead with the server connection anyway...") + logging.error("Bookmark could not be saved. Check bookmarks file permissions. Proceeding with the server connection anyway...") # Attempt a connection with the server. - self.telnet_connect(host, port, username, password) + self.connect(host, port, username, password) else: tcd.dialog.destroy() - return def populate_bookmarks(self): """ Populate the list of bookmarked Telnet servers in the menu. """ # Get the bookmarks submenu. - subm_bookmarks = self.builder.get_object("subm_bookmarks") + subm_bookmarks = self.builder.get_object('subm_bookmarks') config = configparser.ConfigParser() have_config = (config.read(BOOKMARKS_FILE) != []) - if(have_config): + if have_config: try: # Clear the menu of all current bookmarks. for i in subm_bookmarks.get_children(): @@ -170,18 +166,16 @@ class DXCluster: # Add all bookmarks in the config file. for bookmark in config.sections(): mitem = Gtk.MenuItem(label=bookmark) - mitem.connect("activate", self.bookmarked_server, bookmark) + mitem.connect('activate', self.connect_to_bookmarked_server, bookmark) subm_bookmarks.append(mitem) except Exception as e: logging.error("An error occurred whilst populating the DX cluster bookmarks menu.") logging.exception(e) - self.builder.get_object("dx_cluster").show_all() # Need to do this to update the bookmarks list in the menu. + self.builder.get_object('dx_cluster').show_all() # Need to do this to update the bookmarks list in the menu. - return - - def bookmarked_server(self, widget, name): + def connect_to_bookmarked_server(self, widget, name): """ Get Telnet server host and login details from an existing bookmark and attempt a connection. :arg str name: The name of the bookmark. This is the same as the server's hostname. @@ -190,14 +184,14 @@ class DXCluster: config = configparser.ConfigParser() have_config = (config.read(BOOKMARKS_FILE) != []) try: - if(not have_config): + if not have_config: raise IOError("The bookmark's details could not be loaded.") - host = config.get(name, "host") - port = int(config.get(name, "port")) - username = config.get(name, "username") - password = config.get(name, "password") - self.telnet_connect(host, port, username, password) + host = config.get(name, 'host') + port = int(config.get(name, 'port')) + username = config.get(name, 'username') + password = config.get(name, 'password') + self.connect(host, port, username, password) except ValueError as e: # This exception may occur when casting the port (which is a str) to an int. @@ -208,9 +202,68 @@ class DXCluster: logging.error("Could not connect to Telnet server '%s'." % name) logging.exception(e) - return + def render(self, data): + """ Add the text received from the Telnet server to the + text buffer, and perform autoscrolling. """ + end_iter = self.buffer.get_end_iter() + self.buffer.insert(end_iter, data) + end_mark = self.buffer.create_mark('end', end_iter) + self.renderer.scroll_mark_onscreen(end_mark) - def telnet_connect(self, host, port=23, username=None, password=None): + async def telnet_shell(self, host, port, username, password): + """ Receive and render text from the Telnet server. """ + while True: + # Wait for data from the Telnet server. + data = await self.telnet_reader.read(4096) + if not data: + break + else: + # Call the rendering method in the main Gtk loop, + # rather than the asyncio loop. + GLib.idle_add(self.render, data) + + # Responses to login/password prompts. + if username and ("login: " in data): + self.telnet_writer.write(username + '\n') + if password and ("password: " in data): + self.telnet_writer.write(password + '\n') + + def send_command(self, widget=None): + """ Send the user-specified command in the Gtk.Entry box to the Telnet server. """ + command = self.builder.get_object('command') + self.telnet_writer.write(command.get_text() + '\n') + command.set_text("") + + def start_telnet(self, host, port, username, password): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Attempt connection to Telnet server. + try: + logging.debug("Attempting connection to Telnet server %s:%d..." % (host, port)) + self.telnet_reader, self.telnet_writer = loop.run_until_complete(telnetlib3.open_connection(host, port)) + logging.debug("Connection to %s:%d established." % (host, port)) + except Exception as e: + logging.exception(e) + message = "Could not create a connection to the Telnet server %s:%d. Check connection to the internet. Check connection details." % (host, port) + GLib.idle_add(error, self.application.window, message) + return + + # When successfully connected, disable the option to connect. + GLib.idle_add(self.set_items_sensitive, False) + + # Run interactive shell. + try: + loop.run_until_complete(self.telnet_shell(host, port, username, password)) + except Exception as e: + logging.exception(e) + GLib.idle_add(error, self.application.window, "Exception occurred in Telnet shell.") + + # Clean up and re-enable the option to connect again. + GLib.idle_add(self.set_items_sensitive, True) + loop.close() + + def connect(self, host, port=23, username=None, password=None): """ Connect to a user-specified Telnet server. :arg str host: The Telnet server's hostname. @@ -220,99 +273,33 @@ class DXCluster: """ # Handle empty host/port string (or the case where host/port are None). - if(not host): + if not host: error(parent=self.application.window, message="Unable to connect to a DX cluster because no hostname was specified.") return - if(not port): + if not port: logging.warning("No port specified. Assuming default port 23...") port = 23 # Use the default Telnet port. try: - logging.debug("Attempting connection to Telnet server %s:%d..." % (host, port)) - self.connection = telnetlib.Telnet(host, port) - assert(self.connection) - - if(username): - self.connection.read_until("login: ".encode()) - self.connection.write((username + "\n").encode()) - if(password): - self.connection.read_until("password: ".encode()) - self.connection.write((password + "\n").encode()) + self.thread = Thread(target=self.start_telnet, args=(host, port, username, password)) + self.thread.setDaemon(True) # Allows the thread to be stopped when the main Gtk thread is stopped. + self.thread.start() except Exception as e: - message = "Could not create a connection to the Telnet server %s:%d. Check connection to the internets? Check connection details?" % (host, port) - error(parent=self.application.window, message=message) logging.exception(e) - self.connection = None + error(parent=self.application.window, message="Could not start Telnet.") return - logging.debug("Connection to %s:%d established." % (host, port)) - - self.set_items_sensitive(False) - - self.check_io_event = GObject.timeout_add(1000, self.on_telnet_io) - - return - - def telnet_disconnect(self, widget=None): - """ Disconnect from a Telnet server and remove the I/O timer. """ - if(self.connection): - self.connection.close() - self.buffer.set_text("") - self.connection = None + def disconnect(self, widget=None): + """ Disconnect from a Telnet server. """ + self.telnet_writer.write("quit\n") + self.telnet_writer.close() self.set_items_sensitive(True) - # Stop checking for server output once disconnected. - try: - GObject.source_remove(self.check_io_event) - except AttributeError: - # This may happen if a connection hasn't yet been established. - pass - - return - - def telnet_send_command(self, widget=None): - """ Send the user-specified command in the Gtk.Entry box to the Telnet server (if PyQSO is connected to one). """ - if(self.connection): - command = self.builder.get_object("command") - self.connection.write((command.get_text() + "\n").encode()) - command.set_text("") - return - - def on_telnet_io(self): - """ Retrieve any new data from the Telnet server and print it out in the Gtk.TextView widget. - - :returns: Always returns True to satisfy the GObject timer. - :rtype: bool - """ - if(self.connection): - text = self.connection.read_very_eager() - text = text.decode("ascii", "replace") # Replace any characters that cannot be decoded with a replacement marker. - try: - text = text.replace("\u0007", "") # Remove the BEL Unicode character from the end of the line - except UnicodeDecodeError: - pass - - # Allow auto-scrolling to the new text entry if the focus is already at - # the very end of the Gtk.TextView. Otherwise, don't auto-scroll - # in case the user is reading something further up. - # Note: This is based on the code from http://forums.gentoo.org/viewtopic-t-445598-view-next.html - end_iter = self.buffer.get_end_iter() - end_mark = self.buffer.create_mark(None, end_iter) - self.renderer.move_mark_onscreen(end_mark) - at_end = self.buffer.get_iter_at_mark(end_mark).equal(end_iter) - self.buffer.insert(end_iter, text) - if(at_end): - end_mark = self.buffer.create_mark(None, end_iter) - self.renderer.scroll_mark_onscreen(end_mark) - - return True - def set_items_sensitive(self, sensitive): """ Enable/disable the relevant buttons for connecting/disconnecting from a DX cluster, so that users cannot click the connect button if PyQSO is already connected. :arg bool sensitive: If True, enable the Connect button and disable the Disconnect button. If False, vice versa. """ - self.items["CONNECT"].set_sensitive(sensitive) - self.items["DISCONNECT"].set_sensitive(not sensitive) - self.items["SEND"].set_sensitive(not sensitive) - return + self.items['CONNECT'].set_sensitive(sensitive) + self.items['DISCONNECT'].set_sensitive(not sensitive) + self.items['SEND'].set_sensitive(not sensitive) diff --git a/requirements.txt b/requirements.txt index db85b8f..a37c8c0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ geocoder vext pygobject setuptools +telnetlib3