mjsax commented on code in PR #17205:
URL: https://github.com/apache/kafka/pull/17205#discussion_r1760466560
##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -46,7 +46,6 @@ default ProductionExceptionHandlerResponse handle(final
ProducerRecord<byte[], b
* @param record The record that failed to produce
* @param exception The exception that occurred during production
*/
- @SuppressWarnings("deprecation")
Review Comment:
Side cleanup: removing a `@SuppressWarnings("deprecation")` annotation that is no longer needed.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java:
##########
@@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data)
{
}
@Test
- @SuppressWarnings("unchecked")
public void ShouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01,
(byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, null, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertNull(result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertNull(result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new
SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new
SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null,
srw);
+ final SubscriptionResponseWrapper<String> result =
srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
- assertThrows(UnsupportedVersionException.class,
- () -> new SubscriptionResponseWrapper<>(hashedValue,
"foreignValue", (byte) 0xFF, 1));
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> new SubscriptionResponseWrapper<>(hashedValue,
"foreignValue", (byte) -1, 1)
Review Comment:
Minor change for this test, too: it now passes `(byte) -1` instead of `(byte) 0xFF` (the same value as a signed byte), which makes the test more logical.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java:
##########
@@ -66,9 +66,8 @@ public void setIfUnset(final SerdeGetter getter) {
public byte[] serialize(final String topic, final
SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized
data}
- //7-bit (0x7F) maximum for data version.
- if (Byte.compare((byte) 0x7F, data.version()) < 0) {
Review Comment:
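This check is always `false`. A Java `byte` is signed, with a range of `-128...127`, so
`Byte.compare((byte) 0x7F, data.version()) < 0` would require a version greater than 127, which can never occur; thus, this comparison is incorrect.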
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java:
##########
@@ -73,68 +69,95 @@ public T deserialize(final String topic, final byte[] data)
{
}
@Test
- @SuppressWarnings("unchecked")
Review Comment:
Side cleanup (the same applies to the other tests below).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]