Adapt maxInstancesForQueue to only consider instances of the same job.

Currently the maxInstancesForQueue limit checks the count of all jobs in a
given queue. If there are already too many jobs, the new job is discarded.

However this is not the expected behavior for the two jobs where it's used:
GroupCallPeekWorkerJob and AutomaticSessionResetJob
For both the expected behavior is that there aren't too many jobs of them
started, but that there will be at least one instance of them started.
Both of them use the same queue as the PushProcessMessageJob and the MarkerJob.
Those two jobs are often in the queue at the same time, effectively preventing
the GroupCallPeekWorkerJob and AutomaticSessionResetJob from being enqueued.
fork-5.53.8
AsamK 2021-02-06 16:54:52 +01:00 zatwierdzone przez Cody Henthorne
rodzic 53dc5bab43
commit 8f51bdcb78
5 zmienionych plików z 11 dodań i 9 usunięć

Wyświetl plik

@ -407,8 +407,8 @@ public abstract class Job {
/** /**
* Specify the maximum number of instances you'd want of this job at any given time, as * Specify the maximum number of instances you'd want of this job at any given time, as
* determined by the job's queue key. If enqueueing this job would put it over that limit, * determined by the job's factory key and queue key. If enqueueing this job would put it over
* it will be ignored. * that limit, it will be ignored.
* *
* This property is ignored if the job is submitted as part of a {@link JobManager.Chain}, or * This property is ignored if the job is submitted as part of a {@link JobManager.Chain}, or
* if the job has no queue key. * if the job has no queue key.

Wyświetl plik

@ -314,7 +314,7 @@ class JobController {
boolean exceedsQueue = solo.getParameters().getQueue() != null && boolean exceedsQueue = solo.getParameters().getQueue() != null &&
solo.getParameters().getMaxInstancesForQueue() != Job.Parameters.UNLIMITED && solo.getParameters().getMaxInstancesForQueue() != Job.Parameters.UNLIMITED &&
jobStorage.getJobCountForQueue(solo.getParameters().getQueue()) >= solo.getParameters().getMaxInstancesForQueue(); jobStorage.getJobCountForFactoryAndQueue(solo.getFactoryKey(), solo.getParameters().getQueue()) >= solo.getParameters().getMaxInstancesForQueue();
if (exceedsQueue) { if (exceedsQueue) {
return true; return true;

Wyświetl plik

@ -30,7 +30,7 @@ public interface JobStorage {
int getJobCountForFactory(@NonNull String factoryKey); int getJobCountForFactory(@NonNull String factoryKey);
@WorkerThread @WorkerThread
int getJobCountForQueue(@NonNull String queueKey); int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey);
@WorkerThread @WorkerThread
void updateJobRunningState(@NonNull String id, boolean isRunning); void updateJobRunningState(@NonNull String id, boolean isRunning);

Wyświetl plik

@ -167,9 +167,10 @@ public class FastJobStorage implements JobStorage {
} }
@Override @Override
public synchronized int getJobCountForQueue(@NonNull String queueKey) { public synchronized int getJobCountForFactoryAndQueue(@NonNull String factoryKey, @NonNull String queueKey) {
return (int) Stream.of(jobs) return (int) Stream.of(jobs)
.filter(j -> queueKey.equals(j.getQueueKey())) .filter(j -> factoryKey.equals(j.getFactoryKey()) &&
queueKey.equals(j.getQueueKey()))
.count(); .count();
} }

Wyświetl plik

@ -547,13 +547,14 @@ public class FastJobStorageTest {
} }
@Test @Test
public void getJobCountForQueue_general() { public void getJobCountForFactoryAndQueue_general() {
FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS)); FastJobStorage subject = new FastJobStorage(fixedDataDatabase(DataSet1.FULL_SPECS));
subject.init(); subject.init();
assertEquals(1, subject.getJobCountForQueue("q1")); assertEquals(1, subject.getJobCountForFactoryAndQueue("f1", "q1"));
assertEquals(0, subject.getJobCountForQueue("does-not-exist")); assertEquals(0, subject.getJobCountForFactoryAndQueue("f2", "q1"));
assertEquals(0, subject.getJobCountForFactoryAndQueue("f1", "does-not-exist"));
} }
private JobDatabase noopDatabase() { private JobDatabase noopDatabase() {