This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch lp-functions-proto
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b81edb0592447502907b196dc9303161439afc03
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Mar 29 16:57:20 2026 -0700

    [improve][fn] Fix LightProto equality issues in worker and test classes
    
    LightProto generated classes do not implement equals()/hashCode(), using
    JVM identity-based defaults. This broke several worker classes and tests
    that relied on structural equality of proto objects.
    
    Production fixes:
    - FunctionRuntimeManager: replace Assignment/Instance .equals() with
      Arrays.equals(a.toByteArray(), b.toByteArray()) comparisons
    - MembershipManager: replace Map<Instance,Long> with Map<String,Long>
      keyed by fully-qualified instance ID string to avoid broken HashMap
      behavior with LightProto Instance keys
    - SchedulerManager: replace Instance .equals() with byte-array comparison
      when checking whether an existing assignment's instance has changed
    
    Test fixes:
    - FunctionRuntimeManagerTest, MembershipManagerTest, SchedulerManagerTest,
      FunctionMetaDataManagerTest, FunctionApiV2ResourceTest: replace all
      assertEquals/eq()/argThat()/Set.contains() patterns that compare
      LightProto objects structurally with Arrays.equals(toByteArray())
      equivalents
---
 .../functions/worker/FunctionRuntimeManager.java   |  5 +-
 .../pulsar/functions/worker/MembershipManager.java | 48 +++++------
 .../pulsar/functions/worker/SchedulerManager.java  |  3 +-
 .../worker/FunctionMetaDataManagerTest.java        |  2 -
 .../worker/FunctionRuntimeManagerTest.java         | 55 ++++++++-----
 .../functions/worker/MembershipManagerTest.java    | 15 ++--
 .../functions/worker/SchedulerManagerTest.java     | 96 ++++++++++++----------
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  4 +-
 8 files changed, 126 insertions(+), 102 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index af71c478ed4..d44068b68db 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -721,7 +722,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable {
         Assignment existingAssignment = this.findAssignment(assignment);
         // potential updates need to happen
 
-        if (!existingAssignment.equals(assignment)) {
+        if (!Arrays.equals(existingAssignment.toByteArray(), 
assignment.toByteArray())) {
             FunctionRuntimeInfo functionRuntimeInfo = 
get_FunctionRuntimeInfo(fullyQualifiedInstanceId);
 
             // for externally managed functions we don't really care about 
which worker
@@ -734,7 +735,7 @@ public class FunctionRuntimeManager implements 
AutoCloseable {
 
             if (runtimeFactory.externallyManaged()) {
                 // change in metadata thus need to potentially restart
-                if 
(!assignment.getInstance().equals(existingAssignment.getInstance())) {
+                if (!Arrays.equals(assignment.getInstance().toByteArray(), 
existingAssignment.getInstance().toByteArray())) {
                     //stop function
                     if (functionRuntimeInfo != null) {
                         this.conditionallyStopFunction(functionRuntimeInfo);
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index dea2be99860..efe4fd45115 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -22,7 +22,6 @@ import static 
org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeat
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -55,9 +54,10 @@ public class MembershipManager implements AutoCloseable {
     private static final String WORKER_IDENTIFIER = "id";
 
     // How long functions have remained assigned or scheduled on a failed node
-    // FullyQualifiedFunctionName -> time in millis
+    // FullyQualifiedInstanceId (String) -> time in millis
+    // Uses String key instead of Instance to avoid relying on LightProto's 
identity-based hashCode/equals
     @VisibleForTesting
-    Map<Instance, Long> unsignedFunctionDurations = new HashMap<>();
+    Map<String, Long> unsignedFunctionDurations = new HashMap<>();
 
     MembershipManager(WorkerService workerService, PulsarClient pulsarClient, 
PulsarAdmin pulsarAdmin) {
         this.workerConfig = workerService.getWorkerConfig();
@@ -134,12 +134,13 @@ public class MembershipManager implements AutoCloseable {
         long currentTimeMs = System.currentTimeMillis();
 
         // remove functions
-        Iterator<Map.Entry<Instance, Long>> it = 
unsignedFunctionDurations.entrySet().iterator();
+        Iterator<Map.Entry<String, Long>> it = 
unsignedFunctionDurations.entrySet().iterator();
         while (it.hasNext()) {
-            Map.Entry<Instance, Long> entry = it.next();
-            String fullyQualifiedFunctionName = 
FunctionCommon.getFullyQualifiedName(
-                    entry.getKey().getFunctionMetaData().getFunctionDetails());
-            String fullyQualifiedInstanceId = 
FunctionCommon.getFullyQualifiedInstanceId(entry.getKey());
+            Map.Entry<String, Long> entry = it.next();
+            String fullyQualifiedInstanceId = entry.getKey();
+            // derive function name from instance id (strip the ":instanceId" 
suffix)
+            String fullyQualifiedFunctionName =
+                    fullyQualifiedInstanceId.substring(0, 
fullyQualifiedInstanceId.lastIndexOf(':'));
             //remove functions that don't exist anymore
             if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) {
                 it.remove();
@@ -164,17 +165,18 @@ public class MembershipManager implements AutoCloseable {
                     functionMetaData.getFunctionDetails().getName(),
                     currentAssignments);
 
-            Set<Instance> assignedInstances = assignments.stream()
-                    .map(assignment -> assignment.getInstance())
+            Set<String> assignedInstanceIds = assignments.stream()
+                    .map(assignment -> 
FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()))
                     .collect(Collectors.toSet());
 
-            Set<Instance> instances = new 
HashSet<>(SchedulerManager.computeInstances(functionMetaData,
-                    
functionRuntimeManager.getRuntimeFactory().externallyManaged()));
+            List<Instance> instances = 
SchedulerManager.computeInstances(functionMetaData,
+                    
functionRuntimeManager.getRuntimeFactory().externallyManaged());
 
             for (Instance instance : instances) {
-                if (!assignedInstances.contains(instance)) {
-                    if (!this.unsignedFunctionDurations.containsKey(instance)) 
{
-                        this.unsignedFunctionDurations.put(instance, 
currentTimeMs);
+                String instanceId = 
FunctionCommon.getFullyQualifiedInstanceId(instance);
+                if (!assignedInstanceIds.contains(instanceId)) {
+                    if 
(!this.unsignedFunctionDurations.containsKey(instanceId)) {
+                        this.unsignedFunctionDurations.put(instanceId, 
currentTimeMs);
                     }
                 }
             }
@@ -191,8 +193,9 @@ public class MembershipManager implements AutoCloseable {
                     if (checkHeartBeatFunction(instance) != null) {
                         continue;
                     }
-                    if (!this.unsignedFunctionDurations.containsKey(instance)) 
{
-                        this.unsignedFunctionDurations.put(instance, 
currentTimeMs);
+                    String instanceId = 
FunctionCommon.getFullyQualifiedInstanceId(instance);
+                    if 
(!this.unsignedFunctionDurations.containsKey(instanceId)) {
+                        this.unsignedFunctionDurations.put(instanceId, 
currentTimeMs);
                     }
                 }
             }
@@ -200,17 +203,16 @@ public class MembershipManager implements AutoCloseable {
 
         boolean triggerScheduler = false;
         // check unassigned
-        Collection<Instance> needSchedule = new LinkedList<>();
+        Collection<String> needSchedule = new LinkedList<>();
         Collection<Assignment> needRemove = new LinkedList<>();
         Map<String, Integer> numRemoved = new HashMap<>();
-        for (Map.Entry<Instance, Long> entry : 
this.unsignedFunctionDurations.entrySet()) {
-            Instance instance = entry.getKey();
+        for (Map.Entry<String, Long> entry : 
this.unsignedFunctionDurations.entrySet()) {
+            String instanceId = entry.getKey();
             long unassignedDurationMs = entry.getValue();
             if (currentTimeMs - unassignedDurationMs > 
this.workerConfig.getRescheduleTimeoutMs()) {
-                needSchedule.add(instance);
+                needSchedule.add(instanceId);
                 // remove assignment from failed node
-                Assignment assignment =
-                        
assignmentMap.get(FunctionCommon.getFullyQualifiedInstanceId(instance));
+                Assignment assignment = assignmentMap.get(instanceId);
                 if (assignment != null) {
                     needRemove.add(assignment);
 
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 1d89dfb195e..d5658db296f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -398,7 +399,7 @@ public class SchedulerManager implements AutoCloseable {
                 Assignment assignment = entry.getValue();
                 Instance instance = allInstances.get(fullyQualifiedInstanceId);
 
-                if (!assignment.getInstance().equals(instance)) {
+                if (!Arrays.equals(assignment.getInstance().toByteArray(), 
instance.toByteArray())) {
                     Assignment updatedAssignment = new Assignment();
                     updatedAssignment.copyFrom(assignment);
                     updatedAssignment.setInstance().copyFrom(instance);
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index 3c353d95db8..96803d32d24 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -308,7 +308,6 @@ public class FunctionMetaDataManagerTest {
 
         verify(functionMetaDataManager, times(1)).processUpdate
                 (any(FunctionMetaData.class));
-        
verify(functionMetaDataManager).processUpdate(serviceRequest.getFunctionMetaData());
 
         serviceRequest = new ServiceRequest()
                 
.setServiceRequestType(ServiceRequest.ServiceRequestType.INITIALIZE);
@@ -322,7 +321,6 @@ public class FunctionMetaDataManagerTest {
 
         verify(functionMetaDataManager, times(1)).processDeregister(
                 any(FunctionMetaData.class));
-        
verify(functionMetaDataManager).processDeregister(serviceRequest.getFunctionMetaData());
     }
 
     @Test
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 8cb6660fbe9..b84755cbec7 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -39,6 +39,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import io.netty.buffer.Unpooled;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -178,13 +179,16 @@ public class FunctionRuntimeManagerTest {
                     .get("test-tenant/test-namespace/func-2:0"), assignment2);
             verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
             verify(functionActioner).startFunction(argThat(
-                    functionRuntimeInfo -> 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                            .equals(function1)));
+                    functionRuntimeInfo -> Arrays.equals(
+                            
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(),
+                            function1.toByteArray())));
             verify(functionActioner, 
times(0)).stopFunction(any(FunctionRuntimeInfo.class));
 
             assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 
1);
-            
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
-                    new 
FunctionRuntimeInfo().setFunctionInstance(createInstance(function1, 0)));
+            assertEquals(
+                    
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
+                            .getFunctionInstance().toByteArray(),
+                    createInstance(function1, 0).toByteArray());
         }
     }
 
@@ -265,8 +269,9 @@ public class FunctionRuntimeManagerTest {
             verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
             verify(functionActioner, 
times(1)).terminateFunction(any(FunctionRuntimeInfo.class));
             verify(functionActioner).terminateFunction(argThat(
-                    functionRuntimeInfo -> 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                            .equals(function1)));
+                    functionRuntimeInfo -> Arrays.equals(
+                            
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(),
+                            function1.toByteArray())));
 
             assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 
0);
         }
@@ -349,13 +354,15 @@ public class FunctionRuntimeManagerTest {
             verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
             verify(functionActioner).stopFunction(argThat(
-                    functionRuntimeInfo -> 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                            .equals(function2)));
+                    functionRuntimeInfo -> Arrays.equals(
+                            
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(),
+                            function2.toByteArray())));
 
             verify(functionActioner, 
times(1)).startFunction(any(FunctionRuntimeInfo.class));
             verify(functionActioner).startFunction(argThat(
-                    functionRuntimeInfo -> 
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                            .equals(function2)));
+                    functionRuntimeInfo -> Arrays.equals(
+                            
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(),
+                            function2.toByteArray())));
 
             assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 
2);
             assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 
1);
@@ -379,7 +386,8 @@ public class FunctionRuntimeManagerTest {
             verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
             verify(functionActioner).stopFunction(argThat(functionRuntimeInfo 
->
-                    
functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2)));
+                    
Arrays.equals(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(),
+                            function2.toByteArray())));
 
             verify(functionActioner, 
times(0)).startFunction(any(FunctionRuntimeInfo.class));
 
@@ -486,8 +494,10 @@ public class FunctionRuntimeManagerTest {
                     .get("worker-2"));
 
             assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 
1);
-            
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
-                    functionRuntimeInfo);
+            assertEquals(
+                    
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
+                            .getFunctionInstance().toByteArray(),
+                    functionRuntimeInfo.getFunctionInstance().toByteArray());
         }
     }
 
@@ -609,12 +619,15 @@ public class FunctionRuntimeManagerTest {
             verify(functionActioner, 
times(0)).terminateFunction(any(FunctionRuntimeInfo.class));
 
             verify(functionActioner).startFunction(
-                    argThat(functionRuntimeInfo -> 
functionRuntimeInfo.getFunctionInstance()
-                            .equals(assignment1.getInstance())));
+                    argThat(functionRuntimeInfo -> Arrays.equals(
+                            
functionRuntimeInfo.getFunctionInstance().toByteArray(),
+                            assignment1.getInstance().toByteArray())));
 
             assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 
1);
-            
assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"),
-                    new 
FunctionRuntimeInfo().setFunctionInstance(createInstance(function1, 0)));
+            assertEquals(
+                    
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
+                            .getFunctionInstance().toByteArray(),
+                    createInstance(function1, 0).toByteArray());
 
             // verify no errors occurred
             verify(errorNotifier, times(0)).triggerError(any());
