* Changed to multiple client model
pull/1/head
Dave Freese (none) 2012-11-08 19:41:20 -06:00 zatwierdzone przez David Freese
rodzic 39bce3d6a3
commit c7f2918859
5 zmienionych plików z 83 dodań i 74 usunięć

Wyświetl plik

@ -6297,13 +6297,7 @@ int get_tx_char(void)
if (idling) { return GET_TX_CHAR_NODATA; } if (idling) { return GET_TX_CHAR_NODATA; }
if (arq_text_available) { if (arq_text_available) {
char character = (arq_get_char() & 0xFF); return arq_get_char();
if (character == 0x03) {
// ETX (0x03) in ARQ data means "stop transmitting" not "send ETX"
return(GET_TX_CHAR_ETX);
}
else
return(character);
} }
if (active_modem == cw_modem && progdefaults.QSKadjust) if (active_modem == cw_modem && progdefaults.QSKadjust)

Wyświetl plik

@ -1,7 +1,10 @@
#ifndef ARQIO_H #ifndef ARQIO_H
#define ARQIO_H #define ARQIO_H
class Socket; #include <vector>
#include "threads.h"
#include "socket.h"
extern void WriteARQsocket(unsigned char* data, size_t len); extern void WriteARQsocket(unsigned char* data, size_t len);
extern bool Socket_arqRx(); extern bool Socket_arqRx();

Wyświetl plik

@ -70,7 +70,7 @@ extern bool mailclient;
extern bool mailserver; extern bool mailserver;
extern bool tlfio; extern bool tlfio;
extern bool arq_text_available; extern bool arq_text_available;
extern char arq_get_char(); extern int arq_get_char();
// ARQ mail implementation // ARQ mail implementation
extern void arq_init(); extern void arq_init();

Wyświetl plik

