This is an automated email from the ASF dual-hosted git repository. tingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7f12105 [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857) 7f12105 is described below commit 7f1210539591e04fbb5fc4ef53e197405a40c727 Author: Ting Chen <tingc...@uber.com> AuthorDate: Wed Sep 2 12:19:09 2020 -0700 [Deepstore by-pass]Add a Deepstore bypass integration test with minor bug fixes. (#5857) * Add an integration test for peer segment download for LLC. Several minor fixes. --- .../common/utils/fetcher/HttpSegmentFetcher.java | 15 + .../manager/realtime/PinotFSSegmentUploader.java | 6 +- .../pinot/core/util/PeerServerSegmentFinder.java | 27 +- .../realtime/PinotFSSegmentUploaderTest.java | 4 +- ...rDownloadLLCRealtimeClusterIntegrationTest.java | 379 +++++++++++++++++++++ 5 files changed, 422 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java index 5b04eda..fcaee6c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/HttpSegmentFetcher.java @@ -21,6 +21,7 @@ package org.apache.pinot.common.utils.fetcher; import java.io.File; import java.net.URI; +import java.util.List; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.spi.env.PinotConfiguration; @@ -64,4 +65,18 @@ public class HttpSegmentFetcher extends BaseSegmentFetcher { } }); } + + @Override + public void fetchSegmentToLocalWithoutRetry(URI uri, File dest) + throws Exception { + try { + int statusCode = _httpClient.downloadFile(uri, dest); + _logger + .info("Downloaded segment from: {} to: {} of size: {}; Response status code: {}", uri, dest, dest.length(), + statusCode); + } catch (Exception e) { + 
_logger.warn("Caught exception while downloading segment from: {} to: {}", uri, dest, e); + throw e; + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java index 75a7fff..263435e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.realtime; import java.io.File; import java.net.URI; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory; /** * A segment uploader which does segment upload to a segment store (with store root dir configured as * _segmentStoreUriStr) using PinotFS within a configurable timeout period. The final segment location would be in the - * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful. + * URI _segmentStoreUriStr/_tableNameWithType/segmentName+random_uuid if successful. */ public class PinotFSSegmentUploader implements SegmentUploader { private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class); @@ -57,7 +58,7 @@ public class PinotFSSegmentUploader implements SegmentUploader { } Callable<URI> uploadTask = () -> { URI destUri = new URI(StringUtil - .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName())); + .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName() + UUID.randomUUID().toString())); try { PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme()); // Check and delete any existing segment file. 
@@ -74,6 +75,7 @@ public class PinotFSSegmentUploader implements SegmentUploader { Future<URI> future = _executorService.submit(uploadTask); try { URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS); + LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation); return segmentLocation; } catch (InterruptedException e) { LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", segmentName, _segmentStoreUriStr); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java index 76fd152..2747f5b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/PeerServerSegmentFinder.java @@ -35,16 +35,20 @@ import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.spi.utils.retry.RetryPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * PeerServerSegmentFinder discovers all the servers having the input segment in a ONLINE state through external view of - * a Pinot table. + * PeerServerSegmentFinder discovers all the servers having the input segment in an ONLINE state through external view + * of a Pinot table. It performs retries during the discovery to minimize the chance of Helix state propagation delay. 
*/ public class PeerServerSegmentFinder { private static final Logger _logger = LoggerFactory.getLogger(PeerServerSegmentFinder.class); + private static final int MAX_NUM_ATTEMPTS = 5; + private static final int INITIAL_DELAY_MS = 500; + private static final double DELAY_SCALE_FACTOR = 2; /** * @@ -65,13 +69,27 @@ public class PeerServerSegmentFinder { _logger.error("ClusterName not found"); return ListUtils.EMPTY_LIST; } + final List<URI> onlineServerURIs = new ArrayList<>(); + try { + RetryPolicies.exponentialBackoffRetryPolicy(MAX_NUM_ATTEMPTS, INITIAL_DELAY_MS, DELAY_SCALE_FACTOR).attempt(() -> { + getOnlineServersFromExternalView(segmentName, downloadScheme, tableNameWithType, helixAdmin, clusterName, + onlineServerURIs); + return onlineServerURIs.size() > 0; + }); + } catch (Exception e) { + _logger.error("Failure in getting online servers for segment {}", segmentName, e); + } + return onlineServerURIs; + } + + private static void getOnlineServersFromExternalView(String segmentName, String downloadScheme, + String tableNameWithType, HelixAdmin helixAdmin, String clusterName, List<URI> onlineServerURIs) { ExternalView externalViewForResource = HelixHelper.getExternalViewForResource(helixAdmin, clusterName, tableNameWithType); if (externalViewForResource == null) { _logger.warn("External View not found for table {}", tableNameWithType); - return ListUtils.EMPTY_LIST; + return; } - List<URI> onlineServerURIs = new ArrayList<>(); // Find out the ONLINE servers serving the segment. 
Map<String, String> instanceToStateMap = externalViewForResource.getStateMap(segmentName); for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) { @@ -89,7 +107,6 @@ public class PeerServerSegmentFinder { } } } - return onlineServerURIs; } private static int getServerAdminPort(HelixAdmin helixAdmin, String clusterName, String instanceId) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java index dc93bbf..3ee81e0 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java @@ -60,14 +60,14 @@ public class PinotFSSegmentUploaderTest { public void testSuccessfulUpload() { SegmentUploader segmentUploader = new PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS); URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); - Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())); + Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()))); } @Test public void testSegmentAlreadyExist() { SegmentUploader segmentUploader = new PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS); URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); - Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())); + Assert.assertTrue(segmentURI.toString().startsWith(StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()))); } @Test diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java new file mode 100644 index 0000000..ea8ea51 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java @@ -0,0 +1,379 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.integration.tests; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.avro.reflect.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.ExternalView; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.spi.config.table.CompletionConfig; +import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.LocalPinotFS; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.controller.ControllerConf.ALLOW_HLC_TABLES; +import static org.apache.pinot.controller.ControllerConf.ENABLE_SPLIT_COMMIT; +import static org.testng.Assert.assertEquals; + + +/** + * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer and a fake PinotFS as + * the deep store for segments. This test enables the peer to peer segment download scheme to test Pinot servers can + * download segments from peer servers even when the deep store is down. 
This is done by injection of failures in + * the fake PinotFS segment upload api (i.e., copyFromLocal) for all segments whose seq number mod 5 is 0. + * + * Besides standard tests, it also verifies that + * (1) All the segments on all servers are in either ONLINE or CONSUMING states + * (2) For segments failed during deep store upload, the corresponding segment download url string is empty in Zk. + */ +public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class); + + private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; + private static final long RANDOM_SEED = System.currentTimeMillis(); + private static final Random RANDOM = new Random(RANDOM_SEED); + private static final int NUM_SERVERS = 2; + public static final int UPLOAD_FAILURE_MOD = 5; + + private final boolean _isDirectAlloc = true; //Set as true; otherwise trigger indexing exception. 
+ private final boolean _isConsumerDirConfigured = true; + private final boolean _enableSplitCommit = true; + private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); + private static File PINOT_FS_ROOT_DIR; + + @BeforeClass + @Override + public void setUp() + throws Exception { + System.out.println(String.format( + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableSplitCommit: %s, enableLeadControllerResource: %s", + RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableSplitCommit, _enableLeadControllerResource)); + + PINOT_FS_ROOT_DIR = new File(FileUtils.getTempDirectoryPath() + File.separator + System.currentTimeMillis() + "/"); + Preconditions.checkState(PINOT_FS_ROOT_DIR.mkdir(), "Failed to make a dir for " + PINOT_FS_ROOT_DIR.getPath()); + + // Remove the consumer directory + File consumerDirectory = new File(CONSUMER_DIRECTORY); + if (consumerDirectory.exists()) { + FileUtils.deleteDirectory(consumerDirectory); + } + super.setUp(); + } + + + @Override + public void startServer() { + startServers(NUM_SERVERS); + } + + @Override + public void addTableConfig(TableConfig tableConfig) + throws IOException { + SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig = + new SegmentsValidationAndRetentionConfig(); + CompletionConfig completionConfig = new CompletionConfig("DOWNLOAD"); + segmentsValidationAndRetentionConfig.setCompletionConfig(completionConfig); + segmentsValidationAndRetentionConfig.setReplicasPerPartition(String.valueOf(NUM_SERVERS)); + // Important: enable peer to peer download. 
+ segmentsValidationAndRetentionConfig.setPeerSegmentDownloadScheme("http"); + tableConfig.setValidationConfig(segmentsValidationAndRetentionConfig); + tableConfig.getValidationConfig().setTimeColumnName(this.getTimeColumnName()); + + sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString()); + } + + + @Override + public void startController() { + Map<String, Object> controllerConfig = getDefaultControllerConfiguration(); + controllerConfig.put(ALLOW_HLC_TABLES, false); + controllerConfig.put(ENABLE_SPLIT_COMMIT, _enableSplitCommit); + // Override the data dir config. + controllerConfig.put(ControllerConf.DATA_DIR, "mockfs://" + getHelixClusterName()); + controllerConfig.put(ControllerConf.LOCAL_TEMP_DIR, FileUtils.getTempDirectory().getAbsolutePath()); + // Use the mock PinotFS as the PinotFS. + controllerConfig.put("pinot.controller.storage.factory.class.mockfs", + "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS"); + startController(controllerConfig); + enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); + } + + @Override + protected boolean useLlc() { + return true; + } + + @Nullable + @Override + protected String getLoadMode() { + return "MMAP"; + } + + @Override + protected void overrideServerConf(PinotConfiguration configuration) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); + configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY + ".class.mockfs", + "org.apache.pinot.integration.tests.PeerDownloadLLCRealtimeClusterIntegrationTest$MockPinotFS"); + // Set the segment deep store uri. + configuration.setProperty("pinot.server.instance.segment.store.uri", "mockfs://" + getHelixClusterName()); + // For setting the HDFS segment fetcher. 
+ configuration.setProperty(CommonConstants.Server.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY + ".protocols", "file,http"); + if (_isConsumerDirConfigured) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); + } + if (_enableSplitCommit) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_SPLIT_COMMIT, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_ENABLE_COMMIT_END_WITH_METADATA, true); + } + } + + @Test + public void testConsumerDirectoryExists() { + File consumerDirectory = new File(CONSUMER_DIRECTORY, "mytable_REALTIME"); + assertEquals(consumerDirectory.exists(), _isConsumerDirConfigured, + "The off heap consumer directory does not exist"); + } + + @Test + public void testSegmentFlushSize() { + String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0); + for (String segmentName : segmentNames) { + ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0); + assertEquals(znRecord.getSimpleField(CommonConstants.Segment.FLUSH_THRESHOLD_SIZE), + Integer.toString(getRealtimeSegmentFlushSize() / getNumKafkaPartitions()), + "Segment: " + segmentName + " does not have the expected flush size"); + } + } + + @Test + public void testSegmentDownloadURLs() { + // Verify that all segments whose sequence number is a multiple of UPLOAD_FAILURE_MOD have an empty download url in zk. 
+ String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName()); + List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0); + for (String segmentName : segmentNames) { + ZNRecord znRecord = _propertyStore.get(zkSegmentsPath + "/" + segmentName, null, 0); + String downloadURL = znRecord.getSimpleField("segment.realtime.download.url"); + String numberOfDoc = znRecord.getSimpleField("segment.total.docs"); + if (numberOfDoc.equals("-1")) { + // This is a consuming segment so the download url is null. + Assert.assertNull(downloadURL); + continue; + } + int seqNum = Integer.parseInt(segmentName.split("__")[2]); + if (seqNum % UPLOAD_FAILURE_MOD == 0) { + Assert.assertEquals("", downloadURL); + } else { + Assert.assertTrue(downloadURL.startsWith("mockfs://")); + } + } + } + + @Test + public void testAllSegmentsAreOnlineOrConsuming() { + ExternalView externalView = + HelixHelper.getExternalViewForResource(_helixAdmin, getHelixClusterName(), + TableNameBuilder.REALTIME.tableNameWithType(getTableName())); + Assert.assertEquals("2", externalView.getReplicas()); + // Verify for each segment e, the state of e in its 2 hosting servers is either ONLINE or CONSUMING + for(String segment : externalView.getPartitionSet()) { + Map<String, String> instanceToStateMap = externalView.getStateMap(segment); + Assert.assertEquals(2, instanceToStateMap.size()); + for (Map.Entry<String, String> instanceState : instanceToStateMap.entrySet()) { + Assert.assertTrue("ONLINE".equalsIgnoreCase(instanceState.getValue()) || "CONSUMING" + .equalsIgnoreCase(instanceState.getValue())); + } + } + } + + @Test(expectedExceptions = IOException.class) + public void testAddHLCTableShouldFail() + throws IOException { + TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName("testTable") + .setStreamConfigs(Collections.singletonMap("stream.kafka.consumer.type", "HIGHLEVEL")).build(); + 
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString()); + } + + // MockPinotFS is a localPinotFS whose root directory is configured as _basePath; + public static class MockPinotFS extends PinotFS { + LocalPinotFS _localPinotFS = new LocalPinotFS(); + File _basePath; + @Override + public void init(PinotConfiguration config) { + _localPinotFS.init(config); + _basePath = PeerDownloadLLCRealtimeClusterIntegrationTest.PINOT_FS_ROOT_DIR; + } + + @Override + public boolean mkdir(URI uri) + throws IOException { + try { + return _localPinotFS.mkdir(new URI(_basePath + uri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean delete(URI segmentUri, boolean forceDelete) + throws IOException { + try { + return _localPinotFS.delete(new URI(_basePath + segmentUri.getPath()), forceDelete); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean doMove(URI srcUri, URI dstUri) + throws IOException { + try { + LOGGER.warn("Moving from {} to {}", srcUri, dstUri); + return _localPinotFS.doMove(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean copy(URI srcUri, URI dstUri) + throws IOException { + try { + return _localPinotFS.copy(new URI(_basePath + srcUri.getPath()), new URI(_basePath + dstUri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean exists(URI fileUri) + throws IOException { + try { + return _localPinotFS.exists(new URI(_basePath + fileUri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public long length(URI fileUri) + throws IOException { + try { + return 
_localPinotFS.length(new URI(_basePath + fileUri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public String[] listFiles(URI fileUri, boolean recursive) + throws IOException { + try { + return _localPinotFS.listFiles(new URI(_basePath + fileUri.getPath()), recursive); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public void copyToLocalFile(URI srcUri, File dstFile) + throws Exception { + _localPinotFS.copyToLocalFile(new URI(_basePath + srcUri.getPath()), dstFile); + } + + @Override + public void copyFromLocalFile(File srcFile, URI dstUri) + throws Exception { + // Inject failures for segments whose seq number mod 5 is 0. + if (new LLCSegmentName(srcFile.getName()).getSequenceNumber() % UPLOAD_FAILURE_MOD == 0) { + throw new IllegalArgumentException(srcFile.getAbsolutePath()); + } + try { + _localPinotFS.copyFromLocalFile(srcFile, new URI(_basePath + dstUri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean isDirectory(URI uri) + throws IOException { + try { + return _localPinotFS.isDirectory(new URI(_basePath + uri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public long lastModified(URI uri) + throws IOException { + try { + return _localPinotFS.lastModified(new URI(_basePath + uri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public boolean touch(URI uri) + throws IOException { + try { + return _localPinotFS.touch(new URI(_basePath + uri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + + @Override + public InputStream open(URI uri) + throws IOException { + try { + return _localPinotFS.open(new URI(_basePath + 
uri.getPath())); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org