PLCyr opened a new issue #1128:
URL: https://github.com/apache/camel-kafka-connector/issues/1128


   Hi, 
   
   I'm new to Camel Kafka Connector. I configured an SFTP connector in 
Kubernetes with download=false and the Kafka-topic idempotency feature enabled. 
Idempotency works fine on the file name as long as the Kafka Connect pod is 
running. However, after a connector restart or a pod crash, every file that is 
still on the SFTP server generates a new event once the connector is up 
again. I was expecting the idempotency feature to re-consume the idempotency 
Kafka topic from offset 0, so that it would have some kind of crash resilience. 
I see the same behavior with the FTP, FTPS, and SFTP connectors.
   
   Is this the expected behavior or a bug?
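   
   To make the expectation concrete, here is a rough sketch of the warm-up I 
assumed would happen on restart (plain Kafka consumer API; the class and method 
names are made up for illustration and are not Camel's actual internals):
   
   import java.time.Duration;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Properties;
   import java.util.Set;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   // Hypothetical warm-up: re-read the idempotency topic from offset 0
   // and rebuild the de-duplication cache before polling the SFTP server.
   public class ExpectedWarmUp {
       public static Set<String> rebuildCache(String bootstrapServers, String topic) {
           Properties props = new Properties();
           props.put("bootstrap.servers", bootstrapServers);
           props.put("key.deserializer", StringDeserializer.class.getName());
           props.put("value.deserializer", StringDeserializer.class.getName());
   
           Set<String> seenFileNames = new HashSet<>();
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               TopicPartition tp = new TopicPartition(topic, 0);
               consumer.assign(List.of(tp));
               consumer.seekToBeginning(List.of(tp));  // start at offset 0, not "latest"
               long end = consumer.endOffsets(List.of(tp)).get(tp);
               while (consumer.position(tp) < end) {
                   for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                       seenFileNames.add(rec.key());   // file names seen before the crash
                   }
               }
           }
           return seenFileNames;
       }
   }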
   
   **Repro steps**
   For my test, I uploaded a file called "Test_2.txt" to my SFTP server. The 
SFTP connector picked it up, producing one event in the data topic (SFTP-Topic) 
and one in the idempotency topic (SFTP-Persistance-Topic). I then deleted my 
Kafka Connect pod to simulate a crash. Upon restart, each of the two topics has 
a second event, a duplicate of the one for Test_2.txt.
   
   **Setup** 
   Windows 10 Pro with Kubernetes provided by Docker Desktop
   Confluent Kafka running in Kubernetes
   Camel Kafka Connector SFTP Version 0.8.0 running in Kubernetes
   
   **SFTP Connector Config**
   "name": "sftp-connector",
   "connector.class": 
"org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
   "transforms": "RemoteTransformer",
   "transforms.RemoteTransformer.type": 
"org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms",
   "camel.idempotency.enabled": "true",
   "camel.idempotency.expression.type": "header",
   "camel.idempotency.expression.header": "CamelFileNameConsumed",
   "camel.idempotency.repository.type": "kafka",
   "camel.idempotency.kafka.bootstrap.servers": "confluent-kafka-cp-kafka:9092",
   "camel.idempotency.kafka.topic": "SFTP-Persistance-Topic",
   "camel.source.endpoint.recursive": "true",
   "camel.source.path.host": "sftp.sftp",
   "camel.source.path.port": "22",
   "camel.source.endpoint.username": "foo",
   "camel.source.endpoint.password": "pass",
   "camel.source.endpoint.include": ".*.txt",
   "camel.source.path.directoryName": "data/",
   "camel.source.endpoint.download": "false",
   "camel.source.endpoint.noop": "true",
   "camel.source.endpoint.binary": "false",
   "camel.source.contentLogLevel": "INFO",
   "camel.source.endpoint.runLoggingLevel": "INFO",
   "topics": "SFTP-Topic"
   
   **Log at connector restart**
   [2021-03-29 15:03:27,822] INFO Creating Camel route 
from(sftp:sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=RAW(pass)&recursive=true&runLoggingLevel=INFO&username=RAW(foo))
 (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
   [2021-03-29 15:03:27,824] INFO idempotentConsumer(header(expressionHeader), 
MemoryIdempotentRepository.memoryIdempotentRepository(100)).to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true)
 (org.apache.camel.kafkaconnector.utils.CamelKafkaConnectMain)
   [2021-03-29 15:03:27,943] INFO Endpoint is configured with noop=true so 
forcing endpoint to be idempotent as well 
(org.apache.camel.component.file.remote.RemoteFileEndpoint)
   [2021-03-29 15:03:27,944] INFO Using default memory based idempotent 
repository with cache max size: 1000 
(org.apache.camel.component.file.remote.RemoteFileEndpoint)
   [2021-03-29 15:03:27,954] INFO ConsumerConfig values:
           allow.auto.create.topics = true
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [confluent-kafka-cp-kafka:9092]
           check.crcs = true
           client.dns.lookup = use_all_dns_ips
           client.id = consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4
           client.rack =
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = true
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = 3efa9c9e-3694-451d-a70b-9690bba7fa4f
           group.instance.id = null
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           internal.throw.on.fetch.stable.offset.unsupported = false
           isolation.level = read_uncommitted
           key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           socket.connection.setup.timeout.max.ms = 127000
           socket.connection.setup.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
           ssl.endpoint.identification.algorithm = https
           ssl.engine.factory.class = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.certificate.chain = null
           ssl.keystore.key = null
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.3
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.certificates = null
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
    (org.apache.kafka.clients.consumer.ConsumerConfig)
   [2021-03-29 15:03:27,975] INFO Kafka version: 6.1.0-ccs 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,975] INFO Kafka commitId: 5496d92defc9bbe4 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,975] INFO Kafka startTimeMs: 1617030207975 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,975] INFO ProducerConfig values:
           acks = 1
           batch.size = 0
           bootstrap.servers = [confluent-kafka-cp-kafka:9092]
           buffer.memory = 33554432
           client.dns.lookup = use_all_dns_ips
           client.id = producer-4
           compression.type = none
           connections.max.idle.ms = 540000
           delivery.timeout.ms = 120000
           enable.idempotence = false
           interceptor.classes = []
           internal.auto.downgrade.txn.commit = false
           key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
           linger.ms = 0
           max.block.ms = 60000
           max.in.flight.requests.per.connection = 5
           max.request.size = 1048576
           metadata.max.age.ms = 300000
           metadata.max.idle.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
           receive.buffer.bytes = 32768
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 30000
           retries = 2147483647
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           socket.connection.setup.timeout.max.ms = 127000
           socket.connection.setup.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
           ssl.endpoint.identification.algorithm = https
           ssl.engine.factory.class = null
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.certificate.chain = null
           ssl.keystore.key = null
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.3
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.certificates = null
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           transaction.timeout.ms = 60000
           transactional.id = null
           value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
    (org.apache.kafka.clients.producer.ProducerConfig)
   [2021-03-29 15:03:27,979] INFO Kafka version: 6.1.0-ccs 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,980] INFO Kafka commitId: 5496d92defc9bbe4 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,980] INFO Kafka startTimeMs: 1617030207979 
(org.apache.kafka.common.utils.AppInfoParser)
   [2021-03-29 15:03:27,984] INFO SourceConnectorConfig values:
           config.action.reload = restart
           connector.class = 
org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector
           errors.log.enable = false
           errors.log.include.messages = false
           errors.retry.delay.max.ms = 60000
           errors.retry.timeout = 0
           errors.tolerance = none
           header.converter = null
           key.converter = class 
org.apache.kafka.connect.storage.StringConverter
           name = sftp-connector
           predicates = []
           tasks.max = 1
           topic.creation.groups = []
           transforms = [RemoteTransformer]
           value.converter = class 
org.apache.kafka.connect.converters.ByteArrayConverter
    (org.apache.kafka.connect.runtime.SourceConnectorConfig)
   [2021-03-29 15:03:27,986] INFO Warming up cache from topic 
SFTP-Persistance-Topic 
(org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
   [2021-03-29 15:03:27,986] INFO EnrichedConnectorConfig values:
           config.action.reload = restart
           connector.class = 
org.apache.camel.kafkaconnector.sftp.CamelSftpSourceConnector
           errors.log.enable = false
           errors.log.include.messages = false
           errors.retry.delay.max.ms = 60000
           errors.retry.timeout = 0
           errors.tolerance = none
           header.converter = null
           key.converter = class 
org.apache.kafka.connect.storage.StringConverter
           name = sftp-connector
           predicates = []
           tasks.max = 1
           topic.creation.groups = []
           transforms = [RemoteTransformer]
           transforms.RemoteTransformer.key = null
           transforms.RemoteTransformer.negate = false
           transforms.RemoteTransformer.predicate =
           transforms.RemoteTransformer.type = class 
org.apache.camel.kafkaconnector.sftp.transformers.SftpRemoteFileTransforms
           value.converter = class 
org.apache.kafka.connect.converters.ByteArrayConverter
    (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
   [2021-03-29 15:03:27,987] INFO Setting task configurations for 1 workers. 
(org.apache.camel.kafkaconnector.CamelSourceConnector)
   [2021-03-29 15:03:27,990] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Subscribed to topic(s): 
SFTP-Persistance-Topic (org.apache.kafka.clients.consumer.KafkaConsumer)
   [2021-03-29 15:03:28,001] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Cluster ID: 
fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
   [2021-03-29 15:03:28,001] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Discovered group coordinator 
confluent-kafka-cp-kafka-2.confluent-kafka-cp-kafka-headless.kafka:9092 (id: 
2147483645 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
   [2021-03-29 15:03:28,004] INFO [Producer clientId=producer-4] Cluster ID: 
fFQtkPXrT92v270crRLidQ (org.apache.kafka.clients.Metadata)
   [2021-03-29 15:03:28,006] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
   [2021-03-29 15:03:28,074] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
   [2021-03-29 15:03:31,096] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully joined group with 
generation Generation{generationId=1, 
memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a',
 protocol='range'} 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
   [2021-03-29 15:03:31,100] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Finished assignment for group at 
generation 1: 
{consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a=Assignment(partitions=[SFTP-Persistance-Topic-0])}
 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
   [2021-03-29 15:03:31,142] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Successfully synced group in 
generation Generation{generationId=1, 
memberId='consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4-124f20c5-6776-424e-abfe-ec07081e230a',
 protocol='range'} 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
   [2021-03-29 15:03:31,142] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Notifying assignor about the new 
Assignment(partitions=[SFTP-Persistance-Topic-0]) 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
   [2021-03-29 15:03:31,142] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Adding newly assigned partitions: 
SFTP-Persistance-Topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
   [2021-03-29 15:03:31,149] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Found no committed offset for 
partition SFTP-Persistance-Topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
   [2021-03-29 15:03:31,153] INFO Cache OK 
(org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository)
   [2021-03-29 15:03:31,164] INFO Known host file not configured, using user 
known host file: /home/appuser/.ssh/known_hosts 
(org.apache.camel.component.file.remote.SftpOperations)
   [2021-03-29 15:03:31,166] INFO [Consumer 
clientId=consumer-3efa9c9e-3694-451d-a70b-9690bba7fa4f-4, 
groupId=3efa9c9e-3694-451d-a70b-9690bba7fa4f] Resetting offset for partition 
SFTP-Persistance-Topic-0 to position FetchPosition{offset=1, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[confluent-kafka-cp-kafka-1.confluent-kafka-cp-kafka-headless.kafka:9092
 (id: 1 rack: null)], epoch=0}}. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState)
   [2021-03-29 15:03:31,713] WARN JSCH -> Permanently added 'sftp.sftp' (RSA) 
to the list of known hosts. 
(org.apache.camel.component.file.remote.SftpOperations)
   [2021-03-29 15:03:31,713] WARN Server asks for confirmation (yes|no): 
/home/appuser/.ssh/known_hosts does not exist.
   Are you sure you want to create it?. Camel will answer no. 
(org.apache.camel.component.file.remote.SftpOperations)
   [2021-03-29 15:03:32,037] INFO Routes startup summary (total:1 started:1) 
(org.apache.camel.impl.engine.AbstractCamelContext)
   [2021-03-29 15:03:32,038] INFO  Started route1 (sftp://sftp.sftp:22/data/) 
(org.apache.camel.impl.engine.AbstractCamelContext)
   [2021-03-29 15:03:32,038] INFO Apache Camel 3.8.0 (camel-1) started in 
4s306ms (build:98ms init:117ms start:4s91ms) 
(org.apache.camel.impl.engine.AbstractCamelContext)
   [2021-03-29 15:03:32,039] INFO CamelSourceTask connector task started 
(org.apache.camel.kafkaconnector.CamelSourceTask)
   [2021-03-29 15:03:32,040] INFO WorkerSourceTask{id=sftp-connector-0} Source 
task finished initialization and start 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
   [2021-03-29 15:03:33,035] INFO Scheduled task started on:   
sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx
 (org.apache.camel.support.ScheduledPollConsumer)
   [2021-03-29 15:03:33,079] INFO 
SourceRecord{sourcePartition={filename=sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx},
 sourceOffset={position=9F81987AE7F9AE2-0000000000000000}} 
ConnectRecord{topic='SFTP-Topic', kafkaPartition=null, key=null, 
keySchema=null, value=null, valueSchema=null, timestamp=1617030213076, 
headers=ConnectHeaders(headers=[ConnectHeader(key=CamelHeader.CamelFileAbsolute,
 value=false, schema=Schema{BOOLEAN}), 
ConnectHeader(key=CamelHeader.CamelFileAbsolutePath, value=data/Test_2.txt, 
schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileHost, 
value=sftp.sftp, schema=Schema{STRING}), 
ConnectHeader(key=CamelHeader.CamelFileLastModified, value=1616182842000, 
schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileLength, value=72, 
schema=Schema{INT64}), ConnectHeader(key=CamelHeader.CamelFileName, 
value=Test_2.txt, schema=Schema{STRING}), 
ConnectHeader(key=CamelHeader.CamelFileNameConsumed, value=Test_2.txt, 
schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFileNameOnly, 
value=Test_2.txt, schema=Schema{STRING}), 
ConnectHeader(key=CamelHeader.CamelFileParent, value=data, 
schema=Schema{STRING}), ConnectHeader(key=CamelHeader.CamelFilePath, 
value=data//Test_2.txt, schema=Schema{STRING}), 
ConnectHeader(key=CamelHeader.CamelFileRelativePath, value=Test_2.txt, 
schema=Schema{STRING}), ConnectHeader(key=CamelProperty.CamelBatchSize, 
value=1, schema=Schema{INT32}), 
ConnectHeader(key=CamelProperty.CamelUnitOfWorkProcessSync, value=true, 
schema=Schema{BOOLEAN}), ConnectHeader(key=CamelProperty.CamelBatchComplete, 
value=true, schema=Schema{BOOLEAN}), 
ConnectHeader(key=CamelProperty.CamelBatchIndex, value=0, 
schema=Schema{INT32}), ConnectHeader(key=CamelProperty.CamelToEndpoint, 
value=direct://end?pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true&pollingConsumerQueueSize=1000,
 schema=Schema{STRING})])} (org.apache.camel.kafkaconnector.CamelSourceTask)
   [2021-03-29 15:03:33,080] INFO Scheduled task completed on: 
sftp://sftp.sftp:22/data/?binary=false&download=false&include=.*.txt&noop=true&password=xxxxxx
 (org.apache.camel.support.ScheduledPollConsumer)
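   
   One thing I notice in the log above: the warm-up consumer runs with 
auto.offset.reset = latest, finds no committed offset for 
SFTP-Persistance-Topic-0, and then resets the partition to offset 1 (the end 
of the topic), so the "Cache OK" warm-up apparently read nothing. If that is 
the cause, a plain-Camel workaround might be to hand the repository a consumer 
config with auto.offset.reset=earliest, sketched below. I am assuming the 
KafkaIdempotentRepository constructor that takes consumer/producer Properties 
exists in Camel 3.8, and I do not know whether the connector's 
camel.idempotency.* options expose this:
   
   import java.util.Properties;
   import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
   
   public class EarliestOffsetRepository {
       public static KafkaIdempotentRepository build() {
           Properties consumerProps = new Properties();
           consumerProps.put("bootstrap.servers", "confluent-kafka-cp-kafka:9092");
           // Re-read the idempotency topic from the beginning after a restart
           consumerProps.put("auto.offset.reset", "earliest");
   
           Properties producerProps = new Properties();
           producerProps.put("bootstrap.servers", "confluent-kafka-cp-kafka:9092");
   
           // Assumption: this (topic, consumerProps, producerProps) constructor
           // is available on Camel 3.8's KafkaIdempotentRepository.
           return new KafkaIdempotentRepository(
                   "SFTP-Persistance-Topic", consumerProps, producerProps);
       }
   }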