@ -114,7 +114,6 @@ string noctrl(string src)
//====================================================================== //======================================================================
static string arqtext; static string arqtext;
//string::iterator pText;
size_t pText; size_t pText;
bool arq_text_available = false; bool arq_text_available = false;
@ -455,22 +454,15 @@ bool WRAP_auto_arqRx()
// Socket ARQ i/o used on all platforms // Socket ARQ i/o used on all platforms
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
extern void arq_run(Socket s); static string errstring;
extern void arq_stop(); static string cmdstring;
string errstring;
string cmdstring;
string response;
bool isTxChar = false;
bool isCmdChar = false;
bool isNotMULTIPSK = true;
static pthread_t* arq_socket_thread = 0; static pthread_t* arq_socket_thread = 0;
ARQ_SOCKET_Server* ARQ_SOCKET_Server::inst = 0; ARQ_SOCKET_Server* ARQ_SOCKET_Server::inst = 0;
static std::vector<Socket> arqclient;
Socket arqclient; void arq_run(Socket);
bool isSocketConnected = false; void arq_stop(Socket);
ARQ_SOCKET_Server::ARQ_SOCKET_Server() ARQ_SOCKET_Server::ARQ_SOCKET_Server()
{ {
@ -493,8 +485,7 @@ ARQ_SOCKET_Server::~ARQ_SOCKET_Server()
bool ARQ_SOCKET_Server::start(const char* node, const char* service) bool ARQ_SOCKET_Server::start(const char* node, const char* service)
{ {
if (inst) if (inst) return false;
return false;
inst = new ARQ_SOCKET_Server; inst = new ARQ_SOCKET_Server;
@ -545,9 +536,11 @@ void* ARQ_SOCKET_Server::thread_func(void*)
try { try {
#ifdef __WOE32__ #ifdef __WOE32__
if (inst->server_socket->wait(0)) if (inst->server_socket->wait(0))
#endif
arq_run(inst->server_socket->accept()); arq_run(inst->server_socket->accept());
#else
arq_run(inst->server_socket->accept());
TEST_THREAD_CANCEL(); TEST_THREAD_CANCEL();
#endif
} }
catch (const SocketException& e) { catch (const SocketException& e) {
if (e.error() != EINTR) { if (e.error() != EINTR) {
@ -560,61 +553,74 @@ void* ARQ_SOCKET_Server::thread_func(void*)
break; break;
} }
} }
arq_stop(); if (!arqclient.empty()) {
vector<Socket>::iterator p = arqclient.begin();
while (p != arqclient.end()) {
arq_stop(*p);
p++;
}
}
inst->server_socket->close(); inst->server_socket->close();
return NULL; return NULL;
} }
void arq_run(Socket s) void arq_run(Socket s)
{ {
LOG_INFO("Adding ARQ client %d", s.fd());
struct timeval t = { 0, 20000 }; struct timeval t = { 0, 20000 };
arqclient = s; s.set_timeout(t);
arqclient.set_timeout(t); s.set_nonblocking();
arqclient.set_nonblocking(); arqclient.push_back(s);
isSocketConnected = true;
arqmode = true; arqmode = true;
} }
void arq_stop() void arq_stop(Socket s)
{ {
arqclient.close(); LOG_INFO("Closing socket %d", s.fd());
isSocketConnected = false; s.close();
arqmode = false;
} }
void WriteARQsocket(unsigned char* data, size_t len) void WriteARQsocket(unsigned char* data, size_t len)
{ {
static string instr; if (arqclient.empty()) return;
static string instr;
instr.clear(); instr.clear();
vector<Socket>::iterator p;
try { try {
size_t n = arqclient.recv(instr); p = arqclient.begin();
if ( n > 0) txstring.append(instr); while (p != arqclient.end()) {
} catch (const SocketException& e) { (*p).wait(1);
arq_stop(); (*p).send(data, len);
return; LOG_INFO("Wrote to socket %d", (*p).fd());
} p++;
if (!isSocketConnected) return; }
try {
arqclient.send(data, len);
} }
catch (const SocketException& e) { catch (const SocketException& e) {
LOG_ERROR("%s", e.what()); LOG_ERROR("socket fd %d %s", (*p).fd(), e.what());
arq_stop(); arq_stop(*p);
arqclient.erase(p);
if (arqclient.empty()) arqmode = false;
} }
} }
bool Socket_arqRx() bool Socket_arqRx()
{ {
if (!isSocketConnected) return false; if (arqclient.empty()) return false;
static string instr; static string instr;
vector<Socket>::iterator p = arqclient.begin();
size_t n = 0;
instr.clear(); instr.clear();
try { try {
size_t n = arqclient.recv(instr); while (p != arqclient.end()) {
if ( n > 0) (*p).wait(0);
txstring.append(instr); n = (*p).recv(instr);
if ( n > 0) {
txstring.append(instr);
}
p++;
}
if (!bSend0x06 && arqtext.empty() && !txstring.empty()) { if (!bSend0x06 && arqtext.empty() && !txstring.empty()) {
arqtext = txstring; arqtext = txstring;
parse_arqtext(arqtext); parse_arqtext(arqtext);
@ -635,7 +641,9 @@ bool Socket_arqRx()
return false; return false;
} }
catch (const SocketException& e) { catch (const SocketException& e) {
arq_stop(); LOG_ERROR("socket fd %d %s", (*p).fd(), e.what());
arq_stop(*p);
arqclient.erase(p);
return false; return false;
} }
} }
@ -678,8 +686,8 @@ static void *arq_loop(void *args);
static bool arq_exit = false; static bool arq_exit = false;
static bool arq_enabled; static bool arq_enabled;
string tosend = ""; static string tosend = "";
string enroute = ""; static string enroute = "";
void WriteARQ(unsigned char data) void WriteARQ(unsigned char data)
{ {
@ -687,11 +695,6 @@ void WriteARQ(unsigned char data)
tosend += data; tosend += data;
pthread_mutex_unlock (&arq_mutex); pthread_mutex_unlock (&arq_mutex);
return; return;
// WriteARQsocket(&data, 1);
//#if !defined(__WOE32__) && !defined(__APPLE__)
// WriteARQSysV(data);
//#endif
} }
static void *arq_loop(void *args) static void *arq_loop(void *args)
@ -731,7 +734,6 @@ static void *arq_loop(void *args)
if (!Socket_arqRx()) if (!Socket_arqRx())
WRAP_auto_arqRx(); WRAP_auto_arqRx();
#endif #endif
// pthread_mutex_unlock (&arq_mutex);
MilliSleep(100); MilliSleep(100);
} }
@ -773,19 +775,19 @@ void arq_close(void)
arq_exit = false; arq_exit = false;
} }
char arq_get_char() int arq_get_char()
{ {
char c = 0; int c = 0;
pthread_mutex_lock (&arq_mutex); pthread_mutex_lock (&arq_mutex);
if (arq_text_available) { if (arq_text_available) {
if (pText != arqtext.length()) if (pText != arqtext.length()) {
c = arqtext[pText++]; c = arqtext[pText++] & 0xFF;
else { } else {
arqtext.clear(); arqtext.clear();
pText = 0; pText = 0;
bSend0x06 = true; bSend0x06 = true;
arq_text_available = false; arq_text_available = false;
c = 0x03; c = GET_TX_CHAR_ETX;
} }
} }
pthread_mutex_unlock (&arq_mutex); pthread_mutex_unlock (&arq_mutex);

Wyświetl plik

@ -694,12 +694,16 @@ size_t Socket::send(const void* buf, size_t len)
const char *sp = (const char *)buf; const char *sp = (const char *)buf;
while ( nToWrite > 0) { while ( nToWrite > 0) {
try {
#if defined(__WIN32__) #if defined(__WIN32__)
r = ::send(sockfd, sp, nToWrite, 0); r = ::send(sockfd, sp, nToWrite, 0);
#else #else
r = ::write(sockfd, sp, nToWrite); r = ::write(sockfd, sp, nToWrite);
#endif #endif
}
catch (...) {
throw;
}
if (r > 0) { if (r > 0) {
sp += r; sp += r;
nToWrite -= r; nToWrite -= r;
@ -747,13 +751,19 @@ size_t Socket::recv(void* buf, size_t len)
if (!wait(0)) if (!wait(0))
return 0; return 0;
ssize_t r = ::recv(sockfd, (char*)buf, len, 0); int r = 0;
if (r == 0) try {
shutdown(sockfd, SHUT_RD); r = ::recv(sockfd, (char*)buf, len, 0);
else if (r == -1) { if (r == 0)
if (errno != EAGAIN) shutdown(sockfd, SHUT_RD);
throw SocketException(errno, "recv"); else if (r == -1) {
r = 0; if (errno != EAGAIN)
throw SocketException(errno, "recv");
r = 0;
}
}
catch (...) {
throw;
} }
return r; return r;