Replace telnetlib with telnetlib3.

master
Christian Rodriguez Jacobs 2025-03-07 13:44:35 +00:00
rodzic 0733d8535e
commit d9ebc7c543
2 zmienionych plików z 118 dodań i 130 usunięć

Wyświetl plik

@ -17,13 +17,12 @@
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from gi.repository import Gtk, GObject, Gdk
from gi.repository import Gtk, GObject, Gdk, GLib
import logging
import telnetlib
try:
import configparser
except ImportError:
import ConfigParser as configparser
import telnetlib3
import asyncio
from threading import Thread
import configparser
import os.path
from pyqso.telnet_connection_dialog import TelnetConnectionDialog
@ -46,44 +45,43 @@ class DXCluster:
self.application = application
self.builder = self.application.builder
self.connection = None
self.thread = None
self.telnet_reader = None
self.telnet_writer = None
# Connect signals.
self.builder.get_object("mitem_new").connect("activate", self.new_server)
self.builder.get_object("mitem_disconnect").connect("activate", self.telnet_disconnect)
self.builder.get_object("send").connect("clicked", self.telnet_send_command)
self.builder.get_object("command").connect("key-press-event", self.on_command_key_press)
self.builder.get_object('mitem_new').connect('activate', self.connect_to_new_server)
self.builder.get_object('mitem_disconnect').connect('activate', self.disconnect)
self.builder.get_object('send').connect('clicked', self.send_command)
self.builder.get_object('command').connect('key-press-event', self.on_command_key_press)
# Get the text renderer and its buffer.
self.renderer = self.builder.get_object("renderer")
self.renderer = self.builder.get_object('renderer')
self.buffer = self.renderer.get_buffer()
# Items whose sensitivity may change.
self.items = {}
self.items["CONNECT"] = self.builder.get_object("mitem_connect")
self.items["DISCONNECT"] = self.builder.get_object("mitem_disconnect")
self.items["SEND"] = self.builder.get_object("send")
self.items['CONNECT'] = self.builder.get_object('mitem_connect')
self.items['DISCONNECT'] = self.builder.get_object('mitem_disconnect')
self.items['SEND'] = self.builder.get_object('send')
self.set_items_sensitive(True)
self.populate_bookmarks()
logging.debug("DX cluster ready!")
return
logging.debug("DX cluster ready.")
def on_command_key_press(self, widget, event, data=None):
""" If the Return key is pressed when the focus is on the command box, then send whatever command the user has entered. """
if(event.keyval == Gdk.KEY_Return):
self.telnet_send_command()
return
if event.keyval == Gdk.KEY_Return:
self.send_command()
def new_server(self, widget=None):
def connect_to_new_server(self, widget=None):
""" Get Telnet server host and login details specified in the Gtk.Entry boxes in the Telnet connection dialog and attempt a connection. """
# Get connection details.
tcd = TelnetConnectionDialog(self.application)
response = tcd.dialog.run()
if(response == Gtk.ResponseType.OK):
if response == Gtk.ResponseType.OK:
host = tcd.host
port = tcd.port
username = tcd.username
@ -92,12 +90,12 @@ class DXCluster:
tcd.dialog.destroy()
# Handle empty hostname.
if(not host):
if not host:
logging.error("No hostname specified.")
return
# Handle empty port number.
if(not port):
if not port:
logging.warning("No port specified. Assuming default port 23...")
port = 23
else:
@ -110,13 +108,13 @@ class DXCluster:
return
# Save the server details in a new bookmark, if desired.
if(bookmark):
if bookmark:
try:
config = configparser.ConfigParser()
config.read(BOOKMARKS_FILE)
# Use the host name as the bookmark's identifier.
if(username):
if username:
bookmark_identifier = "%s@%s:%d" % (username, host, port)
else:
bookmark_identifier = "%s:%d" % (host, port)
@ -128,10 +126,10 @@ class DXCluster:
except configparser.DuplicateSectionError:
# If the hostname already exists, assume the user wants to update the port number, username and/or password.
logging.warning("Bookmark '%s' already exists. Over-writing existing details..." % (bookmark_identifier))
config.set(bookmark_identifier, "host", host)
config.set(bookmark_identifier, "port", str(port))
config.set(bookmark_identifier, "username", username)
config.set(bookmark_identifier, "password", password)
config.set(bookmark_identifier, 'host', host)
config.set(bookmark_identifier, 'port', str(port))
config.set(bookmark_identifier, 'username', username)
config.set(bookmark_identifier, 'password', password)
# Write the bookmarks to file.
if not os.path.exists(os.path.expanduser('~/.config/pyqso')):
@ -140,28 +138,26 @@ class DXCluster:
config.write(f)
self.populate_bookmarks()
except IOError:
# Maybe the bookmarks file could not be written to?
logging.error("Bookmark could not be saved. Check bookmarks file permissions? Going ahead with the server connection anyway...")
logging.error("Bookmark could not be saved. Check bookmarks file permissions. Proceeding with the server connection anyway...")
# Attempt a connection with the server.
self.telnet_connect(host, port, username, password)
self.connect(host, port, username, password)
else:
tcd.dialog.destroy()
return
def populate_bookmarks(self):
""" Populate the list of bookmarked Telnet servers in the menu. """
# Get the bookmarks submenu.
subm_bookmarks = self.builder.get_object("subm_bookmarks")
subm_bookmarks = self.builder.get_object('subm_bookmarks')
config = configparser.ConfigParser()
have_config = (config.read(BOOKMARKS_FILE) != [])
if(have_config):
if have_config:
try:
# Clear the menu of all current bookmarks.
for i in subm_bookmarks.get_children():
@ -170,18 +166,16 @@ class DXCluster:
# Add all bookmarks in the config file.
for bookmark in config.sections():
mitem = Gtk.MenuItem(label=bookmark)
mitem.connect("activate", self.bookmarked_server, bookmark)
mitem.connect('activate', self.connect_to_bookmarked_server, bookmark)
subm_bookmarks.append(mitem)
except Exception as e:
logging.error("An error occurred whilst populating the DX cluster bookmarks menu.")
logging.exception(e)
self.builder.get_object("dx_cluster").show_all() # Need to do this to update the bookmarks list in the menu.
self.builder.get_object('dx_cluster').show_all() # Need to do this to update the bookmarks list in the menu.
return
def bookmarked_server(self, widget, name):
def connect_to_bookmarked_server(self, widget, name):
""" Get Telnet server host and login details from an existing bookmark and attempt a connection.
:arg str name: The name of the bookmark. This is the same as the server's hostname.
@ -190,14 +184,14 @@ class DXCluster:
config = configparser.ConfigParser()
have_config = (config.read(BOOKMARKS_FILE) != [])
try:
if(not have_config):
if not have_config:
raise IOError("The bookmark's details could not be loaded.")
host = config.get(name, "host")
port = int(config.get(name, "port"))
username = config.get(name, "username")
password = config.get(name, "password")
self.telnet_connect(host, port, username, password)
host = config.get(name, 'host')
port = int(config.get(name, 'port'))
username = config.get(name, 'username')
password = config.get(name, 'password')
self.connect(host, port, username, password)
except ValueError as e:
# This exception may occur when casting the port (which is a str) to an int.
@ -208,9 +202,68 @@ class DXCluster:
logging.error("Could not connect to Telnet server '%s'." % name)
logging.exception(e)
return
def render(self, data):
""" Add the text received from the Telnet server to the
text buffer, and perform autoscrolling. """
end_iter = self.buffer.get_end_iter()
self.buffer.insert(end_iter, data)
end_mark = self.buffer.create_mark('end', end_iter)
self.renderer.scroll_mark_onscreen(end_mark)
def telnet_connect(self, host, port=23, username=None, password=None):
async def telnet_shell(self, host, port, username, password):
""" Receive and render text from the Telnet server. """
while True:
# Wait for data from the Telnet server.
data = await self.telnet_reader.read(4096)
if not data:
break
else:
# Call the rendering method in the main Gtk loop,
# rather than the asyncio loop.
GLib.idle_add(self.render, data)
# Responses to login/password prompts.
if username and ("login: " in data):
self.telnet_writer.write(username + '\n')
if password and ("password: " in data):
self.telnet_writer.write(password + '\n')
def send_command(self, widget=None):
""" Send the user-specified command in the Gtk.Entry box to the Telnet server. """
command = self.builder.get_object('command')
self.telnet_writer.write(command.get_text() + '\n')
command.set_text("")
def start_telnet(self, host, port, username, password):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Attempt connection to Telnet server.
try:
logging.debug("Attempting connection to Telnet server %s:%d..." % (host, port))
self.telnet_reader, self.telnet_writer = loop.run_until_complete(telnetlib3.open_connection(host, port))
logging.debug("Connection to %s:%d established." % (host, port))
except Exception as e:
logging.exception(e)
message = "Could not create a connection to the Telnet server %s:%d. Check connection to the internet. Check connection details." % (host, port)
GLib.idle_add(error, self.application.window, message)
return
# When successfully connected, disable the option to connect.
GLib.idle_add(self.set_items_sensitive, False)
# Run interactive shell.
try:
loop.run_until_complete(self.telnet_shell(host, port, username, password))
except Exception as e:
logging.exception(e)
GLib.idle_add(error, self.application.window, "Exception occurred in Telnet shell.")
# Clean up and re-enable the option to connect again.
GLib.idle_add(self.set_items_sensitive, True)
loop.close()
def connect(self, host, port=23, username=None, password=None):
""" Connect to a user-specified Telnet server.
:arg str host: The Telnet server's hostname.
@ -220,99 +273,33 @@ class DXCluster:
"""
# Handle empty host/port string (or the case where host/port are None).
if(not host):
if not host:
error(parent=self.application.window, message="Unable to connect to a DX cluster because no hostname was specified.")
return
if(not port):
if not port:
logging.warning("No port specified. Assuming default port 23...")
port = 23 # Use the default Telnet port.
try:
logging.debug("Attempting connection to Telnet server %s:%d..." % (host, port))
self.connection = telnetlib.Telnet(host, port)
assert(self.connection)
if(username):
self.connection.read_until("login: ".encode())
self.connection.write((username + "\n").encode())
if(password):
self.connection.read_until("password: ".encode())
self.connection.write((password + "\n").encode())
self.thread = Thread(target=self.start_telnet, args=(host, port, username, password))
self.thread.setDaemon(True) # Allows the thread to be stopped when the main Gtk thread is stopped.
self.thread.start()
except Exception as e:
message = "Could not create a connection to the Telnet server %s:%d. Check connection to the internets? Check connection details?" % (host, port)
error(parent=self.application.window, message=message)
logging.exception(e)
self.connection = None
error(parent=self.application.window, message="Could not start Telnet.")
return
logging.debug("Connection to %s:%d established." % (host, port))
self.set_items_sensitive(False)
self.check_io_event = GObject.timeout_add(1000, self.on_telnet_io)
return
def telnet_disconnect(self, widget=None):
""" Disconnect from a Telnet server and remove the I/O timer. """
if(self.connection):
self.connection.close()
self.buffer.set_text("")
self.connection = None
def disconnect(self, widget=None):
""" Disconnect from a Telnet server. """
self.telnet_writer.write("quit\n")
self.telnet_writer.close()
self.set_items_sensitive(True)
# Stop checking for server output once disconnected.
try:
GObject.source_remove(self.check_io_event)
except AttributeError:
# This may happen if a connection hasn't yet been established.
pass
return
def telnet_send_command(self, widget=None):
""" Send the user-specified command in the Gtk.Entry box to the Telnet server (if PyQSO is connected to one). """
if(self.connection):
command = self.builder.get_object("command")
self.connection.write((command.get_text() + "\n").encode())
command.set_text("")
return
def on_telnet_io(self):
""" Retrieve any new data from the Telnet server and print it out in the Gtk.TextView widget.
:returns: Always returns True to satisfy the GObject timer.
:rtype: bool
"""
if(self.connection):
text = self.connection.read_very_eager()
text = text.decode("ascii", "replace") # Replace any characters that cannot be decoded with a replacement marker.
try:
text = text.replace("\u0007", "") # Remove the BEL Unicode character from the end of the line
except UnicodeDecodeError:
pass
# Allow auto-scrolling to the new text entry if the focus is already at
# the very end of the Gtk.TextView. Otherwise, don't auto-scroll
# in case the user is reading something further up.
# Note: This is based on the code from http://forums.gentoo.org/viewtopic-t-445598-view-next.html
end_iter = self.buffer.get_end_iter()
end_mark = self.buffer.create_mark(None, end_iter)
self.renderer.move_mark_onscreen(end_mark)
at_end = self.buffer.get_iter_at_mark(end_mark).equal(end_iter)
self.buffer.insert(end_iter, text)
if(at_end):
end_mark = self.buffer.create_mark(None, end_iter)
self.renderer.scroll_mark_onscreen(end_mark)
return True
def set_items_sensitive(self, sensitive):
""" Enable/disable the relevant buttons for connecting/disconnecting from a DX cluster, so that users cannot click the connect button if PyQSO is already connected.
:arg bool sensitive: If True, enable the Connect button and disable the Disconnect button. If False, vice versa.
"""
self.items["CONNECT"].set_sensitive(sensitive)
self.items["DISCONNECT"].set_sensitive(not sensitive)
self.items["SEND"].set_sensitive(not sensitive)
return
self.items['CONNECT'].set_sensitive(sensitive)
self.items['DISCONNECT'].set_sensitive(not sensitive)
self.items['SEND'].set_sensitive(not sensitive)

Wyświetl plik

@ -7,3 +7,4 @@ geocoder
vext
pygobject
setuptools
telnetlib3