This is an automated email from the ASF dual-hosted git repository.
egonzalez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 34d12a34a [NO_ISSUE] fix race condition when reschedule (#2259)
34d12a34a is described below
commit 34d12a34a0697f2aa6bed759df098915ecdb68fa
Author: Enrique <[email protected]>
AuthorDate: Thu Aug 28 17:04:55 2025 +0200
[NO_ISSUE] fix race condition when reschedule (#2259)
* [NO_ISSUE] fix race condition when reschedule
* race condition revisited
* fix number of retries
* fix process merger
* process job serialization
* end time fix
* fix compilation problem
* fix type
* fix formatting
* fix marshaller
* fix phantom scheduler in error state
* add test exact time
* fix immediate schedule
* fix retry millis
* fix deserializer
* fix formatting
---
.../service/json/JobDescriptionDeserializer.java | 9 ++
.../service/json/JobDescriptionSerializer.java | 5 +-
.../kie/kogito/jobs/service/model/JobStatus.java | 22 +--
jobs/README.md | 2 +-
.../kogito/app/jobs/api/JobDescriptionMerger.java | 22 +--
.../kogito/app/jobs/api/JobSchedulerBuilder.java | 2 +
.../kogito/app/jobs/impl/VertxJobScheduler.java | 162 +++++++++++++++------
...tractJobDescriptionJobInstanceEventAdapter.java | 2 +-
.../JobDescriptionHelper.java | 41 +-----
...tanceJobDescriptionJobInstanceEventAdapter.java | 2 +-
.../ProcessInstanceJobDescriptionMerger.java | 51 +++++++
.../ProcessInstanceJobExecutor.java | 2 +-
...ocessJobDescriptionJobInstanceEventAdapter.java | 2 +-
.../ProcessJobDescriptionMerger.java} | 26 ++--
.../ProcessJobExecutor.java | 2 +-
...tanceJobDescriptionJobInstanceEventAdapter.java | 2 +-
.../UserTaskInstanceJobDescriptorMerger.java | 51 +++++++
.../UserTaskInstanceJobExecutor.java | 2 +-
.../impl/LatchFailureJobSchedulerListener.java | 98 +++++++++++++
...tAdapter.java => TestJobDescriptionMerger.java} | 23 ++-
.../app/jobs/impl/TestJobDetailsEventAdapter.java | 2 +-
.../app/jobs/impl/VertxJobSchedulerTest.java | 74 +++++++++-
.../jobs/quarkus/QuarkusJobServiceProducer.java | 6 +-
.../app/jobs/quarkus/QuarkusJobsService.java | 12 +-
.../app/jobs/quarkus/resource/JobResourceV1.java | 2 +-
.../SpringbootJobServiceConfiguration.java | 6 +-
.../app/jobs/springboot/SpringbootJobsService.java | 12 +-
.../jobs/springboot/resource/JobResourceV1.java | 2 +-
28 files changed, 479 insertions(+), 165 deletions(-)
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
index e7becd90e..16a1acb49 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
+++
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionDeserializer.java
@@ -25,6 +25,7 @@ import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescriptionBuilder;
+import org.kie.kogito.jobs.descriptors.ProcessJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescriptionBuilder;
@@ -50,6 +51,14 @@ public class JobDescriptionDeserializer extends
StdDeserializer<JobDescription>
JsonNode node = jp.getCodec().readTree(jp);
String jobDescriptionType = node.get("@type").asText();
switch (jobDescriptionType) {
+ case "ProcessJobDescription": {
+ String id =
ofNullable(node.get("id")).map(JsonNode::textValue).orElse(null);
+ String processId =
ofNullable(node.get("processId")).map(JsonNode::textValue).orElse(null);
+ Integer priority =
ofNullable(node.get("priority")).map(JsonNode::asInt).orElse(0);
+ String expirationTimeType =
node.get("expirationTime").get("@type").asText();
+ ExpirationTime expirationTime = (ExpirationTime)
ctxt.readTreeAsValue(node.get("expirationTime"),
Class.forName(expirationTimeType));
+ return ProcessJobDescription.of(expirationTime, priority,
processId, id);
+ }
case "ProcessInstanceJobDescription": {
ProcessInstanceJobDescriptionBuilder builder =
ProcessInstanceJobDescription.newProcessInstanceJobDescriptionBuilder();
ofNullable(node.get("id")).ifPresent(e ->
builder.id(e.textValue()));
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
index 37544d67b..46d9bc987 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
+++
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/json/JobDescriptionSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessJobDescription;
import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -43,7 +44,9 @@ public class JobDescriptionSerializer extends
StdSerializer<JobDescription> {
jgen.writeStringField("id", value.id());
jgen.writeNumberField("priority", value.priority());
jgen.writeObjectField("expirationTime", value.expirationTime());
- if (value instanceof ProcessInstanceJobDescription jobDescription) {
+ if (value instanceof ProcessJobDescription processJobDescription) {
+ jgen.writeStringField("processId",
processJobDescription.processId());
+ } else if (value instanceof ProcessInstanceJobDescription
jobDescription) {
jgen.writeStringField("timerId", jobDescription.timerId());
jgen.writeStringField("processInstanceId",
jobDescription.processInstanceId());
jgen.writeStringField("rootProcessInstanceId",
jobDescription.rootProcessInstanceId());
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
index 02bdddb9c..2b891b494 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
+++
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
@@ -19,20 +19,10 @@
package org.kie.kogito.jobs.service.model;
public enum JobStatus {
- ERROR(false), //final
- RUNNING(false),
- EXECUTED(true), //final
- SCHEDULED(false), //active
- RETRY(false), //active
- CANCELED(true); //final
-
- private boolean finalStatus;
-
- JobStatus(boolean finalStatus) {
- this.finalStatus = finalStatus;
- }
-
- public boolean isFinalStatus() {
- return finalStatus;
- }
+ ERROR, //final
+ EXECUTED, //final
+ SCHEDULED, //active
+ RETRY, //active
+ CANCELED, //final
+ RUNNING
}
diff --git a/jobs/README.md b/jobs/README.md
index a650159cf..49c4fe8de 100644
--- a/jobs/README.md
+++ b/jobs/README.md
@@ -34,7 +34,7 @@ At present Addons jobs supports quarkus and spring boot
The properties supported are:
* kogito.jobs-service.numberOfWorkerThreads: maximum of number of worker
thread to execute timeouts (default is 10)
* kogito.jobs-service.maxNumberOfRetries: numbers of retry of a failred job.
After this number is reached the job will be set to failure. (default is 3
times)
-* kogito.jobs-service.maxIntervalLimitToRetryMillis: interval used to retry
the new job (default 60 seconds)
+* kogito.jobs-service.retryMillis: interval used to retry the new job (default
60 seconds)
* kogito.jobs-service.schedulerChunkInMinutes: max window minutes from actual
date to the future to load timers in memory (default is 10 minutes)
* kogito.service.url: url service is this collocated service. (default is
localhost:8080)
diff --git
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java
similarity index 66%
copy from
jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
copy to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java
index 02bdddb9c..10bfbf05a 100644
---
a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobStatus.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobDescriptionMerger.java
@@ -16,23 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.jobs.service.model;
+package org.kie.kogito.app.jobs.api;
-public enum JobStatus {
- ERROR(false), //final
- RUNNING(false),
- EXECUTED(true), //final
- SCHEDULED(false), //active
- RETRY(false), //active
- CANCELED(true); //final
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.timer.Trigger;
- private boolean finalStatus;
+public interface JobDescriptionMerger {
- JobStatus(boolean finalStatus) {
- this.finalStatus = finalStatus;
- }
+ boolean accept(Object instance);
+
+ JobDescription mergeTrigger(JobDescription jobDescription, Trigger
trigger);
- public boolean isFinalStatus() {
- return finalStatus;
- }
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java
index 0ee283f46..77439524c 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/api/JobSchedulerBuilder.java
@@ -56,4 +56,6 @@ public interface JobSchedulerBuilder {
JobSchedulerBuilder withNumberOfWorkerThreads(Integer
numberOfWorkerThreads);
JobSchedulerBuilder withJobSynchronization(JobSynchronization
jobSynchronization);
+
+ JobSchedulerBuilder withJobDescriptorMergers(JobDescriptionMerger...
jobDescriptionMergers);
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
index cbbc19ae9..d8d15d1c7 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/VertxJobScheduler.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
import org.kie.kogito.app.jobs.api.JobDetailsEventAdapter;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.api.JobScheduler;
@@ -38,6 +39,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
+import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionMerger;
+import org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionMerger;
+import
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptorMerger;
import org.kie.kogito.app.jobs.spi.JobContext;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
@@ -48,7 +52,9 @@ import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
+import org.kie.kogito.jobs.service.model.RecipientInstance;
import org.kie.kogito.jobs.service.utils.DateUtil;
+import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.SimpleTimerTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +93,8 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
private List<JobTimeoutInterceptor> interceptors;
+ private List<JobDescriptionMerger> jobDescriptionMergers;
+
private ConcurrentMap<String, TimerInfo> jobsScheduled;
private Long refreshJobsIntervalTimerId;
@@ -185,6 +193,12 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
return this;
}
+ @Override
+ public JobSchedulerBuilder
withJobDescriptorMergers(JobDescriptionMerger... jobDescriptionMergers) {
+
VertxJobScheduler.this.jobDescriptionMergers.addAll(List.of(jobDescriptionMergers));
+ return this;
+ }
+
}
public VertxJobScheduler() {
@@ -197,6 +211,10 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
this.jobEventAdapters = new ArrayList<>();
this.jobSchedulerListeners = new ArrayList<>();
this.interceptors = new ArrayList<>();
+ this.jobDescriptionMergers = new ArrayList<>();
+ this.jobDescriptionMergers.add(new
UserTaskInstanceJobDescriptorMerger());
+ this.jobDescriptionMergers.add(new
ProcessInstanceJobDescriptionMerger());
+ this.jobDescriptionMergers.add(new ProcessJobDescriptionMerger());
this.jobSynchronization = new JobSynchronization() {
@Override
@@ -338,17 +356,7 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
jobSchedulerListeners.forEach(l ->
l.onReschedule(rescheduledJobDetails));
jobStore.update(jobContextFactory.newContext(), rescheduledJobDetails);
- this.jobSynchronization.synchronize(new Runnable() {
-
- @Override
- public void run() {
- jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo)
-> {
- removeTimerInfo(timerInfo);
- return addTimerInfo(rescheduledJobDetails);
- });
- }
-
- });
+ updateTxTimer(rescheduledJobDetails);
return rescheduledJobDetails.getId();
}
@@ -412,34 +420,68 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
private void removeIfFinal(Long timerId, JobContext jobContext, JobDetails
nextJobDetails) {
String jobId = nextJobDetails.getId();
- if (!nextJobDetails.getStatus().isFinalStatus()) {
- LOG.trace("Timeout {} with jobId {} will be updated", timerId,
jobId);
- jobStore.update(jobContext, nextJobDetails);
- doSchedule(nextJobDetails);
- } else {
- LOG.trace("Timeout {} with jobId {} will be removed", timerId,
jobId);
- jobStore.remove(jobContext, jobId);
+ switch (nextJobDetails.getStatus()) {
+ case EXECUTED:
+ case CANCELED:
+ LOG.trace("Timeout {} with jobId {} will be removed", timerId,
jobId);
+ removeTxTimer(nextJobDetails);
+ jobStore.remove(jobContext, jobId);
+ break;
+ case SCHEDULED:
+ case RETRY:
+ LOG.trace("Timeout {} with jobId {} will be updated and
scheduled", timerId, jobId);
+ jobStore.update(jobContext, nextJobDetails);
+ doNextSchedule(nextJobDetails);
+ break;
+ case ERROR:
+ LOG.trace("Timeout {} with jobId {} will be set to error",
timerId, jobId);
+ removeTxTimer(nextJobDetails);
+ jobStore.update(jobContext, nextJobDetails);
+ break;
+ default:
+ LOG.trace("Timeout {} with jobId {} is RUNNING and should not
happen", timerId, jobId);
+ break;
}
}
- // lifecycle calls
- private JobDetails doSchedule(JobDetails jobDetails) {
+ // add tx timer and remove tx timer
+ private void updateTxTimer(JobDetails jobDetails) {
this.jobSynchronization.synchronize(new Runnable() {
-
@Override
public void run() {
- jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo)
-> {
+ // if the timer info does not exist we should not reschedule
as it was executed or cancelled by
+ jobsScheduled.computeIfPresent(jobDetails.getId(), (jobId,
timerInfo) -> {
+ removeTimerInfo(timerInfo);
return addTimerInfo(jobDetails);
});
}
+ });
+ }
+ private void addTxTimer(JobDetails jobDetails) {
+ this.jobSynchronization.synchronize(new Runnable() {
+ @Override
+ public void run() {
+ jobsScheduled.computeIfAbsent(jobDetails.getId(), jobId -> {
+ return addTimerInfo(jobDetails);
+ });
+ }
});
+ }
- LOG.trace("doSchedule {}", jobDetails);
- fireEvents(jobDetails);
- return jobDetails;
+ private void removeTxTimer(JobDetails jobDetails) {
+ this.jobSynchronization.synchronize(new Runnable() {
+ @Override
+ public void run() {
+ jobsScheduled.computeIfPresent(jobDetails.getId(), (jobId,
timerInfo) -> {
+ removeTimerInfo(timerInfo);
+ return null;
+ });
+ }
+ });
}
+ // vertx calls
private TimerInfo addTimerInfo(JobDetails jobDetails) {
LOG.trace("addTimerInfo {}", jobDetails);
// if it is negative means it should be executed right away
@@ -462,6 +504,21 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
this.vertx.cancelTimer(timerId);
}
+ // lifecycle calls
+ private JobDetails doNextSchedule(JobDetails jobDetails) {
+ updateTxTimer(jobDetails);
+ LOG.trace("doNextSchedule {}", jobDetails);
+ fireEvents(jobDetails);
+ return jobDetails;
+ }
+
+ private JobDetails doSchedule(JobDetails jobDetails) {
+ addTxTimer(jobDetails);
+ LOG.trace("doSchedule {}", jobDetails);
+ fireEvents(jobDetails);
+ return jobDetails;
+ }
+
private JobDetails doRun(JobDetails jobDetails) {
JobDetails runJobDetails =
JobDetails.builder().of(jobDetails).status(JobStatus.RUNNING).build();
LOG.trace("doRun {}", runJobDetails);
@@ -470,18 +527,7 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
}
private JobDetails doCancel(JobDetails jobDetails) {
- this.jobSynchronization.synchronize(new Runnable() {
-
- @Override
- public void run() {
- jobsScheduled.compute(jobDetails.getId(), (jobId, timerInfo)
-> {
- removeTimerInfo(timerInfo);
- return null;
- });
- }
-
- });
-
+ removeTxTimer(jobDetails);
JobDetails canceledJobDetails =
JobDetails.builder().of(jobDetails).status(JobStatus.CANCELED).build();
LOG.trace("doCancel {}", canceledJobDetails);
fireEvents(canceledJobDetails);
@@ -493,11 +539,6 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
LOG.trace("valid executors are: {}", validExecutors);
validExecutors.forEach(executor -> executor.execute(jobDetails));
JobDetails executedJobDetails =
JobDetails.builder().of(jobDetails).status(JobStatus.EXECUTED).incrementExecutionCounter().build();
- this.jobSynchronization.synchronize(new Runnable() {
- public void run() {
- jobsScheduled.remove(jobDetails.getId());
- };
- });
LOG.trace("doExecute {}", executedJobDetails);
fireEvents(executedJobDetails);
return executedJobDetails;
@@ -507,7 +548,13 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
LOG.trace("doRetryIfAny {}", jobDetails);
Integer retryCounter = jobDetails.getRetries();
if (retryCounter < this.maxNumberOfRetries) {
+
+ Date now =
Date.from(DateUtil.now().plus(Duration.ofMillis(retryInterval)).toInstant());
+ Trigger newTrigger = setTriggerDate(jobDetails.getTrigger(), now);
+ JobDescription jobDescriptionMerged =
setJobDescription(jobDetails, newTrigger);
JobDetails retryJobDetails = JobDetails.builder().of(jobDetails)
+ .trigger(newTrigger)
+ .recipient(new RecipientInstance(new InVMRecipient(new
InVMPayloadData(jobDescriptionMerged))))
.status(JobStatus.RETRY)
.executionTimeout(jobDetails.getExecutionTimeout() +
retryInterval)
.incrementRetries()
@@ -521,15 +568,40 @@ public class VertxJobScheduler implements JobScheduler,
Handler<Long> {
}
}
+ private Trigger setTriggerDate(Trigger oldTrigger, Date newOriginDate) {
+ SimpleTimerTrigger oldSimpleTimerTrigger = (SimpleTimerTrigger)
oldTrigger;
+ SimpleTimerTrigger newTrigger = new SimpleTimerTrigger(
+ newOriginDate,
+ oldSimpleTimerTrigger.getPeriod(),
+ oldSimpleTimerTrigger.getPeriodUnit(),
+ oldSimpleTimerTrigger.getRepeatCount(),
+ oldSimpleTimerTrigger.getEndTime(),
+ oldSimpleTimerTrigger.getZoneId());
+ return newTrigger;
+ }
+
+ private JobDescription setJobDescription(JobDetails jobDetails, Trigger
newTrigger) {
+ JobDescription jobDescription =
jobDetails.getRecipient().<InVMPayloadData>
getRecipient().getPayload().getJobDescription();
+
+ JobDescription newJobDescription = jobDescriptionMergers.stream()
+ .filter(merger -> merger.accept(jobDescription))
+ .map(merger -> merger.mergeTrigger(jobDescription, newTrigger))
+ .findFirst()
+ .orElseThrow();
+ return newJobDescription;
+ }
+
private JobDetails computeNextJobDetailsIfAny(JobDetails jobDetails) {
// there is a problem here. If we retried the job the origin, the
current time is different.
// so we set the current time as the time of execution so we do
execute things at fixed interval time.
((SimpleTimerTrigger)
jobDetails.getTrigger()).setNextFireTime(Date.from(Instant.now()));
-
jobDetails.getTrigger().nextFireTime();
if (jobDetails.getTrigger().hasNextFireTime() != null) {
-
- JobDetails nextJobDetails = JobDetails.builder().of(jobDetails)
+ // we set the date for the trigger so we compute new job
description
+ JobDescription jobDescriptionMerged =
setJobDescription(jobDetails, jobDetails.getTrigger());
+ JobDetails nextJobDetails = JobDetails.builder()
+ .of(jobDetails)
+ .recipient(new RecipientInstance(new InVMRecipient(new
InVMPayloadData(jobDescriptionMerged))))
.status(JobStatus.SCHEDULED)
.retries(0)
.executionTimeout(jobDetails.getTrigger().hasNextFireTime().getTime())
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java
similarity index 99%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java
index e1947934d..c242aefc3 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/AbstractJobDescriptionJobInstanceEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/AbstractJobDescriptionJobInstanceEventAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import java.util.Optional;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java
similarity index 50%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java
index bdfd350bb..66d6d1169 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/impl/JobDescriptionHelper.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/JobDescriptionHelper.java
@@ -16,52 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.impl;
+package org.kie.kogito.app.jobs.integrations;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
+import java.util.List;
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
import org.kie.kogito.jobs.DurationExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.JobDescription;
-import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
-import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Trigger;
import org.kie.kogito.timer.impl.SimpleTimerTrigger;
public class JobDescriptionHelper {
- public static JobDescription newJobDescription(JobDescription
jobDescription, Trigger trigger) {
- if (jobDescription instanceof ProcessInstanceJobDescription
processInstanceJobDescription) {
- ProcessInstanceJobDescription newProcessInstanceJobDescription =
new ProcessInstanceJobDescription(
- processInstanceJobDescription.id(),
- processInstanceJobDescription.timerId(),
- toExpirationTime(trigger),
- processInstanceJobDescription.priority(),
- processInstanceJobDescription.processInstanceId(),
- processInstanceJobDescription.rootProcessInstanceId(),
- processInstanceJobDescription.processId(),
- processInstanceJobDescription.rootProcessId(),
- processInstanceJobDescription.nodeInstanceId());
- return newProcessInstanceJobDescription;
- } else if (jobDescription instanceof UserTaskInstanceJobDescription
userTaskInstanceJobDescription) {
- UserTaskInstanceJobDescription newUserTaskInstanceJobDescription =
new UserTaskInstanceJobDescription(
- userTaskInstanceJobDescription.id(),
- toExpirationTime(trigger),
- userTaskInstanceJobDescription.priority(),
- userTaskInstanceJobDescription.userTaskInstanceId(),
- userTaskInstanceJobDescription.processId(),
- userTaskInstanceJobDescription.processInstanceId(),
- userTaskInstanceJobDescription.nodeInstanceId(),
- userTaskInstanceJobDescription.rootProcessInstanceId(),
- userTaskInstanceJobDescription.rootProcessId());
- return newUserTaskInstanceJobDescription;
- } else {
- return jobDescription;
- }
- }
-
public static ExpirationTime toExpirationTime(Trigger trigger) {
if (trigger instanceof SimpleTimerTrigger simpleTimerTrigger) {
ZonedDateTime zoneDateTime =
DateUtil.fromDate(simpleTimerTrigger.hasNextFireTime());
@@ -72,4 +42,9 @@ public class JobDescriptionHelper {
}
throw new IllegalArgumentException("this type of trigger is not
supported " + trigger.getClass().getName());
}
+
+ public static JobDescription newJobDescription(JobDescription
jobDescription, Trigger trigger) {
+ List<JobDescriptionMerger> mergers = List.of(new
ProcessInstanceJobDescriptionMerger(), new ProcessJobDescriptionMerger(), new
UserTaskInstanceJobDescriptorMerger());
+ return mergers.stream().filter(merger ->
merger.accept(jobDescription)).map(merger ->
merger.mergeTrigger(jobDescription, trigger)).findFirst().orElseThrow();
+ }
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
similarity index 97%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
index 46f98f375..33670646a 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionJobInstanceEventAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.api.JobBuilder;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java
new file mode 100644
index 000000000..d25c21565
--- /dev/null
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobDescriptionMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.integrations;
+
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
+import org.kie.kogito.timer.Trigger;
+
+public class ProcessInstanceJobDescriptionMerger implements
JobDescriptionMerger {
+
+ @Override
+ public boolean accept(Object instance) {
+ return instance instanceof ProcessInstanceJobDescription;
+ }
+
+ @Override
+ public JobDescription mergeTrigger(JobDescription jobDescription, Trigger
trigger) {
+ if (jobDescription instanceof ProcessInstanceJobDescription
processInstanceJobDescription) {
+ ProcessInstanceJobDescription newProcessInstanceJobDescription =
new ProcessInstanceJobDescription(
+ processInstanceJobDescription.id(),
+ processInstanceJobDescription.timerId(),
+ JobDescriptionHelper.toExpirationTime(trigger),
+ processInstanceJobDescription.priority(),
+ processInstanceJobDescription.processInstanceId(),
+ processInstanceJobDescription.rootProcessInstanceId(),
+ processInstanceJobDescription.processId(),
+ processInstanceJobDescription.rootProcessId(),
+ processInstanceJobDescription.nodeInstanceId());
+ return newProcessInstanceJobDescription;
+ }
+ throw new IllegalArgumentException("jobDescription type not supported
by this merger " + jobDescription);
+ }
+
+}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java
similarity index 98%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java
index d807d1443..ed7d77325 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessInstanceJobExecutor.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessInstanceJobExecutor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.impl.JobDetailsHelper;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java
similarity index 97%
copy from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
copy to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java
index f13a8f683..d7ca4a6f7 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionJobInstanceEventAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.api.JobBuilder;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java
similarity index 53%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java
index f13a8f683..e900f15d3 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobDescriptionJobInstanceEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobDescriptionMerger.java
@@ -16,29 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
import org.kie.kogito.jobs.JobDescription;
-import org.kie.kogito.jobs.api.JobBuilder;
+import org.kie.kogito.jobs.descriptors.ProcessInstanceJobDescription;
import org.kie.kogito.jobs.descriptors.ProcessJobDescription;
-import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.timer.Trigger;
-public class ProcessJobDescriptionJobInstanceEventAdapter extends
AbstractJobDescriptionJobInstanceEventAdapter {
-
- public ProcessJobDescriptionJobInstanceEventAdapter(String serviceURL) {
- super(serviceURL);
- }
+public class ProcessJobDescriptionMerger implements JobDescriptionMerger {
@Override
- public boolean accept(JobDetails jobDetails) {
- return extractJobDescription(jobDetails) instanceof
ProcessJobDescription;
+ public boolean accept(Object instance) {
+ return instance instanceof ProcessInstanceJobDescription;
}
@Override
- protected void doAdaptPayload(JobBuilder jobBuilder, JobDescription
jobDescription) {
+ public JobDescription mergeTrigger(JobDescription jobDescription, Trigger
trigger) {
if (jobDescription instanceof ProcessJobDescription
processJobDescription) {
- jobBuilder.processId(processJobDescription.processId());
+ ProcessJobDescription newProcessJobDescription =
ProcessJobDescription.of(
+ JobDescriptionHelper.toExpirationTime(trigger),
+ processJobDescription.priority(),
+ processJobDescription.processId());
+ return newProcessJobDescription;
}
+ throw new IllegalArgumentException("jobDescription type not supported
by this merger " + jobDescription);
}
}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java
similarity index 98%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java
index 6fa71148e..773d458eb 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/ProcessJobExecutor.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/ProcessJobExecutor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.impl.JobDetailsHelper;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
similarity index 97%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
index d9242b073..5dcfef63e 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptionJobInstanceEventAdapter.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.api.JobBuilder;
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java
new file mode 100644
index 000000000..2d347a7f0
--- /dev/null
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobDescriptorMerger.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.integrations;
+
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
+import org.kie.kogito.jobs.JobDescription;
+import org.kie.kogito.jobs.descriptors.UserTaskInstanceJobDescription;
+import org.kie.kogito.timer.Trigger;
+
+public class UserTaskInstanceJobDescriptorMerger implements
JobDescriptionMerger {
+
+ @Override
+ public boolean accept(Object instance) {
+ return instance instanceof UserTaskInstanceJobDescription;
+ }
+
+ @Override
+ public JobDescription mergeTrigger(JobDescription jobDescription, Trigger
trigger) {
+ if (jobDescription instanceof UserTaskInstanceJobDescription
userTaskInstanceJobDescription) {
+ UserTaskInstanceJobDescription newUserTaskInstanceJobDescription =
new UserTaskInstanceJobDescription(
+ userTaskInstanceJobDescription.id(),
+ JobDescriptionHelper.toExpirationTime(trigger),
+ userTaskInstanceJobDescription.priority(),
+ userTaskInstanceJobDescription.userTaskInstanceId(),
+ userTaskInstanceJobDescription.processId(),
+ userTaskInstanceJobDescription.processInstanceId(),
+ userTaskInstanceJobDescription.nodeInstanceId(),
+ userTaskInstanceJobDescription.rootProcessInstanceId(),
+ userTaskInstanceJobDescription.rootProcessId());
+ return newUserTaskInstanceJobDescription;
+ }
+ throw new IllegalArgumentException("jobDescription type not supported
by this merger " + jobDescription);
+ }
+
+}
diff --git
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java
similarity index 97%
rename from
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java
rename to
jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java
index 1a3cda6d3..86507cf93 100644
---
a/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integregations/UserTaskInstanceJobExecutor.java
+++
b/jobs/jobs-common-embedded/src/main/java/org/kie/kogito/app/jobs/integrations/UserTaskInstanceJobExecutor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.kie.kogito.app.jobs.integregations;
+package org.kie.kogito.app.jobs.integrations;
import org.kie.kogito.app.jobs.api.JobExecutor;
import org.kie.kogito.app.jobs.impl.JobDetailsHelper;
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java
new file mode 100644
index 000000000..6ae3df33e
--- /dev/null
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/LatchFailureJobSchedulerListener.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.app.jobs.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.kie.kogito.app.jobs.api.JobSchedulerListener;
+import org.kie.kogito.jobs.service.model.JobDetails;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LatchFailureJobSchedulerListener implements JobSchedulerListener {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LatchFailureJobSchedulerListener.class);
+
+ private List<JobDetails> jobDetailsList;
+ private CountDownLatch latch;
+
+ private AtomicInteger count;
+
+ public LatchFailureJobSchedulerListener() {
+ this(1);
+ }
+
+ public LatchFailureJobSchedulerListener(Integer executions) {
+ latch = new CountDownLatch(executions);
+ count = new AtomicInteger(0);
+ jobDetailsList = new ArrayList<>();
+ }
+
+ @Override
+ public void onSchedule(JobDetails jobDetails) {
+ // do nothing
+ }
+
+ @Override
+ public void onReschedule(JobDetails jobDetails) {
+ // do nothing
+ }
+
+ @Override
+ public void onCancel(JobDetails jobDetails) {
+ // do nothing
+ }
+
+ @Override
+ public void onFailure(JobDetails jobDetails) {
+ LOG.info("executing {}", jobDetails);
+ latch.countDown();
+ count.incrementAndGet();
+ jobDetailsList.add(jobDetails);
+ }
+
+ @Override
+ public void onExecution(JobDetails jobDetails) {
+
+ }
+
+ public Integer getCount() {
+ return count.get();
+ }
+
+ public void waitForExecution() throws InterruptedException {
+ latch.await();
+ }
+
+ public void waitForExecution(Long timeout) throws InterruptedException {
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ public List<JobDetails> getJobDetailsList() {
+ return jobDetailsList;
+ }
+
+ public boolean isExecuted() {
+ return latch.getCount() == 0;
+ }
+}
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java
similarity index 59%
copy from
jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
copy to
jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java
index b76839f31..e659815e4 100644
---
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDescriptionMerger.java
@@ -18,25 +18,24 @@
*/
package org.kie.kogito.app.jobs.impl;
-import
org.kie.kogito.app.jobs.integregations.AbstractJobDescriptionJobInstanceEventAdapter;
+import org.kie.kogito.app.jobs.api.JobDescriptionMerger;
+import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper;
import org.kie.kogito.jobs.JobDescription;
-import org.kie.kogito.jobs.api.JobBuilder;
-import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.timer.Trigger;
-public class TestJobDetailsEventAdapter extends
AbstractJobDescriptionJobInstanceEventAdapter {
-
- public TestJobDetailsEventAdapter() {
- super("http://localhost:8080");
- }
+public class TestJobDescriptionMerger implements JobDescriptionMerger {
@Override
- public boolean accept(JobDetails jobDetails) {
- return extractJobDescription(jobDetails) instanceof TestJobDescription;
+ public boolean accept(Object instance) {
+ return instance instanceof TestJobDescription;
}
@Override
- protected void doAdaptPayload(JobBuilder jobBuilder, JobDescription
jobDescription) {
- // do nothing
+ public JobDescription mergeTrigger(JobDescription jobDescription, Trigger
trigger) {
+ if (jobDescription instanceof TestJobDescription testJobDescription) {
+ return new TestJobDescription(jobDescription.id(),
JobDescriptionHelper.toExpirationTime(trigger));
+ }
+ return null;
}
}
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
index b76839f31..5b839b062 100644
---
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/TestJobDetailsEventAdapter.java
@@ -18,7 +18,7 @@
*/
package org.kie.kogito.app.jobs.impl;
-import
org.kie.kogito.app.jobs.integregations.AbstractJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.AbstractJobDescriptionJobInstanceEventAdapter;
import org.kie.kogito.jobs.JobDescription;
import org.kie.kogito.jobs.api.JobBuilder;
import org.kie.kogito.jobs.service.model.JobDetails;
diff --git
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
index 1ae15e176..fc9427dcd 100644
---
a/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
+++
b/jobs/jobs-common-embedded/src/test/java/org/kie/kogito/app/jobs/impl/VertxJobSchedulerTest.java
@@ -21,6 +21,7 @@ package org.kie.kogito.app.jobs.impl;
import java.time.Duration;
import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
import org.junit.jupiter.api.Test;
import org.kie.kogito.app.jobs.api.JobScheduler;
@@ -30,8 +31,10 @@ import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory;
import org.kie.kogito.app.jobs.spi.memory.MemoryJobStore;
import org.kie.kogito.jobs.DurationExpirationTime;
+import org.kie.kogito.jobs.ExactExpirationTime;
import org.kie.kogito.jobs.ExpirationTime;
import org.kie.kogito.jobs.service.model.JobDetails;
+import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.timer.impl.SimpleTimerTrigger;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,6 +55,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
jobScheduler.schedule(new TestJobDescription(jobId,
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
@@ -76,6 +80,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
ExpirationTime expirationTime = DurationExpirationTime.repeat(0,
1000L, 3);
@@ -94,7 +99,7 @@ public class VertxJobSchedulerTest {
final String jobId = "1";
JobStore memoryJobStore = new MemoryJobStore();
JobContextFactory jobContextFactory = new MemoryJobContextFactory();
- LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener
= new LatchExecutionJobSchedulerListener(3);
+ LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener
= new LatchExecutionJobSchedulerListener(6);
TestJobExecutor latchJobExecutor = new TestJobExecutor();
JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
.withJobExecutors(latchJobExecutor)
@@ -103,13 +108,14 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
ExpirationTime expirationTime = DurationExpirationTime.repeat(0,
1000L, SimpleTimerTrigger.INDEFINITELY);
jobScheduler.schedule(new TestJobDescription(jobId, expirationTime));
latchExecutionJobSchedulerListener.waitForExecution();
- assertThat(latchJobExecutor.getJobsExecuted()).hasSize(3);
+ assertThat(latchJobExecutor.getJobsExecuted()).hasSize(6);
assertThat(memoryJobStore.find(jobContextFactory.newContext(),
jobId)).isNotNull();
assertThat(latchExecutionJobSchedulerListener.isExecuted()).isTrue();
jobScheduler.close();
@@ -128,6 +134,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
@@ -152,6 +159,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
@@ -179,6 +187,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
@@ -206,6 +215,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
@@ -216,6 +226,65 @@ public class VertxJobSchedulerTest {
jobScheduler.close();
}
+ @Test
+ public void testExactTime() throws Exception {
+ final String jobId = "1";
+ LatchExecutionJobSchedulerListener latchExecutionJobSchedulerListener
= new LatchExecutionJobSchedulerListener();
+ TestJobExecutor latchJobExecutor = new TestJobExecutor();
+ JobStore memoryJobStore = new MemoryJobStore();
+ JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+ JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
+ .withJobExecutors(latchJobExecutor)
+ .withJobEventAdapters(new TestJobDetailsEventAdapter())
+ .withEventPublishers(new TestEventPublisher())
+ .withJobContextFactory(jobContextFactory)
+ .withJobStore(memoryJobStore)
+ .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
+ .withRefreshJobsInterval(100000L)
+ .build();
+ jobScheduler.init();
+ ExpirationTime expirationTime =
ExactExpirationTime.of(ZonedDateTime.now().plus(1, ChronoUnit.MILLIS));
+ jobScheduler.schedule(new TestJobDescription(jobId, expirationTime));
+ latchExecutionJobSchedulerListener.waitForExecution(1000L);
+ assertThat(latchJobExecutor.getJobsExecuted()).hasSize(1);
+ assertThat(memoryJobStore.find(jobContextFactory.newContext(),
jobId)).isNull();
+ assertThat(latchExecutionJobSchedulerListener.isExecuted()).isTrue();
+ jobScheduler.close();
+
+ }
+
+ @Test
+ public void testNumberOfRetries() throws Exception {
+ final int NUMBER_OF_FAILURES = 4; // first execution + number of
retries
+ final int NUMBER_OF_RETRIES = NUMBER_OF_FAILURES - 1;
+
+ final String jobId = "1";
+ JobStore memoryJobStore = new MemoryJobStore();
+ JobContextFactory jobContextFactory = new MemoryJobContextFactory();
+ TestFailureJobExecutor latchJobExecutor = new
TestFailureJobExecutor(NUMBER_OF_FAILURES);
+ LatchFailureJobSchedulerListener latchExecutionJobSchedulerListener =
new LatchFailureJobSchedulerListener(NUMBER_OF_FAILURES);
+ JobScheduler jobScheduler =
JobSchedulerBuilder.newJobSchedulerBuilder()
+ .withMaxNumberOfRetries(NUMBER_OF_RETRIES)
+ .withJobExecutors(latchJobExecutor)
+ .withRetryInterval(1000L)
+ .withJobEventAdapters(new TestJobDetailsEventAdapter())
+ .withEventPublishers(new TestEventPublisher())
+ .withJobContextFactory(jobContextFactory)
+ .withJobStore(memoryJobStore)
+ .withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
+ .build();
+
+ jobScheduler.init();
+
+ jobScheduler.schedule(new TestJobDescription(jobId,
ZonedDateTime.now().plus(Duration.ofSeconds(1))));
+ latchExecutionJobSchedulerListener.waitForExecution();
+ assertThat(memoryJobStore.find(jobContextFactory.newContext(),
jobId)).isNotNull().extracting(JobDetails::getStatus).isEqualTo(JobStatus.ERROR);
+
+ jobScheduler.close();
+ }
+
@Test
public void testBasicOverdueTime() throws Exception {
final String jobId = "1";
@@ -235,6 +304,7 @@ public class VertxJobSchedulerTest {
.withJobContextFactory(jobContextFactory)
.withJobStore(memoryJobStore)
.withJobSchedulerListeners(latchExecutionJobSchedulerListener)
+ .withJobDescriptorMergers(new TestJobDescriptionMerger())
.build();
jobScheduler.init();
latchExecutionJobSchedulerListener.waitForExecution();
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java
index 7a46452f4..1a70b4cb0 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobServiceProducer.java
@@ -19,9 +19,9 @@
package org.kie.kogito.app.jobs.quarkus;
import org.kie.kogito.app.jobs.api.JobExecutor;
-import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobExecutor;
-import org.kie.kogito.app.jobs.integregations.ProcessJobExecutor;
-import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobExecutor;
+import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobExecutor;
+import org.kie.kogito.app.jobs.integrations.ProcessJobExecutor;
+import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobExecutor;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory;
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
index f1b9f5b2a..a182ec713 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/QuarkusJobsService.java
@@ -27,9 +27,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
-import
org.kie.kogito.app.jobs.integregations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
-import
org.kie.kogito.app.jobs.integregations.ProcessJobDescriptionJobInstanceEventAdapter;
-import
org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
import org.kie.kogito.app.jobs.quarkus.resource.RestApiConstants;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
@@ -78,8 +78,8 @@ public class QuarkusJobsService implements JobsService {
@ConfigProperty(name = "kogito.jobs-service.maxNumberOfRetries",
defaultValue = "3")
protected Integer maxNumberOfRetries;
- @ConfigProperty(name =
"kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue = "60000")
- protected Long maxIntervalLimitToRetryMillis;
+ @ConfigProperty(name = "kogito.jobs-service.retryMillis", defaultValue =
"100")
+ protected Long retryMillis;
@ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes",
defaultValue = "10")
protected Long maxRefreshJobsIntervalWindow;
@@ -117,7 +117,7 @@ public class QuarkusJobsService implements JobsService {
new
UserTaskInstanceJobDescriptionJobInstanceEventAdapter(serviceURL +
RestApiConstants.JOBS_PATH))
.withJobExecutors(jobExecutors.stream().toArray(JobExecutor[]::new))
.withMaxRefreshJobsIntervalWindow(maxRefreshJobsIntervalWindow
* 60 * 1000L)
- .withRetryInterval(maxIntervalLimitToRetryMillis)
+ .withRetryInterval(retryMillis)
.withMaxNumberOfRetries(maxNumberOfRetries)
.withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 *
1000L)
.withTimeoutInterceptor(txInterceptor)
diff --git
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java
index 774b02269..1a9bb7604 100644
---
a/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java
+++
b/jobs/kogito-addons-quarkus-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/quarkus/resource/JobResourceV1.java
@@ -22,7 +22,7 @@ import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.parameters.RequestBody;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.kie.kogito.app.jobs.impl.InVMPayloadData;
-import org.kie.kogito.app.jobs.impl.JobDescriptionHelper;
+import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.jobs.JobDescription;
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java
index 60db14d30..3f66ce946 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobServiceConfiguration.java
@@ -19,9 +19,9 @@
package org.kie.kogito.app.jobs.springboot;
import org.kie.kogito.app.jobs.api.JobExecutor;
-import org.kie.kogito.app.jobs.integregations.ProcessInstanceJobExecutor;
-import org.kie.kogito.app.jobs.integregations.ProcessJobExecutor;
-import org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobExecutor;
+import org.kie.kogito.app.jobs.integrations.ProcessInstanceJobExecutor;
+import org.kie.kogito.app.jobs.integrations.ProcessJobExecutor;
+import org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobExecutor;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.app.jobs.spi.memory.MemoryJobContextFactory;
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
index c9be49922..8bdcaecda 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/SpringbootJobsService.java
@@ -28,9 +28,9 @@ import org.kie.kogito.app.jobs.api.JobSchedulerBuilder;
import org.kie.kogito.app.jobs.api.JobSchedulerListener;
import org.kie.kogito.app.jobs.api.JobSynchronization;
import org.kie.kogito.app.jobs.api.JobTimeoutInterceptor;
-import
org.kie.kogito.app.jobs.integregations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
-import
org.kie.kogito.app.jobs.integregations.ProcessJobDescriptionJobInstanceEventAdapter;
-import
org.kie.kogito.app.jobs.integregations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.ProcessInstanceJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.ProcessJobDescriptionJobInstanceEventAdapter;
+import
org.kie.kogito.app.jobs.integrations.UserTaskInstanceJobDescriptionJobInstanceEventAdapter;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.app.jobs.springboot.resource.RestApiConstants;
@@ -76,8 +76,8 @@ public class SpringbootJobsService implements JobsService {
@Value("${kogito.jobs-service.maxNumberOfRetries:3}")
protected Integer maxNumberOfRetries;
- @Value("${kogito.jobs-service.maxIntervalLimitToRetryMillis:60000}")
- protected Long maxIntervalLimitToRetryMillis;
+ @Value("${kogito.jobs-service.retryMillis:100}")
+ protected Long retryMillis;
@Value("${kogito.jobs-service.schedulerChunkInMinutes:10}")
protected Long maxRefreshJobsIntervalWindow;
@@ -123,7 +123,7 @@ public class SpringbootJobsService implements JobsService {
new
UserTaskInstanceJobDescriptionJobInstanceEventAdapter(serviceURL +
RestApiConstants.JOBS_PATH))
.withJobExecutors(ofNullable(jobExecutors).toArray(JobExecutor[]::new))
.withMaxRefreshJobsIntervalWindow(maxRefreshJobsIntervalWindow
* 60 * 1000L)
- .withRetryInterval(maxIntervalLimitToRetryMillis)
+ .withRetryInterval(retryMillis)
.withMaxNumberOfRetries(maxNumberOfRetries)
.withRefreshJobsInterval(maxRefreshJobsIntervalWindow * 60 *
1000L)
.withTimeoutInterceptor(txInterceptor)
diff --git
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java
index f5cf2eaed..03d080380 100644
---
a/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java
+++
b/jobs/kogito-addons-springboot-embedded-jobs/src/main/java/org/kie/kogito/app/jobs/springboot/resource/JobResourceV1.java
@@ -22,7 +22,7 @@ import org.eclipse.microprofile.openapi.annotations.Operation;
import org.eclipse.microprofile.openapi.annotations.parameters.RequestBody;
import org.eclipse.microprofile.openapi.annotations.tags.Tag;
import org.kie.kogito.app.jobs.impl.InVMPayloadData;
-import org.kie.kogito.app.jobs.impl.JobDescriptionHelper;
+import org.kie.kogito.app.jobs.integrations.JobDescriptionHelper;
import org.kie.kogito.app.jobs.spi.JobContextFactory;
import org.kie.kogito.app.jobs.spi.JobStore;
import org.kie.kogito.jobs.JobDescription;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]