Build a new pacing method for tsplay

New pacing method built for tsplay and made the default.  The new method
lacks some bells and whistles that the old method had but spits out packets
closer to the correct time than the old method.  Old method is still
availible if needed: -pace-pcr1 should engage it.
master
John Cox 2013-04-10 18:30:39 +01:00
rodzic 0e1e1b02e0
commit b3a3933e81
7 zmienionych plików z 555 dodań i 218 usunięć

Wyświetl plik

@ -214,6 +214,9 @@ static void print_help_ts()
"earlier versions of tsplay:\n"
"\n"
" -oldpace Switch off scanning ahead for the next PCR.\n"
" -pace-pcr1 v1 of the PCR scan code\n"
" -pace-pcr2-ts v2 of the PCR scan code - use 1st PCR PID found [default]\n"
" -pace-pcr2-pmt v2 of the PCR scan code - get PCR PID from PMT\n"
"\n"
"which attempts to predict an approximate PCR for each TS packet, based on an\n"
"initial speed (see '-bitrate'/'-byterate' in '-help tuning') and the PCRs found\n"
@ -351,7 +354,8 @@ int main(int argc, char **argv)
int use_network = FALSE;
char *multicast_if = NULL; // IP address of multicast i/f
int scan_for_PCRs = TRUE;
tsplay_output_pace_mode pace_mode = TSPLAY_OUTPUT_PACE_PCR2_TS;
uint32_t pid_to_ignore = 0;
uint32_t override_pcr_pid = 0; // 0 means "use the PCR found in the PMT"
@ -524,7 +528,19 @@ int main(int argc, char **argv)
}
else if (!strcmp("-oldpace",argv[ii]))
{
scan_for_PCRs = FALSE;
pace_mode = TSPLAY_OUTPUT_PACE_FIXED;
}
else if (!strcmp("-pace-pcr1",argv[ii]))
{
pace_mode = TSPLAY_OUTPUT_PACE_PCR1;
}
else if (!strcmp("-pace-pcr2-ts",argv[ii]))
{
pace_mode = TSPLAY_OUTPUT_PACE_PCR2_TS;
}
else if (!strcmp("-pace-pcr2-pmt",argv[ii]))
{
pace_mode = TSPLAY_OUTPUT_PACE_PCR2_PMT;
}
else if (!strcmp("-forcepcr",argv[ii]))
{
@ -728,8 +744,10 @@ int main(int argc, char **argv)
// If tswrite found '-nopcrs' in the switches, make sure that we've
// switched PCR lookahead off.
if (!context.use_pcrs)
scan_for_PCRs = FALSE;
if (context.pcr_mode == TSWRITE_PCR_MODE_NONE)
pace_mode = TSPLAY_OUTPUT_PACE_FIXED;
else if (pace_mode == TSPLAY_OUTPUT_PACE_PCR1)
context.pcr_mode = TSWRITE_PCR_MODE_PCR1;
if (input_name)
{
@ -771,7 +789,7 @@ int main(int argc, char **argv)
if (is_TS)
{
print_msg("Input appears to be Transport Stream\n");
if (scan_for_PCRs)
if (pace_mode != TSPLAY_OUTPUT_PACE_FIXED)
print_msg("Using 'exact' TS packet timing (by looking-ahead to the next PCR)\n");
else
print_msg("Approximating/predicting intermediate PCRs\n");
@ -821,7 +839,7 @@ int main(int argc, char **argv)
if (is_TS)
{
err = play_TS_stream(input,tswriter,pid_to_ignore,scan_for_PCRs,
err = play_TS_stream(input,tswriter,pace_mode,pid_to_ignore,
override_pcr_pid,max,loop,quiet,verbose);
}
else

Wyświetl plik

@ -34,6 +34,14 @@
// If not being quiet, report progress every TSPLAY_REPORT_EVERY packets read
#define TSPLAY_REPORT_EVERY 10000
typedef enum tsplay_output_pace_mode_e
{
TSPLAY_OUTPUT_PACE_FIXED,
TSPLAY_OUTPUT_PACE_PCR1, // Src buffering timing
TSPLAY_OUTPUT_PACE_PCR2_TS, // write buffer timing - use 1st PCR found for PID
TSPLAY_OUTPUT_PACE_PCR2_PMT // write buffer timing = look up PCR PID in PMT
} tsplay_output_pace_mode;
#endif // tsplay_defns
// Local Variables:

Wyświetl plik

@ -60,8 +60,8 @@
*/
extern int play_TS_stream(int input,
TS_writer_p tswriter,
const tsplay_output_pace_mode pace_mode,
uint32_t pid_to_ignore,
int scan_for_PCRs,
uint32_t override_pcr_pid,
int max,
int loop,

Wyświetl plik

@ -116,8 +116,7 @@ static int read_TS_packet(TS_reader_p tsreader,
}
// Read the next packet
err = read_next_TS_packet(tsreader,data);
if (err)
while ((err = read_next_TS_packet(tsreader,data)) != 0)
{
if (err == EOF)
{
@ -505,8 +504,9 @@ static int play_buffered_TS_packets(TS_reader_p tsreader,
*
* Returns 0 if all went well, 1 if something went wrong.
*/
extern int play_TS_packets(TS_reader_p tsreader,
static int play_TS_packets(TS_reader_p tsreader,
TS_writer_p tswriter,
const tsplay_output_pace_mode pace_mode,
uint32_t pid_to_ignore,
int max,
int loop,
@ -518,37 +518,41 @@ extern int play_TS_packets(TS_reader_p tsreader,
uint32_t count = 0;
int pcrs_used = 0;
int pcrs_ignored = 0;
uint32_t pcr_pid;
uint32_t pcr_pid = ~0U;
uint32_t start_count = 0; // which TS packet to loop from
offset_t start_posn = 0;
// Before we can use PCRs for timing, we need to read a PMT which tells us
// what our video stream is (so we can get our PCRs therefrom).
err = find_PCR_PID(tsreader,tswriter,&pcr_pid,&start_count,max,quiet);
if (err)
if (pace_mode == TSPLAY_OUTPUT_PACE_PCR2_PMT)
{
fprint_err("### Unable to find PCR PID for timing information\n"
" Looked in first %d TS packets\n",max);
return 1;
// Before we can use PCRs for timing, we need to read a PMT which tells us
// what our video stream is (so we can get our PCRs therefrom).
err = find_PCR_PID(tsreader,tswriter,&pcr_pid,&start_count,max,quiet);
if (err)
{
fprint_err("### Unable to find PCR PID for timing information\n"
" Looked in first %d TS packets\n",max);
return 1;
}
// Once we've found that, we're ready to play our data
// If we're looping, remember the location of the first packet of (probable)
// data - there's not much point rewinding before that point
if (loop)
start_posn = start_count * TS_PACKET_SIZE;
}
// Once we've found that, we're ready to play our data
// If we're looping, remember the location of the first packet of (probable)
// data - there's not much point rewinding before that point
if (loop)
start_posn = start_count * TS_PACKET_SIZE;
count = start_count;
for (;;)
{
byte *data;
uint32_t pid;
int got_pcr;
uint64_t pcr;
uint64_t pcr = 0;
err = read_TS_packet(tsreader,&count,&data,&pid,&got_pcr,&pcr,
max,loop,start_posn,start_count,quiet);
if (err == EOF) // shouldn't occur if `loop`
break;
else if (err)
@ -561,15 +565,29 @@ extern int play_TS_packets(TS_reader_p tsreader,
return 1;
}
if (count == start_count + 1)
tswrite_discontinuity(tswriter);
total ++;
// We are only interested in timing information from our PCR PID stream
if (got_pcr)
{
// If 1st PCR we see then remember its pid
if (pcr_pid == ~0U)
{
fprint_msg("PCR PID set to 1st seen: %#x (%d)\n", pid, pid);
pcr_pid = pid;
}
if (pid == pcr_pid)
pcrs_used ++;
else
{
if (pcrs_ignored == 0)
{
fprint_msg("Other PCR PIDs seen: %#x (%d)...\n", pid, pid);
}
pcrs_ignored ++;
got_pcr = FALSE;
}
@ -639,8 +657,8 @@ extern int play_TS_packets(TS_reader_p tsreader,
*/
extern int play_TS_stream(int input,
TS_writer_p tswriter,
const tsplay_output_pace_mode pace_mode,
uint32_t pid_to_ignore,
int scan_for_PCRs,
uint32_t override_pcr_pid,
int max,
int loop,
@ -653,11 +671,13 @@ extern int play_TS_stream(int input,
err = build_TS_reader(input,&tsreader);
if (err) return 1;
if (scan_for_PCRs)
fprint_msg("pace_mode=%d\n", pace_mode);
if (pace_mode == TSPLAY_OUTPUT_PACE_PCR1)
err = play_buffered_TS_packets(tsreader,tswriter,pid_to_ignore,
override_pcr_pid,max,loop,quiet,verbose);
else
err = play_TS_packets(tsreader,tswriter,pid_to_ignore,
err = play_TS_packets(tsreader, tswriter, pace_mode, pid_to_ignore,
max,loop,quiet,verbose);
if (err)
{

519
tswrite.c
Wyświetl plik

@ -57,6 +57,7 @@
#include "misc_fns.h"
#include "printing_fns.h"
#include "tswrite_fns.h"
#include "ts_fns.h"
// ------------------------------------------------------------
// Global flags affecting debugging
@ -131,7 +132,172 @@ static int global_child_wait = DEFAULT_CHILD_WAIT;
#define REPORT_EVERY 10000
// ============================================================
// CIRCULAR BUFFER
// ============================================================
// We default to using a "packet" of 7 transport stream packets because 7*188 =
// 1316, but 8*188 = 1504, and we would like to output as much data as we can
// that is guaranteed to fit into a single ethernet packet, size 1500.
#define DEFAULT_TS_PACKETS_IN_ITEM 7
// For simplicity, we'll have a maximum on that (it allows us to have static
// array sizes in some places). This should be a big enough size to more than
// fill a jumbo packet on a gigabit network.
#define MAX_TS_PACKETS_IN_ITEM 100
// ------------------------------------------------------------
// A circular buffer, usable as a queue
//
// We "waste" one buffer item so that we don't have to maintain a count
// of items in the buffer
//
// To get an understanding of how it works, choose a small BUFFER_SIZE
// (e.g., 11), enable DISPLAY_BUFFER, and select --visual - this will show the
// reading/writing of the circular buffer in action, including the
// "unused item".
//
// The data for the circular buffer
// Each circular buffer item "contains" (up to) N TS packets (where N defaults
// to 7, and is specified as `item_size` in the circular buffer header), and a
// time (in microseconds) when we would like it to be output (relative to the
// time for the first packet "sent").
//
// Said data is stored at the address indicated by the circular buffer
// "header", as `item_data`.
//
struct circular_buffer_item
{
uint32_t time; // when we would like this data output
int discontinuity; // TRUE if our timeline has "broken"
int length; // number of bytes of data in the array
};
typedef struct circular_buffer_item *circular_buffer_item_p;
#define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item)
// ------------------------------------------------------------
// The header for the circular buffer
//
// Note that `start` is only ever written to by the child process, and this is
// the only thing that the child process ever changes in the circular buffer.
//
// `maxnowait` is the maximum number of packets to send to the target host
// without forcing an intermediate wait - required to stop us "swamping" the
// target with too much data, and overrunning its buffers.
struct circular_buffer
{
volatile int start; // start of data "pointer"
volatile int end; // end of completed data "pointer" (you guessed)
volatile int pending; // end of buffered but not ready for xmit
int size; // the actual length of the `item` array
volatile int eos; // end of stream
int TS_in_item; // max number of TS packets in a circular buffer item
int item_size; // and thus the size of said item's data array
int maxnowait; // max number consecutive packets to send with no wait
int waitfor; // the number of microseconds to wait thereafter
// The location of the packet data for the circular buffer items
byte *item_data;
// The "header" data for each circular buffer item
struct circular_buffer_item item[];
};
typedef struct circular_buffer *circular_buffer_p;
// Note that size doesn't include the final `item`
#define SIZEOF_CIRCULAR_BUFFER sizeof(struct circular_buffer)
// For PCR2 pacing we accumulate the initial packets here so it must be big
// enough to cope with a max bitrate stream
// Say 100Mbits is the fastest we are going to care about
// So we need to buffer 2 PCRs which have a max spacing of .1s (by the std)
// and another .1s before that for having just missed a PCR = .2s = 20Mbits
// = 2.5Mbytes. 2048 * 188 * 7 = 2.7M
#define DEFAULT_CIRCULAR_BUFFER_SIZE 2048 // used to be 100
// ============================================================
// BUFFERED OUTPUT
// ============================================================
// Information about each TS packet in our circular buffer item
struct TS_packet_info
{
int index;
uint32_t pid; // do we need the PIDs?
int got_pcr;
uint64_t pcr;
};
typedef struct TS_packet_info *TS_packet_info_p;
#define SIZEOF_TS_PACKET_INFO sizeof(struct TS_packet_info);
// PCR interpolation structure
typedef struct pcr_pace_env_s
{
uint32_t gap_bytes;
uint32_t next_bytes;
int32_t next_offset;
int next_index;
int pcr1_set; // Seen 1st PCR
int gap_set; // Seen 2nd PCR
int pkt1; // 1st pkt dealt with? (!discontinuity)
uint64_t pcr1;
uint64_t pcr_base;
uint32_t prev_gap_bytes;
uint64_t prev_pcr_gap;
uint64_t next_pcr_base;
} pcr_pace_env;
// If we're going to support output via our circular buffer in a manner
// similar to that for output to a file or socket, then we need a structure
// to maintain the relevant information. It seems a bit wasteful to burden
// the circular buffer itself with this, particularly as only the writer
// cares about this data, so it needn't be shared.
struct buffered_TS_output
{
circular_buffer_p buffer;
int which; // Which buffer index we're writing to
int started; // TRUE if we've started writing therein
// For each TS packet in the circular buffer, remember its `count`
// within the input stream, whether it had a PCR, and if so what that
// PCR was. To make it simpler to access these arrays, also keep a fill
// index into them (the alternative would be to always re-zero the
// `got_pcr` values whenever we start a new circular buffer entry,
// which would be a pain...)
int num_packets; // how many TS packets we've got
struct TS_packet_info packet[MAX_TS_PACKETS_IN_ITEM];
// `rate` is the rate (in bytes per second) we would like to output data at
uint32_t rate;
// `pcr_scale` is a multiplier for PCRs - each PCR found gets its value
// multiplied by this
double pcr_scale;
// `use_pcrs` indicates if we should use PCRs in the data to drive our
// timing, rather than use the specified byte rate directly. The `priming`
// values are only relevant if `use_pcrs` is true.
tswrite_pcr_mode pcr_mode;
// 'prime_size' is the amount of space/time to 'prime' the circular buffer
// output timing mechanism with. This is effectively multiples of the
// size of a circular buffer item.
int prime_size;
// Percentage "too fast" speedup for our priming rate
int prime_speedup;
pcr_pace_env pcr_pace;
};
#ifdef _WIN32
// ============================================================
// Windows specific - gettimeofday replacement
@ -217,6 +383,8 @@ static int map_circular_buffer(circular_buffer_p *circular,
(*circular)->start = 1;
(*circular)->end = 0;
(*circular)->pending = 0;
(*circular)->eos = FALSE;
(*circular)->size = circ_buf_size;
(*circular)->TS_in_item = TS_in_packet;
(*circular)->item_size = TS_in_packet * TS_PACKET_SIZE;
@ -254,7 +422,7 @@ static int unmap_circular_buffer(circular_buffer_p circular)
#endif // _WIN32
return 0;
}
/*
* Is the buffer empty?
*/
@ -268,9 +436,16 @@ static inline int circular_buffer_empty(circular_buffer_p circular)
*/
static inline int circular_buffer_full(circular_buffer_p circular)
{
return ((circular->end + 2) % circular->size == circular->start);
return ((circular->pending + 2) % circular->size == circular->start);
}
// Is the buffer full and never going to empty?
static inline int circular_buffer_jammed(circular_buffer_p circular)
{
return ((circular->pending + 1) % circular->size == circular->end);
}
/*
* If the circular buffer is empty, wait until it gains some data.
*
@ -284,7 +459,7 @@ static inline int wait_if_buffer_empty(circular_buffer_p circular)
int err;
#endif // _WIN32
while (circular_buffer_empty(circular))
while (circular_buffer_empty(circular) && !circular->eos)
{
#if DISPLAY_BUFFER
if (global_show_circular && !global_parent_debug) print_msg("<-- wait\n");
@ -311,7 +486,7 @@ static inline int wait_if_buffer_empty(circular_buffer_p circular)
}
}
count = 0;
return 0;
return circular_buffer_empty(circular); // If empty then EOS so return 1
}
/*
@ -327,7 +502,7 @@ static inline int wait_for_buffer_to_fill(circular_buffer_p circular)
int err;
#endif // _WIN32
while (!circular_buffer_full(circular))
while (!circular_buffer_full(circular) && !circular->eos)
{
#if DISPLAY_BUFFER
if (global_show_circular && !global_child_debug)
@ -390,6 +565,13 @@ static inline int wait_if_buffer_full(circular_buffer_p circular)
}
#endif // _WIN32
if (circular_buffer_jammed(circular))
{
print_err("### Circular buffer jammed: No PCRs found\n");
circular->eos = TRUE;
return 1;
}
// If we wait for a *very* long time, maybe our child has crashed
if (count > PARENT_GIVE_UP_AFTER)
{
@ -454,14 +636,14 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer,
int maxnowait,
int waitfor,
int rate,
int use_pcrs,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale)
{
int err, ii;
circular_buffer_p circular;
buffered_TS_output_p new = malloc(SIZEOF_BUFFERED_TS_OUTPUT);
buffered_TS_output_p new = calloc(1, SIZEOF_BUFFERED_TS_OUTPUT);
if (new == NULL)
{
print_err("### Unable to allocate buffered output\n");
@ -478,11 +660,11 @@ static int build_buffered_TS_output(buffered_TS_output_p *writer,
}
new->buffer = circular;
new->started = FALSE;
new->which = (circular->end + 1) % circular->size;
new->which = (circular->pending + 1) % circular->size;
new->num_packets = 0;
new->rate = rate;
new->use_pcrs = use_pcrs;
new->pcr_mode = pcr_mode;
new->prime_size = prime_size;
new->prime_speedup = prime_speedup;
@ -533,7 +715,7 @@ static inline uint64_t pcr_delta_u(const uint64_t a, const uint64_t b)
{
return a < b ? a + PCR_WRAP - b : a - b;
}
// ============================================================
// Timing
// ============================================================
@ -544,7 +726,7 @@ static inline uint64_t pcr_delta_u(const uint64_t a, const uint64_t b)
*
* Returns 0 if all goes well, 1 if something goes wrong.
*/
static int set_buffer_item_time_pcr(buffered_TS_output_p writer)
static int set_buffer_item_time_pcr1(buffered_TS_output_p writer)
{
int ii;
circular_buffer_p circular = writer->buffer;
@ -717,8 +899,173 @@ static int set_buffer_item_time_pcr(buffered_TS_output_p writer)
}
last_timestamp = circular->item[writer->which].time = timestamp;
return 0;
return writer->which;
}
// Set times on all packets between where we were and where we are now
// Sets the time on both the first & last packets
// Returns the index of the last circ buffer entry modified
static int
set_circ_times(const circular_buffer_p circ,
const uint32_t index_start, const uint32_t len_bytes,
const int32_t pcr1_byte_offset, const uint64_t pcr1,
const uint64_t pcr_gap, const uint32_t gap_bytes,
uint64_t * const pNew_pcr_base)
{
int32_t offset = pcr1_byte_offset;
const int32_t end_offset = offset + len_bytes;
int32_t i = index_start;
int idx;
do
{
struct circular_buffer_item * const item = circ->item + i;
int64_t pcr = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes;
item->time = (uint32_t)(pcr / 27); // "time" in us
offset += item->length;
idx = i;
if (++i >= circ->size)
i = 0;
} while (offset <= end_offset);
// Predict PCR at the end of this packet if wanted
if (pNew_pcr_base != NULL)
{
*pNew_pcr_base = (int64_t)pcr1 + (int64_t)offset * (int64_t)pcr_gap / (int64_t)gap_bytes;
}
// fprint_msg("s: %d->%d\n", index_start, idx);
return idx;
}
static void
reset_pcr_time(pcr_pace_env * const ppe, const uint64_t next_pcr_base)
{
// fprint_msg("%s\n", __func__);
memset(ppe, 0, sizeof(*ppe));
ppe->pcr_base = next_pcr_base;
}
static int
finalize_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe)
{
const circular_buffer_p circ = writer->buffer;
int idx = -1;
// fprint_msg("%s\n", __func__);
if (!ppe->gap_set)
{
// Can't do anything - forget any pcr we may have had - but
// leave accumulated bytes to be output in the prologue of any subsequent
// segment
ppe->pcr1_set = FALSE;
}
else
{
if (ppe->next_bytes != 0)
{
idx = set_circ_times(circ, ppe->next_index, ppe->next_bytes - 1, ppe->next_offset, ppe->pcr1 + ppe->pcr_base,
ppe->prev_pcr_gap, ppe->prev_gap_bytes, &ppe->next_pcr_base);
// fprint_msg("%s: idx %d->%d\n", __func__, ppe->next_index, idx);
}
reset_pcr_time(ppe, ppe->next_pcr_base);
}
return idx;
}
static int
discontinuity_pkt_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe)
{
return finalize_pcr_time(writer, ppe);
}
static int
add_pkt_pcr_time(buffered_TS_output_p writer, pcr_pace_env * const ppe)
{
const circular_buffer_p circ = writer->buffer;
const circular_buffer_item_p item = circ->item + writer->which;
const TS_packet_info_p pkt0 = writer->packet + 0;
int idx = -1;
item->discontinuity = FALSE;
retry:
// Mark 1st packet after reset as discontinuity
if (!ppe->pkt1)
{
ppe->pkt1 = TRUE;
ppe->next_index = writer->which;
}
// If we have a pcr then we expect it to be on the 1st pkt in this group
if (!pkt0->got_pcr)
{
ppe->gap_bytes += item->length;
ppe->next_bytes += item->length;
// fprint_msg("%u/%u\n", ppe->gap_bytes, ppe->next_bytes);
}
else
{
const uint64_t pcr1 = ppe->pcr1;
const uint64_t pcr2 = pkt0->pcr;
// fprint_msg("pcr: %lld\n", pcr2);
if (!ppe->pcr1_set)
{
ppe->next_offset = -ppe->next_bytes;
ppe->next_bytes += item->length;
ppe->pcr_base -= pcr2;
// next_index set by discontinuity spotter
ppe->pcr1_set = TRUE;
}
else
{
const uint64_t pcr_gap = pcr_delta_u(pcr2, pcr1);
if (pcr_gap > PCR_MS(2000))
{
// Discontinuity
fprint_msg("%s: Discontinuity[%d]: gap=%lld\n", __func__, writer->which, pcr_gap);
idx = finalize_pcr_time(writer, ppe);
goto retry;
}
idx = set_circ_times(circ, ppe->next_index, ppe->next_bytes, ppe->next_offset, ppe->pcr_base + pcr1, pcr_gap, ppe->gap_bytes, &ppe->next_pcr_base);
// fprint_msg("%s: idx %d->%d (%d)\n", __func__, ppe->next_index, idx, writer->which);
ppe->next_offset = item->length;
ppe->next_bytes = 0;
ppe->next_index = writer->which + 1;
if (ppe->next_index >= circ->size)
ppe->next_index = 0;
// Remember in case we have to predict the next segment from this one
ppe->prev_pcr_gap = pcr_gap;
ppe->prev_gap_bytes = ppe->gap_bytes;
ppe->gap_set = TRUE;
// If non-discontinuity wrap then add wrap value to base time
if (pcr1 > pcr2)
ppe->pcr_base += PCR_WRAP;
}
ppe->pcr1 = pcr2;
ppe->gap_bytes = item->length;
}
return idx;
}
/*
* Set the time indicator for the next circular buffer item, based solely
@ -736,7 +1083,7 @@ static int set_buffer_item_time_plain(buffered_TS_output_p writer)
uint32_t elapsed_time = (uint32_t) (num_bytes * 1000000.0 / writer->rate);
last_time += elapsed_time;
circular->item[writer->which].time = last_time;
return 0;
return writer->which;
}
/*
@ -744,19 +1091,23 @@ static int set_buffer_item_time_plain(buffered_TS_output_p writer)
*
* - `writer` is our buffered output context
*
* Returns 0 if all goes well, 1 if something goes wrong.
* Returns new idx that can be written or -1 if unchanged
*/
static int set_buffer_item_time(buffered_TS_output_p writer)
static int set_buffer_item_time(const buffered_TS_output_p writer, const int finalize)
{
if (writer->use_pcrs)
switch (writer->pcr_mode)
{
return set_buffer_item_time_pcr(writer);
}
else
{
// Allow the user to choose not to look at PCRs, and just do the
// calculation based on the rate they've specified
return set_buffer_item_time_plain(writer);
case TSWRITE_PCR_MODE_PCR2:
return finalize ?
finalize_pcr_time(writer, &writer->pcr_pace) :
add_pkt_pcr_time(writer, &writer->pcr_pace);
case TSWRITE_PCR_MODE_PCR1:
return set_buffer_item_time_pcr1(writer);
case TSWRITE_PCR_MODE_NONE:
default:
// Allow the user to choose not to look at PCRs, and just do the
// calculation based on the rate they've specified
return set_buffer_item_time_plain(writer);
}
}
@ -784,7 +1135,7 @@ static int add_eof_entry(buffered_TS_output_p writer)
}
// Work out where we want to write
data_pos = (circular->end + 1) % circular->size;
data_pos = (circular->pending + 1) % circular->size;
#if DISPLAY_BUFFER
if (global_show_circular)
@ -794,7 +1145,7 @@ static int add_eof_entry(buffered_TS_output_p writer)
// Set the `time` within the item appropriately (it doesn't really
// matter for EOF, since we're not actually going to *write* anything
// out, but it won't hurt to get it right)
set_buffer_item_time(writer);
set_buffer_item_time(writer, TRUE);
// And mark EOF by setting the first byte to something that isn't 0x47,
// and the length to 1.
@ -804,6 +1155,7 @@ static int add_eof_entry(buffered_TS_output_p writer)
#if DISPLAY_BUFFER
if (global_show_circular) print_circular_buffer("eof",circular);
#endif
circular->eos = TRUE;
return 0;
}
@ -817,22 +1169,49 @@ static int add_eof_entry(buffered_TS_output_p writer)
*
* Returns 0 if all went well, 1 if something went wrong.
*/
static void internal_flush_buffered_TS_output(buffered_TS_output_p writer)
static void internal_flush_buffered_TS_output(const buffered_TS_output_p writer)
{
circular_buffer_p circular = writer->buffer;
const circular_buffer_p circular = writer->buffer;
int idx;
if (!writer->started || circular->item[writer->which].length == 0)
{
// Nothing to do
return;
}
// Set the `time` within the item appropriately
set_buffer_item_time(writer);
idx = set_buffer_item_time(writer, FALSE);
if (idx >= 0)
circular->end = idx;
// Make this item available for reading
circular->end = writer->which;
circular->pending = writer->which;
// And then prepare for the next index
writer->which = (circular->end + 1) % circular->size;
writer->which = (circular->pending + 1) % circular->size;
writer->started = FALSE;
writer->num_packets = 0;
writer->packet[0].got_pcr = FALSE; // Careful or paranoid?
}
static void discontinuity_buffered_TS_output(buffered_TS_output_p writer)
{
circular_buffer_p circular = writer->buffer;
int idx;
if (writer->pcr_mode != TSWRITE_PCR_MODE_PCR2)
return;
// Set the `time` within the item appropriately
idx = discontinuity_pkt_pcr_time(writer, &writer->pcr_pace);
if (idx >= 0)
circular->end = idx;
// We need to update the end of the circular buffer but we haven't added
// any packets so no need to update any of that
}
/*
* Write an EOF indicator to the buffered output
@ -847,14 +1226,10 @@ static void internal_flush_buffered_TS_output(buffered_TS_output_p writer)
*/
static int write_EOF_to_buffered_TS_output(buffered_TS_output_p writer)
{
circular_buffer_p circular = writer->buffer;
int which = writer->which;
int length = circular->item[which].length;
int err;
// Make sure anything we were working on beforehand has been output
if (writer->started && length > 0)
internal_flush_buffered_TS_output(writer);
internal_flush_buffered_TS_output(writer);
if (global_parent_debug)
print_msg("--> writing EOF\n");
@ -888,10 +1263,21 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer,
uint64_t pcr)
{
int err;
circular_buffer_p circular = writer->buffer;
int which = writer->which;
byte *data = circular->item_data + which*circular->item_size;
int *length = &(circular->item[which].length);
const circular_buffer_p circular = writer->buffer;
int which;
byte *data;
int *length;
// Force PCRs to start a buffer
if (writer->pcr_mode == TSWRITE_PCR_MODE_PCR2 && got_pcr)
{
// fprint_msg("got_pcr: %lld\n", pcr);
internal_flush_buffered_TS_output(writer);
}
which = writer->which;
data = circular->item_data + which*circular->item_size;
length = &(circular->item[which].length);
// If we haven't yet started writing to the (next) index in the
// circular buffer, we must check that it is not full
@ -906,8 +1292,11 @@ static int write_to_buffered_TS_output(buffered_TS_output_p writer,
writer->started = TRUE;
writer->num_packets = 0;
*length = 0;
// fprint_msg("> ");
}
// fprint_msg("[%d] @ %d\n", writer->which, *length);
// Copy our data into the circular buffer item, and adjust appropriately
memcpy(&(data[*length]),packet,TS_PACKET_SIZE);
(*length) += TS_PACKET_SIZE;
@ -1486,7 +1875,7 @@ static int write_circular_data(SOCKET output,
byte *buffer = circular->item_data + circular->start*circular->item_size;
int length = circular->item[circular->start].length;
#if DISPLAY_BUFFER
int oldend = circular->end;
int oldend = circular->pending;
int oldstart = circular->start;
int newend,newstart;
#endif
@ -1506,7 +1895,7 @@ static int write_circular_data(SOCKET output,
#if DISPLAY_BUFFER
if (global_show_circular)
{
newend = circular->end;
newend = circular->pending;
newstart = circular->start;
if (oldend != newend || oldstart != newstart)
{
@ -1718,6 +2107,8 @@ static int write_from_circular(SOCKET output,
// has told us that the timeline has changed radically
if (reset || circular->item[circular->start].discontinuity)
{
fprint_msg("%s: Discontinuity[%d]: reset=%d, pkt_time=%u\n", __func__, circular->start, reset, this_packet_time);
// We believe out timeline has gone askew - start a new one
// Set up "now" as our base time, and output our packet right away
start = now;
@ -1746,6 +2137,12 @@ static int write_from_circular(SOCKET output,
// So how long *should* we wait for the correct time to write?
if (waitfor > 0)
{
if (waitfor > 200000)
{
fprint_msg("###[%d] (%d) >0.2s, RESET\n", circular->start, waitfor);
reset = TRUE;
waitfor = 200000;
}
if (global_child_debug) print_msg("(waiting");
}
else if (waitfor > -200000) // less than 0.2 seconds gap - "small", so ignore
@ -1766,9 +2163,10 @@ static int write_from_circular(SOCKET output,
// process logs progress in terms of the number of TS packets
// output - (count-1)*7+1 should be the index of the first packet
// in our circular buffer item, which is a decent approximation
fprint_err("!!! Packet %d (item %d): Outputting %.2fs late -"
" restarting time sequence\n",
(count-1)*7+1,count,-(double)waitfor/1000000);
fprint_err("!!! [%d] Packet %d (item %d): Outputting %.2fs late -"
" restarting time sequence: time=%u\n",
circular->start,
(count-1)*7+1,count,-(double)waitfor/1000000, this_packet_time);
if (circular->maxnowait >= 0)
fprint_err(" Maybe consider running with -maxnowait greater"
" than %d\n",circular->maxnowait);
@ -2234,7 +2632,7 @@ extern int tswrite_start_buffering(TS_writer_p tswriter,
int maxnowait,
int waitfor,
int byterate,
int use_pcrs,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale)
@ -2252,7 +2650,7 @@ extern int tswrite_start_buffering(TS_writer_p tswriter,
err = build_buffered_TS_output(&(tswriter->writer),
circ_buf_size,TS_in_packet,
maxnowait,waitfor,byterate,use_pcrs,
maxnowait,waitfor,byterate,pcr_mode,
prime_size,prime_speedup,pcr_scale);
if (err) return 1;
@ -2286,7 +2684,7 @@ extern int tswrite_start_buffering_from_context(TS_writer_p tswriter,
context->maxnowait,
context->waitfor,
context->byterate,
context->use_pcrs,
context->pcr_mode,
context->prime_size,
context->prime_speedup,
context->pcr_scale);
@ -2669,6 +3067,23 @@ extern int tswrite_write(TS_writer_p tswriter,
}
return 0;
}
/*
* Discontinuity on the stream being written (e.g. file looping)
* If we are pacing the output then this resets the timing info
*/
int tswrite_discontinuity(const TS_writer_p tswriter)
{
if (tswriter->writer == NULL)
return 0;
internal_flush_buffered_TS_output(tswriter->writer);
discontinuity_buffered_TS_output(tswriter->writer);
return 0;
}
// ============================================================
// Common option handling - helpers for utility writers
@ -2845,7 +3260,7 @@ extern void tswrite_report_args(TS_context_p context)
context->waitfor);
}
if (context->use_pcrs)
if (context->pcr_mode != TSWRITE_PCR_MODE_NONE)
{
fprint_msg("PCR mechanism 'primed' with time for %d circular buffer items\n",
context->prime_size);
@ -2913,7 +3328,7 @@ extern int tswrite_process_args(char *prefix,
context->waitfor = 1000;
context->byterate = DEFAULT_BYTE_RATE;
context->bitrate = context->byterate * 8;
context->use_pcrs = TRUE;
context->pcr_mode = TSWRITE_PCR_MODE_PCR2;
context->prime_size = DEFAULT_PRIME_SIZE;
context->prime_speedup = 100;
context->pcr_scale = 1.0;
@ -2922,7 +3337,7 @@ extern int tswrite_process_args(char *prefix,
{
if (!strcmp("-nopcrs",argv[ii]))
{
context->use_pcrs = FALSE;
context->pcr_mode = TSWRITE_PCR_MODE_NONE;
argv[ii] = TSWRITE_PROCESSED;
}
else if (!strcmp("-bitrate",argv[ii]))

Wyświetl plik

@ -45,143 +45,9 @@ typedef int SOCKET; // for compatibility with Windows
#endif
// ============================================================
// CIRCULAR BUFFER
// ============================================================
// We default to using a "packet" of 7 transport stream packets because 7*188 =
// 1316, but 8*188 = 1504, and we would like to output as much data as we can
// that is guaranteed to fit into a single ethernet packet, size 1500.
#define DEFAULT_TS_PACKETS_IN_ITEM 7
// For simplicity, we'll have a maximum on that (it allows us to have static
// array sizes in some places). This should be a big enough size to more than
// fill a jumbo packet on a gigabit network.
#define MAX_TS_PACKETS_IN_ITEM 100
// ------------------------------------------------------------
// A circular buffer, usable as a queue
//
// We "waste" one buffer item so that we don't have to maintain a count
// of items in the buffer
//
// To get an understanding of how it works, choose a small BUFFER_SIZE
// (e.g., 11), enable DISPLAY_BUFFER, and select --visual - this will show the
// reading/writing of the circular buffer in action, including the
// "unused item".
//
// The data for the circular buffer
// Each circular buffer item "contains" (up to) N TS packets (where N defaults
// to 7, and is specified as `item_size` in the circular buffer header), and a
// time (in microseconds) when we would like it to be output (relative to the
// time for the first packet "sent").
//
// Said data is stored at the address indicated by the circular buffer
// "header", as `item_data`.
//
struct circular_buffer_item
{
uint32_t time; // when we would like this data output
int discontinuity; // TRUE if our timeline has "broken"
int length; // number of bytes of data in the array
};
typedef struct circular_buffer_item *circular_buffer_item_p;
#define SIZEOF_CIRCULAR_BUFFER_ITEM sizeof(struct circular_buffer_item)
// ------------------------------------------------------------
// The header for the circular buffer
//
// Note that `start` is only ever written to by the child process, and this is
// the only thing that the child process ever changes in the circular buffer.
//
// `maxnowait` is the maximum number of packets to send to the target host
// without forcing an intermediate wait - required to stop us "swamping" the
// target with too much data, and overrunning its buffers.
struct circular_buffer
{
int start; // start of data "pointer"
int end; // end of data "pointer" (you guessed)
int size; // the actual length of the `item` array
int TS_in_item; // max number of TS packets in a circular buffer item
int item_size; // and thus the size of said item's data array
int maxnowait; // max number consecutive packets to send with no wait
int waitfor; // the number of microseconds to wait thereafter
// The location of the packet data for the circular buffer items
byte *item_data;
// The "header" data for each circular buffer item
struct circular_buffer_item item[];
};
typedef struct circular_buffer *circular_buffer_p;
// Note that size doesn't include the final `item`
#define SIZEOF_CIRCULAR_BUFFER sizeof(struct circular_buffer)
#define DEFAULT_CIRCULAR_BUFFER_SIZE 1024 // used to be 100
// ============================================================
// BUFFERED OUTPUT
// ============================================================
// Information about each TS packet in our circular buffer item
struct TS_packet_info
{
int index;
uint32_t pid; // do we need the PIDs?
int got_pcr;
uint64_t pcr;
};
typedef struct TS_packet_info *TS_packet_info_p;
#define SIZEOF_TS_PACKET_INFO sizeof(struct TS_packet_info);
// If we're going to support output via our circular buffer in a manner
// similar to that for output to a file or socket, then we need a structure
// to maintain the relevant information. It seems a bit wasteful to burden
// the circular buffer itself with this, particularly as only the writer
// cares about this data, so it needn't be shared.
struct buffered_TS_output
{
circular_buffer_p buffer;
int which; // Which buffer index we're writing to
int started; // TRUE if we've started writing therein
// For each TS packet in the circular buffer, remember its `count`
// within the input stream, whether it had a PCR, and if so what that
// PCR was. To make it simpler to access these arrays, also keep a fill
// index into them (the alternative would be to always re-zero the
// `got_pcr` values whenever we start a new circular buffer entry,
// which would be a pain...)
int num_packets; // how many TS packets we've got
struct TS_packet_info packet[MAX_TS_PACKETS_IN_ITEM];
// `rate` is the rate (in bytes per second) we would like to output data at
uint32_t rate;
// `pcr_scale` is a multiplier for PCRs - each PCR found gets its value
// multiplied by this
double pcr_scale;
// `use_pcrs` indicates if we should use PCRs in the data to drive our
// timing, rather than use the specified byte rate directly. The `priming`
// values are only relevant if `use_pcrs` is true.
int use_pcrs;
// 'prime_size' is the amount of space/time to 'prime' the circular buffer
// output timing mechanism with. This is effectively multiples of the
// size of a circular buffer item.
int prime_size;
// Percentage "too fast" speedup for our priming rate
int prime_speedup;
};
struct buffered_TS_output;
typedef struct buffered_TS_output *buffered_TS_output_p;
#define SIZEOF_BUFFERED_TS_OUTPUT sizeof(struct buffered_TS_output)
// ============================================================
// EXTERNAL DATASTRUCTURES - these are *intended* for external use
@ -305,6 +171,13 @@ typedef struct TS_writer *TS_writer_p;
// And a "return code" that means "the command character has changed"
#define COMMAND_RETURN_CODE -999
typedef enum tswrite_pcr_mode_e {
TSWRITE_PCR_MODE_NONE,
TSWRITE_PCR_MODE_PCR1,
TSWRITE_PCR_MODE_PCR2
} tswrite_pcr_mode;
// ------------------------------------------------------------
// Context for use in decoding command line - see `tswrite_process_args()`
struct TS_context
@ -316,7 +189,7 @@ struct TS_context
int waitfor; // the number of microseconds to wait thereafter
int bitrate; // suggested bit rate (byterate*8) - both are given
int byterate; // suggested byte rate (bitrate/8) - for convenience
int use_pcrs; // use PCRs for timing information?
tswrite_pcr_mode pcr_mode; // use PCRs for timing information?
int prime_size; // initial priming size for buffered output
int prime_speedup; // percentage of normal speed to prime with
double pcr_scale; // multiplier for PCRs -- see buffered_TS_output

Wyświetl plik

@ -167,13 +167,14 @@ extern int tswrite_wait_for_client(int server_socket,
*
* Returns 0 if all went well, 1 if something went wrong.
*/
extern int tswrite_start_buffering(TS_writer_p tswriter,
int circ_buf_size,
int TS_in_packet,
int maxnowait,
int waitfor,
int byterate,
int use_pcrs,
tswrite_pcr_mode pcr_mode,
int prime_size,
int prime_speedup,
double pcr_scale);
@ -288,6 +289,8 @@ extern int tswrite_write(TS_writer_p tswriter,
int got_pcr,
uint64_t pcr);
extern int tswrite_discontinuity(const TS_writer_p tswriter);
/*
* Write a usage string (to standard output) describing the tuning
* options processed by tswrite_process_args.