[
https://issues.apache.org/jira/browse/KAFKA-7620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701426#comment-16701426
]
ASF GitHub Bot commented on KAFKA-7620:
---------------------------------------
ewencp closed pull request #5914: KAFKA-7620: Fix restart logic for TTLs in
WorkerConfigTransformer
URL: https://github.com/apache/kafka/pull/5914
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efcc01d2c45..8889aadbae1 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -35,6 +35,7 @@
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static
org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -105,8 +106,8 @@
"indicates that a configuration value will expire in the future.";
private static final String CONFIG_RELOAD_ACTION_DISPLAY = "Reload Action";
- public static final String CONFIG_RELOAD_ACTION_NONE =
Herder.ConfigReloadAction.NONE.toString();
- public static final String CONFIG_RELOAD_ACTION_RESTART =
Herder.ConfigReloadAction.RESTART.toString();
+ public static final String CONFIG_RELOAD_ACTION_NONE =
Herder.ConfigReloadAction.NONE.name().toLowerCase(Locale.ROOT);
+ public static final String CONFIG_RELOAD_ACTION_RESTART =
Herder.ConfigReloadAction.RESTART.name().toLowerCase(Locale.ROOT);
public static final String ERRORS_RETRY_TIMEOUT_CONFIG =
"errors.retry.timeout";
public static final String ERRORS_RETRY_TIMEOUT_DISPLAY = "Retry Timeout
for Errors";
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index 5c7cc1429aa..c572e20b52f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -148,12 +148,6 @@
*/
void restartTask(ConnectorTaskId id, Callback<Void> cb);
- /**
- * Get the configuration reload action.
- * @param connName name of the connector
- */
- ConfigReloadAction connectorConfigReloadAction(final String connName);
-
/**
* Restart the connector.
* @param connName name of the connector
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1b715c70c76..3373d5ce328 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -29,6 +34,8 @@
* retrieved TTL values.
*/
public class WorkerConfigTransformer {
+ private static final Logger log =
LoggerFactory.getLogger(WorkerConfigTransformer.class);
+
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests =
new ConcurrentHashMap<>();
@@ -46,7 +53,16 @@ public WorkerConfigTransformer(Worker worker, Map<String,
ConfigProvider> config
if (configs == null) return null;
ConfigTransformerResult result = configTransformer.transform(configs);
if (connectorName != null) {
- scheduleReload(connectorName, result.ttls());
+ String key = ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+ String action = (String) ConfigDef.parseType(key,
configs.get(key), ConfigDef.Type.STRING);
+ if (action == null) {
+ // The default action is "restart".
+ action = ConnectorConfig.CONFIG_RELOAD_ACTION_RESTART;
+ }
+ ConfigReloadAction reloadAction =
ConfigReloadAction.valueOf(action.toUpperCase(Locale.ROOT));
+ if (reloadAction == ConfigReloadAction.RESTART) {
+ scheduleReload(connectorName, result.ttls());
+ }
}
return result.data();
}
@@ -58,21 +74,19 @@ private void scheduleReload(String connectorName,
Map<String, Long> ttls) {
}
private void scheduleReload(String connectorName, String path, long ttl) {
- Herder herder = worker.herder();
- if (herder.connectorConfigReloadAction(connectorName) ==
Herder.ConfigReloadAction.RESTART) {
- Map<String, HerderRequest> connectorRequests =
requests.get(connectorName);
- if (connectorRequests == null) {
- connectorRequests = new ConcurrentHashMap<>();
- requests.put(connectorName, connectorRequests);
- } else {
- HerderRequest previousRequest = connectorRequests.get(path);
- if (previousRequest != null) {
- // Delete previous request for ttl which is now stale
- previousRequest.cancel();
- }
+ Map<String, HerderRequest> connectorRequests =
requests.get(connectorName);
+ if (connectorRequests == null) {
+ connectorRequests = new ConcurrentHashMap<>();
+ requests.put(connectorName, connectorRequests);
+ } else {
+ HerderRequest previousRequest = connectorRequests.get(path);
+ if (previousRequest != null) {
+ // Delete previous request for ttl which is now stale
+ previousRequest.cancel();
}
- HerderRequest request = herder.restartConnector(ttl,
connectorName, null);
- connectorRequests.put(path, request);
}
+ log.info("Scheduling a restart of connector {} in {} ms",
connectorName, ttl);
+ HerderRequest request = worker.herder().restartConnector(ttl,
connectorName, null);
+ connectorRequests.put(path, request);
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index f2009dbac1e..dc91f356f3d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -61,7 +61,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
@@ -642,13 +641,6 @@ else if (!configState.contains(connName))
);
}
- @Override
- public ConfigReloadAction connectorConfigReloadAction(final String
connName) {
- return ConfigReloadAction.valueOf(
-
configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
- .toUpperCase(Locale.ROOT));
- }
-
@Override
public void restartConnector(final String connName, final Callback<Void>
callback) {
restartConnector(0, connName, callback);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40ad9803a2c..fe31c284613 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -42,7 +42,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
@@ -260,13 +259,6 @@ public synchronized void restartTask(ConnectorTaskId
taskId, Callback<Void> cb)
cb.onCompletion(new ConnectException("Failed to start task: " +
taskId), null);
}
- @Override
- public ConfigReloadAction connectorConfigReloadAction(final String
connName) {
- return ConfigReloadAction.valueOf(
-
configState.connectorConfig(connName).get(ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG)
- .toUpperCase(Locale.ROOT));
- }
-
@Override
public synchronized void restartConnector(String connName, Callback<Void>
cb) {
if (!configState.contains(connName))
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
index b5da2646c3d..e30acb15ed4 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java
@@ -28,9 +28,12 @@
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_CONFIG;
+import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONFIG_RELOAD_ACTION_NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.powermock.api.easymock.PowerMock.replayAll;
@@ -69,18 +72,18 @@ public void testReplaceVariable() {
@Test
public void testReplaceVariableWithTTL() {
EasyMock.expect(worker.herder()).andReturn(herder);
-
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.NONE);
replayAll();
- Map<String, String> result = configTransformer.transform(MY_CONNECTOR,
Collections.singletonMap(MY_KEY, "${test:testPath:testKeyWithTTL}"));
- assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
+ Map<String, String> props = new HashMap<>();
+ props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
+ props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
+ Map<String, String> result = configTransformer.transform(MY_CONNECTOR,
props);
}
@Test
public void testReplaceVariableWithTTLAndScheduleRestart() {
EasyMock.expect(worker.herder()).andReturn(herder);
-
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR,
null)).andReturn(requestId);
replayAll();
@@ -92,11 +95,9 @@ public void testReplaceVariableWithTTLAndScheduleRestart() {
@Test
public void testReplaceVariableWithTTLFirstCancelThenScheduleRestart() {
EasyMock.expect(worker.herder()).andReturn(herder);
-
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expect(herder.restartConnector(1L, MY_CONNECTOR,
null)).andReturn(requestId);
EasyMock.expect(worker.herder()).andReturn(herder);
-
EasyMock.expect(herder.connectorConfigReloadAction(MY_CONNECTOR)).andReturn(Herder.ConfigReloadAction.RESTART);
EasyMock.expectLastCall();
requestId.cancel();
EasyMock.expectLastCall();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> ConfigProvider is broken for KafkaConnect when TTL is not null
> --------------------------------------------------------------
>
> Key: KAFKA-7620
> URL: https://issues.apache.org/jira/browse/KAFKA-7620
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.0.0, 2.0.1
> Reporter: Ye Ji
> Assignee: Robert Yokota
> Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> If the ConfigData returned by ConfigProvider.get implementations has non-null
> and non-negative ttl, it will trigger infinite recursion, here is an excerpt
> of the stack trace:
> {code:java}
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> at
> org.apache.kafka.connect.runtime.distributed.ClusterConfigState.connectorConfig(ClusterConfigState.java:121)
> at
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.connectorConfigReloadAction(DistributedHerder.java:648)
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:62)
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.scheduleReload(WorkerConfigTransformer.java:56)
> at
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:49)
> {code}
> Basically,
> 1) if a non-null ttl is returned from the config provider, connect runtime
> will try to schedule a reload in the future,
> 2) scheduleReload function reads the config again to see if it is a restart
> or not, by calling
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform to
> transform the config
> 3) the transform function calls config provider, and gets a non-null ttl,
> causing scheduleReload being called, we are back to step 1.
> To reproduce, simply fork the provided
> [FileConfigProvider|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java],
> and add a non-negative ttl to the ConfigData returned by the get functions.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)