This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new c01b852  Add table name parameter for segment push (#4454)
c01b852 is described below

commit c01b85241ea8001ffe39dcd99b214c57520cefac
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Jul 24 21:52:47 2019 -0700

    Add table name parameter for segment push (#4454)

    * Add table name parameter for segment push
      1. Add table name parameter for the hadoop tar and uri push jobs
      2. Add table name parameter for the minion segment conversion executors
      3. Make the upload segment API derive the table name from the request parameter first, then fall back to the segment metadata
      4. Remove the precondition table name check from SegmentCreationMapper
      The next step will be deprecating table.name from the segment metadata.

    * Changed to add the table name as a request parameter instead of a header

    * Addressing comments

    * Addressing comments for FileUploadDownloadClient
---
 .../common/utils/FileUploadDownloadClient.java     | 43 +++++++++++----
 .../PinotSegmentUploadRestletResource.java         | 28 +++++++---
 .../pinot/hadoop/job/DefaultControllerRestApi.java |  4 +-
 .../apache/pinot/hadoop/job/SegmentTarPushJob.java |  4 +-
 .../apache/pinot/hadoop/job/SegmentUriPushJob.java |  4 +-
 .../hadoop/job/mappers/SegmentCreationMapper.java  |  2 +-
 .../pinot/integration/tests/ClusterTest.java       |  4 +-
 .../ControllerPeriodicTasksIntegrationTests.java   |  2 +-
 .../DeleteAPIHybridClusterIntegrationTest.java     |  2 +-
 .../tests/HybridClusterIntegrationTest.java        |  2 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |  2 +-
 ...onaryAggregationPlanClusterIntegrationTest.java |  2 +-
 .../tests/OfflineClusterIntegrationTest.java       |  4 +-
 .../tests/PinotURIUploadIntegrationTest.java       |  7 ++-
 .../tests/StarTreeClusterIntegrationTest.java      |  2 +-
 .../tests/StarTreeV2ClusterIntegrationTest.java    |  2 +-
 .../tests/UploadRefreshDeleteIntegrationTest.java  |  2 +-
 .../BaseMultipleSegmentsConversionExecutor.java    | 17 +++---
 .../BaseSingleSegmentConversionExecutor.java       | 13 +++--
 .../apache/pinot/perf/BenchmarkQueryEngine.java    |  1 -
 .../command/BackfillDateTimeColumnCommand.java     |  2 +-
 .../tools/admin/command/UploadSegmentCommand.java  |  6 ++-
 .../pinot/tools/backfill/BackfillSegmentUtils.java |  7 +--
 .../pinot/tools/perf/PerfBenchmarkDriver.java      | 19 -------
 .../pinot/tools/perf/PerfBenchmarkDriverConf.java  | 63 ----------------------
 25 files changed, 112 insertions(+), 132 deletions(-)
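With this change a push client can name the target table explicitly instead of relying on the table name embedded in the segment metadata. A minimal sketch of the new file-upload call (the controller host/port, file path, and table name below are illustrative, not taken from this commit):

    import java.io.File;
    import java.net.URI;
    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    public class SegmentTarPushExample {
      public static void main(String[] args)
          throws Exception {
        // Illustrative values; replace with a real controller address and segment tar
        File segmentTar = new File("/tmp/segments/myTable_2019-07-24_0.tar.gz");
        String rawTableName = "myTable";
        try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
          URI uploadUri = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", 9000);
          // The new overload sends rawTableName as the "tableName" query parameter
          int statusCode =
              client.uploadSegment(uploadUri, segmentTar.getName(), segmentTar, rawTableName).getStatusCode();
          System.out.println("Upload returned HTTP " + statusCode);
        }
      }
    }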
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 24fcd95..288e64c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -28,6 +28,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -55,6 +56,7 @@ import org.apache.http.entity.mime.content.FileBody;
 import org.apache.http.entity.mime.content.InputStreamBody;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.slf4j.Logger;
@@ -78,6 +80,7 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static class QueryParameters {
     public static final String ENABLE_PARALLEL_PUSH_PROTECTION = "enableParallelPushProtection";
+    public static final String TABLE_NAME = "tableName";
   }
 
   public enum FileUploadType {
@@ -456,6 +459,11 @@ public class FileUploadDownloadClient implements Closeable {
   /**
    * Upload segment with segment file.
    *
+   * Note: table name needs to be added as a parameter except for the case where this gets called during the realtime
+   * segment commit protocol.
+   *
+   * TODO: fix the realtime segment commit protocol to add table name as a parameter.
+   *
    * @param uri URI
    * @param segmentName Segment name
   * @param segmentFile Segment file
@@ -473,23 +481,30 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Upload segment with segment file using default settings.
+   * Upload segment with segment file using default settings. Include table name as a request parameter.
    *
    * @param uri URI
    * @param segmentName Segment name
    * @param segmentFile Segment file
+   * @param rawTableName Raw table name
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile)
+  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, File segmentFile, String rawTableName)
       throws IOException, HttpErrorStatusException {
-    return uploadSegment(uri, segmentName, segmentFile, null, null, DEFAULT_SOCKET_TIMEOUT_MS);
+    // Add table name as a request parameter
+    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    return uploadSegment(uri, segmentName, segmentFile, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+
   /**
    * Upload segment with segment file input stream.
    *
+   * Note: table name has to be set as a parameter.
+   *
    * @param uri URI
    * @param segmentName Segment name
    * @param inputStream Segment file input stream
@@ -507,23 +522,29 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Upload segment with segment file input stream using default settings.
+   * Upload segment with segment file input stream using default settings. Include table name as a request parameter.
    *
    * @param uri URI
    * @param segmentName Segment name
    * @param inputStream Segment file input stream
+   * @param rawTableName Raw table name
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream inputStream)
+  public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream inputStream, String rawTableName)
       throws IOException, HttpErrorStatusException {
-    return uploadSegment(uri, segmentName, inputStream, null, null, DEFAULT_SOCKET_TIMEOUT_MS);
+    // Add table name as a request parameter
+    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
   /**
    * Send segment uri.
    *
+   * Note: table name has to be set as a parameter.
+   *
    * @param uri URI
    * @param downloadUri Segment download uri
    * @param headers Optional http headers
@@ -540,17 +561,21 @@ public class FileUploadDownloadClient implements Closeable {
   }
 
   /**
-   * Send segment uri using default settings.
+   * Send segment uri using default settings. Include table name as a request parameter.
    *
    * @param uri URI
    * @param downloadUri Segment download uri
+   * @param rawTableName Raw table name
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse sendSegmentUri(URI uri, String downloadUri)
+  public SimpleHttpResponse sendSegmentUri(URI uri, String downloadUri, String rawTableName)
       throws IOException, HttpErrorStatusException {
-    return sendSegmentUri(uri, downloadUri, null, null, DEFAULT_SOCKET_TIMEOUT_MS);
+    // Add table name as a request parameter
+    NameValuePair tableNameValuePair = new BasicNameValuePair(QueryParameters.TABLE_NAME, rawTableName);
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    return sendSegmentUri(uri, downloadUri, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
   /**
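The same applies to URI-based pushes through the new sendSegmentUri overload; a small sketch under the same assumptions (illustrative host/port and a hypothetical deep-store download URI):

    import java.net.URI;
    import org.apache.pinot.common.utils.FileUploadDownloadClient;

    public class SegmentUriPushExample {
      public static void main(String[] args)
          throws Exception {
        // Hypothetical location the controller can download the segment from
        String downloadUri = "hdfs://namenode/pinot/segments/myTable/myTable_0.tar.gz";
        try (FileUploadDownloadClient client = new FileUploadDownloadClient()) {
          URI uploadUri = FileUploadDownloadClient.getUploadSegmentHttpURI("localhost", 9000);
          // Table name again rides along as the "tableName" query parameter
          int statusCode = client.sendSegmentUri(uploadUri, downloadUri, "myTable").getStatusCode();
          System.out.println("URI push returned HTTP " + statusCode);
        }
      }
    }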
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index bbb34ac..6f74f8a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -230,10 +230,9 @@ public class PinotSegmentUploadRestletResource {
         "All segments of table " + TableNameBuilder.forType(tableType).tableNameWithType(tableName) + " deleted");
   }
 
-  private SuccessResponse uploadSegment(FormDataMultiPart multiPart, boolean enableParallelPushProtection,
+  private SuccessResponse uploadSegment(String tableName, FormDataMultiPart multiPart, boolean enableParallelPushProtection,
       HttpHeaders headers, Request request, boolean moveSegmentToFinalLocation) {
     if (headers != null) {
-      // TODO: Add these headers into open source hadoop jobs
       LOGGER.info("HTTP Header {} is {}", CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER,
           headers.getRequestHeader(CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER));
       LOGGER.info("HTTP Header {} is {}", CommonConstants.Controller.TABLE_NAME_HTTP_HEADER,
@@ -288,9 +287,20 @@ public class PinotSegmentUploadRestletResource {
         throw new UnsupportedOperationException("Unsupported upload type: " + uploadType);
       }
 
-      String rawTableName = segmentMetadata.getTableName();
+      // Fetch segment name
      String segmentName = segmentMetadata.getName();
 
+      // Fetch raw table name. Try to derive the table name from the parameter first, then from the segment metadata
+      String rawTableName;
+      if (tableName != null && !tableName.isEmpty()) {
+        rawTableName = TableNameBuilder.extractRawTableName(tableName);
+        LOGGER.info("Uploading segment {} to table: {} (derived from API parameter)", segmentName, tableName);
+      } else {
+        // TODO: remove this when we completely deprecate the table name from segment metadata
+        rawTableName = segmentMetadata.getTableName();
+        LOGGER.info("Uploading segment {} to table: {} (derived from segment metadata)", segmentName, rawTableName);
+      }
+
       String zkDownloadUri;
       // This boolean is here for V1 segment upload, where we keep the segment in the downloadURI sent in the header.
       // We will deprecate this behavior eventually.
@@ -398,10 +408,11 @@ public class PinotSegmentUploadRestletResource {
   // request if a multipart object is not sent. This endpoint does not move the segment to its final location;
   // it keeps it at the downloadURI header that is set. We will not support this endpoint going forward.
   public void uploadSegmentAsJson(String segmentJsonStr,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(null, enableParallelPushProtection, headers, request, false));
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, false));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -415,10 +426,11 @@ public class PinotSegmentUploadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")
   // For the multipart endpoint, we will always move segment to final location regardless of the segment endpoint.
   public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(multiPart, enableParallelPushProtection, headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -434,10 +446,11 @@ public class PinotSegmentUploadRestletResource {
   // request if a multipart object is not sent. This endpoint is recommended for use. It differs from the first
   // endpoint in how it moves the segment to a Pinot-determined final directory.
   public void uploadSegmentAsJsonV2(String segmentJsonStr,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(null, enableParallelPushProtection, headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, null, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
@@ -451,10 +464,11 @@ public class PinotSegmentUploadRestletResource {
   @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary")
   // This behavior does not differ from v1 of the same endpoint.
   public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
+      @ApiParam(value = "Name of the table") @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName,
       @ApiParam(value = "Whether to enable parallel push protection") @DefaultValue("false") @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) boolean enableParallelPushProtection,
       @Context HttpHeaders headers, @Context Request request, @Suspended final AsyncResponse asyncResponse) {
     try {
-      asyncResponse.resume(uploadSegment(multiPart, enableParallelPushProtection, headers, request, true));
+      asyncResponse.resume(uploadSegment(tableName, multiPart, enableParallelPushProtection, headers, request, true));
     } catch (Throwable t) {
       asyncResponse.resume(t);
     }
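Since the controller normalizes the parameter with TableNameBuilder.extractRawTableName, either the raw or the type-suffixed table name can be passed to these endpoints; a quick illustration of that normalization (standard Pinot naming assumed):

    import org.apache.pinot.common.config.TableNameBuilder;

    public class TableNameNormalizationExample {
      public static void main(String[] args) {
        // Both forms resolve to the same raw table name, "myTable"
        System.out.println(TableNameBuilder.extractRawTableName("myTable"));
        System.out.println(TableNameBuilder.extractRawTableName("myTable_OFFLINE"));
      }
    }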
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 92e0dee..6feedb8 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -107,7 +107,7 @@ public class DefaultControllerRestApi implements ControllerRestApi {
       try (InputStream inputStream = fileSystem.open(tarFilePath)) {
         SimpleHttpResponse response = _fileUploadDownloadClient.uploadSegment(
             FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
-            segmentName, inputStream);
+            segmentName, inputStream, _rawTableName);
         LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
       } catch (Exception e) {
         LOGGER.error("Caught exception while pushing segment: {} to location: {}", segmentName, pushLocation, e);
@@ -126,7 +126,7 @@ public class DefaultControllerRestApi implements ControllerRestApi {
       try {
         SimpleHttpResponse response = _fileUploadDownloadClient.sendSegmentUri(
             FileUploadDownloadClient.getUploadSegmentHttpURI(pushLocation.getHost(), pushLocation.getPort()),
-            segmentUri);
+            segmentUri, _rawTableName);
         LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse());
       } catch (Exception e) {
         LOGGER.error("Caught exception while sending segment URI: {} to location: {}", segmentUri, pushLocation, e);
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
index 7c71fd8..49bccc9 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java
@@ -30,6 +30,7 @@ import org.apache.pinot.hadoop.utils.PushLocation;
 public class SegmentTarPushJob extends BaseSegmentJob {
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
 
   public SegmentTarPushJob(Properties properties) {
     super(properties);
@@ -37,6 +38,7 @@ public class SegmentTarPushJob extends BaseSegmentJob {
     String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
     int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
   }
 
   @Override
@@ -53,6 +55,6 @@ public class SegmentTarPushJob extends BaseSegmentJob {
   }
 
   protected ControllerRestApi getControllerRestApi() {
-    return new DefaultControllerRestApi(_pushLocations, null);
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
   }
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
index aa81ba8..708fdbf 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentUriPushJob.java
@@ -32,6 +32,7 @@ public class SegmentUriPushJob extends BaseSegmentJob {
   private final String _segmentUriSuffix;
   private final Path _segmentPattern;
   private final List<PushLocation> _pushLocations;
+  private final String _rawTableName;
 
   public SegmentUriPushJob(Properties properties) {
     super(properties);
@@ -41,6 +42,7 @@ public class SegmentUriPushJob extends BaseSegmentJob {
     String[] hosts = StringUtils.split(properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS), ',');
     int port = Integer.parseInt(properties.getProperty(JobConfigConstants.PUSH_TO_PORT));
     _pushLocations = PushLocation.getPushLocations(hosts, port);
+    _rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
   }
 
   @Override
@@ -61,6 +63,6 @@ public class SegmentUriPushJob extends BaseSegmentJob {
   }
 
   protected ControllerRestApi getControllerRestApi() {
-    return new DefaultControllerRestApi(_pushLocations, null);
+    return new DefaultControllerRestApi(_pushLocations, _rawTableName);
   }
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index ccf752c..013104a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -89,7 +89,7 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     _jobConf = context.getConfiguration();
     logConfigurations();
 
-    _rawTableName = Preconditions.checkNotNull(_jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME));
+    _rawTableName = _jobConf.get(JobConfigConstants.SEGMENT_TABLE_NAME);
     _schema = Schema.fromString(_jobConf.get(JobConfigConstants.SCHEMA));
 
     // Optional
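On the Hadoop side the push jobs now read the table name from the job properties and fail fast when it is missing. A sketch of wiring up a tar push job (property values are illustrative; the job's run() entry point is assumed from the surrounding code, and the usual segment-path properties are omitted):

    import java.util.Properties;
    import org.apache.pinot.hadoop.job.JobConfigConstants;
    import org.apache.pinot.hadoop.job.SegmentTarPushJob;

    public class TarPushJobExample {
      public static void main(String[] args)
          throws Exception {
        Properties properties = new Properties();
        // Illustrative values; the property keys come from JobConfigConstants
        properties.setProperty(JobConfigConstants.PUSH_TO_HOSTS, "controller-host");
        properties.setProperty(JobConfigConstants.PUSH_TO_PORT, "9000");
        // Required since this change: construction fails fast if the table name is missing
        properties.setProperty(JobConfigConstants.SEGMENT_TABLE_NAME, "myTable");
        // ... plus the existing properties pointing at the segment tar directory
        new SegmentTarPushJob(properties).run();  // run() entry point assumed
      }
    }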
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 0ec8ef2..8a6aa19 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -273,7 +273,7 @@ public abstract class ClusterTest extends ControllerTest {
    *
    * @param segmentDir Segment directory
    */
-  protected void uploadSegments(@Nonnull File segmentDir)
+  protected void uploadSegments(@Nonnull String tableName, @Nonnull File segmentDir)
       throws Exception {
     String[] segmentNames = segmentDir.list();
     Assert.assertNotNull(segmentNames);
@@ -290,7 +290,7 @@ public abstract class ClusterTest extends ControllerTest {
         @Override
         public Integer call()
             throws Exception {
-          return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentName, segmentFile)
+          return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentName, segmentFile, tableName)
               .getStatusCode();
         }
       }));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index 374740c..5281911 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -192,7 +192,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
         null, null, null, executor);
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
     waitForAllDocsLoaded(600_000L);
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
index 03610a8..0d01e59 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DeleteAPIHybridClusterIntegrationTest.java
@@ -314,7 +314,7 @@ public class DeleteAPIHybridClusterIntegrationTest extends HybridClusterIntegrat
 
   private void repushOfflineSegments()
       throws Exception {
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
     waitForNumRows(nOfflineRows, CommonConstants.Helix.TableType.OFFLINE);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index d1fd9ff..ab0046c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -97,7 +97,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     setUpTable(avroFiles.get(0));
 
     // Upload all segments
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index ec15591..1a26c66 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -299,7 +299,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
           _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName(), null);
 
       // Upload all segments
-      uploadSegments(_tarDir);
+      uploadSegments(getTableName(), _tarDir);
     }
 
     @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
index 61dc1b7..91a903f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MetadataAndDictionaryAggregationPlanClusterIntegrationTest.java
@@ -138,7 +138,7 @@ public class MetadataAndDictionaryAggregationPlanClusterIntegrationTest extends
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
 
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 75358b1..dce6a5e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -124,7 +124,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     completeTableConfiguration();
 
     // Upload all segments
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
 
     // Set up service status callbacks
     // NOTE: put this step after creating the table and uploading all segments so that brokers and servers can find the
@@ -199,7 +199,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Refresh time is when the segment gets refreshed (existing segment)
     long refreshTime = segmentZKMetadata.getRefreshTime();
 
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
     for (OfflineSegmentZKMetadata segmentZKMetadataAfterUpload : _helixResourceManager
         .getOfflineSegmentMetadata(getTableName())) {
       // Only check one segment
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
index 9debc81..970100a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
@@ -43,9 +43,11 @@ import org.apache.commons.io.FileUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
 import org.apache.http.util.EntityUtils;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
@@ -252,9 +254,12 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet
         @Override
         public Integer call()
             throws Exception {
+          List<NameValuePair> parameters = Collections.singletonList(
+              new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, getTableName()));
+
           return fileUploadDownloadClient
              .sendSegmentUri(FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort),
-                  downloadUri, httpHeaders, null, 60 * 1000).getStatusCode();
+                  downloadUri, httpHeaders, parameters, 60 * 1000).getStatusCode();
         }
       }));
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
index 06ea600..d69073b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
@@ -135,7 +135,7 @@ public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest {
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
 
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
   }
 
   @Test
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
index 5485dc3..ea3a07b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeV2ClusterIntegrationTest.java
@@ -119,7 +119,7 @@ public class StarTreeV2ClusterIntegrationTest extends StarTreeClusterIntegration
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
 
-    uploadSegments(_tarDir);
+    uploadSegments(getTableName(), _tarDir);
   }
 
   private static StarTreeV2BuilderConfig getBuilderConfig(List<String> dimensions, List<String> metrics) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
index 523b42f..fabceae 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UploadRefreshDeleteIntegrationTest.java
@@ -121,7 +121,7 @@ public class UploadRefreshDeleteIntegrationTest extends BaseClusterIntegrationTe
     executor.shutdown();
     executor.awaitTermination(1L, TimeUnit.MINUTES);
 
-    uploadSegments(segmentTarDir);
+    uploadSegments(getTableName(), segmentTarDir);
 
     FileUtils.forceDelete(avroFile);
     FileUtils.forceDelete(segmentTarDir);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index 54e7d53..bbf119e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -21,7 +21,7 @@ package org.apache.pinot.minion.executor;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.config.PinotTaskConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.segment.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -132,11 +133,15 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       String resultSegmentName = segmentConversionResults.get(i).getSegmentName();
 
       // Set parameters for upload request
-      List<NameValuePair> parameters = Collections.singletonList(
-          new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
-
-      SegmentConversionUtils.uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
-          convertedTarredSegmentFile);
+      NameValuePair enableParallelPushProtectionParameter =
+          new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
+      NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+          TableNameBuilder.extractRawTableName(tableNameWithType));
+      List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
+
+      SegmentConversionUtils
+          .uploadSegment(configs, null, parameters, tableNameWithType, resultSegmentName, uploadURL,
+              convertedTarredSegmentFile);
     }
 
     String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
index 2743978..20f44de 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseSingleSegmentConversionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.pinot.minion.executor;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -32,8 +31,10 @@ import org.apache.http.NameValuePair;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.message.BasicNameValuePair;
 import org.apache.pinot.common.config.PinotTaskConfig;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.segment.fetcher.SegmentFetcherFactory;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.common.MinionConstants;
@@ -135,11 +136,15 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
         new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER,
             segmentZKMetadataCustomMapModifier.toJsonString());
 
-    List<Header> httpHeaders = Arrays.asList(ifMatchHeader, segmentZKMetadataCustomMapModifierHeader);
+    List<Header> httpHeaders =
+        Arrays.asList(ifMatchHeader, segmentZKMetadataCustomMapModifierHeader);
 
     // Set parameters for upload request.
-    List<NameValuePair> parameters = Collections.singletonList(
-        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"));
+    NameValuePair enableParallelPushProtectionParameter =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true");
+    NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
+        TableNameBuilder.extractRawTableName(tableNameWithType));
+    List<NameValuePair> parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter);
 
     // Upload the tarred segment
     SegmentConversionUtils
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueryEngine.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueryEngine.java
index 6524df5..fcceb24 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueryEngine.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkQueryEngine.java
@@ -94,7 +94,6 @@ public class BenchmarkQueryEngine {
     conf.setStartController(true);
     conf.setStartServer(true);
     conf.setStartZookeeper(true);
-    conf.setUploadIndexes(false);
     conf.setRunQueries(false);
     conf.setServerInstanceSegmentTarDir(null);
     conf.setServerInstanceDataDir(DATA_DIRECTORY);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
index 7aa5cd7..a8ad34f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
@@ -208,7 +208,7 @@ public class BackfillDateTimeColumnCommand extends AbstractBaseAdminCommand impl
 
       // upload segment
       LOGGER.info("Uploading segment {} to host: {} port: {}", segmentName, _controllerHost, _controllerPort);
-      backfillSegmentUtils.uploadSegment(segmentName, new File(outputDir, segmentName), outputDir);
+      backfillSegmentUtils.uploadSegment(_tableName, segmentName, new File(outputDir, segmentName), outputDir);
     }
 
     // verify that all segments exist
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
index 46ee4f7..a36e811 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/UploadSegmentCommand.java
@@ -50,6 +50,10 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
   @Option(name = "-segmentDir", required = true, metaVar = "<string>", usage = "Path to segment directory.")
   private String _segmentDir = null;
 
+  // TODO: make this a required field once we deprecate the table name from segment metadata
+  @Option(name = "-tableName", required = false, metaVar = "<string>", usage = "Table name to upload.")
+  private String _tableName = null;
+
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
@@ -127,7 +131,7 @@ public class UploadSegmentCommand extends AbstractBaseAdminCommand implements Co
         }
 
         LOGGER.info("Uploading segment {}", tgzFile.getName());
-        fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, tgzFile.getName(), tgzFile);
+        fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, tgzFile.getName(), tgzFile, _tableName);
       }
     } finally {
       // Delete the temporary working directory.
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
index 8ee0a3a..fe79a8d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/backfill/BackfillSegmentUtils.java
@@ -159,7 +159,7 @@ public class BackfillSegmentUtils {
   /**
    * Uploads the segment tar to the controller.
    */
-  public boolean uploadSegment(String segmentName, File segmentDir, File outputDir) {
+  public boolean uploadSegment(String rawTableName, String segmentName, File segmentDir, File outputDir) {
     boolean success = true;
 
     File segmentTar = new File(outputDir, segmentName + TAR_SUFFIX);
@@ -170,12 +170,13 @@ public class BackfillSegmentUtils {
       try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
         SimpleHttpResponse response = fileUploadDownloadClient.uploadSegment(
             FileUploadDownloadClient.getUploadSegmentHttpURI(_controllerHost, Integer.parseInt(_controllerPort)),
-            segmentName, segmentTar);
+            segmentName, segmentTar, rawTableName);
         int statusCode = response.getStatusCode();
         if (statusCode != HttpStatus.SC_OK) {
           success = false;
         }
-        LOGGER.info("Uploaded segment: {} and got response {}: {}", segmentName, statusCode, response.getResponse());
+        LOGGER.info("Uploaded segment: {} to table {} and got response {}: {}", segmentName, rawTableName, statusCode,
+            response.getResponse());
       }
     } catch (Exception e) {
       LOGGER.error("Exception in segment upload {}", segmentTar, e);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index 967f006..be7a679 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -170,7 +170,6 @@ public class PerfBenchmarkDriver {
     startServer();
     startHelixResourceManager();
     configureResources();
-    uploadIndexSegments();
     waitForExternalViewUpdate(_zkAddress, _clusterName, 60 * 1000L);
     postQueries();
   }
@@ -296,24 +295,6 @@ public class PerfBenchmarkDriver {
     _helixResourceManager.addTable(tableConfig);
   }
 
-  private void uploadIndexSegments()
-      throws Exception {
-    if (!_conf.isUploadIndexes()) {
-      LOGGER.info("Skipping upload index segments step.");
-      return;
-    }
-    String indexDirectory = _conf.getIndexDirectory();
-    File[] indexFiles = new File(indexDirectory).listFiles();
-    Preconditions.checkNotNull(indexFiles);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(_controllerHost, _controllerPort);
-      for (File indexFile : indexFiles) {
-        LOGGER.info("Uploading index segment: {}", indexFile.getAbsolutePath());
-        fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, indexFile.getName(), indexFile);
-      }
-    }
-  }
-
   /**
    * Add segment while segment data is already in server data directory.
    *
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
index 0f865d3..c7436eb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
@@ -57,20 +57,6 @@ public class PerfBenchmarkDriverConf {
   String brokerHost = "localhost";
   boolean startBroker = true;
 
-  //data configuration
-  //where is the raw data, mandatory if regenerateIndex = true
-  boolean uploadIndexes = false;
-
-  String rawDataDirectory;
-
-  boolean regenerateIndex = false;
-  //if the regenerateIndex is true, the indexes under this directory will be deleted and recreated from rawData.
-  String indexDirectory;
-
-  //by default all files under indexDirectory will be uploaded to the controller if its not already present.
-  //If the indexes are already uploaded, nothing
-  boolean forceReloadIndex = false;
-
   //resource configuration
   boolean configureResources = false;
 
@@ -162,30 +148,6 @@ public class PerfBenchmarkDriverConf {
     return startBroker;
   }
 
-  public boolean isUploadIndexes() {
-    return uploadIndexes;
-  }
-
-  public void setUploadIndexes(boolean uploadIndexes) {
-    this.uploadIndexes = uploadIndexes;
-  }
-
-  public String getRawDataDirectory() {
-    return rawDataDirectory;
-  }
-
-  public boolean isRegenerateIndex() {
-    return regenerateIndex;
-  }
-
-  public String getIndexDirectory() {
-    return indexDirectory;
-  }
-
-  public boolean isForceReloadIndex() {
-    return forceReloadIndex;
-  }
-
   public String getQueriesDirectory() {
     return queriesDirectory;
   }
@@ -262,22 +224,6 @@ public class PerfBenchmarkDriverConf {
     this.startBroker = startBroker;
   }
 
-  public void setRawDataDirectory(String rawDataDirectory) {
-    this.rawDataDirectory = rawDataDirectory;
-  }
-
-  public void setRegenerateIndex(boolean regenerateIndex) {
-    this.regenerateIndex = regenerateIndex;
-  }
-
-  public void setIndexDirectory(String indexDirectory) {
-    this.indexDirectory = indexDirectory;
-  }
-
-  public void setForceReloadIndex(boolean forceReloadIndex) {
-    this.forceReloadIndex = forceReloadIndex;
-  }
-
   public boolean isRunQueries() {
     return runQueries;
   }
@@ -317,13 +263,4 @@ public class PerfBenchmarkDriverConf {
   public void setSchemaFileNamePath(String schemaFileNamePath) {
     this.schemaFileNamePath = schemaFileNamePath;
   }
-
-  public static void main(String[] args) {
-    DumperOptions options = new DumperOptions();
-    options.setIndent(4);
-    options.setDefaultFlowStyle(FlowStyle.BLOCK);
-    Yaml yaml = new Yaml(options);
-    String dump = yaml.dump(new PerfBenchmarkDriverConf());
-    System.out.println(dump);
-  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org