This is an automated email from the ASF dual-hosted git repository. lahirujayathilake pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airavata-custos.git
commit b9cc6490760e703ff0dd63cc0e0275f93cc6efde Author: lahiruj <[email protected]> AuthorDate: Fri Mar 27 00:55:16 2026 -0400 prevent duplicate accounts, fix retry loop, correct AMIE field names, persist DN lists --- .../handler/amie/DataAccountCreateHandler.java | 12 +- .../handler/amie/DataProjectCreateHandler.java | 13 +- .../service/model/amie/ProcessingEventEntity.java | 11 + .../ci/service/model/amie/ProcessingStatus.java | 7 +- .../ci/service/repo/ClusterAccountRepository.java | 4 + .../service/repo/ProjectMembershipRepository.java | 2 + .../repo/amie/ProcessingEventRepository.java | 13 +- .../access/ci/service/service/PersonService.java | 61 ++++- .../service/service/ProjectMembershipService.java | 29 +- .../ci/service/service/UserAccountService.java | 26 +- .../service/worker/amie/ProcessingEventWorker.java | 164 +++++++---- .../V2__add_next_retry_at_to_processing_events.sql | 5 + .../handler/amie/DataAccountCreateHandlerTest.java | 36 ++- .../handler/amie/DataProjectCreateHandlerTest.java | 35 ++- .../ci/service/service/PersonServiceTest.java | 216 ++++++++++++++- .../service/ProjectMembershipServiceTest.java | 120 ++++++++- .../ci/service/service/UserAccountServiceTest.java | 76 ++++++ .../worker/amie/ProcessingEventWorkerTest.java | 299 +++++++++++++++++++++ 18 files changed, 1023 insertions(+), 106 deletions(-) diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java index 4a1165e62..943f26a45 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java @@ -21,6 +21,7 @@ package org.apache.custos.access.ci.service.handler.amie; import com.fasterxml.jackson.databind.JsonNode; import org.apache.custos.access.ci.service.client.amie.AmieClient; import org.apache.custos.access.ci.service.model.amie.PacketEntity; +import org.apache.custos.access.ci.service.service.PersonService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -40,9 +41,11 @@ public class DataAccountCreateHandler implements PacketHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DataAccountCreateHandler.class); private final AmieClient amieClient; + private final PersonService personService; - public DataAccountCreateHandler(AmieClient amieClient) { + public DataAccountCreateHandler(AmieClient amieClient, PersonService personService) { this.amieClient = amieClient; + this.personService = personService; } @Override @@ -63,12 +66,9 @@ public class DataAccountCreateHandler implements PacketHandler { Assert.hasText(personId, "'PersonID' must not be empty."); LOGGER.info("Packet validated. ProjectID: [{}], PersonID: [{}].", projectId, personId); - - // TODO - perform the business logic - // - find the user's record by 'personId' (localID) and update the distinguished names (dnList) if (dnList.isArray() && !dnList.isEmpty()) { - LOGGER.info("Received DnList for user [{}]. In a real implementation, this would be saved to the user's profile.", personId); - // TODO userService.updateUserDnList(personId, dnList); + LOGGER.info("Persisting DnList for person [{}] from data_account_create.", personId); + personService.persistDnsForPerson(personId, dnList); } // Send the 'inform_transaction_complete' reply to close the transaction. diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java index 0ca934b92..2a96d05dc 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java @@ -21,6 +21,7 @@ package org.apache.custos.access.ci.service.handler.amie; import com.fasterxml.jackson.databind.JsonNode; import org.apache.custos.access.ci.service.client.amie.AmieClient; import org.apache.custos.access.ci.service.model.amie.PacketEntity; +import org.apache.custos.access.ci.service.service.PersonService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -38,9 +39,11 @@ public class DataProjectCreateHandler implements PacketHandler { private static final Logger LOGGER = LoggerFactory.getLogger(DataProjectCreateHandler.class); private final AmieClient amieClient; + private final PersonService personService; - public DataProjectCreateHandler(AmieClient amieClient) { + public DataProjectCreateHandler(AmieClient amieClient, PersonService personService) { this.amieClient = amieClient; + this.personService = personService; } @Override @@ -61,11 +64,11 @@ public class DataProjectCreateHandler implements PacketHandler { Assert.hasText(personId, "'PersonID' must not be empty."); LOGGER.info("Packet validated. ProjectID: [{}], PersonID: [{}].", projectId, personId); - // TODO update the local DB with the dnList against to the user's profile if (dnList.isArray() && !dnList.isEmpty()) { - LOGGER.info("Received DnList for user [{}].", personId); - // TODO - userService.updateUserDnList(personId, dnList); + LOGGER.info("Persisting DnList for person [{}] from data_project_create.", personId); + personService.persistDnsForPerson(personId, dnList); } + sendSuccessReply(packetEntity.getAmieId()); } @@ -84,4 +87,4 @@ public class DataProjectCreateHandler implements PacketHandler { LOGGER.info("Successfully sent 'inform_transaction_complete' for packet_rec_id [{}].", packetRecId); } -} \ No newline at end of file +} diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java index e03d04d17..66c8a163d 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java @@ -71,6 +71,9 @@ public class ProcessingEventEntity { @Column(name = "last_error", columnDefinition = "TEXT") private String lastError; + @Column(name = "next_retry_at") + private Instant nextRetryAt; + public ProcessingEventEntity() { this.id = UUID.randomUUID().toString(); } @@ -154,4 +157,12 @@ public class ProcessingEventEntity { public void setLastError(String lastError) { this.lastError = lastError; } + + public Instant getNextRetryAt() { + return nextRetryAt; + } + + public void setNextRetryAt(Instant nextRetryAt) { + this.nextRetryAt = nextRetryAt; + } } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java index 14229c4e6..f4a9290ea 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java @@ -39,5 +39,10 @@ public enum ProcessingStatus { /** * The event failed a previous attempt and is waiting to be retried. */ - RETRY_SCHEDULED + RETRY_SCHEDULED, + /** + * The event has exhausted all retry attempts and will never be retried automatically. + * Manual intervention is required. + */ + PERMANENTLY_FAILED } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java index 88b9463c0..a4771cb98 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java @@ -19,9 +19,11 @@ package org.apache.custos.access.ci.service.repo; import org.apache.custos.access.ci.service.model.ClusterAccountEntity; +import org.apache.custos.access.ci.service.model.PersonEntity; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.stereotype.Repository; +import java.util.List; import java.util.Optional; @Repository @@ -29,4 +31,6 @@ public interface ClusterAccountRepository extends JpaRepository<ClusterAccountEn Optional<ClusterAccountEntity> findByUsername(String username); + List<ClusterAccountEntity> findByPerson(PersonEntity person); + } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java index 930f350c4..262de9bc3 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java @@ -36,4 +36,6 @@ public interface ProjectMembershipRepository extends JpaRepository<ProjectMember List<ProjectMembershipEntity> findByProjectIdAndClusterAccount_Person_Id(String projectId, String personId); + List<ProjectMembershipEntity> findByClusterAccount_Person_IdAndProjectIdAndRole(String personId, String projectId, String role); + } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java index 3facb704a..eee5034db 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java @@ -25,12 +25,21 @@ import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; +import java.time.Instant; import java.util.Collection; import java.util.List; @Repository public interface ProcessingEventRepository extends JpaRepository<ProcessingEventEntity, String> { - @Query("SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet WHERE e.status IN :statuses ORDER BY e.createdAt ASC") - List<ProcessingEventEntity> findTop50EventsToProcess(@Param("statuses") Collection<ProcessingStatus> statuses); + @Query(""" + SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet + WHERE e.status IN :statuses + AND (e.nextRetryAt IS NULL OR e.nextRetryAt <= :now) + ORDER BY e.createdAt ASC + LIMIT 50 + """) + List<ProcessingEventEntity> findTop50EventsToProcess( + @Param("statuses") Collection<ProcessingStatus> statuses, + @Param("now") Instant now); } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java index f5d9a62c5..ea588f993 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java @@ -27,6 +27,7 @@ import org.apache.custos.access.ci.service.repo.PersonDnsRepository; import org.apache.custos.access.ci.service.repo.PersonRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -111,17 +112,16 @@ public class PersonService { PersonEntity person = personRepository.findById(personId) .orElseThrow(() -> new IllegalArgumentException("Unknown local PersonID: " + personId)); - // Update fields - if (body.has("FirstName")) person.setFirstName(body.path("FirstName").asText(person.getFirstName())); - if (body.has("LastName")) person.setLastName(body.path("LastName").asText(person.getLastName())); - if (body.has("Email")) person.setEmail(body.path("Email").asText(person.getEmail())); - person.setOrganization(body.has("Organization") ? body.path("Organization").asText(null) : null); - person.setOrgCode(body.has("OrgCode") ? body.path("OrgCode").asText(null) : null); - person.setNsfStatusCode(body.has("NsfStatusCode") ? body.path("NsfStatusCode").asText(null) : null); + if (body.has("UserFirstName")) person.setFirstName(body.path("UserFirstName").asText(person.getFirstName())); + if (body.has("UserLastName")) person.setLastName(body.path("UserLastName").asText(person.getLastName())); + if (body.has("UserEmail")) person.setEmail(body.path("UserEmail").asText(person.getEmail())); + if (body.has("UserOrganization")) person.setOrganization(body.path("UserOrganization").asText(null)); + if (body.has("UserOrgCode")) person.setOrgCode(body.path("UserOrgCode").asText(null)); + if (body.has("NsfStatusCode")) person.setNsfStatusCode(body.path("NsfStatusCode").asText(null)); personRepository.save(person); Set<String> newDns = new HashSet<>(); - JsonNode dnList = body.path("DnList"); + JsonNode dnList = body.path("UserDnList"); if (dnList != null && dnList.isArray()) { for (JsonNode dnNode : dnList) { String dn = dnNode.asText(null); @@ -130,7 +130,9 @@ public class PersonService { } if (newDns.isEmpty()) { - personDnsRepository.deleteByPerson_Id(personId); + if (body.has("UserDnList")) { + personDnsRepository.deleteByPerson_Id(personId); + } } else { personDnsRepository.deleteByPerson_IdAndDnNotIn(personId, new ArrayList<>(newDns)); for (String dn : newDns) { @@ -144,6 +146,47 @@ public class PersonService { } } + @Transactional + public void persistDnsForPerson(String personId, JsonNode dnList) { + Assert.hasText(personId, "personId must not be blank"); + + if (dnList == null || !dnList.isArray() || dnList.isEmpty()) { + return; + } + + PersonEntity person = personRepository.findById(personId) + .orElseThrow(() -> new IllegalArgumentException("Unknown local PersonID: " + personId)); + + Set<String> incomingDns = new HashSet<>(); + for (JsonNode dnNode : dnList) { + String dn = dnNode.asText(null); + if (dn != null && !dn.isBlank()) { + incomingDns.add(dn); + } + } + + if (incomingDns.isEmpty()) { + return; + } + + for (String dn : incomingDns) { + if (!personDnsRepository.existsByPerson_IdAndDn(personId, dn)) { + PersonDnsEntity pde = new PersonDnsEntity(); + pde.setPerson(person); + pde.setDn(dn); + try { + personDnsRepository.save(pde); + LOGGER.info("Persisted new DN for person [{}].", personId); + LOGGER.debug("Persisted DN [{}] for person [{}].", dn, personId); + } catch (DataIntegrityViolationException ex) { + LOGGER.debug("DN already exists for person [{}] (concurrent insert), skipping.", personId); + } + } else { + LOGGER.debug("DN [{}] already exists for person [{}], skipping.", dn, personId); + } + } + } + @Transactional public void deleteFromModifyPacket(JsonNode body) { String personId = body.path("PersonID").asText(null); diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java index 12471cf95..3726e636a 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java @@ -58,9 +58,20 @@ public class ProjectMembershipService { @Transactional public ProjectMembershipEntity createMembership(String projectId, String clusterAccountId, String role) { Optional<ProjectMembershipEntity> existing = membershipRepository.findByProjectIdAndClusterAccountId(projectId, clusterAccountId); + if (existing.isPresent()) { - LOGGER.info("Membership already exists for project [{}] and cluster account [{}]", projectId, clusterAccountId); - return existing.get(); + ProjectMembershipEntity membership = existing.get(); + if (membership.isActive()) { + LOGGER.info("Active membership already exists for project [{}] and cluster account [{}], returning existing record", + projectId, clusterAccountId); + return membership; + } + LOGGER.info("Reactivating inactive membership for project [{}] and cluster account [{}] with role [{}]", + projectId, clusterAccountId, role); + membership.setActive(true); + membership.setRole(role); + membershipRepository.save(membership); + return membership; } ProjectEntity project = projectRepository.findById(projectId) @@ -88,19 +99,20 @@ public class ProjectMembershipService { * @param personId Person ID */ @Transactional - public void inactivateMembershipsByPersonAndProject(String projectId, String personId) { + public int inactivateMembershipsByPersonAndProject(String projectId, String personId) { // TODO - If the user is a PI of a project? // - right now only the membership is turned inactive, no changes to the project List<ProjectMembershipEntity> memberships = membershipRepository.findByProjectIdAndClusterAccount_Person_Id(projectId, personId); if (memberships.isEmpty()) { - LOGGER.warn("No memberships found for person [{}] on project [{}]. No action taken.", personId, projectId); - return; + LOGGER.warn("No memberships found for person [{}] on project [{}]. Inactivation had no effect.", personId, projectId); + return 0; } memberships.forEach(membership -> membership.setActive(false)); membershipRepository.saveAll(memberships); LOGGER.info("Inactivated {} membership(s) for person [{}] on project [{}]", memberships.size(), personId, projectId); + return memberships.size(); } /** @@ -143,17 +155,18 @@ public class ProjectMembershipService { * @param personId Person ID */ @Transactional - public void reactivateMembershipsByPersonAndProject(String projectId, String personId) { + public int reactivateMembershipsByPersonAndProject(String projectId, String personId) { List<ProjectMembershipEntity> memberships = membershipRepository.findByProjectIdAndClusterAccount_Person_Id(projectId, personId); if (memberships.isEmpty()) { - LOGGER.warn("No memberships found for person [{}] on project [{}]. No action taken.", personId, projectId); - return; + LOGGER.warn("No memberships found for person [{}] on project [{}]. Reactivation had no effect.", personId, projectId); + return 0; } memberships.forEach(membership -> membership.setActive(true)); membershipRepository.saveAll(memberships); LOGGER.info("Reactivated {} membership(s) for person [{}] on project [{}]", memberships.size(), personId, projectId); + return memberships.size(); } } diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java index c50ba715b..5672014de 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java @@ -23,9 +23,11 @@ import org.apache.custos.access.ci.service.model.PersonEntity; import org.apache.custos.access.ci.service.repo.ClusterAccountRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.dao.DataIntegrityViolationException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.List; import java.util.UUID; /** @@ -52,6 +54,14 @@ public class UserAccountService { public ClusterAccountEntity provisionClusterAccount(PersonEntity person) { // TODO Replace with external source of truth (e.g., COmanage) lookup for PersonID and username + List<ClusterAccountEntity> existing = clusterAccountRepository.findByPerson(person); + if (!existing.isEmpty()) { + ClusterAccountEntity account = existing.get(0); + LOGGER.info("Cluster account already exists for person [{}], returning existing account with username [{}]", + person.getId(), account.getUsername()); + return account; + } + String proposedUsername = (person.getFirstName().trim().charAt(0) + person.getLastName().trim().replace(" ", "-")).toLowerCase(); String uniqueUsername = ensureUniqueUsername(proposedUsername); @@ -61,9 +71,20 @@ public class UserAccountService { newClusterAccount.setId(UUID.randomUUID().toString()); newClusterAccount.setPerson(person); newClusterAccount.setUsername(uniqueUsername); - clusterAccountRepository.save(newClusterAccount); - return newClusterAccount; + ClusterAccountEntity account; + try { + account = clusterAccountRepository.save(newClusterAccount); + } catch (DataIntegrityViolationException e) { + List<ClusterAccountEntity> retryLookup = clusterAccountRepository.findByPerson(person); + if (!retryLookup.isEmpty()) { + LOGGER.warn("Concurrent account creation detected for person [{}]; returning existing account.", person.getId()); + return retryLookup.get(0); + } + throw e; // Re-throw if it was a different constraint violation + } + + return account; } private String ensureUniqueUsername(String baseUsername) { @@ -79,4 +100,3 @@ public class UserAccountService { return candidate; } } - diff --git a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java index 89e5292ea..bdc2d105b 100644 --- a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java +++ b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java @@ -43,14 +43,34 @@ import java.time.Instant; import java.util.List; /** - * A scheduled worker that fetches for new/ processing events and executes them. - * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED) + * Scheduled worker that polls for pending AMIE processing events and executes them. + * Failures are recorded in a separate transaction to prevent infinite retry loops. + * + * <pre> + * NEW → RUNNING → SUCCEEDED + * → RETRY_SCHEDULED → (backoff) → RUNNING → ... + * → PERMANENTLY_FAILED (after MAX_ATTEMPTS) + * </pre> */ @Component public class ProcessingEventWorker { private static final Logger LOGGER = LoggerFactory.getLogger(ProcessingEventWorker.class); - private static final int MAX_ATTEMPTS = 3; + + /** + * Maximum number of execution attempts before an event is permanently failed. + */ + static final int MAX_ATTEMPTS = 3; + + /** + * Base delay in seconds for exponential backoff between retry attempts. + */ + private static final long BASE_BACKOFF_SECONDS = 30L; + + /** + * Upper bound on the computed backoff delay (10 minutes). + */ + private static final long MAX_BACKOFF_SECONDS = 600L; private final ProcessingEventRepository eventRepo; private final PacketRepository packetRepo; @@ -59,8 +79,11 @@ public class ProcessingEventWorker { private final ObjectMapper objectMapper = new ObjectMapper(); private final ProcessingEventWorker self; - public ProcessingEventWorker(ProcessingEventRepository eventRepo, PacketRepository packetRepo, - ProcessingErrorRepository errorRepo, PacketRouter router, @Lazy ProcessingEventWorker self) { + public ProcessingEventWorker(ProcessingEventRepository eventRepo, + PacketRepository packetRepo, + ProcessingErrorRepository errorRepo, + PacketRouter router, + @Lazy ProcessingEventWorker self) { this.eventRepo = eventRepo; this.packetRepo = packetRepo; this.errorRepo = errorRepo; @@ -69,88 +92,129 @@ public class ProcessingEventWorker { } /** - * Runs on a fixed delay, checks for NEW/RETRY_SCHEDULED events, and processes them one by one on a separate transaction. + * Polls for due events and processes each in its own transaction. */ @Scheduled(fixedDelayString = "#{T(org.springframework.boot.convert.DurationStyle).detectAndParse('${access.amie.scheduler.worker-delay}').toMillis()}") public void processPendingEvents() { - List<ProcessingEventEntity> eventsToProcess = eventRepo.findTop50EventsToProcess(List.of(ProcessingStatus.NEW, ProcessingStatus.RETRY_SCHEDULED)); + List<ProcessingEventEntity> eventsToProcess = + eventRepo.findTop50EventsToProcess( + List.of(ProcessingStatus.NEW, ProcessingStatus.RETRY_SCHEDULED), + Instant.now()); + + if (eventsToProcess.isEmpty()) { + return; + } + + LOGGER.info("Found {} event(s) to process.", eventsToProcess.size()); - if (!eventsToProcess.isEmpty()) { - LOGGER.info("Found {} event(s) to process.", eventsToProcess.size()); - eventsToProcess.forEach(event -> { + for (ProcessingEventEntity event : eventsToProcess) { + String eventId = event.getId(); + try { + self.executeEventInTransaction(event); + } catch (Exception e) { + LOGGER.error("Transaction failed for eventId [{}]. Opening recovery transaction to record failure.", + eventId, e); try { - self.executeEventInTransaction(event); - } catch (Exception e) { - LOGGER.error("An unexpected error occurred while processing of eventId [{}].", event.getId(), e); + self.recordFailureInNewTransaction(eventId, e); + } catch (Exception recoveryEx) { + LOGGER.error("CRITICAL: Recovery transaction also failed for eventId [{}]. " + + "Event may remain stuck until the next worker cycle.", eventId, recoveryEx); } - }); + } } } @Transactional(propagation = Propagation.REQUIRES_NEW) - public void executeEventInTransaction(ProcessingEventEntity event) { + public void executeEventInTransaction(ProcessingEventEntity event) throws Exception { PacketEntity packet = event.getPacket(); - LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: {}", event.getType(), packet.getAmieId(), event.getAttempts() + 1); + LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: {}", + event.getType(), packet.getAmieId(), event.getAttempts() + 1); event.setStatus(ProcessingStatus.RUNNING); event.setStartedAt(Instant.now()); event.setAttempts(event.getAttempts() + 1); eventRepo.saveAndFlush(event); - try { - var packetJson = objectMapper.readTree(packet.getRawJson()); - router.route(packetJson, packet); + var packetJson = objectMapper.readTree(packet.getRawJson()); + router.route(packetJson, packet); - handleSuccess(event, packet); - - } catch (Exception e) { - LOGGER.error("Event processing failed for packet amie_id [{}]. Attempt {} of {}.", packet.getAmieId(), event.getAttempts(), MAX_ATTEMPTS, e); - handleFailure(event, packet, e); - } + handleSuccess(event, packet); } - private void handleSuccess(ProcessingEventEntity event, PacketEntity packet) { - event.setStatus(ProcessingStatus.SUCCEEDED); - event.setFinishedAt(Instant.now()); - eventRepo.save(event); - - if (event.getType() == ProcessingEventType.DECODE_PACKET) { - packet.setStatus(PacketStatus.DECODED); - packet.setDecodedAt(Instant.now()); - packetRepo.save(packet); + @Transactional(propagation = Propagation.REQUIRES_NEW) + public void recordFailureInNewTransaction(String eventId, Exception cause) { + ProcessingEventEntity event = eventRepo.findById(eventId).orElse(null); + if (event == null) { + LOGGER.error("Cannot record failure: event [{}] not found in the database.", eventId); + return; } - LOGGER.info("Successfully processed event [{}] for packet amie_id [{}].", event.getType(), packet.getAmieId()); - } + PacketEntity packet = event.getPacket(); - private void handleFailure(ProcessingEventEntity event, PacketEntity packet, Exception e) { - // Check if the event should be retried or marked as failed - boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS; - ProcessingStatus newStatus = isRetryable ? ProcessingStatus.RETRY_SCHEDULED : ProcessingStatus.FAILED; + int effectiveAttempts = event.getAttempts() + 1; + event.setAttempts(effectiveAttempts); + + boolean isRetryable = effectiveAttempts < MAX_ATTEMPTS; + ProcessingStatus newStatus = isRetryable + ? ProcessingStatus.RETRY_SCHEDULED + : ProcessingStatus.PERMANENTLY_FAILED; event.setStatus(newStatus); - event.setLastError(e.getMessage()); + event.setLastError(cause.getMessage()); event.setFinishedAt(Instant.now()); - eventRepo.save(event); - if (!isRetryable) { - LOGGER.error("Event for packet amie_id [{}] has failed permanently after {} attempts.", packet.getAmieId(), event.getAttempts()); + if (isRetryable) { + Instant nextRetryAt = computeNextRetryAt(effectiveAttempts); + event.setNextRetryAt(nextRetryAt); + LOGGER.warn("Event [{}] for packet amie_id [{}] failed on attempt {}/{}. Scheduled for retry after [{}].", + eventId, packet.getAmieId(), effectiveAttempts, MAX_ATTEMPTS, nextRetryAt); + } else { + event.setNextRetryAt(null); + LOGGER.error("Event [{}] for packet amie_id [{}] is PERMANENTLY_FAILED after {} attempt(s). Manual intervention required.", + eventId, packet.getAmieId(), effectiveAttempts); packet.setStatus(PacketStatus.FAILED); - packet.setLastError(e.getMessage()); + packet.setLastError(cause.getMessage()); packetRepo.save(packet); - - } else { - LOGGER.warn("Event for packet amie_id [{}] will be retried. Status set to {}.", packet.getAmieId(), newStatus); } + eventRepo.save(event); + ProcessingErrorEntity error = new ProcessingErrorEntity(); error.setPacket(packet); error.setEvent(event); - error.setSummary(e.getClass().getSimpleName() + ": " + e.getMessage()); - error.setDetail(getStackTraceAsString(e)); + error.setSummary(cause.getClass().getSimpleName() + ": " + cause.getMessage()); + String stackTrace = getStackTraceAsString(cause); + if (stackTrace.length() > 8000) { + stackTrace = stackTrace.substring(0, 8000) + "\n... [truncated]"; + } + error.setDetail(stackTrace); errorRepo.save(error); } + private void handleSuccess(ProcessingEventEntity event, PacketEntity packet) { + event.setStatus(ProcessingStatus.SUCCEEDED); + event.setFinishedAt(Instant.now()); + event.setNextRetryAt(null); + eventRepo.save(event); + + if (event.getType() == ProcessingEventType.DECODE_PACKET) { + packet.setStatus(PacketStatus.DECODED); + packet.setDecodedAt(Instant.now()); + packetRepo.save(packet); + } + + LOGGER.info("Successfully processed event [{}] for packet amie_id [{}].", + event.getType(), packet.getAmieId()); + } + + // Exponential backoff: BASE * 2^(attempt-1), capped at MAX_BACKOFF_SECONDS. + static Instant computeNextRetryAt(int attemptNumber) { + long exponent = Math.max(0, attemptNumber - 1); + long delaySec = BASE_BACKOFF_SECONDS * (1L << exponent); + delaySec = Math.min(delaySec, MAX_BACKOFF_SECONDS); + return Instant.now().plusSeconds(delaySec); + } + private String getStackTraceAsString(Exception e) { StringWriter sw = new StringWriter(); e.printStackTrace(new PrintWriter(sw)); diff --git a/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql b/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql new file mode 100644 index 000000000..3c6e7a31d --- /dev/null +++ b/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql @@ -0,0 +1,5 @@ +-- Add retry backoff support to processing events. + +ALTER TABLE amie_processing_events + ADD COLUMN next_retry_at TIMESTAMP(6) NULL DEFAULT NULL, + ADD INDEX idx_amie_events_next_retry_at (next_retry_at); diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java index 9ea46c424..ded76306d 100644 --- a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.custos.access.ci.service.client.amie.AmieClient; import org.apache.custos.access.ci.service.model.amie.PacketEntity; +import org.apache.custos.access.ci.service.service.PersonService; import org.apache.custos.access.ci.service.util.JsonTestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) @@ -47,12 +49,15 @@ class DataAccountCreateHandlerTest { @Mock private AmieClient amieClient; + @Mock + private PersonService personService; + private DataAccountCreateHandler handler; private ObjectMapper objectMapper; @BeforeEach void setUp() { - handler = new DataAccountCreateHandler(amieClient); + handler = new DataAccountCreateHandler(amieClient, personService); objectMapper = new ObjectMapper(); } @@ -115,27 +120,52 @@ class DataAccountCreateHandlerTest { } @Test - void handle_withEmptyDnList_shouldProcessSuccessfully() { + void handle_withEmptyDnList_shouldNotCallPersonServiceAndSendReply() { JsonNode packetJson = createValidPacketJson(); PacketEntity packetEntity = createPacketEntity(); handler.handle(packetJson, packetEntity); + verify(personService, never()).persistDnsForPerson(any(), any()); //noinspection unchecked verify(amieClient).replyToPacket(eq(12345L), any(Map.class)); } @Test - void handle_withNonEmptyDnList_shouldProcessSuccessfully() { + void handle_withNonEmptyDnList_shouldPersistDnsAndSendReply() { JsonNode packetJson = createPacketJsonWithDnList(); PacketEntity packetEntity = createPacketEntity(); handler.handle(packetJson, packetEntity); + // Verify personService.persistDnsForPerson was called with the correct personId + ArgumentCaptor<String> personIdCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<JsonNode> dnListCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(personService).persistDnsForPerson(personIdCaptor.capture(), dnListCaptor.capture()); + + assertThat(personIdCaptor.getValue()).isEqualTo("person-123"); + assertThat(dnListCaptor.getValue().isArray()).isTrue(); + assertThat(dnListCaptor.getValue().size()).isEqualTo(2); + //noinspection unchecked verify(amieClient).replyToPacket(eq(12345L), any(Map.class)); } + @Test + void handle_withValidPacketContainingDnList_shouldPersistAllDns() { + JsonNode incomingPacket = JsonTestUtils.loadMockPacket("data_account_create", "incoming-data.json"); + + PacketEntity packetEntity = new PacketEntity(); + packetEntity.setAmieId(233497918L); + packetEntity.setType("data_account_create"); + + handler.handle(incomingPacket, packetEntity); + + ArgumentCaptor<JsonNode> dnListCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(personService).persistDnsForPerson(eq("test-user-person-123"), dnListCaptor.capture()); + assertThat(dnListCaptor.getValue().size()).isEqualTo(3); + } + private JsonNode createValidPacketJson() { return objectMapper.createObjectNode() .set("body", objectMapper.createObjectNode() diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java index 5e5dc844c..a1f911fdf 100644 --- a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.custos.access.ci.service.client.amie.AmieClient; import org.apache.custos.access.ci.service.model.amie.PacketEntity; +import org.apache.custos.access.ci.service.service.PersonService; import org.apache.custos.access.ci.service.util.JsonTestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) @@ -47,12 +49,15 @@ class DataProjectCreateHandlerTest { @Mock private AmieClient amieClient; + @Mock + private PersonService personService; + private DataProjectCreateHandler handler; private ObjectMapper objectMapper; @BeforeEach void setUp() { - handler = new DataProjectCreateHandler(amieClient); + handler = new DataProjectCreateHandler(amieClient, personService); objectMapper = new ObjectMapper(); } @@ -115,27 +120,51 @@ class DataProjectCreateHandlerTest { } @Test - void handle_withEmptyDnList_shouldProcessSuccessfully() { + void handle_withEmptyDnList_shouldNotCallPersonServiceAndSendReply() { JsonNode packetJson = createValidPacketJson(); PacketEntity packetEntity = createPacketEntity(); handler.handle(packetJson, packetEntity); + verify(personService, never()).persistDnsForPerson(any(), any()); //noinspection unchecked verify(amieClient).replyToPacket(eq(12345L), any(Map.class)); } @Test - void handle_withNonEmptyDnList_shouldProcessSuccessfully() { + void handle_withNonEmptyDnList_shouldPersistDnsAndSendReply() { JsonNode packetJson = createPacketJsonWithDnList(); PacketEntity packetEntity = createPacketEntity(); handler.handle(packetJson, packetEntity); + ArgumentCaptor<String> personIdCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<JsonNode> dnListCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(personService).persistDnsForPerson(personIdCaptor.capture(), dnListCaptor.capture()); + + assertThat(personIdCaptor.getValue()).isEqualTo("person-123"); + assertThat(dnListCaptor.getValue().isArray()).isTrue(); + assertThat(dnListCaptor.getValue().size()).isEqualTo(2); + //noinspection unchecked verify(amieClient).replyToPacket(eq(12345L), any(Map.class)); } + @Test + void handle_withValidPacketContainingDnList_shouldPersistAllDns() { + JsonNode incomingPacket = JsonTestUtils.loadMockPacket("data_project_create", "incoming-data.json"); + + PacketEntity packetEntity = new PacketEntity(); + packetEntity.setAmieId(233497909L); + packetEntity.setType("data_project_create"); + + handler.handle(incomingPacket, packetEntity); + + ArgumentCaptor<JsonNode> dnListCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(personService).persistDnsForPerson(eq("test-person-456"), dnListCaptor.capture()); + assertThat(dnListCaptor.getValue().size()).isEqualTo(3); + } + private JsonNode createValidPacketJson() { return objectMapper.createObjectNode().set("body", objectMapper.createObjectNode() .put("ProjectID", "PRJ-TEST123") diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java index 23371afcb..434fd1423 100644 --- a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java @@ -120,6 +120,10 @@ class PersonServiceTest { .hasMessageContaining("Packet body must contain a 'UserGlobalID'"); } + // ------------------------------------------------------------------------- + // replaceFromModifyPacket + // ------------------------------------------------------------------------- + @Test void replaceFromModifyPacket_shouldUpdatePersonFields() { JsonNode body = createModifyPacketBody(); @@ -137,6 +141,95 @@ class PersonServiceTest { assertThat(existingPerson.getNsfStatusCode()).isEqualTo("INACTIVE"); } + @Test + void replaceFromModifyPacket_shouldUseCorrectAmieFieldNames() { + // Verify that the correct AMIE field names are used (UserFirstName not FirstName, etc.) + ObjectNode body = objectMapper.createObjectNode() + .put("PersonID", "person-123") + .put("UserFirstName", "Updated") + .put("UserLastName", "Name") + .put("UserEmail", "[email protected]") + .put("UserOrganization", "Updated Org") + .put("UserOrgCode", "UPD") + .put("NsfStatusCode", "INACTIVE"); + + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + + personService.replaceFromModifyPacket(body); + + assertThat(existingPerson.getFirstName()).isEqualTo("Updated"); + assertThat(existingPerson.getLastName()).isEqualTo("Name"); + assertThat(existingPerson.getEmail()).isEqualTo("[email protected]"); + assertThat(existingPerson.getOrganization()).isEqualTo("Updated Org"); + assertThat(existingPerson.getOrgCode()).isEqualTo("UPD"); + assertThat(existingPerson.getNsfStatusCode()).isEqualTo("INACTIVE"); + } + + @Test + void replaceFromModifyPacket_shouldNotWipeFieldsAbsentFromPacket() { + // Packet only contains PersonID and UserFirstName — other fields must remain unchanged + ObjectNode body = objectMapper.createObjectNode() + .put("PersonID", "person-123") + .put("UserFirstName", "OnlyFirstUpdated"); + + PersonEntity existingPerson = createPersonEntity(); + // Pre-existing values that must survive + existingPerson.setLastName("OriginalLast"); + existingPerson.setEmail("[email protected]"); + existingPerson.setOrganization("Original Org"); + existingPerson.setOrgCode("ORIG"); + existingPerson.setNsfStatusCode("ACTIVE"); + + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + + personService.replaceFromModifyPacket(body); + + assertThat(existingPerson.getFirstName()).isEqualTo("OnlyFirstUpdated"); + assertThat(existingPerson.getLastName()).isEqualTo("OriginalLast"); + assertThat(existingPerson.getEmail()).isEqualTo("[email protected]"); + assertThat(existingPerson.getOrganization()).isEqualTo("Original Org"); + assertThat(existingPerson.getOrgCode()).isEqualTo("ORIG"); + assertThat(existingPerson.getNsfStatusCode()).isEqualTo("ACTIVE"); + } + + @Test + void replaceFromModifyPacket_shouldNotWipeOrganizationWhenAbsent() { + // Regression test: old code unconditionally wiped organization/orgCode to null when field absent + ObjectNode body = objectMapper.createObjectNode() + .put("PersonID", "person-123") + .put("UserFirstName", "Jane"); + // No UserOrganization, UserOrgCode in packet + + PersonEntity existingPerson = createPersonEntity(); + existingPerson.setOrganization("Keep This Org"); + existingPerson.setOrgCode("KEEP"); + + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + + personService.replaceFromModifyPacket(body); + + assertThat(existingPerson.getOrganization()).isEqualTo("Keep This Org"); + assertThat(existingPerson.getOrgCode()).isEqualTo("KEEP"); + } + + @Test + void replaceFromModifyPacket_shouldNotClearDnsWhenUserDnListAbsent() { + // When UserDnList is not present in the packet, existing DNs must not be touched + ObjectNode body = objectMapper.createObjectNode() + .put("PersonID", "person-123") + .put("UserFirstName", "Jane"); + // No UserDnList field + + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + + personService.replaceFromModifyPacket(body); + + // deleteByPerson_Id should NOT have been called since UserDnList was absent + verify(personDnsRepository, never()).deleteByPerson_Id("person-123"); + } + @Test void replaceFromModifyPacket_shouldUpdateDnList() { JsonNode body = createModifyPacketBodyWithDns(); @@ -151,9 +244,24 @@ class PersonServiceTest { verify(personDnsRepository).save(any(PersonDnsEntity.class)); } + @Test + void replaceFromModifyPacket_shouldClearDnsWhenEmptyUserDnListPresent() { + // When UserDnList is present but empty, existing DNs should be cleared + ObjectNode body = objectMapper.createObjectNode() + .put("PersonID", "person-123"); + body.set("UserDnList", objectMapper.createArrayNode()); // explicitly empty array + + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + + personService.replaceFromModifyPacket(body); + + verify(personDnsRepository).deleteByPerson_Id("person-123"); + } + @Test void replaceFromModifyPacket_shouldThrowExceptionForMissingPersonId() { - ObjectNode body = objectMapper.createObjectNode().put("FirstName", "Jane"); + ObjectNode body = objectMapper.createObjectNode().put("UserFirstName", "Jane"); assertThatThrownBy(() -> personService.replaceFromModifyPacket(body)) .isInstanceOf(IllegalArgumentException.class) @@ -170,6 +278,84 @@ class PersonServiceTest { .hasMessageContaining("Unknown local PersonID: person-123"); } + // ------------------------------------------------------------------------- + // persistDnsForPerson tests + // ------------------------------------------------------------------------- + + @Test + void persistDnsForPerson_shouldPersistNewDns() { + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + when(personDnsRepository.existsByPerson_IdAndDn("person-123", "/C=US/O=Test/CN=John Doe")).thenReturn(false); + + JsonNode dnList = objectMapper.createArrayNode().add("/C=US/O=Test/CN=John Doe"); + personService.persistDnsForPerson("person-123", dnList); + + ArgumentCaptor<PersonDnsEntity> captor = ArgumentCaptor.forClass(PersonDnsEntity.class); + verify(personDnsRepository).save(captor.capture()); + assertThat(captor.getValue().getDn()).isEqualTo("/C=US/O=Test/CN=John Doe"); + } + + @Test + void persistDnsForPerson_shouldBeIdempotentWhenDnAlreadyExists() { + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + when(personDnsRepository.existsByPerson_IdAndDn("person-123", "/C=US/O=Test/CN=John Doe")).thenReturn(true); + + JsonNode dnList = objectMapper.createArrayNode().add("/C=US/O=Test/CN=John Doe"); + personService.persistDnsForPerson("person-123", dnList); + + verify(personDnsRepository, never()).save(any(PersonDnsEntity.class)); + } + + @Test + void persistDnsForPerson_shouldPersistOnlyNewDnsWhenSomeDnsAlreadyExist() { + PersonEntity existingPerson = createPersonEntity(); + when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson)); + when(personDnsRepository.existsByPerson_IdAndDn("person-123", "/C=US/O=Existing/CN=John")).thenReturn(true); + when(personDnsRepository.existsByPerson_IdAndDn("person-123", "/C=US/O=New/CN=John")).thenReturn(false); + + JsonNode dnList = objectMapper.createArrayNode() + .add("/C=US/O=Existing/CN=John") + .add("/C=US/O=New/CN=John"); + personService.persistDnsForPerson("person-123", dnList); + + ArgumentCaptor<PersonDnsEntity> captor = ArgumentCaptor.forClass(PersonDnsEntity.class); + verify(personDnsRepository, times(1)).save(captor.capture()); + assertThat(captor.getValue().getDn()).isEqualTo("/C=US/O=New/CN=John"); + } + + @Test + void persistDnsForPerson_shouldDoNothingForNullDnList() { + personService.persistDnsForPerson("person-123", null); + + verify(personRepository, never()).findById(any()); + verify(personDnsRepository, never()).save(any()); + } + + @Test + void persistDnsForPerson_shouldDoNothingForEmptyDnList() { + JsonNode emptyDnList = objectMapper.createArrayNode(); + personService.persistDnsForPerson("person-123", emptyDnList); + + verify(personRepository, never()).findById(any()); + verify(personDnsRepository, never()).save(any()); + } + + @Test + void persistDnsForPerson_shouldThrowForUnknownPersonId() { + when(personRepository.findById("unknown-id")).thenReturn(Optional.empty()); + JsonNode dnList = objectMapper.createArrayNode().add("/C=US/O=Test/CN=John"); + + assertThatThrownBy(() -> personService.persistDnsForPerson("unknown-id", dnList)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unknown local PersonID: unknown-id"); + } + + // ------------------------------------------------------------------------- + // deleteFromModifyPacket tests + // ------------------------------------------------------------------------- + @Test void deleteFromModifyPacket_shouldDeletePerson() { JsonNode body = createModifyPacketBody(); @@ -179,13 +365,17 @@ class PersonServiceTest { @Test void deleteFromModifyPacket_shouldThrowExceptionForMissingPersonId() { - ObjectNode body = objectMapper.createObjectNode().put("FirstName", "Jane"); + ObjectNode body = objectMapper.createObjectNode().put("UserFirstName", "Jane"); assertThatThrownBy(() -> personService.deleteFromModifyPacket(body)) .isInstanceOf(IllegalArgumentException.class) .hasMessageContaining("Missing required 'PersonID'"); } + // ------------------------------------------------------------------------- + // mergePersons tests + // ------------------------------------------------------------------------- + @Test void mergePersons_shouldMergeSuccessfully() { PersonEntity survivingPerson = createPersonEntity(); @@ -228,6 +418,10 @@ class PersonServiceTest { .hasMessageContaining("Could not find retiring person with local ID: retiring-person-123"); } + // ------------------------------------------------------------------------- + // Test data helpers + // ------------------------------------------------------------------------- + private JsonNode createValidPacketBody() { return objectMapper.createObjectNode() .put("UserGlobalID", "USER-GLOBAL-123") @@ -249,20 +443,20 @@ class PersonServiceTest { private JsonNode createModifyPacketBody() { return objectMapper.createObjectNode() .put("PersonID", "person-123") - .put("FirstName", "Jane") - .put("LastName", "Smith") - .put("Email", "[email protected]") - .put("Organization", "New Org") - .put("OrgCode", "NEW") + .put("UserFirstName", "Jane") + .put("UserLastName", "Smith") + .put("UserEmail", "[email protected]") + .put("UserOrganization", "New Org") + .put("UserOrgCode", "NEW") .put("NsfStatusCode", "INACTIVE"); } private JsonNode createModifyPacketBodyWithDns() { return objectMapper.createObjectNode() .put("PersonID", "person-123") - .put("FirstName", "Jane") - .put("LastName", "Smith") - .set("DnList", objectMapper.createArrayNode().add("CN=Jane Smith,OU=Users,DC=example,DC=com")); + .put("UserFirstName", "Jane") + .put("UserLastName", "Smith") + .set("UserDnList", objectMapper.createArrayNode().add("CN=Jane Smith,OU=Users,DC=example,DC=com")); } private PersonEntity createPersonEntity() { @@ -277,4 +471,4 @@ class PersonServiceTest { entity.setNsfStatusCode("ACTIVE"); return entity; } -} \ No newline at end of file +} diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java index b089a8bee..fda180f8f 100644 --- a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.custos.access.ci.service.service; import org.apache.custos.access.ci.service.model.ClusterAccountEntity; @@ -42,18 +60,61 @@ class ProjectMembershipServiceTest { } @Test - void createMembership_shouldReturnExistingMembership() { + void createMembership_shouldReturnExistingActiveMembership() { ProjectMembershipEntity existingMembership = createMembershipEntity(); + existingMembership.setActive(true); when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", "ACCOUNT-456")) .thenReturn(Optional.of(existingMembership)); ProjectMembershipEntity result = membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI"); assertThat(result).isEqualTo(existingMembership); + assertThat(result.isActive()).isTrue(); verify(membershipRepository).findByProjectIdAndClusterAccountId("PROJECT-123", "ACCOUNT-456"); verify(membershipRepository, never()).save(any(ProjectMembershipEntity.class)); } + @Test + void createMembership_shouldReactivateInactiveMembership() { + ProjectMembershipEntity inactiveMembership = createMembershipEntity(); + inactiveMembership.setActive(false); + when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", "ACCOUNT-456")) + .thenReturn(Optional.of(inactiveMembership)); + when(membershipRepository.save(any(ProjectMembershipEntity.class))) + .thenAnswer(invocation -> invocation.<ProjectMembershipEntity>getArgument(0)); + + ProjectMembershipEntity result = membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "USER"); + + assertThat(result).isSameAs(inactiveMembership); + assertThat(result.isActive()).isTrue(); + assertThat(result.getRole()).isEqualTo("USER"); + verify(membershipRepository).save(inactiveMembership); + } + + @Test + void createMembership_calledTwiceForSamePair_shouldNotCreateDuplicate() { + ProjectEntity project = createProjectEntity(); + ClusterAccountEntity clusterAccount = createClusterAccountEntity(); + + // First call: no membership exists. + when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", "ACCOUNT-456")) + .thenReturn(Optional.empty()); + when(projectRepository.findById("PROJECT-123")).thenReturn(Optional.of(project)); + when(clusterAccountRepository.findById("ACCOUNT-456")).thenReturn(Optional.of(clusterAccount)); + when(membershipRepository.save(any(ProjectMembershipEntity.class))) + .thenAnswer(invocation -> invocation.<ProjectMembershipEntity>getArgument(0)); + + ProjectMembershipEntity firstResult = membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI"); + assertThat(firstResult.isActive()).isTrue(); + + // Second call: membership now exists and is active. + when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", "ACCOUNT-456")) + .thenReturn(Optional.of(firstResult)); + + ProjectMembershipEntity secondResult = membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI"); + assertThat(secondResult).isSameAs(firstResult); + } + @Test void createMembership_shouldCreateNewMembership() { ProjectEntity project = createProjectEntity(); @@ -98,8 +159,12 @@ class ProjectMembershipServiceTest { .hasMessageContaining("Cluster account not found: ACCOUNT-456"); } + // ------------------------------------------------------------------------- + // inactivateMembershipsByPersonAndProject tests + // ------------------------------------------------------------------------- + @Test - void inactivateMembershipsByPersonAndProject_shouldInactivateMemberships() { + void inactivateMembershipsByPersonAndProject_shouldInactivateMembershipsAndReturnCount() { ProjectMembershipEntity membership1 = createMembershipEntity(); ProjectMembershipEntity membership2 = createMembershipEntity(); membership2.setId("membership-2"); @@ -107,20 +172,28 @@ class ProjectMembershipServiceTest { List<ProjectMembershipEntity> memberships = List.of(membership1, membership2); when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123", "PERSON-456")).thenReturn(memberships); - membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + int count = membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + assertThat(count).isEqualTo(2); assertThat(membership1.isActive()).isFalse(); assertThat(membership2.isActive()).isFalse(); verify(membershipRepository).saveAll(memberships); } @Test - void inactivateMembershipsByPersonAndProject_shouldHandleNoMemberships() { + void inactivateMembershipsByPersonAndProject_shouldReturnZeroWhenNoMembershipsFound() { when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123", "PERSON-456")).thenReturn(List.of()); - membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + + int count = membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + + assertThat(count).isZero(); verify(membershipRepository, never()).saveAll(any()); } + // ------------------------------------------------------------------------- + // inactivateAllMembershipsForProject tests + // ------------------------------------------------------------------------- + @Test void inactivateAllMembershipsForProject_shouldInactivateAllMemberships() { ProjectMembershipEntity membership1 = createMembershipEntity(); @@ -144,6 +217,10 @@ class ProjectMembershipServiceTest { verify(membershipRepository, never()).saveAll(any()); } + // ------------------------------------------------------------------------- + // reactivatePiMembership tests + // ------------------------------------------------------------------------- + @Test void reactivatePiMembership_shouldReactivatePiMemberships() { ProjectMembershipEntity piMembership1 = createMembershipEntity(); @@ -172,6 +249,39 @@ class ProjectMembershipServiceTest { verify(membershipRepository).saveAll(List.of()); } + // ------------------------------------------------------------------------- + // reactivateMembershipsByPersonAndProject tests + // ------------------------------------------------------------------------- + + @Test + void reactivateMembershipsByPersonAndProject_shouldReactivateMembershipsAndReturnCount() { + ProjectMembershipEntity membership1 = createMembershipEntity(); + membership1.setActive(false); + ProjectMembershipEntity membership2 = createMembershipEntity(); + membership2.setId("membership-2"); + membership2.setActive(false); + + List<ProjectMembershipEntity> memberships = List.of(membership1, membership2); + when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123", "PERSON-456")).thenReturn(memberships); + + int count = membershipService.reactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + + assertThat(count).isEqualTo(2); + assertThat(membership1.isActive()).isTrue(); + assertThat(membership2.isActive()).isTrue(); + verify(membershipRepository).saveAll(memberships); + } + + @Test + void reactivateMembershipsByPersonAndProject_shouldReturnZeroWhenNoMembershipsFound() { + when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123", "PERSON-456")).thenReturn(List.of()); + + int count = membershipService.reactivateMembershipsByPersonAndProject("PROJECT-123", "PERSON-456"); + + assertThat(count).isZero(); + verify(membershipRepository, never()).saveAll(any()); + } + private ProjectEntity createProjectEntity() { ProjectEntity entity = new ProjectEntity(); entity.setId("PROJECT-123"); diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java index 38119a10a..81ee9671b 100644 --- a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.custos.access.ci.service.service; import org.apache.custos.access.ci.service.model.ClusterAccountEntity; @@ -10,11 +28,13 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.List; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,9 +52,49 @@ class UserAccountServiceTest { userAccountService = new UserAccountService(clusterAccountRepository); } + @Test + void provisionClusterAccount_shouldReturnExistingAccountWhenPersonAlreadyHasOne() { + PersonEntity person = createPersonEntity(); + ClusterAccountEntity existingAccount = createClusterAccountEntity(person, "jdoe"); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of(existingAccount)); + + ClusterAccountEntity result = userAccountService.provisionClusterAccount(person); + + assertThat(result).isSameAs(existingAccount); + assertThat(result.getUsername()).isEqualTo("jdoe"); + verify(clusterAccountRepository, never()).save(any(ClusterAccountEntity.class)); + } + + @Test + void provisionClusterAccount_calledTwiceForSamePerson_shouldReturnSameAccountBothTimes() { + PersonEntity person = createPersonEntity(); + ClusterAccountEntity existingAccount = createClusterAccountEntity(person, "jdoe"); + + // First call: no account exists yet, create one. + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); + when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.empty()); + when(clusterAccountRepository.save(any(ClusterAccountEntity.class))) + .thenAnswer(invocation -> invocation.<ClusterAccountEntity>getArgument(0)); + + ClusterAccountEntity firstResult = userAccountService.provisionClusterAccount(person); + assertThat(firstResult.getUsername()).isEqualTo("jdoe"); + + // Second call: account now exists. + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of(existingAccount)); + + ClusterAccountEntity secondResult = userAccountService.provisionClusterAccount(person); + assertThat(secondResult).isSameAs(existingAccount); + assertThat(secondResult.getUsername()).isEqualTo("jdoe"); + } + + // ------------------------------------------------------------------------- + // New account creation tests + // ------------------------------------------------------------------------- + @Test void provisionClusterAccount_shouldCreateUniqueUsername() { PersonEntity person = createPersonEntity(); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.empty()); when(clusterAccountRepository.save(any(ClusterAccountEntity.class))) .thenAnswer(invocation -> invocation.<ClusterAccountEntity>getArgument(0)); @@ -50,6 +110,7 @@ class UserAccountServiceTest { @Test void provisionClusterAccount_shouldGenerateUniqueUsernameWhenBaseIsTaken() { PersonEntity person = createPersonEntity(); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.of(new ClusterAccountEntity())); when(clusterAccountRepository.findByUsername("jdoe1")).thenReturn(Optional.empty()); when(clusterAccountRepository.save(any(ClusterAccountEntity.class))) @@ -66,6 +127,7 @@ class UserAccountServiceTest { @Test void provisionClusterAccount_shouldHandleMultipleSuffixes() { PersonEntity person = createPersonEntity(); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.of(new ClusterAccountEntity())); when(clusterAccountRepository.findByUsername("jdoe1")).thenReturn(Optional.of(new ClusterAccountEntity())); when(clusterAccountRepository.findByUsername("jdoe2")).thenReturn(Optional.empty()); @@ -85,6 +147,7 @@ class UserAccountServiceTest { PersonEntity person = createPersonEntity(); person.setFirstName("John Michael"); person.setLastName("Doe Smith"); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); when(clusterAccountRepository.findByUsername("jdoe-smith")).thenReturn(Optional.empty()); when(clusterAccountRepository.save(any(ClusterAccountEntity.class))) .thenAnswer(invocation -> invocation.<ClusterAccountEntity>getArgument(0)); @@ -102,10 +165,15 @@ class UserAccountServiceTest { PersonEntity person = createPersonEntity(); person.setFirstName(""); person.setLastName(""); + when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of()); assertThatThrownBy(() -> userAccountService.provisionClusterAccount(person)).isInstanceOf(StringIndexOutOfBoundsException.class); } + // ------------------------------------------------------------------------- + // Helper factories + // ------------------------------------------------------------------------- + private PersonEntity createPersonEntity() { PersonEntity entity = new PersonEntity(); entity.setId("person-123"); @@ -115,4 +183,12 @@ class UserAccountServiceTest { entity.setEmail("[email protected]"); return entity; } + + private ClusterAccountEntity createClusterAccountEntity(PersonEntity person, String username) { + ClusterAccountEntity entity = new ClusterAccountEntity(); + entity.setId("account-456"); + entity.setPerson(person); + entity.setUsername(username); + return entity; + } } diff --git a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java new file mode 100644 index 000000000..05e7f1df5 --- /dev/null +++ b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.custos.access.ci.service.worker.amie; + +import org.apache.custos.access.ci.service.handler.amie.PacketRouter; +import org.apache.custos.access.ci.service.model.amie.PacketEntity; +import org.apache.custos.access.ci.service.model.amie.PacketStatus; +import org.apache.custos.access.ci.service.model.amie.ProcessingErrorEntity; +import org.apache.custos.access.ci.service.model.amie.ProcessingEventEntity; +import org.apache.custos.access.ci.service.model.amie.ProcessingEventType; +import org.apache.custos.access.ci.service.model.amie.ProcessingStatus; +import org.apache.custos.access.ci.service.repo.amie.PacketRepository; +import org.apache.custos.access.ci.service.repo.amie.ProcessingErrorRepository; +import org.apache.custos.access.ci.service.repo.amie.ProcessingEventRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for ProcessingEventWorker. + */ +@ExtendWith(MockitoExtension.class) +@Tag("unit") +class ProcessingEventWorkerTest { + + @Mock + private ProcessingEventRepository eventRepo; + + @Mock + private PacketRepository packetRepo; + + @Mock + private ProcessingErrorRepository errorRepo; + + @Mock + private PacketRouter router; + + @Mock + private ProcessingEventWorker self; + + private ProcessingEventWorker worker; + + @BeforeEach + void setUp() { + worker = new ProcessingEventWorker(eventRepo, packetRepo, errorRepo, router, self); + } + + // ------------------------------------------------------------------ + // processPendingEvents — orchestration + // ------------------------------------------------------------------ + + @Test + void processPendingEvents_whenNoEvents_doesNothing() throws Exception { + when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class))).thenReturn(List.of()); + + worker.processPendingEvents(); + + verify(self, never()).executeEventInTransaction(any()); + verify(self, never()).recordFailureInNewTransaction(any(), any()); + } + + @Test + void processPendingEvents_callsExecuteForEachEvent() throws Exception { + ProcessingEventEntity event1 = buildEvent("id-1", ProcessingStatus.NEW, 0); + ProcessingEventEntity event2 = buildEvent("id-2", ProcessingStatus.RETRY_SCHEDULED, 1); + when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class))) + .thenReturn(List.of(event1, event2)); + + worker.processPendingEvents(); + + verify(self).executeEventInTransaction(event1); + verify(self).executeEventInTransaction(event2); + verify(self, never()).recordFailureInNewTransaction(any(), any()); + } + + @Test + void processPendingEvents_whenExecuteThrows_callsRecoveryTransactionWithEventId() throws Exception { + ProcessingEventEntity event = buildEvent("id-thrown", ProcessingStatus.NEW, 0); + when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class))) + .thenReturn(List.of(event)); + + RuntimeException cause = new RuntimeException("handler blew up"); + doThrow(cause).when(self).executeEventInTransaction(event); + + worker.processPendingEvents(); + + verify(self).recordFailureInNewTransaction("id-thrown", cause); + } + + @Test + void processPendingEvents_whenBothExecuteAndRecoveryThrow_continuesProcessingRemainingEvents() throws Exception { + ProcessingEventEntity badEvent = buildEvent("bad-id", ProcessingStatus.NEW, 0); + ProcessingEventEntity goodEvent = buildEvent("good-id", ProcessingStatus.NEW, 0); + when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class))) + .thenReturn(List.of(badEvent, goodEvent)); + + RuntimeException executeCause = new RuntimeException("execute failed"); + doThrow(executeCause).when(self).executeEventInTransaction(badEvent); + doThrow(new RuntimeException("recovery also failed")) + .when(self).recordFailureInNewTransaction("bad-id", executeCause); + + // Should not propagate — the worker logs and continues to goodEvent. + worker.processPendingEvents(); + + verify(self).executeEventInTransaction(goodEvent); + } + + // ------------------------------------------------------------------ + // recordFailureInNewTransaction — retryable failures + // ------------------------------------------------------------------ + + @Test + void recordFailureInNewTransaction_onFirstFailure_setsRetryScheduledWithBackoff() { + ProcessingEventEntity event = buildEvent("evt-1", ProcessingStatus.NEW, 0); + PacketEntity packet = event.getPacket(); + when(eventRepo.findById("evt-1")).thenReturn(Optional.of(event)); + + RuntimeException cause = new IllegalArgumentException("Project not found"); + Instant before = Instant.now(); + + worker.recordFailureInNewTransaction("evt-1", cause); + + Instant after = Instant.now(); + + assertThat(event.getStatus()).isEqualTo(ProcessingStatus.RETRY_SCHEDULED); + assertThat(event.getAttempts()).isEqualTo(1); // incremented from 0 + assertThat(event.getLastError()).isEqualTo("Project not found"); + assertThat(event.getNextRetryAt()).isNotNull(); + // Backoff for attempt 1 = 30 s; nextRetryAt must be in the near future + assertThat(event.getNextRetryAt()).isAfterOrEqualTo(before.plusSeconds(29)); + assertThat(event.getNextRetryAt()).isBeforeOrEqualTo(after.plusSeconds(31)); + + verify(eventRepo).save(event); + verify(packetRepo, never()).save(any()); + + ArgumentCaptor<ProcessingErrorEntity> errorCaptor = ArgumentCaptor.forClass(ProcessingErrorEntity.class); + verify(errorRepo).save(errorCaptor.capture()); + ProcessingErrorEntity error = errorCaptor.getValue(); + assertThat(error.getSummary()).contains("IllegalArgumentException"); + assertThat(error.getSummary()).contains("Project not found"); + assertThat(error.getPacket()).isSameAs(packet); + assertThat(error.getEvent()).isSameAs(event); + } + + @Test + void recordFailureInNewTransaction_onSecondFailure_setsRetryScheduledWithDoubledBackoff() { + // Simulate: the first attempt rolled back (REQUIRES_NEW), so the event is + // re-read with RETRY_SCHEDULED status and attempts=1 (the previous committed + // retry count). The recovery method unconditionally increments attempts to 2 + // and applies a 60-second backoff (BASE * 2^1). + ProcessingEventEntity event = buildEvent("evt-2", ProcessingStatus.RETRY_SCHEDULED, 1); + when(eventRepo.findById("evt-2")).thenReturn(Optional.of(event)); + + Instant before = Instant.now(); + worker.recordFailureInNewTransaction("evt-2", new RuntimeException("second failure")); + Instant after = Instant.now(); + + assertThat(event.getStatus()).isEqualTo(ProcessingStatus.RETRY_SCHEDULED); + // Attempts unconditionally incremented from 1 to 2 + assertThat(event.getAttempts()).isEqualTo(2); + // Backoff for attempt 2 = 60 s + assertThat(event.getNextRetryAt()).isAfterOrEqualTo(before.plusSeconds(59)); + assertThat(event.getNextRetryAt()).isBeforeOrEqualTo(after.plusSeconds(61)); + } + + // ------------------------------------------------------------------ + // recordFailureInNewTransaction — permanent failure + // ------------------------------------------------------------------ + + @Test + void recordFailureInNewTransaction_afterMaxAttempts_marksPermanentlyFailed() { + // Simulate event on its last allowed retry (attempts already at MAX-1 in RETRY_SCHEDULED) + // When we increment, it reaches MAX_ATTEMPTS = 3 + ProcessingEventEntity event = buildEvent("evt-perm", ProcessingStatus.RETRY_SCHEDULED, 2); + PacketEntity packet = event.getPacket(); + when(eventRepo.findById("evt-perm")).thenReturn(Optional.of(event)); + + RuntimeException cause = new RuntimeException("final failure"); + worker.recordFailureInNewTransaction("evt-perm", cause); + + assertThat(event.getStatus()).isEqualTo(ProcessingStatus.PERMANENTLY_FAILED); + assertThat(event.getAttempts()).isEqualTo(3); + assertThat(event.getNextRetryAt()).isNull(); + + // Packet must also be marked FAILED + verify(packetRepo).save(packet); + assertThat(packet.getStatus()).isEqualTo(PacketStatus.FAILED); + assertThat(packet.getLastError()).isEqualTo("final failure"); + + verify(eventRepo).save(event); + verify(errorRepo).save(any(ProcessingErrorEntity.class)); + } + + @Test + void recordFailureInNewTransaction_whenEventNotFound_doesNotThrow() { + when(eventRepo.findById("missing")).thenReturn(Optional.empty()); + + // Should log and return without throwing or saving anything + worker.recordFailureInNewTransaction("missing", new RuntimeException("cause")); + + verify(eventRepo, never()).save(any()); + verify(errorRepo, never()).save(any()); + } + + // ------------------------------------------------------------------ + // computeNextRetryAt — exponential backoff + // ------------------------------------------------------------------ + + @Test + void computeNextRetryAt_attempt1_returns30SecondDelay() { + Instant before = Instant.now(); + Instant result = ProcessingEventWorker.computeNextRetryAt(1); + Instant after = Instant.now(); + + assertThat(result).isAfterOrEqualTo(before.plusSeconds(29)); + assertThat(result).isBeforeOrEqualTo(after.plusSeconds(31)); + } + + @Test + void computeNextRetryAt_attempt2_returns60SecondDelay() { + Instant before = Instant.now(); + Instant result = ProcessingEventWorker.computeNextRetryAt(2); + Instant after = Instant.now(); + + assertThat(result).isAfterOrEqualTo(before.plusSeconds(59)); + assertThat(result).isBeforeOrEqualTo(after.plusSeconds(61)); + } + + @Test + void computeNextRetryAt_highAttemptNumber_capsAtMaxBackoff() { + // At attempt 100, delay would be astronomically large without capping. + // MAX_BACKOFF_SECONDS = 600 + Instant before = Instant.now(); + Instant result = ProcessingEventWorker.computeNextRetryAt(100); + Instant after = Instant.now(); + + // Must not exceed MAX_BACKOFF_SECONDS + small tolerance + assertThat(result).isBeforeOrEqualTo(after.plusSeconds(601)); + // Must still be at least MAX_BACKOFF_SECONDS in the future + assertThat(result).isAfterOrEqualTo(before.plusSeconds(599)); + } + + private ProcessingEventEntity buildEvent(String id, ProcessingStatus status, int attempts) { + PacketEntity packet = new PacketEntity(); + packet.setAmieId(42L); + packet.setType("request_account_create"); + packet.setStatus(PacketStatus.NEW); + // Provide minimal valid JSON so objectMapper.readTree won't NPE if called directly + packet.setRawJson("{\"body\":{}}"); + + ProcessingEventEntity event = new ProcessingEventEntity(); + // Directly set the id field via reflection to avoid UUID randomness issues in tests + try { + java.lang.reflect.Field idField = ProcessingEventEntity.class.getDeclaredField("id"); + idField.setAccessible(true); + idField.set(event, id); + } catch (Exception e) { + throw new RuntimeException(e); + } + event.setPacket(packet); + event.setType(ProcessingEventType.DECODE_PACKET); + event.setStatus(status); + event.setAttempts(attempts); + event.setPayload(new byte[0]); + return event; + } +}
