mjsax commented on code in PR #17205:
URL: https://github.com/apache/kafka/pull/17205#discussion_r1760466560


##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -46,7 +46,6 @@ default ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], b
      * @param record The record that failed to produce
      * @param exception The exception that occurred during production
      */
-    @SuppressWarnings("deprecation")

Review Comment:
   Side cleanup.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java:
##########
@@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data) 
{
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void ShouldSerdeWithNonNullsTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullForeignValueTest() {
         final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, 
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, null, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertNull(result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertNull(result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullHashTest() {
         final long[] hashedValue = null;
         final String foreignValue = "foreignValue";
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldSerdeWithNullsTest() {
         final long[] hashedValue = null;
         final String foreignValue = null;
         final SubscriptionResponseWrapper<String> srw = new 
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
-        final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
-        final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
-        final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
+        try (final SubscriptionResponseWrapperSerde<String> srwSerde = new 
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+            final byte[] serResponse = srwSerde.serializer().serialize(null, 
srw);
+            final SubscriptionResponseWrapper<String> result = 
srwSerde.deserializer().deserialize(null, serResponse);
 
-        assertArrayEquals(hashedValue, result.originalValueHash());
-        assertEquals(foreignValue, result.foreignValue());
-        assertNull(result.primaryPartition());
+            assertArrayEquals(hashedValue, result.originalValueHash());
+            assertEquals(foreignValue, result.foreignValue());
+            assertNull(result.primaryPartition());
+        }
     }
 
     @Test
     public void shouldThrowExceptionWithBadVersionTest() {
         final long[] hashedValue = null;
-        assertThrows(UnsupportedVersionException.class,
-            () -> new SubscriptionResponseWrapper<>(hashedValue, 
"foreignValue", (byte) 0xFF, 1));
+        assertThrows(
+            UnsupportedVersionException.class,
+            () -> new SubscriptionResponseWrapper<>(hashedValue, 
"foreignValue", (byte) -1, 1)

Review Comment:
   Minor change for this test, too, passing in `-1` now to make it more logical.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java:
##########
@@ -66,9 +66,8 @@ public void setIfUnset(final SerdeGetter getter) {
         public byte[] serialize(final String topic, final 
SubscriptionResponseWrapper<V> data) {
             
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized 
data}
 
-            //7-bit (0x7F) maximum for data version.
-            if (Byte.compare((byte) 0x7F, data.version()) < 0) {

Review Comment:
   This check is always `false`. Byte has a range from `-128...127`; thus, this 
comparison is incorrect.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java:
##########
@@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data) 
{
     }
 
     @Test
-    @SuppressWarnings("unchecked")

Review Comment:
   Side cleanup... (same other tests below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to