Fast job sorting.

fork-5.53.8
Alan Evans 2021-01-13 19:35:24 -04:00 zatwierdzone przez Greyson Parrelli
rodzic 2d39e43677
commit ae676d7486
1 zmienionych plików z 18 dodań i 2 usunięć

Wyświetl plik

@ -105,9 +105,25 @@ public class FastJobStorage implements JobStorage {
return Collections.emptyList();
} else {
return Stream.of(jobs)
.groupBy(jobSpec -> {
String queueKey = jobSpec.getQueueKey();
if (queueKey != null) {
return queueKey;
} else {
return jobSpec.getId();
}
})
.map(byQueueKey ->
Stream.of(byQueueKey.getValue()).sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
.findFirst()
.orElse(null)
)
.withoutNulls()
.filter(j -> {
List<DependencySpec> dependencies = dependenciesByJobId.get(j.getId());
return dependencies == null || dependencies.isEmpty();
})
.filterNot(JobSpec::isRunning)
.filter(this::firstInQueue)
.filter(j -> !dependenciesByJobId.containsKey(j.getId()) || dependenciesByJobId.get(j.getId()).isEmpty())
.filter(j -> j.getNextRunAttemptTime() <= currentTime)
.sorted((j1, j2) -> Long.compare(j1.getCreateTime(), j2.getCreateTime()))
.toList();