This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3e794ca Fixing segment push uri ingestion jobs (#5897) 3e794ca is described below commit 3e794ca0c66ce36fa2efbb91a13edd514cc3b6a8 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Wed Aug 19 00:16:37 2020 -0700 Fixing segment push uri ingestion jobs (#5897) --- .../ingestion/batch/common/SegmentPushUtils.java | 27 ++++++++ ...nUtils.java => SegmentGenerationUtilsTest.java} | 2 +- .../batch/common/SegmentPushUtilsTest.java | 79 ++++++++++++++++++++++ .../hadoop/HadoopSegmentUriPushJobRunner.java | 6 +- .../batch/spark/SparkSegmentUriPushJobRunner.java | 6 +- .../batch/standalone/SegmentUriPushJobRunner.java | 6 +- .../apache/pinot/plugin/filesystem/S3PinotFS.java | 2 + .../pinot/plugin/filesystem/S3PinotFSTest.java | 2 +- 8 files changed, 122 insertions(+), 8 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index 7cb4d55..bc5e91a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -26,6 +26,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; @@ -46,6 +47,32 @@ public class SegmentPushUtils implements Serializable { private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + public static URI generateSegmentTarURI(URI dirURI, URI fileURI, String prefix, String suffix) { + if (StringUtils.isEmpty(prefix) && StringUtils.isEmpty(suffix)) { + // In case the FS doesn't provide scheme or host, will fill it up from dirURI. + String scheme = fileURI.getScheme(); + if (StringUtils.isEmpty(fileURI.getScheme())) { + scheme = dirURI.getScheme(); + } + String host = fileURI.getHost(); + if (StringUtils.isEmpty(fileURI.getHost())) { + host = dirURI.getHost(); + } + int port = fileURI.getPort(); + if (port < 0) { + port = dirURI.getPort(); + } + try { + return new URI(scheme, fileURI.getUserInfo(), host, port, fileURI.getPath(), fileURI.getQuery(), + fileURI.getFragment()); + } catch (URISyntaxException e) { + LOGGER.warn("Unable to generate push uri based from dir URI: {} and file URI: {}, directly return file URI.", dirURI, fileURI); + return fileURI; + } + } + return URI.create(String.format("%s%s%s", prefix, fileURI.getRawPath(), suffix)); + } + public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths) throws RetriableOperationException, AttemptsExceededException { String tableName = spec.getTableSpec().getTableName(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentGenerationUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java similarity index 97% rename from pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentGenerationUtils.java rename to pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java index 518808f..aa65ffe 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentGenerationUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java @@ -23,7 +23,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import java.net.URI; -public class TestSegmentGenerationUtils { +public class SegmentGenerationUtilsTest { @Test public void testExtractFileNameFromURI() { diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java new file mode 100644 index 0000000..ceecc58 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtilsTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.ingestion.batch.common; + +import java.net.URI; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class SegmentPushUtilsTest { + + private static final String RAW_DIRECTORY_PATH = "/rawdata/"; + private static final String RAW_SEGMENT_PATH = "/rawdata/segments/segment.tar.gz"; + + @Test + public void testGenerateSegmentTarURIForS3() { + String s3Base = "s3://org.pinot.data"; + URI dirURI = URI.create(s3Base + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(s3Base + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, s3Base, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), s3Base, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForGCS() { + String gcsBase = "gs://org.pinot.data"; + URI dirURI = URI.create(gcsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(gcsBase + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, gcsBase, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), gcsBase, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForHdfs() { + String hdfsBase = "hdfs://namespace1"; + URI dirURI = URI.create(hdfsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(hdfsBase + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, hdfsBase, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), hdfsBase, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForWebhdfs() { + String hdfsBase = "hdfs://namespace1"; + URI dirURI = URI.create(hdfsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(hdfsBase + RAW_SEGMENT_PATH); + String webHdfsPrefix = "http://foo:1234/webhdfs/v1"; + String webHdfsSuffix = "?op=OPEN"; + URI webHdfsPath = URI.create("http://foo:1234/webhdfs/v1/rawdata/segments/segment.tar.gz?op=OPEN"); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, webHdfsPrefix, webHdfsSuffix), + webHdfsPath); + Assert.assertEquals( + SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), webHdfsPrefix, webHdfsSuffix), + webHdfsPath); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java index e1a2ceb..f451c2c 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java @@ -87,8 +87,10 @@ public class HadoopSegmentUriPushJobRunner implements IngestionJobRunner, Serial for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } // Push from driver diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java index d8604f5..824e05a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java @@ -92,8 +92,10 @@ public class SparkSegmentUriPushJobRunner implements IngestionJobRunner, Seriali for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java index afe5430..3172dac 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java @@ -87,8 +87,10 @@ public class SegmentUriPushJobRunner implements IngestionJobRunner { for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } try { diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index 33892be..d70eadc 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -31,6 +31,7 @@ import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.slf4j.Logger; @@ -411,6 +412,7 @@ public class S3PinotFS extends PinotFS { throws Exception { LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath()); URI base = getBase(srcUri); + FileUtils.forceMkdir(dstFile.getParentFile()); String prefix = sanitizePath(base.relativize(srcUri).getPath()); GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build(); diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java index 26d5b33..5a681ba 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java @@ -287,7 +287,7 @@ public class S3PinotFSTest { Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length()); - File fileToDownload = new File("copyFile_download.txt"); + File fileToDownload = new File("copyFile_download.txt").getAbsoluteFile(); _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload); Assert.assertEquals(fileToCopy.length(), fileToDownload.length()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org