make test more stable

pull/1/head
Mike Barry 2021-08-05 21:18:55 -04:00
rodzic a7907b3d47
commit 08b39b72fa
1 zmienionych plików z 19 dodań i 11 usunięć

Wyświetl plik

@ -12,26 +12,34 @@ public class ProgressLoggersTest {
@Test
@Timeout(10)
public void testLogWorkerPipeline() {
var latch = new CountDownLatch(1);
var pipeline = WorkerPipeline.start("topo", Stats.inMemory())
.fromGenerator("reader", next -> latch.await())
public void testLogWorkerPipeline() throws InterruptedException {
var continueLatch = new CountDownLatch(1);
var readyLatch = new CountDownLatch(5);
var pipeline = WorkerPipeline.start("pipeline", Stats.inMemory())
.fromGenerator("reader", next -> {
readyLatch.countDown();
continueLatch.await();
})
.addBuffer("reader_queue", 10)
.addWorker("worker", 2, (a, b) -> latch.await())
.addWorker("worker", 2, (a, b) -> {
readyLatch.countDown();
continueLatch.await();
})
.addBuffer("writer_queue", 10)
.sinkTo("writer", 2, a -> latch.await());
.sinkTo("writer", 2, a -> {
readyLatch.countDown();
continueLatch.await();
});
var loggers = new ProgressLoggers("prefix")
.addPipelineStats(pipeline);
String log;
while ((log = loggers.getLog()).split("%").length < 6) {
// spin waiting for threads to start
}
readyLatch.await();
String log = loggers.getLog();
assertEquals("[prefix]\n reader( 0%) -> (0/13) -> worker( 0% 0%) -> (0/14) -> writer( 0% 0%)",
log.replaceAll("[ 0-9][0-9]%", " 0%"));
latch.countDown();
continueLatch.countDown();
pipeline.awaitAndLog(loggers, Duration.ofSeconds(10));
loggers.getLog();
assertEquals("[prefix]\n reader( -%) -> (0/13) -> worker( -% -%) -> (0/14) -> writer( -% -%)",