PSKMAIL/ARQ socket

* rewrite of arq_io.cxx with assistance of Remi
    - more clearly define use of various variable mutex
    - added mutex blocks to TLF interface
    - changed from ::lock ::unlock to
      guard_lock usage.
  * added command line switch
      --debug-pskmail
    which enables verbose time-tagged logging for
    ARQ and PSKMAIL events
  * added guard_lock to debug methods
pull/1/head
David Freese 2013-07-05 06:53:18 -05:00
rodzic 6e16b41620
commit 0f5ede7efa
4 zmienionych plików z 195 dodań i 152 usunięć

Wyświetl plik

@ -23,7 +23,7 @@
#ifndef _DEBUG_H_
#define _DEBUG_H_
#define DEBUG_PSKMAIL 0
#define DEBUG_PSKMAIL 1
#include "util.h"
@ -86,6 +86,8 @@ unused__ static uint32_t log_source_ = debug::LOG_OTHER;
#define LOG_SET_SOURCE(source__) log_source_ = source__
extern bool debug_pskmail;
#endif // _DEBUG_H_
// Local Variables:

Wyświetl plik

@ -655,6 +655,9 @@ void generate_option_help(void) {
<< " --debug-level LEVEL\n"
<< " Set the event log verbosity\n\n"
<< " --debug-pskmail\n"
<< " Enable logging for pskmail / arq events\n\n"
<< " --version\n"
<< " Print version information\n\n"
@ -754,7 +757,7 @@ int parse_args(int argc, char **argv, int& idx)
#if USE_PORTAUDIO
OPT_FRAMES_PER_BUFFER,
#endif
OPT_NOISE, OPT_DEBUG_LEVEL,
OPT_NOISE, OPT_DEBUG_LEVEL, OPT_DEBUG_PSKMAIL,
OPT_EXIT_AFTER,
OPT_DEPRECATED, OPT_HELP, OPT_VERSION, OPT_BUILD_INFO };
@ -807,6 +810,7 @@ int parse_args(int argc, char **argv, int& idx)
{ "noise", 0, 0, OPT_NOISE },
{ "debug-level", 1, 0, OPT_DEBUG_LEVEL },
{ "debug-pskmail", 0, 0, OPT_DEBUG_PSKMAIL },
{ "help", 0, 0, OPT_HELP },
{ "version", 0, 0, OPT_VERSION },
@ -1006,6 +1010,10 @@ int parse_args(int argc, char **argv, int& idx)
}
break;
case OPT_DEBUG_PSKMAIL:
debug_pskmail = true;
break;
case OPT_DEPRECATED:
cerr << "W: the --" << longopts[longindex].name
<< " option has been deprecated and will be removed in a future version\n";

Wyświetl plik

