Added JobTracker.

fork-5.53.8
Greyson Parrelli 2019-10-15 16:46:29 -04:00
rodzic 5b1d91016c
commit ccb18cd46c
4 zmienionych plików z 197 dodań i 26 usunięć

Wyświetl plik

@ -10,6 +10,7 @@ import org.thoughtcrime.securesms.logging.Log;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
@ -30,7 +31,6 @@ public abstract class Job {
private final Parameters parameters;
private String id;
private int runAttempt;
private long nextRunAttemptTime;
@ -41,7 +41,7 @@ public abstract class Job {
}
public final String getId() {
return id;
return parameters.getId();
}
public final @NonNull Parameters getParameters() {
@ -64,11 +64,6 @@ public abstract class Job {
this.context = context;
}
/** Should only be invoked by {@link JobController} */
final void setId(@NonNull String id) {
this.id = id;
}
/** Should only be invoked by {@link JobController} */
final void setRunAttempt(int runAttempt) {
this.runAttempt = runAttempt;
@ -203,6 +198,7 @@ public abstract class Job {
public static final int IMMORTAL = -1;
public static final int UNLIMITED = -1;
private final String id;
private final long createTime;
private final long lifespan;
private final int maxAttempts;
@ -211,7 +207,8 @@ public abstract class Job {
private final String queue;
private final List<String> constraintKeys;
private Parameters(long createTime,
private Parameters(@NonNull String id,
long createTime,
long lifespan,
int maxAttempts,
long maxBackoff,
@ -219,6 +216,7 @@ public abstract class Job {
@Nullable String queue,
@NonNull List<String> constraintKeys)
{
this.id = id;
this.createTime = createTime;
this.lifespan = lifespan;
this.maxAttempts = maxAttempts;
@ -228,41 +226,46 @@ public abstract class Job {
this.constraintKeys = constraintKeys;
}
public long getCreateTime() {
@NonNull String getId() {
return id;
}
long getCreateTime() {
return createTime;
}
public long getLifespan() {
long getLifespan() {
return lifespan;
}
public int getMaxAttempts() {
int getMaxAttempts() {
return maxAttempts;
}
public long getMaxBackoff() {
long getMaxBackoff() {
return maxBackoff;
}
public int getMaxInstances() {
int getMaxInstances() {
return maxInstances;
}
public @Nullable String getQueue() {
@Nullable String getQueue() {
return queue;
}
public List<String> getConstraintKeys() {
List<String> getConstraintKeys() {
return constraintKeys;
}
public Builder toBuilder() {
return new Builder(createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys);
return new Builder(id, createTime, maxBackoff, lifespan, maxAttempts, maxInstances, queue, constraintKeys);
}
public static final class Builder {
private String id;
private long createTime;
private long maxBackoff;
private long lifespan;
@ -272,10 +275,15 @@ public abstract class Job {
private List<String> constraintKeys;
public Builder() {
this(System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>());
this(UUID.randomUUID().toString());
}
private Builder(long createTime,
Builder(@NonNull String id) {
this(id, System.currentTimeMillis(), TimeUnit.SECONDS.toMillis(30), IMMORTAL, 1, UNLIMITED, null, new LinkedList<>());
}
private Builder(@NonNull String id,
long createTime,
long maxBackoff,
long lifespan,
int maxAttempts,
@ -283,6 +291,7 @@ public abstract class Job {
@Nullable String queue,
@NonNull List<String> constraintKeys)
{
this.id = id;
this.createTime = createTime;
this.maxBackoff = maxBackoff;
this.lifespan = lifespan;
@ -368,7 +377,7 @@ public abstract class Job {
}
public @NonNull Parameters build() {
return new Parameters(createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
return new Parameters(id, createTime, lifespan, maxAttempts, maxBackoff, maxInstances, queue, constraintKeys);
}
}
}

Wyświetl plik

@ -36,6 +36,7 @@ class JobController {
private final JobInstantiator jobInstantiator;
private final ConstraintInstantiator constraintInstantiator;
private final Data.Serializer dataSerializer;
private final JobTracker jobTracker;
private final Scheduler scheduler;
private final Debouncer debouncer;
private final Callback callback;
@ -46,6 +47,7 @@ class JobController {
@NonNull JobInstantiator jobInstantiator,
@NonNull ConstraintInstantiator constraintInstantiator,
@NonNull Data.Serializer dataSerializer,
@NonNull JobTracker jobTracker,
@NonNull Scheduler scheduler,
@NonNull Debouncer debouncer,
@NonNull Callback callback)
@ -55,6 +57,7 @@ class JobController {
this.jobInstantiator = jobInstantiator;
this.constraintInstantiator = constraintInstantiator;
this.dataSerializer = dataSerializer;
this.jobTracker = jobTracker;
this.scheduler = scheduler;
this.debouncer = debouncer;
this.callback = callback;
@ -82,6 +85,7 @@ class JobController {
if (chainExceedsMaximumInstances(chain)) {
Job solo = chain.get(0).get(0);
jobTracker.onStateChange(solo.getId(), JobTracker.JobState.IGNORED);
Log.w(TAG, JobLogger.format(solo, "Already at the max instance count of " + solo.getParameters().getMaxInstances() + ". Skipping."));
return;
}
@ -98,6 +102,7 @@ class JobController {
long nextRunAttemptTime = calculateNextRunAttemptTime(System.currentTimeMillis(), nextRunAttempt, job.getParameters().getMaxBackoff());
jobStorage.updateJobAfterRetry(job.getId(), false, nextRunAttempt, nextRunAttemptTime);
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING);
List<Constraint> constraints = Stream.of(jobStorage.getConstraintSpecs(job.getId()))
.map(ConstraintSpec::getFactoryKey)
@ -120,6 +125,7 @@ class JobController {
@WorkerThread
synchronized void onSuccess(@NonNull Job job) {
jobStorage.deleteJob(job.getId());
jobTracker.onStateChange(job.getId(), JobTracker.JobState.SUCCESS);
notifyAll();
}
@ -143,6 +149,7 @@ class JobController {
all.addAll(dependents);
jobStorage.deleteJobs(Stream.of(all).map(Job::getId).toList());
Stream.of(all).forEach(j -> jobTracker.onStateChange(j.getId(), JobTracker.JobState.FAILURE));
return dependents;
}
@ -170,6 +177,7 @@ class JobController {
jobStorage.updateJobRunningState(job.getId(), true);
runningJobs.add(job.getId());
jobTracker.onStateChange(job.getId(), JobTracker.JobState.RUNNING);
return job;
} catch (InterruptedException e) {
@ -253,9 +261,6 @@ class JobController {
@WorkerThread
private @NonNull FullSpec buildFullSpec(@NonNull Job job, @NonNull List<Job> dependsOn) {
String id = UUID.randomUUID().toString();
job.setId(id);
job.setRunAttempt(0);
JobSpec jobSpec = new JobSpec(job.getId(),
@ -319,7 +324,6 @@ class JobController {
Data data = dataSerializer.deserialize(jobSpec.getSerializedData());
Job job = jobInstantiator.instantiate(jobSpec.getFactoryKey(), parameters, data);
job.setId(jobSpec.getId());
job.setRunAttempt(jobSpec.getRunAttempt());
job.setNextRunAttemptTime(jobSpec.getNextRunAttemptTime());
job.setContext(application);
@ -328,7 +332,7 @@ class JobController {
}
private @NonNull Job.Parameters buildJobParameters(@NonNull JobSpec jobSpec, @NonNull List<ConstraintSpec> constraintSpecs) {
return new Job.Parameters.Builder()
return new Job.Parameters.Builder(jobSpec.getId())
.setCreateTime(jobSpec.getCreateTime())
.setLifespan(jobSpec.getLifespan())
.setMaxAttempts(jobSpec.getMaxAttempts())

Wyświetl plik

@ -5,6 +5,7 @@ import android.content.Intent;
import android.os.Build;
import androidx.annotation.NonNull;
import androidx.annotation.WorkerThread;
import androidx.lifecycle.LiveData;
import org.thoughtcrime.securesms.jobmanager.impl.DefaultExecutorFactory;
import org.thoughtcrime.securesms.jobmanager.impl.JsonDataSerializer;
@ -39,6 +40,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private final Configuration configuration;
private final ExecutorService executor;
private final JobController jobController;
private final JobTracker jobTracker;
private final Set<EmptyQueueListener> emptyQueueListeners = new CopyOnWriteArraySet<>();
@ -46,11 +48,13 @@ public class JobManager implements ConstraintObserver.Notifier {
this.application = application;
this.configuration = configuration;
this.executor = configuration.getExecutorFactory().newSingleThreadExecutor("signal-JobManager");
this.jobTracker = configuration.getJobTracker();
this.jobController = new JobController(application,
configuration.getJobStorage(),
configuration.getJobInstantiator(),
configuration.getConstraintFactories(),
configuration.getDataSerializer(),
configuration.getJobTracker(),
Build.VERSION.SDK_INT < 26 ? new AlarmManagerScheduler(application)
: new CompositeScheduler(new InAppScheduler(this), new JobSchedulerScheduler(application)),
new Debouncer(500),
@ -92,6 +96,23 @@ public class JobManager implements ConstraintObserver.Notifier {
});
}
/**
* Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary
* background thread. You must eventually call {@link #removeListener(JobTracker.JobListener)} to avoid
* memory leaks.
*/
public void addListener(@NonNull String id, @NonNull JobTracker.JobListener listener) {
jobTracker.addListener(id, listener);
}
/**
* Unsubscribe the provided listener from all job updates.
*/
public void removeListener(@NonNull JobTracker.JobListener listener) {
jobTracker.removeListener(listener);
}
/**
* Enqueues a single job to be run.
*/
@ -160,6 +181,12 @@ public class JobManager implements ConstraintObserver.Notifier {
}
private void enqueueChain(@NonNull Chain chain) {
for (List<Job> jobList : chain.getJobListChain()) {
for (Job job : jobList) {
jobTracker.onStateChange(job.getId(), JobTracker.JobState.PENDING);
}
}
executor.execute(() -> {
jobController.submitNewJobChain(chain.getJobListChain());
wakeUp();
@ -225,6 +252,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private final Data.Serializer dataSerializer;
private final JobStorage jobStorage;
private final JobMigrator jobMigrator;
private final JobTracker jobTracker;
private Configuration(int jobThreadCount,
@NonNull ExecutorFactory executorFactory,
@ -233,7 +261,8 @@ public class JobManager implements ConstraintObserver.Notifier {
@NonNull List<ConstraintObserver> constraintObservers,
@NonNull Data.Serializer dataSerializer,
@NonNull JobStorage jobStorage,
@NonNull JobMigrator jobMigrator)
@NonNull JobMigrator jobMigrator,
@NonNull JobTracker jobTracker)
{
this.executorFactory = executorFactory;
this.jobThreadCount = jobThreadCount;
@ -243,6 +272,7 @@ public class JobManager implements ConstraintObserver.Notifier {
this.dataSerializer = dataSerializer;
this.jobStorage = jobStorage;
this.jobMigrator = jobMigrator;
this.jobTracker = jobTracker;
}
int getJobThreadCount() {
@ -278,6 +308,10 @@ public class JobManager implements ConstraintObserver.Notifier {
return jobMigrator;
}
@NonNull JobTracker getJobTracker() {
return jobTracker;
}
public static class Builder {
private ExecutorFactory executorFactory = new DefaultExecutorFactory();
@ -288,6 +322,7 @@ public class JobManager implements ConstraintObserver.Notifier {
private Data.Serializer dataSerializer = new JsonDataSerializer();
private JobStorage jobStorage = null;
private JobMigrator jobMigrator = null;
private JobTracker jobTracker = new JobTracker();
public @NonNull Builder setJobThreadCount(int jobThreadCount) {
this.jobThreadCount = jobThreadCount;
@ -337,7 +372,8 @@ public class JobManager implements ConstraintObserver.Notifier {
new ArrayList<>(constraintObservers),
dataSerializer,
jobStorage,
jobMigrator);
jobMigrator,
jobTracker);
}
}
}

Wyświetl plik

@ -0,0 +1,122 @@
package org.thoughtcrime.securesms.jobmanager;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
import org.thoughtcrime.securesms.util.LRUCache;
import org.thoughtcrime.securesms.util.concurrent.SignalExecutors;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
/**
* Tracks the state of {@link Job}s and allows callers to listen to changes.
*/
public class JobTracker {
private final Map<String, TrackingState> trackingStates;
private final Executor listenerExecutor;
JobTracker() {
this.trackingStates = new LRUCache<>(1000);
this.listenerExecutor = SignalExecutors.BOUNDED;
}
/**
* Add a listener to subscribe to job state updates. Listeners will be invoked on an arbitrary
* background thread. You must eventually call {@link #removeListener(JobListener)} to avoid
* memory leaks.
*/
synchronized void addListener(@NonNull String id, @NonNull JobListener jobListener) {
TrackingState state = getOrCreateTrackingState(id);
JobState currentJobState = state.getJobState();
state.addListener(jobListener);
if (currentJobState != null) {
listenerExecutor.execute(() -> jobListener.onStateChanged(currentJobState));
}
}
/**
* Unsubscribe the provided listener from all job updates.
*/
synchronized void removeListener(@NonNull JobListener jobListener) {
Collection<TrackingState> allTrackingState = trackingStates.values();
for (TrackingState state : allTrackingState) {
state.removeListener(jobListener);
}
}
/**
* Update the state of a job with the associated ID.
*/
synchronized void onStateChange(@NonNull String id, @NonNull JobState jobState) {
TrackingState trackingState = getOrCreateTrackingState(id);
trackingState.setJobState(jobState);
for (JobListener listener : trackingState.getListeners()) {
listenerExecutor.execute(() -> listener.onStateChanged(jobState));
}
}
private @NonNull TrackingState getOrCreateTrackingState(@NonNull String id) {
TrackingState state = trackingStates.get(id);
if (state == null) {
state = new TrackingState();
}
trackingStates.put(id, state);
return state;
}
public interface JobListener {
void onStateChanged(@NonNull JobState jobState);
}
public enum JobState {
PENDING(false), RUNNING(false), SUCCESS(true), FAILURE(true), IGNORED(true);
private final boolean complete;
JobState(boolean complete) {
this.complete = complete;
}
public boolean isComplete() {
return complete;
}
}
private static class TrackingState {
private JobState jobState;
private final CopyOnWriteArraySet<JobListener> listeners = new CopyOnWriteArraySet<>();
void addListener(@NonNull JobListener jobListener) {
listeners.add(jobListener);
}
void removeListener(@NonNull JobListener jobListener) {
listeners.remove(jobListener);
}
@NonNull Collection<JobListener> getListeners() {
return listeners;
}
void setJobState(@NonNull JobState jobState) {
this.jobState = jobState;
}
@Nullable JobState getJobState() {
return jobState;
}
}
}