This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 35a0838 Added Google pub/sub sink integration test 35a0838 is described below commit 35a0838ebacba92b8f3ad900b9b6e0a3c51193bc Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Mar 4 10:00:14 2021 +0100 Added Google pub/sub sink integration test --- .../src/test/resources/log4j2.properties | 8 + tests/itests-google-pubsub/pom.xml | 65 ++++++++ .../google/pubsub/clients/GooglePubEasy.java | 167 +++++++++++++++++++++ .../sink/CamelGooglePubSubPropertyFactory.java | 54 +++++++ .../pubsub/sink/CamelSinkGooglePubSubITCase.java | 130 ++++++++++++++++ tests/pom.xml | 1 + 6 files changed, 425 insertions(+) diff --git a/tests/itests-common/src/test/resources/log4j2.properties b/tests/itests-common/src/test/resources/log4j2.properties index aa48d0a..b7df3d4 100644 --- a/tests/itests-common/src/test/resources/log4j2.properties +++ b/tests/itests-common/src/test/resources/log4j2.properties @@ -95,3 +95,11 @@ logger.azure.name = com.azure logger.azure.level = trace logger.azure.additivity = false logger.azure.appenderRef.file.ref = file + +# Google +logger.google.name = com.google +logger.google.level = INFO +logger.google.additivity = false +logger.google.appenderRef.file.ref = file + + diff --git a/tests/itests-google-pubsub/pom.xml b/tests/itests-google-pubsub/pom.xml new file mode 100644 index 0000000..1dc469a --- /dev/null +++ b/tests/itests-google-pubsub/pom.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-parent</artifactId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../itests-parent/pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>itests-google-pubsub</artifactId> + <name>Camel-Kafka-Connector :: Tests :: Google Pub/Sub</name> + + <dependencies> + <dependency> + <groupId>org.apache.camel.kafkaconnector</groupId> + <artifactId>itests-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- test infra --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-common</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-infra-google-pubsub</artifactId> + <version>${camel.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-google-pubsub</artifactId> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java new file mode 100644 index 0000000..02420fe --- /dev/null +++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/clients/GooglePubEasy.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.google.pubsub.clients; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminSettings; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class GooglePubEasy { + private static final Logger LOG = LoggerFactory.getLogger(GooglePubEasy.class); + private final List<String> receivedMessages = new ArrayList<>(); + + private final String serviceAddress; + private final String project; + + + private final ManagedChannel channel; + private final FixedTransportChannelProvider channelProvider; + + private ProjectSubscriptionName projectSubscriptionName; + private Subscriber subscriber; + + public GooglePubEasy(String serviceAddress, String project) { + this.serviceAddress = serviceAddress; + this.project = project; + + channel = ManagedChannelBuilder + .forTarget(String.format(serviceAddress)) + .usePlaintext() + .build(); + + channelProvider = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + } + + public void createTopic(String topicName) throws IOException, InterruptedException { + doCreateTopic(topicName); + } + + public void createSubscription(String subscriptionName, String topicName) throws IOException { + TopicName googleTopic = TopicName.of(project, topicName); + + projectSubscriptionName = ProjectSubscriptionName.of(project, subscriptionName); + + SubscriptionAdminSettings adminSettings = SubscriptionAdminSettings + .newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider) + .build(); + + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(adminSettings)) { + Subscription subscription = subscriptionAdminClient.createSubscription( + projectSubscriptionName, googleTopic, PushConfig.getDefaultInstance(), 10); + } + } + + private void doCreateTopic(String topicName) throws IOException, InterruptedException { + TopicName googleTopic = TopicName.of(project, topicName); + + TopicAdminSettings topicAdminSettings = TopicAdminSettings + .newBuilder() + .setCredentialsProvider(NoCredentialsProvider.create()) + .setTransportChannelProvider(channelProvider) + .build(); + + try (TopicAdminClient client = TopicAdminClient.create(topicAdminSettings)) { + LOG.info("Creating topic {} (original {})", googleTopic.toString(), googleTopic.getTopic()); + + client.createTopic(googleTopic); + + if (client.awaitTermination(10, TimeUnit.SECONDS)) { + client.shutdownNow(); + } + } + } + + public void receive() { + try { + MessageReceiver receiver = (pubsubMessage, ackReplyConsumer) -> { + String data = pubsubMessage.getData().toString(); + LOG.info("Received: {}", data); + receivedMessages.add(data); + + if (receivedMessages.size() >= 10) { + subscriber.stopAsync(); + } + + ackReplyConsumer.ack(); + }; + + subscriber = Subscriber + .newBuilder(projectSubscriptionName, receiver) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(channelProvider) + .build(); + + + LOG.info("Adding listener ..."); + subscriber.addListener( + new Subscriber.Listener() { + @Override + public void failed(Subscriber.State from, Throwable failure) { + LOG.error(failure.getMessage(), failure); + } + }, + MoreExecutors.directExecutor()); + + LOG.info("Starting async ..."); + subscriber.startAsync().awaitRunning(); + LOG.info("Waiting for messages ..."); + subscriber.awaitTerminated(25, TimeUnit.SECONDS); + } catch (TimeoutException e) { + subscriber.stopAsync(); + } finally { + if (subscriber != null) { + subscriber.stopAsync(); + } + } + } + + public void shutdown() { + if (channel != null) { + channel.shutdown(); + } + } + + public List<String> getReceivedMessages() { + return Collections.unmodifiableList(receivedMessages); + } +} diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java new file mode 100644 index 0000000..e7a1c01 --- /dev/null +++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelGooglePubSubPropertyFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.google.pubsub.sink; + +import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; + +public class CamelGooglePubSubPropertyFactory extends SinkConnectorPropertyFactory<CamelGooglePubSubPropertyFactory> { + + public CamelGooglePubSubPropertyFactory withProjectId(String value) { + return setProperty("camel.sink.path.projectId", value); + } + + public CamelGooglePubSubPropertyFactory withDestinationName(String value) { + return setProperty("camel.sink.path.destinationName", value); + } + + public CamelGooglePubSubPropertyFactory withEndpoint(String value) { + return setProperty("camel.component.google-pubsub.endpoint", value); + } + + + public EndpointUrlBuilder<CamelGooglePubSubPropertyFactory> withUrl(String projectId, String destinationName) { + String queueUrl = String.format("google-pubsub:%s:%s", projectId, destinationName); + + return new EndpointUrlBuilder<>(this::withSinkUrl, queueUrl); + } + + public static CamelGooglePubSubPropertyFactory basic() { + return new CamelGooglePubSubPropertyFactory() + .withTasksMax(1) + .withName("CamelGooglePubSub") + .withConnectorClass("org.apache.camel.kafkaconnector.googlepubsub.CamelGooglepubsubSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + + } + +} diff --git a/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java new file mode 100644 index 0000000..ff1adf7 --- /dev/null +++ b/tests/itests-google-pubsub/src/test/java/org/apache/camel/kafkaconnector/google/pubsub/sink/CamelSinkGooglePubSubITCase.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.google.pubsub.sink; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.kafkaconnector.google.pubsub.clients.GooglePubEasy; +import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubService; +import org.apache.camel.test.infra.google.pubsub.services.GooglePubSubServiceFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class CamelSinkGooglePubSubITCase extends CamelSinkTestSupport { + @RegisterExtension + public static GooglePubSubService service = GooglePubSubServiceFactory.createService(); + + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkGooglePubSubITCase.class); + private String project = "ckc"; + private GooglePubEasy easyClient; + + private String googlePubSubTopic; + + private final int expected = 10; + + @Override + protected String[] getConnectorsInTest() { + return new String[]{"camel-google-pubsub-kafka-connector"}; + } + + + @BeforeEach + public void setUp() { + googlePubSubTopic = "ckctopic" + TestUtils.randomWithRange(0, 100); + LOG.info("Requesting topic {} for the pub/sub client", googlePubSubTopic); + + easyClient = new GooglePubEasy(service.getServiceAddress(), project); + + try { + easyClient.createTopic(googlePubSubTopic); + easyClient.createSubscription("test-subscription", googlePubSubTopic); + } catch (InterruptedException | IOException e) { + fail(e.getMessage()); + } + + } + + @AfterEach + public void tearDown() { + easyClient.shutdown(); + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + easyClient.receive(); + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + List<String> receivedMessages = easyClient.getReceivedMessages(); + + if (latch.await(30, TimeUnit.SECONDS)) { + assertEquals(expected, receivedMessages.size(), "Did not receive as many messages as was sent"); + } else { + fail("Failed to receive the messages within the specified time"); + } + } + + @Test + public void testBasicSendReceive() throws Exception { + String topicName = getTopicForTest(this); + + ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory + .basic() + .withTopics(topicName) + .withProjectId(project) + .withDestinationName(googlePubSubTopic) + .withEndpoint(service.getServiceAddress()); + + runTest(connectorPropertyFactory, topicName, expected); + } + + @Disabled("Disabled due to #1086") + @Test + public void testBasicSendReceiveUrl() throws Exception { + String topicName = getTopicForTest(this); + + ConnectorPropertyFactory connectorPropertyFactory = CamelGooglePubSubPropertyFactory + .basic() + .withTopics(topicName) + .withUrl(project, googlePubSubTopic) + .append("endpoint", service.getServiceAddress()) + .buildUrl(); + + runTest(connectorPropertyFactory, topicName, expected); + } + +} diff --git a/tests/pom.xml b/tests/pom.xml index fadfc35..0c3ff80 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -65,6 +65,7 @@ <module>itests-cxf</module> <module>itests-netty</module> <module>itests-netty-http</module> + <module>itests-google-pubsub</module> </modules>