@@ -741,16 +754,16 @@ public class FunctionRuntimeManagerTest {
 
             assertEquals(
                     
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
-                            .getFunctionInstance(),
-                    functionRuntimeInfo.getFunctionInstance());
+                            .getFunctionInstance().toByteArray(),
+                    functionRuntimeInfo.getFunctionInstance().toByteArray());
             assertNotNull(
                     
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                             .getRuntimeSpawner());
 
             assertEquals(
                     
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
-                            
.getRuntimeSpawner().getInstanceConfig().getFunctionDetails(),
-                    function1.getFunctionDetails());
+                            
.getRuntimeSpawner().getInstanceConfig().getFunctionDetails().toByteArray(),
+                    function1.getFunctionDetails().toByteArray());
             assertEquals(
                     
functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0")
                             
.getRuntimeSpawner().getInstanceConfig().getInstanceId(),
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 03c5e38d5d9..b572cd8196b 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.functions.proto.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Instance;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -237,10 +238,11 @@ public class MembershipManagerTest {
         verify(functionRuntimeManager, times(0)).removeAssignments(any());
         assertEquals(membershipManager.unsignedFunctionDurations.size(), 1);
         Instance instance = createInstance(function2, 0);
-        
assertNotNull(membershipManager.unsignedFunctionDurations.get(instance));
+        String instanceId = 
FunctionCommon.getFullyQualifiedInstanceId(instance);
+        
assertNotNull(membershipManager.unsignedFunctionDurations.get(instanceId));
 
-        membershipManager.unsignedFunctionDurations.put(instance,
-                membershipManager.unsignedFunctionDurations.get(instance) - 
30001);
+        membershipManager.unsignedFunctionDurations.put(instanceId,
+                membershipManager.unsignedFunctionDurations.get(instanceId) - 
30001);
 
         membershipManager.checkFailures(functionMetaDataManager, 
functionRuntimeManager, schedulerManager);
 
@@ -315,10 +317,11 @@ public class MembershipManagerTest {
         verify(functionRuntimeManager, times(0)).removeAssignments(any());
         assertEquals(membershipManager.unsignedFunctionDurations.size(), 1);
         Instance instance = createInstance(function2, 0);
-        
assertNotNull(membershipManager.unsignedFunctionDurations.get(instance));
+        String instanceId = 
FunctionCommon.getFullyQualifiedInstanceId(instance);
+        
assertNotNull(membershipManager.unsignedFunctionDurations.get(instanceId));
 
-        membershipManager.unsignedFunctionDurations.put(instance,
-                membershipManager.unsignedFunctionDurations.get(instance) - 
30001);
+        membershipManager.unsignedFunctionDurations.put(instanceId,
+                membershipManager.unsignedFunctionDurations.get(instanceId) - 
30001);
 
         membershipManager.checkFailures(functionMetaDataManager, 
functionRuntimeManager, schedulerManager);
 
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index d83a09c90f8..d234d921cbe 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
@@ -33,6 +34,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -290,10 +293,11 @@ public class SchedulerManagerTest {
 
         log.info("assignments: {}", assignments);
         Assignment assignment2 = createAssignment("worker-1", function2, 0);
-        Assert.assertEquals(assignment2, assignments);
+        Assert.assertEquals(assignment2.toByteArray(), 
assignments.toByteArray());
 
         // make sure we also directly added the assignment to in memory 
assignment cache in function runtime manager
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment2));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment2.toByteArray())));
     }
 
     @Test
