gharris1727 commented on code in PR #18325:
URL: https://github.com/apache/kafka/pull/18325#discussion_r2058956971
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir,
Path binDir) throws IOExc
if (!success) {
throw new RuntimeException("Failed to compile test plugin:\n"
+ writer);
}
+ } finally {
+ if (!replacements.isEmpty()) {
+ sourceFiles.forEach(File::delete);
+ }
+ }
+ }
+
+ private static File copyAndReplace(File source, Map<String, String>
replacements) throws RuntimeException {
+ if (replacements.isEmpty()) {
+ return source;
+ }
+ try {
+ String content = Files.readString(source.toPath());
+ for (Map.Entry<String, String> entry : replacements.entrySet()) {
+ content = content.replace(entry.getKey(), entry.getValue());
+ }
+ File tmpFile = new File(System.getProperty("java.io.tmpdir") +
File.separator + source.getName());
Review Comment:
nit: use Files.createTempFile?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -52,6 +52,7 @@
import javax.tools.StandardJavaFileManager;
import javax.tools.ToolProvider;
+
Review Comment:
nit: newline
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+ private static final String VERSION_PLACEHOLDER =
"PLACEHOLDER_FOR_VERSION";
+
+ public enum VersionedTestPlugin {
+
+ SINK_CONNECTOR("sampling-connector",
"test.plugins.VersionedSamplingSinkConnector"),
+ SOURCE_CONNECTOR("versioned-source-connector",
"test.plugins.VersionedSamplingSourceConnector"),
+ CONVERTER("sampling-converter",
"test.plugins.VersionedSamplingConverter"),
+ HEADER_CONVERTER("sampling-header-converter",
"test.plugins.VersionedSamplingHeaderConverter"),
+ TRANSFORMATION("versioned-transformation",
"test.plugins.VersionedTransformation"),
+ PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+ private final String resourceDir;
+ private final String className;
+
+ VersionedTestPlugin(String resourceDir, String className) {
+ this.resourceDir = resourceDir;
+ this.className = className;
+ }
+
+ public String resourceDir() {
+ return resourceDir;
+ }
+
+ public String className() {
+ return className;
+ }
+ }
+
+ public static class BuildInfo {
+
+ private final VersionedTestPlugin plugin;
+ private final String version;
+ private String location;
+
+ private BuildInfo(VersionedTestPlugin plugin, String version) {
+ this.plugin = plugin;
+ this.version = version;
+ }
+
+ private void setLocation(String location) {
+ this.location = location;
+ }
+
+ public VersionedTestPlugin plugin() {
+ return plugin;
+ }
+
+ public String version() {
+ return version;
+ }
+
+ public String location() {
+ return location;
+ }
+ }
+
+ private final List<BuildInfo> pluginBuilds;
+
+ public VersionedPluginBuilder() {
+ pluginBuilds = new ArrayList<>();
+ }
+
+ public VersionedPluginBuilder include(VersionedTestPlugin plugin, String
version) {
+ pluginBuilds.add(new BuildInfo(plugin, version));
+ return this;
+ }
+
+ public synchronized Path build(String pluginDir) throws IOException {
+ Path pluginDirPath = Files.createTempDirectory(pluginDir);
+ Path subDir = Files.createDirectory(pluginDirPath.resolve("lib"));
+ for (BuildInfo buildInfo : pluginBuilds) {
Review Comment:
Allowing the code to combine multiple test plugins together into a single
plugin location is very interesting. Currently the bad-packaging plugin relies
on plugins being located together, but this is because the code is laid out as
multiple source files inside of a single plugin.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+ private static Plugins setUpPlugins(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+ String pluginPath =
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+ Map<String, String> configs = new HashMap<>();
+ configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+ return new Plugins(configs);
+ }
+
+ private void assertPluginLoad(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+ throws InvalidVersionSpecificationException,
ClassNotFoundException {
+
+ Plugins plugins = setUpPlugins(artifacts, mode);
+
+ for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry :
artifacts.entrySet()) {
+ String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+ for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
+ Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
+ Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ }
+ }
+ }
+
+ private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+ PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+ PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION,
PluginType.PREDICATE
+ };
+
+ private void assertCorrectLatestPluginVersion(
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+ PluginDiscoveryMode mode,
+ String latestVersion
+ ) {
+ Plugins plugins = setUpPlugins(artifacts, mode);
+ List<String> classes = artifacts.values().stream()
+ .flatMap(List::stream)
+ .map(VersionedPluginBuilder.BuildInfo::plugin)
+ .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+ .distinct()
+ .toList();
+ for (String className : classes) {
+ String version = plugins.latestVersion(className,
ALL_PLUGIN_TYPES);
+ Assertions.assertEquals(latestVersion, version);
+ }
+ }
+
+ private static Map<Path, List<VersionedPluginBuilder.BuildInfo>>
buildIsolatedArtifacts(
+ String[] versions,
+ VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+ ) throws IOException {
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ for (String v : versions) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
pluginTypes) {
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+ builder.include(pluginType, v);
+ artifacts.put(builder.build(pluginType + "-" + v),
builder.buildInfos());
+ }
+ }
+ return artifacts;
+ }
+
+ public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_ISOLATED_ARTIFACTS;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_COMBINED_ARTIFACT;
+ public static final Plugins MULTI_VERSION_PLUGINS;
+ public static final Map<VersionedPluginBuilder.VersionedTestPlugin,
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+ static {
+
+ String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0",
"2.3.0", "4.3.0"};
+ try {
+ DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+ defaultIsolatedArtifactsVersions,
VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+ DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+ DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
k -> "0.0.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
k -> "0.1.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
k -> "0.2.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
k -> "0.3.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
k -> "0.4.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
k -> "0.5.0"));
+ DEFAULT_COMBINED_ARTIFACT =
Collections.singletonMap(builder.build("all_versioned_artifact"),
builder.buildInfos());
+
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+ artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+ MULTI_VERSION_PLUGINS = setUpPlugins(artifacts,
PluginDiscoveryMode.SERVICE_LOAD);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void TestVersionedPluginLoaded() throws
InvalidVersionSpecificationException, ClassNotFoundException {
Review Comment:
nit: lowercase the first letter of all of the tests. Here and all tests in
PluginRecommenderTest.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir,
Path binDir) throws IOExc
if (!success) {
throw new RuntimeException("Failed to compile test plugin:\n"
+ writer);
}
+ } finally {
+ if (!replacements.isEmpty()) {
+ sourceFiles.forEach(File::delete);
+ }
+ }
+ }
+
+ private static File copyAndReplace(File source, Map<String, String>
replacements) throws RuntimeException {
+ if (replacements.isEmpty()) {
+ return source;
+ }
Review Comment:
Having the same condition in multiple disconnected places is a common source
of bugs. If these two pieces of code are not called together, or someone
modifies one condition but not the other, things won't behave correctly.
Could the control flow be simplified? It's worth considering whether you
need either of these conditions in the code at all.
##########
connect/runtime/src/test/resources/test-plugins/versioned-predicate/test/plugins/VersionedPredicate.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+import java.util.Map;
+
+/**
+ * Predicate to test multiversioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
+ */
+public class VersionedPredicate<R extends ConnectRecord<R>> implements
Predicate<R>, Versioned {
Review Comment:
nit: We have NonMigratedPredicate already, maybe we need a migrated, but
non-versioned predicate?
Similar for Transformation.
This is already missing test coverage on trunk, so maybe it can be addressed
in a follow-up. I think it's also a bug that you fixed in an earlier PR.
##########
connect/runtime/src/test/resources/test-plugins/sampling-converter/test/plugins/VersionedSamplingConverter.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.runtime.isolation.SamplingTestPlugin;
+
+/**
+ * Converter to test multiversioning of plugins.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
+ */
+public class VersionedSamplingConverter extends SamplingConverter implements
Versioned {
Review Comment:
There is definitely value in having a distinct class here, because the
Converter interface doesn't extend Versioned.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+ private static final String VERSION_PLACEHOLDER =
"PLACEHOLDER_FOR_VERSION";
+
+ public enum VersionedTestPlugin {
Review Comment:
WDYT about rolling these into the TestPlugin enum?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir,
Path binDir) throws IOExc
if (!success) {
throw new RuntimeException("Failed to compile test plugin:\n"
+ writer);
}
+ } finally {
+ if (!replacements.isEmpty()) {
+ sourceFiles.forEach(File::delete);
+ }
+ }
+ }
+
+ private static File copyAndReplace(File source, Map<String, String>
replacements) throws RuntimeException {
+ if (replacements.isEmpty()) {
+ return source;
+ }
+ try {
+ String content = Files.readString(source.toPath());
+ for (Map.Entry<String, String> entry : replacements.entrySet()) {
+ content = content.replace(entry.getKey(), entry.getValue());
Review Comment:
nit: I don't really like this "modifying arbitrary source code before
compilation" technique, it's very powerful and open-ended, and could be misused
and be more difficult to debug like self-modifying code. But because the
current application is within reason, maybe this is good enough to merge.
Prior art here is the ReadVersionFromResourcePlugin, whose code is
duplicated in read-version-from-resource-v1 and read-version-from-resource-v2,
with version files specified explicitly. I think that we shouldn't follow
this same strategy for these new plugins and versions, but there's a middle
ground where we generate version files from Strings specified in
createPluginJar/writeJar.
##########
connect/runtime/src/test/resources/test-plugins/sampling-connector/test/plugins/VersionedSamplingSinkConnector.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.plugins;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * VersionedSamplingSinkConnector is a test connector that extends
SamplingConnector and overrides the version method.
+ * Any instance of the string PLACEHOLDER_FOR_VERSION will be replaced with
the actual version during plugin compilation.
+ */
+public final class VersionedSamplingSinkConnector extends SamplingConnector {
Review Comment:
Is it significant that this is a SamplingConnector or SamplingPlugin, or is
that just for convenience? I see that the SamplingPlugin interface is only used
in PluginsTest, which doesn't use this (or the other Versioned) plugins.
The SamplingPlugin was meant specifically for the plugins to exfiltrate
ClassLoader references to assertions in PluginsTest code, it's not needed if
you're not performing those assertions, as you can just extend the connect-api
class.
Theoretically you could add classloader assertions to the multiversion
tests, but that doesn't feel very high priority, as the existing classloader
assertions should have pretty good coverage. Or because Connectors are always
Versioned, maybe you could add the version substitution to the
SamplingConnector itself, rather than extending it.
One problem with combining both fixed and dynamic versioned plugins together
is that you are obligated to bring an additional copy of the fixed version
plugin with each dynamic versioned plugin. For example, if you want a 1.0 and
2.0 VersionedSamplingSinkConnector in the same plugin location, you are forced
to have two .class files for the 1.0.0 SamplingConnector. This is not generally
a supported condition because of classloader nondeterminism, but because it's
all compiled from the same source it probably won't break. Unless the code
substitution mechanism causes a binary incompatibility....
TLDR: I think the Versioned plugins should not implement SamplingPlugin, and
should be in their own packages like the versioned-predicate and
versioned-transformation.
This applies to all of the Sampling test plugins.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+ private static Plugins setUpPlugins(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+ String pluginPath =
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+ Map<String, String> configs = new HashMap<>();
+ configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+ return new Plugins(configs);
+ }
+
+ private void assertPluginLoad(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+ throws InvalidVersionSpecificationException,
ClassNotFoundException {
+
+ Plugins plugins = setUpPlugins(artifacts, mode);
+
+ for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry :
artifacts.entrySet()) {
+ String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+ for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
+ Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
+ Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ }
+ }
+ }
+
+ private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
Review Comment:
nit: The tests still pass if this variable is set to PluginType.values().
Should they fail? Can we eliminate this variable?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java:
##########
@@ -477,6 +480,27 @@ private static void compileJavaSources(Path sourceDir,
Path binDir) throws IOExc
if (!success) {
throw new RuntimeException("Failed to compile test plugin:\n"
+ writer);
}
+ } finally {
+ if (!replacements.isEmpty()) {
+ sourceFiles.forEach(File::delete);
+ }
+ }
+ }
+
+ private static File copyAndReplace(File source, Map<String, String>
replacements) throws RuntimeException {
+ if (replacements.isEmpty()) {
+ return source;
+ }
+ try {
+ String content = Files.readString(source.toPath());
+ for (Map.Entry<String, String> entry : replacements.entrySet()) {
+ content = content.replace(entry.getKey(), entry.getValue());
+ }
+ File tmpFile = new File(System.getProperty("java.io.tmpdir") +
File.separator + source.getName());
+ Files.writeString(tmpFile.toPath(), content);
+ return tmpFile;
Review Comment:
Please also call deleteOnExit() for this file.
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+ private static Plugins setUpPlugins(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+ String pluginPath =
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+ Map<String, String> configs = new HashMap<>();
+ configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+ return new Plugins(configs);
+ }
+
+ private void assertPluginLoad(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+ throws InvalidVersionSpecificationException,
ClassNotFoundException {
+
+ Plugins plugins = setUpPlugins(artifacts, mode);
+
+ for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry :
artifacts.entrySet()) {
+ String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+ for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
+ Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
+ Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ }
+ }
+ }
+
+ private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+ PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+ PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION,
PluginType.PREDICATE
+ };
+
+ private void assertCorrectLatestPluginVersion(
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+ PluginDiscoveryMode mode,
+ String latestVersion
+ ) {
+ Plugins plugins = setUpPlugins(artifacts, mode);
+ List<String> classes = artifacts.values().stream()
+ .flatMap(List::stream)
+ .map(VersionedPluginBuilder.BuildInfo::plugin)
+ .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+ .distinct()
+ .toList();
+ for (String className : classes) {
+ String version = plugins.latestVersion(className,
ALL_PLUGIN_TYPES);
+ Assertions.assertEquals(latestVersion, version);
+ }
+ }
+
+ private static Map<Path, List<VersionedPluginBuilder.BuildInfo>>
buildIsolatedArtifacts(
+ String[] versions,
+ VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+ ) throws IOException {
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ for (String v : versions) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
pluginTypes) {
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+ builder.include(pluginType, v);
+ artifacts.put(builder.build(pluginType + "-" + v),
builder.buildInfos());
+ }
+ }
+ return artifacts;
+ }
+
+ public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_ISOLATED_ARTIFACTS;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_COMBINED_ARTIFACT;
+ public static final Plugins MULTI_VERSION_PLUGINS;
+ public static final Map<VersionedPluginBuilder.VersionedTestPlugin,
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+ static {
+
+ String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0",
"2.3.0", "4.3.0"};
+ try {
+ DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+ defaultIsolatedArtifactsVersions,
VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+ DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+ DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
k -> "0.0.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
k -> "0.1.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
k -> "0.2.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
k -> "0.3.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
k -> "0.4.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
k -> "0.5.0"));
+ DEFAULT_COMBINED_ARTIFACT =
Collections.singletonMap(builder.build("all_versioned_artifact"),
builder.buildInfos());
+
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ artifacts.putAll(DEFAULT_COMBINED_ARTIFACT);
+ artifacts.putAll(DEFAULT_ISOLATED_ARTIFACTS);
+ MULTI_VERSION_PLUGINS = setUpPlugins(artifacts,
PluginDiscoveryMode.SERVICE_LOAD);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void TestVersionedPluginLoaded() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+ assertPluginLoad(DEFAULT_COMBINED_ARTIFACT,
PluginDiscoveryMode.SERVICE_LOAD);
+ assertPluginLoad(DEFAULT_COMBINED_ARTIFACT,
PluginDiscoveryMode.ONLY_SCAN);
+ }
+
+ @Test
+ public void TestMultipleIsolatedVersionedPluginLoading() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+ assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.SERVICE_LOAD);
+ assertPluginLoad(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.ONLY_SCAN);
+ }
+
+ @Test
+ public void TestLatestVersion() {
+ assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.SERVICE_LOAD, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+ assertCorrectLatestPluginVersion(DEFAULT_ISOLATED_ARTIFACTS,
PluginDiscoveryMode.ONLY_SCAN, DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION);
+ }
+
+ @Test
+ public void TestBundledPluginLoading() throws
InvalidVersionSpecificationException, ClassNotFoundException {
+
+ Plugins plugins = MULTI_VERSION_PLUGINS;
+ // get the connector loader of the combined artifact which includes
all plugin types
+ ClassLoader connectorLoader = plugins.pluginLoader(
+
VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR.className(),
+ PluginUtils.connectorVersionRequirement("0.1.0")
+ );
+ Assertions.assertInstanceOf(PluginClassLoader.class, connectorLoader);
+
+ List<VersionedPluginBuilder.VersionedTestPlugin> pluginTypes = List.of(
+ VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+ VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+ VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+ VersionedPluginBuilder.VersionedTestPlugin.PREDICATE
+ );
+ // should match the version used in setUp for creating the combined
artifact
+ List<String> versions = Arrays.asList("0.2.0", "0.3.0", "0.4.0",
"0.5.0");
Review Comment:
Do these versions correspond to the versions specified in the static
initializer? Can they be shared constants?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/MultiVersionTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.components.Versioned;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+public class MultiVersionTest {
+
+ private static Plugins setUpPlugins(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode) {
+ String pluginPath =
artifacts.keySet().stream().map(Path::toString).collect(Collectors.joining(","));
+ Map<String, String> configs = new HashMap<>();
+ configs.put(WorkerConfig.PLUGIN_PATH_CONFIG, pluginPath);
+ configs.put(WorkerConfig.PLUGIN_DISCOVERY_CONFIG, mode.name());
+ return new Plugins(configs);
+ }
+
+ private void assertPluginLoad(Map<Path,
List<VersionedPluginBuilder.BuildInfo>> artifacts, PluginDiscoveryMode mode)
+ throws InvalidVersionSpecificationException,
ClassNotFoundException {
+
+ Plugins plugins = setUpPlugins(artifacts, mode);
+
+ for (Map.Entry<Path, List<VersionedPluginBuilder.BuildInfo>> entry :
artifacts.entrySet()) {
+ String pluginLocation = entry.getKey().toAbsolutePath().toString();
+
+ for (VersionedPluginBuilder.BuildInfo buildInfo :
entry.getValue()) {
+ ClassLoader pluginLoader =
plugins.pluginLoader(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(PluginClassLoader.class,
pluginLoader);
+ Assertions.assertTrue(((PluginClassLoader)
pluginLoader).location().contains(pluginLocation));
+ Object p = plugins.newPlugin(buildInfo.plugin().className(),
PluginUtils.connectorVersionRequirement(buildInfo.version()));
+ Assertions.assertInstanceOf(Versioned.class, p);
+ Assertions.assertEquals(buildInfo.version(), ((Versioned)
p).version());
+ }
+ }
+ }
+
+ private static final PluginType[] ALL_PLUGIN_TYPES = new PluginType[]{
+ PluginType.SINK, PluginType.SOURCE, PluginType.CONVERTER,
+ PluginType.HEADER_CONVERTER, PluginType.TRANSFORMATION,
PluginType.PREDICATE
+ };
+
+ private void assertCorrectLatestPluginVersion(
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts,
+ PluginDiscoveryMode mode,
+ String latestVersion
+ ) {
+ Plugins plugins = setUpPlugins(artifacts, mode);
+ List<String> classes = artifacts.values().stream()
+ .flatMap(List::stream)
+ .map(VersionedPluginBuilder.BuildInfo::plugin)
+ .map(VersionedPluginBuilder.VersionedTestPlugin::className)
+ .distinct()
+ .toList();
+ for (String className : classes) {
+ String version = plugins.latestVersion(className,
ALL_PLUGIN_TYPES);
+ Assertions.assertEquals(latestVersion, version);
+ }
+ }
+
+ private static Map<Path, List<VersionedPluginBuilder.BuildInfo>>
buildIsolatedArtifacts(
+ String[] versions,
+ VersionedPluginBuilder.VersionedTestPlugin[] pluginTypes
+ ) throws IOException {
+ Map<Path, List<VersionedPluginBuilder.BuildInfo>> artifacts = new
HashMap<>();
+ for (String v : versions) {
+ for (VersionedPluginBuilder.VersionedTestPlugin pluginType:
pluginTypes) {
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+ builder.include(pluginType, v);
+ artifacts.put(builder.build(pluginType + "-" + v),
builder.buildInfos());
+ }
+ }
+ return artifacts;
+ }
+
+ public static final String DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_ISOLATED_ARTIFACTS;
+ public static final Map<Path, List<VersionedPluginBuilder.BuildInfo>>
DEFAULT_COMBINED_ARTIFACT;
+ public static final Plugins MULTI_VERSION_PLUGINS;
+ public static final Map<VersionedPluginBuilder.VersionedTestPlugin,
String> DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+
+ static {
+
+ String[] defaultIsolatedArtifactsVersions = new String[]{"1.1.0",
"2.3.0", "4.3.0"};
+ try {
+ DEFAULT_ISOLATED_ARTIFACTS = buildIsolatedArtifacts(
+ defaultIsolatedArtifactsVersions,
VersionedPluginBuilder.VersionedTestPlugin.values()
+ );
+ DEFAULT_ISOLATED_ARTIFACTS_LATEST_VERSION = "4.3.0";
+ DEFAULT_COMBINED_ARTIFACT_VERSIONS = new HashMap<>();
+
+ VersionedPluginBuilder builder = new VersionedPluginBuilder();
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SOURCE_CONNECTOR,
k -> "0.0.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.SINK_CONNECTOR,
k -> "0.1.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.CONVERTER,
k -> "0.2.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.HEADER_CONVERTER,
k -> "0.3.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.TRANSFORMATION,
k -> "0.4.0"));
+
builder.include(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
+
DEFAULT_COMBINED_ARTIFACT_VERSIONS.computeIfAbsent(VersionedPluginBuilder.VersionedTestPlugin.PREDICATE,
k -> "0.5.0"));
+ DEFAULT_COMBINED_ARTIFACT =
Collections.singletonMap(builder.build("all_versioned_artifact"),
builder.buildInfos());
Review Comment:
nit: Why is this computeIfAbsent, when we are constructing the map for the
first time?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/VersionedPluginBuilder.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class VersionedPluginBuilder {
+
+ private static final String VERSION_PLACEHOLDER =
"PLACEHOLDER_FOR_VERSION";
+
+ public enum VersionedTestPlugin {
+
+ SINK_CONNECTOR("sampling-connector",
"test.plugins.VersionedSamplingSinkConnector"),
+ SOURCE_CONNECTOR("versioned-source-connector",
"test.plugins.VersionedSamplingSourceConnector"),
+ CONVERTER("sampling-converter",
"test.plugins.VersionedSamplingConverter"),
+ HEADER_CONVERTER("sampling-header-converter",
"test.plugins.VersionedSamplingHeaderConverter"),
+ TRANSFORMATION("versioned-transformation",
"test.plugins.VersionedTransformation"),
+ PREDICATE("versioned-predicate", "test.plugins.VersionedPredicate");
+
+ private final String resourceDir;
+ private final String className;
+
+ VersionedTestPlugin(String resourceDir, String className) {
+ this.resourceDir = resourceDir;
+ this.className = className;
+ }
+
+ public String resourceDir() {
+ return resourceDir;
+ }
+
+ public String className() {
+ return className;
+ }
+ }
+
+ public static class BuildInfo {
+
+ private final VersionedTestPlugin plugin;
+ private final String version;
+ private String location;
+
+ private BuildInfo(VersionedTestPlugin plugin, String version) {
+ this.plugin = plugin;
+ this.version = version;
+ }
+
+ private void setLocation(String location) {
+ this.location = location;
+ }
+
+ public VersionedTestPlugin plugin() {
+ return plugin;
+ }
+
+ public String version() {
+ return version;
+ }
+
+ public String location() {
+ return location;
+ }
+ }
+
+ private final List<BuildInfo> pluginBuilds;
+
+ public VersionedPluginBuilder() {
+ pluginBuilds = new ArrayList<>();
+ }
+
+ public VersionedPluginBuilder include(VersionedTestPlugin plugin, String
version) {
+ pluginBuilds.add(new BuildInfo(plugin, version));
+ return this;
+ }
+
+ public synchronized Path build(String pluginDir) throws IOException {
+ Path pluginDirPath = Files.createTempDirectory(pluginDir);
Review Comment:
This method is leaking files on each run. I found a bunch of files left over
on my machine:
```
cd /var/folders/k1/6b93_bm16596yf4d3dlp6vbh0000gn/T
ls
rm -rf SINK_CONNECTOR-* SOURCE_CONNECTOR-* PREDICATE-* TRANSFORMATION-*
HEADER_CONVERTER-* CONVERTER-* all_versioned_artifact*
```
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginRecommenderTest.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_COMBINED_ARTIFACT_VERSIONS;
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.DEFAULT_ISOLATED_ARTIFACTS;
+import static
org.apache.kafka.connect.runtime.isolation.MultiVersionTest.MULTI_VERSION_PLUGINS;
+
+public class PluginRecommenderTest {
+
+ private Set<String> allVersionsOff(String classOrAlias) {
Review Comment:
nit: off -> of typo?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]