C0urante commented on code in PR #6934:
URL: https://github.com/apache/kafka/pull/6934#discussion_r1568959278
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final
@PathParam("connector") String connecto
return response.entity(createdInfo.result()).build();
}
+ @PATCH
+ @Path("/{connector}/config")
+ public Response patchConnectorConfig(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean
forward,
+ final Map<String, String>
connectorConfigPatch) throws Throwable {
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new
FutureCallback<>();
+ herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
+ Herder.Created<ConnectorInfo> createdInfo =
requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector +
"/config",
+ "PATCH", headers, connectorConfigPatch, new
TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(),
forward);
+ return Response.ok().entity(createdInfo.result()).build();
Review Comment:
Just realizing now that we don't actually specify the status and body of the
REST response in the KIP. I agree with what's here, especially since it matches
the existing `PUT /connectors/{name}/config` endpoint, but it's worth
specifying in the KIP for completeness.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(worker, member, configBackingStore,
statusBackingStore);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ ClusterConfigState clusterConfigState = new ClusterConfigState(
+ 0,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ expectConfigRefreshAndSnapshot(clusterConfigState);
+
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new
FutureCallback<>();
+ herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+ herder.tick();
+ assertTrue(patchCallback.isDone());
+ ExecutionException exception = assertThrows(ExecutionException.class,
patchCallback::get);
+ assertInstanceOf(NotFoundException.class, exception.getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws Exception {
+ when(member.memberId()).thenReturn("leader");
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(),
true);
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+ Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
Review Comment:
Nit: can we add one more key/value pair that should be unchanged after the
patch is applied? This would catch bugs where the set of post-patch keys is
derived from the patch instead of the combination of the patch and the prior
configuration.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -242,6 +242,19 @@ public Response putConnectorConfig(final
@PathParam("connector") String connecto
return response.entity(createdInfo.result()).build();
}
+ @PATCH
+ @Path("/{connector}/config")
+ public Response patchConnectorConfig(final @PathParam("connector") String
connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean
forward,
Review Comment:
Nit:
```suggestion
final @Parameter(hidden = true)
@QueryParam("forward") Boolean forward,
```
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(connectorConfigCb);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ Callback<Herder.Created<ConnectorInfo>> patchCallback =
mock(Callback.class);
+ herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,
patchCallback);
+
+ ArgumentCaptor<NotFoundException> exceptionCaptor =
ArgumentCaptor.forClass(NotFoundException.class);
+ verify(patchCallback).onCompletion(exceptionCaptor.capture(),
isNull());
+ assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " +
CONNECTOR_NAME + " not found");
+ assertNull(exceptionCaptor.getValue().getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws ExecutionException,
InterruptedException, TimeoutException {
+ // Create the connector.
+ Map<String, String> originalConnConfig =
connectorConfig(SourceSink.SOURCE);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
+
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "changed");
+ connConfigPatch.put("foo2", null);
+ connConfigPatch.put("foo3", "added");
+
+ Map<String, String> patchedConnConfig = new
HashMap<>(originalConnConfig);
+ patchedConnConfig.put("foo1", "changed");
+ patchedConnConfig.remove("foo2");
+ patchedConnConfig.put("foo3", "added");
+
+ expectAdd(SourceSink.SOURCE);
+ Connector connectorMock = mock(SourceConnector.class);
+ expectConfigValidation(SourceSink.SOURCE, originalConnConfig,
patchedConnConfig);
+
+ expectConnectorStartingWithoutTasks(originalConnConfig);
+
+ herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false,
createCallback);
+ createCallback.get(1000L, TimeUnit.SECONDS);
+
+ expectConnectorStartingWithoutTasks(patchedConnConfig);
+
+ FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new
FutureCallback<>();
+ herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,
patchCallback);
+
+ Map<String, String> returnedConfig = patchCallback.get(1000L,
TimeUnit.SECONDS).result().config();
+ assertEquals(patchedConnConfig, returnedConfig);
+
+ // Also check the returned config when requested.
+ FutureCallback<Map<String, String>> configCallback = new
FutureCallback<>();
+ herder.connectorConfig(CONNECTOR_NAME, configCallback);
+
+ Map<String, String> returnedConfig2 = configCallback.get(1000L,
TimeUnit.SECONDS);
+ assertEquals(patchedConnConfig, returnedConfig2);
+ }
+
+ private void expectConnectorStartingWithoutTasks(Map<String, String>
config) {
Review Comment:
Nit: just to avoid confusion if someone leverages this method in the future,
could we either rename it to `expectSourceConnectorStartingWithoutTasks`, or
add a `SourceSink connectorType` parameter to it that can be used to control
which kind of connector is expected?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(connectorConfigCb);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ Callback<Herder.Created<ConnectorInfo>> patchCallback =
mock(Callback.class);
+ herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,
patchCallback);
+
+ ArgumentCaptor<NotFoundException> exceptionCaptor =
ArgumentCaptor.forClass(NotFoundException.class);
+ verify(patchCallback).onCompletion(exceptionCaptor.capture(),
isNull());
+ assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " +
CONNECTOR_NAME + " not found");
+ assertNull(exceptionCaptor.getValue().getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws ExecutionException,
InterruptedException, TimeoutException {
+ // Create the connector.
+ Map<String, String> originalConnConfig =
connectorConfig(SourceSink.SOURCE);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
Review Comment:
Same suggestion as for the distributed herder test: can we add another
key/value pair that should be left unchanged by the patch?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String
connName,
}
}
+ @Override
+ public void patchConnectorConfig(String connName, Map<String, String>
configPatch, Callback<Created<ConnectorInfo>> callback) {
Review Comment:
Should we make this method `synchronized` in order to prevent (or at least
reduce) race conditions?
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -992,4 +992,15 @@ public List<String> setWorkerLoggerLevel(String namespace,
String desiredLevelSt
return loggers.setLevel(namespace, level);
}
+ protected Map<String, String> applyConnectorConfigPatch(Map<String,
String> currentConfig, Map<String, String> configPatch) {
+ Map<String, String> patchedConfig = new HashMap<>(currentConfig);
+ configPatch.forEach((k, v) -> {
+ if (v != null) {
+ patchedConfig.put(k, v);
+ } else {
+ patchedConfig.remove(k);
+ }
+ });
+ return patchedConfig;
+ }
Review Comment:
This feels generic enough that we could probably put it into
[ConnectUtils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java)
or even just
[Utils](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/examples/src/main/java/kafka/examples/Utils.java)
as something like `patchConfig` (if in the former) or `combineMaps`,
`mergeMaps`, or `patchMap` (if in the latter).
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName,
final Map<String, String>
);
}
+ @Override
+ public void patchConnectorConfig(String connName, Map<String, String>
configPatch, Callback<Created<ConnectorInfo>> callback) {
+ log.trace("Submitting connector config patch request {}", connName);
+ addRequest(() -> {
+ ConnectorInfo connectorInfo = connectorInfo(connName);
+ if (connectorInfo == null) {
+ callback.onCompletion(new NotFoundException("Connector " +
connName + " not found", null), null);
+ } else {
+ Map<String, String> patchedConfig =
applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+ putConnectorConfig(connName, patchedConfig, true, callback);
Review Comment:
This adds another request to the queue instead of executing the follow-up
logic immediately, which increases the chance for race conditions. Can we fix
that? We could possibly have a synchronous variant of `putConnectorConfig`
whose method body is pulled from
[here](https://github.com/apache/kafka/blob/e63efbc5d771a6e69eb8678db6f88730b4e751e1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1099-L1140),
and invoke that variant instead of the asynchronous one at this line in
`patchConnectorConfig`.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -246,6 +246,31 @@ private synchronized void putConnectorConfig(String
connName,
}
}
+ @Override
+ public void patchConnectorConfig(String connName, Map<String, String>
configPatch, Callback<Created<ConnectorInfo>> callback) {
+ try {
+ ConnectorInfo connectorInfo = connectorInfo(connName);
+ if (connectorInfo == null) {
+ callback.onCompletion(new NotFoundException("Connector " +
connName + " not found", null), null);
+ return;
+ }
+
+ Map<String, String> patchedConfig =
applyConnectorConfigPatch(connectorInfo.config(), configPatch);
+ validateConnectorConfig(patchedConfig, (error, configInfos) -> {
+ if (error != null) {
+ callback.onCompletion(error, null);
+ return;
+ }
+
+ requestExecutorService.submit(
+ () -> putConnectorConfig(connName, patchedConfig,
null, true, callback, configInfos)
+ );
+ });
+ } catch (ConnectException e) {
Review Comment:
I don't know why we limit the catch clause to `ConnectException` in
`deleteConnectorConfig`, but here (and probably there as well) it should be
`Throwable` instead; otherwise, the callback is left dangling.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(worker, member, configBackingStore,
statusBackingStore);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ ClusterConfigState clusterConfigState = new ClusterConfigState(
+ 0,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ expectConfigRefreshAndSnapshot(clusterConfigState);
+
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new
FutureCallback<>();
+ herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+ herder.tick();
+ assertTrue(patchCallback.isDone());
+ ExecutionException exception = assertThrows(ExecutionException.class,
patchCallback::get);
+ assertInstanceOf(NotFoundException.class, exception.getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws Exception {
+ when(member.memberId()).thenReturn("leader");
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(),
true);
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+ Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
+
+ // The connector is pre-existing due to the mocks.
+
+ ClusterConfigState originalSnapshot = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 0),
+ Collections.singletonMap(CONN1, originalConnConfig),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ expectConfigRefreshAndSnapshot(originalSnapshot);
+ when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+ expectMemberPoll();
+
+ // Patch the connector config.
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "changed");
+ connConfigPatch.put("foo2", null);
+ connConfigPatch.put("foo3", "added");
+
+ Map<String, String> patchedConnConfig = new
HashMap<>(originalConnConfig);
+ patchedConnConfig.put("foo1", "changed");
+ patchedConnConfig.remove("foo2");
+ patchedConnConfig.put("foo3", "added");
+
+ expectMemberEnsureActive();
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(),
true);
+ when(worker.getPlugins()).thenReturn(plugins);
Review Comment:
This stubbing is never used, which causes the `DistributedHerderTest` suite to
fail when run as a whole thanks to
`@RunWith(MockitoJUnitRunner.StrictStubs.class)`. We can and should remove it:
```suggestion
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1144,6 +1144,21 @@ public void putConnectorConfig(final String connName,
final Map<String, String>
);
}
+ @Override
+ public void patchConnectorConfig(String connName, Map<String, String>
configPatch, Callback<Created<ConnectorInfo>> callback) {
+ log.trace("Submitting connector config patch request {}", connName);
+ addRequest(() -> {
Review Comment:
Can we do a check to make sure we're the leader at the beginning of the
request body here? This would reduce (but not completely eliminate) the chance
for race conditions.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(connectorConfigCb);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ Callback<Herder.Created<ConnectorInfo>> patchCallback =
mock(Callback.class);
+ herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,
patchCallback);
+
+ ArgumentCaptor<NotFoundException> exceptionCaptor =
ArgumentCaptor.forClass(NotFoundException.class);
+ verify(patchCallback).onCompletion(exceptionCaptor.capture(),
isNull());
+ assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " +
CONNECTOR_NAME + " not found");
+ assertNull(exceptionCaptor.getValue().getCause());
Review Comment:
Just curious, what was the rationale for this assertion?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java:
##########
@@ -751,6 +754,74 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(connectorConfigCb);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ Callback<Herder.Created<ConnectorInfo>> patchCallback =
mock(Callback.class);
+ herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch,
patchCallback);
+
+ ArgumentCaptor<NotFoundException> exceptionCaptor =
ArgumentCaptor.forClass(NotFoundException.class);
+ verify(patchCallback).onCompletion(exceptionCaptor.capture(),
isNull());
+ assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " +
CONNECTOR_NAME + " not found");
+ assertNull(exceptionCaptor.getValue().getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws ExecutionException,
InterruptedException, TimeoutException {
+ // Create the connector.
+ Map<String, String> originalConnConfig =
connectorConfig(SourceSink.SOURCE);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
+
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "changed");
+ connConfigPatch.put("foo2", null);
+ connConfigPatch.put("foo3", "added");
+
+ Map<String, String> patchedConnConfig = new
HashMap<>(originalConnConfig);
+ patchedConnConfig.put("foo1", "changed");
+ patchedConnConfig.remove("foo2");
+ patchedConnConfig.put("foo3", "added");
+
+ expectAdd(SourceSink.SOURCE);
+ Connector connectorMock = mock(SourceConnector.class);
Review Comment:
`connectorMock` appears to be unused in this test; can we remove it?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2336,6 +2336,95 @@ public void testPutConnectorConfig() throws Exception {
verifyNoMoreInteractions(worker, member, configBackingStore,
statusBackingStore);
}
+ @Test
+ public void testPatchConnectorConfigNotFound() {
+ ClusterConfigState clusterConfigState = new ClusterConfigState(
+ 0,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ expectConfigRefreshAndSnapshot(clusterConfigState);
+
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "baz1");
+
+ FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new
FutureCallback<>();
+ herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
+ herder.tick();
+ assertTrue(patchCallback.isDone());
+ ExecutionException exception = assertThrows(ExecutionException.class,
patchCallback::get);
+ assertInstanceOf(NotFoundException.class, exception.getCause());
+ }
+
+ @Test
+ public void testPatchConnectorConfig() throws Exception {
+ when(member.memberId()).thenReturn("leader");
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(),
true);
+
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
+
+ Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
+ originalConnConfig.put("foo1", "bar1");
+ originalConnConfig.put("foo2", "bar2");
+
+ // The connector is pre-existing due to the mocks.
+
+ ClusterConfigState originalSnapshot = new ClusterConfigState(
+ 1,
+ null,
+ Collections.singletonMap(CONN1, 0),
+ Collections.singletonMap(CONN1, originalConnConfig),
+ Collections.singletonMap(CONN1, TargetState.STARTED),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptySet());
+ expectConfigRefreshAndSnapshot(originalSnapshot);
+ when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
+
+ expectMemberPoll();
+
+ // Patch the connector config.
+ Map<String, String> connConfigPatch = new HashMap<>();
+ connConfigPatch.put("foo1", "changed");
+ connConfigPatch.put("foo2", null);
+ connConfigPatch.put("foo3", "added");
+
+ Map<String, String> patchedConnConfig = new
HashMap<>(originalConnConfig);
+ patchedConnConfig.put("foo1", "changed");
+ patchedConnConfig.remove("foo2");
+ patchedConnConfig.put("foo3", "added");
+
+ expectMemberEnsureActive();
+ expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(),
true);
+ when(worker.getPlugins()).thenReturn(plugins);
+
+ ArgumentCaptor<Callback<ConfigInfos>> validateCallback =
ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
+ validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
+ return null;
+ }).when(herder).validateConnectorConfig(eq(patchedConnConfig),
validateCallback.capture());
+
+ // This is effectively the main check of this test:
+ // we validate that what's written in the config storage is the
patched config.
+ doNothing().when(configBackingStore).putConnectorConfig(eq(CONN1),
eq(patchedConnConfig), isNull());
Review Comment:
Nit: could we change this to an explicit `verify` invocation?
```suggestion
// This is effectively the main check of this test:
// we validate that what's written in the config storage is the
patched config.
verify(configBackingStore).putConnectorConfig(eq(CONN1),
eq(patchedConnConfig), isNull());
```
And, if we want to be extra fancy, we can make sure that no other configs
were written:
```java
verifyNoMoreInteractions(configBackingStore);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]