diff --git a/tsplay.c b/tsplay.c index 7e416ec..51cdd86 100644 --- a/tsplay.c +++ b/tsplay.c @@ -214,6 +214,9 @@ static void print_help_ts() "earlier versions of tsplay:\n" "\n" " -oldpace Switch off scanning ahead for the next PCR.\n" + " -pace-pcr1 v1 of the PCR scan code\n" + " -pace-pcr2-ts v2 of the PCR scan code - use 1st PCR PID found [default]\n" + " -pace-pcr2-pmt v2 of the PCR scan code - get PCR PID from PMT\n" "\n" "which attempts to predict an approximate PCR for each TS packet, based on an\n" "initial speed (see '-bitrate'/'-byterate' in '-help tuning') and the PCRs found\n" @@ -351,7 +354,8 @@ int main(int argc, char **argv) int use_network = FALSE; char *multicast_if = NULL; // IP address of multicast i/f - int scan_for_PCRs = TRUE; + tsplay_output_pace_mode pace_mode = TSPLAY_OUTPUT_PACE_PCR2_TS; + uint32_t pid_to_ignore = 0; uint32_t override_pcr_pid = 0; // 0 means "use the PCR found in the PMT" @@ -524,7 +528,19 @@ int main(int argc, char **argv) } else if (!strcmp("-oldpace",argv[ii])) { - scan_for_PCRs = FALSE; + pace_mode = TSPLAY_OUTPUT_PACE_FIXED; + } + else if (!strcmp("-pace-pcr1",argv[ii])) + { + pace_mode = TSPLAY_OUTPUT_PACE_PCR1; + } + else if (!strcmp("-pace-pcr2-ts",argv[ii])) + { + pace_mode = TSPLAY_OUTPUT_PACE_PCR2_TS; + } + else if (!strcmp("-pace-pcr2-pmt",argv[ii])) + { + pace_mode = TSPLAY_OUTPUT_PACE_PCR2_PMT; } else if (!strcmp("-forcepcr",argv[ii])) { @@ -728,8 +744,10 @@ int main(int argc, char **argv) // If tswrite found '-nopcrs' in the switches, make sure that we've // switched PCR lookahead off. - if (!context.use_pcrs) - scan_for_PCRs = FALSE; + if (context.pcr_mode == TSWRITE_PCR_MODE_NONE) + pace_mode = TSPLAY_OUTPUT_PACE_FIXED; + else if (pace_mode == TSPLAY_OUTPUT_PACE_PCR1) + context.pcr_mode = TSWRITE_PCR_MODE_PCR1; if (input_name) { @@ -771,7 +789,7 @@ int main(int argc, char **argv) if (is_TS) { print_msg("Input appears to be Transport Stream\n"); - if (scan_for_PCRs) + if (pace_mode != TSPLAY_OUTPUT_PACE_FIXED) print_msg("Using 'exact' TS packet timing (by looking-ahead to the next PCR)\n"); else print_msg("Approximating/predicting intermediate PCRs\n"); @@ -821,7 +839,7 @@ int main(int argc, char **argv) if (is_TS) { - err = play_TS_stream(input,tswriter,pid_to_ignore,scan_for_PCRs, + err = play_TS_stream(input,tswriter,pace_mode,pid_to_ignore, override_pcr_pid,max,loop,quiet,verbose); } else diff --git a/tsplay_defns.h b/tsplay_defns.h index a25fffe..a9f4618 100644 --- a/tsplay_defns.h +++ b/tsplay_defns.h @@ -34,6 +34,14 @@ // If not being quiet, report progress every TSPLAY_REPORT_EVERY packets read #define TSPLAY_REPORT_EVERY 10000 +typedef enum tsplay_output_pace_mode_e +{ + TSPLAY_OUTPUT_PACE_FIXED, + TSPLAY_OUTPUT_PACE_PCR1, // Src buffering timing + TSPLAY_OUTPUT_PACE_PCR2_TS, // write buffer timing - use 1st PCR found for PID + TSPLAY_OUTPUT_PACE_PCR2_PMT // write buffer timing = look up PCR PID in PMT +} tsplay_output_pace_mode; + #endif // tsplay_defns // Local Variables: diff --git a/tsplay_fns.h b/tsplay_fns.h index cdcf038..28fb259 100644 --- a/tsplay_fns.h +++ b/tsplay_fns.h @@ -60,8 +60,8 @@ */ extern int play_TS_stream(int input, TS_writer_p tswriter, + const tsplay_output_pace_mode pace_mode, uint32_t pid_to_ignore, - int scan_for_PCRs, uint32_t override_pcr_pid, int max, int loop, diff --git a/tsplay_innards.c b/tsplay_innards.c index ee0c592..b6fb8d8 100644 --- a/tsplay_innards.c +++ b/tsplay_innards.c @@ -116,8 +116,7 @@ static int read_TS_packet(TS_reader_p tsreader, } // Read the next packet - err = read_next_TS_packet(tsreader,data); - if (err) + while ((err = read_next_TS_packet(tsreader,data)) != 0) { if (err == EOF) { @@ -505,8 +504,9 @@ static int play_buffered_TS_packets(TS_reader_p tsreader, * * Returns 0 if all went well, 1 if something went wrong. */ -extern int play_TS_packets(TS_reader_p tsreader, +static int play_TS_packets(TS_reader_p tsreader, TS_writer_p tswriter, + const tsplay_output_pace_mode pace_mode, uint32_t pid_to_ignore, int max, int loop, @@ -518,37 +518,41 @@ extern int play_TS_packets(TS_reader_p tsreader, uint32_t count = 0; int pcrs_used = 0; int pcrs_ignored = 0; - uint32_t pcr_pid; + uint32_t pcr_pid = ~0U; uint32_t start_count = 0; // which TS packet to loop from offset_t start_posn = 0; - // Before we can use PCRs for timing, we need to read a PMT which tells us - // what our video stream is (so we can get our PCRs therefrom). - err = find_PCR_PID(tsreader,tswriter,&pcr_pid,&start_count,max,quiet); - if (err) + if (pace_mode == TSPLAY_OUTPUT_PACE_PCR2_PMT) { - fprint_err("### Unable to find PCR PID for timing information\n" - " Looked in first %d TS packets\n",max); - return 1; + // Before we can use PCRs for timing, we need to read a PMT which tells us + // what our video stream is (so we can get our PCRs therefrom). + err = find_PCR_PID(tsreader,tswriter,&pcr_pid,&start_count,max,quiet); + if (err) + { + fprint_err("### Unable to find PCR PID for timing information\n" + " Looked in first %d TS packets\n",max); + return 1; + } + + // Once we've found that, we're ready to play our data + + // If we're looping, remember the location of the first packet of (probable) + // data - there's not much point rewinding before that point + if (loop) + start_posn = start_count * TS_PACKET_SIZE; } - // Once we've found that, we're ready to play our data - - // If we're looping, remember the location of the first packet of (probable) - // data - there's not much point rewinding before that point - if (loop) - start_posn = start_count * TS_PACKET_SIZE; - count = start_count; for (;;) { byte *data; uint32_t pid; int got_pcr; - uint64_t pcr; + uint64_t pcr = 0; err = read_TS_packet(tsreader,&count,&data,&pid,&got_pcr,&pcr, max,loop,start_posn,start_count,quiet); + if (err == EOF) // shouldn't occur if `loop` break; else if (err) @@ -561,15 +565,29 @@ extern int play_TS_packets(TS_reader_p tsreader, return 1; } + if (count == start_count + 1) + tswrite_discontinuity(tswriter); + total ++; // We are only interested in timing information from our PCR PID stream if (got_pcr) { + // If 1st PCR we see then remember its pid + if (pcr_pid == ~0U) + { + fprint_msg("PCR PID set to 1st seen: %#x (%d)\n", pid, pid); + pcr_pid = pid; + } + if (pid == pcr_pid) pcrs_used ++; else { + if (pcrs_ignored == 0) + { + fprint_msg("Other PCR PIDs seen: %#x (%d)...\n", pid, pid); + } pcrs_ignored ++; got_pcr = FALSE; } @@ -639,8 +657,8 @@ extern int play_TS_packets(TS_reader_p tsreader, */ extern int play_TS_stream(int input, TS_writer_p tswriter, + const tsplay_output_pace_mode pace_mode, uint32_t pid_to_ignore, - int scan_for_PCRs, uint32_t override_pcr_pid, int max, int loop, @@ -653,11 +671,13 @@ extern int play_TS_stream(int input, err = build_TS_reader(input,&tsreader); if (err) return 1; - if (scan_for_PCRs) + fprint_msg("pace_mode=%d\n", pace_mode); + + if (pace_mode == TSPLAY_OUTPUT_PACE_PCR1) err = play_buffered_TS_packets(tsreader,tswriter,pid_to_ignore, override_pcr_pid,max,loop,quiet,verbose); else - err = play_TS_packets(tsreader,tswriter,pid_to_ignore, + err = play_TS_packets(tsreader, tswriter, pace_mode, pid_to_ignore, max,loop,quiet,verbose); if (err) { diff --git a/tswrite.c b/tswrite.c index 1b9a15f..66ad323 100644 --- a/tswrite.c +++ b/tswrite.c @@ -57,6 +57,7 @@ #include "misc_fns.h" #include "printing_fns.h" #include "tswrite_fns.h" +#include "ts_fns.h" // ------------------------------------------------------------ // Global flags affecting debugging @@ -131,7 +132,172 @@ static int global_child_wait = DEFAULT_CHILD_WAIT; #define REPORT_EVERY 10000 + +// ============================================================ +// CIRCULAR BUFFER +// ============================================================ + +// We default to using a "packet" of 7 transport stream packets because 7*188 = +// 1316, but 8*188 = 1504, and we would like to output as much data as we can +// that is guaranteed to fit into a single ethernet packet, size 1500. +#define DEFAULT_TS_PACKETS_IN_ITEM 7 + +// For simplicity, we'll have a maximum on that (it allows us to have static +// array sizes in some places). This should be a big enough size to more than +// fill a jumbo packet on a gigabit network. +#define MAX_TS_PACKETS_IN_ITEM 100 + +// ------------------------------------------------------------ +// A circular buffer, usable as a queue +// +// We "waste" one buffer item so that we don't have to maintain a count +// of items in the buffer +// +// To get an understanding of how it works, choose a small BUFFER_SIZE +// (e.g., 11), enable DISPLAY_BUFFER, and select --visual - this will show the +// reading/writing of the circular buffer in action, including the +// "unused item". +// +// The data for the circular buffer +// Each circular buffer item "contains" (up to) N TS packets (where N defaults +// to 7, and is specified as `item_size` in the circular buffer header), and a +// time (in microseconds) when we would like it to be output (relative to the +// time for the first packet "sent"). +// +// Said data is stored at the address indicated by the circular buffer +// "header", as `item_data`. +// +struct circular_buffer_item +{ + uint32_t time; // when we would like this data output + int discontinuity; // TRUE if our timeline has "broken" + int length; // number of bytes of data in the array +}; +typedef struct circular_buffer_item *circular_buffer_item_p; + +#define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item) + +// ------------------------------------------------------------ +// The header for the circular buffer +// +// Note that `start` is only ever written to by the child process, and this is +// the only thing that the child process ever changes in the circular buffer. +// +// `maxnowait` is the maximum number of packets to send to the target host +// without forcing an intermediate wait - required to stop us "swamping" the +// target with too much data, and overrunning its buffers. +struct circular_buffer +{ + volatile int start; // start of data "pointer" + volatile int end; // end of completed data "pointer" (you guessed) + volatile int pending; // end of buffered but not ready for xmit + int size; // the actual length of the `item` array + + volatile int eos; // end of stream + + int TS_in_item; // max number of TS packets in a circular buffer item + int item_size; // and thus the size of said item's data array + + int maxnowait; // max number consecutive packets to send with no wait + int waitfor; // the number of microseconds to wait thereafter + + // The location of the packet data for the circular buffer items + byte *item_data; + + // The "header" data for each circular buffer item + struct circular_buffer_item item[]; +}; +typedef struct circular_buffer *circular_buffer_p; + +// Note that size doesn't include the final `item` +#define SIZEOF_CIRCULAR_BUFFER sizeof(struct circular_buffer) + +// For PCR2 pacing we accumulate the initial packets here so it must be big +// enough to cope with a max bitrate stream +// Say 100Mbits is the fastest we are going to care about +// So we need to buffer 2 PCRs which have a max spacing of .1s (by the std) +// and another .1s before that for having just missed a PCR = .2s = 20Mbits +// = 2.5Mbytes. 2048 * 188 * 7 = 2.7M +#define DEFAULT_CIRCULAR_BUFFER_SIZE 2048 // used to be 100 + +// ============================================================ +// BUFFERED OUTPUT +// ============================================================ + +// Information about each TS packet in our circular buffer item +struct TS_packet_info +{ + int index; + uint32_t pid; // do we need the PIDs? + int got_pcr; + uint64_t pcr; +}; +typedef struct TS_packet_info *TS_packet_info_p; +#define SIZEOF_TS_PACKET_INFO sizeof(struct TS_packet_info); + +// PCR interpolation structure +typedef struct pcr_pace_env_s +{ + uint32_t gap_bytes; + uint32_t next_bytes; + int32_t next_offset; + int next_index; + int pcr1_set; // Seen 1st PCR + int gap_set; // Seen 2nd PCR + int pkt1; // 1st pkt dealt with? (!discontinuity) + uint64_t pcr1; + uint64_t pcr_base; + + uint32_t prev_gap_bytes; + uint64_t prev_pcr_gap; + uint64_t next_pcr_base; +} pcr_pace_env; + + +// If we're going to support output via our circular buffer in a manner +// similar to that for output to a file or socket, then we need a structure +// to maintain the relevant information. It seems a bit wasteful to burden +// the circular buffer itself with this, particularly as only the writer +// cares about this data, so it needn't be shared. +struct buffered_TS_output +{ + circular_buffer_p buffer; + int which; // Which buffer index we're writing to + int started; // TRUE if we've started writing therein + + // For each TS packet in the circular buffer, remember its `count` + // within the input stream, whether it had a PCR, and if so what that + // PCR was. To make it simpler to access these arrays, also keep a fill + // index into them (the alternative would be to always re-zero the + // `got_pcr` values whenever we start a new circular buffer entry, + // which would be a pain...) + int num_packets; // how many TS packets we've got + struct TS_packet_info packet[MAX_TS_PACKETS_IN_ITEM]; + + // `rate` is the rate (in bytes per second) we would like to output data at + uint32_t rate; + + // `pcr_scale` is a multiplier for PCRs - each PCR found gets its value + // multiplied by this + double pcr_scale; + + // `use_pcrs` indicates if we should use PCRs in the data to drive our + // timing, rather than use the specified byte rate directly. The `priming` + // values are only relevant if `use_pcrs` is true. + tswrite_pcr_mode pcr_mode; + + // 'prime_size' is the amount of space/time to 'prime' the circular buffer + // output timing mechanism with. This is effectively multiples of the + // size of a circular buffer item. + int prime_size; + + // Percentage "too fast" speedup for our priming rate + int prime_speedup; + + pcr_pace_env pcr_pace; +}; + #ifdef _WIN32 // ============================================================ // Windows specific - gettimeofday replacement @@ -217,6 +383,8 @@ static int map_circular_buffer(circular_buffer_p *circular, (*circular)->start = 1; (*circular)->end = 0; + (*circular)->pending = 0; + (*circular)->eos = FALSE; (*circular)->size = circ_buf_size; (*circular)->TS_in_item = TS_in_packet; (*circular)->item_size = TS_in_packet * TS_PACKET_SIZE; @@ -254,7 +422,7 @@ static int unmap_circular_buffer(circular_buffer_p circular) #endif // _WIN32 return 0; } - + /* * Is the buffer empty? */ @@ -268,9 +436,16 @@ static inline int circular_buffer_empty(circular_buffer_p circular) */ static inline int circular_buffer_full(circular_buffer_p circular) { - return ((circular->end + 2) % circular->size == circular->start); + return ((circular->pending + 2) % circular->size == circular->start); } - + +// Is the buffer full and never going to empty? +static inline int circular_buffer_jammed(circular_buffer_p circular) +{ + return ((circular->pending + 1) % circular->size == circular->end); +} + + /* * If the circular buffer is empty, wait until it gains some data. * @@ -284,7 +459,7 @@ static inline int wait_if_buffer_empty(circular_buffer_p circular) int err; #endif // _WIN32 - while (circular_buffer_empty(circular)) + while (circular_buffer_empty(circular) && !circular->eos) { #if DISPLAY_BUFFER if (global_show_circular && !global_parent_debug) print_msg("<-- wait\n"); @@ -311,7 +486,7 @@ static inline int wait_if_buffer_empty(circular_buffer_p circular) } } count = 0; - return 0; + return circular_buffer_empty(circular); // If empty then EOS so return 1 } /* @@ -327,7 +502,7 @@ static inline int wait_for_buffer_to_fill(circular_buffer_p circular) int err; #endif // _WIN32 - while (!circular_buffer_full(circular)) + while (!circular_buffer_full(circular) && !circular->eos) { #if DISPLAY_BUFFER if (global_show_circular && !global_child_debug) @@ -390,6 +565,13 @@ static inline int wait_if_buffer_full(circular_buffer_p circular) } #endif // _WIN32 + if (circular_buffer_jammed(circular)) + { + print_err("### Circular buffer jammed: No PCRs found\n"); + circular->eos = TRUE; + return 1; + } + // If we wait for a *very* long time, maybe our child has crashed if (count > PARENT_GIVE_UP_AFTER) { @@ -454,14 +636,14 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer, int maxnowait, int waitfor, int rate, - int use_pcrs, + tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, double pcr_scale) { int err, ii; circular_buffer_p circular; - buffered_TS_output_p new = malloc(SIZEOF_BUFFERED_TS_OUTPUT); + buffered_TS_output_p new = calloc(1, SIZEOF_BUFFERED_TS_OUTPUT); if (new == NULL) { print_err("### Unable to allocate buffered output\n"); @@ -478,11 +660,11 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer, } new->buffer = circular; new->started = FALSE; - new->which = (circular->end + 1) % circular->size; + new->which = (circular->pending + 1) % circular->size; new->num_packets = 0; new->rate = rate; - new->use_pcrs = use_pcrs; + new->pcr_mode = pcr_mode; new->prime_size = prime_size; new->prime_speedup = prime_speedup; @@ -533,7 +715,7 @@ static inline uint64_t pcr_delta_u(const uint64_t a, const uint64_t b) { return a < b ? a + PCR_WRAP - b : a - b; } - + // ============================================================ // Timing // ============================================================ @@ -544,7 +726,7 @@ static inline uint64_t pcr_delta_u(const uint64_t a, const uint64_t b) * * Returns 0 if all goes well, 1 if something goes wrong. */ -static int set_buffer_item_time_pcr(buffered_TS_output_p writer) +static int set_buffer_item_time_pcr1(buffered_TS_output_p writer) { int ii; circular_buffer_p circular = writer->buffer; @@ -717,8 +899,173 @@ static int set_buffer_item_time_pcr(buffered_TS_output_p writer) } last_timestamp = circular->item[writer->which].time = timestamp; - return 0; + return writer->which; } + + +// Set times on all packets between where we were and where we are now +// Sets the time on both the first & last packets +// Returns the index of the last circ buffer entry modified +static int +set_circ_times(const circular_buffer_p circ, + const uint32_t index_start, const uint32_t len_bytes, + const int32_t pcr1_byte_offset, const uint64_t pcr1, + const uint64_t pcr_gap, const uint32_t gap_bytes, + uint64_t * const pNew_pcr_base) +{ + int32_t offset = pcr1_byte_offset; + const int32_t end_offset = offset + len_bytes; + int32_t i = index_start; + int idx; + + do + { + struct circular_buffer_item * const item = circ->item + i; + int64_t pcr = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes; + + item->time = (uint32_t)(pcr / 27); // "time" in us + offset += item->length; + idx = i; + + if (++i >= circ->size) + i = 0; + } while (offset <= end_offset); + + // Predict PCR at the end of this packet if wanted + if (pNew_pcr_base != NULL) + { + *pNew_pcr_base = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes; + } + +// fprint_msg("s: %d->%d\n", index_start, idx); + return idx; +} + + +static void +reset_pcr_time(pcr_pace_env * const ppe, const uint64_t next_pcr_base) +{ +// fprint_msg("%s\n", __func__); + + memset(ppe, 0, sizeof(*ppe)); + ppe->pcr_base = next_pcr_base; +} + +static int +finalize_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe) +{ + const circular_buffer_p circ = writer->buffer; + int idx = -1; + +// fprint_msg("%s\n", __func__); + + if (!ppe->gap_set) + { + // Can't do anything - forget any pcr we may have had - but + // leave accumulated bytes to be output in the prologue of any subsequent + // segment + ppe->pcr1_set = FALSE; + } + else + { + if (ppe->next_bytes != 0) + { + idx = set_circ_times(circ, ppe->next_index, ppe->next_bytes - 1, ppe->next_offset, ppe->pcr1 + ppe->pcr_base, + ppe->prev_pcr_gap, ppe->prev_gap_bytes, &ppe->next_pcr_base); +// fprint_msg("%s: idx %d->%d\n", __func__, ppe->next_index, idx); + } + + reset_pcr_time(ppe, ppe->next_pcr_base); + } + + return idx; +} + +static int +discontinuity_pkt_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe) +{ + return finalize_pcr_time(writer, ppe); +} + +static int +add_pkt_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe) +{ + const circular_buffer_p circ = writer->buffer; + const circular_buffer_item_p item = circ->item + writer->which; + const TS_packet_info_p pkt0 = writer->packet + 0; + int idx = -1; + + item->discontinuity = FALSE; + +retry: + // Mark 1st packet after reset as discontinuity + if (!ppe->pkt1) + { + ppe->pkt1 = TRUE; + ppe->next_index = writer->which; + } + + // If we have a pcr then we expect it to be on the 1st pkt in this group + if (!pkt0->got_pcr) + { + ppe->gap_bytes += item->length; + ppe->next_bytes += item->length; + +// fprint_msg("%u/%u\n", ppe->gap_bytes, ppe->next_bytes); + } + else + { + const uint64_t pcr1 = ppe->pcr1; + const uint64_t pcr2 = pkt0->pcr; + +// fprint_msg("pcr: %lld\n", pcr2); + + if (!ppe->pcr1_set) + { + ppe->next_offset = -ppe->next_bytes; + ppe->next_bytes += item->length; + ppe->pcr_base -= pcr2; + // next_index set by discontinuity spotter + ppe->pcr1_set = TRUE; + } + else + { + const uint64_t pcr_gap = pcr_delta_u(pcr2, pcr1); + + if (pcr_gap > PCR_MS(2000)) + { + // Discontinuity + fprint_msg("%s: Discontinuity[%d]: gap=%lld\n", __func__, writer->which, pcr_gap); + + idx = finalize_pcr_time(writer, ppe); + goto retry; + } + + idx = set_circ_times(circ, ppe->next_index, ppe->next_bytes, ppe->next_offset, ppe->pcr_base + pcr1, pcr_gap, ppe->gap_bytes, &ppe->next_pcr_base); +// fprint_msg("%s: idx %d->%d (%d)\n", __func__, ppe->next_index, idx, writer->which); + ppe->next_offset = item->length; + ppe->next_bytes = 0; + ppe->next_index = writer->which + 1; + if (ppe->next_index >= circ->size) + ppe->next_index = 0; + + // Remember in case we have to predict the next segment from this one + ppe->prev_pcr_gap = pcr_gap; + ppe->prev_gap_bytes = ppe->gap_bytes; + ppe->gap_set = TRUE; + + // If non-discontinuity wrap then add wrap value to base time + if (pcr1 > pcr2) + ppe->pcr_base += PCR_WRAP; + } + + ppe->pcr1 = pcr2; + ppe->gap_bytes = item->length; + } + + return idx; +} + /* * Set the time indicator for the next circular buffer item, based solely @@ -736,7 +1083,7 @@ static int set_buffer_item_time_plain(buffered_TS_output_p writer) uint32_t elapsed_time = (uint32_t) (num_bytes * 1000000.0 / writer->rate); last_time += elapsed_time; circular->item[writer->which].time = last_time; - return 0; + return writer->which; } /* @@ -744,19 +1091,23 @@ static int set_buffer_item_time_plain(buffered_TS_output_p writer) * * - `writer` is our buffered output context * - * Returns 0 if all goes well, 1 if something goes wrong. + * Returns new idx that can be written or -1 if unchanged */ -static int set_buffer_item_time(buffered_TS_output_p writer) +static int set_buffer_item_time(const buffered_TS_output_p writer, const int finalize) { - if (writer->use_pcrs) + switch (writer->pcr_mode) { - return set_buffer_item_time_pcr(writer); - } - else - { - // Allow the user to choose not to look at PCRs, and just do the - // calculation based on the rate they've specified - return set_buffer_item_time_plain(writer); + case TSWRITE_PCR_MODE_PCR2: + return finalize ? + finalize_pcr_time(writer, &writer->pcr_pace) : + add_pkt_pcr_time(writer, &writer->pcr_pace); + case TSWRITE_PCR_MODE_PCR1: + return set_buffer_item_time_pcr1(writer); + case TSWRITE_PCR_MODE_NONE: + default: + // Allow the user to choose not to look at PCRs, and just do the + // calculation based on the rate they've specified + return set_buffer_item_time_plain(writer); } } @@ -784,7 +1135,7 @@ static int add_eof_entry(buffered_TS_output_p writer) } // Work out where we want to write - data_pos = (circular->end + 1) % circular->size; + data_pos = (circular->pending + 1) % circular->size; #if DISPLAY_BUFFER if (global_show_circular) @@ -794,7 +1145,7 @@ static int add_eof_entry(buffered_TS_output_p writer) // Set the `time` within the item appropriately (it doesn't really // matter for EOF, since we're not actually going to *write* anything // out, but it won't hurt to get it right) - set_buffer_item_time(writer); + set_buffer_item_time(writer, TRUE); // And mark EOF by setting the first byte to something that isn't 0x47, // and the length to 1. @@ -804,6 +1155,7 @@ static int add_eof_entry(buffered_TS_output_p writer) #if DISPLAY_BUFFER if (global_show_circular) print_circular_buffer("eof",circular); #endif + circular->eos = TRUE; return 0; } @@ -817,22 +1169,49 @@ static int add_eof_entry(buffered_TS_output_p writer) * * Returns 0 if all went well, 1 if something went wrong. */ -static void internal_flush_buffered_TS_output(buffered_TS_output_p writer) +static void internal_flush_buffered_TS_output(const buffered_TS_output_p writer) { - circular_buffer_p circular = writer->buffer; + const circular_buffer_p circular = writer->buffer; + int idx; + + if (!writer->started || circular->item[writer->which].length == 0) + { + // Nothing to do + return; + } // Set the `time` within the item appropriately - set_buffer_item_time(writer); - + idx = set_buffer_item_time(writer, FALSE); + if (idx >= 0) + circular->end = idx; + // Make this item available for reading - circular->end = writer->which; + circular->pending = writer->which; // And then prepare for the next index - writer->which = (circular->end + 1) % circular->size; + writer->which = (circular->pending + 1) % circular->size; writer->started = FALSE; writer->num_packets = 0; writer->packet[0].got_pcr = FALSE; // Careful or paranoid? } + + +static void discontinuity_buffered_TS_output(buffered_TS_output_p writer) +{ + circular_buffer_p circular = writer->buffer; + int idx; + + if (writer->pcr_mode != TSWRITE_PCR_MODE_PCR2) + return; + + // Set the `time` within the item appropriately + idx = discontinuity_pkt_pcr_time(writer, &writer->pcr_pace); + if (idx >= 0) + circular->end = idx; + + // We need to update the end of the circular buffer but we haven't added + // any packets so no need to update any of that +} /* * Write an EOF indicator to the buffered output @@ -847,14 +1226,10 @@ static void internal_flush_buffered_TS_output(buffered_TS_output_p writer) */ static int write_EOF_to_buffered_TS_output(buffered_TS_output_p writer) { - circular_buffer_p circular = writer->buffer; - int which = writer->which; - int length = circular->item[which].length; int err; // Make sure anything we were working on beforehand has been output - if (writer->started && length > 0) - internal_flush_buffered_TS_output(writer); + internal_flush_buffered_TS_output(writer); if (global_parent_debug) print_msg("--> writing EOF\n"); @@ -888,10 +1263,21 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer, uint64_t pcr) { int err; - circular_buffer_p circular = writer->buffer; - int which = writer->which; - byte *data = circular->item_data + which*circular->item_size; - int *length = &(circular->item[which].length); + const circular_buffer_p circular = writer->buffer; + int which; + byte *data; + int *length; + + // Force PCRs to start a buffer + if (writer->pcr_mode == TSWRITE_PCR_MODE_PCR2 && got_pcr) + { +// fprint_msg("got_pcr: %lld\n", pcr); + internal_flush_buffered_TS_output(writer); + } + + which = writer->which; + data = circular->item_data + which*circular->item_size; + length = &(circular->item[which].length); // If we haven't yet started writing to the (next) index in the // circular buffer, we must check that it is not full @@ -906,8 +1292,11 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer, writer->started = TRUE; writer->num_packets = 0; *length = 0; +// fprint_msg("> "); } +// fprint_msg("[%d] @ %d\n", writer->which, *length); + // Copy our data into the circular buffer item, and adjust appropriately memcpy(&(data[*length]),packet,TS_PACKET_SIZE); (*length) += TS_PACKET_SIZE; @@ -1486,7 +1875,7 @@ static int write_circular_data(SOCKET output, byte *buffer = circular->item_data + circular->start*circular->item_size; int length = circular->item[circular->start].length; #if DISPLAY_BUFFER - int oldend = circular->end; + int oldend = circular->pending; int oldstart = circular->start; int newend,newstart; #endif @@ -1506,7 +1895,7 @@ static int write_circular_data(SOCKET output, #if DISPLAY_BUFFER if (global_show_circular) { - newend = circular->end; + newend = circular->pending; newstart = circular->start; if (oldend != newend || oldstart != newstart) { @@ -1718,6 +2107,8 @@ static int write_from_circular(SOCKET output, // has told us that the timeline has changed radically if (reset || circular->item[circular->start].discontinuity) { + fprint_msg("%s: Discontinuity[%d]: reset=%d, pkt_time=%u\n", __func__, circular->start, reset, this_packet_time); + // We believe out timeline has gone askew - start a new one // Set up "now" as our base time, and output our packet right away start = now; @@ -1746,6 +2137,12 @@ static int write_from_circular(SOCKET output, // So how long *should* we wait for the correct time to write? if (waitfor > 0) { + if (waitfor > 200000) + { + fprint_msg("###[%d] (%d) >0.2s, RESET\n", circular->start, waitfor); + reset = TRUE; + waitfor = 200000; + } if (global_child_debug) print_msg("(waiting"); } else if (waitfor > -200000) // less than 0.2 seconds gap - "small", so ignore @@ -1766,9 +2163,10 @@ static int write_from_circular(SOCKET output, // process logs progress in terms of the number of TS packets // output - (count-1)*7+1 should be the index of the first packet // in our circular buffer item, which is a decent approximation - fprint_err("!!! Packet %d (item %d): Outputting %.2fs late -" - " restarting time sequence\n", - (count-1)*7+1,count,-(double)waitfor/1000000); + fprint_err("!!! [%d] Packet %d (item %d): Outputting %.2fs late -" + " restarting time sequence: time=%u\n", + circular->start, + (count-1)*7+1,count,-(double)waitfor/1000000, this_packet_time); if (circular->maxnowait >= 0) fprint_err(" Maybe consider running with -maxnowait greater" " than %d\n",circular->maxnowait); @@ -2234,7 +2632,7 @@ extern int tswrite_start_buffering(TS_writer_p tswriter, int maxnowait, int waitfor, int byterate, - int use_pcrs, + tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, double pcr_scale) @@ -2252,7 +2650,7 @@ extern int tswrite_start_buffering(TS_writer_p tswriter, err = build_buffered_TS_output(&(tswriter->writer), circ_buf_size,TS_in_packet, - maxnowait,waitfor,byterate,use_pcrs, + maxnowait,waitfor,byterate,pcr_mode, prime_size,prime_speedup,pcr_scale); if (err) return 1; @@ -2286,7 +2684,7 @@ extern int tswrite_start_buffering_from_context(TS_writer_p tswriter, context->maxnowait, context->waitfor, context->byterate, - context->use_pcrs, + context->pcr_mode, context->prime_size, context->prime_speedup, context->pcr_scale); @@ -2669,6 +3067,23 @@ extern int tswrite_write(TS_writer_p tswriter, } return 0; } + +/* + * Discontinuity on the stream being written (e.g. file looping) + * If we are pacing the output then this resets the timing info +*/ + +int tswrite_discontinuity(const TS_writer_p tswriter) +{ + if (tswriter->writer == NULL) + return 0; + + internal_flush_buffered_TS_output(tswriter->writer); + + discontinuity_buffered_TS_output(tswriter->writer); + + return 0; +} // ============================================================ // Common option handling - helpers for utility writers @@ -2845,7 +3260,7 @@ extern void tswrite_report_args(TS_context_p context) context->waitfor); } - if (context->use_pcrs) + if (context->pcr_mode != TSWRITE_PCR_MODE_NONE) { fprint_msg("PCR mechanism 'primed' with time for %d circular buffer items\n", context->prime_size); @@ -2913,7 +3328,7 @@ extern int tswrite_process_args(char *prefix, context->waitfor = 1000; context->byterate = DEFAULT_BYTE_RATE; context->bitrate = context->byterate * 8; - context->use_pcrs = TRUE; + context->pcr_mode = TSWRITE_PCR_MODE_PCR2; context->prime_size = DEFAULT_PRIME_SIZE; context->prime_speedup = 100; context->pcr_scale = 1.0; @@ -2922,7 +3337,7 @@ extern int tswrite_process_args(char *prefix, { if (!strcmp("-nopcrs",argv[ii])) { - context->use_pcrs = FALSE; + context->pcr_mode = TSWRITE_PCR_MODE_NONE; argv[ii] = TSWRITE_PROCESSED; } else if (!strcmp("-bitrate",argv[ii])) diff --git a/tswrite_defns.h b/tswrite_defns.h index bc154c7..4544a7d 100644 --- a/tswrite_defns.h +++ b/tswrite_defns.h @@ -45,143 +45,9 @@ typedef int SOCKET; // for compatibility with Windows #endif -// ============================================================ -// CIRCULAR BUFFER -// ============================================================ - -// We default to using a "packet" of 7 transport stream packets because 7*188 = -// 1316, but 8*188 = 1504, and we would like to output as much data as we can -// that is guaranteed to fit into a single ethernet packet, size 1500. -#define DEFAULT_TS_PACKETS_IN_ITEM 7 - -// For simplicity, we'll have a maximum on that (it allows us to have static -// array sizes in some places). This should be a big enough size to more than -// fill a jumbo packet on a gigabit network. -#define MAX_TS_PACKETS_IN_ITEM 100 - -// ------------------------------------------------------------ -// A circular buffer, usable as a queue -// -// We "waste" one buffer item so that we don't have to maintain a count -// of items in the buffer -// -// To get an understanding of how it works, choose a small BUFFER_SIZE -// (e.g., 11), enable DISPLAY_BUFFER, and select --visual - this will show the -// reading/writing of the circular buffer in action, including the -// "unused item". -// -// The data for the circular buffer -// Each circular buffer item "contains" (up to) N TS packets (where N defaults -// to 7, and is specified as `item_size` in the circular buffer header), and a -// time (in microseconds) when we would like it to be output (relative to the -// time for the first packet "sent"). -// -// Said data is stored at the address indicated by the circular buffer -// "header", as `item_data`. -// -struct circular_buffer_item -{ - uint32_t time; // when we would like this data output - int discontinuity; // TRUE if our timeline has "broken" - int length; // number of bytes of data in the array -}; -typedef struct circular_buffer_item *circular_buffer_item_p; - -#define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item) - -// ------------------------------------------------------------ -// The header for the circular buffer -// -// Note that `start` is only ever written to by the child process, and this is -// the only thing that the child process ever changes in the circular buffer. -// -// `maxnowait` is the maximum number of packets to send to the target host -// without forcing an intermediate wait - required to stop us "swamping" the -// target with too much data, and overrunning its buffers. -struct circular_buffer -{ - int start; // start of data "pointer" - int end; // end of data "pointer" (you guessed) - int size; // the actual length of the `item` array - - int TS_in_item; // max number of TS packets in a circular buffer item - int item_size; // and thus the size of said item's data array - - int maxnowait; // max number consecutive packets to send with no wait - int waitfor; // the number of microseconds to wait thereafter - - // The location of the packet data for the circular buffer items - byte *item_data; - - // The "header" data for each circular buffer item - struct circular_buffer_item item[]; -}; -typedef struct circular_buffer *circular_buffer_p; - -// Note that size doesn't include the final `item` -#define SIZEOF_CIRCULAR_BUFFER sizeof(struct circular_buffer) - -#define DEFAULT_CIRCULAR_BUFFER_SIZE 1024 // used to be 100 - - -// ============================================================ -// BUFFERED OUTPUT -// ============================================================ - -// Information about each TS packet in our circular buffer item -struct TS_packet_info -{ - int index; - uint32_t pid; // do we need the PIDs? - int got_pcr; - uint64_t pcr; -}; -typedef struct TS_packet_info *TS_packet_info_p; -#define SIZEOF_TS_PACKET_INFO sizeof(struct TS_packet_info); - -// If we're going to support output via our circular buffer in a manner -// similar to that for output to a file or socket, then we need a structure -// to maintain the relevant information. It seems a bit wasteful to burden -// the circular buffer itself with this, particularly as only the writer -// cares about this data, so it needn't be shared. -struct buffered_TS_output -{ - circular_buffer_p buffer; - int which; // Which buffer index we're writing to - int started; // TRUE if we've started writing therein - - // For each TS packet in the circular buffer, remember its `count` - // within the input stream, whether it had a PCR, and if so what that - // PCR was. To make it simpler to access these arrays, also keep a fill - // index into them (the alternative would be to always re-zero the - // `got_pcr` values whenever we start a new circular buffer entry, - // which would be a pain...) - int num_packets; // how many TS packets we've got - struct TS_packet_info packet[MAX_TS_PACKETS_IN_ITEM]; - - // `rate` is the rate (in bytes per second) we would like to output data at - uint32_t rate; - - // `pcr_scale` is a multiplier for PCRs - each PCR found gets its value - // multiplied by this - double pcr_scale; - - // `use_pcrs` indicates if we should use PCRs in the data to drive our - // timing, rather than use the specified byte rate directly. The `priming` - // values are only relevant if `use_pcrs` is true. - int use_pcrs; - - // 'prime_size' is the amount of space/time to 'prime' the circular buffer - // output timing mechanism with. This is effectively multiples of the - // size of a circular buffer item. - int prime_size; - - // Percentage "too fast" speedup for our priming rate - int prime_speedup; -}; +struct buffered_TS_output; typedef struct buffered_TS_output *buffered_TS_output_p; #define SIZEOF_BUFFERED_TS_OUTPUT sizeof(struct buffered_TS_output) - // ============================================================ // EXTERNAL DATASTRUCTURES - these are *intended* for external use @@ -305,6 +171,13 @@ typedef struct TS_writer *TS_writer_p; // And a "return code" that means "the command character has changed" #define COMMAND_RETURN_CODE -999 + +typedef enum tswrite_pcr_mode_e { + TSWRITE_PCR_MODE_NONE, + TSWRITE_PCR_MODE_PCR1, + TSWRITE_PCR_MODE_PCR2 +} tswrite_pcr_mode; + // ------------------------------------------------------------ // Context for use in decoding command line - see `tswrite_process_args()` struct TS_context @@ -316,7 +189,7 @@ struct TS_context int waitfor; // the number of microseconds to wait thereafter int bitrate; // suggested bit rate (byterate*8) - both are given int byterate; // suggested byte rate (bitrate/8) - for convenience - int use_pcrs; // use PCRs for timing information? + tswrite_pcr_mode pcr_mode; // use PCRs for timing information? int prime_size; // initial priming size for buffered output int prime_speedup; // percentage of normal speed to prime with double pcr_scale; // multiplier for PCRs -- see buffered_TS_output diff --git a/tswrite_fns.h b/tswrite_fns.h index 0c0ebee..ee821ad 100644 --- a/tswrite_fns.h +++ b/tswrite_fns.h @@ -167,13 +167,14 @@ extern int tswrite_wait_for_client(int server_socket, * * Returns 0 if all went well, 1 if something went wrong. */ + extern int tswrite_start_buffering(TS_writer_p tswriter, int circ_buf_size, int TS_in_packet, int maxnowait, int waitfor, int byterate, - int use_pcrs, + tswrite_pcr_mode pcr_mode, int prime_size, int prime_speedup, double pcr_scale); @@ -288,6 +289,8 @@ extern int tswrite_write(TS_writer_p tswriter, int got_pcr, uint64_t pcr); +extern int tswrite_discontinuity(const TS_writer_p tswriter); + /* * Write a usage string (to standard output) describing the tuning * options processed by tswrite_process_args.