This is an automated email from the ASF dual-hosted git repository.

lahirujayathilake pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airavata-custos.git

commit b9cc6490760e703ff0dd63cc0e0275f93cc6efde
Author: lahiruj <[email protected]>
AuthorDate: Fri Mar 27 00:55:16 2026 -0400

    prevent duplicate accounts, fix retry loop, correct AMIE field names, 
persist DN lists
---
 .../handler/amie/DataAccountCreateHandler.java     |  12 +-
 .../handler/amie/DataProjectCreateHandler.java     |  13 +-
 .../service/model/amie/ProcessingEventEntity.java  |  11 +
 .../ci/service/model/amie/ProcessingStatus.java    |   7 +-
 .../ci/service/repo/ClusterAccountRepository.java  |   4 +
 .../service/repo/ProjectMembershipRepository.java  |   2 +
 .../repo/amie/ProcessingEventRepository.java       |  13 +-
 .../access/ci/service/service/PersonService.java   |  61 ++++-
 .../service/service/ProjectMembershipService.java  |  29 +-
 .../ci/service/service/UserAccountService.java     |  26 +-
 .../service/worker/amie/ProcessingEventWorker.java | 164 +++++++----
 .../V2__add_next_retry_at_to_processing_events.sql |   5 +
 .../handler/amie/DataAccountCreateHandlerTest.java |  36 ++-
 .../handler/amie/DataProjectCreateHandlerTest.java |  35 ++-
 .../ci/service/service/PersonServiceTest.java      | 216 ++++++++++++++-
 .../service/ProjectMembershipServiceTest.java      | 120 ++++++++-
 .../ci/service/service/UserAccountServiceTest.java |  76 ++++++
 .../worker/amie/ProcessingEventWorkerTest.java     | 299 +++++++++++++++++++++
 18 files changed, 1023 insertions(+), 106 deletions(-)

diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java
index 4a1165e62..943f26a45 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandler.java
@@ -21,6 +21,7 @@ package org.apache.custos.access.ci.service.handler.amie;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.custos.access.ci.service.client.amie.AmieClient;
 import org.apache.custos.access.ci.service.model.amie.PacketEntity;
+import org.apache.custos.access.ci.service.service.PersonService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -40,9 +41,11 @@ public class DataAccountCreateHandler implements 
PacketHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DataAccountCreateHandler.class);
 
     private final AmieClient amieClient;
+    private final PersonService personService;
 
