This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bc10ec  CAMEL-16307 azure-storage-datalake:...?operation=getFile 
appends newline to the file content
9bc10ec is described below

commit 9bc10ec094d16d76c864ba719998b9ca0104e837
Author: Peter Palaga <ppal...@redhat.com>
AuthorDate: Thu Mar 4 21:32:16 2021 +0100

    CAMEL-16307 azure-storage-datalake:...?operation=getFile appends newline
    to the file content
---
 components/camel-azure-storage-datalake/pom.xml    |   7 +-
 .../datalake/client/DataLakeFileClientWrapper.java |   5 +-
 .../datalake/integration/DataLakeProducerIT.java   | 113 +++++++++++++++++++++
 .../apache/camel/util/SkipLastByteInputStream.java | 108 ++++++++++++++++++++
 .../camel/util/SkipLastNewlineInputStreamTest.java |  72 +++++++++++++
 5 files changed, 303 insertions(+), 2 deletions(-)

diff --git a/components/camel-azure-storage-datalake/pom.xml 
b/components/camel-azure-storage-datalake/pom.xml
index c9a66ce..0c8c1da 100644
--- a/components/camel-azure-storage-datalake/pom.xml
+++ b/components/camel-azure-storage-datalake/pom.xml
@@ -34,7 +34,7 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-support</artifactId>
         </dependency>
-        
+
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-storage-file-datalake</artifactId>
@@ -85,6 +85,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-test-infra-common</artifactId>
             <version>${project.version}</version>
diff --git 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
index 3d9f7bd..36bb300 100644
--- 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
+++ 
b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
@@ -37,6 +37,7 @@ import com.azure.storage.file.datalake.models.PathProperties;
 import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
 import com.azure.storage.file.datalake.options.FileQueryOptions;
 import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
+import org.apache.camel.util.SkipLastByteInputStream;
 
 public class DataLakeFileClientWrapper {
     private final DataLakeFileClient client;
@@ -63,7 +64,9 @@ public class DataLakeFileClientWrapper {
 
     public InputStream openInputStream() {
         String query = "SELECT * from BlobStorage";
-        return client.openQueryInputStream(query);
+        final InputStream sourceInputStream = 
client.openQueryInputStream(query);
+        /* Workaround for 
https://github.com/Azure/azure-sdk-for-java/issues/19612 */
+        return new SkipLastByteInputStream(sourceInputStream, (byte) '\n');
     }
 
     public Response<InputStream> openQueryInputStreamWithResponse(final 
FileQueryOptions queryOptions) {
diff --git 
a/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
 
b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
new file mode 100644
index 0000000..d306d77
--- /dev/null
+++ 
b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.storage.datalake.integration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+
+import com.azure.storage.file.datalake.models.FileSystemItem;
+import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
+import 
org.apache.camel.component.azure.storage.datalake.DataLakeOperationsDefinition;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataLakeProducerIT extends BaseIT {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeProducerIT.class);
+
+    private String fileName;
+    private byte[] fileContent;
+
+    @BeforeAll
+    public void setup() {
+        final String randomSuffix = 
RandomStringUtils.randomAlphabetic(5).toLowerCase(Locale.ROOT);
+        fileName = "file" + randomSuffix + ".txt";
+        fileContent = ("Hello " + 
randomSuffix).getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    void testConsumer() throws Exception {
+
+        {
+            @SuppressWarnings("unchecked")
+            List<FileSystemItem> filesystems = template.requestBody(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.listFileSystem),
+                    null,
+                    List.class);
+
+            
Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).doesNotContain(fileSystemName);
+        }
+
+        template.sendBody(
+                componentUri(fileSystemName, 
DataLakeOperationsDefinition.createFileSystem),
+                null);
+
+        {
+            @SuppressWarnings("unchecked")
+            List<FileSystemItem> filesystems = template.requestBody(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.listFileSystem),
+                    null,
+                    List.class);
+
+            
Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).contains(fileSystemName);
+        }
+
+        try {
+            template.sendBodyAndHeader(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.upload),
+                    fileContent,
+                    DataLakeConstants.FILE_NAME,
+                    fileName);
+
+            byte[] actual = template.requestBodyAndHeader(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.getFile),
+                    null,
+                    DataLakeConstants.FILE_NAME,
+                    fileName,
+                    byte[].class);
+
+            Assertions.assertThat(actual).containsExactly(fileContent);
+        } finally {
+            /* Cleanup */
+            template.sendBody(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.deleteFileSystem),
+                    null);
+
+            @SuppressWarnings("unchecked")
+            List<FileSystemItem> filesystems = template.requestBody(
+                    componentUri(fileSystemName, 
DataLakeOperationsDefinition.listFileSystem),
+                    null,
+                    List.class);
+
+            
Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).doesNotContain(fileSystemName);
+        }
+
+    }
+
+    private String componentUri(final String filesystem, final 
DataLakeOperationsDefinition operation) {
+        return String.format("azure-storage-datalake://%s%s?operation=%s",
+                service.azureCredentials().accountName(),
+                filesystem == null ? "" : ("/" + filesystem),
+                operation.name());
+    }
+
+}
diff --git 
a/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
 
