From 00cb0eccdc666e101b64cb393891863522ca5d9a Mon Sep 17 00:00:00 2001 From: Mikael Nousiainen Date: Wed, 22 Nov 2023 18:41:55 +0200 Subject: [PATCH] Fix multicast publisher write commands to use a mutex lock to prevent corruption in (spectrum) data written with multiple write() calls. Remove unnecessary hl_usleep() calls in Icom backend, as the frame parser will correctly detect transceive messages. --- rigs/icom/icom.c | 41 +++++++++++++------------- rigs/yaesu/newcat.c | 1 + src/iofunc.c | 10 +------ src/network.c | 72 ++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 87 insertions(+), 37 deletions(-) diff --git a/rigs/icom/icom.c b/rigs/icom/icom.c index 5e1284a75..7ff78ddcc 100644 --- a/rigs/icom/icom.c +++ b/rigs/icom/icom.c @@ -1297,6 +1297,8 @@ int icom_band_changing(RIG *rig, freq_t test_freq) freq_t curr_freq, freq1, freq2; int retval; + ENTERFUNC2; + // We should be sitting on the VFO we want to change so just get it's frequency retval = rig_get_freq(rig, RIG_VFO_CURR, &curr_freq); @@ -1347,7 +1349,7 @@ static int icom_set_freq_x25(RIG *rig, vfo_t vfo, freq_t freq, int freq_len, uns if ((retval = icom_check_ack(ack_len, ackbuf)) != RIG_OK) { - RETURNFUNC2(retval); + return retval; } return retval; @@ -1429,6 +1431,8 @@ int icom_set_freq(RIG *rig, vfo_t vfo, freq_t freq) vfo_t vfo_save = rs->current_vfo; freq_t curr_freq; + ENTERFUNC2; + rig_debug(RIG_DEBUG_VERBOSE, "%s called %s=%" PRIfreq "\n", __func__, rig_strvfo(vfo), freq); @@ -1493,9 +1497,9 @@ int icom_set_freq(RIG *rig, vfo_t vfo, freq_t freq) if (retval != RIG_OK) { rig_debug(RIG_DEBUG_ERR, "%s: set_freq failed: %s\n", __func__, rigerror(retval)); - return retval; + RETURNFUNC2(retval); } - return RIG_OK; + RETURNFUNC2(RIG_OK); } else { @@ -1512,9 +1516,6 @@ int icom_set_freq(RIG *rig, vfo_t vfo, freq_t freq) check_ack = 1; } - // pause for transceive message and we'll flush it - hl_usleep(50 * 1000); - if (retval != RIG_OK) { // We might have a failed command if we're changing bands @@ -2135,11 +2136,9 @@ static int icom_set_mode_x26(RIG *rig, vfo_t vfo, rmode_t mode, int datamode, unsigned char ackbuf[MAXFRAMELEN]; int ack_len = sizeof(ackbuf); - ENTERFUNC; - if (priv->x26cmdfails > 0 && !priv_caps->x25x26_always) { - RETURNFUNC(-RIG_ENAVAIL); + return -RIG_ENAVAIL; } buf[0] = mode; @@ -2161,10 +2160,10 @@ static int icom_set_mode_x26(RIG *rig, vfo_t vfo, rmode_t mode, int datamode, if ((retval = icom_check_ack(ack_len, ackbuf)) != RIG_OK) { - RETURNFUNC2(retval); + return retval; } - RETURNFUNC(RIG_OK); + return RIG_OK; } static int icom_get_mode_x26(RIG *rig, vfo_t vfo, int *mode_len, unsigned char *modebuf) @@ -2173,11 +2172,9 @@ static int icom_get_mode_x26(RIG *rig, vfo_t vfo, int *mode_len, unsigned char * const struct icom_priv_caps *priv_caps = rig->caps->priv; int retval; - ENTERFUNC; - if (priv->x26cmdfails > 0 && !priv_caps->x25x26_always) { - RETURNFUNC(-RIG_ENAVAIL); + return -RIG_ENAVAIL; } int vfo_number = icom_get_vfo_number_x25x26(rig, vfo); @@ -2193,14 +2190,14 @@ static int icom_get_mode_x26(RIG *rig, vfo_t vfo, int *mode_len, unsigned char * if (retval != RIG_OK) { - RETURNFUNC(retval); + return retval; } rig_debug(RIG_DEBUG_TRACE, "%s: mode_len=%d, modebuf=%02x %02x %02x %02x %02x\n", __func__, *mode_len, modebuf[0], modebuf[1], modebuf[2], modebuf[3], modebuf[4]); - RETURNFUNC(RIG_OK); + return RIG_OK; } /* @@ -2238,7 +2235,7 @@ int icom_set_mode(RIG *rig, vfo_t vfo, rmode_t mode, pbwidth_t width) retval = set_vfo_curr(rig, vfo, rig->state.current_vfo); if (retval != RIG_OK) { - RETURNFUNC2(retval); + RETURNFUNC(retval); } } @@ -2255,7 +2252,7 @@ int icom_set_mode(RIG *rig, vfo_t vfo, rmode_t mode, pbwidth_t width) retval = retval2; } } - RETURNFUNC2(retval); + RETURNFUNC(retval); } // Do nothing if current mode and width is not changing @@ -2309,8 +2306,6 @@ int icom_set_mode(RIG *rig, vfo_t vfo, rmode_t mode, pbwidth_t width) retval = RIG_OK; } - hl_usleep(50 * 1000); // pause for possible transceive message which we'll flush - if (retval == RIG_OK && mode != current_mode) { unsigned char datamode[2]; @@ -2335,7 +2330,7 @@ int icom_set_mode(RIG *rig, vfo_t vfo, rmode_t mode, pbwidth_t width) if (retval < 0) { rig_debug(RIG_DEBUG_ERR, "%s: error on rig2icom_mode, result=%d\n", __func__, retval); - RETURNFUNC2(retval); + RETURNFUNC(retval); } // Check if the filter width byte is needed @@ -2421,6 +2416,8 @@ static int icom_get_mode_without_data(RIG *rig, vfo_t vfo, rmode_t *mode, pbwidt int mode_len; int retval; + ENTERFUNC2; + rig_debug(RIG_DEBUG_VERBOSE, "%s called vfo=%s\n", __func__, rig_strvfo(vfo)); *width = 0; @@ -2549,6 +2546,8 @@ int icom_get_mode(RIG *rig, vfo_t vfo, rmode_t *mode, pbwidth_t *width) int force_vfo_swap = 0; vfo_t vfo_save = rs->current_vfo; + ENTERFUNC2; + rig_debug(RIG_DEBUG_VERBOSE, "%s called vfo=%s\n", __func__, rig_strvfo(vfo)); // Icom 0x26 command can only manipulate VFO A/B *or* VFO Main/Sub modes. diff --git a/rigs/yaesu/newcat.c b/rigs/yaesu/newcat.c index 556e065d2..f0d729a5b 100644 --- a/rigs/yaesu/newcat.c +++ b/rigs/yaesu/newcat.c @@ -961,6 +961,7 @@ int newcat_set_freq(RIG *rig, vfo_t vfo, freq_t freq) priv = (struct newcat_priv_data *)rig->state.priv; caps = rig->caps; + // TODO: this is likely a bug, should call get_vfo_mode() rig_get_mode(rig, RIG_VFO_A, &tmode, &twidth); if (tmode == RIG_VFO_MEM) diff --git a/src/iofunc.c b/src/iofunc.c index 4d58694c4..459f72275 100644 --- a/src/iofunc.c +++ b/src/iofunc.c @@ -1335,16 +1335,12 @@ static int read_string_generic(hamlib_port_t *p, memset(rxbuffer, 0, rxmax); short timeout_retries = p->timeout_retry; - //HAMLIB_TRACE2; + while (total_count < rxmax - 1) // allow 1 byte for end-of-string { ssize_t rd_count = 0; int result; - int timeout_save = p->timeout; -// p->timeout = 2; result = port_wait_for_data(p, direct); - //HAMLIB_TRACE2; - p->timeout = timeout_save; if (result == -RIG_ETIMEOUT) { @@ -1354,7 +1350,6 @@ static int read_string_generic(hamlib_port_t *p, rig_debug(RIG_DEBUG_CACHE, "%s(%d): retrying read timeout %d/%d timeout=%d\n", __func__, __LINE__, p->timeout_retry - timeout_retries, p->timeout_retry, p->timeout); hl_usleep(10 * 1000); - //HAMLIB_TRACE2; continue; } @@ -1412,11 +1407,9 @@ static int read_string_generic(hamlib_port_t *p, //rig_debug(RIG_DEBUG_ERR, "xs: avail=%d expected_len=%d, minlen=%d, direct=%d\n", __func__, avail, expected_len, minlen, direct); #endif #endif - //HAMLIB_TRACE2; shortcut: rd_count = port_read_generic(p, &rxbuffer[total_count], expected_len == 1 ? 1 : minlen, direct); - //HAMLIB_TRACE2; // rig_debug(RIG_DEBUG_VERBOSE, "%s: read %d bytes tot=%d\n", __func__, (int)rd_count, total_count); minlen -= rd_count; @@ -1443,7 +1436,6 @@ static int read_string_generic(hamlib_port_t *p, } while (++i < 10 && errno == EBUSY); // 50ms should be enough - //HAMLIB_TRACE2; /* if we get 0 bytes or an error something is wrong */ if (rd_count <= 0) diff --git a/src/network.c b/src/network.c index 19cea6a73..bb129472c 100644 --- a/src/network.c +++ b/src/network.c @@ -118,6 +118,10 @@ typedef struct multicast_publisher_args_s int data_write_fd; int data_read_fd; #endif + +#ifdef HAVE_PTHREAD + pthread_mutex_t write_lock; +#endif } multicast_publisher_args; typedef struct multicast_publisher_priv_data_s @@ -650,6 +654,22 @@ static int multicast_publisher_write_data(const multicast_publisher_args return (RIG_OK); } +static void multicast_publisher_write_lock(RIG *rig) +{ + struct rig_state *rs = &rig->state; + multicast_publisher_priv_data *priv_data = (multicast_publisher_priv_data *) + rs->multicast_publisher_priv_data; + pthread_mutex_lock(&priv_data->args.write_lock); +} + +static void multicast_publisher_write_unlock(RIG *rig) +{ + struct rig_state *rs = &rig->state; + multicast_publisher_priv_data *priv_data = (multicast_publisher_priv_data *) + rs->multicast_publisher_priv_data; + pthread_mutex_unlock(&priv_data->args.write_lock); +} + static int multicast_publisher_read_data(const multicast_publisher_args *mcast_publisher_args, size_t length, unsigned char *data) { @@ -658,7 +678,11 @@ static int multicast_publisher_read_data(const multicast_publisher_args struct timeval timeout; ssize_t result; int retval; + int retries = 2; + size_t offset = 0; + size_t length_left = length; +retry: timeout.tv_sec = MULTICAST_DATA_PIPE_TIMEOUT_MILLIS / 1000; timeout.tv_usec = 0; @@ -690,7 +714,7 @@ static int multicast_publisher_read_data(const multicast_publisher_args return -RIG_EIO; } - result = read(fd, data, length); + result = read(fd, data + offset, length_left); if (result < 0) { @@ -704,11 +728,25 @@ static int multicast_publisher_read_data(const multicast_publisher_args return (-RIG_EIO); } - if (result != length) + offset += result; + length_left -= result; + + if (length_left > 0) { + if (retries > 0) + { + // Execution of this routine may time out between writes to pipe, retry to get more data + rig_debug(RIG_DEBUG_VERBOSE, + "%s: could not read from multicast publisher data pipe, expected %ld bytes, read %ld bytes, retrying...\n", + __func__, (long) length, (long) offset); + retries--; + goto retry; + } + rig_debug(RIG_DEBUG_ERR, - "%s: could not read from multicast publisher data pipe, expected %ld bytes, read %ld bytes\n", - __func__, (long) length, (long) result); + "%s: could not read from multicast publisher data pipe even after retries, expected %ld bytes, read %ld bytes\n", + __func__, (long) length, (long) offset); + return (-RIG_EIO); } @@ -751,6 +789,7 @@ static int multicast_publisher_write_packet_header(RIG *rig, int network_publish_rig_poll_data(RIG *rig) { const struct rig_state *rs = &rig->state; + int result; multicast_publisher_data_packet packet = { .type = MULTICAST_PUBLISHER_DATA_PACKET_TYPE_POLL, @@ -764,13 +803,17 @@ int network_publish_rig_poll_data(RIG *rig) return RIG_OK; } - return multicast_publisher_write_packet_header(rig, &packet); + multicast_publisher_write_lock(rig); + result = multicast_publisher_write_packet_header(rig, &packet); + multicast_publisher_write_unlock(rig); + return result; } // cppcheck-suppress unusedFunction int network_publish_rig_transceive_data(RIG *rig) { const struct rig_state *rs = &rig->state; + int result; multicast_publisher_data_packet packet = { .type = MULTICAST_PUBLISHER_DATA_PACKET_TYPE_TRANSCEIVE, @@ -784,7 +827,10 @@ int network_publish_rig_transceive_data(RIG *rig) return RIG_OK; } - return multicast_publisher_write_packet_header(rig, &packet); + multicast_publisher_write_lock(rig); + result = multicast_publisher_write_packet_header(rig, &packet); + multicast_publisher_write_unlock(rig); + return result; } int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) @@ -806,10 +852,14 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) return RIG_OK; } + // Acquire write lock to write all data in one go to the pipe + multicast_publisher_write_lock(rig); + result = multicast_publisher_write_packet_header(rig, &packet); if (result != RIG_OK) { + multicast_publisher_write_unlock(rig); RETURNFUNC2(result); } @@ -822,12 +872,15 @@ int network_publish_rig_spectrum_data(RIG *rig, struct rig_spectrum_line *line) if (result != RIG_OK) { + multicast_publisher_write_unlock(rig); RETURNFUNC2(result); } result = multicast_publisher_write_data( mcast_publisher_args, line->spectrum_data_length, line->spectrum_data); + multicast_publisher_write_unlock(rig); + if (result != RIG_OK) { RETURNFUNC2(result); @@ -1255,6 +1308,7 @@ int network_multicast_publisher_start(RIG *rig, const char *multicast_addr, multicast_publisher_priv_data *mcast_publisher_priv; int socket_fd; int status; + int mutex_status; ENTERFUNC; @@ -1341,9 +1395,11 @@ int network_multicast_publisher_start(RIG *rig, const char *multicast_addr, mcast_publisher_priv->args.multicast_port = multicast_port; mcast_publisher_priv->args.rig = rig; + mutex_status = pthread_mutex_init(&mcast_publisher_priv->args.write_lock, NULL); + status = multicast_publisher_create_data_pipe(mcast_publisher_priv); - if (status < 0) + if (status < 0 || mutex_status != 0) { free(rs->multicast_publisher_priv_data); rs->multicast_publisher_priv_data = NULL; @@ -1418,6 +1474,8 @@ int network_multicast_publisher_stop(RIG *rig) mcast_publisher_priv->args.socket_fd = -1; } + pthread_mutex_destroy(&mcast_publisher_priv->args.write_lock); + free(rs->multicast_publisher_priv_data); rs->multicast_publisher_priv_data = NULL;