@@ -395,7 +399,7 @@ public class SchedulerManagerTest {
         log.info("assignments: {}", assignments);
 
         Assignment assignment2 = createAssignment("worker-1", function2, 0);
-        Assert.assertEquals(assignments, assignment2);
+        Assert.assertEquals(assignments.toByteArray(), 
assignment2.toByteArray());
 
         // updating assignments
         currentAssignments.get("worker-1")
@@ -420,14 +424,14 @@ public class SchedulerManagerTest {
         invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
 
-        Set<Assignment> allAssignments = new HashSet<>();
+        List<byte[]> allAssignmentBytesScaled = new ArrayList<>();
         invocations.forEach(invocation -> {
-            allAssignments.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]));
+            allAssignmentBytesScaled.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]).toByteArray());
         });
 
-        assertTrue(allAssignments.contains(assignment2Scaled1));
-        assertTrue(allAssignments.contains(assignment2Scaled2));
-        assertTrue(allAssignments.contains(assignment2Scaled3));
+        assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> 
Arrays.equals(b, assignment2Scaled1.toByteArray())));
+        assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> 
Arrays.equals(b, assignment2Scaled2.toByteArray())));
+        assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> 
Arrays.equals(b, assignment2Scaled3.toByteArray())));
     }
 
     @Test
