diff --git a/RNS/Interfaces/BackboneInterface.py b/RNS/Interfaces/BackboneInterface.py index 7e7767b..f9b2517 100644 --- a/RNS/Interfaces/BackboneInterface.py +++ b/RNS/Interfaces/BackboneInterface.py @@ -131,10 +131,8 @@ class BackboneInterface(Interface): self.bind_ip = bind_address[0] self.owner = owner - self.add_listener(bind_address) + BackboneInterface.add_listener(bind_address) self.bitrate = self.BITRATE_GUESS - - self.start() self.online = True else: @@ -152,7 +150,20 @@ class BackboneInterface(Interface): def add_client_socket(client_socket, interface): BackboneInterface.ensure_epoll() BackboneInterface.spawned_interface_filenos[client_socket.fileno()] = interface - BackboneInterface.epoll.register(client_socket.fileno(), select.EPOLLIN) + BackboneInterface.register_in(client_socket.fileno()) + BackboneInterface.start() + + @staticmethod + def register_in(fileno): + # TODO: Remove debug + RNS.log(f"Registering EPOLL_IN for {fileno}", RNS.LOG_DEBUG) + BackboneInterface.epoll.register(fileno, select.EPOLLIN) + + @staticmethod + def deregister_fileno(fileno): + # TODO: Remove debug + RNS.log(f"Deregistering {fileno}", RNS.LOG_DEBUG) + BackboneInterface.epoll.unregister(fileno) @staticmethod def tx_ready(interface): @@ -169,7 +180,7 @@ class BackboneInterface(Interface): with BackboneInterface._job_lock: if BackboneInterface._job_active: return else: - RNS.log(f"Starting BackboneInterface I/O handler") # TODO: Remove debug + RNS.log(f"Starting BackboneInterface I/O handler", RNS.LOG_DEBUG) # TODO: Remove debug BackboneInterface._job_active = True BackboneInterface.ensure_epoll() try: @@ -187,7 +198,7 @@ class BackboneInterface(Interface): if len(received_bytes): spawned_interface.receive(received_bytes) else: - BackboneInterface.epoll.unregister(fileno); client_socket.close() + BackboneInterface.deregister_fileno(fileno); client_socket.close() spawned_interface.receive(received_bytes) elif fileno == client_socket.fileno() and (event & select.EPOLLOUT): @@ -196,7 +207,7 @@ class BackboneInterface(Interface): except Exception as e: RNS.log(f"Error while writing to {spawned_interface}: {e}", RNS.LOG_ERROR) written = 0 - BackboneInterface.epoll.unregister(fileno) + BackboneInterface.deregister_fileno(fileno) try: client_socket.close() except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) spawned_interface.receive(b"") @@ -207,7 +218,7 @@ class BackboneInterface(Interface): if spawned_interface.parent_interface: spawned_interface.parent_interface.txb += written elif fileno == client_socket.fileno() and event & (select.EPOLLHUP): - BackboneInterface.epoll.unregister(fileno) + BackboneInterface.deregister_fileno(fileno) try: client_socket.close() except Exception as e: RNS.log(f"Error while closing socket for {spawned_interface}: {e}", RNS.LOG_ERROR) spawned_interface.receive(b"") @@ -217,11 +228,11 @@ class BackboneInterface(Interface): if fileno == server_socket.fileno() and (event & select.EPOLLIN): client_socket, address = server_socket.accept() client_socket.setblocking(0) - if owner_interface.incoming_connection(client_socket): pass - else: client_socket.close() + if not owner_interface.incoming_connection(client_socket): + client_socket.close() elif fileno == server_socket.fileno() and (event & select.EPOLLHUP): - try: BackboneInterface.epoll.unregister(fileno) + try: BackboneInterface.deregister_fileno(fileno) except Exception as e: RNS.log(f"Error while deregistering listener file descriptor {fileno}: {e}", RNS.LOG_ERROR) try: server_socket.close() @@ -234,23 +245,25 @@ class BackboneInterface(Interface): finally: for owner_interface, serversocket in BackboneInterface.listener_filenos: fileno = serversocket.fileno() - BackboneInterface.epoll.unregister(fileno) + BackboneInterface.deregister_fileno(fileno) serversocket.close() BackboneInterface.listener_filenos.clear() - def add_listener(self, bind_address, socket_type=socket.AF_INET): + @staticmethod + def add_listener(interface, bind_address, socket_type=socket.AF_INET): + BackboneInterface.ensure_epoll() if socket_type == socket.AF_INET: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(bind_address) - else: raise TypeError(f"Invalid socket type {socket_type} for {self}") + else: raise TypeError(f"Invalid socket type {socket_type} for {interface}") server_socket.listen(1) server_socket.setblocking(0) - BackboneInterface.listener_filenos[server_socket.fileno()] = (self, server_socket) + BackboneInterface.listener_filenos[server_socket.fileno()] = (interface, server_socket) BackboneInterface.epoll.register(server_socket.fileno(), select.EPOLLIN) - RNS.log(f"{self} listener added: {server_socket}", RNS.LOG_DEBUG) # TODO: Remove debug + BackboneInterface.start() def incoming_connection(self, socket): RNS.log("Accepting incoming connection", RNS.LOG_VERBOSE) @@ -295,7 +308,7 @@ class BackboneInterface(Interface): RNS.Transport.interfaces.append(spawned_interface) while spawned_interface in self.spawned_interfaces: self.spawned_interfaces.remove(spawned_interface) self.spawned_interfaces.append(spawned_interface) - BackboneInterface.add_client_socket(client_socket, spawned_interface) + BackboneInterface.add_client_socket(socket, spawned_interface) return True @@ -459,8 +472,6 @@ class BackboneClientInterface(Interface): self.socket.settimeout(None) BackboneInterface.add_client_socket(self.socket, self) - BackboneInterface.start() - self.online = True if initial: diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 91b232f..37008a7 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -76,6 +76,7 @@ class Interface: self.bitrate = 62500 self.HW_MTU = None + self.parent_interface = None self.ingress_control = True self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES self.ic_burst_hold = Interface.IC_BURST_HOLD diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 6d52e1c..ccaefb9 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -21,6 +21,7 @@ # SOFTWARE. from RNS.Interfaces.Interface import Interface +from RNS.Interfaces.BackboneInterface import BackboneInterface import socketserver import threading import socket @@ -57,9 +58,9 @@ class LocalClientInterface(Interface): def __init__(self, owner, name, target_port = None, connected_socket=None): super().__init__() - self.HW_MTU = 262144 - - self.online = False + self.epoll_backend = False + self.HW_MTU = 262144 + self.online = False self.IN = True self.OUT = False @@ -70,6 +71,11 @@ class LocalClientInterface(Interface): self.detached = False self.name = name self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL + self.frame_buffer = b"" + self.transmit_buffer = b"" + + if RNS.vendor.platformutils.is_linux(): + self.epoll_backend = True if connected_socket != None: self.receives = True @@ -98,9 +104,10 @@ class LocalClientInterface(Interface): self.announce_rate_penalty = None if connected_socket == None: - thread = threading.Thread(target=self.read_loop) - thread.daemon = True - thread.start() + if not self.epoll_backend: + thread = threading.Thread(target=self.read_loop) + thread.daemon = True + thread.start() def should_ingress_limit(self): return False @@ -114,6 +121,8 @@ class LocalClientInterface(Interface): self.is_connected_to_shared_instance = True self.never_connected = False + if self.epoll_backend: BackboneInterface.add_client_socket(self.socket, self) + return True @@ -137,9 +146,11 @@ class LocalClientInterface(Interface): RNS.log("Reconnected socket for "+str(self)+".", RNS.LOG_INFO) self.reconnecting = False - thread = threading.Thread(target=self.read_loop) - thread.daemon = True - thread.start() + if not self.epoll_backend: + thread = threading.Thread(target=self.read_loop) + thread.daemon = True + thread.start() + def job(): time.sleep(LocalClientInterface.RECONNECT_WAIT+2) RNS.Transport.shared_connection_reappeared() @@ -152,8 +163,7 @@ class LocalClientInterface(Interface): def process_incoming(self, data): self.rxb += len(data) - if hasattr(self, "parent_interface") and self.parent_interface != None: - self.parent_interface.rxb += len(data) + if self.parent_interface != None: self.parent_interface.rxb += len(data) try: self.owner.inbound(data, self) @@ -164,23 +174,28 @@ class LocalClientInterface(Interface): def process_outgoing(self, data): if self.online: try: - self.writing = True + if self.epoll_backend: + self.transmit_buffer += bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) + BackboneInterface.tx_ready(self) - if self._force_bitrate: - if not hasattr(self, "send_lock"): - self.send_lock = Lock() + else: + self.writing = True - with self.send_lock: - # RNS.log(f"Simulating latency of {RNS.prettytime(s)} for {len(data)} bytes", RNS.LOG_EXTREME) - s = len(data) / self.bitrate * 8 - time.sleep(s) + if self._force_bitrate: + if not hasattr(self, "send_lock"): + self.send_lock = Lock() - data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) - self.socket.sendall(data) - self.writing = False - self.txb += len(data) - if hasattr(self, "parent_interface") and self.parent_interface != None: - self.parent_interface.txb += len(data) + with self.send_lock: + # RNS.log(f"Simulating latency of {RNS.prettytime(s)} for {len(data)} bytes", RNS.LOG_EXTREME) + s = len(data) / self.bitrate * 8 + time.sleep(s) + + data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) + self.socket.sendall(data) + self.writing = False + self.txb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.txb += len(data) except Exception as e: RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) @@ -188,36 +203,50 @@ class LocalClientInterface(Interface): RNS.trace_exception(e) self.teardown() + def handle_hdlc(self, data_in): + self.frame_buffer += data_in + flags_remaining = True + while flags_remaining: + frame_start = self.frame_buffer.find(HDLC.FLAG) + if frame_start != -1: + frame_end = self.frame_buffer.find(HDLC.FLAG, frame_start+1) + if frame_end != -1: + frame = self.frame_buffer[frame_start+1:frame_end] + frame = frame.replace(bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]), bytes([HDLC.FLAG])) + frame = frame.replace(bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK]), bytes([HDLC.ESC])) + if len(frame) > RNS.Reticulum.HEADER_MINSIZE: + self.process_incoming(frame) + self.frame_buffer = self.frame_buffer[frame_end:] + else: + flags_remaining = False + else: + flags_remaining = False + + def receive(self, data_in): + try: + if len(data_in) > 0: self.handle_hdlc(data_in) + else: + self.online = False + if self.is_connected_to_shared_instance and not self.detached: + RNS.log("Socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) + RNS.Transport.shared_connection_disappeared() + self.reconnect() + else: + self.teardown(nowarning=True) + + except Exception as e: + self.online = False + RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.log("Tearing down "+str(self), RNS.LOG_ERROR) + self.teardown() def read_loop(self): try: - in_frame = False - escape = False - frame_buffer = b"" + self.frame_buffer = b"" data_in = b"" - data_buffer = b"" - while True: data_in = self.socket.recv(4096) - if len(data_in) > 0: - frame_buffer += data_in - flags_remaining = True - while flags_remaining: - frame_start = frame_buffer.find(HDLC.FLAG) - if frame_start != -1: - frame_end = frame_buffer.find(HDLC.FLAG, frame_start+1) - if frame_end != -1: - frame = frame_buffer[frame_start+1:frame_end] - frame = frame.replace(bytes([HDLC.ESC, HDLC.FLAG ^ HDLC.ESC_MASK]), bytes([HDLC.FLAG])) - frame = frame.replace(bytes([HDLC.ESC, HDLC.ESC ^ HDLC.ESC_MASK]), bytes([HDLC.ESC])) - if len(frame) > RNS.Reticulum.HEADER_MINSIZE: - self.process_incoming(frame) - frame_buffer = frame_buffer[frame_end:] - else: - flags_remaining = False - else: - flags_remaining = False - + if len(data_in) > 0: self.handle_hdlc(data_in) else: self.online = False if self.is_connected_to_shared_instance and not self.detached: @@ -229,7 +258,6 @@ class LocalClientInterface(Interface): break - except Exception as e: self.online = False RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) @@ -293,6 +321,7 @@ class LocalServerInterface(Interface): def __init__(self, owner, bindport=None): super().__init__() + self.epoll_backend = False self.online = False self.clients = 0 @@ -301,6 +330,9 @@ class LocalServerInterface(Interface): self.name = "Reticulum" self.mode = RNS.Interfaces.Interface.Interface.MODE_FULL + if RNS.vendor.platformutils.is_linux(): + self.epoll_backend = True + if (bindport != None): self.receives = True self.bind_ip = "127.0.0.1" @@ -316,12 +348,13 @@ class LocalServerInterface(Interface): address = (self.bind_ip, self.bind_port) - self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) - self.server.daemon_threads = True - - thread = threading.Thread(target=self.server.serve_forever) - thread.daemon = True - thread.start() + if self.epoll_backend: BackboneInterface.add_listener(self, address) + else: + self.server = ThreadingTCPServer(address, handlerFactory(self.incoming_connection)) + self.server.daemon_threads = True + thread = threading.Thread(target=self.server.serve_forever) + thread.daemon = True + thread.start() self.announce_rate_target = None self.announce_rate_grace = None @@ -330,24 +363,39 @@ class LocalServerInterface(Interface): self.bitrate = 1000*1000*1000 self.online = True - - def incoming_connection(self, handler): - interface_name = str(str(handler.client_address[1])) - spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=handler.request) - spawned_interface.OUT = self.OUT - spawned_interface.IN = self.IN - spawned_interface.target_ip = handler.client_address[0] - spawned_interface.target_port = str(handler.client_address[1]) - spawned_interface.parent_interface = self - spawned_interface.bitrate = self.bitrate - if hasattr(self, "_force_bitrate"): - spawned_interface._force_bitrate = self._force_bitrate - # RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_EXTREME) - RNS.Transport.interfaces.append(spawned_interface) - RNS.Transport.local_client_interfaces.append(spawned_interface) - self.clients += 1 - spawned_interface.read_loop() + if self.epoll_backend: + socket = handler + interface_name = str(str(socket.getpeername()[1])) + spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=socket) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.socket = socket + spawned_interface.target_ip = socket.getpeername()[0] + spawned_interface.target_port = str(socket.getpeername()[1]) + spawned_interface.parent_interface = self + spawned_interface.bitrate = self.bitrate + if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate + RNS.Transport.interfaces.append(spawned_interface) + RNS.Transport.local_client_interfaces.append(spawned_interface) + self.clients += 1 + BackboneInterface.add_client_socket(socket, spawned_interface) + return True + + else: + interface_name = str(str(handler.client_address[1])) + spawned_interface = LocalClientInterface(self.owner, name=interface_name, connected_socket=handler.request) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.target_ip = handler.client_address[0] + spawned_interface.target_port = str(handler.client_address[1]) + spawned_interface.parent_interface = self + spawned_interface.bitrate = self.bitrate + if hasattr(self, "_force_bitrate"): spawned_interface._force_bitrate = self._force_bitrate + RNS.Transport.interfaces.append(spawned_interface) + RNS.Transport.local_client_interfaces.append(spawned_interface) + self.clients += 1 + spawned_interface.read_loop() def process_outgoing(self, data): pass