This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 9bc10ec CAMEL-16307 azure-storage-datalake:...?operation=getFile appends newline to the file content 9bc10ec is described below commit 9bc10ec094d16d76c864ba719998b9ca0104e837 Author: Peter Palaga <ppal...@redhat.com> AuthorDate: Thu Mar 4 21:32:16 2021 +0100 CAMEL-16307 azure-storage-datalake:...?operation=getFile appends newline to the file content --- components/camel-azure-storage-datalake/pom.xml | 7 +- .../datalake/client/DataLakeFileClientWrapper.java | 5 +- .../datalake/integration/DataLakeProducerIT.java | 113 +++++++++++++++++++++ .../apache/camel/util/SkipLastByteInputStream.java | 108 ++++++++++++++++++++ .../camel/util/SkipLastNewlineInputStreamTest.java | 72 +++++++++++++ 5 files changed, 303 insertions(+), 2 deletions(-) diff --git a/components/camel-azure-storage-datalake/pom.xml b/components/camel-azure-storage-datalake/pom.xml index c9a66ce..0c8c1da 100644 --- a/components/camel-azure-storage-datalake/pom.xml +++ b/components/camel-azure-storage-datalake/pom.xml @@ -34,7 +34,7 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-support</artifactId> </dependency> - + <dependency> <groupId>com.azure</groupId> <artifactId>azure-storage-file-datalake</artifactId> @@ -85,6 +85,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-test-infra-common</artifactId> <version>${project.version}</version> diff --git a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java index 3d9f7bd..36bb300 100644 --- 
a/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java +++ b/components/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java @@ -37,6 +37,7 @@ import com.azure.storage.file.datalake.models.PathProperties; import com.azure.storage.file.datalake.options.FileParallelUploadOptions; import com.azure.storage.file.datalake.options.FileQueryOptions; import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues; +import org.apache.camel.util.SkipLastByteInputStream; public class DataLakeFileClientWrapper { private final DataLakeFileClient client; @@ -63,7 +64,9 @@ public class DataLakeFileClientWrapper { public InputStream openInputStream() { String query = "SELECT * from BlobStorage"; - return client.openQueryInputStream(query); + final InputStream sourceInputStream = client.openQueryInputStream(query); + /* Workaround for https://github.com/Azure/azure-sdk-for-java/issues/19612 */ + return new SkipLastByteInputStream(sourceInputStream, (byte) '\n'); } public Response<InputStream> openQueryInputStreamWithResponse(final FileQueryOptions queryOptions) { diff --git a/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java new file mode 100644 index 0000000..d306d77 --- /dev/null +++ b/components/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/integration/DataLakeProducerIT.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.storage.datalake.integration; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Locale; + +import com.azure.storage.file.datalake.models.FileSystemItem; +import org.apache.camel.component.azure.storage.datalake.DataLakeConstants; +import org.apache.camel.component.azure.storage.datalake.DataLakeOperationsDefinition; +import org.apache.commons.lang3.RandomStringUtils; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Integration test that sends the listFileSystem, createFileSystem, upload, getFile and deleteFileSystem operations to azure-storage-datalake endpoints. The template, fileSystemName and service members it uses are not declared here (presumably inherited from BaseIT - confirm). */ public class DataLakeProducerIT extends BaseIT { + + /* NOTE(review): LOG is not referenced anywhere else in this class. */ private static final Logger LOG = LoggerFactory.getLogger(DataLakeProducerIT.class); + + private String fileName; + private byte[] fileContent; + + /** Picks a random five-letter, lower-cased suffix and derives the test file name ("file" + suffix + ".txt") and its UTF-8 content ("Hello " + suffix) from it. */ @BeforeAll + public void setup() { + final String randomSuffix = RandomStringUtils.randomAlphabetic(5).toLowerCase(Locale.ROOT); + fileName = "file" + randomSuffix + ".txt"; + fileContent = ("Hello " + randomSuffix).getBytes(StandardCharsets.UTF_8); + } + + /** Creates the file system, uploads the file, reads it back and compares the bytes, then deletes the file system again. NOTE(review): despite the method name, every step goes through producer-style template calls. */ @Test + void testConsumer() throws Exception { + + { /* Precondition: the file system under test must not be listed yet. */ + @SuppressWarnings("unchecked") + List<FileSystemItem> filesystems = template.requestBody( + componentUri(fileSystemName, DataLakeOperationsDefinition.listFileSystem), + null, + List.class); + +
Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).doesNotContain(fileSystemName); + } + + /* Create the file system under test. */ template.sendBody( + componentUri(fileSystemName, DataLakeOperationsDefinition.createFileSystem), + null); + + { /* The file system must now show up in the listing. */ + @SuppressWarnings("unchecked") + List<FileSystemItem> filesystems = template.requestBody( + componentUri(fileSystemName, DataLakeOperationsDefinition.listFileSystem), + null, + List.class); + + Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).contains(fileSystemName); + } + + try { /* Upload the generated content under the generated file name. */ + template.sendBodyAndHeader( + componentUri(fileSystemName, DataLakeOperationsDefinition.upload), + fileContent, + DataLakeConstants.FILE_NAME, + fileName); + + /* Read the same file back as a byte array through the getFile operation. */ byte[] actual = template.requestBodyAndHeader( + componentUri(fileSystemName, DataLakeOperationsDefinition.getFile), + null, + DataLakeConstants.FILE_NAME, + fileName, + byte[].class); + + /* The downloaded bytes must equal the uploaded ones exactly - no extra or missing byte. */ Assertions.assertThat(actual).containsExactly(fileContent); + } finally { + /* Cleanup: delete the file system even when an assertion above failed, then verify it is no longer listed. */ + template.sendBody( + componentUri(fileSystemName, DataLakeOperationsDefinition.deleteFileSystem), + null); + + @SuppressWarnings("unchecked") + List<FileSystemItem> filesystems = template.requestBody( + componentUri(fileSystemName, DataLakeOperationsDefinition.listFileSystem), + null, + List.class); + + Assertions.assertThat(filesystems.stream().map(FileSystemItem::getName)).doesNotContain(fileSystemName); + } + + } + + /** Builds the endpoint URI "azure-storage-datalake://{accountName}[/{filesystem}]?operation={operation}"; the "/{filesystem}" segment is left out when filesystem is null. */ private String componentUri(final String filesystem, final DataLakeOperationsDefinition operation) { + return String.format("azure-storage-datalake://%s%s?operation=%s", + service.azureCredentials().accountName(), + filesystem == null ? 
"" : ("/" + filesystem), + operation.name()); + } + +} diff --git a/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java new file mode 100644 index 0000000..c68233c --- /dev/null +++ b/core/camel-util/src/main/java/org/apache/camel/util/SkipLastByteInputStream.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An {@link InputStream} that skips the last byte of the underlying delegate {@link InputStream} if the last byte is + * equal to the given {@code matchLast} value.
/**
 * An {@link InputStream} that hides the very last byte of the wrapped stream when that byte equals
 * {@code matchLast}. Every other occurrence of the byte is passed through unchanged.
 * <p>
 * The end of the stream is detected with a one-byte look-ahead that relies on the {@code mark}/{@code reset}
 * support of {@link BufferedInputStream}. For that reason {@link #mark(int)}, {@link #reset()} and
 * {@link #skip(long)} are not available to callers and throw {@link UnsupportedOperationException}.
 */
public class SkipLastByteInputStream extends BufferedInputStream {

    private final byte matchLast;

    /**
     * @param delegate  the stream to read from
     * @param matchLast the byte value to drop if it is the final byte of {@code delegate}
     */
    public SkipLastByteInputStream(InputStream delegate, byte matchLast) {
        super(delegate);
        this.matchLast = matchLast;
    }

    /**
     * @param delegate  the stream to read from
     * @param size      the size of the internal buffer, see {@link BufferedInputStream#BufferedInputStream(InputStream, int)}
     * @param matchLast the byte value to drop if it is the final byte of {@code delegate}
     */
    public SkipLastByteInputStream(InputStream delegate, int size, byte matchLast) {
        super(delegate, size);
        this.matchLast = matchLast;
    }

    /**
     * Reads a single byte.
     *
     * @return the next byte as a value in the range 0..255, or {@code -1} at the end of the stream or when the
     *         only remaining byte is a trailing {@code matchLast}
     */
    @Override
    public int read() throws IOException {
        final int c = super.read();
        if (c < 0) {
            return -1;
        }
        /* super.read() yields 0..255 while matchLast is signed, hence the mask */
        if (c == (matchLast & 0xFF) && isAtEndOfStream()) {
            /* matchLast is the last byte - swallow it */
            return -1;
        }
        return c;
    }

    /**
     * Reads up to {@code len} bytes into {@code buffer}, never handing out a trailing {@code matchLast}.
     *
     * @return the number of bytes stored in {@code buffer}; {@code 0} only if {@code len} is zero; {@code -1} at
     *         the end of the stream, including the case where the only byte left was the trailing
     *         {@code matchLast}
     */
    @Override
    public int read(byte[] buffer, int off, int len) throws IOException {
        final int count = super.read(buffer, off, len);
        if (count <= 0) {
            /* end of stream (-1) or zero-length request (0): nothing was stored, so there is nothing to inspect */
            return count;
        }
        if (buffer[off + count - 1] == matchLast && isAtEndOfStream()) {
            /*
             * matchLast is the last byte - cut it away. If it was the only byte read, report the end of the
             * stream rather than 0, which InputStream forbids for a non-zero len.
             */
            return count > 1 ? count - 1 : -1;
        }
        return count;
    }

    @Override
    public boolean markSupported() {
        /* we do not want callers to mess with mark() and reset() because we use it ourselves */
        return false;
    }

    /** Not supported; always throws {@link UnsupportedOperationException}. */
    @Override
    public synchronized long skip(long n) {
        throw new UnsupportedOperationException();
    }

    /** Not supported because mark/reset is used internally; always throws {@link UnsupportedOperationException}. */
    @Override
    public synchronized void mark(int readlimit) {
        throw new UnsupportedOperationException();
    }

    /** Not supported because mark/reset is used internally; always throws {@link UnsupportedOperationException}. */
    @Override
    public synchronized void reset() {
        throw new UnsupportedOperationException();
    }

    /**
     * Peeks one byte ahead. When a byte is available the read position is restored; when the stream is
     * exhausted there is nothing to restore.
     *
     * @return {@code true} if no further byte can be read
     */
    private boolean isAtEndOfStream() throws IOException {
        super.mark(1);
        if (super.read() < 0) {
            return true;
        }
        super.reset();
        return false;
    }

}
0000000..94e3f1a --- /dev/null +++ b/core/camel-util/src/test/java/org/apache/camel/util/SkipLastNewlineInputStreamTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Unit test for SkipLastByteInputStream configured to drop a trailing '\n'. NOTE(review): the test class name says "Newline" while the class under test is named SkipLastByteInputStream. */ public class SkipLastNewlineInputStreamTest { + + /** Table of (input, expected) pairs: one trailing newline is removed, inner newlines and all but the last of several trailing newlines are kept, and input without a trailing newline is returned unchanged. */ @Test + void read() throws IOException { + + assertRead("foo bar\n", "foo bar"); + assertRead("foo bar\n\n", "foo bar\n"); + assertRead("foo\nbar\n", "foo\nbar"); + assertRead("foo\n\nbar\n", "foo\n\nbar"); + assertRead("", ""); + assertRead("foo bar", "foo bar"); + assertRead("\n", ""); + assertRead("f\n", "f"); + assertRead("fo\n", "fo"); + assertRead("foo\n", "foo"); + assertRead("foo \n", "foo "); + + } + + /** Drains a fresh SkipLastByteInputStream over the UTF-8 bytes of input twice - once byte by byte, once through a 3-byte array - and asserts that each pass decodes to expected. */ private void assertRead(String input, String expected) throws IOException { + + /* Pass 1: single-byte read() until it reports a negative value. */ try (InputStream in + = new SkipLastByteInputStream(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), (byte) '\n'); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + int c; +
while ((c = in.read()) >= 0) { + out.write(c); + } + Assertions.assertEquals(expected, new String(out.toByteArray(), StandardCharsets.UTF_8)); + } + + /* Pass 2: array reads with a 3-byte buffer; the ">= 0" loop condition also tolerates a read that returns 0. */ try (InputStream in + = new SkipLastByteInputStream(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), (byte) '\n'); + ByteArrayOutputStream out = new ByteArrayOutputStream()) { + byte[] buf = new byte[3]; + int len; + while ((len = in.read(buf)) >= 0) { + out.write(buf, 0, len); + } + Assertions.assertEquals(expected, new String(out.toByteArray(), StandardCharsets.UTF_8)); + } + + } + +}