@@ -468,32 +472,27 @@ public class SchedulerManagerTest {
         invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
 
-        for (int i = 0; i < invocations.size(); i++) {
-            Invocation invocation = invocations.get(i);
-            byte[] send = (byte[]) invocation.getRawArguments()[0];
-            Assignment assignment = parseAssignment(send);
-            Assignment expectedAssignment = createAssignment("worker-1", 
function2, i);
-            Assert.assertEquals(assignment, expectedAssignment);
-        }
-
-        Set<Assignment> allAssignments = new HashSet<>();
+        List<byte[]> allAssignmentBytes = new ArrayList<>();
         invocations.forEach(invocation -> {
-            allAssignments.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]));
+            allAssignmentBytes.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]).toByteArray());
         });
 
         Assignment assignment21 = createAssignment("worker-1", function2, 0);
         Assignment assignment22 = createAssignment("worker-1", function2, 1);
         Assignment assignment23 = createAssignment("worker-1", function2, 2);
 
-        assertTrue(allAssignments.contains(assignment21));
-        assertTrue(allAssignments.contains(assignment22));
-        assertTrue(allAssignments.contains(assignment23));
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment21.toByteArray())));
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment22.toByteArray())));
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment23.toByteArray())));
 
         // make sure we also directly add the assignment to the in memory 
