mjsax commented on code in PR #17005:
URL: https://github.com/apache/kafka/pull/17005#discussion_r1733654478
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -553,15 +553,35 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG =
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
- /** {@code default.deserialization.exception.handler} */
+ /**
+ * {@code default.deserialization.exception.handler}
+ * @deprecated since 4.0.
+ * Use deserialization.exception.handler instead
+ */
@SuppressWarnings("WeakerAccess")
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
"default.deserialization.exception.handler";
+ @Deprecated
public static final String
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class
that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
- /** {@code default.production.exception.handler} */
+ /** {@code deserialization.exception.handler} */
+ @SuppressWarnings("WeakerAccess")
+ public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG
= "deserialization.exception.handler";
+ protected static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC
= "Exception handling class that implements the
<code>org.apache.kafka.streams.errors.DeserializationExceptionHandler</code>
interface.";
+
+ /**
+ * {@code default.production.exception.handler}
+ * @deprecated since 4.0.
+ * Use production.exception.handler instead
Review Comment:
as above
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
+ }
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
+ /**
+ * @deprecated as of kafka 4.0. Use deserializationExceptionHandler()
instead
Review Comment:
```suggestion
* @deprecated as of kafka 4.0; use {@link
#deserializationExceptionHandler()} instead
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
Review Comment:
```suggestion
log.warn("Both the deprecated and new config for deserialization
exception handler are configured. The deprecated one will be ignored.");
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -918,12 +938,7 @@ public class StreamsConfig extends AbstractConfig {
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
- DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
- .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
Review Comment:
Thanks for the side cleanup!
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() {
);
}
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() {
+
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndFailExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
Review Comment:
This one should expect `LogAndContinue`, right? (I think that this test
passing right now exposes the bug that you currently use `getClass()` instead
of `originals().containsKey()`.)
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -93,6 +95,11 @@ public class TopologyConfig extends AbstractConfig {
null,
Importance.MEDIUM,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
Review Comment:
Seems this was not addressed yet.
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -553,15 +553,35 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG =
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
- /** {@code default.deserialization.exception.handler} */
+ /**
+ * {@code default.deserialization.exception.handler}
+ * @deprecated since 4.0.
+ * Use deserialization.exception.handler instead
Review Comment:
Make one line:
```
@deprecated since 4.0; use {@link
#DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG} instead
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -918,12 +938,7 @@ public class StreamsConfig extends AbstractConfig {
Type.CLASS,
DefaultProductionExceptionHandler.class.getName(),
Importance.MEDIUM,
- DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
- .define(PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
- Type.CLASS,
- LogAndFailProcessingExceptionHandler.class.getName(),
- Importance.MEDIUM,
- PROCESSING_EXCEPTION_HANDLER_CLASS_DOC)
+ PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC)
Review Comment:
Should this stay `DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC`?
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
+ }
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
+ /**
+ * @deprecated as of kafka 4.0. Use deserializationExceptionHandler()
instead
+ * @return DeserializationExceptionHandler
+ */
+ @Deprecated
@SuppressWarnings("WeakerAccess")
public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
return
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
}
+ public ProductionExceptionHandler productionExceptionHandler() {
+ if (getClass(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
Review Comment:
This should work the same as for deserialization handler, including the
check if both old and new configs are set?
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -223,11 +230,15 @@ public TopologyConfig(final String topologyName, final
StreamsConfig globalAppCo
timestampExtractorSupplier = () ->
globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
- if
(isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
topologyOverrides)) {
- deserializationExceptionHandlerSupplier = () ->
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
- log.info("Topology {} is overriding {} to {}", topologyName,
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG));
+ final String deserializationExceptionHandlerKey =
getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null ?
Review Comment:
as above: `getClass` -> `originals().containsKey()`
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
+ }
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
Review Comment:
as above
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
+ }
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
+ /**
+ * @deprecated as of kafka 4.0. Use deserializationExceptionHandler()
instead
+ * @return DeserializationExceptionHandler
+ */
+ @Deprecated
@SuppressWarnings("WeakerAccess")
public DeserializationExceptionHandler
defaultDeserializationExceptionHandler() {
return
getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
}
+ public ProductionExceptionHandler productionExceptionHandler() {
+ if (getClass(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return
getConfiguredInstance(PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
ProductionExceptionHandler.class);
+ } else {
+ return defaultProductionExceptionHandler();
+ }
+ }
+
@SuppressWarnings("WeakerAccess")
- public ProductionExceptionHandler defaultProductionExceptionHandler() {
+ private ProductionExceptionHandler defaultProductionExceptionHandler() {
Review Comment:
We cannot make public stuff private -- we need to first deprecate it
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
Review Comment:
`getClass` would return the default if not set. I think we would need to use
```
originals().containsKey(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
```
We should also add a test that verifies the log line is printed (cf. helper
`LogCaptureAppender.java`).
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() {
);
}
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
Review Comment:
We can add an assertion for the WARN log line in this test.
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() {
);
}
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
Review Comment:
```suggestion
public void
shouldSetAndGetDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1907,13 +1937,38 @@ public TimestampExtractor defaultTimestampExtractor() {
return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
}
+ public DeserializationExceptionHandler deserializationExceptionHandler() {
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null &&
+ getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)
!= null) {
+ log.warn("Both the deprecated and new config for deserialization
exception handler are configured !!");
+ }
+ if (getClass(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG) != null) {
+ return
getConfiguredInstance(DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
DeserializationExceptionHandler.class);
+ } else {
+ return defaultDeserializationExceptionHandler();
+ }
+ }
+
+ /**
+ * @deprecated as of kafka 4.0. Use deserializationExceptionHandler()
instead
+ * @return DeserializationExceptionHandler
Review Comment:
I think we can omit this one
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() {
);
}
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() {
Review Comment:
```suggestion
public void
shouldUseOldDeserializationExceptionHandlerWhenOnlyOldConfigIsSet() {
```
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1622,6 +1624,30 @@ public void testInvalidProcessingExceptionHandler() {
);
}
+ @Test
+ public void testDeserializationExceptionHandlerWhenOnlyNewConfigIsSet() {
+
props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueExceptionHandler.class);
+ streamsConfig = new StreamsConfig(props);
+ assertEquals(LogAndContinueExceptionHandler.class,
streamsConfig.deserializationExceptionHandler().getClass());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testDeserializationExceptionHandlerWhenBothConfigsAreSet() {
Review Comment:
```suggestion
public void
shouldUseNewDeserializationExceptionHandlerWhenBothConfigsAreSet() {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]