This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch lp-functions-proto in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b81edb0592447502907b196dc9303161439afc03 Author: Matteo Merli <[email protected]> AuthorDate: Sun Mar 29 16:57:20 2026 -0700 [improve][fn] Fix LightProto equality issues in worker and test classes LightProto generated classes do not implement equals()/hashCode(), using JVM identity-based defaults. This broke several worker classes and tests that relied on structural equality of proto objects. Production fixes: - FunctionRuntimeManager: replace Assignment/Instance .equals() with Arrays.equals(a.toByteArray(), b.toByteArray()) comparisons - MembershipManager: replace Map<Instance,Long> with Map<String,Long> keyed by fully-qualified instance ID string to avoid broken HashMap behavior with LightProto Instance keys - SchedulerManager: replace Instance .equals() with byte-array comparison when checking whether an existing assignment's instance has changed Test fixes: - FunctionRuntimeManagerTest, MembershipManagerTest, SchedulerManagerTest, FunctionMetaDataManagerTest, FunctionApiV2ResourceTest: replace all assertEquals/eq()/argThat()/Set.contains() patterns that compare LightProto objects structurally with Arrays.equals(toByteArray()) equivalents --- .../functions/worker/FunctionRuntimeManager.java | 5 +- .../pulsar/functions/worker/MembershipManager.java | 48 +++++------ .../pulsar/functions/worker/SchedulerManager.java | 3 +- .../worker/FunctionMetaDataManagerTest.java | 2 - .../worker/FunctionRuntimeManagerTest.java | 55 ++++++++----- .../functions/worker/MembershipManagerTest.java | 15 ++-- .../functions/worker/SchedulerManagerTest.java | 96 ++++++++++++---------- .../rest/api/v2/FunctionApiV2ResourceTest.java | 4 +- 8 files changed, 126 insertions(+), 102 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index af71c478ed4..d44068b68db 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -20,6 +20,7 @@ package org.apache.pulsar.functions.worker; import com.google.common.annotations.VisibleForTesting; import java.net.URI; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -721,7 +722,7 @@ public class FunctionRuntimeManager implements AutoCloseable { Assignment existingAssignment = this.findAssignment(assignment); // potential updates need to happen - if (!existingAssignment.equals(assignment)) { + if (!Arrays.equals(existingAssignment.toByteArray(), assignment.toByteArray())) { FunctionRuntimeInfo functionRuntimeInfo = get_FunctionRuntimeInfo(fullyQualifiedInstanceId); // for externally managed functions we don't really care about which worker @@ -734,7 +735,7 @@ public class FunctionRuntimeManager implements AutoCloseable { if (runtimeFactory.externallyManaged()) { // change in metadata thus need to potentially restart - if (!assignment.getInstance().equals(existingAssignment.getInstance())) { + if (!Arrays.equals(assignment.getInstance().toByteArray(), existingAssignment.getInstance().toByteArray())) { //stop function if (functionRuntimeInfo != null) { this.conditionallyStopFunction(functionRuntimeInfo); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index dea2be99860..efe4fd45115 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -22,7 +22,6 @@ import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeat import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -55,9 +54,10 @@ public class MembershipManager implements AutoCloseable { private static final String WORKER_IDENTIFIER = "id"; // How long functions have remained assigned or scheduled on a failed node - // FullyQualifiedFunctionName -> time in millis + // FullyQualifiedInstanceId (String) -> time in millis + // Uses String key instead of Instance to avoid relying on LightProto's identity-based hashCode/equals @VisibleForTesting - Map<Instance, Long> unsignedFunctionDurations = new HashMap<>(); + Map<String, Long> unsignedFunctionDurations = new HashMap<>(); MembershipManager(WorkerService workerService, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin) { this.workerConfig = workerService.getWorkerConfig(); @@ -134,12 +134,13 @@ public class MembershipManager implements AutoCloseable { long currentTimeMs = System.currentTimeMillis(); // remove functions - Iterator<Map.Entry<Instance, Long>> it = unsignedFunctionDurations.entrySet().iterator(); + Iterator<Map.Entry<String, Long>> it = unsignedFunctionDurations.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<Instance, Long> entry = it.next(); - String fullyQualifiedFunctionName = FunctionCommon.getFullyQualifiedName( - entry.getKey().getFunctionMetaData().getFunctionDetails()); - String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(entry.getKey()); + Map.Entry<String, Long> entry = it.next(); + String fullyQualifiedInstanceId = entry.getKey(); + // derive function name from instance id (strip the ":instanceId" suffix) + String fullyQualifiedFunctionName = + fullyQualifiedInstanceId.substring(0, fullyQualifiedInstanceId.lastIndexOf(':')); //remove functions that don't exist anymore if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) { it.remove(); @@ -164,17 +165,18 @@ public class MembershipManager implements AutoCloseable { functionMetaData.getFunctionDetails().getName(), currentAssignments); - Set<Instance> assignedInstances = assignments.stream() - .map(assignment -> assignment.getInstance()) + Set<String> assignedInstanceIds = assignments.stream() + .map(assignment -> FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())) .collect(Collectors.toSet()); - Set<Instance> instances = new HashSet<>(SchedulerManager.computeInstances(functionMetaData, - functionRuntimeManager.getRuntimeFactory().externallyManaged())); + List<Instance> instances = SchedulerManager.computeInstances(functionMetaData, + functionRuntimeManager.getRuntimeFactory().externallyManaged()); for (Instance instance : instances) { - if (!assignedInstances.contains(instance)) { - if (!this.unsignedFunctionDurations.containsKey(instance)) { - this.unsignedFunctionDurations.put(instance, currentTimeMs); + String instanceId = FunctionCommon.getFullyQualifiedInstanceId(instance); + if (!assignedInstanceIds.contains(instanceId)) { + if (!this.unsignedFunctionDurations.containsKey(instanceId)) { + this.unsignedFunctionDurations.put(instanceId, currentTimeMs); } } } @@ -191,8 +193,9 @@ public class MembershipManager implements AutoCloseable { if (checkHeartBeatFunction(instance) != null) { continue; } - if (!this.unsignedFunctionDurations.containsKey(instance)) { - this.unsignedFunctionDurations.put(instance, currentTimeMs); + String instanceId = FunctionCommon.getFullyQualifiedInstanceId(instance); + if (!this.unsignedFunctionDurations.containsKey(instanceId)) { + this.unsignedFunctionDurations.put(instanceId, currentTimeMs); } } } @@ -200,17 +203,16 @@ public class MembershipManager implements AutoCloseable { boolean triggerScheduler = false; // check unassigned - Collection<Instance> needSchedule = new LinkedList<>(); + Collection<String> needSchedule = new LinkedList<>(); Collection<Assignment> needRemove = new LinkedList<>(); Map<String, Integer> numRemoved = new HashMap<>(); - for (Map.Entry<Instance, Long> entry : this.unsignedFunctionDurations.entrySet()) { - Instance instance = entry.getKey(); + for (Map.Entry<String, Long> entry : this.unsignedFunctionDurations.entrySet()) { + String instanceId = entry.getKey(); long unassignedDurationMs = entry.getValue(); if (currentTimeMs - unassignedDurationMs > this.workerConfig.getRescheduleTimeoutMs()) { - needSchedule.add(instance); + needSchedule.add(instanceId); // remove assignment from failed node - Assignment assignment = - assignmentMap.get(FunctionCommon.getFullyQualifiedInstanceId(instance)); + Assignment assignment = assignmentMap.get(instanceId); if (assignment != null) { needRemove.add(assignment); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 1d89dfb195e..d5658db296f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -398,7 +399,7 @@ public class SchedulerManager implements AutoCloseable { Assignment assignment = entry.getValue(); Instance instance = allInstances.get(fullyQualifiedInstanceId); - if (!assignment.getInstance().equals(instance)) { + if (!Arrays.equals(assignment.getInstance().toByteArray(), instance.toByteArray())) { Assignment updatedAssignment = new Assignment(); updatedAssignment.copyFrom(assignment); updatedAssignment.setInstance().copyFrom(instance); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index 3c353d95db8..96803d32d24 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -308,7 +308,6 @@ public class FunctionMetaDataManagerTest { verify(functionMetaDataManager, times(1)).processUpdate (any(FunctionMetaData.class)); - verify(functionMetaDataManager).processUpdate(serviceRequest.getFunctionMetaData()); serviceRequest = new ServiceRequest() .setServiceRequestType(ServiceRequest.ServiceRequestType.INITIALIZE); @@ -322,7 +321,6 @@ public class FunctionMetaDataManagerTest { verify(functionMetaDataManager, times(1)).processDeregister( any(FunctionMetaData.class)); - verify(functionMetaDataManager).processDeregister(serviceRequest.getFunctionMetaData()); } @Test diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 8cb6660fbe9..b84755cbec7 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -39,6 +39,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.Unpooled; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -178,13 +179,16 @@ public class FunctionRuntimeManagerTest { .get("test-tenant/test-namespace/func-2:0"), assignment2); verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).startFunction(argThat( - functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .equals(function1))); + functionRuntimeInfo -> Arrays.equals( + functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(), + function1.toByteArray()))); verify(functionActioner, times(0)).stopFunction(any(FunctionRuntimeInfo.class)); assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), - new FunctionRuntimeInfo().setFunctionInstance(createInstance(function1, 0))); + assertEquals( + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") + .getFunctionInstance().toByteArray(), + createInstance(function1, 0).toByteArray()); } } @@ -265,8 +269,9 @@ public class FunctionRuntimeManagerTest { verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner, times(1)).terminateFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).terminateFunction(argThat( - functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .equals(function1))); + functionRuntimeInfo -> Arrays.equals( + functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(), + function1.toByteArray()))); assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 0); } @@ -349,13 +354,15 @@ public class FunctionRuntimeManagerTest { verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).stopFunction(argThat( - functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .equals(function2))); + functionRuntimeInfo -> Arrays.equals( + functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(), + function2.toByteArray()))); verify(functionActioner, times(1)).startFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).startFunction(argThat( - functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance().getFunctionMetaData() - .equals(function2))); + functionRuntimeInfo -> Arrays.equals( + functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(), + function2.toByteArray()))); assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 2); assertEquals(functionRuntimeManager.workerIdToAssignments.size(), 1); @@ -379,7 +386,8 @@ public class FunctionRuntimeManagerTest { verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).stopFunction(argThat(functionRuntimeInfo -> - functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().equals(function2))); + Arrays.equals(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().toByteArray(), + function2.toByteArray()))); verify(functionActioner, times(0)).startFunction(any(FunctionRuntimeInfo.class)); @@ -486,8 +494,10 @@ public class FunctionRuntimeManagerTest { .get("worker-2")); assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), - functionRuntimeInfo); + assertEquals( + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") + .getFunctionInstance().toByteArray(), + functionRuntimeInfo.getFunctionInstance().toByteArray()); } } @@ -609,12 +619,15 @@ public class FunctionRuntimeManagerTest { verify(functionActioner, times(0)).terminateFunction(any(FunctionRuntimeInfo.class)); verify(functionActioner).startFunction( - argThat(functionRuntimeInfo -> functionRuntimeInfo.getFunctionInstance() - .equals(assignment1.getInstance()))); + argThat(functionRuntimeInfo -> Arrays.equals( + functionRuntimeInfo.getFunctionInstance().toByteArray(), + assignment1.getInstance().toByteArray()))); assertEquals(functionRuntimeManager.functionRuntimeInfos.size(), 1); - assertEquals(functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0"), - new FunctionRuntimeInfo().setFunctionInstance(createInstance(function1, 0))); + assertEquals( + functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") + .getFunctionInstance().toByteArray(), + createInstance(function1, 0).toByteArray()); // verify no errors occurred verify(errorNotifier, times(0)).triggerError(any()); @@ -741,16 +754,16 @@ public class FunctionRuntimeManagerTest { assertEquals( functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") - .getFunctionInstance(), - functionRuntimeInfo.getFunctionInstance()); + .getFunctionInstance().toByteArray(), + functionRuntimeInfo.getFunctionInstance().toByteArray()); assertNotNull( functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") .getRuntimeSpawner()); assertEquals( functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") - .getRuntimeSpawner().getInstanceConfig().getFunctionDetails(), - function1.getFunctionDetails()); + .getRuntimeSpawner().getInstanceConfig().getFunctionDetails().toByteArray(), + function1.getFunctionDetails().toByteArray()); assertEquals( functionRuntimeManager.functionRuntimeInfos.get("test-tenant/test-namespace/func-1:0") .getRuntimeSpawner().getInstanceConfig().getInstanceId(), diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 03c5e38d5d9..b572cd8196b 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.functions.proto.FunctionMetaData; import org.apache.pulsar.functions.proto.Instance; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; +import org.apache.pulsar.functions.utils.FunctionCommon; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -237,10 +238,11 @@ public class MembershipManagerTest { verify(functionRuntimeManager, times(0)).removeAssignments(any()); assertEquals(membershipManager.unsignedFunctionDurations.size(), 1); Instance instance = createInstance(function2, 0); - assertNotNull(membershipManager.unsignedFunctionDurations.get(instance)); + String instanceId = FunctionCommon.getFullyQualifiedInstanceId(instance); + assertNotNull(membershipManager.unsignedFunctionDurations.get(instanceId)); - membershipManager.unsignedFunctionDurations.put(instance, - membershipManager.unsignedFunctionDurations.get(instance) - 30001); + membershipManager.unsignedFunctionDurations.put(instanceId, + membershipManager.unsignedFunctionDurations.get(instanceId) - 30001); membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager); @@ -315,10 +317,11 @@ public class MembershipManagerTest { verify(functionRuntimeManager, times(0)).removeAssignments(any()); assertEquals(membershipManager.unsignedFunctionDurations.size(), 1); Instance instance = createInstance(function2, 0); - assertNotNull(membershipManager.unsignedFunctionDurations.get(instance)); + String instanceId = FunctionCommon.getFullyQualifiedInstanceId(instance); + assertNotNull(membershipManager.unsignedFunctionDurations.get(instanceId)); - membershipManager.unsignedFunctionDurations.put(instance, - membershipManager.unsignedFunctionDurations.get(instance) - 30001); + membershipManager.unsignedFunctionDurations.put(instanceId, + membershipManager.unsignedFunctionDurations.get(instanceId) - 30001); membershipManager.checkFailures(functionMetaDataManager, functionRuntimeManager, schedulerManager); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index d83a09c90f8..d234d921cbe 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyBoolean; @@ -33,6 +34,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -290,10 +293,11 @@ public class SchedulerManagerTest { log.info("assignments: {}", assignments); Assignment assignment2 = createAssignment("worker-1", function2, 0); - Assert.assertEquals(assignment2, assignments); + Assert.assertEquals(assignment2.toByteArray(), assignments.toByteArray()); // make sure we also directly added the assignment to in memory assignment cache in function runtime manager - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2)); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment2.toByteArray()))); } @Test @@ -395,7 +399,7 @@ public class SchedulerManagerTest { log.info("assignments: {}", assignments); Assignment assignment2 = createAssignment("worker-1", function2, 0); - Assert.assertEquals(assignments, assignment2); + Assert.assertEquals(assignments.toByteArray(), assignment2.toByteArray()); // updating assignments currentAssignments.get("worker-1") @@ -420,14 +424,14 @@ public class SchedulerManagerTest { invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Set<Assignment> allAssignments = new HashSet<>(); + List<byte[]> allAssignmentBytesScaled = new ArrayList<>(); invocations.forEach(invocation -> { - allAssignments.add(parseAssignment((byte[]) invocation.getRawArguments()[0])); + allAssignmentBytesScaled.add(parseAssignment((byte[]) invocation.getRawArguments()[0]).toByteArray()); }); - assertTrue(allAssignments.contains(assignment2Scaled1)); - assertTrue(allAssignments.contains(assignment2Scaled2)); - assertTrue(allAssignments.contains(assignment2Scaled3)); + assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> Arrays.equals(b, assignment2Scaled1.toByteArray()))); + assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> Arrays.equals(b, assignment2Scaled2.toByteArray()))); + assertTrue(allAssignmentBytesScaled.stream().anyMatch(b -> Arrays.equals(b, assignment2Scaled3.toByteArray()))); } @Test @@ -468,32 +472,27 @@ public class SchedulerManagerTest { invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - for (int i = 0; i < invocations.size(); i++) { - Invocation invocation = invocations.get(i); - byte[] send = (byte[]) invocation.getRawArguments()[0]; - Assignment assignment = parseAssignment(send); - Assignment expectedAssignment = createAssignment("worker-1", function2, i); - Assert.assertEquals(assignment, expectedAssignment); - } - - Set<Assignment> allAssignments = new HashSet<>(); + List<byte[]> allAssignmentBytes = new ArrayList<>(); invocations.forEach(invocation -> { - allAssignments.add(parseAssignment((byte[]) invocation.getRawArguments()[0])); + allAssignmentBytes.add(parseAssignment((byte[]) invocation.getRawArguments()[0]).toByteArray()); }); Assignment assignment21 = createAssignment("worker-1", function2, 0); Assignment assignment22 = createAssignment("worker-1", function2, 1); Assignment assignment23 = createAssignment("worker-1", function2, 2); - assertTrue(allAssignments.contains(assignment21)); - assertTrue(allAssignments.contains(assignment22)); - assertTrue(allAssignments.contains(assignment23)); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment21.toByteArray()))); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment22.toByteArray()))); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment23.toByteArray()))); // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager verify(functionRuntimeManager, times(3)).processAssignment(any()); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment21)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment22)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment23)); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment21.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment22.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment23.toByteArray()))); // updating assignments currentAssignments.get("worker-1") @@ -520,12 +519,12 @@ public class SchedulerManagerTest { invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Set<Assignment> allAssignments2 = new HashSet<>(); + List<byte[]> allAssignmentBytes2 = new ArrayList<>(); invocations.forEach(invocation -> { - allAssignments2.add(parseAssignment((byte[]) invocation.getRawArguments()[0])); + allAssignmentBytes2.add(parseAssignment((byte[]) invocation.getRawArguments()[0]).toByteArray()); }); - assertTrue(allAssignments2.contains(assignment2Scaled)); + assertTrue(allAssignmentBytes2.stream().anyMatch(b -> Arrays.equals(b, assignment2Scaled.toByteArray()))); // make sure we also directly removed the assignment from the in memory assignment cache in // function runtime manager @@ -536,7 +535,8 @@ public class SchedulerManagerTest { .deleteAssignment(eq(FunctionCommon.getFullyQualifiedInstanceId(assignment22.getInstance()))); verify(functionRuntimeManager, times(4)).processAssignment(any()); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Scaled)); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment2Scaled.toByteArray()))); } @Test @@ -642,21 +642,24 @@ public class SchedulerManagerTest { invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Set<Assignment> allAssignments = new HashSet<>(); + List<byte[]> allAssignmentBytes = new ArrayList<>(); invocations.forEach(invocation -> { - allAssignments.add(parseAssignment((byte[]) invocation.getRawArguments()[0])); + allAssignmentBytes.add(parseAssignment((byte[]) invocation.getRawArguments()[0]).toByteArray()); }); - assertEquals(allAssignments.size(), 3); - assertTrue(allAssignments.contains(assignment21)); - assertTrue(allAssignments.contains(assignment22)); - assertTrue(allAssignments.contains(assignment23)); + assertEquals(allAssignmentBytes.size(), 3); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment21.toByteArray()))); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment22.toByteArray()))); + assertTrue(allAssignmentBytes.stream().anyMatch(b -> Arrays.equals(b, assignment23.toByteArray()))); // make sure we also directly add the assignment to the in memory assignment cache in function runtime manager verify(functionRuntimeManager, times(3)).processAssignment(any()); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment21)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment22)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment23)); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment21.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment22.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment23.toByteArray()))); // updating assignments currentAssignments.get("worker-1") @@ -689,21 +692,24 @@ public class SchedulerManagerTest { invocations = getMethodInvocationDetails(message, TypedMessageBuilder.class.getMethod("value", Object.class)); - Set<Assignment> allAssignments2 = new HashSet<>(); + List<byte[]> allAssignmentBytes3 = new ArrayList<>(); invocations.forEach(invocation -> { - allAssignments2.add(parseAssignment((byte[]) invocation.getRawArguments()[0])); + allAssignmentBytes3.add(parseAssignment((byte[]) invocation.getRawArguments()[0]).toByteArray()); }); - assertTrue(allAssignments2.contains(assignment2Updated1)); - assertTrue(allAssignments2.contains(assignment2Updated2)); - assertTrue(allAssignments2.contains(assignment2Updated3)); + assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, assignment2Updated1.toByteArray()))); + assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, assignment2Updated2.toByteArray()))); + assertTrue(allAssignmentBytes3.stream().anyMatch(b -> Arrays.equals(b, assignment2Updated3.toByteArray()))); // make sure we also directly updated the assignment to the in memory assignment cache in // function runtime manager verify(functionRuntimeManager, times(6)).processAssignment(any()); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated1)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated2)); - verify(functionRuntimeManager, times(1)).processAssignment(eq(assignment2Updated3)); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment2Updated1.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment2Updated2.toByteArray()))); + verify(functionRuntimeManager, times(1)).processAssignment( + argThat(a -> Arrays.equals(a.toByteArray(), assignment2Updated3.toByteArray()))); } @Test diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 19e7ddd20b7..b1e05317c86 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -198,7 +198,7 @@ public class FunctionApiV2ResourceTest extends AbstractFunctionApiResourceTest { FunctionDetails actual = getDefaultFunctionInfo(); assertEquals( - functionDetails, - actual); + functionDetails.toByteArray(), + actual.toByteArray()); } }
