wmedvede commented on code in PR #2214:
URL: 
https://github.com/apache/incubator-kie-kogito-apps/pull/2214#discussion_r2093553471


##########
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java:
##########
@@ -45,47 +38,56 @@ public class DelegateJob implements Job<JobDetailsContext> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DelegateJob.class);
 
-    private final JobExecutorResolver jobExecutorResolver;
+    private JobExecutorResolver jobExecutorResolver;
 
-    ReactiveJobScheduler scheduler;
+    private JobScheduler<JobDetails> scheduler;
 
-    public DelegateJob(JobExecutorResolver executorResolver, 
ReactiveJobScheduler scheduler) {
+    public DelegateJob(JobExecutorResolver executorResolver, 
JobScheduler<JobDetails> scheduler) {
         this.jobExecutorResolver = executorResolver;
         this.scheduler = scheduler;
     }
 
     @Override
     public void execute(JobDetailsContext ctx) {
-        final AtomicReference<JobExecutionResponse> executionResponse = new 
AtomicReference<>();
-        final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () 
-> String.format("JobDetails cannot be null for context: %s", ctx));
-        final JobExecutor executor = 
requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
-        LOGGER.info("Executing job for context: {}", jobDetails);
-        executor.execute(jobDetails)
-                .flatMap(response -> {
-                    executionResponse.set(response);
-                    return handleJobExecutionSuccess(response);
-                })
-                .onFailure(JobExecutionException.class).recoverWithUni(ex -> {
-                    String jobId = ((JobExecutionException) ex).getJobId();
-                    executionResponse.set(JobExecutionResponse.builder()
-                            .message(ex.getMessage())
-                            .now()
-                            .jobId(jobId)
-                            .build());
-                    return handleJobExecutionError(executionResponse.get());
-                })
-                // avoid blocking IO pool from the event-loop since 
alternative EmbeddedJobExecutor is blocking.
-                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
-                .subscribe().with(ignore -> LOGGER.info("Job execution 
response processing has finished: {}", executionResponse.get()));
+        JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> 
String.format("JobDetails cannot be null for context: %s", ctx));
+        try {
+            JobExecutor executor = jobExecutorResolver.get(jobDetails);
+            executor = requireNonNull(executor, () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
+            LOGGER.trace("Executing job for context: {}", jobDetails);
+            JobExecutionResponse response = executor.execute(jobDetails);
+            LOGGER.trace("Job execution response processing has finished: {}", 
response);
+            handleJobExecutionSuccess(response);
+        } catch (JobExecutionException ex) {
+            LOGGER.error("Executing job error: {}", ex.getMessage());
+            JobExecutionResponse errorResponse = JobExecutionResponse.builder()
+                    .message(ex.getMessage())
+                    .now()
+                    .jobId(jobDetails.getId())
+                    .code("500")
+                    .build();
+
+            handleJobExecutionError(errorResponse);
+        } catch (Exception ex) {
+            LOGGER.error("Unexpected error during the job execution: {}", 
ex.getMessage());

Review Comment:
   I don't think any exception can be thrown here.



##########
jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java:
##########
@@ -45,47 +38,56 @@ public class DelegateJob implements Job<JobDetailsContext> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DelegateJob.class);
 
-    private final JobExecutorResolver jobExecutorResolver;
+    private JobExecutorResolver jobExecutorResolver;
 
-    ReactiveJobScheduler scheduler;
+    private JobScheduler<JobDetails> scheduler;
 
-    public DelegateJob(JobExecutorResolver executorResolver, 
ReactiveJobScheduler scheduler) {
+    public DelegateJob(JobExecutorResolver executorResolver, 
JobScheduler<JobDetails> scheduler) {
         this.jobExecutorResolver = executorResolver;
         this.scheduler = scheduler;
     }
 
     @Override
     public void execute(JobDetailsContext ctx) {
-        final AtomicReference<JobExecutionResponse> executionResponse = new 
AtomicReference<>();
-        final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () 
-> String.format("JobDetails cannot be null for context: %s", ctx));
-        final JobExecutor executor = 
requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
-        LOGGER.info("Executing job for context: {}", jobDetails);
-        executor.execute(jobDetails)
-                .flatMap(response -> {
-                    executionResponse.set(response);
-                    return handleJobExecutionSuccess(response);
-                })
-                .onFailure(JobExecutionException.class).recoverWithUni(ex -> {
-                    String jobId = ((JobExecutionException) ex).getJobId();
-                    executionResponse.set(JobExecutionResponse.builder()
-                            .message(ex.getMessage())
-                            .now()
-                            .jobId(jobId)
-                            .build());
-                    return handleJobExecutionError(executionResponse.get());
-                })
-                // avoid blocking IO pool from the event-loop since 
alternative EmbeddedJobExecutor is blocking.
-                .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
-                .subscribe().with(ignore -> LOGGER.info("Job execution 
response processing has finished: {}", executionResponse.get()));
+        JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> 
String.format("JobDetails cannot be null for context: %s", ctx));
+        try {
+            JobExecutor executor = jobExecutorResolver.get(jobDetails);
+            executor = requireNonNull(executor, () -> String.format("No 
JobExecutor was found for jobDetails: %s", jobDetails));
+            LOGGER.trace("Executing job for context: {}", jobDetails);
+            JobExecutionResponse response = executor.execute(jobDetails);
+            LOGGER.trace("Job execution response processing has finished: {}", 
response);
+            handleJobExecutionSuccess(response);
+        } catch (JobExecutionException ex) {
+            LOGGER.error("Executing job error: {}", ex.getMessage());
+            JobExecutionResponse errorResponse = JobExecutionResponse.builder()
+                    .message(ex.getMessage())
+                    .now()
+                    .jobId(jobDetails.getId())
+                    .code("500")

Review Comment:
   We can't assume code 500 here; executors are not necessarily all HTTP-based.
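
   One possible way to avoid the hard-coded value, as a rough sketch only: let the executor supply a code when it has one and leave it unset otherwise. The `Response` class and the `errorResponse` helper below are hypothetical stand-ins, not the real `JobExecutionResponse` API.

```java
import java.util.Optional;

// Sketch only: simplified, hypothetical stand-ins for the real Kogito types.
public class ErrorResponseCodeSketch {

    /** Hypothetical stand-in for JobExecutionResponse; code may be null. */
    static final class Response {
        final String jobId;
        final String message;
        final String code;

        Response(String jobId, String message, String code) {
            this.jobId = jobId;
            this.message = message;
            this.code = code;
        }
    }

    /**
     * Builds an error response without assuming an HTTP status.
     * The code is whatever the executor reported, or null when it reported none.
     */
    static Response errorResponse(String jobId, String message, Optional<String> executorCode) {
        return new Response(jobId, message, executorCode.orElse(null));
    }
}
```

   With this shape, an HTTP executor could pass its status code, while a non-HTTP executor would pass `Optional.empty()`.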



##########
jobs-service/jobs-recipients/job-recipient-common-http/src/main/java/org/kie/kogito/job/recipient/common/http/HTTPRequestExecutor.java:
##########
@@ -78,44 +79,63 @@ public WebClient createClient() {
         return WebClient.create(vertx);
     }
 
-    public Uni<JobExecutionResponse> execute(JobDetails jobDetails) {
-        return Uni.createFrom().item(jobDetails)
-                .chain(job -> {
-                    final R recipient = getRecipient(job);
-                    final String limit = getLimit(job);
-                    final HTTPRequest request = buildRequest(recipient, limit);
-                    final long requestTimeout = getTimeoutInMillis(job);
-                    return executeRequest(request, requestTimeout)
-                            .onFailure().transform(unexpected -> new 
JobExecutionException(job.getId(),
-                                    "Unexpected error when executing HTTP 
request for job: " + jobDetails.getId() + ". " + unexpected.getMessage()))
-                            .onItem().transform(response -> 
JobExecutionResponse.builder()
-                                    .message(response.bodyAsString())
-                                    
.code(String.valueOf(response.statusCode()))
-                                    .now()
-                                    .jobId(job.getId())
-                                    .build())
-                            .chain(this::handleResponse);
-                });
+    public JobExecutionResponse execute(JobDetails jobDetails) {
+        try {
+            R recipient = getRecipient(jobDetails);
+            String limit = getLimit(jobDetails);
+            HTTPRequest request = buildRequest(recipient, limit);
+            long requestTimeout = getTimeoutInMillis(jobDetails);
+            HttpResponse<?> response = executeRequest(request, requestTimeout);
+            JobExecutionResponse jobExecutionResponse = 
JobExecutionResponse.builder()
+                    .message(response.bodyAsString())
+                    .code(String.valueOf(response.statusCode()))
+                    .now()
+                    .jobId(jobDetails.getId())
+                    .build();
+
+            return this.handleResponse(jobExecutionResponse);
+        } catch (Throwable unexpected) {
+            LOGGER.error("Executing error for {}", jobDetails.getId(), 
unexpected);

Review Comment:
   This catch block hides a potential unexpected error, and also the potential 
translation of the HTTP response error into a JobExecutionException in 
handleResponse.
   
   If handleResponse throws a JobExecutionException, that exception must be 
rethrown.
   
   If the error is totally unexpected, we must create and throw a new 
JobExecutionException. 
   See also DelegateJob.
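
   A minimal sketch of the handling described above, assuming JobExecutionException is unchecked and carries the job id. The `Call` interface and the simplified exception class are hypothetical stand-ins for the request and handleResponse steps, not the real types.

```java
// Sketch only: simplified, hypothetical stand-ins for the real Kogito types.
public class ExecutorErrorHandlingSketch {

    /** Hypothetical stand-in for JobExecutionException (assumed unchecked). */
    static class JobExecutionException extends RuntimeException {
        private final String jobId;

        JobExecutionException(String jobId, String message, Throwable cause) {
            super(message, cause);
            this.jobId = jobId;
        }

        String getJobId() {
            return jobId;
        }
    }

    /** Hypothetical stand-in for "execute the request, then handleResponse". */
    interface Call {
        String run() throws Exception;
    }

    static String execute(String jobId, Call call) {
        try {
            return call.run();
        } catch (JobExecutionException e) {
            // Already translated (for example by handleResponse): rethrow unchanged.
            throw e;
        } catch (Exception unexpected) {
            // Anything else is wrapped so callers only deal with one exception type.
            throw new JobExecutionException(jobId,
                    "Unexpected error when executing HTTP request for job: " + jobId
                            + ". " + unexpected.getMessage(),
                    unexpected);
        }
    }
}
```

   The sketch catches `Exception` rather than `Throwable`; whether errors such as `OutOfMemoryError` should also be wrapped is a separate decision.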


