diff --git a/include/hamlib/rig.h b/include/hamlib/rig.h index 88635aa95..ce9dfe38d 100644 --- a/include/hamlib/rig.h +++ b/include/hamlib/rig.h @@ -2148,6 +2148,10 @@ extern HAMLIB_EXPORT (long long) rig_get_caps_int(rig_model_t rig_model, enum ri //! @cond Doxygen_Suppress extern HAMLIB_EXPORT (const char *) rig_get_caps_cptr(rig_model_t rig_model, enum rig_caps_cptr_e rig_caps); +struct hamlib_async_pipe; + +typedef struct hamlib_async_pipe hamlib_async_pipe_t; + /** * \brief Port definition * @@ -2214,16 +2218,20 @@ typedef struct hamlib_port { int client_port; /*!< client socket port for tcp connection */ RIG *rig; /*!< our parent RIG device */ -} hamlib_port_t; -//! @endcond - -typedef struct hamlib_async { +#ifdef ASYNC_BUG int async; /*!< enable asynchronous data handling if true */ +#if defined(_WIN32) + hamlib_async_pipe_t *sync_data_pipe; /*!< pipe data structure for synchronous data */ + hamlib_async_pipe_t *sync_data_error_pipe; /*!< pipe data structure for synchronous data error codes */ +#else int fd_sync_write; /*!< file descriptor for writing synchronous data */ int fd_sync_read; /*!< file descriptor for reading synchronous data */ int fd_sync_error_write; /*!< file descriptor for writing synchronous data error codes */ int fd_sync_error_read; /*!< file descriptor for reading synchronous data error codes */ -} hamlib_async_t; +#endif +#endif +} hamlib_port_t; +//! @endcond #if !defined(__APPLE__) || !defined(__cplusplus) typedef hamlib_port_t port_t; @@ -2233,7 +2241,7 @@ typedef hamlib_port_t port_t; #define HAMLIB_ELAPSED_SET 1 #define HAMLIB_ELAPSED_INVALIDATE 2 -#define HAMLIB_CACHE_ALWAYS -1 /*< value to set cache timeout to always use cache */ +#define HAMLIB_CACHE_ALWAYS (-1) /*< value to set cache timeout to always use cache */ typedef enum { HAMLIB_CACHE_ALL, // to set all cache timeouts at once @@ -2393,7 +2401,7 @@ struct rig_state { rig_ptr_t priv; /*!< Pointer to private rig state data. */ rig_ptr_t obj; /*!< Internal use by hamlib++ for event handling. */ - int async_data; /*!< Whether async data mode is on */ + int async_data_enabled; /*!< Whether async data mode is enabled */ int poll_interval; /*!< Rig state polling period in milliseconds */ freq_t current_freq; /*!< Frequency currently set */ rmode_t current_mode; /*!< Mode currently set */ @@ -2437,7 +2445,6 @@ struct rig_state { void *async_data_handler_priv_data; volatile int poll_routine_thread_run; void *poll_routine_priv_data; - hamlib_async_t asyncport; }; //! @cond Doxygen_Suppress diff --git a/lib/Makefile.am b/lib/Makefile.am index 6c45280bc..391cb2279 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -3,5 +3,5 @@ EXTRA_DIST = getopt.c getopt.h getopt_long.c usleep.c \ noinst_LTLIBRARIES = libmisc.la -libmisc_la_SOURCES = cJSON.c cJSON.h +libmisc_la_SOURCES = cJSON.c cJSON.h asyncpipe.c asyncpipe.h libmisc_la_LIBADD = $(LTLIBOBJS) $(NET_LIBS) diff --git a/lib/asyncpipe.c b/lib/asyncpipe.c new file mode 100644 index 000000000..7def21010 --- /dev/null +++ b/lib/asyncpipe.c @@ -0,0 +1,289 @@ +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "asyncpipe.h" + +#if defined(WIN32) && defined(HAVE_WINDOWS_H) + +static volatile long pipe_serial_nunber; + +int async_pipe_create(hamlib_async_pipe_t **pipe_out, unsigned long pipe_buffer_size, unsigned long pipe_connect_timeout_millis) +{ + DWORD error_code; + CHAR pipe_name[MAX_PATH]; + hamlib_async_pipe_t *pipe; + + pipe = calloc(1, sizeof(hamlib_async_pipe_t)); + if (pipe == NULL) + { + return -RIG_ENOMEM; + } + + if (pipe_buffer_size == 0) + { + pipe_buffer_size = PIPE_BUFFER_SIZE_DEFAULT; + } + + sprintf(pipe_name, + "\\\\.\\Pipe\\Hamlib.%08lx.%08lx", + GetCurrentProcessId(), + InterlockedIncrement(&pipe_serial_nunber) + ); + + pipe->read = CreateNamedPipe( + pipe_name, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, // Number of pipes + pipe_buffer_size, // Out buffer size + pipe_buffer_size, // In buffer size + pipe_connect_timeout_millis, // Timeout in ms + NULL); + + if (!pipe->read) + { + return -RIG_EINTERNAL; + } + + pipe->write = CreateFile( + pipe_name, + GENERIC_WRITE, + 0, // No sharing + NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, + NULL // Template file + ); + + if (pipe->write == INVALID_HANDLE_VALUE) + { + error_code = GetLastError(); + CloseHandle(pipe->read); + free(pipe); + SetLastError(error_code); + return -RIG_EINTERNAL; + } + + pipe->read_overlapped.hEvent = CreateEvent( + NULL, // default security attribute + TRUE, // manual-reset event + FALSE, // initial state = not signaled + NULL); // unnamed event object + + if (pipe->read_overlapped.hEvent == NULL) + { + error_code = GetLastError(); + CloseHandle(pipe->read); + CloseHandle(pipe->write); + free(pipe); + SetLastError(error_code); + return -RIG_EINTERNAL; + } + + pipe->write_overlapped.hEvent = CreateEvent( + NULL, // default security attribute + TRUE, // manual-reset event + FALSE, // initial state = not signaled + NULL); // unnamed event object + + if (pipe->write_overlapped.hEvent == NULL) + { + error_code = GetLastError(); + CloseHandle(pipe->read_overlapped.hEvent); + CloseHandle(pipe->read); + CloseHandle(pipe->write); + free(pipe); + SetLastError(error_code); + return -RIG_EINTERNAL; + } + + *pipe_out = pipe; + + return RIG_OK; +} + +void async_pipe_close(hamlib_async_pipe_t *pipe) +{ + if (pipe->read != NULL) + { + CloseHandle(pipe->read); + pipe->read = NULL; + } + if (pipe->write != NULL) + { + CloseHandle(pipe->write); + pipe->write = NULL; + } + + if (pipe->read_overlapped.hEvent != NULL) + { + CloseHandle(pipe->read_overlapped.hEvent); + pipe->read_overlapped.hEvent = NULL; + } + if (pipe->write_overlapped.hEvent != NULL) + { + CloseHandle(pipe->write_overlapped.hEvent); + pipe->write_overlapped.hEvent = NULL; + } + + free(pipe); +} + +ssize_t async_pipe_read(hamlib_async_pipe_t *pipe, void *buf, size_t count, int timeout) +{ + HANDLE event_handles[1] = { + pipe->read_overlapped.hEvent, + }; + HANDLE read_handle = pipe->read; + LPOVERLAPPED overlapped = &pipe->read_overlapped; + DWORD wait_result; + int result; + ssize_t bytes_read; + + result = ReadFile(read_handle, buf, count, NULL, overlapped); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + wait_result = WaitForMultipleObjects(1, event_handles, FALSE, timeout); + + switch (wait_result) + { + case WAIT_OBJECT_0 + 0: + break; + + case WAIT_TIMEOUT: + if (count == 0) + { + // Zero-length reads are used to wait for incoming data, + // so the I/O operation needs to be cancelled in case of a timeout + CancelIo(read_handle); + return -RIG_ETIMEOUT; + } + else + { + // Should not happen, as reads with count > 0 are used only when there is data available in the pipe + return -RIG_EINTERNAL; + } + + default: + result = GetLastError(); + rig_debug(RIG_DEBUG_ERR, "%s: WaitForMultipleObjects() error: %d\n", __func__, result); + return -RIG_EINTERNAL; + } + break; + default: + rig_debug(RIG_DEBUG_ERR, "%s: ReadFile() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + result = GetOverlappedResult(read_handle, overlapped, (LPDWORD) &bytes_read, FALSE); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + // Shouldn't happen? + return -RIG_ETIMEOUT; + default: + rig_debug(RIG_DEBUG_ERR, "%s: GetOverlappedResult() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + return bytes_read; +} + +int async_pipe_wait_for_data(hamlib_async_pipe_t *pipe, int timeout) +{ + unsigned char data; + int result; + + // Use a zero-length read to wait for data in pipe + result = async_pipe_read(pipe, &data, 0, timeout); + + if (result > 0) + { + return RIG_OK; + } + + return result; +} + +ssize_t async_pipe_write(hamlib_async_pipe_t *pipe, const unsigned char *buf, size_t count, int timeout) +{ + HANDLE event_handles[1] = { + pipe->write_overlapped.hEvent, + }; + HANDLE write_handle = pipe->write; + LPOVERLAPPED overlapped = &pipe->write_overlapped; + DWORD wait_result; + int result; + ssize_t bytes_written; + + result = WriteFile(write_handle, buf, count, NULL, overlapped); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + wait_result = WaitForMultipleObjects(1, event_handles, FALSE, timeout); + + switch (wait_result) + { + case WAIT_OBJECT_0 + 0: + break; + + case WAIT_TIMEOUT: + CancelIo(write_handle); + return -RIG_ETIMEOUT; + + default: + result = GetLastError(); + rig_debug(RIG_DEBUG_ERR, "%s: WaitForMultipleObjects() error: %d\n", __func__, result); + return -RIG_EINTERNAL; + } + break; + default: + rig_debug(RIG_DEBUG_ERR, "%s: WriteFile() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + result = GetOverlappedResult(write_handle, overlapped, (LPDWORD) &bytes_written, FALSE); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + // Shouldn't happen? + return -RIG_ETIMEOUT; + default: + rig_debug(RIG_DEBUG_ERR, "%s: GetOverlappedResult() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + return bytes_written; +} + +#endif diff --git a/lib/asyncpipe.h b/lib/asyncpipe.h new file mode 100644 index 000000000..f9af2af0c --- /dev/null +++ b/lib/asyncpipe.h @@ -0,0 +1,30 @@ +#ifndef _ASYNC_PIPE_H +#define _ASYNC_PIPE_H 1 + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(WIN32) && defined(HAVE_WINDOWS_H) + +#include +#include + +#define PIPE_BUFFER_SIZE_DEFAULT 65536 + +struct hamlib_async_pipe { + HANDLE write; + HANDLE read; + OVERLAPPED write_overlapped; + OVERLAPPED read_overlapped; +}; + +int async_pipe_create(hamlib_async_pipe_t **pipe_out, unsigned long pipe_buffer_size, unsigned long pipe_connect_timeout_millis); +void async_pipe_close(hamlib_async_pipe_t *pipe); +ssize_t async_pipe_read(hamlib_async_pipe_t *pipe, void *buf, size_t count, int timeout); +int async_pipe_wait_for_data(hamlib_async_pipe_t *pipe, int timeout); +ssize_t async_pipe_write(hamlib_async_pipe_t *pipe, const unsigned char *buf, size_t count, int timeout); + +#endif + +#endif diff --git a/rigs/icom/icom.c b/rigs/icom/icom.c index d117e57d4..f0758922c 100644 --- a/rigs/icom/icom.c +++ b/rigs/icom/icom.c @@ -2510,6 +2510,8 @@ int icom_get_mode(RIG *rig, vfo_t vfo, rmode_t *mode, pbwidth_t *width) } else { + priv_data->filter = 0; + if (mode_len == 2) priv_data->filter = modebuf[2]; rig_debug(RIG_DEBUG_TRACE, "%s: modebuf[0]=0x%02x, modebuf[1]=0x%02x, mode_len=%d\n", __func__, modebuf[0], modebuf[1], mode_len); diff --git a/rigs/icom/icom.h b/rigs/icom/icom.h index 412144b63..f98ad2743 100644 --- a/rigs/icom/icom.h +++ b/rigs/icom/icom.h @@ -30,7 +30,7 @@ #include #endif -#define BACKEND_VER "20211222" +#define BACKEND_VER "20220105" #define ICOM_IS_SECONDARY_VFO(vfo) ((vfo) & (RIG_VFO_B | RIG_VFO_SUB | RIG_VFO_SUB_B | RIG_VFO_MAIN_B)) #define ICOM_GET_VFO_NUMBER(vfo) (ICOM_IS_SECONDARY_VFO(vfo) ? 0x01 : 0x00) diff --git a/src/conf.c b/src/conf.c index 9347f7376..b6225476a 100644 --- a/src/conf.c +++ b/src/conf.c @@ -168,6 +168,11 @@ static const struct confparams frontend_cfg_params[] = "Suppress get_freq on VFOB for RIT tuning satellites", "Unset", RIG_CONF_COMBO, { .c = {{ "Unset", "ON", "OFF", NULL }} } }, + { + TOK_ASYNC, "async", "Asynchronous data transfer support", + "True enables asynchronous data transfer for backends that support it. This allows use of transceive and spectrum data.", + "0", RIG_CONF_CHECKBUTTON, { } + }, { RIG_CONF_END, NULL, } }; @@ -666,6 +671,15 @@ static int frontend_set_conf(RIG *rig, token_t token, const char *val) rs->twiddle_rit = val_i ? 1 : 0; break; + case TOK_ASYNC: + if (1 != sscanf(val, "%d", &val_i)) + { + return -RIG_EINVAL; //value format error + } + + rs->async_data_enabled = val_i ? 1 : 0; + break; + default: return -RIG_EINVAL; } @@ -1010,6 +1024,9 @@ static int frontend_get_conf(RIG *rig, token_t token, char *val) sprintf(val, "%d", rs->twiddle_rit); break; + case TOK_ASYNC: + sprintf(val, "%d", rs->async_data_enabled); + break; default: return -RIG_EINVAL; diff --git a/src/iofunc.c b/src/iofunc.c index fb2c4c91c..76171213c 100644 --- a/src/iofunc.c +++ b/src/iofunc.c @@ -1,6 +1,6 @@ /* * Hamlib Interface - generic file based io functions - * Copyright (c) 2021 by Mikael Nousiainen + * Copyright (c) 2021-2022 by Mikael Nousiainen * Copyright (c) 2000-2012 by Stephane Fillod * Copyright (c) 2000-2003 by Frank Singleton * @@ -55,9 +55,72 @@ #include "network.h" #include "cm108.h" #include "gpio.h" +#include "asyncpipe.h" #ifdef ASYNC_BUG -static void close_sync_data_pipe(hamlib_async_t *p) + +#if defined(WIN32) && defined(HAVE_WINDOWS_H) +#include + +static void init_sync_data_pipe(hamlib_port_t *p) +{ + p->sync_data_pipe = NULL; + p->sync_data_error_pipe = NULL; +} + +static void close_sync_data_pipe(hamlib_port_t *p) +{ + ENTERFUNC; + + if (p->sync_data_pipe != NULL) + { + async_pipe_close(p->sync_data_pipe); + p->sync_data_pipe = NULL; + } + + if (p->sync_data_error_pipe != NULL) + { + async_pipe_close(p->sync_data_error_pipe); + p->sync_data_error_pipe = NULL; + } +} + +static int create_sync_data_pipe(hamlib_port_t *p) +{ + int status; + + ENTERFUNC; + + status = async_pipe_create(&p->sync_data_pipe, PIPE_BUFFER_SIZE_DEFAULT, p->timeout); + if (status < 0) + { + close_sync_data_pipe(p); + RETURNFUNC(-RIG_EINTERNAL); + } + + status = async_pipe_create(&p->sync_data_error_pipe, PIPE_BUFFER_SIZE_DEFAULT, p->timeout); + if (status < 0) + { + close_sync_data_pipe(p); + RETURNFUNC(-RIG_EINTERNAL); + } + + rig_debug(RIG_DEBUG_VERBOSE, "%s: created data pipe for synchronous transactions\n", __func__); + + RETURNFUNC(RIG_OK); +} + +#else + +static void init_sync_data_pipe(hamlib_port_t *p) +{ + p->fd_sync_write = -1; + p->fd_sync_read = -1; + p->fd_sync_error_write = -1; + p->fd_sync_error_read = -1; +} + +static void close_sync_data_pipe(hamlib_port_t *p) { if (p->fd_sync_read != -1) { close(p->fd_sync_read); @@ -78,6 +141,67 @@ static void close_sync_data_pipe(hamlib_async_t *p) p->fd_sync_error_write = -1; } } + +static int create_sync_data_pipe(hamlib_port_t *p) +{ + int status; + int sync_pipe_fds[2]; + int flags; + + status = pipe(sync_pipe_fds); + flags = fcntl(sync_pipe_fds[0], F_GETFL); + flags |= O_NONBLOCK; + if (fcntl(sync_pipe_fds[0], F_SETFL, flags)) + { + rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on sync_read=%s\n", __func__, strerror(errno)); + } + flags = fcntl(sync_pipe_fds[1], F_GETFL); + flags |= O_NONBLOCK; + if (fcntl(sync_pipe_fds[1], F_SETFL, flags)) + { + rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on sync_write=%s\n", __func__, strerror(errno)); + } + if (status != 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: synchronous data pipe open status=%d, err=%s\n", __func__, + status, strerror(errno)); + close_sync_data_pipe(p); + RETURNFUNC(-RIG_EINTERNAL); + } + + p->fd_sync_read = sync_pipe_fds[0]; + p->fd_sync_write = sync_pipe_fds[1]; + + status = pipe(sync_pipe_fds); + flags = fcntl(sync_pipe_fds[0], F_GETFL); + flags |= O_NONBLOCK; + if (fcntl(sync_pipe_fds[0], F_SETFL, flags)) + { + rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on error_read=%s\n", __func__, strerror(errno)); + } + flags = fcntl(sync_pipe_fds[1], F_GETFL); + flags |= O_NONBLOCK; + if (fcntl(sync_pipe_fds[1], F_SETFL, flags)) + { + rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on error_write=%s\n", __func__, strerror(errno)); + } + if (status != 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: synchronous data error code pipe open status=%d, err=%s\n", __func__, + status, strerror(errno)); + close_sync_data_pipe(p); + RETURNFUNC(-RIG_EINTERNAL); + } + + p->fd_sync_error_read = sync_pipe_fds[0]; + p->fd_sync_error_write = sync_pipe_fds[1]; + + rig_debug(RIG_DEBUG_VERBOSE, "%s: created data pipe for synchronous transactions\n", __func__); + + RETURNFUNC(RIG_OK); +} + +#endif #endif /** @@ -89,80 +213,22 @@ int HAMLIB_API port_open(hamlib_port_t *p) { int status; int want_state_delay = 0; -#ifdef ASYNC_BUG - int sync_pipe_fds[2]; -#endif ENTERFUNC; p->fd = -1; - //p->fd_sync_write = -1; - //p->fd_sync_read = -1; - //p->fd_sync_error_write = -1; - //p->fd_sync_error_read = -1; +#ifdef ASYNC_BUG + init_sync_data_pipe(p); +#endif -#if 0 +#ifdef ASYNC_BUG if (p->async) { -#ifdef HAVE_WINDOWS_H - // this needs to be done with overlapping I/O to achieve non-blocking - status = _pipe(sync_pipe_fds, 256, O_BINARY); -#else - status = pipe(sync_pipe_fds); - int flags = fcntl(sync_pipe_fds[0], F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(sync_pipe_fds[0], F_SETFL, flags)) + status = create_sync_data_pipe(p); + if (status < 0) { - rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on sync_read=%s\n", __func__, strerror(errno)); + RETURNFUNC(status); } - flags = fcntl(sync_pipe_fds[1], F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(sync_pipe_fds[1], F_SETFL, flags)) - { - rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on sync_write=%s\n", __func__, strerror(errno)); - } -#endif - if (status != 0) - { - rig_debug(RIG_DEBUG_ERR, "%s: synchronous data pipe open status=%d, err=%s\n", __func__, - status, strerror(errno)); - close_sync_data_pipe(p); - RETURNFUNC(-RIG_EINTERNAL); - } - - p->fd_sync_read = sync_pipe_fds[0]; - p->fd_sync_write = sync_pipe_fds[1]; - -#ifdef HAVE_WINDOWS_H - // this needs to be done with overlapping I/O to achieve non-blocking - status = _pipe(sync_pipe_fds, 256, O_BINARY); -#else - status = pipe(sync_pipe_fds); - flags = fcntl(sync_pipe_fds[0], F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(sync_pipe_fds[0], F_SETFL, flags)) - { - rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on error_read=%s\n", __func__, strerror(errno)); - } - flags = fcntl(sync_pipe_fds[1], F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(sync_pipe_fds[1], F_SETFL, flags)) - { - rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on error_write=%s\n", __func__, strerror(errno)); - } -#endif - if (status != 0) - { - rig_debug(RIG_DEBUG_ERR, "%s: synchronous data error code pipe open status=%d, err=%s\n", __func__, - status, strerror(errno)); - close_sync_data_pipe(p); - RETURNFUNC(-RIG_EINTERNAL); - } - - p->fd_sync_error_read = sync_pipe_fds[0]; - p->fd_sync_error_write = sync_pipe_fds[1]; - - rig_debug(RIG_DEBUG_VERBOSE, "%s: created synchronous data pipe\n", __func__); } #endif @@ -367,18 +433,164 @@ int HAMLIB_API port_close(hamlib_port_t *p, rig_port_t port_type) extern int is_uh_radio_fd(int fd); +#ifdef ASYNC_BUG +static int port_read_sync_data_error_code(hamlib_port_t *p) +{ + ssize_t total_bytes_read = 0; + unsigned char data; + int result; + + do { + // Wait for data using a zero-length read + result = async_pipe_read(p->sync_data_error_pipe, &data, 0, p->timeout); + if (result < 0) + { + if (result == -RIG_ETIMEOUT) + { + if (total_bytes_read > 0) + { + return data; + } + } + + return result; + } + + result = async_pipe_read(p->sync_data_error_pipe, &data, 1, p->timeout); + if (result < 0) + { + if (result == -RIG_ETIMEOUT) + { + if (total_bytes_read > 0) + { + return data; + } + } + + return result; + } + + total_bytes_read += result; + } while (result > 0); + + return data; +} + +static int port_read_sync_data(hamlib_port_t *p, void *buf, size_t count) +{ + // Wait for data in both the response data pipe and the error code pipe to detect errors occurred during read + HANDLE event_handles[2] = { + p->sync_data_pipe->read_overlapped.hEvent, + p->sync_data_error_pipe->read_overlapped.hEvent, + }; + HANDLE read_handle = p->sync_data_pipe->read; + LPOVERLAPPED overlapped = &p->sync_data_pipe->read_overlapped; + DWORD wait_result; + int result; + ssize_t bytes_read; + + result = ReadFile(p->sync_data_pipe->read, buf, count, NULL, overlapped); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + wait_result = WaitForMultipleObjects(2, event_handles, FALSE, p->timeout); + + switch (wait_result) + { + case WAIT_OBJECT_0 + 0: + break; + + case WAIT_OBJECT_0 + 1: + return port_read_sync_data_error_code(p); + + case WAIT_TIMEOUT: + if (count == 0) + { + CancelIo(read_handle); + return -RIG_ETIMEOUT; + } + else + { + // Should not happen + return -RIG_EINTERNAL; + } + + default: + result = GetLastError(); + rig_debug(RIG_DEBUG_ERR, "%s(): WaitForMultipleObjects() error: %d\n", __func__, result); + return -RIG_EINTERNAL; + } + break; + default: + rig_debug(RIG_DEBUG_ERR, "%s(): ReadFile() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + result = GetOverlappedResult(read_handle, overlapped, (LPDWORD) &bytes_read, FALSE); + if (!result) + { + result = GetLastError(); + switch (result) + { + case ERROR_SUCCESS: + // No error? + break; + case ERROR_IO_PENDING: + // Shouldn't happen? + return -RIG_ETIMEOUT; + default: + rig_debug(RIG_DEBUG_ERR, "%s(): GetOverlappedResult() error: %d\n", __func__, result); + return -RIG_EIO; + } + } + + return bytes_read; +} + +static int port_wait_for_data_sync_pipe(hamlib_port_t *p) +{ + unsigned char data; + int result; + + // Use a zero-length read to wait for data in pipe + result = port_read_sync_data(p, &data, 0); + + if (result > 0) + { + return RIG_OK; + } + + return result; +} + +static ssize_t port_read_sync_data_pipe(hamlib_port_t *p, void *buf, size_t count) +{ + return port_read_sync_data(p, buf, count); +} +#endif + /* On MinGW32/MSVC/.. the appropriate accessor must be used * depending on the port type, sigh. */ static ssize_t port_read_generic(hamlib_port_t *p, void *buf, size_t count, int direct) { -#if ASYNC_BUG - int fd = direct ? p->fd : p->fd_sync_read; -#else int fd = p->fd; -#endif int i; - ssize_t ret; + ssize_t bytes_read; + +#if ASYNC_BUG + if (!direct) + { + return port_read_sync_data_pipe(p, buf, count); + } +#endif /* * Since WIN32 does its special serial read, we have @@ -386,29 +598,29 @@ static ssize_t port_read_generic(hamlib_port_t *p, void *buf, size_t count, int * Note that we always have RIG_PORT_SERIAL in the * microHam case. */ - if (direct && is_uh_radio_fd(fd)) + if (is_uh_radio_fd(fd)) { return read(fd, buf, count); } - if (direct && p->type.rig == RIG_PORT_SERIAL) + if (p->type.rig == RIG_PORT_SERIAL) { - ret = win32_serial_read(fd, buf, (int) count); + bytes_read = win32_serial_read(fd, buf, (int) count); if (p->parm.serial.data_bits == 7) { unsigned char *pbuf = buf; /* clear MSB */ - for (i = 0; i < ret; i++) + for (i = 0; i < bytes_read; i++) { pbuf[i] &= ~0x80; } } - return ret; + return bytes_read; } - else if (direct && (p->type.rig == RIG_PORT_NETWORK || p->type.rig == RIG_PORT_UDP_NETWORK)) + else if (p->type.rig == RIG_PORT_NETWORK || p->type.rig == RIG_PORT_UDP_NETWORK) { return recv(fd, buf, count, 0); } @@ -446,7 +658,6 @@ static ssize_t port_write(hamlib_port_t *p, const void *buf, size_t count) } } - static int port_select(hamlib_port_t *p, int n, fd_set *readfds, @@ -495,6 +706,67 @@ static int port_select(hamlib_port_t *p, } } +static int port_wait_for_data_direct(hamlib_port_t *p) +{ + fd_set rfds, efds; + int fd = p->fd; + struct timeval tv, tv_timeout; + int result; + + tv_timeout.tv_sec = p->timeout / 1000; + tv_timeout.tv_usec = (p->timeout % 1000) * 1000; + + tv = tv_timeout; /* select may have updated it */ + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + efds = rfds; + + result = port_select(p, fd + 1, &rfds, NULL, &efds, &tv, 1); + + if (result == 0) + { + return -RIG_ETIMEOUT; + } + else if (result < 0) + { + rig_debug(RIG_DEBUG_ERR, + "%s(): select() error: %s\n", + __func__, + strerror(errno)); + return -RIG_EIO; + } + + if (FD_ISSET(fd, &efds)) + { + rig_debug(RIG_DEBUG_ERR, "%s(): fd error\n", __func__); + return -RIG_EIO; + } + + return RIG_OK; +} + +static int port_wait_for_data(hamlib_port_t *p, int direct) +{ + if (direct) + { + return port_wait_for_data_direct(p); + } + + return port_wait_for_data_sync_pipe(p); +} + +#ifdef ASYNC_BUG +int HAMLIB_API write_block_sync(hamlib_port_t *p, const unsigned char *txbuffer, size_t count) +{ + return async_pipe_write(p->sync_data_pipe, txbuffer, count, p->timeout); +} + +int HAMLIB_API write_block_sync_error(hamlib_port_t *p, const unsigned char *txbuffer, size_t count) +{ + return async_pipe_write(p->sync_data_error_pipe, txbuffer, count, p->timeout); +} +#endif #else @@ -535,6 +807,133 @@ static ssize_t port_read_generic(hamlib_port_t *p, void *buf, size_t count, int #define port_select(p,n,r,w,e,t,d) select((n),(r),(w),(e),(t)) //! @endcond +static int flush_and_read_last_byte(hamlib_port_t *p, int fd, int direct) +{ + fd_set rfds, efds; + ssize_t bytes_read; + struct timeval tv_timeout; + int result; + char data; + + do { + tv_timeout.tv_sec = 0; + tv_timeout.tv_usec = 0; + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + efds = rfds; + + result = port_select(p, fd + 1, &rfds, NULL, &efds, &tv_timeout, direct); + if (result < 0) + { + return -RIG_ETIMEOUT; + } + if (result == 0) + { + return -RIG_EIO; + } + + if (FD_ISSET(fd, &efds)) + { + return -RIG_EIO; + } + + bytes_read = read(fd, &data, 1); + } while (bytes_read > 0); + + return data; +} + +static int port_wait_for_data(hamlib_port_t *p, int direct) +{ + fd_set rfds, efds; + int fd, errorfd, maxfd; + struct timeval tv, tv_timeout; + int result; + +#if ASYNC_BUG + fd = direct ? p->fd : p->fd_sync_read; + errorfd = direct ? -1 : p->fd_sync_error_read; + maxfd = (fd > errorfd) ? fd : errorfd; +#else + fd = p->fd; + errorfd = -1; + maxfd = (fd > errorfd) ? fd : errorfd; +#endif + + tv_timeout.tv_sec = p->timeout / 1000; + tv_timeout.tv_usec = (p->timeout % 1000) * 1000; + + tv = tv_timeout; /* select may have updated it */ + + FD_ZERO(&rfds); + FD_SET(fd, &rfds); + if (!direct) + { + FD_SET(errorfd, &rfds); + } + efds = rfds; + + result = port_select(p, maxfd + 1, &rfds, NULL, &efds, &tv, direct); + + if (result == 0) + { + return -RIG_ETIMEOUT; + } + else if (result < 0) + { + rig_debug(RIG_DEBUG_ERR, + "%s(): select() error: %s\n", + __func__, + strerror(errno)); + return -RIG_EIO; + } + + if (FD_ISSET(fd, &efds)) + { + rig_debug(RIG_DEBUG_ERR, "%s(): fd error\n", __func__); + return -RIG_EIO; + } + if (!direct) + { + if (FD_ISSET(errorfd, &efds)) + { + rig_debug(RIG_DEBUG_ERR, "%s(): fd error from sync error pipe\n", __func__); + return -RIG_EIO; + } + + if (FD_ISSET(errorfd, &rfds)) + { + return flush_and_read_last_byte(p, errorfd, 0); + } + } + + return RIG_OK; +} + +#ifdef ASYNC_BUG +int HAMLIB_API write_block_sync(hamlib_port_t *p, const unsigned char *txbuffer, size_t count) +{ + + if (!p->async) + { + return -RIG_EINTERNAL; + } + + return (int) write(p->fd_sync_write, txbuffer, count); +} + +int HAMLIB_API write_block_sync_error(hamlib_port_t *p, const unsigned char *txbuffer, size_t count) +{ + if (!p->async) + { + return -RIG_EINTERNAL; + } + + return (int) write(p->fd_sync_error_write, txbuffer, count); +} +#endif + #endif /** @@ -667,33 +1066,9 @@ int HAMLIB_API write_block(hamlib_port_t *p, const unsigned char *txbuffer, size return RIG_OK; } -#ifdef ASYNC_BUG -int HAMLIB_API write_block_sync(async_port_t *p, const unsigned char *txbuffer, size_t count) -{ - if (!p->async) - { - return -RIG_EINTERNAL; - } - - return (int) write(p->fd_sync_write, txbuffer, count); -} - -int HAMLIB_API write_block_sync_error(async_port_t *p, const unsigned char *txbuffer, size_t count) -{ - if (!p->async) - { - return -RIG_EINTERNAL; - } - - return (int) write(p->fd_sync_error_write, txbuffer, count); -} -#endif - static int read_block_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t count, int direct) { - fd_set rfds, efds; - int fd; - struct timeval tv, tv_timeout, start_time, end_time, elapsed_time; + struct timeval start_time, end_time, elapsed_time; int total_count = 0; rig_debug(RIG_DEBUG_VERBOSE, "%s called\n", __func__); @@ -707,40 +1082,27 @@ static int read_block_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t return -RIG_EINTERNAL; } -#ifdef ASYNC_GBUG - fd = direct ? p->fd : p->fd_sync_read; -#else - fd = p->fd; -#endif - - /* - * Wait up to timeout ms. - */ - tv_timeout.tv_sec = p->timeout / 1000; - tv_timeout.tv_usec = (p->timeout % 1000) * 1000; - /* Store the time of the read loop start */ gettimeofday(&start_time, NULL); while (count > 0) { - int retval; + int result; int rd_count; - tv = tv_timeout; /* select may have updated it */ - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - efds = rfds; + result = port_wait_for_data(p, direct); - retval = port_select(p, fd + 1, &rfds, NULL, &efds, &tv, direct); - - if (retval == 0) + if (result == -RIG_ETIMEOUT) { /* Record timeout time and calculate elapsed time */ gettimeofday(&end_time, NULL); timersub(&end_time, &start_time, &elapsed_time); - dump_hex((unsigned char *) rxbuffer, total_count); + if (direct) + { + dump_hex((unsigned char *) rxbuffer, total_count); + } + rig_debug(RIG_DEBUG_WARN, "%s(): Timed out %d.%d seconds after %d chars\n", __func__, @@ -751,26 +1113,14 @@ static int read_block_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t return -RIG_ETIMEOUT; } - if (retval < 0) + if (result < 0) { - dump_hex((unsigned char *) rxbuffer, total_count); - rig_debug(RIG_DEBUG_ERR, - "%s(): select() error after %d chars: %s\n", - __func__, - total_count, - strerror(errno)); - - return -RIG_EIO; - } - - if (FD_ISSET(fd, &efds)) - { - rig_debug(RIG_DEBUG_ERR, - "%s(): fd error after %d chars\n", - __func__, - total_count); - - return -RIG_EIO; + if (direct) + { + dump_hex((unsigned char *) rxbuffer, total_count); + } + rig_debug(RIG_DEBUG_ERR, "%s(): I/O error after %d chars: %d\n", __func__, total_count, result); + return result; } /* @@ -781,11 +1131,7 @@ static int read_block_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t if (rd_count < 0) { - rig_debug(RIG_DEBUG_ERR, - "%s(): read() failed - %s\n", - __func__, - strerror(errno)); - + rig_debug(RIG_DEBUG_ERR, "%s(): read failed - %s\n", __func__, strerror(errno)); return -RIG_EIO; } @@ -793,8 +1139,11 @@ static int read_block_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t count -= rd_count; } - rig_debug(RIG_DEBUG_TRACE, "%s(): RX %d bytes\n", __func__, total_count); - dump_hex((unsigned char *) rxbuffer, total_count); + if (direct) + { + rig_debug(RIG_DEBUG_TRACE, "%s(): RX %d bytes\n", __func__, total_count); + dump_hex((unsigned char *) rxbuffer, total_count); + } return total_count; /* return bytes count read */ } @@ -850,43 +1199,6 @@ int HAMLIB_API read_block_direct(hamlib_port_t *p, unsigned char *rxbuffer, size return read_block_generic(p, rxbuffer, count, 1); } -static int flush_and_read_last_byte(hamlib_port_t *p, int fd, int direct) -{ - fd_set rfds, efds; - ssize_t bytes_read; - struct timeval tv_timeout; - int retval; - char data; - - do { - tv_timeout.tv_sec = 0; - tv_timeout.tv_usec = 0; - - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - efds = rfds; - - retval = port_select(p, fd + 1, &rfds, NULL, &efds, &tv_timeout, direct); - if (retval < 0) - { - return -RIG_ETIMEOUT; - } - if (retval == 0) - { - return -RIG_EIO; - } - - if (FD_ISSET(fd, &efds)) - { - return -RIG_EIO; - } - - bytes_read = read(fd, &data, 1); - } while (bytes_read > 0); - - return data; -} - static int read_string_generic(hamlib_port_t *p, unsigned char *rxbuffer, size_t rxmax, @@ -896,9 +1208,7 @@ static int read_string_generic(hamlib_port_t *p, int expected_len, int direct) { - fd_set rfds, efds; - int fd, errorfd, maxfd; - struct timeval tv, tv_timeout, start_time, end_time, elapsed_time; + struct timeval start_time, end_time, elapsed_time; int total_count = 0; int i = 0; static int minlen = 1; // dynamic minimum length of rig response data @@ -927,23 +1237,6 @@ static int read_string_generic(hamlib_port_t *p, return 0; } -#if ASYNC_BUG - fd = direct ? p->fd : p->fd_sync_read; - errorfd = direct ? -1 : p->fd_sync_error_read; - maxfd = (fd > errorfd) ? fd : errorfd; -#else - fd = p->fd; - errorfd = -1; - maxfd = (fd > errorfd) ? fd : errorfd; -#endif - - /* - * Wait up to timeout ms. - */ - - tv_timeout.tv_sec = p->timeout / 1000; - tv_timeout.tv_usec = (p->timeout % 1000) * 1000; - /* Store the time of the read loop start */ gettimeofday(&start_time, NULL); @@ -952,21 +1245,11 @@ static int read_string_generic(hamlib_port_t *p, while (total_count < rxmax - 1) // allow 1 byte for end-of-string { ssize_t rd_count = 0; - int retval; + int result; - tv = tv_timeout; /* select may have updated it */ + result = port_wait_for_data(p, direct); - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - if (!direct) - { - FD_SET(errorfd, &rfds); - } - efds = rfds; - - retval = port_select(p, maxfd + 1, &rfds, NULL, &efds, &tv, direct); - - if (retval == 0) + if (result == -RIG_ETIMEOUT) { if (0 == total_count) { @@ -978,7 +1261,8 @@ static int read_string_generic(hamlib_port_t *p, { dump_hex((unsigned char *) rxbuffer, total_count); } - if (!flush_flag) { + if (!flush_flag) + { rig_debug(RIG_DEBUG_WARN, "%s(): Timed out %d.%03d seconds after %d chars\n", __func__, @@ -990,48 +1274,17 @@ static int read_string_generic(hamlib_port_t *p, return -RIG_ETIMEOUT; } - break; /* return what we have read */ + break; /* return what we have read */ } - if (retval < 0) + if (result < 0) { if (direct) { dump_hex(rxbuffer, total_count); } - rig_debug(RIG_DEBUG_ERR, - "%s(): select() error after %d chars: %s\n", - __func__, - total_count, - strerror(errno)); - - return -RIG_EIO; - } - - if (FD_ISSET(fd, &efds)) - { - rig_debug(RIG_DEBUG_ERR, - "%s(): fd error after %d chars\n", - __func__, - total_count); - - return -RIG_EIO; - } - if (!direct) - { - if (FD_ISSET(errorfd, &efds)) - { - rig_debug(RIG_DEBUG_ERR, - "%s(): fd error from sync error pipe after %d chars\n", - __func__, - total_count); - return -RIG_EIO; - } - - if (FD_ISSET(errorfd, &rfds)) - { - return flush_and_read_last_byte(p, errorfd, 0); - } + rig_debug(RIG_DEBUG_ERR, "%s(): I/O error after %d chars: %d\n", __func__, total_count, result); + return result; } /* @@ -1057,10 +1310,7 @@ static int read_string_generic(hamlib_port_t *p, { dump_hex((unsigned char *) rxbuffer, total_count); } - rig_debug(RIG_DEBUG_ERR, - "%s(): read() failed - %s\n", - __func__, - strerror(errno)); + rig_debug(RIG_DEBUG_ERR, "%s(): read failed - %s\n", __func__, strerror(errno)); return -RIG_EIO; } @@ -1096,10 +1346,9 @@ static int read_string_generic(hamlib_port_t *p, "%s(): RX %d characters\n", __func__, total_count); + dump_hex((unsigned char *) rxbuffer, total_count); } - dump_hex((unsigned char *) rxbuffer, total_count); - return total_count; /* return bytes count read */ } diff --git a/src/misc.c b/src/misc.c index 21283a1db..3b2e40cd5 100644 --- a/src/misc.c +++ b/src/misc.c @@ -58,6 +58,13 @@ #include "serial.h" #include "network.h" +#if defined(_WIN32) +# include +# ifndef localtime_r +# define localtime_r(T,Tm) (localtime_s(Tm,T) ? NULL : Tm) +# endif +#endif + #ifdef __APPLE__ #include diff --git a/src/network.c b/src/network.c index 206415e67..eb3175d92 100644 --- a/src/network.c +++ b/src/network.c @@ -79,6 +79,7 @@ #include #include "network.h" #include "misc.h" +#include "asyncpipe.h" #include "snapshot_data.h" #ifdef HAVE_WINDOWS_H @@ -113,8 +114,12 @@ typedef struct multicast_publisher_args_s const char *multicast_addr; int multicast_port; +#if defined(WIN32) && defined(HAVE_WINDOWS_H) + hamlib_async_pipe_t *data_pipe; +#else int data_write_fd; int data_read_fd; +#endif } multicast_publisher_args; typedef struct multicast_publisher_priv_data_s @@ -447,15 +452,45 @@ extern void sync_callback(int lock); #ifdef HAVE_PTHREAD //! @cond Doxygen_Suppress -static int multicast_publisher_write_data(int fd, size_t length, const unsigned char *data) +#define MULTICAST_DATA_PIPE_TIMEOUT_MILLIS 1000 + +#if defined(WIN32) && defined(HAVE_WINDOWS_H) + +static int multicast_publisher_create_data_pipe(multicast_publisher_priv_data *mcast_publisher_priv) +{ + int status; + + ENTERFUNC; + + status = async_pipe_create(&mcast_publisher_priv->args.data_pipe, PIPE_BUFFER_SIZE_DEFAULT, MULTICAST_DATA_PIPE_TIMEOUT_MILLIS); + if (status != 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: multicast publisher data pipe creation failed with status=%d, err=%s\n", __func__, + status, strerror(errno)); + RETURNFUNC(-RIG_EINTERNAL); + } + + RETURNFUNC(RIG_OK); +} + +static void multicast_publisher_close_data_pipe(multicast_publisher_priv_data *mcast_publisher_priv) +{ + ENTERFUNC; + + if (mcast_publisher_priv->args.data_pipe != NULL) { + async_pipe_close(mcast_publisher_priv->args.data_pipe); + mcast_publisher_priv->args.data_pipe = NULL; + } +} + +static int multicast_publisher_write_data(multicast_publisher_args *mcast_publisher_args, size_t length, const unsigned char *data) { ssize_t result; - result = write(fd, data, length); + result = async_pipe_write(mcast_publisher_args->data_pipe, data, length, MULTICAST_DATA_PIPE_TIMEOUT_MILLIS); if (result < 0) { - rig_debug(RIG_DEBUG_ERR, "%s: error writing to multicast publisher data pipe, status=%d, err=%s\n", __func__, - (int)result, strerror(errno)); + rig_debug(RIG_DEBUG_ERR, "%s: error writing to multicast publisher data pipe, result=%d\n", __func__, (int)result); RETURNFUNC(-RIG_EIO); } @@ -469,14 +504,114 @@ static int multicast_publisher_write_data(int fd, size_t length, const unsigned RETURNFUNC(RIG_OK); } -static int multicast_publisher_read_data(int fd, size_t length, unsigned char *data) +static int multicast_publisher_read_data(multicast_publisher_args *mcast_publisher_args, size_t length, unsigned char *data) { + ssize_t result; + + result = async_pipe_wait_for_data(mcast_publisher_args->data_pipe, MULTICAST_DATA_PIPE_TIMEOUT_MILLIS); + if (result < 0) + { + // Timeout is expected when there is no data + if (result != -RIG_ETIMEOUT) + { + rig_debug(RIG_DEBUG_ERR, "%s: error waiting for multicast publisher data, result=%ld\n", __func__, (long) result); + } + RETURNFUNC(result); + } + + result = async_pipe_read(mcast_publisher_args->data_pipe, data, length, MULTICAST_DATA_PIPE_TIMEOUT_MILLIS); + if (result < 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: error reading multicast publisher data, result=%ld\n", __func__, (long) result); + RETURNFUNC(-RIG_EIO); + } + + if (result != length) + { + rig_debug(RIG_DEBUG_ERR, "%s: could not read from multicast publisher data pipe, expected %ld bytes, read %ld bytes\n", + __func__, (long) length, (long) result); + RETURNFUNC(-RIG_EIO); + } + + RETURNFUNC(RIG_OK); +} + +#else + +static int multicast_publisher_create_data_pipe(multicast_publisher_priv_data *mcast_publisher_priv) +{ + int data_pipe_fds[2]; + int status; + + ENTERFUNC; + + status = pipe(data_pipe_fds); + if (status != 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: multicast publisher data pipe creation failed with status=%d, err=%s\n", __func__, + status, strerror(errno)); + RETURNFUNC(-RIG_EINTERNAL); + } + + int flags = fcntl(data_pipe_fds[0], F_GETFD); + flags |= O_NONBLOCK; + if (fcntl(data_pipe_fds[0], F_SETFD, flags)) + { + rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on pipe=%s\n", __func__, strerror(errno)); + } + + mcast_publisher_priv->args.data_read_fd = data_pipe_fds[0]; + mcast_publisher_priv->args.data_write_fd = data_pipe_fds[1]; + + RETURNFUNC(RIG_OK); +} + +static void multicast_publisher_close_data_pipe(multicast_publisher_priv_data *mcast_publisher_priv) +{ + ENTERFUNC; + + if (mcast_publisher_priv->args.data_read_fd != -1) { + close(mcast_publisher_priv->args.data_read_fd); + mcast_publisher_priv->args.data_read_fd = -1; + } + if (mcast_publisher_priv->args.data_write_fd != -1) { + close(mcast_publisher_priv->args.data_write_fd); + mcast_publisher_priv->args.data_write_fd = -1; + } +} + +static int multicast_publisher_write_data(multicast_publisher_args *mcast_publisher_args, size_t length, const unsigned char *data) +{ + int fd = mcast_publisher_args->data_write_fd; + ssize_t result; + + result = write(fd, data, length); + if (result < 0) + { + rig_debug(RIG_DEBUG_ERR, "%s: error writing to multicast publisher data pipe, result=%d, err=%s\n", __func__, + (int)result, strerror(errno)); + RETURNFUNC(-RIG_EIO); + } + + if (result != length) + { + rig_debug(RIG_DEBUG_ERR, "%s: could not write to multicast publisher data pipe, expected %ld bytes, wrote %ld bytes\n", + __func__, (long) length, (long) result); + RETURNFUNC(-RIG_EIO); + } + + RETURNFUNC(RIG_OK); +} + +static int multicast_publisher_read_data(multicast_publisher_args *mcast_publisher_args, size_t length, unsigned char *data) +{ + int fd = mcast_publisher_args->data_read_fd; fd_set rfds, efds; struct timeval timeout; ssize_t result; int retval; - timeout.tv_sec = 1; + timeout.tv_sec = MULTICAST_DATA_PIPE_TIMEOUT_MILLIS / 1000; timeout.tv_usec = 0; FD_ZERO(&rfds); @@ -519,18 +654,21 @@ static int multicast_publisher_read_data(int fd, size_t length, unsigned char *d if (result != length) { - rig_debug(RIG_DEBUG_ERR, "%s: could not read from multicast publisher data pipe, expected %d bytes, read %d bytes\n", - __func__, (int)length, (int)result); + rig_debug(RIG_DEBUG_ERR, "%s: could not read from multicast publisher data pipe, expected %ld bytes, read %ld bytes\n", + __func__, (long) length, (long) result); RETURNFUNC(-RIG_EIO); } RETURNFUNC(RIG_OK); } +#endif + static int multicast_publisher_write_packet_header(RIG *rig, multicast_publisher_data_packet *packet) { struct rig_state *rs = &rig->state; multicast_publisher_priv_data *mcast_publisher_priv; + multicast_publisher_args *mcast_publisher_args; ssize_t result; if (rs->multicast_publisher_priv_data == NULL) @@ -540,9 +678,10 @@ static int multicast_publisher_write_packet_header(RIG *rig, multicast_publisher } mcast_publisher_priv = (multicast_publisher_priv_data *) rs->multicast_publisher_priv_data; + mcast_publisher_args = &mcast_publisher_priv->args; result = multicast_publisher_write_data( - mcast_publisher_priv->args.data_write_fd, sizeof(multicast_publisher_data_packet), (unsigned char *) packet); + mcast_publisher_args, sizeof(multicast_publisher_data_packet), (unsigned char *) packet); if (result != RIG_OK) { RETURNFUNC(result); @@ -553,21 +692,37 @@ static int multicast_publisher_write_packet_header(RIG *rig, multicast_publisher int network_publish_rig_poll_data(RIG *rig) { + struct rig_state *rs = &rig->state; multicast_publisher_data_packet packet = { .type = MULTICAST_PUBLISHER_DATA_PACKET_TYPE_POLL, .padding = 0, .data_length = 0, }; + + if (rs->multicast_publisher_priv_data == NULL) + { + // Silently ignore call if multicast publisher is not enabled + return RIG_OK; + } + return multicast_publisher_write_packet_header(rig, &packet); } int network_publish_rig_transceive_data(RIG *rig) { + struct rig_state *rs = &rig->state; multicast_publisher_data_packet packet = { .type = MULTICAST_PUBLISHER_DATA_PACKET_TYPE_TRANSCEIVE, .padding = 0, .data_length = 0, }; + + if (rs->multicast_publisher_priv_data == NULL) + { + // Silently ignore call if multicast publisher is not enabled + return RIG_OK; + } + return multicast_publisher_write_packet_header(rig, &packet); } @@ -576,6 +731,7 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) int result; struct rig_state *rs = &rig->state; multicast_publisher_priv_data *mcast_publisher_priv; + multicast_publisher_args *mcast_publisher_args; multicast_publisher_data_packet packet = { .type = MULTICAST_PUBLISHER_DATA_PACKET_TYPE_SPECTRUM, .padding = 0, @@ -584,8 +740,8 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) if (rs->multicast_publisher_priv_data == NULL) { - // Silently ignore if multicast publisher is not enabled - RETURNFUNC(RIG_OK); + // Silently ignore call if multicast publisher is not enabled + return RIG_OK; } result = multicast_publisher_write_packet_header(rig, &packet); @@ -595,16 +751,17 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) } mcast_publisher_priv = (multicast_publisher_priv_data *) rs->multicast_publisher_priv_data; + mcast_publisher_args = &mcast_publisher_priv->args; result = multicast_publisher_write_data( - mcast_publisher_priv->args.data_write_fd, sizeof(struct rig_spectrum_line), (unsigned char *) line); + mcast_publisher_args, sizeof(struct rig_spectrum_line), (unsigned char *) line); if (result != RIG_OK) { RETURNFUNC(result); } result = multicast_publisher_write_data( - mcast_publisher_priv->args.data_write_fd, line->spectrum_data_length, line->spectrum_data); + mcast_publisher_args, line->spectrum_data_length, line->spectrum_data); if (result != RIG_OK) { RETURNFUNC(result); @@ -613,12 +770,13 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) RETURNFUNC(RIG_OK); } -static int multicast_publisher_read_packet(int fd, uint8_t *type, struct rig_spectrum_line *spectrum_line, unsigned char *spectrum_data) +static int multicast_publisher_read_packet(multicast_publisher_args *mcast_publisher_args, + uint8_t *type, struct rig_spectrum_line *spectrum_line, unsigned char *spectrum_data) { int result; multicast_publisher_data_packet packet; - result = multicast_publisher_read_data(fd, sizeof(packet), (unsigned char *) &packet); + result = multicast_publisher_read_data(mcast_publisher_args, sizeof(packet), (unsigned char *) &packet); if (result < 0) { RETURNFUNC(result); @@ -631,7 +789,7 @@ static int multicast_publisher_read_packet(int fd, uint8_t *type, struct rig_spe break; case MULTICAST_PUBLISHER_DATA_PACKET_TYPE_SPECTRUM: result = multicast_publisher_read_data( - fd, sizeof(struct rig_spectrum_line), (unsigned char *) spectrum_line); + mcast_publisher_args, sizeof(struct rig_spectrum_line), (unsigned char *) spectrum_line); if (result < 0) { RETURNFUNC(result); @@ -646,7 +804,7 @@ static int multicast_publisher_read_packet(int fd, uint8_t *type, struct rig_spe spectrum_line->spectrum_data = spectrum_data; - result = multicast_publisher_read_data(fd, spectrum_line->spectrum_data_length, spectrum_data); + result = multicast_publisher_read_data(mcast_publisher_args, spectrum_line->spectrum_data_length, spectrum_data); if (result < 0) { RETURNFUNC(result); @@ -687,7 +845,7 @@ void *multicast_publisher(void *arg) while (rs->multicast_publisher_run) { - result = multicast_publisher_read_packet(args->data_read_fd, &packet_type, &spectrum_line, spectrum_data); + result = multicast_publisher_read_packet(args, &packet_type, &spectrum_line, spectrum_data); if (result != RIG_OK) { if (result == -RIG_ETIMEOUT) @@ -729,18 +887,6 @@ void *multicast_publisher(void *arg) return NULL; } -static void multicast_publisher_close_data_pipe(multicast_publisher_priv_data *mcast_publisher_priv) -{ - if (mcast_publisher_priv->args.data_read_fd != -1) { - close(mcast_publisher_priv->args.data_read_fd); - mcast_publisher_priv->args.data_read_fd = -1; - } - if (mcast_publisher_priv->args.data_write_fd != -1) { - close(mcast_publisher_priv->args.data_write_fd); - mcast_publisher_priv->args.data_write_fd = -1; - } -} - //! @endcond /** @@ -758,7 +904,6 @@ int network_multicast_publisher_start(RIG *rig, const char *multicast_addr, struct rig_state *rs = &rig->state; multicast_publisher_priv_data *mcast_publisher_priv; int socket_fd; - int data_pipe_fds[2]; int status; ENTERFUNC; @@ -768,9 +913,8 @@ int network_multicast_publisher_start(RIG *rig, const char *multicast_addr, if (strcmp(multicast_addr, "0.0.0.0") == 0) { - rig_debug(RIG_DEBUG_TRACE, "%s(%d): not starting multicast\n", __FILE__, - __LINE__); - return RIG_OK; // don't start it + rig_debug(RIG_DEBUG_TRACE, "%s(%d): not starting multicast publisher\n", __FILE__, __LINE__); + return RIG_OK; } if (rs->multicast_publisher_priv_data != NULL) @@ -817,38 +961,21 @@ int network_multicast_publisher_start(RIG *rig, const char *multicast_addr, RETURNFUNC(-RIG_ENOMEM); } -#ifdef HAVE_WINDOWS_H - // Need to replace this with overlapped I/O to achieve O_NONBLOCK - status = _pipe(data_pipe_fds, 256, O_BINARY); -#else - status = pipe(data_pipe_fds); -#endif - if (status != 0) - { - free(rs->multicast_publisher_priv_data); - rs->multicast_publisher_priv_data = NULL; - close(socket_fd); - rig_debug(RIG_DEBUG_ERR, "%s: multicast publisher data pipe open status=%d, err=%s\n", __func__, - status, strerror(errno)); - RETURNFUNC(-RIG_EINTERNAL); - } -#ifndef HAVE_WINDOWS_H - int flags = fcntl(data_pipe_fds[0], F_GETFD); - flags |= O_NONBLOCK; - if (fcntl(data_pipe_fds[0], F_SETFD, flags)) - { - rig_debug(RIG_DEBUG_ERR, "%s: error setting O_NONBLOCK on pipe=%s\n", __func__, strerror(errno)); - } -#endif - - mcast_publisher_priv = (multicast_publisher_priv_data *) rs->multicast_publisher_priv_data; mcast_publisher_priv->args.socket_fd = socket_fd; mcast_publisher_priv->args.multicast_addr = multicast_addr; mcast_publisher_priv->args.multicast_port = multicast_port; mcast_publisher_priv->args.rig = rig; - mcast_publisher_priv->args.data_read_fd = data_pipe_fds[0]; - mcast_publisher_priv->args.data_write_fd = data_pipe_fds[1]; + + status = multicast_publisher_create_data_pipe(mcast_publisher_priv); + if (status < 0) + { + free(rs->multicast_publisher_priv_data); + rs->multicast_publisher_priv_data = NULL; + close(socket_fd); + rig_debug(RIG_DEBUG_ERR, "%s: multicast publisher data pipe creation failed, result=%d\n", __func__, status); + RETURNFUNC(-RIG_EINTERNAL); + } int err = pthread_create(&mcast_publisher_priv->thread_id, NULL, multicast_publisher, &mcast_publisher_priv->args); diff --git a/src/rig.c b/src/rig.c index 3d35b65d6..d4780b557 100644 --- a/src/rig.c +++ b/src/rig.c @@ -452,21 +452,16 @@ RIG *HAMLIB_API rig_init(rig_model_t rig_model) */ rs = &rig->state; + rs->async_data_enabled = 0; rs->rigport.fd = -1; rs->pttport.fd = -1; rs->comm_state = 0; rig_debug(RIG_DEBUG_VERBOSE, "%s: rs->comm_state==0?=%d\n", __func__, rs->comm_state); rs->rigport.type.rig = caps->port_type; /* default from caps */ -#ifdef HAVE_PTHREAD - rs->asyncport.async = caps->async_data_supported; -#else +#if defined(ASYNC_BUG) && defined(HAVE_PTHREAD) rs->rigport.async = 0; #endif - rs->asyncport.fd_sync_write = -1; - rs->asyncport.fd_sync_read = -1; - rs->asyncport.fd_sync_error_write = -1; - rs->asyncport.fd_sync_error_read = -1; switch (caps->port_type) { @@ -515,11 +510,6 @@ RIG *HAMLIB_API rig_init(rig_model_t rig_model) rs->vfo_comp = 0.0; /* override it with preferences */ rs->current_vfo = RIG_VFO_CURR; /* we don't know yet! */ rs->tx_vfo = RIG_VFO_CURR; /* we don't know yet! */ -#ifdef HAVE_PTHREAD - rs->async_data = caps->async_data_supported; -#else - rs->async_data = 0; -#endif rs->poll_interval = 0; // disable polling by default rs->lo_freq = 0; rs->cache.timeout_ms = 500; // 500ms cache timeout by default @@ -725,6 +715,12 @@ int HAMLIB_API rig_open(RIG *rig) rs = &rig->state; rs->rigport.rig = rig; +#if defined(ASYNC_BUG) && defined(HAVE_PTHREAD) + // Enable async data only if it's enabled through conf settings *and* supported by the backend + rs->async_data_enabled = rs->async_data_enabled && caps->async_data_supported; + rs->rigport.async = rs->async_data_enabled; +#endif + if (strlen(rs->rigport.pathname) > 0) { char hoststr[256], portstr[6]; @@ -1047,7 +1043,6 @@ int HAMLIB_API rig_open(RIG *rig) RETURNFUNC(status); } -#if !defined(WIN32) #ifdef ASYNC_BUG status = async_data_handler_start(rig); @@ -1057,7 +1052,6 @@ int HAMLIB_API rig_open(RIG *rig) RETURNFUNC(status); } -#endif #endif add_opened_rig(rig); @@ -6884,39 +6878,39 @@ HAMLIB_EXPORT(void) sync_callback(int lock) #ifdef ASYNC_BUG static int async_data_handler_start(RIG *rig) { - const struct rig_caps *caps = rig->caps; struct rig_state *rs = &rig->state; async_data_handler_priv_data *async_data_handler_priv; ENTERFUNC; + if (!rs->async_data_enabled) + { + rig_debug(RIG_DEBUG_TRACE, "%s: async data support disabled\n", __func__); + RETURNFUNC(RIG_OK); + } + #ifdef ASYNC_BUG #ifdef HAVE_PTHREAD - if (caps->async_data_supported) + rs->async_data_handler_thread_run = 1; + rs->async_data_handler_priv_data = calloc(1, + sizeof(async_data_handler_priv_data)); + + if (rs->async_data_handler_priv_data == NULL) { - rs->async_data_handler_thread_run = 1; - rs->async_data_handler_priv_data = calloc(1, - sizeof(async_data_handler_priv_data)); + RETURNFUNC(-RIG_ENOMEM); + } - if (rs->async_data_handler_priv_data == NULL) - { - RETURNFUNC(-RIG_ENOMEM); - } + async_data_handler_priv = (async_data_handler_priv_data *) + rs->async_data_handler_priv_data; + async_data_handler_priv->args.rig = rig; + int err = pthread_create(&async_data_handler_priv->thread_id, NULL, + async_data_handler, &async_data_handler_priv->args); - async_data_handler_priv = (async_data_handler_priv_data *) - rs->async_data_handler_priv_data; - async_data_handler_priv->args.rig = rig; - int err = pthread_create(&async_data_handler_priv->thread_id, NULL, - async_data_handler, &async_data_handler_priv->args); - - if (err) - { - rig_debug(RIG_DEBUG_ERR, "%s(%d) pthread_create error: %s\n", __FILE__, - __LINE__, - strerror(errno)); - RETURNFUNC(-RIG_EINTERNAL); - } + if (err) + { + rig_debug(RIG_DEBUG_ERR, "%s: pthread_create error: %s\n", __func__, strerror(errno)); + RETURNFUNC(-RIG_EINTERNAL); } #endif @@ -6947,8 +6941,7 @@ static int async_data_handler_stop(RIG *rig) if (err) { - rig_debug(RIG_DEBUG_ERR, "%s(%d): pthread_join error: %s\n", __FILE__, __LINE__, - strerror(errno)); + rig_debug(RIG_DEBUG_ERR, "%s: pthread_join error: %s\n", __func__, strerror(errno)); // just ignore the error } @@ -6977,9 +6970,7 @@ void *async_data_handler(void *arg) int result; #endif - rig_debug(RIG_DEBUG_VERBOSE, "%s(%d): Starting async data handler thread\n", - __FILE__, - __LINE__); + rig_debug(RIG_DEBUG_VERBOSE, "%s: Starting async data handler thread\n", __func__); // TODO: check how to enable "transceive" on recent Kenwood/Yaesu rigs // TODO: add initial support for async in Kenwood kenwood_transaction (+one) functions -> add transaction_active flag usage @@ -7049,9 +7040,7 @@ void *async_data_handler(void *arg) #endif - rig_debug(RIG_DEBUG_VERBOSE, "%s(%d): Stopping async data handler thread\n", - __FILE__, - __LINE__); + rig_debug(RIG_DEBUG_VERBOSE, "%s: Stopping async data handler thread\n", __func__); return NULL; } diff --git a/src/token.h b/src/token.h index b73174a69..46b80f19e 100644 --- a/src/token.h +++ b/src/token.h @@ -91,6 +91,9 @@ #define TOK_PTT_SHARE TOKEN_FRONTEND(35) /** \brief PTT share with other applications */ #define TOK_FLUSHX TOKEN_FRONTEND(36) +/** \brief Asynchronous data transfer support */ +#define TOK_ASYNC TOKEN_FRONTEND(37) + /* * rig specific tokens */