This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new ef3e3c3 Add startReplaceSegments, endReplaceSegments controller API (#5712) ef3e3c3 is described below commit ef3e3c3b5cbc72df5697136ffba07e87ed26bfb9 Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Thu Jul 23 20:54:47 2020 -0700 Add startReplaceSegments, endReplaceSegments controller API (#5712) * Add startBatchUpload, endBatchUpload controller API These APIs are the building blocks for atomically replacing m segments with n segments. The segment selection algorithm will follow in the next PR. 1. Added startBatchUpload, endBatchUpload controller API 2. Added unit tests * Addressing comments * Addressing comments and change the API name to "ReplaceSegments" --- .../pinot/common/lineage/SegmentLineage.java | 37 +++-- .../common/lineage/SegmentLineageAccessHelper.java | 1 + .../pinot/common/lineage/SegmentLineageUtils.java | 40 +++++ .../resources/StartReplaceSegmentsRequest.java | 53 +++++++ .../pinot/common/lineage/SegmentLineageTest.java | 28 +++- .../PinotSegmentUploadDownloadRestletResource.java | 44 ++++++ .../helix/core/PinotHelixResourceManager.java | 163 +++++++++++++++++++++ .../helix/core/PinotHelixResourceManagerTest.java | 143 +++++++++++++++++- 8 files changed, 487 insertions(+), 22 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java index ee61f11..98eb1ae 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java @@ -24,7 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.UUID; +import org.apache.commons.lang3.StringUtils; import org.apache.helix.ZNRecord; @@ -60,14 +60,25 @@ public class SegmentLineage { } /** - * Add lineage entry to the segment lineage metadata 
+ * Add lineage entry to the segment lineage metadata with the given lineage entry id + * @param lineageEntryId the id for the lineage entry * @param lineageEntry a lineage entry - * @return the id for the input lineage entry for the access */ - public String addLineageEntry(LineageEntry lineageEntry) { - String lineageId = generateLineageId(); - _lineageEntries.put(lineageId, lineageEntry); - return lineageId; + public void addLineageEntry(String lineageEntryId, LineageEntry lineageEntry) { + Preconditions.checkArgument(!_lineageEntries.containsKey(lineageEntryId), + String.format("Lineage entry id ('%s') already exists. Please try with the new lineage id", lineageEntryId)); + _lineageEntries.put(lineageEntryId, lineageEntry); + } + + /** + * Update lineage entry to the segment lineage metadata with the given lineage entry id + * @param lineageEntryId the id for the lineage entry to be updated + * @param lineageEntry a lineage entry to be updated + */ + public void updateLineageEntry(String lineageEntryId, LineageEntry lineageEntry) { + Preconditions.checkArgument(_lineageEntries.containsKey(lineageEntryId), + String.format("Lineage entry id ('%s') does not exists. 
Please try with the valid lineage id", lineageEntryId)); + _lineageEntries.put(lineageEntryId, lineageEntry); } /** @@ -108,8 +119,8 @@ public class SegmentLineage { String lineageId = listField.getKey(); List<String> value = listField.getValue(); Preconditions.checkState(value.size() == 4); - List<String> segmentsFrom = Arrays.asList(value.get(0).split(COMMA_SEPARATOR)); - List<String> segmentsTo = Arrays.asList(value.get(1).split(COMMA_SEPARATOR)); + List<String> segmentsFrom = Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR)); + List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), COMMA_SEPARATOR)); LineageEntryState state = LineageEntryState.valueOf(value.get(2)); long timestamp = Long.parseLong(value.get(3)); lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo, state, timestamp)); @@ -125,8 +136,8 @@ public class SegmentLineage { ZNRecord znRecord = new ZNRecord(_tableNameWithType); for (Map.Entry<String, LineageEntry> entry : _lineageEntries.entrySet()) { LineageEntry lineageEntry = entry.getValue(); - String segmentsFrom = String.join(",", lineageEntry.getSegmentsFrom()); - String segmentsTo = String.join(",", lineageEntry.getSegmentsTo()); + String segmentsFrom = String.join(COMMA_SEPARATOR, lineageEntry.getSegmentsFrom()); + String segmentsTo = String.join(COMMA_SEPARATOR, lineageEntry.getSegmentsTo()); String state = lineageEntry.getState().toString(); String timestamp = Long.toString(lineageEntry.getTimestamp()); List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state, timestamp); @@ -134,8 +145,4 @@ public class SegmentLineage { } return znRecord; } - - private String generateLineageId() { - return UUID.randomUUID().toString(); - } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java index b724c95..de34a08 100644 --- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java @@ -69,6 +69,7 @@ public class SegmentLineageAccessHelper { * * @param propertyStore a property store * @param segmentLineage a segment lineage + * @param expectedVersion expected version of ZNRecord. -1 for indicating to match any version. * @return true if update is successful. false otherwise. */ public static boolean writeSegmentLineage(ZkHelixPropertyStore<ZNRecord> propertyStore, diff --git a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java new file mode 100644 index 0000000..aa0ead8 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.lineage; + +import java.util.UUID; + + +/** + * Util class for Segment Lineage + */ +public class SegmentLineageUtils { + + private SegmentLineageUtils() { + } + + /** + * Generate lineage entry id using UUID. 
+ * + * @return lineage entry id + */ + public static String generateLineageEntryId() { + return UUID.randomUUID().toString(); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java new file mode 100644 index 0000000..fdf3f56 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.restlet.resources; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; + + +/** + * Request object for startReplaceSegments API. + * + * 1. segmentsFrom : original segments. This field can be empty in case the user tries to upload the original segments + * and wants to achieve the atomic update of multiple segments. + * 2. segmentsTo : merged segments. 
+ */ +public class StartReplaceSegmentsRequest { + private List<String> _segmentsFrom; + private List<String> _segmentsTo; + + public StartReplaceSegmentsRequest(@JsonProperty("segmentsFrom") @Nullable List<String> segmentsFrom, + @JsonProperty("segmentsTo") List<String> segmentsTo) { + _segmentsFrom = (segmentsFrom == null) ? Collections.emptyList() : segmentsFrom; + _segmentsTo = segmentsTo; + Preconditions.checkArgument(segmentsTo != null && !segmentsTo.isEmpty(), "'segmentsTo' should not be null or empty"); + } + + public List<String> getSegmentsFrom() { + return _segmentsFrom; + } + + public List<String> getSegmentsTo() { + return _segmentsTo; + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java index e3ce29e..f890d16 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.lineage; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -28,9 +29,10 @@ import org.testng.annotations.Test; public class SegmentLineageTest { @Test - public void testSegmentLineage() throws Exception { + public void testSegmentLineage() { SegmentLineage segmentLineage = new SegmentLineage("test_OFFLINE"); - String id = segmentLineage.addLineageEntry( + String id = SegmentLineageUtils.generateLineageEntryId(); + segmentLineage.addLineageEntry(id, new LineageEntry(Arrays.asList("s1", "s2", "s3"), Arrays.asList("s4", "s5"), LineageEntryState.COMPLETED, 11111L)); LineageEntry lineageEntry = segmentLineage.getLineageEntry(id); @@ -39,7 +41,8 @@ public class SegmentLineageTest { Assert.assertEquals(lineageEntry.getState(), LineageEntryState.COMPLETED); Assert.assertEquals(lineageEntry.getTimestamp(), 11111L); - String id2 = 
segmentLineage.addLineageEntry( + String id2 = SegmentLineageUtils.generateLineageEntryId(); + segmentLineage.addLineageEntry(id2, new LineageEntry(Arrays.asList("s6", "s6", "s8"), Arrays.asList("s9", "s10"), LineageEntryState.COMPLETED, 22222L)); LineageEntry lineageEntry2 = segmentLineage.getLineageEntry(id2); @@ -48,7 +51,8 @@ public class SegmentLineageTest { Assert.assertEquals(lineageEntry2.getState(), LineageEntryState.COMPLETED); Assert.assertEquals(lineageEntry2.getTimestamp(), 22222L); - String id3 = segmentLineage.addLineageEntry( + String id3 = SegmentLineageUtils.generateLineageEntryId(); + segmentLineage.addLineageEntry(id3, new LineageEntry(Arrays.asList("s5", "s9"), Arrays.asList("s11"), LineageEntryState.IN_PROGRESS, 33333L)); LineageEntry lineageEntry3 = segmentLineage.getLineageEntry(id3); Assert.assertEquals(lineageEntry3.getSegmentsFrom(), Arrays.asList("s5", "s9")); @@ -56,6 +60,15 @@ public class SegmentLineageTest { Assert.assertEquals(lineageEntry3.getState(), LineageEntryState.IN_PROGRESS); Assert.assertEquals(lineageEntry3.getTimestamp(), 33333L); + String id4 = SegmentLineageUtils.generateLineageEntryId(); + segmentLineage.addLineageEntry(id4, + new LineageEntry(new ArrayList<>(), Arrays.asList("s12"), LineageEntryState.IN_PROGRESS, 44444L)); + LineageEntry lineageEntry4 = segmentLineage.getLineageEntry(id4); + Assert.assertEquals(lineageEntry4.getSegmentsFrom(), new ArrayList<>()); + Assert.assertEquals(lineageEntry4.getSegmentsTo(), Arrays.asList("s12")); + Assert.assertEquals(lineageEntry4.getState(), LineageEntryState.IN_PROGRESS); + Assert.assertEquals(lineageEntry4.getTimestamp(), 44444L); + // Test the convesion from the segment lineage to the znRecord ZNRecord znRecord = segmentLineage.toZNRecord(); Assert.assertEquals(znRecord.getId(), "test_OFFLINE"); @@ -79,10 +92,17 @@ public class SegmentLineageTest { Assert.assertEquals(entry3.get(2), LineageEntryState.IN_PROGRESS.toString()); Assert.assertEquals(entry3.get(3), 
Long.toString(33333L)); + List<String> entry4 = listFields.get(id4); + Assert.assertEquals(entry4.get(0), ""); + Assert.assertEquals(entry4.get(1), String.join(",", Arrays.asList("s12"))); + Assert.assertEquals(entry4.get(2), LineageEntryState.IN_PROGRESS.toString()); + Assert.assertEquals(entry4.get(3), Long.toString(44444L)); + // Test the conversion from the znRecord to the segment lineage SegmentLineage segmentLineageFromZNRecord = SegmentLineage.fromZNRecord(segmentLineage.toZNRecord()); Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id), lineageEntry); Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id2), lineageEntry2); Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id3), lineageEntry3); + Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id4), lineageEntry4); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 5afcf9e..eef4f9f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.api.resources; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -60,6 +61,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest; import org.apache.pinot.common.utils.CommonConstants; import 
org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.StringUtil; @@ -75,10 +77,12 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.metadata.DefaultMetadataExtractor; import org.apache.pinot.core.metadata.MetadataExtractorFactory; import org.apache.pinot.core.segment.index.metadata.SegmentMetadata; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.crypt.PinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.filesystem.PinotFS; import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.glassfish.grizzly.http.server.Request; import org.glassfish.jersey.media.multipart.FormDataBodyPart; @@ -455,6 +459,46 @@ public class PinotSegmentUploadDownloadRestletResource { } } + @POST + @Path("segments/{tableName}/startReplaceSegments") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Start to replace segments", notes = "Start to replace segments") + public Response startReplaceSegments( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + StartReplaceSegmentsRequest startReplaceSegmentsRequest) { + try { + String tableNameWithType = + TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName); + String segmentLineageEntryId = _pinotHelixResourceManager + .startReplaceSegments(tableNameWithType, startReplaceSegmentsRequest.getSegmentsFrom(), + startReplaceSegmentsRequest.getSegmentsTo()); + return Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", segmentLineageEntryId)).build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + + @POST + @Path("segments/{tableName}/endReplaceSegments") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "End to replace segments", notes = "End to replace segments") + public Response endReplaceSegments( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "Segment lineage entry id returned by startReplaceSegments API") @QueryParam("segmentLineageEntryId") String segmentLineageEntryId) { + try { + String tableNameWithType = + TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName); + // Check that the segment lineage entry id is valid + Preconditions.checkNotNull(segmentLineageEntryId, "'segmentLineageEntryId' should not be null"); + _pinotHelixResourceManager.endReplaceSegments(tableNameWithType, segmentLineageEntryId); + return Response.ok().build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, File dstFile) throws IOException { // Read segment file or segment metadata file and directly use that information to update zk diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 5e1ea4b..7d5bd61 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -67,6 +67,11 @@ import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.exception.InvalidConfigException; import 
org.apache.pinot.common.exception.SchemaNotFoundException; import org.apache.pinot.common.exception.TableNotFoundException; +import org.apache.pinot.common.lineage.LineageEntry; +import org.apache.pinot.common.lineage.LineageEntryState; +import org.apache.pinot.common.lineage.SegmentLineage; +import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; +import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.messages.SegmentRefreshMessage; import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.messages.TableConfigRefreshMessage; @@ -2203,6 +2208,164 @@ public class PinotHelixResourceManager { return tableNamesWithType; } + /** + * Computes the start segment replace phase + * + * 1. Generate a segment lineage entry id + * 2. Compute validation on the user inputs + * 3. Add the new lineage entry to the segment lineage metadata in the property store + * + * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage + * metadata. + * + * @param tableNameWithType Table name with type + * @param segmentsFrom a list of segments to be merged + * @param segmentsTo a list of merged segments + * @return Segment lineage entry id + * + * @throws InvalidConfigException + */ + public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) { + // Create a segment lineage entry id + String segmentLineageEntryId = SegmentLineageUtils.generateLineageEntryId(); + + // Check that all the segments from 'segmentsFrom' exist in the table + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType)); + Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format( + "Not all segments from 'segmentsFrom' are available in the table. 
(tableName = '%s', segmentsFrom = '%s', " + + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo, + segmentsForTable)); + + // Check that all the segments from 'segmentTo' does not exist in the table. + Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format( + "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', " + + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, + segmentsTo, segmentsForTable)); + + try { + DEFAULT_RETRY_POLICY.attempt(() -> { + // Fetch the segment lineage metadata + ZNRecord segmentLineageZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); + SegmentLineage segmentLineage; + int expectedVersion = -1; + if (segmentLineageZNRecord == null) { + segmentLineage = new SegmentLineage(tableNameWithType); + } else { + segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); + expectedVersion = segmentLineageZNRecord.getVersion(); + } + + // Check that the segment lineage entry id doesn't exists in the segment lineage + Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId) == null, + String.format("SegmentLineageEntryId (%s) already exists in the segment lineage.", segmentLineageEntryId)); + + for (String entryId : segmentLineage.getLineageEntryIds()) { + LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId); + + // Check that any segment from 'segmentsFrom' does not appear twice. + Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsFrom), String + .format("It is not allowed to merge segments that are already merged. 
(tableName = %s, segmentsFrom from " + + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType, + lineageEntry.getSegmentsFrom(), segmentsFrom)); + + // Check that merged segments name cannot be the same. + Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format( + "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from " + + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType, + lineageEntry.getSegmentsTo(), segmentsTo)); + } + + // Update lineage entry + segmentLineage.addLineageEntry(segmentLineageEntryId, + new LineageEntry(segmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); + + // Write back to the lineage entry + return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion); + }); + } catch (Exception e) { + String errorMsg = String + .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)", + tableNameWithType, segmentsFrom, segmentsTo); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + + // Only successful attempt can reach here + LOGGER.info("startReplaceSegments is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, " + + "segmentsTo = {}, segmentLineageEntryId = {})", tableNameWithType, segmentsFrom, segmentsTo, + segmentLineageEntryId); + return segmentLineageEntryId; + } + + /** + * Computes the end segment replace phase + * + * 1. Compute validation + * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store + * + * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage + * metadata. 
+ * + * @param tableNameWithType + * @param segmentLineageEntryId + */ + public void endReplaceSegments(String tableNameWithType, String segmentLineageEntryId) { + try { + DEFAULT_RETRY_POLICY.attempt(() -> { + // Fetch the segment lineage metadata + ZNRecord segmentLineageZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType); + SegmentLineage segmentLineage; + int expectedVersion = -1; + Preconditions.checkArgument(segmentLineageZNRecord != null, String + .format("Segment lineage does not exist. (tableNameWithType = '%s', segmentLineageEntryId = '%s')", + tableNameWithType, segmentLineageEntryId)); + segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord); + expectedVersion = segmentLineageZNRecord.getVersion(); + + // Look up the lineage entry based on the segment lineage entry id + LineageEntry lineageEntry = segmentLineage.getLineageEntry(segmentLineageEntryId); + Preconditions.checkArgument(lineageEntry != null, String + .format("Invalid segment lineage entry id (tableName='%s', segmentLineageEntryId='%s')", tableNameWithType, + segmentLineageEntryId)); + + // Check that all the segments from 'segmentsTo' exist in the table + Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType)); + Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format( + "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', " + + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable)); + + // NO-OPS if the entry is already completed + if (lineageEntry.getState() == LineageEntryState.COMPLETED) { + LOGGER.warn( + "Lineage entry state is already COMPLETED. Nothing to update. 
(tableNameWithType={}, segmentLineageEntryId={})", + tableNameWithType, segmentLineageEntryId); + return true; + } + + // Update lineage entry + LineageEntry newLineageEntry = + new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED, + System.currentTimeMillis()); + segmentLineage.updateLineageEntry(segmentLineageEntryId, newLineageEntry); + + // Write back to the lineage entry + return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion); + }); + } catch (Exception e) { + String errorMsg = String + .format("Failed to update the segment lineage. (tableName = %s, segmentLineageEntryId = %s)", + tableNameWithType, segmentLineageEntryId); + LOGGER.error(errorMsg, e); + throw new RuntimeException(errorMsg, e); + } + + // Only successful attempt can reach here + LOGGER.info("endReplaceSegments is successfully processed. (tableNameWithType = {}, segmentLineageEntryId = {})", + tableNameWithType, segmentLineageEntryId); + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index 16cc960..7e5aeb1 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -19,7 +19,9 @@ package org.apache.pinot.controller.helix.core; import com.google.common.collect.BiMap; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -41,6 +43,9 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; 
import org.apache.helix.model.MasterSlaveSMD; import org.apache.pinot.common.exception.InvalidConfigException; +import org.apache.pinot.common.lineage.LineageEntryState; +import org.apache.pinot.common.lineage.SegmentLineage; +import org.apache.pinot.common.lineage.SegmentLineageAccessHelper; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; @@ -49,6 +54,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.TagOverrideConfig; @@ -80,6 +86,11 @@ public class PinotHelixResourceManagerTest extends ControllerTest { private static final String TABLE_NAME = "testTable"; private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME); private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME); + + private static final String SEGMENTS_REPLACE_TEST_TABLE_NAME = "segmentsReplaceTestTable"; + private static final String OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME = + TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_TABLE_NAME); + private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000; private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000; private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10; @@ -91,7 +102,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest { startZk(); Map<String, Object> properties = getDefaultControllerConfiguration(); 
properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false); - + startController(properties); addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false); addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT); @@ -201,10 +212,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build(); _helixResourceManager.addTable(tableConfig); - // Check that the BrokerResource ideal state has 3 Brokers assigned to the table IdealState idealState = _helixResourceManager.getHelixAdmin() .getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); - Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(), 3); // Untag all Brokers assigned to broker tenant untagBrokers(); @@ -502,6 +511,134 @@ public class PinotHelixResourceManagerTest extends ControllerTest { } } + @Test + public void testBatchUpload() + throws IOException { + // Create broker tenant on 1 Brokers + Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1, 0, 0); + PinotResourceManagerResponse response = _helixResourceManager.createBrokerTenant(brokerTenant); + Assert.assertTrue(response.isSuccessful()); + + // Create the table + TableConfig tableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME) + .setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build(); + + _helixResourceManager.addTable(tableConfig); + + for (int i = 0; i < 5; i++) { + _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s" + i), + "downloadUrl"); + } + List<String> segmentsForTable = _helixResourceManager.getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentsForTable.size(), 5); + + 
List<String> segmentsFrom = new ArrayList<>(); + List<String> segmentsTo = Arrays.asList("s5", "s6"); + + String lineageEntryId = + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + SegmentLineage segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>()); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), segmentsTo); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.IN_PROGRESS); + + // Check invalid segmentsTo + segmentsFrom = Arrays.asList("s1", "s2"); + segmentsTo = Arrays.asList("s3", "s4"); + try { + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + } catch (Exception e) { + // expected + } + segmentsFrom = Arrays.asList("s1", "s2"); + segmentsTo = Arrays.asList("s2"); + try { + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + } catch (Exception e) { + // expected + } + + // Check invalid segmentsFrom + segmentsFrom = Arrays.asList("s1", "s6"); + segmentsTo = Arrays.asList("merged1", "merged2"); + try { + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + } catch (Exception e) { + // expected + } + + segmentsFrom = Arrays.asList("s1", "s2"); + String lineageEntryId2 = + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); + 
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), segmentsTo); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS); + + try { + _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo); + } catch (Exception e) { + // expected + } + + // Invalid table + try { + _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId); + } catch (Exception e) { + // expected + } + + // Invalid lineage entry id + try { + _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "aaa"); + } catch (Exception e) { + // expected + } + + // Merged segment not available in the table + try { + _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId); + } catch (Exception e) { + // expected + } + + // Try after adding merged segments to the table + _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s5"), "downloadUrl"); + _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s6"), "downloadUrl"); + + _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId); + segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>()); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s5", "s6")); + 
Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED); + + _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged1"), + "downloadUrl"); + _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged2"), + "downloadUrl"); + + _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2); + segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME); + Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), + Arrays.asList("merged1", "merged2")); + Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED); + } + private void untagBrokers() { for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) { _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org