assignment cache in function runtime manager
         verify(functionRuntimeManager, times(3)).processAssignment(any());
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment21));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment22));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment23));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment21.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment22.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment23.toByteArray())));
 
         // updating assignments
         currentAssignments.get("worker-1")
@@ -520,12 +519,12 @@ public class SchedulerManagerTest {
         invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
 
-        Set<Assignment> allAssignments2 = new HashSet<>();
+        List<byte[]> allAssignmentBytes2 = new ArrayList<>();
         invocations.forEach(invocation -> {
-            allAssignments2.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]));
+            allAssignmentBytes2.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]).toByteArray());
         });
 
-        assertTrue(allAssignments2.contains(assignment2Scaled));
+        assertTrue(allAssignmentBytes2.stream().anyMatch(b -> Arrays.equals(b, 
assignment2Scaled.toByteArray())));
 
         // make sure we also directly removed the assignment from the in 
memory assignment cache in
         // function runtime manager
@@ -536,7 +535,8 @@ public class SchedulerManagerTest {
                 
.deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment22.getInstance())));
 
         verify(functionRuntimeManager, times(4)).processAssignment(any());
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment2Scaled));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment2Scaled.toByteArray())));
     }
 
     @Test
@@ -642,21 +642,24 @@ public class SchedulerManagerTest {
         invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
 
-        Set<Assignment> allAssignments = new HashSet<>();
+        List<byte[]> allAssignmentBytes = new ArrayList<>();
         invocations.forEach(invocation -> {
-            allAssignments.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]));
+            allAssignmentBytes.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]).toByteArray());
         });
 
