* Fix for ARQ socket crash due to thread conflicts
  * Added additional mutex to protect transmit queue
  * Added additional lock/unlock mutex blocks
  * Added code to put socket close() within a try {} catch {}
    block
pull/1/head
David Freese 2012-11-25 21:21:39 -06:00
rodzic 5b9e09fc8a
commit 2252cab1d6
2 zmienionych plików z 105 dodań i 92 usunięć

Wyświetl plik

@ -70,7 +70,7 @@ extern bool mailclient;
extern bool mailserver;
extern bool tlfio;
extern bool arq_text_available;
extern char arq_get_char();
extern int arq_get_char();
// ARQ mail implementation
extern void arq_init();

Wyświetl plik

@ -65,6 +65,20 @@ LOG_FILE_SOURCE(debug::LOG_ARQCONTROL);
using namespace std;
// ============================================================================
// Implementation using thread vice the fldigi timeout facility
// ============================================================================
static pthread_t arq_thread;
static pthread_mutex_t arq_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t tosend_mutex = PTHREAD_MUTEX_INITIALIZER;
static void *arq_loop(void *args);
static bool arq_exit = false;
static bool arq_enabled;
static string tosend = "";
static string enroute = "";
//======================================================================
// test code for pskmail eol issues
@ -114,13 +128,11 @@ string noctrl(string src)
//======================================================================
static string arqtext;
//string::iterator pText;
size_t pText;
bool arq_text_available = false;
string txstring;
extern void send0x06();
extern void parse_arqtext(string &toparse);
static void set_button(Fl_Button* button, bool value)
@ -162,9 +174,11 @@ LOG_INFO("%s", "ARQ set ptt off");
for (size_t i = 0; i < NUM_MODES; ++i) {
if (strlen(mode_info[i].pskmail_name) > 0) {
if (src == mode_info[i].pskmail_name) {
while (trx_state != STATE_RX) MilliSleep(50);
while (trx_state != STATE_RX) {
MilliSleep(10);
}
LOG_INFO("Setting modem to %s", mode_info[i].pskmail_name);
REQ_SYNC(init_modem_sync, mode_info[i].mode, 0);
LOG_INFO("ARQ new modem set to %s", mode_info[i].pskmail_name);
break;
}
}
@ -453,22 +467,14 @@ bool WRAP_auto_arqRx()
// Socket ARQ i/o used on all platforms
//-----------------------------------------------------------------------------
extern void arq_run(Socket s);
extern void arq_stop();
string errstring;
string cmdstring;
string response;
bool isTxChar = false;
bool isCmdChar = false;
bool isNotMULTIPSK = true;
static string errstring;
static string cmdstring;
static pthread_t* arq_socket_thread = 0;
ARQ_SOCKET_Server* ARQ_SOCKET_Server::inst = 0;
static std::vector<Socket> arqclient;
Socket arqclient;
bool isSocketConnected = false;
void arq_run(Socket);
ARQ_SOCKET_Server::ARQ_SOCKET_Server()
{
@ -491,8 +497,7 @@ ARQ_SOCKET_Server::~ARQ_SOCKET_Server()
bool ARQ_SOCKET_Server::start(const char* node, const char* service)
{
if (inst)
return false;
if (inst) return false;
inst = new ARQ_SOCKET_Server;
@ -543,9 +548,11 @@ void* ARQ_SOCKET_Server::thread_func(void*)
try {
#ifdef __WOE32__
if (inst->server_socket->wait(0))
#endif
arq_run(inst->server_socket->accept());
#else
arq_run(inst->server_socket->accept());
TEST_THREAD_CANCEL();
#endif
}
catch (const SocketException& e) {
if (e.error() != EINTR) {
@ -558,61 +565,79 @@ void* ARQ_SOCKET_Server::thread_func(void*)
break;
}
}
arq_stop();
if (!arqclient.empty()) {
vector<Socket>::iterator p = arqclient.begin();
while (p != arqclient.end()) {
try {
(*p).close();
arqclient.erase(p);
}
catch (...) {;}
p++;
}
}
inst->server_socket->close();
return NULL;
}
void arq_run(Socket s)
{
pthread_mutex_lock (&arq_mutex);
LOG_INFO("Adding ARQ client %d", s.fd());
struct timeval t = { 0, 20000 };
arqclient = s;
arqclient.set_timeout(t);
arqclient.set_nonblocking();
isSocketConnected = true;
s.set_timeout(t);
s.set_nonblocking();
arqclient.push_back(s);
arqmode = true;
}
void arq_stop()
{
arqclient.close();
isSocketConnected = false;
arqmode = false;
pthread_mutex_unlock (&arq_mutex);
}
void WriteARQsocket(unsigned char* data, size_t len)
{
static string instr;
if (arqclient.empty()) return;
static string instr;
instr.clear();
vector<Socket>::iterator p;
try {
size_t n = arqclient.recv(instr);
if ( n > 0) txstring.append(instr);
} catch (const SocketException& e) {
arq_stop();
return;
}
if (!isSocketConnected) return;
try {
arqclient.send(data, len);
p = arqclient.begin();
while (p != arqclient.end()) {
(*p).wait(1);
(*p).send(data, len);
LOG_DEBUG("Wrote to socket %d", (*p).fd());
p++;
}
}
catch (const SocketException& e) {
LOG_ERROR("%s", e.what());
arq_stop();
LOG_ERROR("socket fd %d %s", (*p).fd(), e.what());
try {
(*p).close();
arqclient.erase(p);
} catch (...) {;}
if (arqclient.empty()) arqmode = false;
}
}
bool Socket_arqRx()
{
if (!isSocketConnected) return false;
if (arqclient.empty()) return false;
static string instr;
vector<Socket>::iterator p = arqclient.begin();
size_t n = 0;
instr.clear();
try {
size_t n = arqclient.recv(instr);
if ( n > 0)
txstring.append(instr);
pthread_mutex_lock (&arq_mutex);
try {
while (p != arqclient.end()) {
LOG_DEBUG("Query %d", (*p).fd());
(*p).wait(0);
n = (*p).recv(instr);
if ( n > 0) {
txstring.append(instr);
}
p++;
}
if (!bSend0x06 && arqtext.empty() && !txstring.empty()) {
arqtext = txstring;
parse_arqtext(arqtext);
@ -627,15 +652,24 @@ bool Socket_arqRx()
}
txstring.clear();
cmdstring.clear();
pthread_mutex_unlock (&arq_mutex);
return true;
}
cmdstring.clear();
pthread_mutex_unlock (&arq_mutex);
return false;
}
catch (const SocketException& e) {
arq_stop();
LOG_ERROR("socket fd %d %s", (*p).fd(), e.what());
try {
(*p).close();
arqclient.erase(p);
} catch (...) {;}
pthread_mutex_unlock (&arq_mutex);
return false;
}
pthread_mutex_unlock (&arq_mutex);
return false;
}
//-----------------------------------------------------------------------------
@ -653,43 +687,16 @@ void WriteARQSysV(unsigned char data)
}
#endif
//-----------------------------------------------------------------------------
// Write End of Transmit character to ARQ client
//-----------------------------------------------------------------------------
void send0x06()
{
if (trx_state == STATE_RX) {
bSend0x06 = false;
WriteARQ(0x06);
}
}
// ============================================================================
// Implementation using thread vice the fldigi timeout facility
// ============================================================================
static pthread_t arq_thread;
static pthread_mutex_t arq_mutex = PTHREAD_MUTEX_INITIALIZER;
static void *arq_loop(void *args);
static bool arq_exit = false;
static bool arq_enabled;
string tosend = "";
string enroute = "";
void WriteARQ(unsigned char data)
{
pthread_mutex_lock (&arq_mutex);
pthread_mutex_lock (&tosend_mutex);
tosend += data;
pthread_mutex_unlock (&arq_mutex);
pthread_mutex_unlock (&tosend_mutex);
return;
// WriteARQsocket(&data, 1);
//#if !defined(__WOE32__) && !defined(__APPLE__)
// WriteARQSysV(data);
//#endif
}
static void *arq_loop(void *args)
@ -701,22 +708,29 @@ static void *arq_loop(void *args)
if (arq_exit)
break;
pthread_mutex_lock (&arq_mutex);
if (!tosend.empty()) {
pthread_mutex_lock(&tosend_mutex);
enroute = tosend;
tosend.clear();
pthread_mutex_unlock (&arq_mutex);
pthread_mutex_unlock(&tosend_mutex);
pthread_mutex_lock (&arq_mutex);
WriteARQsocket((unsigned char*)enroute.c_str(), enroute.length());
#if !defined(__WOE32__) && !defined(__APPLE__)
for (size_t i = 0; i < enroute.length(); i++)
WriteARQSysV((unsigned char)enroute[i]);
#endif
} else
pthread_mutex_unlock (&arq_mutex);
}
if (bSend0x06)
send0x06();
if (bSend0x06) {
pthread_mutex_lock (&arq_mutex);
string xmtdone;
xmtdone += 0x06;
WriteARQsocket((unsigned char*)xmtdone.c_str(), xmtdone.length());
bSend0x06 = false;
pthread_mutex_unlock (&arq_mutex);
}
#if !defined(__WOE32__) && !defined(__APPLE__)
// order of precedence; Socket, Wrap autofile, TLF autofile
@ -729,7 +743,6 @@ static void *arq_loop(void *args)
if (!Socket_arqRx())
WRAP_auto_arqRx();
#endif
// pthread_mutex_unlock (&arq_mutex);
MilliSleep(100);
}
@ -771,19 +784,19 @@ void arq_close(void)
arq_exit = false;
}
char arq_get_char()
int arq_get_char()
{
char c = 0;
int c = 0;
pthread_mutex_lock (&arq_mutex);
if (arq_text_available) {
if (pText != arqtext.length())
c = arqtext[pText++];
else {
if (pText != arqtext.length()) {
c = arqtext[pText++] & 0xFF;
} else {
arqtext.clear();
pText = 0;
bSend0x06 = true;
arq_text_available = false;
c = 0x03;
c = GET_TX_CHAR_ETX;
}
}
pthread_mutex_unlock (&arq_mutex);