wmedvede commented on code in PR #2214:
URL:
https://github.com/apache/incubator-kie-kogito-apps/pull/2214#discussion_r2094154773
##########
jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java:
##########
@@ -18,131 +18,213 @@
*/
package org.kie.kogito.jobs.service.repository.postgresql;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.sql.Types;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
import java.util.Optional;
-import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
-import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import javax.sql.DataSource;
+
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
-import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
-import org.kie.kogito.jobs.service.repository.impl.BaseReactiveJobRepository;
+import org.kie.kogito.jobs.service.repository.JobRepository;
+import org.kie.kogito.jobs.service.repository.impl.AbstractJobRepository;
import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller;
import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller;
-import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.utils.DateUtil;
import org.kie.kogito.timer.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import io.smallrye.mutiny.Multi;
-import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
-import io.vertx.mutiny.pgclient.PgPool;
-import io.vertx.mutiny.sqlclient.Row;
-import io.vertx.mutiny.sqlclient.RowSet;
-import io.vertx.mutiny.sqlclient.Tuple;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
-import static java.util.stream.Collectors.toList;
-import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher;
-import static org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE;
+import static java.util.Collections.emptyList;
@ApplicationScoped
-public class PostgreSqlJobRepository extends BaseReactiveJobRepository
implements ReactiveJobRepository {
+public class PostgreSqlJobRepository extends AbstractJobRepository implements
JobRepository {
+ private static Logger LOG =
LoggerFactory.getLogger(PostgreSqlJobRepository.class);
private static final String JOB_DETAILS_TABLE = "job_details";
private static final String JOB_DETAILS_COLUMNS = "id, correlation_id,
status, last_update, retries, " +
"execution_counter, scheduled_id, priority, recipient, trigger,
fire_time, execution_timeout, execution_timeout_unit, created";
- private PgPool client;
+ private DataSource client;
private final TriggerMarshaller triggerMarshaller;
private final RecipientMarshaller recipientMarshaller;
PostgreSqlJobRepository() {
- this(null, null, null, null, null);
+ this(null, null, null);
}
@Inject
- public PostgreSqlJobRepository(Vertx vertx, JobEventPublisher
jobEventPublisher, PgPool client,
+ public PostgreSqlJobRepository(DataSource client,
TriggerMarshaller triggerMarshaller, RecipientMarshaller
recipientMarshaller) {
- super(vertx, jobEventPublisher);
+ super();
this.client = client;
this.triggerMarshaller = triggerMarshaller;
this.recipientMarshaller = recipientMarshaller;
}
+ private static final String DO_SAVE =
+ "INSERT INTO " + JOB_DETAILS_TABLE + " (" + JOB_DETAILS_COLUMNS +
") VALUES (?, ?, ?, now(), ?, ?, ?, ?, ?, ?, ?, ?, ?, now()) " +
+ "ON CONFLICT (id) DO " +
+ "UPDATE SET correlation_id = ?, status = ?, last_update =
now(), retries = ?, " +
+ "execution_counter = ?, scheduled_id = ?, priority = ?, " +
+ "recipient = ?, trigger = ?, fire_time = ?,
execution_timeout = ?, execution_timeout_unit = ? " +
+ "RETURNING " + JOB_DETAILS_COLUMNS;
+
+ private static final String FIND = "SELECT " + JOB_DETAILS_COLUMNS + "
FROM " + JOB_DETAILS_TABLE + " WHERE id = ?";
+
+ private static final String DELETE = "DELETE FROM " + JOB_DETAILS_TABLE +
" WHERE id = ? RETURNING " + JOB_DETAILS_COLUMNS;
+
@Override
- public CompletionStage<JobDetails> doSave(JobDetails job) {
- return client.preparedQuery("INSERT INTO " + JOB_DETAILS_TABLE + " ("
+ JOB_DETAILS_COLUMNS +
- ") VALUES ($1, $2, $3, now(), $4, $5, $6, $7, $8, $9, $10,
$11, $12, now()) " +
- "ON CONFLICT (id) DO " +
- "UPDATE SET correlation_id = $2, status = $3, last_update =
now(), retries = $4, " +
- "execution_counter = $5, scheduled_id = $6, priority = $7, " +
- "recipient = $8, trigger = $9, fire_time = $10,
execution_timeout = $11, execution_timeout_unit = $12 " +
- "RETURNING " + JOB_DETAILS_COLUMNS)
- .execute(Tuple.tuple(Stream.of(
- job.getId(),
- job.getCorrelationId(),
-
Optional.ofNullable(job.getStatus()).map(Enum::name).orElse(null),
- job.getRetries(),
- job.getExecutionCounter(),
- job.getScheduledId(),
- job.getPriority(),
- recipientMarshaller.marshall(job.getRecipient()),
- triggerMarshaller.marshall(job.getTrigger()),
-
Optional.ofNullable(job.getTrigger()).map(Trigger::hasNextFireTime).map(DateUtil::dateToOffsetDateTime).orElse(null),
- job.getExecutionTimeout(),
-
Optional.ofNullable(job.getExecutionTimeoutUnit()).map(Enum::name).orElse(null))
- .collect(toList())))
- .onItem().transform(RowSet::iterator)
- .onItem().transform(iterator -> iterator.hasNext() ?
from(iterator.next()) : null)
- .convert()
- .toCompletableFuture();
+ public JobDetails doSave(JobDetails job) {
+ JobDetails next = null;
+ String status =
Optional.ofNullable(job.getStatus()).map(JobStatus::name).orElse(null);
+ var timeoutUnit =
Optional.ofNullable(job.getExecutionTimeoutUnit()).map(ChronoUnit::name).orElse(null);
+ Timestamp fireTime =
Optional.ofNullable(job.getTrigger()).map(Trigger::hasNextFireTime).map(date ->
new java.sql.Timestamp(date.getTime())).orElse(null);
+ try (Connection connection = client.getConnection(); PreparedStatement
stmt = connection.prepareStatement(DO_SAVE)) {
+ stmt.setString(1, job.getId());
+ stmt.setString(2, job.getCorrelationId());
+ stmt.setObject(3, status);
+ stmt.setInt(4, job.getRetries());
+ stmt.setInt(5, job.getExecutionCounter());
+ stmt.setString(6, job.getScheduledId());
+ if (job.getPriority() != null) {
+ stmt.setInt(7, job.getPriority());
+ } else {
+ stmt.setNull(7, Types.INTEGER);
+ }
+ JsonObject recipient =
recipientMarshaller.marshall(job.getRecipient());
+ if (recipient != null) {
+ stmt.setObject(8, recipient.toString(), Types.OTHER);
+ } else {
+ stmt.setNull(8, Types.OTHER);
+ }
+ JsonObject trigger = triggerMarshaller.marshall(job.getTrigger());
+ if (trigger != null) {
+ stmt.setObject(9, trigger.toString(), Types.OTHER);
+ } else {
+ stmt.setNull(9, Types.OTHER);
+ }
+ stmt.setTimestamp(10, fireTime);
+ if (job.getExecutionTimeout() != null) {
+ stmt.setLong(11, job.getExecutionTimeout());
+ } else {
+ stmt.setNull(11, Types.BIGINT);
+ }
+ stmt.setString(12, timeoutUnit);
+
+ // on conflict
+ stmt.setString(13, job.getCorrelationId());
+ stmt.setObject(14, status);
+ stmt.setInt(15, job.getRetries());
+ stmt.setInt(16, job.getExecutionCounter());
+ stmt.setString(17, job.getScheduledId());
+ if (job.getPriority() != null) {
+ stmt.setInt(18, job.getPriority());
+ } else {
+ stmt.setNull(18, Types.INTEGER);
+ }
+
+ if (recipient != null) {
+ stmt.setObject(19, recipient.toString(), Types.OTHER);
+ } else {
+ stmt.setNull(19, Types.OTHER);
+ }
+ if (trigger != null) {
+ stmt.setObject(20, trigger.toString(), Types.OTHER);
+ } else {
+ stmt.setNull(20, Types.OTHER);
+ }
+ stmt.setTimestamp(21, fireTime);
+ if (job.getExecutionTimeout() != null) {
+ stmt.setLong(22, job.getExecutionTimeout());
+ } else {
+ stmt.setNull(22, Types.BIGINT);
+ }
+ stmt.setString(23, timeoutUnit);
+
+ ResultSet resultSet = stmt.executeQuery();
+ if (resultSet.next()) {
+ next = from(resultSet);
+ }
+ resultSet.close();
+ return next;
+ } catch (SQLException ex) {
+ LOG.error("Error during job insertion in pgsql", ex);
Review Comment:
This swallows the error — callers will never know that the operation
failed, and `doSave` will silently return `null`.
At a minimum we must throw
org.kie.kogito.jobs.service.exception.JobServiceException, or we can create
another convenient RuntimeException.
I'd recommend using
`org.kie.kogito.jobs.service.repository.JobRepositoryException` and passing
the caught exception as the root cause.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]