-    public DataAccountCreateHandler(AmieClient amieClient) {
+    public DataAccountCreateHandler(AmieClient amieClient, PersonService 
personService) {
         this.amieClient = amieClient;
+        this.personService = personService;
     }
 
     @Override
@@ -63,12 +66,9 @@ public class DataAccountCreateHandler implements 
PacketHandler {
         Assert.hasText(personId, "'PersonID' must not be empty.");
         LOGGER.info("Packet validated. ProjectID: [{}], PersonID: [{}].", 
projectId, personId);
 
-
-        // TODO - perform the business logic
-        //  - find the user's record by 'personId' (localID) and update the 
distinguished names (dnList)
         if (dnList.isArray() && !dnList.isEmpty()) {
-            LOGGER.info("Received DnList for user [{}]. In a real 
implementation, this would be saved to the user's profile.", personId);
-            // TODO userService.updateUserDnList(personId, dnList);
+            LOGGER.info("Persisting DnList for person [{}] from 
data_account_create.", personId);
+            personService.persistDnsForPerson(personId, dnList);
         }
 
         // Send the 'inform_transaction_complete' reply to close the 
transaction.
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java
index 0ca934b92..2a96d05dc 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandler.java
@@ -21,6 +21,7 @@ package org.apache.custos.access.ci.service.handler.amie;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.custos.access.ci.service.client.amie.AmieClient;
 import org.apache.custos.access.ci.service.model.amie.PacketEntity;
+import org.apache.custos.access.ci.service.service.PersonService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -38,9 +39,11 @@ public class DataProjectCreateHandler implements 
PacketHandler {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DataProjectCreateHandler.class);
 
     private final AmieClient amieClient;
+    private final PersonService personService;
 
-    public DataProjectCreateHandler(AmieClient amieClient) {
+    public DataProjectCreateHandler(AmieClient amieClient, PersonService 
personService) {
         this.amieClient = amieClient;
+        this.personService = personService;
     }
 
     @Override
@@ -61,11 +64,11 @@ public class DataProjectCreateHandler implements 
PacketHandler {
         Assert.hasText(personId, "'PersonID' must not be empty.");
         LOGGER.info("Packet validated. ProjectID: [{}], PersonID: [{}].", 
projectId, personId);
 
-        // TODO update the local DB with the dnList against to the user's 
profile
         if (dnList.isArray() && !dnList.isEmpty()) {
-            LOGGER.info("Received DnList for user [{}].", personId);
-            // TODO - userService.updateUserDnList(personId, dnList);
+            LOGGER.info("Persisting DnList for person [{}] from 
data_project_create.", personId);
+            personService.persistDnsForPerson(personId, dnList);
         }
+
         sendSuccessReply(packetEntity.getAmieId());
     }
 
@@ -84,4 +87,4 @@ public class DataProjectCreateHandler implements 
PacketHandler {
 
         LOGGER.info("Successfully sent 'inform_transaction_complete' for 
packet_rec_id [{}].", packetRecId);
     }
-}
\ No newline at end of file
+}
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java
index e03d04d17..66c8a163d 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingEventEntity.java
@@ -71,6 +71,9 @@ public class ProcessingEventEntity {
     @Column(name = "last_error", columnDefinition = "TEXT")
     private String lastError;
 
+    @Column(name = "next_retry_at")
+    private Instant nextRetryAt;
+
     public ProcessingEventEntity() {
         this.id = UUID.randomUUID().toString();
     }
@@ -154,4 +157,12 @@ public class ProcessingEventEntity {
     public void setLastError(String lastError) {
         this.lastError = lastError;
     }
+
+    public Instant getNextRetryAt() {
+        return nextRetryAt;
+    }
+
+    public void setNextRetryAt(Instant nextRetryAt) {
+        this.nextRetryAt = nextRetryAt;
+    }
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java
index 14229c4e6..f4a9290ea 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/model/amie/ProcessingStatus.java
@@ -39,5 +39,10 @@ public enum ProcessingStatus {
     /**
      * The event failed a previous attempt and is waiting to be retried.
      */
-    RETRY_SCHEDULED
+    RETRY_SCHEDULED,
+    /**
+     * The event has exhausted all retry attempts and will never be retried 
automatically.
+     * Manual intervention is required.
+     */
+    PERMANENTLY_FAILED
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java
index 88b9463c0..a4771cb98 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ClusterAccountRepository.java
@@ -19,9 +19,11 @@
 package org.apache.custos.access.ci.service.repo;
 
 import org.apache.custos.access.ci.service.model.ClusterAccountEntity;
+import org.apache.custos.access.ci.service.model.PersonEntity;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.stereotype.Repository;
 
+import java.util.List;
 import java.util.Optional;
 
 @Repository
@@ -29,4 +31,6 @@ public interface ClusterAccountRepository extends 
JpaRepository<ClusterAccountEn
 
     Optional<ClusterAccountEntity> findByUsername(String username);
 
+    List<ClusterAccountEntity> findByPerson(PersonEntity person);
+
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java
index 930f350c4..262de9bc3 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/ProjectMembershipRepository.java
@@ -36,4 +36,6 @@ public interface ProjectMembershipRepository extends 
JpaRepository<ProjectMember
 
     List<ProjectMembershipEntity> 
findByProjectIdAndClusterAccount_Person_Id(String projectId, String personId);
 
+    List<ProjectMembershipEntity> 
findByClusterAccount_Person_IdAndProjectIdAndRole(String personId, String 
projectId, String role);
+
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java
index 3facb704a..eee5034db 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/repo/amie/ProcessingEventRepository.java
@@ -25,12 +25,21 @@ import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.query.Param;
 import org.springframework.stereotype.Repository;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
 
 @Repository
 public interface ProcessingEventRepository extends 
JpaRepository<ProcessingEventEntity, String> {
 
-    @Query("SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet WHERE 
e.status IN :statuses ORDER BY e.createdAt ASC")
-    List<ProcessingEventEntity> findTop50EventsToProcess(@Param("statuses") 
Collection<ProcessingStatus> statuses);
+    @Query("""
+            SELECT e FROM ProcessingEventEntity e JOIN FETCH e.packet
+            WHERE e.status IN :statuses
+              AND (e.nextRetryAt IS NULL OR e.nextRetryAt <= :now)
+            ORDER BY e.createdAt ASC
+            LIMIT 50
+            """)
+    List<ProcessingEventEntity> findTop50EventsToProcess(
+            @Param("statuses") Collection<ProcessingStatus> statuses,
+            @Param("now") Instant now);
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java
index f5d9a62c5..ea588f993 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/PersonService.java
@@ -27,6 +27,7 @@ import 
org.apache.custos.access.ci.service.repo.PersonDnsRepository;
 import org.apache.custos.access.ci.service.repo.PersonRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataIntegrityViolationException;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.Assert;
@@ -111,17 +112,16 @@ public class PersonService {
         PersonEntity person = personRepository.findById(personId)
                 .orElseThrow(() -> new IllegalArgumentException("Unknown local 
PersonID: " + personId));
 
-        // Update fields
-        if (body.has("FirstName")) 
person.setFirstName(body.path("FirstName").asText(person.getFirstName()));
-        if (body.has("LastName")) 
person.setLastName(body.path("LastName").asText(person.getLastName()));
-        if (body.has("Email")) 
person.setEmail(body.path("Email").asText(person.getEmail()));
-        person.setOrganization(body.has("Organization") ? 
body.path("Organization").asText(null) : null);
-        person.setOrgCode(body.has("OrgCode") ? 
body.path("OrgCode").asText(null) : null);
-        person.setNsfStatusCode(body.has("NsfStatusCode") ? 
body.path("NsfStatusCode").asText(null) : null);
+        if (body.has("UserFirstName")) 
person.setFirstName(body.path("UserFirstName").asText(person.getFirstName()));
+        if (body.has("UserLastName")) 
person.setLastName(body.path("UserLastName").asText(person.getLastName()));
+        if (body.has("UserEmail")) 
person.setEmail(body.path("UserEmail").asText(person.getEmail()));
+        if (body.has("UserOrganization")) 
person.setOrganization(body.path("UserOrganization").asText(null));
+        if (body.has("UserOrgCode")) 
person.setOrgCode(body.path("UserOrgCode").asText(null));
+        if (body.has("NsfStatusCode")) 
person.setNsfStatusCode(body.path("NsfStatusCode").asText(null));
         personRepository.save(person);
 
         Set<String> newDns = new HashSet<>();
-        JsonNode dnList = body.path("DnList");
+        JsonNode dnList = body.path("UserDnList");
         if (dnList != null && dnList.isArray()) {
             for (JsonNode dnNode : dnList) {
                 String dn = dnNode.asText(null);
@@ -130,7 +130,9 @@ public class PersonService {
         }
 
         if (newDns.isEmpty()) {
-            personDnsRepository.deleteByPerson_Id(personId);
+            if (body.has("UserDnList")) {
+                personDnsRepository.deleteByPerson_Id(personId);
+            }
         } else {
             personDnsRepository.deleteByPerson_IdAndDnNotIn(personId, new 
ArrayList<>(newDns));
             for (String dn : newDns) {
@@ -144,6 +146,47 @@ public class PersonService {
         }
     }
 
+    @Transactional
+    public void persistDnsForPerson(String personId, JsonNode dnList) {
+        Assert.hasText(personId, "personId must not be blank");
+
+        if (dnList == null || !dnList.isArray() || dnList.isEmpty()) {
+            return;
+        }
+
+        PersonEntity person = personRepository.findById(personId)
+                .orElseThrow(() -> new IllegalArgumentException("Unknown local 
PersonID: " + personId));
+
+        Set<String> incomingDns = new HashSet<>();
+        for (JsonNode dnNode : dnList) {
+            String dn = dnNode.asText(null);
+            if (dn != null && !dn.isBlank()) {
+                incomingDns.add(dn);
+            }
+        }
+
+        if (incomingDns.isEmpty()) {
+            return;
+        }
+
+        for (String dn : incomingDns) {
+            if (!personDnsRepository.existsByPerson_IdAndDn(personId, dn)) {
+                PersonDnsEntity pde = new PersonDnsEntity();
+                pde.setPerson(person);
+                pde.setDn(dn);
+                try {
+                    personDnsRepository.save(pde);
+                    LOGGER.info("Persisted new DN for person [{}].", personId);
+                    LOGGER.debug("Persisted DN [{}] for person [{}].", dn, 
personId);
+                } catch (DataIntegrityViolationException ex) {
+                    LOGGER.debug("DN already exists for person [{}] 
(concurrent insert), skipping.", personId);
+                }
+            } else {
+                LOGGER.debug("DN [{}] already exists for person [{}], 
skipping.", dn, personId);
+            }
+        }
+    }
+
     @Transactional
     public void deleteFromModifyPacket(JsonNode body) {
         String personId = body.path("PersonID").asText(null);
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java
index 12471cf95..3726e636a 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/ProjectMembershipService.java
@@ -58,9 +58,20 @@ public class ProjectMembershipService {
     @Transactional
     public ProjectMembershipEntity createMembership(String projectId, String 
clusterAccountId, String role) {
         Optional<ProjectMembershipEntity> existing = 
membershipRepository.findByProjectIdAndClusterAccountId(projectId, 
clusterAccountId);
+
         if (existing.isPresent()) {
-            LOGGER.info("Membership already exists for project [{}] and 
cluster account [{}]", projectId, clusterAccountId);
-            return existing.get();
+            ProjectMembershipEntity membership = existing.get();
+            if (membership.isActive()) {
+                LOGGER.info("Active membership already exists for project [{}] 
and cluster account [{}], returning existing record",
+                        projectId, clusterAccountId);
+                return membership;
+            }
+            LOGGER.info("Reactivating inactive membership for project [{}] and 
cluster account [{}] with role [{}]",
+                    projectId, clusterAccountId, role);
+            membership.setActive(true);
+            membership.setRole(role);
+            membershipRepository.save(membership);
+            return membership;
         }
 
         ProjectEntity project = projectRepository.findById(projectId)
@@ -88,19 +99,20 @@ public class ProjectMembershipService {
      * @param personId  Person ID
      */
     @Transactional
-    public void inactivateMembershipsByPersonAndProject(String projectId, 
String personId) {
+    public int inactivateMembershipsByPersonAndProject(String projectId, 
String personId) {
         // TODO - If the user is a PI of a project?
         //  - right now only the membership is turned inactive, no changes to 
the project
         List<ProjectMembershipEntity> memberships = 
membershipRepository.findByProjectIdAndClusterAccount_Person_Id(projectId, 
personId);
 
         if (memberships.isEmpty()) {
-            LOGGER.warn("No memberships found for person [{}] on project [{}]. 
No action taken.", personId, projectId);
-            return;
+            LOGGER.warn("No memberships found for person [{}] on project [{}]. 
Inactivation had no effect.", personId, projectId);
+            return 0;
         }
 
         memberships.forEach(membership -> membership.setActive(false));
         membershipRepository.saveAll(memberships);
         LOGGER.info("Inactivated {} membership(s) for person [{}] on project 
[{}]", memberships.size(), personId, projectId);
+        return memberships.size();
     }
 
     /**
@@ -143,17 +155,18 @@ public class ProjectMembershipService {
      * @param personId  Person ID
      */
     @Transactional
-    public void reactivateMembershipsByPersonAndProject(String projectId, 
String personId) {
+    public int reactivateMembershipsByPersonAndProject(String projectId, 
String personId) {
         List<ProjectMembershipEntity> memberships = 
membershipRepository.findByProjectIdAndClusterAccount_Person_Id(projectId, 
personId);
 
         if (memberships.isEmpty()) {
-            LOGGER.warn("No memberships found for person [{}] on project [{}]. 
No action taken.", personId, projectId);
-            return;
+            LOGGER.warn("No memberships found for person [{}] on project [{}]. 
Reactivation had no effect.", personId, projectId);
+            return 0;
         }
 
         memberships.forEach(membership -> membership.setActive(true));
         membershipRepository.saveAll(memberships);
         LOGGER.info("Reactivated {} membership(s) for person [{}] on project 
[{}]", memberships.size(), personId, projectId);
+        return memberships.size();
     }
 
 }
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java
index c50ba715b..5672014de 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/service/UserAccountService.java
@@ -23,9 +23,11 @@ import 
org.apache.custos.access.ci.service.model.PersonEntity;
 import org.apache.custos.access.ci.service.repo.ClusterAccountRepository;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.dao.DataIntegrityViolationException;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -52,6 +54,14 @@ public class UserAccountService {
     public ClusterAccountEntity provisionClusterAccount(PersonEntity person) {
         // TODO Replace with external source of truth (e.g., COmanage) lookup 
for PersonID and username
 
+        List<ClusterAccountEntity> existing = 
clusterAccountRepository.findByPerson(person);
+        if (!existing.isEmpty()) {
+            ClusterAccountEntity account = existing.get(0);
+            LOGGER.info("Cluster account already exists for person [{}], 
returning existing account with username [{}]",
+                    person.getId(), account.getUsername());
+            return account;
+        }
+
         String proposedUsername = (person.getFirstName().trim().charAt(0) + 
person.getLastName().trim().replace(" ", "-")).toLowerCase();
         String uniqueUsername = ensureUniqueUsername(proposedUsername);
 
@@ -61,9 +71,20 @@ public class UserAccountService {
         newClusterAccount.setId(UUID.randomUUID().toString());
         newClusterAccount.setPerson(person);
         newClusterAccount.setUsername(uniqueUsername);
-        clusterAccountRepository.save(newClusterAccount);
 
-        return newClusterAccount;
+        ClusterAccountEntity account;
+        try {
+            account = clusterAccountRepository.save(newClusterAccount);
+        } catch (DataIntegrityViolationException e) {
+            List<ClusterAccountEntity> retryLookup = 
clusterAccountRepository.findByPerson(person);
+            if (!retryLookup.isEmpty()) {
+                LOGGER.warn("Concurrent account creation detected for person 
[{}]; returning existing account.", person.getId());
+                return retryLookup.get(0);
+            }
+            throw e; // Re-throw if it was a different constraint violation
+        }
+
+        return account;
     }
 
     private String ensureUniqueUsername(String baseUsername) {
@@ -79,4 +100,3 @@ public class UserAccountService {
         return candidate;
     }
 }
-
diff --git 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java
 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java
index 89e5292ea..bdc2d105b 100644
--- 
a/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java
+++ 
b/allocations/access-ci-service/src/main/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorker.java
@@ -43,14 +43,34 @@ import java.time.Instant;
 import java.util.List;
 
 /**
- * A scheduled worker that fetches for new/ processing events and executes 
them.
- * State of an event (NEW -> RUNNING -> SUCCEEDED/FAILED)
+ * Scheduled worker that polls for pending AMIE processing events and executes 
them.
+ * Failures are recorded in a separate transaction to prevent infinite retry 
loops.
+ *
+ * <pre>
+ * NEW → RUNNING → SUCCEEDED
+ *                → RETRY_SCHEDULED → (backoff) → RUNNING → ...
+ *                → PERMANENTLY_FAILED (after MAX_ATTEMPTS)
+ * </pre>
  */
 @Component
 public class ProcessingEventWorker {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcessingEventWorker.class);
-    private static final int MAX_ATTEMPTS = 3;
+
+    /**
+     * Maximum number of execution attempts before an event is permanently 
failed.
+     */
+    static final int MAX_ATTEMPTS = 3;
+
+    /**
+     * Base delay in seconds for exponential backoff between retry attempts.
+     */
+    private static final long BASE_BACKOFF_SECONDS = 30L;
+
+    /**
+     * Upper bound on the computed backoff delay (10 minutes).
+     */
+    private static final long MAX_BACKOFF_SECONDS = 600L;
 
     private final ProcessingEventRepository eventRepo;
     private final PacketRepository packetRepo;
@@ -59,8 +79,11 @@ public class ProcessingEventWorker {
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final ProcessingEventWorker self;
 
-    public ProcessingEventWorker(ProcessingEventRepository eventRepo, 
PacketRepository packetRepo,
-                                 ProcessingErrorRepository errorRepo, 
PacketRouter router, @Lazy ProcessingEventWorker self) {
+    public ProcessingEventWorker(ProcessingEventRepository eventRepo,
+                                 PacketRepository packetRepo,
+                                 ProcessingErrorRepository errorRepo,
+                                 PacketRouter router,
+                                 @Lazy ProcessingEventWorker self) {
         this.eventRepo = eventRepo;
         this.packetRepo = packetRepo;
         this.errorRepo = errorRepo;
@@ -69,88 +92,129 @@ public class ProcessingEventWorker {
     }
 
     /**
-     * Runs on a fixed delay, checks for NEW/RETRY_SCHEDULED events, and 
processes them one by one on a separate transaction.
+     * Polls for due events and processes each in its own transaction.
      */
     @Scheduled(fixedDelayString = 
"#{T(org.springframework.boot.convert.DurationStyle).detectAndParse('${access.amie.scheduler.worker-delay}').toMillis()}")
     public void processPendingEvents() {
-        List<ProcessingEventEntity> eventsToProcess = 
eventRepo.findTop50EventsToProcess(List.of(ProcessingStatus.NEW, 
ProcessingStatus.RETRY_SCHEDULED));
+        List<ProcessingEventEntity> eventsToProcess =
+                eventRepo.findTop50EventsToProcess(
+                        List.of(ProcessingStatus.NEW, 
ProcessingStatus.RETRY_SCHEDULED),
+                        Instant.now());
+
+        if (eventsToProcess.isEmpty()) {
+            return;
+        }
+
+        LOGGER.info("Found {} event(s) to process.", eventsToProcess.size());
 
-        if (!eventsToProcess.isEmpty()) {
-            LOGGER.info("Found {} event(s) to process.", 
eventsToProcess.size());
-            eventsToProcess.forEach(event -> {
+        for (ProcessingEventEntity event : eventsToProcess) {
+            String eventId = event.getId();
+            try {
+                self.executeEventInTransaction(event);
+            } catch (Exception e) {
+                LOGGER.error("Transaction failed for eventId [{}]. Opening 
recovery transaction to record failure.",
+                        eventId, e);
                 try {
-                    self.executeEventInTransaction(event);
-                } catch (Exception e) {
-                    LOGGER.error("An unexpected error occurred while 
processing of eventId [{}].", event.getId(), e);
+                    self.recordFailureInNewTransaction(eventId, e);
+                } catch (Exception recoveryEx) {
+                    LOGGER.error("CRITICAL: Recovery transaction also failed 
for eventId [{}]. " +
+                            "Event may remain stuck until the next worker 
cycle.", eventId, recoveryEx);
                 }
-            });
+            }
         }
     }
 
     @Transactional(propagation = Propagation.REQUIRES_NEW)
-    public void executeEventInTransaction(ProcessingEventEntity event) {
+    public void executeEventInTransaction(ProcessingEventEntity event) throws 
Exception {
         PacketEntity packet = event.getPacket();
-        LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: 
{}", event.getType(), packet.getAmieId(), event.getAttempts() + 1);
+        LOGGER.info("Processing event [{}] for packet amie_id [{}]. Attempt: 
{}",
+                event.getType(), packet.getAmieId(), event.getAttempts() + 1);
 
         event.setStatus(ProcessingStatus.RUNNING);
         event.setStartedAt(Instant.now());
         event.setAttempts(event.getAttempts() + 1);
         eventRepo.saveAndFlush(event);
 
-        try {
-            var packetJson = objectMapper.readTree(packet.getRawJson());
-            router.route(packetJson, packet);
+        var packetJson = objectMapper.readTree(packet.getRawJson());
+        router.route(packetJson, packet);
 
-            handleSuccess(event, packet);
-
-        } catch (Exception e) {
-            LOGGER.error("Event processing failed for packet amie_id [{}]. 
Attempt {} of {}.", packet.getAmieId(), event.getAttempts(), MAX_ATTEMPTS, e);
-            handleFailure(event, packet, e);
-        }
+        handleSuccess(event, packet);
     }
 
-    private void handleSuccess(ProcessingEventEntity event, PacketEntity 
packet) {
-        event.setStatus(ProcessingStatus.SUCCEEDED);
-        event.setFinishedAt(Instant.now());
-        eventRepo.save(event);
-
-        if (event.getType() == ProcessingEventType.DECODE_PACKET) {
-            packet.setStatus(PacketStatus.DECODED);
-            packet.setDecodedAt(Instant.now());
-            packetRepo.save(packet);
+    @Transactional(propagation = Propagation.REQUIRES_NEW)
+    public void recordFailureInNewTransaction(String eventId, Exception cause) 
{
+        ProcessingEventEntity event = eventRepo.findById(eventId).orElse(null);
+        if (event == null) {
+            LOGGER.error("Cannot record failure: event [{}] not found in the 
database.", eventId);
+            return;
         }
 
-        LOGGER.info("Successfully processed event [{}] for packet amie_id 
[{}].", event.getType(), packet.getAmieId());
-    }
+        PacketEntity packet = event.getPacket();
 
-    private void handleFailure(ProcessingEventEntity event, PacketEntity 
packet, Exception e) {
-        // Check if the event should be retried or marked as failed
-        boolean isRetryable = event.getAttempts() < MAX_ATTEMPTS;
-        ProcessingStatus newStatus = isRetryable ? 
ProcessingStatus.RETRY_SCHEDULED : ProcessingStatus.FAILED;
+        int effectiveAttempts = event.getAttempts() + 1;
+        event.setAttempts(effectiveAttempts);
+
+        boolean isRetryable = effectiveAttempts < MAX_ATTEMPTS;
+        ProcessingStatus newStatus = isRetryable
+                ? ProcessingStatus.RETRY_SCHEDULED
+                : ProcessingStatus.PERMANENTLY_FAILED;
 
         event.setStatus(newStatus);
-        event.setLastError(e.getMessage());
+        event.setLastError(cause.getMessage());
         event.setFinishedAt(Instant.now());
-        eventRepo.save(event);
 
-        if (!isRetryable) {
-            LOGGER.error("Event for packet amie_id [{}] has failed permanently 
after {} attempts.", packet.getAmieId(), event.getAttempts());
+        if (isRetryable) {
+            Instant nextRetryAt = computeNextRetryAt(effectiveAttempts);
+            event.setNextRetryAt(nextRetryAt);
+            LOGGER.warn("Event [{}] for packet amie_id [{}] failed on attempt 
{}/{}. Scheduled for retry after [{}].",
+                    eventId, packet.getAmieId(), effectiveAttempts, 
MAX_ATTEMPTS, nextRetryAt);
+        } else {
+            event.setNextRetryAt(null);
+            LOGGER.error("Event [{}] for packet amie_id [{}] is 
PERMANENTLY_FAILED after {} attempt(s). Manual intervention required.",
+                    eventId, packet.getAmieId(), effectiveAttempts);
             packet.setStatus(PacketStatus.FAILED);
-            packet.setLastError(e.getMessage());
+            packet.setLastError(cause.getMessage());
             packetRepo.save(packet);
-
-        } else {
-            LOGGER.warn("Event for packet amie_id [{}] will be retried. Status 
set to {}.", packet.getAmieId(), newStatus);
         }
 
+        eventRepo.save(event);
+
         ProcessingErrorEntity error = new ProcessingErrorEntity();
         error.setPacket(packet);
         error.setEvent(event);
-        error.setSummary(e.getClass().getSimpleName() + ": " + e.getMessage());
-        error.setDetail(getStackTraceAsString(e));
+        error.setSummary(cause.getClass().getSimpleName() + ": " + 
cause.getMessage());
+        String stackTrace = getStackTraceAsString(cause);
+        if (stackTrace.length() > 8000) {
+            stackTrace = stackTrace.substring(0, 8000) + "\n... [truncated]";
+        }
+        error.setDetail(stackTrace);
         errorRepo.save(error);
     }
 
+    private void handleSuccess(ProcessingEventEntity event, PacketEntity 
packet) {
+        event.setStatus(ProcessingStatus.SUCCEEDED);
+        event.setFinishedAt(Instant.now());
+        event.setNextRetryAt(null);
+        eventRepo.save(event);
+
+        if (event.getType() == ProcessingEventType.DECODE_PACKET) {
+            packet.setStatus(PacketStatus.DECODED);
+            packet.setDecodedAt(Instant.now());
+            packetRepo.save(packet);
+        }
+
+        LOGGER.info("Successfully processed event [{}] for packet amie_id 
[{}].",
+                event.getType(), packet.getAmieId());
+    }
+
+    // Exponential backoff: BASE * 2^(attempt-1), capped at 
MAX_BACKOFF_SECONDS.
+    static Instant computeNextRetryAt(int attemptNumber) {
+        long exponent = Math.max(0, attemptNumber - 1);
+        long delaySec = BASE_BACKOFF_SECONDS * (1L << exponent);
+        delaySec = Math.min(delaySec, MAX_BACKOFF_SECONDS);
+        return Instant.now().plusSeconds(delaySec);
+    }
+
     private String getStackTraceAsString(Exception e) {
         StringWriter sw = new StringWriter();
         e.printStackTrace(new PrintWriter(sw));
diff --git 
a/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql
 
b/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql
new file mode 100644
index 000000000..3c6e7a31d
--- /dev/null
+++ 
b/allocations/access-ci-service/src/main/resources/db/migration/V2__add_next_retry_at_to_processing_events.sql
@@ -0,0 +1,5 @@
+-- Add retry backoff support to processing events.
+
+ALTER TABLE amie_processing_events
+    ADD COLUMN next_retry_at TIMESTAMP(6) NULL DEFAULT NULL,
+    ADD INDEX idx_amie_events_next_retry_at (next_retry_at);
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java
index 9ea46c424..ded76306d 100644
--- 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataAccountCreateHandlerTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.custos.access.ci.service.client.amie.AmieClient;
 import org.apache.custos.access.ci.service.model.amie.PacketEntity;
+import org.apache.custos.access.ci.service.service.PersonService;
 import org.apache.custos.access.ci.service.util.JsonTestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
@@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
@@ -47,12 +49,15 @@ class DataAccountCreateHandlerTest {
     @Mock
     private AmieClient amieClient;
 
+    @Mock
+    private PersonService personService;
+
     private DataAccountCreateHandler handler;
     private ObjectMapper objectMapper;
 
     @BeforeEach
     void setUp() {
-        handler = new DataAccountCreateHandler(amieClient);
+        handler = new DataAccountCreateHandler(amieClient, personService);
         objectMapper = new ObjectMapper();
     }
 
@@ -115,27 +120,52 @@ class DataAccountCreateHandlerTest {
     }
 
     @Test
-    void handle_withEmptyDnList_shouldProcessSuccessfully() {
+    void handle_withEmptyDnList_shouldNotCallPersonServiceAndSendReply() {
         JsonNode packetJson = createValidPacketJson();
         PacketEntity packetEntity = createPacketEntity();
 
         handler.handle(packetJson, packetEntity);
 
+        verify(personService, never()).persistDnsForPerson(any(), any());
         //noinspection unchecked
         verify(amieClient).replyToPacket(eq(12345L), any(Map.class));
     }
 
     @Test
-    void handle_withNonEmptyDnList_shouldProcessSuccessfully() {
+    void handle_withNonEmptyDnList_shouldPersistDnsAndSendReply() {
         JsonNode packetJson = createPacketJsonWithDnList();
         PacketEntity packetEntity = createPacketEntity();
 
         handler.handle(packetJson, packetEntity);
 
+        // Verify personService.persistDnsForPerson was called with the 
correct personId
+        ArgumentCaptor<String> personIdCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<JsonNode> dnListCaptor = 
ArgumentCaptor.forClass(JsonNode.class);
+        verify(personService).persistDnsForPerson(personIdCaptor.capture(), 
dnListCaptor.capture());
+
+        assertThat(personIdCaptor.getValue()).isEqualTo("person-123");
+        assertThat(dnListCaptor.getValue().isArray()).isTrue();
+        assertThat(dnListCaptor.getValue().size()).isEqualTo(2);
+
         //noinspection unchecked
         verify(amieClient).replyToPacket(eq(12345L), any(Map.class));
     }
 
+    @Test
+    void handle_withValidPacketContainingDnList_shouldPersistAllDns() {
+        JsonNode incomingPacket = 
JsonTestUtils.loadMockPacket("data_account_create", "incoming-data.json");
+
+        PacketEntity packetEntity = new PacketEntity();
+        packetEntity.setAmieId(233497918L);
+        packetEntity.setType("data_account_create");
+
+        handler.handle(incomingPacket, packetEntity);
+
+        ArgumentCaptor<JsonNode> dnListCaptor = 
ArgumentCaptor.forClass(JsonNode.class);
+        verify(personService).persistDnsForPerson(eq("test-user-person-123"), 
dnListCaptor.capture());
+        assertThat(dnListCaptor.getValue().size()).isEqualTo(3);
+    }
+
     private JsonNode createValidPacketJson() {
         return objectMapper.createObjectNode()
                 .set("body", objectMapper.createObjectNode()
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java
index 5e5dc844c..a1f911fdf 100644
--- 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/handler/amie/DataProjectCreateHandlerTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.custos.access.ci.service.client.amie.AmieClient;
 import org.apache.custos.access.ci.service.model.amie.PacketEntity;
+import org.apache.custos.access.ci.service.service.PersonService;
 import org.apache.custos.access.ci.service.util.JsonTestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
@@ -38,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 
 @ExtendWith(MockitoExtension.class)
@@ -47,12 +49,15 @@ class DataProjectCreateHandlerTest {
     @Mock
     private AmieClient amieClient;
 
+    @Mock
+    private PersonService personService;
+
     private DataProjectCreateHandler handler;
     private ObjectMapper objectMapper;
 
     @BeforeEach
     void setUp() {
-        handler = new DataProjectCreateHandler(amieClient);
+        handler = new DataProjectCreateHandler(amieClient, personService);
         objectMapper = new ObjectMapper();
     }
 
@@ -115,27 +120,51 @@ class DataProjectCreateHandlerTest {
     }
 
     @Test
-    void handle_withEmptyDnList_shouldProcessSuccessfully() {
+    void handle_withEmptyDnList_shouldNotCallPersonServiceAndSendReply() {
         JsonNode packetJson = createValidPacketJson();
         PacketEntity packetEntity = createPacketEntity();
 
         handler.handle(packetJson, packetEntity);
 
+        verify(personService, never()).persistDnsForPerson(any(), any());
         //noinspection unchecked
         verify(amieClient).replyToPacket(eq(12345L), any(Map.class));
     }
 
     @Test
-    void handle_withNonEmptyDnList_shouldProcessSuccessfully() {
+    void handle_withNonEmptyDnList_shouldPersistDnsAndSendReply() {
         JsonNode packetJson = createPacketJsonWithDnList();
         PacketEntity packetEntity = createPacketEntity();
 
         handler.handle(packetJson, packetEntity);
 
+        ArgumentCaptor<String> personIdCaptor = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<JsonNode> dnListCaptor = 
ArgumentCaptor.forClass(JsonNode.class);
+        verify(personService).persistDnsForPerson(personIdCaptor.capture(), 
dnListCaptor.capture());
+
+        assertThat(personIdCaptor.getValue()).isEqualTo("person-123");
+        assertThat(dnListCaptor.getValue().isArray()).isTrue();
+        assertThat(dnListCaptor.getValue().size()).isEqualTo(2);
+
         //noinspection unchecked
         verify(amieClient).replyToPacket(eq(12345L), any(Map.class));
     }
 
+    @Test
+    void handle_withValidPacketContainingDnList_shouldPersistAllDns() {
+        JsonNode incomingPacket = 
JsonTestUtils.loadMockPacket("data_project_create", "incoming-data.json");
+
+        PacketEntity packetEntity = new PacketEntity();
+        packetEntity.setAmieId(233497909L);
+        packetEntity.setType("data_project_create");
+
+        handler.handle(incomingPacket, packetEntity);
+
+        ArgumentCaptor<JsonNode> dnListCaptor = 
ArgumentCaptor.forClass(JsonNode.class);
+        verify(personService).persistDnsForPerson(eq("test-person-456"), 
dnListCaptor.capture());
+        assertThat(dnListCaptor.getValue().size()).isEqualTo(3);
+    }
+
     private JsonNode createValidPacketJson() {
         return objectMapper.createObjectNode().set("body", 
objectMapper.createObjectNode()
                 .put("ProjectID", "PRJ-TEST123")
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java
index 23371afcb..434fd1423 100644
--- 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/PersonServiceTest.java
@@ -120,6 +120,10 @@ class PersonServiceTest {
                 .hasMessageContaining("Packet body must contain a 
'UserGlobalID'");
     }
 
+    // 
-------------------------------------------------------------------------
+    // replaceFromModifyPacket
+    // 
-------------------------------------------------------------------------
+
     @Test
     void replaceFromModifyPacket_shouldUpdatePersonFields() {
         JsonNode body = createModifyPacketBody();
@@ -137,6 +141,95 @@ class PersonServiceTest {
         assertThat(existingPerson.getNsfStatusCode()).isEqualTo("INACTIVE");
     }
 
+    @Test
+    void replaceFromModifyPacket_shouldUseCorrectAmieFieldNames() {
+        // Verify that the correct AMIE field names are used (UserFirstName 
not FirstName, etc.)
+        ObjectNode body = objectMapper.createObjectNode()
+                .put("PersonID", "person-123")
+                .put("UserFirstName", "Updated")
+                .put("UserLastName", "Name")
+                .put("UserEmail", "[email protected]")
+                .put("UserOrganization", "Updated Org")
+                .put("UserOrgCode", "UPD")
+                .put("NsfStatusCode", "INACTIVE");
+
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+
+        personService.replaceFromModifyPacket(body);
+
+        assertThat(existingPerson.getFirstName()).isEqualTo("Updated");
+        assertThat(existingPerson.getLastName()).isEqualTo("Name");
+        assertThat(existingPerson.getEmail()).isEqualTo("[email protected]");
+        assertThat(existingPerson.getOrganization()).isEqualTo("Updated Org");
+        assertThat(existingPerson.getOrgCode()).isEqualTo("UPD");
+        assertThat(existingPerson.getNsfStatusCode()).isEqualTo("INACTIVE");
+    }
+
+    @Test
+    void replaceFromModifyPacket_shouldNotWipeFieldsAbsentFromPacket() {
+        // Packet only contains PersonID and UserFirstName — other fields must 
remain unchanged
+        ObjectNode body = objectMapper.createObjectNode()
+                .put("PersonID", "person-123")
+                .put("UserFirstName", "OnlyFirstUpdated");
+
+        PersonEntity existingPerson = createPersonEntity();
+        // Pre-existing values that must survive
+        existingPerson.setLastName("OriginalLast");
+        existingPerson.setEmail("[email protected]");
+        existingPerson.setOrganization("Original Org");
+        existingPerson.setOrgCode("ORIG");
+        existingPerson.setNsfStatusCode("ACTIVE");
+
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+
+        personService.replaceFromModifyPacket(body);
+
+        
assertThat(existingPerson.getFirstName()).isEqualTo("OnlyFirstUpdated");
+        assertThat(existingPerson.getLastName()).isEqualTo("OriginalLast");
+        
assertThat(existingPerson.getEmail()).isEqualTo("[email protected]");
+        assertThat(existingPerson.getOrganization()).isEqualTo("Original Org");
+        assertThat(existingPerson.getOrgCode()).isEqualTo("ORIG");
+        assertThat(existingPerson.getNsfStatusCode()).isEqualTo("ACTIVE");
+    }
+
+    @Test
+    void replaceFromModifyPacket_shouldNotWipeOrganizationWhenAbsent() {
+        // Regression test: old code unconditionally wiped 
organization/orgCode to null when field absent
+        ObjectNode body = objectMapper.createObjectNode()
+                .put("PersonID", "person-123")
+                .put("UserFirstName", "Jane");
+        // No UserOrganization, UserOrgCode in packet
+
+        PersonEntity existingPerson = createPersonEntity();
+        existingPerson.setOrganization("Keep This Org");
+        existingPerson.setOrgCode("KEEP");
+
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+
+        personService.replaceFromModifyPacket(body);
+
+        assertThat(existingPerson.getOrganization()).isEqualTo("Keep This 
Org");
+        assertThat(existingPerson.getOrgCode()).isEqualTo("KEEP");
+    }
+
+    @Test
+    void replaceFromModifyPacket_shouldNotClearDnsWhenUserDnListAbsent() {
+        // When UserDnList is not present in the packet, existing DNs must not 
be touched
+        ObjectNode body = objectMapper.createObjectNode()
+                .put("PersonID", "person-123")
+                .put("UserFirstName", "Jane");
+        // No UserDnList field
+
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+
+        personService.replaceFromModifyPacket(body);
+
+        // deleteByPerson_Id should NOT have been called since UserDnList was 
absent
+        verify(personDnsRepository, never()).deleteByPerson_Id("person-123");
+    }
+
     @Test
     void replaceFromModifyPacket_shouldUpdateDnList() {
         JsonNode body = createModifyPacketBodyWithDns();
@@ -151,9 +244,24 @@ class PersonServiceTest {
         verify(personDnsRepository).save(any(PersonDnsEntity.class));
     }
 
+    @Test
+    void replaceFromModifyPacket_shouldClearDnsWhenEmptyUserDnListPresent() {
+        // When UserDnList is present but empty, existing DNs should be cleared
+        ObjectNode body = objectMapper.createObjectNode()
+                .put("PersonID", "person-123");
+        body.set("UserDnList", objectMapper.createArrayNode()); // explicitly 
empty array
+
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+
+        personService.replaceFromModifyPacket(body);
+
+        verify(personDnsRepository).deleteByPerson_Id("person-123");
+    }
+
     @Test
     void replaceFromModifyPacket_shouldThrowExceptionForMissingPersonId() {
-        ObjectNode body = objectMapper.createObjectNode().put("FirstName", 
"Jane");
+        ObjectNode body = objectMapper.createObjectNode().put("UserFirstName", 
"Jane");
 
         assertThatThrownBy(() -> personService.replaceFromModifyPacket(body))
                 .isInstanceOf(IllegalArgumentException.class)
@@ -170,6 +278,84 @@ class PersonServiceTest {
                 .hasMessageContaining("Unknown local PersonID: person-123");
     }
 
+    // 
-------------------------------------------------------------------------
+    // persistDnsForPerson tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void persistDnsForPerson_shouldPersistNewDns() {
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+        when(personDnsRepository.existsByPerson_IdAndDn("person-123", 
"/C=US/O=Test/CN=John Doe")).thenReturn(false);
+
+        JsonNode dnList = 
objectMapper.createArrayNode().add("/C=US/O=Test/CN=John Doe");
+        personService.persistDnsForPerson("person-123", dnList);
+
+        ArgumentCaptor<PersonDnsEntity> captor = 
ArgumentCaptor.forClass(PersonDnsEntity.class);
+        verify(personDnsRepository).save(captor.capture());
+        assertThat(captor.getValue().getDn()).isEqualTo("/C=US/O=Test/CN=John 
Doe");
+    }
+
+    @Test
+    void persistDnsForPerson_shouldBeIdempotentWhenDnAlreadyExists() {
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+        when(personDnsRepository.existsByPerson_IdAndDn("person-123", 
"/C=US/O=Test/CN=John Doe")).thenReturn(true);
+
+        JsonNode dnList = 
objectMapper.createArrayNode().add("/C=US/O=Test/CN=John Doe");
+        personService.persistDnsForPerson("person-123", dnList);
+
+        verify(personDnsRepository, never()).save(any(PersonDnsEntity.class));
+    }
+
+    @Test
+    void persistDnsForPerson_shouldPersistOnlyNewDnsWhenSomeDnsAlreadyExist() {
+        PersonEntity existingPerson = createPersonEntity();
+        
when(personRepository.findById("person-123")).thenReturn(Optional.of(existingPerson));
+        when(personDnsRepository.existsByPerson_IdAndDn("person-123", 
"/C=US/O=Existing/CN=John")).thenReturn(true);
+        when(personDnsRepository.existsByPerson_IdAndDn("person-123", 
"/C=US/O=New/CN=John")).thenReturn(false);
+
+        JsonNode dnList = objectMapper.createArrayNode()
+                .add("/C=US/O=Existing/CN=John")
+                .add("/C=US/O=New/CN=John");
+        personService.persistDnsForPerson("person-123", dnList);
+
+        ArgumentCaptor<PersonDnsEntity> captor = 
ArgumentCaptor.forClass(PersonDnsEntity.class);
+        verify(personDnsRepository, times(1)).save(captor.capture());
+        assertThat(captor.getValue().getDn()).isEqualTo("/C=US/O=New/CN=John");
+    }
+
+    @Test
+    void persistDnsForPerson_shouldDoNothingForNullDnList() {
+        personService.persistDnsForPerson("person-123", null);
+
+        verify(personRepository, never()).findById(any());
+        verify(personDnsRepository, never()).save(any());
+    }
+
+    @Test
+    void persistDnsForPerson_shouldDoNothingForEmptyDnList() {
+        JsonNode emptyDnList = objectMapper.createArrayNode();
+        personService.persistDnsForPerson("person-123", emptyDnList);
+
+        verify(personRepository, never()).findById(any());
+        verify(personDnsRepository, never()).save(any());
+    }
+
+    @Test
+    void persistDnsForPerson_shouldThrowForUnknownPersonId() {
+        
when(personRepository.findById("unknown-id")).thenReturn(Optional.empty());
+        JsonNode dnList = 
objectMapper.createArrayNode().add("/C=US/O=Test/CN=John");
+
+        assertThatThrownBy(() -> 
personService.persistDnsForPerson("unknown-id", dnList))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Unknown local PersonID: unknown-id");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // deleteFromModifyPacket tests
+    // 
-------------------------------------------------------------------------
+
     @Test
     void deleteFromModifyPacket_shouldDeletePerson() {
         JsonNode body = createModifyPacketBody();
@@ -179,13 +365,17 @@ class PersonServiceTest {
 
     @Test
     void deleteFromModifyPacket_shouldThrowExceptionForMissingPersonId() {
-        ObjectNode body = objectMapper.createObjectNode().put("FirstName", 
"Jane");
+        ObjectNode body = objectMapper.createObjectNode().put("UserFirstName", 
"Jane");
 
         assertThatThrownBy(() -> personService.deleteFromModifyPacket(body))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessageContaining("Missing required 'PersonID'");
     }
 
+    // 
-------------------------------------------------------------------------
+    // mergePersons tests
+    // 
-------------------------------------------------------------------------
+
     @Test
     void mergePersons_shouldMergeSuccessfully() {
         PersonEntity survivingPerson = createPersonEntity();
@@ -228,6 +418,10 @@ class PersonServiceTest {
                 .hasMessageContaining("Could not find retiring person with 
local ID: retiring-person-123");
     }
 
+    // 
-------------------------------------------------------------------------
+    // Test data helpers
+    // 
-------------------------------------------------------------------------
+
     private JsonNode createValidPacketBody() {
         return objectMapper.createObjectNode()
                 .put("UserGlobalID", "USER-GLOBAL-123")
@@ -249,20 +443,20 @@ class PersonServiceTest {
     private JsonNode createModifyPacketBody() {
         return objectMapper.createObjectNode()
                 .put("PersonID", "person-123")
-                .put("FirstName", "Jane")
-                .put("LastName", "Smith")
-                .put("Email", "[email protected]")
-                .put("Organization", "New Org")
-                .put("OrgCode", "NEW")
+                .put("UserFirstName", "Jane")
+                .put("UserLastName", "Smith")
+                .put("UserEmail", "[email protected]")
+                .put("UserOrganization", "New Org")
+                .put("UserOrgCode", "NEW")
                 .put("NsfStatusCode", "INACTIVE");
     }
 
     private JsonNode createModifyPacketBodyWithDns() {
         return objectMapper.createObjectNode()
                 .put("PersonID", "person-123")
-                .put("FirstName", "Jane")
-                .put("LastName", "Smith")
-                .set("DnList", objectMapper.createArrayNode().add("CN=Jane 
Smith,OU=Users,DC=example,DC=com"));
+                .put("UserFirstName", "Jane")
+                .put("UserLastName", "Smith")
+                .set("UserDnList", objectMapper.createArrayNode().add("CN=Jane 
Smith,OU=Users,DC=example,DC=com"));
     }
 
     private PersonEntity createPersonEntity() {
@@ -277,4 +471,4 @@ class PersonServiceTest {
         entity.setNsfStatusCode("ACTIVE");
         return entity;
     }
-}
\ No newline at end of file
+}
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java
index b089a8bee..fda180f8f 100644
--- 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/ProjectMembershipServiceTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.custos.access.ci.service.service;
 
 import org.apache.custos.access.ci.service.model.ClusterAccountEntity;
@@ -42,18 +60,61 @@ class ProjectMembershipServiceTest {
     }
 
     @Test
-    void createMembership_shouldReturnExistingMembership() {
+    void createMembership_shouldReturnExistingActiveMembership() {
         ProjectMembershipEntity existingMembership = createMembershipEntity();
+        existingMembership.setActive(true);
         
when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", 
"ACCOUNT-456"))
                 .thenReturn(Optional.of(existingMembership));
 
         ProjectMembershipEntity result = 
membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI");
 
         assertThat(result).isEqualTo(existingMembership);
+        assertThat(result.isActive()).isTrue();
         
verify(membershipRepository).findByProjectIdAndClusterAccountId("PROJECT-123", 
"ACCOUNT-456");
         verify(membershipRepository, 
never()).save(any(ProjectMembershipEntity.class));
     }
 
+    @Test
+    void createMembership_shouldReactivateInactiveMembership() {
+        ProjectMembershipEntity inactiveMembership = createMembershipEntity();
+        inactiveMembership.setActive(false);
+        
when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", 
"ACCOUNT-456"))
+                .thenReturn(Optional.of(inactiveMembership));
+        when(membershipRepository.save(any(ProjectMembershipEntity.class)))
+                .thenAnswer(invocation -> 
invocation.<ProjectMembershipEntity>getArgument(0));
+
+        ProjectMembershipEntity result = 
membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "USER");
+
+        assertThat(result).isSameAs(inactiveMembership);
+        assertThat(result.isActive()).isTrue();
+        assertThat(result.getRole()).isEqualTo("USER");
+        verify(membershipRepository).save(inactiveMembership);
+    }
+
+    @Test
+    void createMembership_calledTwiceForSamePair_shouldNotCreateDuplicate() {
+        ProjectEntity project = createProjectEntity();
+        ClusterAccountEntity clusterAccount = createClusterAccountEntity();
+
+        // First call: no membership exists.
+        
when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", 
"ACCOUNT-456"))
+                .thenReturn(Optional.empty());
+        
when(projectRepository.findById("PROJECT-123")).thenReturn(Optional.of(project));
+        
when(clusterAccountRepository.findById("ACCOUNT-456")).thenReturn(Optional.of(clusterAccount));
+        when(membershipRepository.save(any(ProjectMembershipEntity.class)))
+                .thenAnswer(invocation -> 
invocation.<ProjectMembershipEntity>getArgument(0));
+
+        ProjectMembershipEntity firstResult = 
membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI");
+        assertThat(firstResult.isActive()).isTrue();
+
+        // Second call: membership now exists and is active.
+        
when(membershipRepository.findByProjectIdAndClusterAccountId("PROJECT-123", 
"ACCOUNT-456"))
+                .thenReturn(Optional.of(firstResult));
+
+        ProjectMembershipEntity secondResult = 
membershipService.createMembership("PROJECT-123", "ACCOUNT-456", "PI");
+        assertThat(secondResult).isSameAs(firstResult);
+    }
+
     @Test
     void createMembership_shouldCreateNewMembership() {
         ProjectEntity project = createProjectEntity();
@@ -98,8 +159,12 @@ class ProjectMembershipServiceTest {
                 .hasMessageContaining("Cluster account not found: 
ACCOUNT-456");
     }
 
+    // 
-------------------------------------------------------------------------
+    // inactivateMembershipsByPersonAndProject tests
+    // 
-------------------------------------------------------------------------
+
     @Test
-    void inactivateMembershipsByPersonAndProject_shouldInactivateMemberships() 
{
+    void 
inactivateMembershipsByPersonAndProject_shouldInactivateMembershipsAndReturnCount()
 {
         ProjectMembershipEntity membership1 = createMembershipEntity();
         ProjectMembershipEntity membership2 = createMembershipEntity();
         membership2.setId("membership-2");
@@ -107,20 +172,28 @@ class ProjectMembershipServiceTest {
         List<ProjectMembershipEntity> memberships = List.of(membership1, 
membership2);
         
when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123",
 "PERSON-456")).thenReturn(memberships);
 
-        
membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
+        int count = 
membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
 
+        assertThat(count).isEqualTo(2);
         assertThat(membership1.isActive()).isFalse();
         assertThat(membership2.isActive()).isFalse();
         verify(membershipRepository).saveAll(memberships);
     }
 
     @Test
-    void inactivateMembershipsByPersonAndProject_shouldHandleNoMemberships() {
+    void 
inactivateMembershipsByPersonAndProject_shouldReturnZeroWhenNoMembershipsFound()
 {
         
when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123",
 "PERSON-456")).thenReturn(List.of());
-        
membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
+
+        int count = 
membershipService.inactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
+
+        assertThat(count).isZero();
         verify(membershipRepository, never()).saveAll(any());
     }
 
+    // 
-------------------------------------------------------------------------
+    // inactivateAllMembershipsForProject tests
+    // 
-------------------------------------------------------------------------
+
     @Test
     void inactivateAllMembershipsForProject_shouldInactivateAllMemberships() {
         ProjectMembershipEntity membership1 = createMembershipEntity();
@@ -144,6 +217,10 @@ class ProjectMembershipServiceTest {
         verify(membershipRepository, never()).saveAll(any());
     }
 
+    // 
-------------------------------------------------------------------------
+    // reactivatePiMembership tests
+    // 
-------------------------------------------------------------------------
+
     @Test
     void reactivatePiMembership_shouldReactivatePiMemberships() {
         ProjectMembershipEntity piMembership1 = createMembershipEntity();
@@ -172,6 +249,39 @@ class ProjectMembershipServiceTest {
         verify(membershipRepository).saveAll(List.of());
     }
 
+    // 
-------------------------------------------------------------------------
+    // reactivateMembershipsByPersonAndProject tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void 
reactivateMembershipsByPersonAndProject_shouldReactivateMembershipsAndReturnCount()
 {
+        ProjectMembershipEntity membership1 = createMembershipEntity();
+        membership1.setActive(false);
+        ProjectMembershipEntity membership2 = createMembershipEntity();
+        membership2.setId("membership-2");
+        membership2.setActive(false);
+
+        List<ProjectMembershipEntity> memberships = List.of(membership1, 
membership2);
+        
when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123",
 "PERSON-456")).thenReturn(memberships);
+
+        int count = 
membershipService.reactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
+
+        assertThat(count).isEqualTo(2);
+        assertThat(membership1.isActive()).isTrue();
+        assertThat(membership2.isActive()).isTrue();
+        verify(membershipRepository).saveAll(memberships);
+    }
+
+    @Test
+    void 
reactivateMembershipsByPersonAndProject_shouldReturnZeroWhenNoMembershipsFound()
 {
+        
when(membershipRepository.findByProjectIdAndClusterAccount_Person_Id("PROJECT-123",
 "PERSON-456")).thenReturn(List.of());
+
+        int count = 
membershipService.reactivateMembershipsByPersonAndProject("PROJECT-123", 
"PERSON-456");
+
+        assertThat(count).isZero();
+        verify(membershipRepository, never()).saveAll(any());
+    }
+
     private ProjectEntity createProjectEntity() {
         ProjectEntity entity = new ProjectEntity();
         entity.setId("PROJECT-123");
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java
index 38119a10a..81ee9671b 100644
--- 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/service/UserAccountServiceTest.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
 package org.apache.custos.access.ci.service.service;
 
 import org.apache.custos.access.ci.service.model.ClusterAccountEntity;
@@ -10,11 +28,13 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.util.List;
 import java.util.Optional;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -32,9 +52,49 @@ class UserAccountServiceTest {
         userAccountService = new UserAccountService(clusterAccountRepository);
     }
 
+    @Test
+    void 
provisionClusterAccount_shouldReturnExistingAccountWhenPersonAlreadyHasOne() {
+        PersonEntity person = createPersonEntity();
+        ClusterAccountEntity existingAccount = 
createClusterAccountEntity(person, "jdoe");
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of(existingAccount));
+
+        ClusterAccountEntity result = 
userAccountService.provisionClusterAccount(person);
+
+        assertThat(result).isSameAs(existingAccount);
+        assertThat(result.getUsername()).isEqualTo("jdoe");
+        verify(clusterAccountRepository, 
never()).save(any(ClusterAccountEntity.class));
+    }
+
+    @Test
+    void 
provisionClusterAccount_calledTwiceForSamePerson_shouldReturnSameAccountBothTimes()
 {
+        PersonEntity person = createPersonEntity();
+        ClusterAccountEntity existingAccount = 
createClusterAccountEntity(person, "jdoe");
+
+        // First call: no account exists yet, create one.
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
+        
when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.empty());
+        when(clusterAccountRepository.save(any(ClusterAccountEntity.class)))
+                .thenAnswer(invocation -> 
invocation.<ClusterAccountEntity>getArgument(0));
+
+        ClusterAccountEntity firstResult = 
userAccountService.provisionClusterAccount(person);
+        assertThat(firstResult.getUsername()).isEqualTo("jdoe");
+
+        // Second call: account now exists.
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of(existingAccount));
+
+        ClusterAccountEntity secondResult = 
userAccountService.provisionClusterAccount(person);
+        assertThat(secondResult).isSameAs(existingAccount);
+        assertThat(secondResult.getUsername()).isEqualTo("jdoe");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // New account creation tests
+    // 
-------------------------------------------------------------------------
+
     @Test
     void provisionClusterAccount_shouldCreateUniqueUsername() {
         PersonEntity person = createPersonEntity();
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
         
when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.empty());
         when(clusterAccountRepository.save(any(ClusterAccountEntity.class)))
                 .thenAnswer(invocation -> 
invocation.<ClusterAccountEntity>getArgument(0));
@@ -50,6 +110,7 @@ class UserAccountServiceTest {
     @Test
     void provisionClusterAccount_shouldGenerateUniqueUsernameWhenBaseIsTaken() 
{
         PersonEntity person = createPersonEntity();
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
         
when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.of(new
 ClusterAccountEntity()));
         
when(clusterAccountRepository.findByUsername("jdoe1")).thenReturn(Optional.empty());
         when(clusterAccountRepository.save(any(ClusterAccountEntity.class)))
@@ -66,6 +127,7 @@ class UserAccountServiceTest {
     @Test
     void provisionClusterAccount_shouldHandleMultipleSuffixes() {
         PersonEntity person = createPersonEntity();
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
         
when(clusterAccountRepository.findByUsername("jdoe")).thenReturn(Optional.of(new
 ClusterAccountEntity()));
         
when(clusterAccountRepository.findByUsername("jdoe1")).thenReturn(Optional.of(new
 ClusterAccountEntity()));
         
when(clusterAccountRepository.findByUsername("jdoe2")).thenReturn(Optional.empty());
@@ -85,6 +147,7 @@ class UserAccountServiceTest {
         PersonEntity person = createPersonEntity();
         person.setFirstName("John Michael");
         person.setLastName("Doe Smith");
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
         
when(clusterAccountRepository.findByUsername("jdoe-smith")).thenReturn(Optional.empty());
         when(clusterAccountRepository.save(any(ClusterAccountEntity.class)))
                 .thenAnswer(invocation -> 
invocation.<ClusterAccountEntity>getArgument(0));
@@ -102,10 +165,15 @@ class UserAccountServiceTest {
         PersonEntity person = createPersonEntity();
         person.setFirstName("");
         person.setLastName("");
+        
when(clusterAccountRepository.findByPerson(person)).thenReturn(List.of());
 
         assertThatThrownBy(() -> 
userAccountService.provisionClusterAccount(person)).isInstanceOf(StringIndexOutOfBoundsException.class);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Helper factories
+    // 
-------------------------------------------------------------------------
+
     private PersonEntity createPersonEntity() {
         PersonEntity entity = new PersonEntity();
         entity.setId("person-123");
@@ -115,4 +183,12 @@ class UserAccountServiceTest {
         entity.setEmail("[email protected]");
         return entity;
     }
+
+    private ClusterAccountEntity createClusterAccountEntity(PersonEntity 
person, String username) {
+        ClusterAccountEntity entity = new ClusterAccountEntity();
+        entity.setId("account-456");
+        entity.setPerson(person);
+        entity.setUsername(username);
+        return entity;
+    }
 }
diff --git 
a/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java
 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java
new file mode 100644
index 000000000..05e7f1df5
--- /dev/null
+++ 
b/allocations/access-ci-service/src/test/java/org/apache/custos/access/ci/service/worker/amie/ProcessingEventWorkerTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.custos.access.ci.service.worker.amie;
+
+import org.apache.custos.access.ci.service.handler.amie.PacketRouter;
+import org.apache.custos.access.ci.service.model.amie.PacketEntity;
+import org.apache.custos.access.ci.service.model.amie.PacketStatus;
+import org.apache.custos.access.ci.service.model.amie.ProcessingErrorEntity;
+import org.apache.custos.access.ci.service.model.amie.ProcessingEventEntity;
+import org.apache.custos.access.ci.service.model.amie.ProcessingEventType;
+import org.apache.custos.access.ci.service.model.amie.ProcessingStatus;
+import org.apache.custos.access.ci.service.repo.amie.PacketRepository;
+import org.apache.custos.access.ci.service.repo.amie.ProcessingErrorRepository;
+import org.apache.custos.access.ci.service.repo.amie.ProcessingEventRepository;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for ProcessingEventWorker.
+ */
+@ExtendWith(MockitoExtension.class)
+@Tag("unit")
+class ProcessingEventWorkerTest {
+
+    @Mock
+    private ProcessingEventRepository eventRepo;
+
+    @Mock
+    private PacketRepository packetRepo;
+
+    @Mock
+    private ProcessingErrorRepository errorRepo;
+
+    @Mock
+    private PacketRouter router;
+
+    @Mock
+    private ProcessingEventWorker self;
+
+    private ProcessingEventWorker worker;
+
+    @BeforeEach
+    void setUp() {
+        worker = new ProcessingEventWorker(eventRepo, packetRepo, errorRepo, 
router, self);
+    }
+
+    // ------------------------------------------------------------------
+    // processPendingEvents — orchestration
+    // ------------------------------------------------------------------
+
+    @Test
+    void processPendingEvents_whenNoEvents_doesNothing() throws Exception {
+        when(eventRepo.findTop50EventsToProcess(anyList(), 
any(Instant.class))).thenReturn(List.of());
+
+        worker.processPendingEvents();
+
+        verify(self, never()).executeEventInTransaction(any());
+        verify(self, never()).recordFailureInNewTransaction(any(), any());
+    }
+
+    @Test
+    void processPendingEvents_callsExecuteForEachEvent() throws Exception {
+        ProcessingEventEntity event1 = buildEvent("id-1", 
ProcessingStatus.NEW, 0);
+        ProcessingEventEntity event2 = buildEvent("id-2", 
ProcessingStatus.RETRY_SCHEDULED, 1);
+        when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class)))
+                .thenReturn(List.of(event1, event2));
+
+        worker.processPendingEvents();
+
+        verify(self).executeEventInTransaction(event1);
+        verify(self).executeEventInTransaction(event2);
+        verify(self, never()).recordFailureInNewTransaction(any(), any());
+    }
+
+    @Test
+    void 
processPendingEvents_whenExecuteThrows_callsRecoveryTransactionWithEventId() 
throws Exception {
+        ProcessingEventEntity event = buildEvent("id-thrown", 
ProcessingStatus.NEW, 0);
+        when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class)))
+                .thenReturn(List.of(event));
+
+        RuntimeException cause = new RuntimeException("handler blew up");
+        doThrow(cause).when(self).executeEventInTransaction(event);
+
+        worker.processPendingEvents();
+
+        verify(self).recordFailureInNewTransaction("id-thrown", cause);
+    }
+
+    @Test
+    void 
processPendingEvents_whenBothExecuteAndRecoveryThrow_continuesProcessingRemainingEvents()
 throws Exception {
+        ProcessingEventEntity badEvent = buildEvent("bad-id", 
ProcessingStatus.NEW, 0);
+        ProcessingEventEntity goodEvent = buildEvent("good-id", 
ProcessingStatus.NEW, 0);
+        when(eventRepo.findTop50EventsToProcess(anyList(), any(Instant.class)))
+                .thenReturn(List.of(badEvent, goodEvent));
+
+        RuntimeException executeCause = new RuntimeException("execute failed");
+        doThrow(executeCause).when(self).executeEventInTransaction(badEvent);
+        doThrow(new RuntimeException("recovery also failed"))
+                .when(self).recordFailureInNewTransaction("bad-id", 
executeCause);
+
+        // Should not propagate — the worker logs and continues to goodEvent.
+        worker.processPendingEvents();
+
+        verify(self).executeEventInTransaction(goodEvent);
+    }
+
+    // ------------------------------------------------------------------
+    // recordFailureInNewTransaction — retryable failures
+    // ------------------------------------------------------------------
+
+    @Test
+    void 
recordFailureInNewTransaction_onFirstFailure_setsRetryScheduledWithBackoff() {
+        ProcessingEventEntity event = buildEvent("evt-1", 
ProcessingStatus.NEW, 0);
+        PacketEntity packet = event.getPacket();
+        when(eventRepo.findById("evt-1")).thenReturn(Optional.of(event));
+
+        RuntimeException cause = new IllegalArgumentException("Project not 
found");
+        Instant before = Instant.now();
+
+        worker.recordFailureInNewTransaction("evt-1", cause);
+
+        Instant after = Instant.now();
+
+        
assertThat(event.getStatus()).isEqualTo(ProcessingStatus.RETRY_SCHEDULED);
+        assertThat(event.getAttempts()).isEqualTo(1); // incremented from 0
+        assertThat(event.getLastError()).isEqualTo("Project not found");
+        assertThat(event.getNextRetryAt()).isNotNull();
+        // Backoff for attempt 1 = 30 s; nextRetryAt must be in the near future
+        
assertThat(event.getNextRetryAt()).isAfterOrEqualTo(before.plusSeconds(29));
+        
assertThat(event.getNextRetryAt()).isBeforeOrEqualTo(after.plusSeconds(31));
+
+        verify(eventRepo).save(event);
+        verify(packetRepo, never()).save(any());
+
+        ArgumentCaptor<ProcessingErrorEntity> errorCaptor = 
ArgumentCaptor.forClass(ProcessingErrorEntity.class);
+        verify(errorRepo).save(errorCaptor.capture());
+        ProcessingErrorEntity error = errorCaptor.getValue();
+        assertThat(error.getSummary()).contains("IllegalArgumentException");
+        assertThat(error.getSummary()).contains("Project not found");
+        assertThat(error.getPacket()).isSameAs(packet);
+        assertThat(error.getEvent()).isSameAs(event);
+    }
+
+    @Test
+    void 
recordFailureInNewTransaction_onSecondFailure_setsRetryScheduledWithDoubledBackoff()
 {
+        // Simulate: the first attempt rolled back (REQUIRES_NEW), so the 
event is
+        // re-read with RETRY_SCHEDULED status and attempts=1 (the previous 
committed
+        // retry count). The recovery method unconditionally increments 
attempts to 2
+        // and applies a 60-second backoff (BASE * 2^1).
+        ProcessingEventEntity event = buildEvent("evt-2", 
ProcessingStatus.RETRY_SCHEDULED, 1);
+        when(eventRepo.findById("evt-2")).thenReturn(Optional.of(event));
+
+        Instant before = Instant.now();
+        worker.recordFailureInNewTransaction("evt-2", new 
RuntimeException("second failure"));
+        Instant after = Instant.now();
+
+        
assertThat(event.getStatus()).isEqualTo(ProcessingStatus.RETRY_SCHEDULED);
+        // Attempts unconditionally incremented from 1 to 2
+        assertThat(event.getAttempts()).isEqualTo(2);
+        // Backoff for attempt 2 = 60 s
+        
assertThat(event.getNextRetryAt()).isAfterOrEqualTo(before.plusSeconds(59));
+        
assertThat(event.getNextRetryAt()).isBeforeOrEqualTo(after.plusSeconds(61));
+    }
+
+    // ------------------------------------------------------------------
+    // recordFailureInNewTransaction — permanent failure
+    // ------------------------------------------------------------------
+
+    @Test
+    void 
recordFailureInNewTransaction_afterMaxAttempts_marksPermanentlyFailed() {
+        // Simulate event on its last allowed retry (attempts already at MAX-1 
in RETRY_SCHEDULED)
+        // When we increment, it reaches MAX_ATTEMPTS = 3
+        ProcessingEventEntity event = buildEvent("evt-perm", 
ProcessingStatus.RETRY_SCHEDULED, 2);
+        PacketEntity packet = event.getPacket();
+        when(eventRepo.findById("evt-perm")).thenReturn(Optional.of(event));
+
+        RuntimeException cause = new RuntimeException("final failure");
+        worker.recordFailureInNewTransaction("evt-perm", cause);
+
+        
assertThat(event.getStatus()).isEqualTo(ProcessingStatus.PERMANENTLY_FAILED);
+        assertThat(event.getAttempts()).isEqualTo(3);
+        assertThat(event.getNextRetryAt()).isNull();
+
+        // Packet must also be marked FAILED
+        verify(packetRepo).save(packet);
+        assertThat(packet.getStatus()).isEqualTo(PacketStatus.FAILED);
+        assertThat(packet.getLastError()).isEqualTo("final failure");
+
+        verify(eventRepo).save(event);
+        verify(errorRepo).save(any(ProcessingErrorEntity.class));
+    }
+
+    @Test
+    void recordFailureInNewTransaction_whenEventNotFound_doesNotThrow() {
+        when(eventRepo.findById("missing")).thenReturn(Optional.empty());
+
+        // Should log and return without throwing or saving anything
+        worker.recordFailureInNewTransaction("missing", new 
RuntimeException("cause"));
+
+        verify(eventRepo, never()).save(any());
+        verify(errorRepo, never()).save(any());
+    }
+
+    // ------------------------------------------------------------------
+    // computeNextRetryAt — exponential backoff
+    // ------------------------------------------------------------------
+
+    @Test
+    void computeNextRetryAt_attempt1_returns30SecondDelay() {
+        Instant before = Instant.now();
+        Instant result = ProcessingEventWorker.computeNextRetryAt(1);
+        Instant after = Instant.now();
+
+        assertThat(result).isAfterOrEqualTo(before.plusSeconds(29));
+        assertThat(result).isBeforeOrEqualTo(after.plusSeconds(31));
+    }
+
+    @Test
+    void computeNextRetryAt_attempt2_returns60SecondDelay() {
+        Instant before = Instant.now();
+        Instant result = ProcessingEventWorker.computeNextRetryAt(2);
+        Instant after = Instant.now();
+
+        assertThat(result).isAfterOrEqualTo(before.plusSeconds(59));
+        assertThat(result).isBeforeOrEqualTo(after.plusSeconds(61));
+    }
+
+    @Test
+    void computeNextRetryAt_highAttemptNumber_capsAtMaxBackoff() {
+        // At attempt 100, delay would be astronomically large without capping.
+        // MAX_BACKOFF_SECONDS = 600
+        Instant before = Instant.now();
+        Instant result = ProcessingEventWorker.computeNextRetryAt(100);
+        Instant after = Instant.now();
+
+        // Must not exceed MAX_BACKOFF_SECONDS + small tolerance
+        assertThat(result).isBeforeOrEqualTo(after.plusSeconds(601));
+        // Must still be at least MAX_BACKOFF_SECONDS in the future
+        assertThat(result).isAfterOrEqualTo(before.plusSeconds(599));
+    }
+
+    private ProcessingEventEntity buildEvent(String id, ProcessingStatus 
status, int attempts) {
+        PacketEntity packet = new PacketEntity();
+        packet.setAmieId(42L);
+        packet.setType("request_account_create");
+        packet.setStatus(PacketStatus.NEW);
+        // Provide minimal valid JSON so objectMapper.readTree won't NPE if 
called directly
+        packet.setRawJson("{\"body\":{}}");
+
+        ProcessingEventEntity event = new ProcessingEventEntity();
+        // Directly set the id field via reflection to avoid UUID randomness 
issues in tests
+        try {
+            java.lang.reflect.Field idField = 
ProcessingEventEntity.class.getDeclaredField("id");
+            idField.setAccessible(true);
+            idField.set(event, id);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        event.setPacket(packet);
+        event.setType(ProcessingEventType.DECODE_PACKET);
+        event.setStatus(status);
+        event.setAttempts(attempts);
+        event.setPayload(new byte[0]);
+        return event;
+    }
+}


Reply via email to