-        assertEquals(allAssignments.size(), 3);
-        assertTrue(allAssignments.contains(assignment21));
-        assertTrue(allAssignments.contains(assignment22));
-        assertTrue(allAssignments.contains(assignment23));
+        assertEquals(allAssignmentBytes.size(), 3);
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment21.toByteArray())));
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment22.toByteArray())));
+        assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, 
assignment23.toByteArray())));
 
         // make sure we also directly add the assignment to the in memory 
assignment cache in function runtime manager
         verify(functionRuntimeManager, times(3)).processAssignment(any());
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment21));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment22));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment23));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment21.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment22.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment23.toByteArray())));
 
         // updating assignments
         currentAssignments.get("worker-1")
@@ -689,21 +692,24 @@ public class SchedulerManagerTest {
         invocations = getMethodInvocationDetails(message, 
TypedMessageBuilder.class.getMethod("value",
                 Object.class));
 
-        Set<Assignment> allAssignments2 = new HashSet<>();
+        List<byte[]> allAssignmentBytes3 = new ArrayList<>();
         invocations.forEach(invocation -> {
-            allAssignments2.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]));
+            allAssignmentBytes3.add(parseAssignment((byte[]) 
invocation.getRawArguments()[0]).toByteArray());
         });
 
-        assertTrue(allAssignments2.contains(assignment2Updated1));
-        assertTrue(allAssignments2.contains(assignment2Updated2));
-        assertTrue(allAssignments2.contains(assignment2Updated3));
+        assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, 
assignment2Updated1.toByteArray())));
+        assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, 
assignment2Updated2.toByteArray())));
+        assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, 
assignment2Updated3.toByteArray())));
 
         // make sure we also directly updated the assignment to the in memory 
assignment cache in
         // function runtime manager
         verify(functionRuntimeManager, times(6)).processAssignment(any());
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment2Updated1));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment2Updated2));
-        verify(functionRuntimeManager, 
times(1)).processAssignment(eq(assignment2Updated3));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment2Updated1.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment2Updated2.toByteArray())));
+        verify(functionRuntimeManager, times(1)).processAssignment(
+                argThat(a -> Arrays.equals(a.toByteArray(), 
assignment2Updated3.toByteArray())));
     }
 
     @Test
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 19e7ddd20b7..b1e05317c86 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -198,7 +198,7 @@ public class FunctionApiV2ResourceTest extends 
AbstractFunctionApiResourceTest {
 
         FunctionDetails actual = getDefaultFunctionInfo();
         assertEquals(
-                functionDetails,
-                actual);
+                functionDetails.toByteArray(),
+                actual.toByteArray());
     }
 }


Reply via email to