b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
new file mode 100644
index 0000000..c68233c
--- /dev/null
+++ 
b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An {@link InputStream} that skips the last byte of the underlying delegate 
{@link InputStream} if the last byte is
+ * equal to the given {@code matchLast} value.
+ */
+public class SkipLastByteInputStream extends BufferedInputStream {
+
+    private final byte matchLast;
+
+    public SkipLastByteInputStream(InputStream delegate, byte matchLast) {
+        super(delegate);
+        this.matchLast = matchLast;
+    }
+
+    public SkipLastByteInputStream(InputStream delegate, int size, byte 
matchLast) {
+        super(delegate, size);
+        this.matchLast = matchLast;
+    }
+
+    @Override
+    public int read() throws IOException {
+        int c = super.read();
+        if (c < 0) {
+            return -1;
+        } else if (c == matchLast) {
+            /* look ahead */
+            super.mark(1);
+            int nextC = super.read();
+            if (nextC < 0) {
+                /* matchLast is the last byte */
+                return -1;
+            }
+            super.reset();
+        }
+        return c;
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+
+    @Override
+    public int read(byte[] buffer, int off, int len) throws IOException {
+        final int count = super.read(buffer, off, len);
+        if (count < 0) {
+            return -1;
+        }
+        final int lastIndex = off + count - 1;
+        if (lastIndex >= 0) {
+            byte lastByte = buffer[lastIndex];
+            if (lastByte == matchLast) {
+                /* look ahead */
+                super.mark(1);
+                int nextC = super.read();
+                if (nextC < 0) {
+                    /* matchLast is the last byte - cut it away and do not 
reset */
+                    return count - 1;
+                } else {
+                    super.reset();
+                }
+            }
+        }
+        return count;
+    }
+
+    public boolean markSupported() {
+        /* we do not want callers to mess with mark() and reset() because we 
use it ourselves */
+        return false;
+    }
+
+    @Override
+    public synchronized long skip(long n) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public synchronized void reset() {
+        throw new UnsupportedOperationException();
+    }
+
+}
diff --git 
a/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
 
b/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
new file mode 100644
index 0000000..94e3f1a
--- /dev/null
+++ 
b/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class SkipLastNewlineInputStreamTest {
+
+    @Test
+    void read() throws IOException {
+
+        assertRead("foo bar\n", "foo bar");
+        assertRead("foo bar\n\n", "foo bar\n");
+        assertRead("foo\nbar\n", "foo\nbar");
+        assertRead("foo\n\nbar\n", "foo\n\nbar");
+        assertRead("", "");
+        assertRead("foo bar", "foo bar");
+        assertRead("\n", "");
+        assertRead("f\n", "f");
+        assertRead("fo\n", "fo");
+        assertRead("foo\n", "foo");
+        assertRead("foo \n", "foo ");
+
+    }
+
+    private void assertRead(String input, String expected) throws IOException {
+
+        try (InputStream in
+                = new SkipLastByteInputStream(new 
ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), (byte) '\n');
+             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            int c;
+            while ((c = in.read()) >= 0) {
+                out.write(c);
+            }
+            Assertions.assertEquals(expected, new String(out.toByteArray(), 
StandardCharsets.UTF_8));
+        }
+
+        try (InputStream in
+                = new SkipLastByteInputStream(new 
ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), (byte) '\n');
+             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            byte[] buf = new byte[3];
+            int len;
+            while ((len = in.read(buf)) >= 0) {
+                out.write(buf, 0, len);
+            }
+            Assertions.assertEquals(expected, new String(out.toByteArray(), 
StandardCharsets.UTF_8));
+        }
+
+    }
+
+}

Reply via email to