pyqso/pyqso/dx_cluster.py

319 wiersze
13 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2013-2017 Christian Thomas Jacobs.
# This file is part of PyQSO.
# PyQSO is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyQSO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyQSO. If not, see <http://www.gnu.org/licenses/>.
from gi.repository import Gtk, GObject, Gdk
import logging
import telnetlib
try:
import configparser
except ImportError:
import ConfigParser as configparser
import os.path
from pyqso.telnet_connection_dialog import TelnetConnectionDialog
from pyqso.auxiliary_dialogs import error
BOOKMARKS_FILE = os.path.expanduser('~/.config/pyqso/bookmarks.ini')
class DXCluster:
""" A tool for connecting to a DX cluster (specifically Telnet-based DX clusters). """
def __init__(self, application):
""" Set up the DX cluster, and set up a timer so that PyQSO can retrieve new data from the Telnet server every few seconds.
:arg application: The PyQSO application containing the main Gtk window, etc.
"""
logging.debug("Setting up the DX cluster...")
self.application = application
self.builder = self.application.builder
self.connection = None
# Connect signals.
self.builder.get_object("mitem_new").connect("activate", self.new_server)
self.builder.get_object("mitem_disconnect").connect("activate", self.telnet_disconnect)
self.builder.get_object("send").connect("clicked", self.telnet_send_command)
self.builder.get_object("command").connect("key-press-event", self.on_command_key_press)
# Get the text renderer and its buffer.
self.renderer = self.builder.get_object("renderer")
self.buffer = self.renderer.get_buffer()
# Items whose sensitivity may change.
self.items = {}
self.items["CONNECT"] = self.builder.get_object("mitem_connect")
self.items["DISCONNECT"] = self.builder.get_object("mitem_disconnect")
self.items["SEND"] = self.builder.get_object("send")
self.set_items_sensitive(True)
self.populate_bookmarks()
logging.debug("DX cluster ready!")
return
def on_command_key_press(self, widget, event, data=None):
""" If the Return key is pressed when the focus is on the command box, then send whatever command the user has entered. """
if(event.keyval == Gdk.KEY_Return):
self.telnet_send_command()
return
def new_server(self, widget=None):
""" Get Telnet server host and login details specified in the Gtk.Entry boxes in the Telnet connection dialog and attempt a connection. """
# Get connection details.
tcd = TelnetConnectionDialog(self.application)
response = tcd.dialog.run()
if(response == Gtk.ResponseType.OK):
host = tcd.host
port = tcd.port
username = tcd.username
password = tcd.password
bookmark = tcd.bookmark
tcd.dialog.destroy()
# Handle empty hostname.
if(not host):
logging.error("No hostname specified.")
return
# Handle empty port number.
if(not port):
logging.warning("No port specified. Assuming default port 23...")
port = 23
else:
try:
# Cast port into an int.
port = int(port)
except ValueError as e:
logging.error("Could not cast the DX cluster's port information to an integer.")
logging.exception(e)
return
# Save the server details in a new bookmark, if desired.
if(bookmark):
try:
config = configparser.ConfigParser()
config.read(BOOKMARKS_FILE)
# Use the host name as the bookmark's identifier.
if(username):
bookmark_identifier = "%s@%s:%d" % (username, host, port)
else:
bookmark_identifier = "%s:%d" % (host, port)
logging.debug("Using %s as the bookmark identifier." % bookmark_identifier)
# Add bookmark.
try:
config.add_section(bookmark_identifier)
except configparser.DuplicateSectionError:
# If the hostname already exists, assume the user wants to update the port number, username and/or password.
logging.warning("Bookmark '%s' already exists. Over-writing existing details..." % (bookmark_identifier))
config.set(bookmark_identifier, "host", host)
config.set(bookmark_identifier, "port", str(port))
config.set(bookmark_identifier, "username", username)
config.set(bookmark_identifier, "password", password)
# Write the bookmarks to file.
if not os.path.exists(os.path.expanduser('~/.config/pyqso')):
os.makedirs(os.path.expanduser('~/.config/pyqso'))
with open(BOOKMARKS_FILE, 'w') as f:
config.write(f)
self.populate_bookmarks()
except IOError:
# Maybe the bookmarks file could not be written to?
logging.error("Bookmark could not be saved. Check bookmarks file permissions? Going ahead with the server connection anyway...")
# Attempt a connection with the server.
self.telnet_connect(host, port, username, password)
else:
tcd.dialog.destroy()
return
def populate_bookmarks(self):
""" Populate the list of bookmarked Telnet servers in the menu. """
# Get the bookmarks submenu.
subm_bookmarks = self.builder.get_object("subm_bookmarks")
config = configparser.ConfigParser()
have_config = (config.read(BOOKMARKS_FILE) != [])
if(have_config):
try:
# Clear the menu of all current bookmarks.
for i in subm_bookmarks.get_children():
subm_bookmarks.remove(i)
# Add all bookmarks in the config file.
for bookmark in config.sections():
mitem = Gtk.MenuItem(label=bookmark)
mitem.connect("activate", self.bookmarked_server, bookmark)
subm_bookmarks.append(mitem)
except Exception as e:
logging.error("An error occurred whilst populating the DX cluster bookmarks menu.")
logging.exception(e)
self.builder.get_object("dx_cluster").show_all() # Need to do this to update the bookmarks list in the menu.
return
def bookmarked_server(self, widget, name):
""" Get Telnet server host and login details from an existing bookmark and attempt a connection.
:arg str name: The name of the bookmark. This is the same as the server's hostname.
"""
config = configparser.ConfigParser()
have_config = (config.read(BOOKMARKS_FILE) != [])
try:
if(not have_config):
raise IOError("The bookmark's details could not be loaded.")
host = config.get(name, "host")
port = int(config.get(name, "port"))
username = config.get(name, "username")
password = config.get(name, "password")
self.telnet_connect(host, port, username, password)
except ValueError as e:
# This exception may occur when casting the port (which is a str) to an int.
logging.exception(e)
except IOError as e:
logging.exception(e)
except Exception as e:
logging.error("Could not connect to Telnet server '%s'." % name)
logging.exception(e)
return
def telnet_connect(self, host, port=23, username=None, password=None):
""" Connect to a user-specified Telnet server.
:arg str host: The Telnet server's hostname.
:arg int port: The Telnet server's port number. If no port is specified, the default Telnet server port of 23 will be used.
:arg str username: The user's username. This is an optional argument.
:arg str password: The user's password. This is an optional argument.
"""
# Handle empty host/port string (or the case where host/port are None).
if(not host):
error(parent=self.application.window, message="Unable to connect to a DX cluster because no hostname was specified.")
return
if(not port):
logging.warning("No port specified. Assuming default port 23...")
port = 23 # Use the default Telnet port.
try:
logging.debug("Attempting connection to Telnet server %s:%d..." % (host, port))
self.connection = telnetlib.Telnet(host, port)
assert(self.connection)
if(username):
self.connection.read_until("login: ".encode())
self.connection.write((username + "\n").encode())
if(password):
self.connection.read_until("password: ".encode())
self.connection.write((password + "\n").encode())
except Exception as e:
message = "Could not create a connection to the Telnet server %s:%d. Check connection to the internets? Check connection details?" % (host, port)
error(parent=self.application.window, message=message)
logging.exception(e)
self.connection = None
return
logging.debug("Connection to %s:%d established." % (host, port))
self.set_items_sensitive(False)
self.check_io_event = GObject.timeout_add(1000, self.on_telnet_io)
return
def telnet_disconnect(self, widget=None):
""" Disconnect from a Telnet server and remove the I/O timer. """
if(self.connection):
self.connection.close()
self.buffer.set_text("")
self.connection = None
self.set_items_sensitive(True)
# Stop checking for server output once disconnected.
try:
GObject.source_remove(self.check_io_event)
except AttributeError:
# This may happen if a connection hasn't yet been established.
pass
return
def telnet_send_command(self, widget=None):
""" Send the user-specified command in the Gtk.Entry box to the Telnet server (if PyQSO is connected to one). """
if(self.connection):
command = self.builder.get_object("command")
self.connection.write((command.get_text() + "\n").encode())
command.set_text("")
return
def on_telnet_io(self):
""" Retrieve any new data from the Telnet server and print it out in the Gtk.TextView widget.
:returns: Always returns True to satisfy the GObject timer.
:rtype: bool
"""
if(self.connection):
text = self.connection.read_very_eager()
text = text.decode("ascii", "replace") # Replace any characters that cannot be decoded with a replacement marker.
try:
text = text.replace("\u0007", "") # Remove the BEL Unicode character from the end of the line
except UnicodeDecodeError:
pass
# Allow auto-scrolling to the new text entry if the focus is already at
# the very end of the Gtk.TextView. Otherwise, don't auto-scroll
# in case the user is reading something further up.
# Note: This is based on the code from http://forums.gentoo.org/viewtopic-t-445598-view-next.html
end_iter = self.buffer.get_end_iter()
end_mark = self.buffer.create_mark(None, end_iter)
self.renderer.move_mark_onscreen(end_mark)
at_end = self.buffer.get_iter_at_mark(end_mark).equal(end_iter)
self.buffer.insert(end_iter, text)
if(at_end):
end_mark = self.buffer.create_mark(None, end_iter)
self.renderer.scroll_mark_onscreen(end_mark)
return True
def set_items_sensitive(self, sensitive):
""" Enable/disable the relevant buttons for connecting/disconnecting from a DX cluster, so that users cannot click the connect button if PyQSO is already connected.
:arg bool sensitive: If True, enable the Connect button and disable the Disconnect button. If False, vice versa.
"""
self.items["CONNECT"].set_sensitive(sensitive)
self.items["DISCONNECT"].set_sensitive(not sensitive)
self.items["SEND"].set_sensitive(not sensitive)
return