AishD3 opened a new issue, #1602: URL: https://github.com/apache/camel-kafka-connector/issues/1602
I am facing the error below while using the Azure Gen2 Data Lake Sink connector: "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!\n\tat org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:210)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\t... 11 more\nCaused by: com.azure.storage.file.datalake.models.DataLakeStorageException: If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK.
To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.\nIf you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.\nPlease remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.\nStatus code 403, \"{\"error\":{\"code\":\"AuthorizationPermissionMismatch\",\"message\":\"This request is not authorized to perform this operation using this permission.\\nRequestId:63c48fdb-d01f-000d-4c22-6e3603000000\\nTime:2024-03-04T10:57:07.2520699Z\"}}\"\n\tat java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)\n\tat com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:56)\n\tat com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:356)\n\tat com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:127)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)\n\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)\n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)\n\tat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)\n\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\tat
reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)\n\tat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)\n\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)\n\tat reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666)\n\tat
reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)\n\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)\n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)\n\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)\n\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)\n\tat reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:220)\n\tat reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)\n\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)\n\tat reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:236)\n\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)\n\tat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840)\n\tat reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)\n\tat reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)\n\tat reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)\n\tat reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:478)\n\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)\n\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)\n\tat
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat com.azure.core.http.netty.implementation.AzureSdkHandler.channelRead(AzureSdkHandler.java:222)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1385)\n\tat io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1259)\n\tat io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1299)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)\n\tat
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)\n\tat io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)\n\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t...
1 more\n\tSuppressed: java.lang.Exception: #block terminated with an error\n\t\tat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)\n\t\tat reactor.core.publisher.Mono.block(Mono.java:1742)\n\t\tat com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:146)\n\t\tat com.azure.storage.file.datalake.DataLakeFileClient.uploadWithResponse(DataLakeFileClient.java:432)\n\t\tat org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper.uploadWithResponse(DataLakeFileClientWrapper.java:113)\n\t\tat org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations.upload(DataLakeFileOperations.java:221)\n\t\tat org.apache.camel.component.azure.storage.datalake.DataLakeProducer.process(DataLakeProducer.java:93)\n\t\tat org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)\n\t\tat org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)\n\t\tat org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)\n\t\tat org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)\n\t\tat org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)\n\t\tat org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)\n\t\tat org.apache.camel.processor.Pipeline.process(Pipeline.java:163)\n\t\tat org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)\n\t\tat org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)\n\t\tat org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:212)\n\t\tat
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:109)\n\t\tat org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:81)\n\t\tat org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:106)\n\t\tat org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:180)\n\t\tat org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)\n\t\tat org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)\n\t\tat org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)\n\t\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)\n\t\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)\n\t\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)\n\t\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)\n\t\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\t\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t\t...
1 more\n" },

Below is the connector configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: adls-sink
  labels:
    strimzi.io/cluster: cdt-connect-shared-emp
spec:
  class: org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector
  tasksMax: 2
  config:
    topics: msk.emp.cdtadlsconnectorsink.topic.internal.any.v1
    camel.kamelet.azure-storage-datalake-sink.accountName: retinaconnectorpoc
    camel.kamelet.azure-storage-datalake-sink.clientId: 328xxxxxxxxxxxxxxxxxxxx68792db9
    camel.kamelet.azure-storage-datalake-sink.clientSecret: uqKxxxxxxxxxxxxxxxx6N5eS-lcbw
    camel.kamelet.azure-storage-datalake-sink.tenantId: 05d7xxxxxxxxxxxxx-eb416c396f2d
    camel.kamelet.azure-storage-datalake-sink.fileSystemName: test
    azure.datalake.gen2.sas.key
    value.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    value.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    value.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.url: https://cxxxxxxxxxxxxxdigital.net
    value.converter.schemas.enable: false
    key.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    key.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    key.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.url: https://xxxxxxxxxxxxxxxxxxxigital.net
    key.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org