This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ef3e3c3  Add startReplaceSegments, endReplaceSegments controller API 
(#5712)
ef3e3c3 is described below

commit ef3e3c3b5cbc72df5697136ffba07e87ed26bfb9
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Jul 23 20:54:47 2020 -0700

    Add startReplaceSegments, endReplaceSegments controller API (#5712)
    
    * Add startBatchUpload, endBatchUpload controller API
    
    These APIs are the building blocks for atomically replacing
    m segments with n segments. The segment selection algorithm will
    follow in the next PR.
    
    1. Added startBatchUpload, endBatchUpload controller APIs
    2. Added unit tests
    
    * Addressing comments
    
    * Addressing comments and change the API name to "ReplaceSegments"
---
 .../pinot/common/lineage/SegmentLineage.java       |  37 +++--
 .../common/lineage/SegmentLineageAccessHelper.java |   1 +
 .../pinot/common/lineage/SegmentLineageUtils.java  |  40 +++++
 .../resources/StartReplaceSegmentsRequest.java     |  53 +++++++
 .../pinot/common/lineage/SegmentLineageTest.java   |  28 +++-
 .../PinotSegmentUploadDownloadRestletResource.java |  44 ++++++
 .../helix/core/PinotHelixResourceManager.java      | 163 +++++++++++++++++++++
 .../helix/core/PinotHelixResourceManagerTest.java  | 143 +++++++++++++++++-
 8 files changed, 487 insertions(+), 22 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
index ee61f11..98eb1ae 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.ZNRecord;
 
 
@@ -60,14 +60,25 @@ public class SegmentLineage {
   }
 
   /**
-   * Add lineage entry to the segment lineage metadata
+   * Add lineage entry to the segment lineage metadata with the given lineage 
entry id
+   * @param lineageEntryId the id for the lineage entry
    * @param lineageEntry a lineage entry
-   * @return the id for the input lineage entry for the access
    */
-  public String addLineageEntry(LineageEntry lineageEntry) {
-    String lineageId = generateLineageId();
-    _lineageEntries.put(lineageId, lineageEntry);
-    return lineageId;
+  public void addLineageEntry(String lineageEntryId, LineageEntry 
lineageEntry) {
+    Preconditions.checkArgument(!_lineageEntries.containsKey(lineageEntryId),
+        String.format("Lineage entry id ('%s') already exists. Please try with 
the new lineage id", lineageEntryId));
+    _lineageEntries.put(lineageEntryId, lineageEntry);
+  }
+
+  /**
+   * Update lineage entry to the segment lineage metadata with the given 
lineage entry id
+   * @param lineageEntryId the id for the lineage entry to be updated
+   * @param lineageEntry a lineage entry to be updated
+   */
+  public void updateLineageEntry(String lineageEntryId, LineageEntry 
lineageEntry) {
+    Preconditions.checkArgument(_lineageEntries.containsKey(lineageEntryId),
+        String.format("Lineage entry id ('%s') does not exists. Please try 
with the valid lineage id", lineageEntryId));
+    _lineageEntries.put(lineageEntryId, lineageEntry);
   }
 
   /**
@@ -108,8 +119,8 @@ public class SegmentLineage {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
       Preconditions.checkState(value.size() == 4);
-      List<String> segmentsFrom = 
Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
-      List<String> segmentsTo = 
Arrays.asList(value.get(1).split(COMMA_SEPARATOR));
+      List<String> segmentsFrom = 
Arrays.asList(StringUtils.split(value.get(0), COMMA_SEPARATOR));
+      List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), 
COMMA_SEPARATOR));
       LineageEntryState state = LineageEntryState.valueOf(value.get(2));
       long timestamp = Long.parseLong(value.get(3));
       lineageEntries.put(lineageId, new LineageEntry(segmentsFrom, segmentsTo, 
state, timestamp));
@@ -125,8 +136,8 @@ public class SegmentLineage {
     ZNRecord znRecord = new ZNRecord(_tableNameWithType);
     for (Map.Entry<String, LineageEntry> entry : _lineageEntries.entrySet()) {
       LineageEntry lineageEntry = entry.getValue();
-      String segmentsFrom = String.join(",", lineageEntry.getSegmentsFrom());
-      String segmentsTo = String.join(",", lineageEntry.getSegmentsTo());
+      String segmentsFrom = String.join(COMMA_SEPARATOR, 
lineageEntry.getSegmentsFrom());
+      String segmentsTo = String.join(COMMA_SEPARATOR, 
lineageEntry.getSegmentsTo());
       String state = lineageEntry.getState().toString();
       String timestamp = Long.toString(lineageEntry.getTimestamp());
       List<String> listEntry = Arrays.asList(segmentsFrom, segmentsTo, state, 
timestamp);
@@ -134,8 +145,4 @@ public class SegmentLineage {
     }
     return znRecord;
   }
-
-  private String generateLineageId() {
-    return UUID.randomUUID().toString();
-  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java
index b724c95..de34a08 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageAccessHelper.java
@@ -69,6 +69,7 @@ public class SegmentLineageAccessHelper {
    *
    * @param propertyStore a property store
    * @param segmentLineage a segment lineage
+   * @param expectedVersion expected version of ZNRecord. -1 for indicating to 
match any version.
    * @return true if update is successful. false otherwise.
    */
   public static boolean writeSegmentLineage(ZkHelixPropertyStore<ZNRecord> 
propertyStore,
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
new file mode 100644
index 0000000..aa0ead8
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.lineage;
+
+import java.util.UUID;
+
+
+/**
+ * Util class for Segment Lineage
+ */
+public class SegmentLineageUtils {
+
+  private SegmentLineageUtils() {
+  }
+
+  /**
+   * Generate lineage entry id using UUID.
+   *
+   * @return lineage entry id
+   */
+  public static String generateLineageEntryId() {
+    return UUID.randomUUID().toString();
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
new file mode 100644
index 0000000..fdf3f56
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartReplaceSegmentsRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+
+
+/**
+ * Request object for startReplaceSegments API.
+ *
+ * 1. segmentsFrom : original segments. This field can be empty in case the 
user tries to upload the original segments
+ *    and wants to achieve the atomic update of multiple segments.
+ * 2. segmentsTo : merged segments.
+ */
+public class StartReplaceSegmentsRequest {
+  private List<String> _segmentsFrom;
+  private List<String> _segmentsTo;
+
+  public StartReplaceSegmentsRequest(@JsonProperty("segmentsFrom") @Nullable 
List<String> segmentsFrom,
+      @JsonProperty("segmentsTo") List<String> segmentsTo) {
+    _segmentsFrom = (segmentsFrom == null) ? Collections.emptyList() : 
segmentsFrom;
+    _segmentsTo = segmentsTo;
+    Preconditions.checkArgument(segmentsTo != null && !segmentsTo.isEmpty(), 
"'segmentsTo' should not be null or empty");
+  }
+
+  public List<String> getSegmentsFrom() {
+    return _segmentsFrom;
+  }
+
+  public List<String> getSegmentsTo() {
+    return _segmentsTo;
+  }
+}
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
index e3ce29e..f890d16 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/lineage/SegmentLineageTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.lineage;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -28,9 +29,10 @@ import org.testng.annotations.Test;
 
 public class SegmentLineageTest {
   @Test
-  public void testSegmentLineage() throws Exception {
+  public void testSegmentLineage() {
     SegmentLineage segmentLineage = new SegmentLineage("test_OFFLINE");
-    String id = segmentLineage.addLineageEntry(
+    String id = SegmentLineageUtils.generateLineageEntryId();
+    segmentLineage.addLineageEntry(id,
         new LineageEntry(Arrays.asList("s1", "s2", "s3"), Arrays.asList("s4", 
"s5"), LineageEntryState.COMPLETED,
             11111L));
     LineageEntry lineageEntry = segmentLineage.getLineageEntry(id);
@@ -39,7 +41,8 @@ public class SegmentLineageTest {
     Assert.assertEquals(lineageEntry.getState(), LineageEntryState.COMPLETED);
     Assert.assertEquals(lineageEntry.getTimestamp(), 11111L);
 
-    String id2 = segmentLineage.addLineageEntry(
+    String id2 = SegmentLineageUtils.generateLineageEntryId();
+    segmentLineage.addLineageEntry(id2,
         new LineageEntry(Arrays.asList("s6", "s6", "s8"), Arrays.asList("s9", 
"s10"), LineageEntryState.COMPLETED,
             22222L));
     LineageEntry lineageEntry2 = segmentLineage.getLineageEntry(id2);
@@ -48,7 +51,8 @@ public class SegmentLineageTest {
     Assert.assertEquals(lineageEntry2.getState(), LineageEntryState.COMPLETED);
     Assert.assertEquals(lineageEntry2.getTimestamp(), 22222L);
 
-    String id3 = segmentLineage.addLineageEntry(
+    String id3 = SegmentLineageUtils.generateLineageEntryId();
+    segmentLineage.addLineageEntry(id3,
         new LineageEntry(Arrays.asList("s5", "s9"), Arrays.asList("s11"), 
LineageEntryState.IN_PROGRESS, 33333L));
     LineageEntry lineageEntry3 = segmentLineage.getLineageEntry(id3);
     Assert.assertEquals(lineageEntry3.getSegmentsFrom(), Arrays.asList("s5", 
"s9"));
@@ -56,6 +60,15 @@ public class SegmentLineageTest {
     Assert.assertEquals(lineageEntry3.getState(), 
LineageEntryState.IN_PROGRESS);
     Assert.assertEquals(lineageEntry3.getTimestamp(), 33333L);
 
+    String id4 = SegmentLineageUtils.generateLineageEntryId();
+    segmentLineage.addLineageEntry(id4,
+        new LineageEntry(new ArrayList<>(), Arrays.asList("s12"), 
LineageEntryState.IN_PROGRESS, 44444L));
+    LineageEntry lineageEntry4 = segmentLineage.getLineageEntry(id4);
+    Assert.assertEquals(lineageEntry4.getSegmentsFrom(), new ArrayList<>());
+    Assert.assertEquals(lineageEntry4.getSegmentsTo(), Arrays.asList("s12"));
+    Assert.assertEquals(lineageEntry4.getState(), 
LineageEntryState.IN_PROGRESS);
+    Assert.assertEquals(lineageEntry4.getTimestamp(), 44444L);
+
     // Test the convesion from the segment lineage to the znRecord
     ZNRecord znRecord = segmentLineage.toZNRecord();
     Assert.assertEquals(znRecord.getId(), "test_OFFLINE");
@@ -79,10 +92,17 @@ public class SegmentLineageTest {
     Assert.assertEquals(entry3.get(2), 
LineageEntryState.IN_PROGRESS.toString());
     Assert.assertEquals(entry3.get(3), Long.toString(33333L));
 
+    List<String> entry4 = listFields.get(id4);
+    Assert.assertEquals(entry4.get(0), "");
+    Assert.assertEquals(entry4.get(1), String.join(",", Arrays.asList("s12")));
+    Assert.assertEquals(entry4.get(2), 
LineageEntryState.IN_PROGRESS.toString());
+    Assert.assertEquals(entry4.get(3), Long.toString(44444L));
+
     // Test the conversion from the znRecord to the segment lineage
     SegmentLineage segmentLineageFromZNRecord = 
SegmentLineage.fromZNRecord(segmentLineage.toZNRecord());
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id), 
lineageEntry);
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id2), 
lineageEntry2);
     Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id3), 
lineageEntry3);
+    Assert.assertEquals(segmentLineageFromZNRecord.getLineageEntry(id4), 
lineageEntry4);
   }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 5afcf9e..eef4f9f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.api.resources;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
@@ -60,6 +61,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.restlet.resources.StartReplaceSegmentsRequest;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.StringUtil;
@@ -75,10 +77,12 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.metadata.DefaultMetadataExtractor;
 import org.apache.pinot.core.metadata.MetadataExtractorFactory;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.crypt.PinotCrypter;
 import org.apache.pinot.spi.crypt.PinotCrypterFactory;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.glassfish.grizzly.http.server.Request;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
@@ -455,6 +459,46 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startReplaceSegments")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start to replace segments", notes = "Start to replace 
segments")
+  public Response startReplaceSegments(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      StartReplaceSegmentsRequest startReplaceSegmentsRequest) {
+    try {
+      String tableNameWithType =
+          
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      String segmentLineageEntryId = _pinotHelixResourceManager
+          .startReplaceSegments(tableNameWithType, 
startReplaceSegmentsRequest.getSegmentsFrom(),
+              startReplaceSegmentsRequest.getSegmentsTo());
+      return 
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId", 
segmentLineageEntryId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endReplaceSegments")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End to replace segments", notes = "End to replace 
segments")
+  public Response endReplaceSegments(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String 
tableTypeStr,
+      @ApiParam(value = "Segment lineage entry id returned by 
startReplaceSegments API") @QueryParam("segmentLineageEntryId") String 
segmentLineageEntryId) {
+    try {
+      String tableNameWithType =
+          
TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      // Check that the segment lineage entry id is valid
+      Preconditions.checkNotNull(segmentLineageEntryId, 
"'segmentLineageEntryId' should not be null");
+      _pinotHelixResourceManager.endReplaceSegments(tableNameWithType, 
segmentLineageEntryId);
+      return Response.ok().build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   private File createSegmentFileFromMultipart(FormDataMultiPart multiPart, 
File dstFile)
       throws IOException {
     // Read segment file or segment metadata file and directly use that 
information to update zk
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5e1ea4b..7d5bd61 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -67,6 +67,11 @@ import 
org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.exception.SchemaNotFoundException;
 import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
+import org.apache.pinot.common.lineage.SegmentLineageUtils;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableConfigRefreshMessage;
@@ -2203,6 +2208,164 @@ public class PinotHelixResourceManager {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start segment replace phase
+   *
+   * 1. Generate a segment lineage entry id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the 
property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Segment lineage entry id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startReplaceSegments(String tableNameWithType, List<String> 
segmentsFrom, List<String> segmentsTo) {
+    // Create a segment lineage entry id
+    String segmentLineageEntryId = 
SegmentLineageUtils.generateLineageEntryId();
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), 
String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. 
(tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", 
tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the 
table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, 
segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table 
at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = 
'%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the segment lineage entry id doesn't exists in the 
segment lineage
+        
Preconditions.checkArgument(segmentLineage.getLineageEntry(segmentLineageEntryId)
 == null,
+            String.format("SegmentLineageEntryId (%s) already exists in the 
segment lineage.", segmentLineageEntryId));
+
+        for (String entryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(entryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(),
 segmentsFrom), String
+              .format("It is not allowed to merge segments that are already 
merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = 
%s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), segmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          
Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), 
segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged 
segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", 
tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(segmentLineageEntryId,
+            new LineageEntry(segmentsFrom, segmentsTo, 
LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, 
segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startReplaceSegments is successfully processed. 
(tableNameWithType = {}, segmentsFrom = {}, "
+            + "segmentsTo = {}, segmentLineageEntryId = {})", 
tableNameWithType, segmentsFrom, segmentsTo,
+        segmentLineageEntryId);
+    return segmentLineageEntryId;
+  }
+
+  /**
+   * Computes the end segment replace phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to 
the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for 
achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param segmentLineageEntryId
+   */
+  public void endReplaceSegments(String tableNameWithType, String 
segmentLineageEntryId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            
SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, 
tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = 
'%s', segmentLineageEntryId = '%s')",
+                tableNameWithType, segmentLineageEntryId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the segment lineage entry id
+        LineageEntry lineageEntry = 
segmentLineage.getLineageEntry(segmentLineageEntryId);
+        Preconditions.checkArgument(lineageEntry != null, String
+            .format("Invalid segment lineage entry id (tableName='%s', 
segmentLineageEntryId='%s')", tableNameWithType,
+                segmentLineageEntryId));
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new 
HashSet<>(getSegmentsFor(tableNameWithType));
+        
Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()),
 String.format(
+            "Not all segments from 'segmentsTo' are available in the table. 
(tableName = '%s', segmentsTo = '%s', "
+                + "segmentsFromTable = '%s')", tableNameWithType, 
lineageEntry.getSegmentsTo(), segmentsForTable));
+
+        // NO-OPS if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.warn(
+              "Lineage entry state is already COMPLETED. Nothing to update. 
(tableNameWithType={}, segmentLineageEntryId={})",
+              tableNameWithType, segmentLineageEntryId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), 
lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.updateLineageEntry(segmentLineageEntryId, 
newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, 
segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, 
segmentLineageEntryId = %s)",
+              tableNameWithType, segmentLineageEntryId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("endReplaceSegments is successfully processed. 
(tableNameWithType = {}, segmentLineageEntryId = {})",
+        tableNameWithType, segmentLineageEntryId);
+  }
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 16cc960..7e5aeb1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.controller.helix.core;
 
 import com.google.common.collect.BiMap;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -41,6 +43,9 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -49,6 +54,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TagOverrideConfig;
@@ -80,6 +86,11 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private static final String TABLE_NAME = "testTable";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
+
+  private static final String SEGMENTS_REPLACE_TEST_TABLE_NAME = 
"segmentsReplaceTestTable";
+  private static final String OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME =
+      
TableNameBuilder.OFFLINE.tableNameWithType(SEGMENTS_REPLACE_TEST_TABLE_NAME);
+
   private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
   private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
   private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
@@ -91,7 +102,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     startZk();
     Map<String, Object> properties = getDefaultControllerConfiguration();
     properties.put(ControllerConf.CLUSTER_TENANT_ISOLATION_ENABLE, false);
-    
+
     startController(properties);
     addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false);
     addFakeServerInstancesToAutoJoinHelixCluster(NUM_INSTANCES, false, 
BASE_SERVER_ADMIN_PORT);
@@ -201,10 +212,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
         
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
     _helixResourceManager.addTable(tableConfig);
 
-    // Check that the BrokerResource ideal state has 3 Brokers assigned to the 
table
     IdealState idealState = _helixResourceManager.getHelixAdmin()
         .getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
-    
Assert.assertEquals(idealState.getInstanceStateMap(OFFLINE_TABLE_NAME).size(), 
3);
 
     // Untag all Brokers assigned to broker tenant
     untagBrokers();
@@ -502,6 +511,134 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     }
   }
 
+  /**
+   * Tests the segment replacement flow: {@code startReplaceSegments} records an IN_PROGRESS lineage entry, invalid
+   * start/end requests are rejected with an exception, and {@code endReplaceSegments} moves an entry to COMPLETED
+   * once all of its 'segmentsTo' segments have been added to the table.
+   */
+  @Test
+  public void testBatchUpload()
+      throws IOException {
+    // Create a broker tenant with a single broker
+    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 1, 0, 0);
+    PinotResourceManagerResponse response = _helixResourceManager.createBrokerTenant(brokerTenant);
+    Assert.assertTrue(response.isSuccessful());
+
+    // Create the table
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME)
+            .setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
+
+    _helixResourceManager.addTable(tableConfig);
+
+    // Add the initial segments s0 ... s4
+    for (int i = 0; i < 5; i++) {
+      _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+          SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s" + i),
+          "downloadUrl");
+    }
+    List<String> segmentsForTable = _helixResourceManager.getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentsForTable.size(), 5);
+
+    // Start a replacement with an empty 'segmentsFrom' that introduces the new segments s5 and s6
+    List<String> segmentsFrom = new ArrayList<>();
+    List<String> segmentsTo = Arrays.asList("s5", "s6");
+
+    String lineageEntryId =
+        _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+    SegmentLineage segmentLineage =
+        SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>());
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), segmentsTo);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.IN_PROGRESS);
+
+    // Check invalid segmentsTo: s3 and s4 already exist in the table.
+    // Each negative check below calls Assert.fail() right after the call under test. Without it the test would pass
+    // even if no exception were thrown. Assert.fail() throws an AssertionError, which `catch (Exception e)` does not
+    // intercept.
+    segmentsFrom = Arrays.asList("s1", "s2");
+    segmentsTo = Arrays.asList("s3", "s4");
+    try {
+      _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+      Assert.fail("startReplaceSegments should reject 'segmentsTo' segments that already exist in the table");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Check invalid segmentsTo: s2 already exists in the table (and is also part of 'segmentsFrom')
+    segmentsFrom = Arrays.asList("s1", "s2");
+    segmentsTo = Arrays.asList("s2");
+    try {
+      _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+      Assert.fail("startReplaceSegments should reject 'segmentsTo' segments that already exist in the table");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Check invalid segmentsFrom: s6 has not been added to the table yet
+    segmentsFrom = Arrays.asList("s1", "s6");
+    segmentsTo = Arrays.asList("merged1", "merged2");
+    try {
+      _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+      Assert.fail("startReplaceSegments should reject 'segmentsFrom' segments that do not exist in the table");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Valid request: replace s1, s2 with merged1, merged2
+    segmentsFrom = Arrays.asList("s1", "s2");
+    String lineageEntryId2 =
+        _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+    segmentLineage =
+        SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), segmentsFrom);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(), segmentsTo);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.IN_PROGRESS);
+
+    // The same request cannot be started again while the previous one is in progress
+    try {
+      _helixResourceManager.startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo);
+      Assert.fail("startReplaceSegments should reject a request identical to an in-progress one");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Invalid table: OFFLINE_TABLE_NAME is a different table that does not own this lineage entry
+    try {
+      _helixResourceManager.endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId);
+      Assert.fail("endReplaceSegments should reject a lineage entry id that belongs to another table");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Invalid lineage entry id
+    try {
+      _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "aaa");
+      Assert.fail("endReplaceSegments should reject an unknown lineage entry id");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Merged segments (s5, s6) are not available in the table yet
+    try {
+      _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
+      Assert.fail("endReplaceSegments should fail while 'segmentsTo' segments are missing from the table");
+    } catch (Exception e) {
+      // Expected
+    }
+
+    // Try after adding merged segments to the table
+    _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s5"), "downloadUrl");
+    _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "s6"), "downloadUrl");
+
+    _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
+    segmentLineage =
+        SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsFrom(), new ArrayList<>());
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getSegmentsTo(), Arrays.asList("s5", "s6"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId).getState(), LineageEntryState.COMPLETED);
+
+    // Add merged1, merged2 and complete the second replacement
+    _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged1"),
+        "downloadUrl");
+    _helixResourceManager.addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME,
+        SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "merged2"),
+        "downloadUrl");
+
+    _helixResourceManager.endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2);
+    segmentLineage =
+        SegmentLineageAccessHelper.getSegmentLineage(_propertyStore, OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME);
+    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 2);
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsFrom(), Arrays.asList("s1", "s2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getSegmentsTo(),
+        Arrays.asList("merged1", "merged2"));
+    Assert.assertEquals(segmentLineage.getLineageEntry(lineageEntryId2).getState(), LineageEntryState.COMPLETED);
+  }
+
   private void untagBrokers() {
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
       _helixAdmin.removeInstanceTag(getHelixClusterName(), brokerInstance,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to