mjsax commented on code in PR #17005:
URL: https://github.com/apache/kafka/pull/17005#discussion_r1731648217
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
Review Comment:
We should update the JavaDocs, too, using the `@deprecated` tag and
pointing to the new config to be used.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return deserializationExceptionHandler();
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
- public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
+ private DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
return
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
}
@SuppressWarnings("WeakerAccess")
- public ProductionExceptionHandler defaultProductionExceptionHandler() {
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ return
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
+ }
+
+ public ProductionExceptionHandler getProductionExceptionHandler() {
Review Comment:
Same comments as above.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2552,7 +2552,8 @@ public Set<TopicPartition> partitions() {
}
@ParameterizedTest
- @MethodSource("data")
+ @MethodSource("data")
+ @SuppressWarnings("deprecation")
Review Comment:
This test verifies that the handler is doing the right thing. Thus, we
should update the test to use the new config and not add this annotation.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.deserialization.exception.handler";
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class
that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
+ /** {@code deserialization.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
= "deserialization.exception.handler";
+
Review Comment:
nit: no empty line
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -889,6 +901,11 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Review Comment:
Please insert at the right place (we keep it ordered alphabetically)
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.deserialization.exception.handler";
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class
that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
+ /** {@code deserialization.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
= "deserialization.exception.handler";
+
+ protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
= "Exception handling class that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
/** {@code default.production.exception.handler} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.production.exception.handler";
+
+ /** {@code production.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG =
"production.exception.handler";
private static final String DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC
= "Exception handling class that implements the
<code>org.apache.kafka.streams.errors.ProductionExceptionHandler</code>
interface.";
Review Comment:
nit: rename variable, dropping `DEFAULT_` to align to config name.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return deserializationExceptionHandler();
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
- public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
+ private DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
Review Comment:
This is public API, and we cannot just make it private -- we can only
deprecate it -- as above, must be covered by the KIP
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.deserialization.exception.handler";
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class
that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
+ /** {@code deserialization.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
= "deserialization.exception.handler";
+
+ protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
= "Exception handling class that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
Review Comment:
Why `protected` -- should be `private`?
nit: missing empty line below.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -555,12 +555,24 @@ public class StreamsConfig extends AbstractConfig {
/** {@code default.deserialization.exception.handler} */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.deserialization.exception.handler";
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class
that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
+ /** {@code deserialization.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
= "deserialization.exception.handler";
+
+ protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
= "Exception handling class that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
/** {@code default.production.exception.handler} */
Review Comment:
Update JavaDocs?
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -93,6 +95,11 @@ public class TopologyConfig extends AbstractConfig {
null,
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Review Comment:
please insert at right place (alphabetically)
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -919,6 +936,11 @@ public class StreamsConfig extends AbstractConfig {
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
Review Comment:
Please insert at the right place (we keep it ordered alphabetically)
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -113,6 +115,7 @@ public static void handleDeserializationFailure(final
DeserializationExceptionHa
throw new StreamsException("Deserialization exception handler is
set to fail upon" +
" a deserialization error. If you would rather have the
streaming pipeline" +
" continue after a deserialization error, please set the " +
+ DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " or the
deprecated exception handler " +
Review Comment:
We should just point to the new one and not refer to the deprecated one at
all. (For this case, we also don't need the `@SuppressWarnings` annotation.)
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
Review Comment:
```suggestion
public DeserializationExceptionHandler deserializationExceptionHandler()
{
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return deserializationExceptionHandler();
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
- public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
+ private DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
return
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
}
@SuppressWarnings("WeakerAccess")
- public ProductionExceptionHandler defaultProductionExceptionHandler() {
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
Review Comment:
Seems to be redundant to `[get]DeserializationExceptionHandler` above?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java:
##########
@@ -1158,6 +1159,75 @@ public void
shouldSkipOnDeserializationErrorsWhenReprocessing() {
assertEquals(0, stateRestoreCallback.restored.size());
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void verifyExceptionHandlerAcceptNewConfigWhenBothArePresent() {
Review Comment:
This test raises the question of whether we should log a WARN if both are set.
But why do we test this in `GlobalStateManagerImplTest`? Seems this should
go into `StreamsConfigTest`?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,16 +1929,42 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
Review Comment:
We should not use `get` prefix.
Also, this is public API and the KIP must cover it -- can you update the KIP
accordingly, and send an update to the VOTE thread?
Or: we don't add it as `public` -- I personally always found that having
these methods be `public` is a leaky abstraction... But there was some
disagreement about it: https://github.com/apache/kafka/pull/14548
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -1066,6 +1067,41 @@ public void
shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
assertThat(topologyBuilder.topologyConfigs().parseStoreType(),
equalTo(Materialized.StoreType.IN_MEMORY));
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void exceptionHandlerShouldAcceptNewConfig() {
+ final Properties topologyOverrides = new Properties();
+
topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
+
topologyOverrides.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+
+ final StreamsConfig config = new
StreamsConfig(StreamsTestUtils.getStreamsConfig());
+ final InternalTopologyBuilder topologyBuilder = new
InternalTopologyBuilder(
+ new TopologyConfig(
+ "my-topology",
+ config,
+ topologyOverrides)
+ );
+
+
assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(),
equalTo(LogAndContinueExceptionHandler.class));
+ }
+
+ @Test
+ public void
exceptionHandlerShouldAcceptNewConfigNoOtherDeprecatedConfigPresent() {
Review Comment:
Instead of adding a new test, should we just update an existing test which
uses the old config now, and let it use the new config instead (maybe
`shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps`)?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -259,6 +259,11 @@ public void setDeserializationExceptionHandler(final
DeserializationExceptionHan
this.deserializationExceptionHandler = deserializationExceptionHandler;
}
+ //Visible for testing
+ public DeserializationExceptionHandler
getDeserializationExceptionHandler() {
Review Comment:
```suggestion
public DeserializationExceptionHandler deserializationExceptionHandler()
{
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java:
##########
@@ -1066,6 +1067,41 @@ public void
shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() {
assertThat(topologyBuilder.topologyConfigs().parseStoreType(),
equalTo(Materialized.StoreType.IN_MEMORY));
}
+ @SuppressWarnings("deprecation")
+ @Test
+ public void exceptionHandlerShouldAcceptNewConfig() {
Review Comment:
```suggestion
public void
newDeserializationExceptionHandlerConfigShouldOverwriteOldOne() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]