valdar commented on a change in pull request #205: URL: https://github.com/apache/camel-kafka-connector/pull/205#discussion_r423005983
########## File path: core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java ########## @@ -32,270 +31,239 @@ public class CamelSourceTaskTest { + private static final String DIRECT_URI = "direct:start"; + private static final String TOPIC_NAME = "my-topic"; + + private void sendBatchOfRecords(CamelSourceTask sourceTask, long size) { + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + for (int i = 0; i < size; i++) { + template.sendBody(DIRECT_URI, "test" + i); + } + } + @Test public void testSourcePolling() { + final long size = 2; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - template.sendBody("direct:start", "awesome!"); + sendBatchOfRecords(sourceTask, size); + List<SourceRecord> poll = sourceTask.poll(); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals("mytopic", poll.get(0).topic()); + assertEquals(size, poll.size()); + assertEquals(TOPIC_NAME, poll.get(0).topic()); - camelSourceTask.stop(); + sourceTask.stop(); } @Test - public void testSourcePollingWithKey() { + public void testSourcePollingMaxBatchPollSize() { + final long size = 2; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); - - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_BATCH_POLL_SIZE_CONF, String.valueOf(size)); - // first we test if we have a key in the message with body - template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", 1234); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals(1234, poll.get(0).key()); - assertEquals(Schema.Type.INT32, poll.get(0).keySchema().type()); + sendBatchOfRecords(sourceTask, size + 1); + List<SourceRecord> poll = sourceTask.poll(); + int pollSize = poll.size(); - // second we test if we have no key under the header - template.sendBodyAndHeader("direct:start", "awesome!", "WrongHeader", 1234); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // third we test if we have the header but with null value - template.sendBodyAndHeader("direct:start", "awesome!", "CamelSpecialTestKey", null); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - camelSourceTask.stop(); - } - - @Test - public void testSourcePollingWithBody() { - Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); - - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); - - // send first data - template.sendBody("direct:start", "testing kafka connect"); - - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals("testing kafka connect", poll.get(0).value()); - assertEquals(Schema.Type.STRING, poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // send second data - template.sendBody("direct:start", true); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertTrue((boolean)poll.get(0).value()); - assertEquals(Schema.Type.BOOLEAN, poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // second third data - template.sendBody("direct:start", 1234L); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertEquals(1, poll.size()); - assertEquals(1234L, poll.get(0).value()); - assertEquals(Schema.Type.INT64, poll.get(0).valueSchema().type()); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - - // third with null data - template.sendBody("direct:start", null); - - poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); - assertNull(poll.get(0).key()); - assertNull(poll.get(0).keySchema()); - assertNull(poll.get(0).value()); - assertNull(poll.get(0).valueSchema()); - - camelSourceTask.stop(); + assertTrue(pollSize >= 0 && pollSize <= size, "Batch size: " + pollSize + ", expected between 0 and " + size); + sourceTask.stop(); } @Test public void testSourcePollingTimeout() { - final int nuberOfMessagesSent = 999; + final long size = 100; Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put("camel.source.maxPollDuration", "1"); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MAX_POLL_DURATION_CONF, "2"); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + sendBatchOfRecords(sourceTask, size); + List<SourceRecord> poll = sourceTask.poll(); + int pollSize = poll.size(); - // first we send nuberOfMessagesSent of messages - Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); - - // then we assert we received only a fraction of them (proving that polling timeout of 1 Millisecond is working) - List<SourceRecord> poll = camelSourceTask.poll(); - assertTrue(poll.size() < nuberOfMessagesSent, "Expected received messages count to be strictly less than " + nuberOfMessagesSent + ", got " + poll.size()); - - camelSourceTask.stop(); + assertTrue(pollSize < size, "Batch size: " + pollSize + ", expected strictly less than " + size); + sourceTask.stop(); } @Test - public void testSourcePollingMaxRecordNumber() { - final int nuberOfMessagesSent = 2; + public void testSourcePollingWithKey() { Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "direct:start"); - props.put("topics", "mytopic"); - props.put("camel.source.maxBatchPollSize", "1"); - - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_MESSAGE_HEADER_KEY_CONF, "CamelSpecialTestKey"); - final ProducerTemplate template = camelSourceTask.getCms().createProducerTemplate(); + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); - // first we send nuberOfMessagesSent of messages > camel.source.maxBatchPollSize - Stream.of(nuberOfMessagesSent).forEach(i -> template.sendBody("direct:start", "awesome!")); + // key in the message with body + template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", 1234); - List<SourceRecord> poll = camelSourceTaskPollWithRetries(camelSourceTask, 5); + List<SourceRecord> poll1 = sourceTask.poll(); + assertEquals(1, poll1.size()); + assertEquals(1234, poll1.get(0).key()); + assertEquals(Schema.Type.INT32, poll1.get(0).keySchema().type()); - // then we assert we received just camel.source.maxBatchPollSize - assertEquals(1, poll.size()); - camelSourceTask.stop(); - } + // no key under the header + template.sendBodyAndHeader(DIRECT_URI, "test", "WrongHeader", 1234); - @Test - public void testSourceConsumerOptions() { - Map<String, String> props = new HashMap<>(); - props.put("camel.source.url", "timer:kafkaconnector"); - props.put("topics", "mytopic"); - props.put("camel.source.pollingConsumerQueueSize", "10"); - props.put("camel.source.pollingConsumerBlockTimeout", "1000"); - props.put("camel.source.pollingConsumerBlockWhenFull", "false"); + List<SourceRecord> poll2 = sourceTask.poll(); + assertEquals(1, poll2.size()); + assertNull(poll2.get(0).key()); + assertNull(poll2.get(0).keySchema()); - CamelSourceTask camelSourceTask = new CamelSourceTask(); - camelSourceTask.start(props); + // header with null value + template.sendBodyAndHeader(DIRECT_URI, "test", "CamelSpecialTestKey", null); - assertEquals(2, camelSourceTask.getCms().getEndpoints().size()); + List<SourceRecord> poll3 = sourceTask.poll(); + assertEquals(1, poll3.size()); + assertNull(poll3.get(0).key()); + assertNull(poll3.get(0).keySchema()); - camelSourceTask.getCms().getEndpoints().stream() - .filter(e -> e.getEndpointUri().startsWith("direct")) - .forEach(e -> { - assertTrue(e.getEndpointUri().contains("end")); - assertTrue(e.getEndpointUri().contains("pollingConsumerBlockTimeout=1000")); - assertTrue(e.getEndpointUri().contains("pollingConsumerBlockWhenFull=false")); - assertTrue(e.getEndpointUri().contains("pollingConsumerQueueSize=10")); - }); + sourceTask.stop(); + } - camelSourceTask.stop(); + @Test + public void testSourcePollingWithBody() { + Map<String, String> props = new HashMap<>(); + props.put(CamelSourceConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_URL_CONF, DIRECT_URI); + + CamelSourceTask sourceTask = new CamelSourceTask(); + sourceTask.start(props); + final ProducerTemplate template = sourceTask.getCms().createProducerTemplate(); + + // send String + template.sendBody(DIRECT_URI, "test"); + + List<SourceRecord> poll1 = sourceTask.poll(); + assertEquals(1, poll1.size()); + assertEquals("test", poll1.get(0).value()); + assertEquals(Schema.Type.STRING, poll1.get(0).valueSchema().type()); + assertNull(poll1.get(0).key()); + assertNull(poll1.get(0).keySchema()); + + // send boolean + template.sendBody(DIRECT_URI, true); + + List<SourceRecord> poll2 = sourceTask.poll(); + assertEquals(1, poll2.size()); + assertTrue((boolean)poll2.get(0).value()); + assertEquals(Schema.Type.BOOLEAN, poll2.get(0).valueSchema().type()); + assertNull(poll2.get(0).key()); + assertNull(poll2.get(0).keySchema()); + + // send long + template.sendBody(DIRECT_URI, 1234L); + + List<SourceRecord> poll3 = sourceTask.poll(); + assertEquals(1, poll3.size()); + assertEquals(1234L, poll3.get(0).value()); + assertEquals(Schema.Type.INT64, poll3.get(0).valueSchema().type()); + assertNull(poll3.get(0).key()); + assertNull(poll3.get(0).keySchema()); + + // send null + template.sendBody(DIRECT_URI, null); + + List<SourceRecord> poll4 = sourceTask.poll(); + assertNull(poll4.get(0).key()); + assertNull(poll4.get(0).keySchema()); + assertNull(poll4.get(0).value()); + assertNull(poll4.get(0).valueSchema()); + + sourceTask.stop(); } @Test - public void testSourceUrlPrecedenceOnComponentProperty() { + public void testUrlPrecedenceOnComponentProperty() { Review comment: I think this version is not testing what the old version was testing since `endpointProperty` and `pathChunk` are not legal properties of `direct` component. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org