@ -3,12 +3,14 @@
//
// support for ARQ server/client system such as pskmail and fl_arq
//
// Copyright (C) 2006-2010
// Copyright (C) 2006-2013
// Dave Freese, W1HKJ
// Copyright (C) 2008-2010
// Copyright (C) 2008-2013
// Stelios Bounanos, M0GLD
// Copyright (C) 2009-2010
// Copyright (C) 2009-2013
// John Douyere, VK2ETA
// Copyright (c) 2013
// Remi Chateauneu, F4ECW
//
// This file is part of fldigi.
//
@ -66,9 +68,7 @@ LOG_FILE_SOURCE(debug::LOG_ARQCONTROL);
using namespace std;
// ============================================================================
// Implementation using thread vice the fldigi timeout facility
// ============================================================================
// =====================================================================
static pthread_t arq_thread;
static pthread_mutex_t arq_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t arq_rx_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -80,10 +80,19 @@ static bool arq_exit = false;
static bool arq_enabled;
static bool abort_flag = false;
static string tosend = "";
static string enroute = "";
//======================================================================
// test code for pskmail eol issues
/// Any access to shared variables must be protected.
static string tosend = ""; // Protected by tosend_mutex
static string enroute = ""; // Protected by tosend_mutex
static string arqtext = ""; // Protected by arq_rx_mutex
static string txstring = ""; // Protected by arq_rx_mutex
static size_t pText; // Protected by arq_rx_mutex
bool arq_text_available = false; // Protected by arq_rx_mutex
// Beware 'arq_text_available' is accessed by other modules.
static bool bSend0x06 = false; // Protected by arq_rx_mutex
// =====================================================================
static const char *asc[128] = {
"<NUL>", "<SOH>", "<STX>", "<ETX>",
@ -140,12 +149,6 @@ string noctrl(string src)
//======================================================================
static string arqtext;
size_t pText;
bool arq_text_available = false;
string txstring;
extern void parse_arqtext(string &toparse);
static void set_button(Fl_Button* button, bool value)
@ -162,10 +165,12 @@ void ParseMode(string src)
int ret = sscanf( src.substr(7, src.length() - 7).c_str(), "%d", &msecs);
if (ret != 1 || msecs < 10 || msecs > 20000) msecs = 100;
}
LOG_INFO("%s %5.2f sec", "ARQ tune on", msecs/1000.0);
if (debug_pskmail)
LOG_INFO("%s %5.2f sec", "ARQ tune on", msecs/1000.0);
trx_tune();
MilliSleep(msecs);
LOG_INFO("%s", "ARQ tune off");
if (debug_pskmail)
LOG_INFO("%s", "ARQ tune off");
trx_receive();
return;
}
@ -175,11 +180,13 @@ LOG_INFO("%s", "ARQ tune off");
int ret = sscanf( src.substr(7, src.length() - 7).c_str(), "%d", &msecs);
if (ret != 1 || msecs < 10 || msecs > 20000) msecs = 100;
}
LOG_INFO("%s %5.2f sec", "ARQ set ptt on", msecs/1000.0);
if (debug_pskmail)
LOG_INFO("%s %5.2f sec", "ARQ set ptt on", msecs/1000.0);
push2talk->set(true);
REQ(&waterfall::set_XmtRcvBtn, wf, true);
MilliSleep(msecs);
LOG_INFO("%s", "ARQ set ptt off");
if (debug_pskmail)
LOG_INFO("%s", "ARQ set ptt off");
push2talk->set(false);
REQ(&waterfall::set_XmtRcvBtn, wf, false);
return;
@ -190,7 +197,8 @@ LOG_INFO("%s", "ARQ set ptt off");
while (trx_state != STATE_RX) {
MilliSleep(10);
}
LOG_INFO("Setting modem to %s", mode_info[i].pskmail_name);
if (debug_pskmail)
LOG_INFO("Setting modem to %s", mode_info[i].pskmail_name);
REQ_SYNC(init_modem_sync, mode_info[i].mode, 0);
break;
}
@ -201,11 +209,13 @@ LOG_INFO("Setting modem to %s", mode_info[i].pskmail_name);
void ParseRSID(string src)
{
if (src == "ON") {
LOG_INFO("%s", "RsID turned ON");
if (debug_pskmail)
LOG_INFO("%s", "RsID turned ON");
REQ(set_button, btnRSID, 1);
}
if (src == "OFF") {
LOG_INFO("%s", "RsID turned OFF");
if (debug_pskmail)
LOG_INFO("%s", "RsID turned OFF");
REQ(set_button, btnRSID, 0);
}
}
@ -214,11 +224,13 @@ void ParseRSID(string src)
void ParseTxRSID(string src)
{
if (src == "ON") {
LOG_INFO("%s", "TxRsID turned ON");
if (debug_pskmail)
LOG_INFO("%s", "TxRsID turned ON");
REQ(set_button, btnTxRSID, 1);
}
if (src == "OFF") {
LOG_INFO("%s", "TxRsID turned OFF");
if (debug_pskmail)
LOG_INFO("%s", "TxRsID turned OFF");
REQ(set_button, btnTxRSID, 0);
}
}
@ -231,14 +243,12 @@ static string strSubCmd;
if (toparse.empty()) return;
if (DEBUG_PSKMAIL)
LOG_INFO("Arq text: %s", noctrl(toparse).c_str());
else
LOG_VERBOSE("Arq text: %s", noctrl(toparse).c_str());
idxCmd = toparse.find("<cmd>");
idxCmdEnd = toparse.find("</cmd>");
if (idxCmd != string::npos && debug_pskmail)
LOG_INFO("Parsing: %s", noctrl(toparse).c_str());
while ( idxCmd != string::npos && idxCmdEnd != string::npos && idxCmdEnd > idxCmd ) {
strCmdText = toparse.substr(idxCmd + 5, idxCmdEnd - idxCmd - 5);
@ -315,11 +325,9 @@ else
#define TIMEOUT 180 // 3 minutes
bool bSend0x06 = false;
//-----------------------------------------------------------------------------
//======================================================================
// Gmfsk ARQ file i/o used only on Linux
//-----------------------------------------------------------------------------
//======================================================================
// checkTLF
// look for files named
// TLFfldigi ==> tlfio is true and
@ -348,14 +356,16 @@ static string TLFlogname;
}
}
bool TLF_arqRx()
static bool TLF_arqRx()
{
/// The mutex is automatically unlocked when returning.
guard_lock arq_rx_lock(&arq_rx_mutex);
#if defined(__WOE32__) || defined(__APPLE__)
return false;
#else
time_t start_time, prog_time;
static char mailline[1000];
static string sAutoFile;
static string sAutoFile("");
sAutoFile.assign(PskMailDir);
sAutoFile.append("gmfsk_autofile");
@ -386,8 +396,6 @@ bool TLF_arqRx()
return true;
}
pthread_mutex_lock (&arq_rx_mutex);
if (arqtext.empty() && !txstring.empty()) {
arqtext = txstring;
if (mailserver && progdefaults.PSKmailSweetSpot)
@ -398,22 +406,19 @@ bool TLF_arqRx()
start_tx();
txstring.clear();
}
pthread_mutex_unlock (&arq_rx_mutex);
}
return true;
#endif
}
//-----------------------------------------------------------------------------
//======================================================================
// Auto transmit of file contained in WRAP_auto_dir
//-----------------------------------------------------------------------------
//======================================================================
bool WRAP_auto_arqRx()
{
time_t start_time, prog_time;
static char mailline[1000];
static string sAutoFile;
static string sAutoFile("");
sAutoFile.assign(FLMSG_WRAP_auto_dir);
sAutoFile.append("wrap_auto_file");
@ -425,6 +430,8 @@ bool WRAP_auto_arqRx()
autofile.open(sAutoFile.c_str());
}
if(autofile) {
/// Mutex is unlocked when leaving the block.
guard_lock arq_rx_lock(&arq_rx_mutex);
txstring.clear();
time(&start_time);
while (!autofile.eof()) {
@ -458,9 +465,9 @@ bool WRAP_auto_arqRx()
return false;
}
//-----------------------------------------------------------------------------
//======================================================================
// Socket ARQ i/o used on all platforms
//-----------------------------------------------------------------------------
//======================================================================
#define ARQLOOP_TIMING 100 // msec
#define CLIENT_TIMEOUT 5 // timeout after N secs
@ -470,7 +477,7 @@ static string errstring;
static pthread_t* arq_socket_thread = 0;
ARQ_SOCKET_Server* ARQ_SOCKET_Server::inst = 0;
static std::vector<ARQCLIENT> arqclient;
static std::vector<ARQCLIENT> arqclient; // Protected by arq_mutex
void arq_run(Socket);
@ -563,23 +570,32 @@ void* ARQ_SOCKET_Server::thread_func(void*)
break;
}
}
if (!arqclient.empty()) {
vector<ARQCLIENT>::iterator p = arqclient.begin();
while (p != arqclient.end()) {
try {
(*p).sock.close();
arqclient.erase(p);
{
/// Mutex is unlocked when leaving the block.
guard_lock arq_lock(&arq_mutex);
if (!arqclient.empty()) {
vector<ARQCLIENT>::iterator p = arqclient.begin();
while (p != arqclient.end()) {
try {
(*p).sock.close();
arqclient.erase(p);
}
catch (...) {;}
p++;
}
catch (...) {;}
p++;
}
}
inst->server_socket->close();
return NULL;
}
void arq_reset()
{
/// Mutex is unlocked when returning from function
guard_lock arq_rx_lock(&arq_rx_mutex);
arqmode = mailserver = mailclient = false;
txstring.clear();
arqtext.clear();
@ -589,7 +605,8 @@ void arq_reset()
void arq_run(Socket s)
{
pthread_mutex_lock (&arq_mutex);
/// Mutex is unlocked when returning from function
guard_lock arq_lock(&arq_mutex);
struct timeval t = { 0, 20000 };
s.set_timeout(t);
s.set_nonblocking();
@ -606,11 +623,12 @@ void arq_run(Socket s)
p++;
}
LOG_INFO("%s", outs.str().c_str());
pthread_mutex_unlock (&arq_mutex);
}
void WriteARQsocket(unsigned char* data, size_t len)
{
/// Mutex is unlocked when returning from function
guard_lock arq_lock(&arq_mutex);
if (arqclient.empty()) return;
static string instr;
instr.clear();
@ -644,12 +662,13 @@ void WriteARQsocket(unsigned char* data, size_t len)
void test_arq_clients()
{
/// Mutex is unlocked when returning from function
guard_lock arq_lock(&arq_mutex);
if (arqclient.empty()) return;
static string instr;
instr.clear();
vector<ARQCLIENT>::iterator p;
p = arqclient.begin();
pthread_mutex_lock (&arq_mutex);
time_t now;
while (p != arqclient.end()) {
if (difftime(now = time(0), (*p).keep_alive) > CLIENT_TIMEOUT) {
@ -673,106 +692,105 @@ void test_arq_clients()
}
}
if (arqclient.empty()) arq_reset();
pthread_mutex_unlock (&arq_mutex);
}
bool Socket_arqRx()
{
if (arqclient.empty()) return false;
{
/// Mutex is unlocked when leaving block
guard_lock arq_lock(&arq_mutex);
if (arqclient.empty()) return false;
static string instr;
vector<ARQCLIENT>::iterator p = arqclient.begin();
size_t n = 0;
instr.clear();
static string instr;
vector<ARQCLIENT>::iterator p = arqclient.begin();
size_t n = 0;
instr.clear();
pthread_mutex_lock (&arq_mutex);
while (p != arqclient.end()) {
try {
(*p).sock.wait(0);
n = (*p).sock.recv(instr);
if ( n > 0) {
txstring.append(instr);
(*p).keep_alive = time(0);
}
p++;
}
catch (const SocketException& e) {
txstring.clear();
LOG_INFO("closing socket fd %d, %d: %s", (*p).sock.fd(), e.error(), e.what());
while (p != arqclient.end()) {
try {
(*p).sock.close();
} catch (const SocketException& e) {
LOG_ERROR("socket error on # %d, %d: %s", (*p).sock.fd(), e.error(), e.what());
(*p).sock.wait(0);
n = (*p).sock.recv(instr);
if ( n > 0) {
txstring.append(instr);
(*p).keep_alive = time(0);
}
p++;
}
catch (const SocketException& e) {
txstring.clear();
LOG_INFO("closing socket fd %d, %d: %s", (*p).sock.fd(), e.error(), e.what());
try {
(*p).sock.close();
} catch (const SocketException& e) {
LOG_ERROR("socket error on # %d, %d: %s", (*p).sock.fd(), e.error(), e.what());
}
arqclient.erase(p);
}
arqclient.erase(p);
}
}
if (arqclient.empty()) arq_reset();
pthread_mutex_unlock (&arq_mutex);
if (!txstring.empty()) parse_arqtext(txstring);
if (abort_flag) {
AbortARQ();
abort_flag = false;
return true;
if (arqclient.empty()) arq_reset();
}
if (bSend0x06 || txstring.empty()) return false;
{
/// Mutex is unlocked when leaving block
guard_lock arq_rx_lock(&arq_rx_mutex);
if (!txstring.empty()) parse_arqtext(txstring);
pthread_mutex_lock (&arq_rx_mutex);
if (abort_flag) {
AbortARQ();
abort_flag = false;
return true;
}
if (arqtext.empty()) {
arqtext.assign(txstring);
LOG_INFO("Assigned tx text: %s", noctrl(txstring).c_str());
pText = 0;
if (mailserver && progdefaults.PSKmailSweetSpot)
active_modem->set_freq(progdefaults.PSKsweetspot);
start_tx();
} else {
arqtext.append(txstring);
LOG_INFO("Appended tx text: %s", noctrl(txstring).c_str());
if (trx_state != STATE_TX) {
LOG_INFO("%s","Restarting TX");
{
if (bSend0x06 || txstring.empty()) return false;
if (arqtext.empty()) {
arqtext.assign(txstring);
// if (debug_pskmail)
// LOG_INFO("Assigned tx text: %s", noctrl(txstring).c_str());
pText = 0;
if (mailserver && progdefaults.PSKmailSweetSpot)
active_modem->set_freq(progdefaults.PSKsweetspot);
start_tx();
} else {
arqtext.append(txstring);
// if (debug_pskmail)
// LOG_INFO("Appended tx text: %s", noctrl(txstring).c_str());
if (trx_state != STATE_TX) {
if (debug_pskmail)
LOG_INFO("%s","Restarting TX");
start_tx();
}
}
txstring.clear();
arq_text_available = true;
active_modem->set_stopflag(false);
}
}
txstring.clear();
arq_text_available = true;
active_modem->set_stopflag(false);
pthread_mutex_unlock (&arq_rx_mutex);
return true;
}
// ============================================================================
//======================================================================
// Implementation using thread vice the fldigi timeout facility
// ============================================================================
//======================================================================
void WriteARQ(unsigned char data)
{
pthread_mutex_lock (&tosend_mutex);
guard_lock tosend_lock(&tosend_mutex);
tosend += data;
pthread_mutex_unlock (&tosend_mutex);
return;
}
void WriteARQ(const char *data)
{
pthread_mutex_lock (&tosend_mutex);
guard_lock tosend_lock(&tosend_mutex);
tosend.append(data);
pthread_mutex_unlock (&tosend_mutex);
return;
}
static void *arq_loop(void *args)
{
static unsigned char szACK = 0x06;
SET_THREAD_ID(ARQ_TID);
for (;;) {
@ -782,27 +800,23 @@ static void *arq_loop(void *args)
test_arq_clients();
enroute.clear();
pthread_mutex_lock(&tosend_mutex);
if (!tosend.empty()) {
enroute = tosend;
tosend.clear();
}
pthread_mutex_unlock(&tosend_mutex);
{
/// Mutex is unlocked when exiting block
guard_lock tosend_lock(&tosend_mutex);
enroute.clear();
if (!tosend.empty()) {
enroute = tosend;
tosend.clear();
}
if (!enroute.empty()) {
pthread_mutex_lock (&arq_mutex);
WriteARQsocket((unsigned char*)enroute.c_str(), enroute.length());
pthread_mutex_unlock (&arq_mutex);
if (!enroute.empty()) {
WriteARQsocket((unsigned char*)enroute.c_str(), enroute.length());
}
}
if (bSend0x06) {
pthread_mutex_lock (&arq_mutex);
string xmtdone;
xmtdone += 0x06;
WriteARQsocket((unsigned char*)xmtdone.c_str(), xmtdone.length());
WriteARQsocket(&szACK, 1);
bSend0x06 = false;
pthread_mutex_unlock (&arq_mutex);
}
@ -853,9 +867,10 @@ void arq_close(void)
int arq_get_char()
{
/// Mutex is unlocked when returning from function
guard_lock arq_rx_lock(&arq_rx_mutex);
int c = 0;
if (arq_text_available) {
pthread_mutex_lock (&arq_rx_mutex);
if (pText != arqtext.length()) {
c = arqtext[pText++] & 0xFF;
} else {
@ -865,30 +880,33 @@ int arq_get_char()
arq_text_available = false;
c = GET_TX_CHAR_ETX;
}
pthread_mutex_unlock (&arq_rx_mutex);
}
return c;
}
//======================================================================
// following function used if the T/R button is pressed to stop a transmission
// that is servicing the ARQ text buffer. It allows the ARQ client to reset
// itself properly
//======================================================================
void AbortARQ() {
pthread_mutex_lock (&arq_mutex);
/// Mutex is unlocked when returning from function
guard_lock arq_lock(&arq_rx_mutex);
arqtext.clear();
txstring.clear();
pText = 0;
arq_text_available = false;
bSend0x06 = true;
pthread_mutex_unlock (&arq_mutex);
}
//======================================================================
// Special notification for PSKMAIL: new mode marked only, in following
// format: "<DC2><Mode:newmode>", with <DC2> = '0x12'.
//======================================================================
void pskmail_notify_rsid(trx_mode mode)
{
char buf[64];
static char buf[64];
memset(buf, 0, sizeof(buf));
int n = snprintf(buf, sizeof(buf),
"\x12<Mode:%s>\n",
mode_info[mode].name);
@ -899,12 +917,15 @@ void pskmail_notify_rsid(trx_mode mode)
}
}
//======================================================================
// Special notification for PSKMAIL: signal to noise measured by decoder
// format "<DC2><s2n: CC, A.a, D.d>"
// where CC = count, A.a = average s/n, D.d = Std dev of s/n
//======================================================================
void pskmail_notify_s2n(double s2n_ncount, double s2n_avg, double s2n_stddev)
{
char buf[64];
static char buf[64];
memset(buf, 0, sizeof(buf));
int n = snprintf(buf, sizeof(buf),
"\x12<s2n: %1.0f, %1.1f, %1.1f>\n",
s2n_ncount, s2n_avg, s2n_stddev);

Wyświetl plik

@ -52,6 +52,14 @@
#include "icons.h"
#include "gettext.h"
#include "threads.h"
#ifndef FLARQ_VERSION
# include "fl_digi.h"
#endif
static pthread_mutex_t debug_mutex = PTHREAD_MUTEX_INITIALIZER;
extern Fl_Double_Window *fl_digi_main;
extern void update_main_title();
@ -76,6 +84,8 @@ debug* debug::inst = 0;
debug::level_e debug::level = debug::INFO_LEVEL;
uint32_t debug::mask = ~0u;
bool debug_pskmail = false;
static const char* prefix[] = {
_("Quiet"), _("Error"), _("Warning"), _("Info"), _("Verbose"), _("Debug")
};
@ -154,6 +164,7 @@ void debug::start(const char* filename)
void debug::stop(void)
{
guard_lock debug_lock(&debug_mutex);
if (window) {
window->hide();
delete window;
@ -168,10 +179,11 @@ static char fmt[1024];
void debug::log(level_e level, const char* func, const char* srcf, int line, const char* format, ...)
{
guard_lock debug_lock(&debug_mutex);
if (!inst)
return;
if (unlikely(debug::level == DEBUG_LEVEL) || DEBUG_PSKMAIL) {
if (unlikely(debug::level == DEBUG_LEVEL) || debug_pskmail) {
time_t t = time(NULL);
struct tm stm;
(void)localtime_r(&t, &stm);