leandvb: Support multiple LDPC helpers.

work
pabr 2019-02-18 14:28:07 +01:00
rodzic c7d3beefac
commit 9c8a94dd51
2 zmienionych plików z 130 dodań i 85 usunięć

Wyświetl plik

@ -74,6 +74,7 @@ struct config {
bool hard_metric;
int ldpc_bf;
const char *ldpc_helper;
int nhelpers;
bool resample;
float resample_rej; // Approx. filter rejection in dB
enum { SAMP_NEAREST, SAMP_LINEAR, SAMP_RRC } sampler;
@ -123,6 +124,7 @@ struct config {
hard_metric(false),
ldpc_bf(0),
ldpc_helper(NULL),
nhelpers(1),
resample(false),
resample_rej(10),
sampler(config::SAMP_LINEAR),
@ -241,7 +243,7 @@ struct runtime_common {
}
// Min buffer size for TS packets: Up to 39 per BBFRAME
unsigned long BUF_S2PACKETS = 39 * cfg.buf_factor;
unsigned long BUF_S2PACKETS = (fec_info::KBCH_MAX/188/8+1) * cfg.buf_factor;
// Min buffer size for misc measurements: 1
unsigned long BUF_SLOW = cfg.buf_factor;
@ -479,7 +481,7 @@ struct runtime_common {
p_cstln_pls = new pipebuf<cf32>(sch, "PLS cstln", BUF_BASEBAND);
p_framelock = new pipebuf<int>(sch, "frame lock", BUF_SLOW);
p_vber = new pipebuf<float>(sch, "VBER", BUF_SLOW);
p_lock = new pipebuf<int>(sch, "lock", BUF_SLOW);
p_lock = new pipebuf<int>(sch, "lock", BUF_SLOW*2);
p_locktime = new pipebuf<unsigned long>(sch, "locktime", BUF_S2PACKETS);
rrc_steps = cfg.rrc_steps;
@ -685,9 +687,11 @@ int run_dvbs2(config &cfg) {
new pipebuf< fecframe<llr_sb> >(run.sch, "FEC frames", BUF_FRAMES);
new s2_deinterleaver<llr_ss,llr_sb>(run.sch, p_slots, *p_fecframes);
// Decode FEC-protected frames into plain BB frames.
new s2_fecdec_helper<llr_t,llr_sb>(run.sch, *p_fecframes, p_bbframes,
cfg.ldpc_helper,
run.p_vbitcount, run.p_verrcount);
s2_fecdec_helper<llr_t,llr_sb> *r_fecdec =
new s2_fecdec_helper<llr_t,llr_sb>(run.sch, *p_fecframes, p_bbframes,
cfg.ldpc_helper,
run.p_vbitcount, run.p_verrcount);
r_fecdec->nhelpers = cfg.nhelpers;
}
// Deframe BB frames to TS packets
@ -1334,6 +1338,7 @@ void usage(const char *name, FILE *f, int c, const char *info=NULL) {
" --ldpc-bf INT Max number of LDPC bitflips (default: 0)\n"
" --ldpc-helper CMD Spawn external LDPC decoder:\n"
" 'CMD --standard DVB-S2 --modcod N [--shortframes]'\n"
" --nhelpers INT Number of decoder processes (default:1)\n"
);
fprintf
(f,
@ -1440,6 +1445,8 @@ int main(int argc, const char *argv[]) {
cfg.ldpc_bf = atoi(argv[++i]);
else if ( ! strcmp(argv[i], "--ldpc-helper") && i+1<argc )
cfg.ldpc_helper = argv[++i];
else if ( ! strcmp(argv[i], "--nhelpers") && i+1<argc )
cfg.nhelpers = atoi(argv[++i]);
else if ( ! strcmp(argv[i], "--hard-metric") )
cfg.hard_metric = true;
else if ( ! strcmp(argv[i], "--filter") ) {

Wyświetl plik

@ -1825,18 +1825,15 @@ namespace leansdr {
// External LDPC decoder
// Spawns a user-specified command, FEC frames on stdin/stdout.
struct helper_job {
s2_pls pls;
int fd_rx;
};
template<typename T, int _SIZE>
struct simplequeue {
static const int SIZE = _SIZE;
T *put() { T *res=&q[wr]; wr=(wr+1)%SIZE; ++count; return res; }
const T *get() { const T *res=&q[rd]; rd=(rd+1)%SIZE; --count; return res; }
simplequeue() { rd = wr = count = 0; }
bool full() { return count == SIZE; }
T *put() { T *res=&q[wr]; wr=(wr+1)%SIZE; ++count; return res; }
bool empty() { return count == 0; }
const T *peek() { return &q[rd]; }
const T *get() { const T *res=&q[rd]; rd=(rd+1)%SIZE; --count; return res; }
// private:
int rd, wr, count;
T q[SIZE];
@ -1844,6 +1841,8 @@ namespace leansdr {
template<typename SOFTBIT, typename SOFTBYTE>
struct s2_fecdec_helper : runnable {
int batch_size;
int nhelpers;
s2_fecdec_helper(scheduler *sch,
pipebuf< fecframe<SOFTBYTE> > &_in,
pipebuf<bbframe> &_out,
@ -1851,23 +1850,31 @@ namespace leansdr {
pipebuf<int> *_bitcount=NULL,
pipebuf<int> *_errcount=NULL)
: runnable(sch, "S2 fecdec io"),
batch_size(32),
nhelpers(1),
in(_in), out(_out),
command(_command),
bitcount(opt_writer(_bitcount,1)),
errcount(opt_writer(_errcount,1))
{
for ( int mc=0; mc<32; ++mc )
for ( int sf=0; sf<2; ++sf )
pools[mc][sf].procs = NULL;
}
void run() {
bool work_done = false;
// Send work until we can't write() to the helper without blocking.
// Send work until all helpers block.
bool all_blocked = false;
while ( in.readable() >= 1 && !jobs.full() ) {
if ( ! send_frame(in.rd()) ) break;
if ( ! send_frame(in.rd()) ) { all_blocked=true; break; }
in.read(1);
work_done = true;
}
// Risk blocking on read() only if we have nothing else to do.
//fprintf(stderr, "WD=%d jc=%d batch=%d/%d wrable=%d\n", work_done, jobs.count, the_helper.b_in, the_helper.b_out, out.writable());
while ( !work_done && the_helper.b_out &&
// Risk blocking on read() only when we have nothing else to do
// and we know a result is coming.
while ( (all_blocked || !work_done || jobs.full()) &&
!jobs.empty() &&
jobs.peek()->h->b_out &&
out.writable()>=1 &&
opt_writable(bitcount,1) && opt_writable(errcount,1) ) {
receive_frame(jobs.get());
@ -1875,96 +1882,127 @@ namespace leansdr {
}
private:
struct helper_instance {
s2_pls pls; // PLS implemented by this helper processss
int fd_tx; // To helper
int fd_rx; // From helper
int batch_size; // Latency
int b_in, b_out;
helper_instance() { pls.modcod=-1; }
int b_in; // Jobs in input queue
int b_out; // Jobs in output queue
};
helper_instance the_helper; // Currently support only one helper
simplequeue<helper_job,256> jobs;
struct pool {
helper_instance *procs; // NULL or [nprocs]
int nprocs;
} pools[32][2]; // [modcod][sf]
struct helper_job {
s2_pls pls;
helper_instance *h;
};
simplequeue<helper_job,1024> jobs;
// Try to send a frame. Return false if helper was busy.
bool send_frame(fecframe<SOFTBYTE> *pin) {
helper_instance *h = get_helper(&pin->pls);
size_t iosize = (h->pls.framebits()/8) * sizeof(SOFTBYTE);
// fprintf(stderr, "Writing %lu to fd %d\n", iosize, h->fd_tx);
int nw = write(h->fd_tx, pin->bytes, iosize);
if ( nw < 0 ) {
if ( errno == EWOULDBLOCK ) return false;
fatal("write(LDPC helper");
}
if ( nw != iosize ) fatal("partial write(LDPC helper)");
helper_job *job = jobs.put();
job->pls = h->pls;
job->fd_rx = h->fd_rx;
++h->b_in;
if ( h->b_in >= h->batch_size ) {
h->b_in -= h->batch_size;
h->b_out += h->batch_size;
}
return true;
}
// Find or create a helper process for a given modcod.
helper_instance *get_helper(s2_pls *pls) {
helper_instance *h = &the_helper;
if ( memcmp(pls, &h->pls, sizeof(*pls)) ) {
if ( sch->debug ) fprintf(stderr, "Spawning new helper\n");
int tx[2], rx[2];
if ( pipe(tx) || pipe(rx) ) fatal("pipe");
fcntl(tx[1], F_SETPIPE_SZ, 512*1024);
fcntl(rx[1], F_SETPIPE_SZ, 512*1024);
int child = vfork();
if ( ! child ) {
// Child process
close(tx[1]); dup2(tx[0], 0);
close(rx[0]); dup2(rx[1], 1);
char modcod_string[16];
sprintf(modcod_string, "%d", pls->modcod);
const char *argv[] = { command, "--modcod", modcod_string,
NULL/*placeholder*/, NULL };
if ( pls->sf ) argv[3] = "--shortframes";
execve(command, (char *const*)argv, NULL);
fatal(command);
pool *p = get_pool(&pin->pls);
for ( int i=0; i<p->nprocs; ++i ) {
helper_instance *h = &p->procs[i];
size_t iosize = (pin->pls.framebits()/8) * sizeof(SOFTBYTE);
// fprintf(stderr, "Writing %lu to fd %d\n", iosize, h->fd_tx);
int nw = write(h->fd_tx, pin->bytes, iosize);
if ( nw<0 && errno==EWOULDBLOCK ) continue;
if ( nw < 0 ) fatal("write(LDPC helper");
if ( nw != iosize ) fatal("partial write(LDPC helper)");
helper_job *job = jobs.put();
job->pls = pin->pls;
job->h = h;
++h->b_in;
if ( h->b_in >= h->batch_size ) {
h->b_in -= h->batch_size;
h->b_out += h->batch_size;
}
h->pls = *pls;
h->fd_tx = tx[1]; close(tx[0]);
h->fd_rx = rx[0]; close(rx[1]);
h->batch_size = 32; // TBD
h->b_in = h->b_out = 0;
int flags = fcntl(h->fd_tx, F_GETFL);
if ( fcntl(h->fd_tx, F_SETFL, flags|O_NONBLOCK) ) fatal("fcntl(helper)");
return true;
}
return h;
return false;
}
// Receive a corrected frame.
void receive_frame(const helper_job *pin) {
// Return a pool of running helpers for a given modcod.
pool *get_pool(const s2_pls *pls) {
pool *p = &pools[pls->modcod][pls->sf];
if ( ! p->procs ) {
p->procs = new helper_instance[nhelpers];
for ( int i=0; i<nhelpers; ++i ) spawn_helper(&p->procs[i], pls);
p->nprocs = nhelpers;
}
return p;
}
// Spawn a helper process.
void spawn_helper(helper_instance *h, const s2_pls *pls) {
if ( sch->debug )
fprintf(stderr, "Spawning LDPC helper: modcod=%d sf=%d\n",
pls->modcod, pls->sf);
int tx[2], rx[2];
if ( pipe(tx) || pipe(rx) ) fatal("pipe");
int pipesize = 64800 * batch_size;
if ( fcntl(tx[0], F_SETPIPE_SZ, pipesize) < 0 ||
fcntl(rx[0], F_SETPIPE_SZ, pipesize) < 0 ||
fcntl(tx[1], F_SETPIPE_SZ, pipesize) < 0 ||
fcntl(rx[1], F_SETPIPE_SZ, pipesize) < 0 ) {
fprintf(stderr,
"***\n"
"*** Failed to increase pipe size.\n"
"*** Try echo %d > /proc/sys/fs/pipe-max-size\n"
"***\n", pipesize);
fprintf(stderr, "\n");
}
int child = vfork();
if ( ! child ) {
// Child process
close(tx[1]); dup2(tx[0], 0);
close(rx[0]); dup2(rx[1], 1);
char mc_arg[16];
sprintf(mc_arg, "%d", pls->modcod);
const char *sf_arg = pls->sf ? "--shortframes" : NULL;
const char *argv[] = { command, "--modcod", mc_arg, sf_arg, NULL };
execve(command, (char *const*)argv, NULL);
fatal(command);
}
h->fd_tx = tx[1]; close(tx[0]);
h->fd_rx = rx[0]; close(rx[1]);
h->batch_size = 32; // TBD
h->b_in = h->b_out = 0;
int flags = fcntl(h->fd_tx, F_GETFL);
if ( fcntl(h->fd_tx, F_SETFL, flags|O_NONBLOCK) ) fatal("fcntl(helper)");
}
// Receive a finished job.
void receive_frame(const helper_job *job) {
// Read corrected frame from helper
const s2_pls *pls = &pin->pls;
const s2_pls *pls = &job->pls;
size_t iosize = (pls->framebits()/8) * sizeof(ldpc_buf[0]);
// fprintf(stderr, "Reading %lu from fd %d\n", iosize, pin->fd_rx);
int nr = read(pin->fd_rx, ldpc_buf, iosize);
int nr = read(job->h->fd_rx, ldpc_buf, iosize);
if ( nr < 0 ) fatal("read(LDPC helper)");
if ( nr != iosize ) fatal("partial read(LDPC helper)");
--the_helper.b_out; // TBD Support several helpers.
--job->h->b_out;
// Decode BCH.
const modcod_info *mcinfo = check_modcod(pin->pls.modcod);
const fec_info *fi = &fec_infos[pin->pls.sf][mcinfo->rate];
const modcod_info *mcinfo = check_modcod(job->pls.modcod);
const fec_info *fi = &fec_infos[job->pls.sf][mcinfo->rate];
uint8_t *hardbytes = softbytes_harden(ldpc_buf, fi->kldpc/8, bch_buf);
size_t cwbytes = fi->kldpc / 8;
size_t msgbytes = fi->Kbch / 8;
size_t chkbytes = cwbytes - msgbytes;
bch_interface *bch = s2bch.bchs[pin->pls.sf][mcinfo->rate];
bch_interface *bch = s2bch.bchs[job->pls.sf][mcinfo->rate];
int ncorr = bch->decode(hardbytes, cwbytes);
fprintf(stderr, "BCHCORR = %d\n", ncorr);
bool corrupted = (ncorr < 0);
// Report VBER
opt_write(bitcount, fi->Kbch);
opt_write(errcount, (ncorr>=0) ? ncorr : fi->Kbch);
#if 0
// TBD Some decoders want the bad packets.
if ( corrupted ) {
fprintf(stderr, "Passing bad frame\n");
corrupted = false;
}
#endif
if ( ! corrupted ) {
// Descramble and output
bbframe *pout = out.wr();
pout->pls = pin->pls;
pout->pls = job->pls;
bbscrambling.transform(hardbytes, fi->Kbch/8, pout->bytes);
out.written(1);
}
@ -2065,14 +2103,14 @@ namespace leansdr {
missing(-1),
in(_in), out(_out,MAX_TS_PER_BBFRAME),
current_state(false),
state_out(opt_writer(_state_out)),
state_out(opt_writer(_state_out,2)),
report_state(true),
locktime(0),
locktime_out(opt_writer(_locktime_out,MAX_TS_PER_BBFRAME))
{ }
void run() {
while ( in.readable()>=1 && out.writable()>=MAX_TS_PER_BBFRAME &&
opt_writable(state_out,1) &&
opt_writable(state_out,2) &&
opt_writable(locktime_out,MAX_TS_PER_BBFRAME) ) {
if ( report_state ) {
// Report unlocked state on first invocation.
@ -2101,8 +2139,8 @@ namespace leansdr {
crc, crcexp, (crc==crcexp)?"OK":"KO",
bbh[0], bbh[1], ro_values[ro_code], upl, dfl, sync, syncd);
}
if ( crc!=crcexp || upl!=188*8 || sync!=0x47 || dfl>58112 || syncd>dfl ||
(dfl&7) || (syncd&7) ) {
if ( crc!=crcexp || upl!=188*8 || sync!=0x47 || dfl>fec_info::KBCH_MAX ||
syncd>dfl || (dfl&7) || (syncd&7) ) {
// Note: Maybe accept syncd=65535
fprintf(stderr, "Bad bbframe\n");
missing = -1;
@ -2175,7 +2213,7 @@ namespace leansdr {
// or 0 if no leftover data,
// or -1 if not synced.
uint8_t leftover[188];
static const int MAX_TS_PER_BBFRAME = 58112/8/188 + 1;
static const int MAX_TS_PER_BBFRAME = fec_info::KBCH_MAX/8/188 + 1;
bool locked;
pipereader<bbframe> in;
pipewriter<tspacket> out;