From 2252cab1d6e99f46d1edf3916258b11fe2143928 Mon Sep 17 00:00:00 2001 From: David Freese Date: Sun, 25 Nov 2012 21:21:39 -0600 Subject: [PATCH] ARQ Socket * Fix for ARQ socket crash due to thread conflicts * Added additional mutex to protect transmit queue * Added additional lock/unlock mutex blocks * Added code to put socket close() within a try {} catch {} block --- src/include/main.h | 2 +- src/misc/arq_io.cxx | 195 +++++++++++++++++++++++--------------------- 2 files changed, 105 insertions(+), 92 deletions(-) diff --git a/src/include/main.h b/src/include/main.h index c4b386e0..5da2c8c5 100644 --- a/src/include/main.h +++ b/src/include/main.h @@ -70,7 +70,7 @@ extern bool mailclient; extern bool mailserver; extern bool tlfio; extern bool arq_text_available; -extern char arq_get_char(); +extern int arq_get_char(); // ARQ mail implementation extern void arq_init(); diff --git a/src/misc/arq_io.cxx b/src/misc/arq_io.cxx index d5da95f2..3e23f4a0 100644 --- a/src/misc/arq_io.cxx +++ b/src/misc/arq_io.cxx @@ -65,6 +65,20 @@ LOG_FILE_SOURCE(debug::LOG_ARQCONTROL); using namespace std; +// ============================================================================ +// Implementation using thread vice the fldigi timeout facility +// ============================================================================ +static pthread_t arq_thread; +static pthread_mutex_t arq_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t tosend_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void *arq_loop(void *args); + +static bool arq_exit = false; +static bool arq_enabled; + +static string tosend = ""; +static string enroute = ""; //====================================================================== // test code for pskmail eol issues @@ -114,13 +128,11 @@ string noctrl(string src) //====================================================================== static string arqtext; -//string::iterator pText; size_t pText; bool arq_text_available = false; string txstring; -extern void send0x06(); extern void parse_arqtext(string &toparse); static void set_button(Fl_Button* button, bool value) @@ -162,9 +174,11 @@ LOG_INFO("%s", "ARQ set ptt off"); for (size_t i = 0; i < NUM_MODES; ++i) { if (strlen(mode_info[i].pskmail_name) > 0) { if (src == mode_info[i].pskmail_name) { - while (trx_state != STATE_RX) MilliSleep(50); + while (trx_state != STATE_RX) { + MilliSleep(10); + } +LOG_INFO("Setting modem to %s", mode_info[i].pskmail_name); REQ_SYNC(init_modem_sync, mode_info[i].mode, 0); -LOG_INFO("ARQ new modem set to %s", mode_info[i].pskmail_name); break; } } @@ -453,22 +467,14 @@ bool WRAP_auto_arqRx() // Socket ARQ i/o used on all platforms //----------------------------------------------------------------------------- -extern void arq_run(Socket s); -extern void arq_stop(); - -string errstring; -string cmdstring; -string response; -bool isTxChar = false; -bool isCmdChar = false; - -bool isNotMULTIPSK = true; +static string errstring; +static string cmdstring; static pthread_t* arq_socket_thread = 0; ARQ_SOCKET_Server* ARQ_SOCKET_Server::inst = 0; +static std::vector arqclient; -Socket arqclient; -bool isSocketConnected = false; +void arq_run(Socket); ARQ_SOCKET_Server::ARQ_SOCKET_Server() { @@ -491,8 +497,7 @@ ARQ_SOCKET_Server::~ARQ_SOCKET_Server() bool ARQ_SOCKET_Server::start(const char* node, const char* service) { - if (inst) - return false; + if (inst) return false; inst = new ARQ_SOCKET_Server; @@ -543,9 +548,11 @@ void* ARQ_SOCKET_Server::thread_func(void*) try { #ifdef __WOE32__ if (inst->server_socket->wait(0)) -#endif arq_run(inst->server_socket->accept()); +#else + arq_run(inst->server_socket->accept()); TEST_THREAD_CANCEL(); +#endif } catch (const SocketException& e) { if (e.error() != EINTR) { @@ -558,61 +565,79 @@ void* ARQ_SOCKET_Server::thread_func(void*) break; } } - arq_stop(); + if (!arqclient.empty()) { + vector::iterator p = arqclient.begin(); + while (p != arqclient.end()) { + try { + (*p).close(); + arqclient.erase(p); + } + catch (...) {;} + p++; + } + } inst->server_socket->close(); return NULL; } void arq_run(Socket s) { + pthread_mutex_lock (&arq_mutex); + LOG_INFO("Adding ARQ client %d", s.fd()); struct timeval t = { 0, 20000 }; - arqclient = s; - arqclient.set_timeout(t); - arqclient.set_nonblocking(); - isSocketConnected = true; + s.set_timeout(t); + s.set_nonblocking(); + arqclient.push_back(s); arqmode = true; -} - -void arq_stop() -{ - arqclient.close(); - isSocketConnected = false; - arqmode = false; + pthread_mutex_unlock (&arq_mutex); } void WriteARQsocket(unsigned char* data, size_t len) { -static string instr; + if (arqclient.empty()) return; + static string instr; instr.clear(); + vector::iterator p; try { - size_t n = arqclient.recv(instr); - if ( n > 0) txstring.append(instr); - } catch (const SocketException& e) { - arq_stop(); - return; - } - if (!isSocketConnected) return; - try { - arqclient.send(data, len); + p = arqclient.begin(); + while (p != arqclient.end()) { + (*p).wait(1); + (*p).send(data, len); + LOG_DEBUG("Wrote to socket %d", (*p).fd()); + p++; + } } catch (const SocketException& e) { - LOG_ERROR("%s", e.what()); - arq_stop(); + LOG_ERROR("socket fd %d %s", (*p).fd(), e.what()); + try { + (*p).close(); + arqclient.erase(p); + } catch (...) {;} + if (arqclient.empty()) arqmode = false; } } bool Socket_arqRx() { - if (!isSocketConnected) return false; + if (arqclient.empty()) return false; static string instr; + vector::iterator p = arqclient.begin(); + size_t n = 0; instr.clear(); - try { - size_t n = arqclient.recv(instr); - if ( n > 0) - txstring.append(instr); + pthread_mutex_lock (&arq_mutex); + try { + while (p != arqclient.end()) { + LOG_DEBUG("Query %d", (*p).fd()); + (*p).wait(0); + n = (*p).recv(instr); + if ( n > 0) { + txstring.append(instr); + } + p++; + } if (!bSend0x06 && arqtext.empty() && !txstring.empty()) { arqtext = txstring; parse_arqtext(arqtext); @@ -627,15 +652,24 @@ bool Socket_arqRx() } txstring.clear(); cmdstring.clear(); + pthread_mutex_unlock (&arq_mutex); return true; } cmdstring.clear(); + pthread_mutex_unlock (&arq_mutex); return false; } catch (const SocketException& e) { - arq_stop(); + LOG_ERROR("socket fd %d %s", (*p).fd(), e.what()); + try { + (*p).close(); + arqclient.erase(p); + } catch (...) {;} + pthread_mutex_unlock (&arq_mutex); return false; } + pthread_mutex_unlock (&arq_mutex); + return false; } //----------------------------------------------------------------------------- @@ -653,43 +687,16 @@ void WriteARQSysV(unsigned char data) } #endif -//----------------------------------------------------------------------------- -// Write End of Transmit character to ARQ client -//----------------------------------------------------------------------------- - -void send0x06() -{ - if (trx_state == STATE_RX) { - bSend0x06 = false; - WriteARQ(0x06); - } -} - // ============================================================================ // Implementation using thread vice the fldigi timeout facility // ============================================================================ -static pthread_t arq_thread; -static pthread_mutex_t arq_mutex = PTHREAD_MUTEX_INITIALIZER; - -static void *arq_loop(void *args); - -static bool arq_exit = false; -static bool arq_enabled; - -string tosend = ""; -string enroute = ""; void WriteARQ(unsigned char data) { - pthread_mutex_lock (&arq_mutex); + pthread_mutex_lock (&tosend_mutex); tosend += data; - pthread_mutex_unlock (&arq_mutex); + pthread_mutex_unlock (&tosend_mutex); return; - -// WriteARQsocket(&data, 1); -//#if !defined(__WOE32__) && !defined(__APPLE__) -// WriteARQSysV(data); -//#endif } static void *arq_loop(void *args) @@ -701,22 +708,29 @@ static void *arq_loop(void *args) if (arq_exit) break; - pthread_mutex_lock (&arq_mutex); - if (!tosend.empty()) { + pthread_mutex_lock(&tosend_mutex); enroute = tosend; tosend.clear(); - pthread_mutex_unlock (&arq_mutex); + pthread_mutex_unlock(&tosend_mutex); + + pthread_mutex_lock (&arq_mutex); WriteARQsocket((unsigned char*)enroute.c_str(), enroute.length()); #if !defined(__WOE32__) && !defined(__APPLE__) for (size_t i = 0; i < enroute.length(); i++) WriteARQSysV((unsigned char)enroute[i]); #endif - } else pthread_mutex_unlock (&arq_mutex); + } - if (bSend0x06) - send0x06(); + if (bSend0x06) { + pthread_mutex_lock (&arq_mutex); + string xmtdone; + xmtdone += 0x06; + WriteARQsocket((unsigned char*)xmtdone.c_str(), xmtdone.length()); + bSend0x06 = false; + pthread_mutex_unlock (&arq_mutex); + } #if !defined(__WOE32__) && !defined(__APPLE__) // order of precedence; Socket, Wrap autofile, TLF autofile @@ -729,7 +743,6 @@ static void *arq_loop(void *args) if (!Socket_arqRx()) WRAP_auto_arqRx(); #endif -// pthread_mutex_unlock (&arq_mutex); MilliSleep(100); } @@ -771,19 +784,19 @@ void arq_close(void) arq_exit = false; } -char arq_get_char() +int arq_get_char() { - char c = 0; + int c = 0; pthread_mutex_lock (&arq_mutex); if (arq_text_available) { - if (pText != arqtext.length()) - c = arqtext[pText++]; - else { + if (pText != arqtext.length()) { + c = arqtext[pText++] & 0xFF; + } else { arqtext.clear(); pText = 0; bSend0x06 = true; arq_text_available = false; - c = 0x03; + c = GET_TX_CHAR_ETX; } } pthread_mutex_unlock (&arq_mutex);