AishD3 opened a new issue, #1602:
URL: https://github.com/apache/camel-kafka-connector/issues/1602

   I am facing the error below while using the Azure Gen2 Data Lake Sink connector.
   
   "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask 
due to unrecoverable exception.\n\tat 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat
 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat
 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat 
java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: 
org.apache.kafka.connect.errors.ConnectException: Exchange delivery has 
failed!\n\tat 
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:210)\n\tat 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\t...
 11 more\nCaused by: 
com.azure.storage.file.datalake.models.DataLakeStorageException: If you are 
using a StorageSharedKeyCredential, and the server returned an error message 
that says 'Signature did not match', you can compare the string to sign with 
the one generated by the SDK. To log the string to sign, pass in the context 
key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate 
method call.\nIf you are using a SAS token, and the server returned an error 
message that says 'Signature did not match', you can compare the string to sign 
with the one generated by the SDK. To log the string to sign, pass in the context key value pair 
'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method 
call.\nPlease remember to disable 'Azure-Storage-Log-String-To-Sign' before 
going to production as this string can potentially contain PII.\nStatus code 
403, 
\"{\"error\":{\"code\":\"AuthorizationPermissionMismatch\",\"message\":\"This 
request is not authorized to perform this operation using this 
permission.\\nRequestId:63c48fdb-d01f-000d-4c22-6e3603000000\\nTime:2024-03-04T10:57:07.2520699Z\"}}\"\n\tat
 
java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)\n\tat
 
com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:56)\n\tat
 
com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:356)\n\tat
 
com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:127)\n\tat
 reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)\n\tat
 
reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)\n\tat
 
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)\n\tat
 
reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)\n\tat
 
reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)\n\tat
 
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)\n\tat
 reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\tat 
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\tat
 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)\n\tat
 
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat
 reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)\n\tat 
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat
 
reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)\n\tat
 
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\tat
 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat
 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat
 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat
 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat
 
reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)\n\tat
 
reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n\tat
 
reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat
 reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat
 
reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\tat
 
reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)\n\tat
 
reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\tat
 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat
 
reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat
 
reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666)\n\tat
 
reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)\n\tat
 
reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)\n\tat
 
reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)\n\tat
 reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)\n\tat 
reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)\n\tat
 
reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)\n\tat
 
reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:220)\n\tat
 
reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)\n\tat
 
reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)\n\tat
 
reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:236)\n\tat
 
reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)\n\tat
 
reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840)\n\tat
 
reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)\n\tat
 
reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)\n\tat
 
 
reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)\n\tat 
reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)\n\tat 
reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)\n\tat
 
reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:478)\n\tat
 
reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)\n\tat
 
reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat
 
com.azure.core.http.netty.implementation.AzureSdkHandler.channelRead(AzureSdkHandler.java:222)\n\tat
 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat
 
io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat
 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\tat
 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\tat
 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat
 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat 
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1385)\n\tat 
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1259)\n\tat
 io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1299)\n\tat 
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)\n\tat
 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)\n\tat
 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat
 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat
 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat
 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat
 
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)\n\tat
 
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)\n\tat
 io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)\n\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat
 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t...
 1 more\n\tSuppressed: java.lang.Exception: #block terminated with an 
error\n\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)\n\t\tat 
reactor.core.publisher.Mono.block(Mono.java:1742)\n\t\tat 
com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:146)\n\t\tat
 
com.azure.storage.file.datalake.DataLakeFileClient.uploadWithResponse(DataLakeFileClient.java:432)\n\t\tat
 
org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper.uploadWithResponse(DataLakeFileClientWrapper.java:113)\n\t\tat
 
org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations.upload(DataLakeFileOperations.java:221)\n\t\tat
 
org.apache.camel.component.azure.storage.datalake.DataLakeProducer.process(DataLakeProducer.java:93)\n\t\tat
 
org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)\n\t\tat
 
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)\n\t\tat
 org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)\n\t\tat
 
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)\n\t\tat
 
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)\n\t\tat
 
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)\n\t\tat
 org.apache.camel.processor.Pipeline.process(Pipeline.java:163)\n\t\tat 
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)\n\t\tat
 
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)\n\t\tat
 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:212)\n\t\tat
 
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:109)\n\t\tat
 
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:81)\n\t\tat 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:106)\n\t\tat
 
org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:180)\n\t\tat
 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)\n\t\tat
 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)\n\t\tat
 
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)\n\t\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\t\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\t\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\t\tat
 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\t\tat
 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\t\tat 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\t\tat 
org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\t\tat
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\t\tat
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\tat 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\t\tat
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t\t...
 1 more\n"
       },
   
   Below is the connector configuration:
   
   apiVersion: kafka.strimzi.io/v1beta2
   kind: KafkaConnector
   metadata:
     name: adls-sink
     labels:
       strimzi.io/cluster: cdt-connect-shared-emp
   spec:
     class: 
org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector
     tasksMax: 2
     config:
       topics: msk.emp.cdtadlsconnectorsink.topic.internal.any.v1
       camel.kamelet.azure-storage-datalake-sink.accountName: retinaconnectorpoc
       camel.kamelet.azure-storage-datalake-sink.clientId: 
328xxxxxxxxxxxxxxxxxxxx68792db9
       camel.kamelet.azure-storage-datalake-sink.clientSecret: 
uqKxxxxxxxxxxxxxxxx6N5eS-lcbw
       camel.kamelet.azure-storage-datalake-sink.tenantId: 
05d7xxxxxxxxxxxxx-eb416c396f2d
       camel.kamelet.azure-storage-datalake-sink.fileSystemName: test
       azure.datalake.gen2.sas.key
       value.converter.schema.registry.ssl.key.password: 
${env:SSL_TRUSTSTORE_PASSWORD} 
       value.converter.schema.registry.ssl.keystore.location: 
/opt/kafka/external-configuration/sr-certs/keystore.jks 
       value.converter.schema.registry.ssl.keystore.password: 
${env:SSL_TRUSTSTORE_PASSWORD} 
       value.converter.schema.registry.ssl.truststore.location: 
/opt/kafka/external-configuration/sr-certs/truststore.jks
       value.converter.schema.registry.ssl.truststore.password: 
${env:SSL_TRUSTSTORE_PASSWORD} 
       value.converter.schema.registry.url: https://cxxxxxxxxxxxxxdigital.net
       value.converter.schemas.enable: false
       key.converter.schema.registry.ssl.key.password: 
${env:SSL_TRUSTSTORE_PASSWORD} 
       key.converter.schema.registry.ssl.keystore.location: 
/opt/kafka/external-configuration/sr-certs/keystore.jks
       key.converter.schema.registry.ssl.keystore.password: 
${env:SSL_TRUSTSTORE_PASSWORD} 
       key.converter.schema.registry.ssl.truststore.location: 
/opt/kafka/external-configuration/sr-certs/truststore.jks
       key.converter.schema.registry.ssl.truststore.password: 
${env:SSL_TRUSTSTORE_PASSWORD}
       key.converter.schema.registry.url: https://xxxxxxxxxxxxxxxxxxxigital.net
       key.converter.schemas.enable: false 
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: org.apache.kafka.connect.storage.StringConverter
    
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to