PLCyr opened a new issue #1128: URL: https://github.com/apache/camel-kafka-connector/issues/1128
Hi, I'm new to Camel Kafka Connector. I configured an SFTP connector in Kubernetes with `download=false` and the Kafka topic idempotency feature enabled. Idempotency works fine on the file name as long as the Kafka Connect pod is running. However, after a connector restart or a pod crash, all files still sitting on the SFTP server produce newly generated events once the connector is back up. I was expecting the idempotency feature to re-consume the idempotency Kafka topic from offset 0, giving it some kind of crash resilience. The same behavior occurs with the FTP, FTPS, and SFTP connectors. Is this the expected behavior, or a bug?

**Repro steps**

1. I uploaded a file called "Test_2.txt" to my SFTP server. The SFTP connector picked it up, resulting in one event in the topic and one in the idempotency topic.
2. I deleted my Kafka Connect pod to simulate a crash.
3. Upon restart, the topic and the idempotency topic each contain a second event, a duplicate for Test_2.txt.

**Setup**

- Windows 10 Pro with Kubernetes provided by Docker Desktop
- Confluent Kafka running in Kubernetes
- Camel Kafka Connector SFTP version 0.8.0 running in Kubernetes

**SFTP Connector Config**

```
"name": "sftp-connector",
"connector.class": "org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"transforms": "RemoteTransformer",
"transforms.RemoteTransformer.type": "org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms",
"camel.idempotency.enabled": "true",
"camel.idempotency.expression.type": "header",
"camel.idempotency.expression.header": "CamelFileNameConsumed",
"camel.idempotency.repository.type": "kafka",
"camel.idempotency.kafka.bootstrap.servers": "confluent-kafka-cp-kafka:9092",
"camel.idempotency.kafka.topic": "SFTP-Persistance-Topic",
"camel.source.endpoint.recursive": "true",
"camel.source.path.host": "sftp.sftp",
"camel.source.path.port": "22",
"camel.source.endpoint.username": "foo",
"camel.source.endpoint.password": "pass",
"camel.source.endpoint.include": ".*.txt",
"camel.source.path.directoryName": "data/",
"camel.source.endpoint.download": "false",
"camel.source.endpoint.noop": "true",
"camel.source.endpoint.binary": "false",
"camel.source.contentLogLevel": "INFO",
"camel.source.endpoint.runLoggingLevel": "INFO",
"topics": "SFTP-Topic"
```
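For context, here is my understanding of what the connector wires up internally, based on the route it prints at startup. Note that the printed route mentions `MemoryIdempotentRepository`, while the "Warming up cache from topic" message further down comes from `KafkaIdempotentRepository`, so I assume the Kafka-backed repository is the one actually consulted. A minimal sketch in plain Camel, assuming that wiring (endpoint URI and header taken from my config):

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

public class SftpIdempotentRouteSketch extends RouteBuilder {
    @Override
    public void configure() {
        // Kafka-backed idempotent repository; on startup it warms up its
        // local cache by consuming the idempotency topic.
        KafkaIdempotentRepository repository = new KafkaIdempotentRepository(
                "SFTP-Persistance-Topic", "confluent-kafka-cp-kafka:9092");

        // Rough equivalent of the route the connector logs at startup: poll
        // the SFTP server without downloading, and drop exchanges whose file
        // name the repository has already seen.
        from("sftp://sftp.sftp:22/data/?username=foo&password=pass"
                + "&download=false&noop=true&recursive=true&include=.*.txt")
            .idempotentConsumer(header("CamelFileNameConsumed"), repository)
            .to("direct:end");
    }
}
```

My expectation was that, because the repository's state lives in the `SFTP-Persistance-Topic` Kafka topic, a restarted pod would rebuild the cache from that topic and still recognize Test_2.txt as already consumed.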
**Log at connector restart**

```
[2021-03-29 15:03:27,822] INFO Creating Camel route from(sftp:sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=RAW(pass)&recursive=true&runLoggingLevel=INFO&username=RAW(foo)) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 15:03:27,824] INFO idempotentConsumer(header(expressionHeader), MemoryIdempotentRepository.memoryIdempotentRepository(100)).to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
[2021-03-29 15:03:27,943] INFO Endpoint is configured with noop=true so forcing endpoint to be idempotent as well (org.apache.camel.component.file.remote.RemoteFileEndpoint)
[2021-03-29 15:03:27,944] INFO Using default memory based idempotent repository with cache max size: 1000 (org.apache.camel.component.file.remote.RemoteFileEndpoint)
[2021-03-29 15:03:27,954] INFO ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [confluent-kafka-cp-kafka:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = 3efa9c9e-3694-451d-a70b-9690bba7fa4f
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2021-03-29 15:03:27,975] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO Kafka startTimeMs: 1617030207975 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,975] INFO ProducerConfig values:
    acks = 1
    batch.size = 0
    bootstrap.servers = [confluent-kafka-cp-kafka:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-4
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = false
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
 (org.apache.kafka.clients.producer.ProducerConfig)
[2021-03-29 15:03:27,979] INFO Kafka version: 6.1.0-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,980] INFO Kafka commitId: 5496d92defc9bbe4 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,980] INFO Kafka startTimeMs: 1617030207979 (org.apache.kafka.common.utils.AppInfoParser)
[2021-03-29 15:03:27,984] INFO SourceConnectorConfig values:
    config.action.reload = restart
    connector.class = org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    name = sftp-connector
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = [RemoteTransformer]
    value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
 (org.apache.kafka.connect.runtime.SourceConnectorConfig)
[2021-03-29 15:03:27,986] INFO Warming up cache from topic SFTP-Persistance-Topic (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
[2021-03-29 15:03:27,986] INFO EnrichedConnectorConfig values:
    config.action.reload = restart
    connector.class = org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    header.converter = null
    key.converter = class org.apache.kafka.connect.storage.StringConverter
    name = sftp-connector
    predicates = []
    tasks.max = 1
    topic.creation.groups = []
    transforms = [RemoteTransformer]
    transforms.RemoteTransformer.key = null
    transforms.RemoteTransformer.negate = false
    transforms.RemoteTransformer.predicate =
    transforms.RemoteTransformer.type = class org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms
    value.converter = class org.apache.kafka.connect.converters.ByteArrayConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2021-03-29 15:03:27,987] INFO Setting task configurations for 1 workers. (org.apache.camel.kafkaconnector.CamelSourceConnector)
[2021-03-29 15:03:27,990] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Subscribed to topic(s): SFTP-Persistance-Topic (org.apache.kafka.clients.consumer.KafkaConsumer)
[2021-03-29 15:03:28,001] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Cluster ID: fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
[2021-03-29 15:03:28,001] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Discovered group coordinator confluent-kafka-cp-kafka-2.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:28,004] INFO [Producer clientId=producer-4] Cluster ID: fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
[2021-03-29 15:03:28,006] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:28,074] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,096] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully joined group with generation Generation{generationId=1, memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,100] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Finished assignment for group at generation 1: {consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a=Assignment(partitions=[SFTP-Persistance-Topic-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully synced group in generation Generation{generationId=1, memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Notifying assignor about the new Assignment(partitions=[SFTP-Persistance-Topic-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,142] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Adding newly assigned partitions: SFTP-Persistance-Topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,149] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Found no committed offset for partition SFTP-Persistance-Topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-03-29 15:03:31,153] INFO Cache OK (org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
[2021-03-29 15:03:31,164] INFO Known host file not configured, using user known host file: /home/appuser/.ssh/known_hosts (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:31,166] INFO [Consumer clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Resetting offset for partition SFTP-Persistance-Topic-0 to position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[confluent-kafka-cp-kafka-1.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 1 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-03-29 15:03:31,713] WARN JSCH -> Permanently added 'sftp.sftp' (RSA) to the list of known hosts. (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:31,713] WARN Server asks for confirmation (yes|no): /home/appuser/.ssh/known_hosts does not exist. Are you sure you want to create it?. Camel will answer no. (org.apache.camel.component.file.remote.SftpOperations)
[2021-03-29 15:03:32,037] INFO Routes startup summary (total:1 started:1) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,038] INFO Started route1 (sftp://sftp.sftp:22/data/) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,038] INFO Apache Camel 3.8.0 (camel-1) started in 4s306ms (build:98ms init:117ms start:4s91ms) (org.apache.camel.impl.engine.AbstractCamelContext)
[2021-03-29 15:03:32,039] INFO CamelSourceTask connector task started (org.apache.camel.kafkaconnector.CamelSourceTask)
[2021-03-29 15:03:32,040] INFO WorkerSourceTask{id=sftp-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-03-29 15:03:33,035] INFO Scheduled task started on: sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx (org.apache.camel.support.ScheduledPollConsumer)
[2021-03-29 15:03:33,079] INFO SourceRecord{sourcePartition={filename=sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx}, sourceOffset={position=9F81987AE7F9AE2-0000000000000000}} ConnectRecord{topic='SFTP-Topic', kafkaPartition=null, key=null, keySchema=null, value=null, valueSchema=null, timestamp=1617030213076, headers=ConnectHeaders(headers=[ConnectHeader(key=CamelHeader.CamelFileAbsolute, value=false, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelHeader.CamelFileAbsolutePath, value=data/Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileHost, value=sftp.sftp, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileLastModified, value=1616182842000, schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileLength, value=72, schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileName, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileNameConsumed, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileNameOnly, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileParent, value=data, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFilePath, value=data//Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileRelativePath, value=Test_2.txt, schema=Schema{STRING}), ConnectHeader(key=CamelProperty.CamelBatchSize, value=1, schema=Schema{INT32}), ConnectHeader(key=CamelProperty.CamelUnitOfWorkProcessSync, value=true, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelProperty.CamelBatchComplete, value=true, schema=Schema{BOOLEAN}), ConnectHeader(key=CamelProperty.CamelBatchIndex, value=0, schema=Schema{INT32}), ConnectHeader(key=CamelProperty.CamelToEndpoint, value=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000, schema=Schema{STRING})])} (org.apache.camel.kafkaconnector.CamelSourceTask)
[2021-03-29 15:03:33,080] INFO Scheduled task completed on: sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx (org.apache.camel.support.ScheduledPollConsumer)
```
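What I notice in the log above, if it helps: the warm-up consumer runs with `auto.offset.reset = latest` and a `group.id` that looks freshly generated on each start, it finds no committed offset for `SFTP-Persistance-Topic-0`, and it then resets the partition to offset 1, i.e. the end of the topic, so the existing entry is apparently never replayed into the cache. To double-check what is actually stored, I read the idempotency topic back from the beginning with a small standalone consumer (a throwaway sketch; topic name and bootstrap servers taken from my config):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IdempotencyTopicDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-kafka-cp-kafka:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign the single partition directly (no consumer group needed)
            // and rewind to offset 0, unlike the warm-up consumer above.
            TopicPartition partition = new TopicPartition("SFTP-Persistance-Topic", 0);
            consumer.assign(List.of(partition));
            consumer.seekToBeginning(List.of(partition));

            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```

If the Test_2.txt entry shows up here at offset 0, then a warm-up that starts from the latest offset instead of the beginning would explain the duplicates I'm seeing after a restart.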