This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a273af0262 Add capabilities to ingest from another stream without disabling the realtime table (#9289)
a273af0262 is described below

commit a273af026221be6a0d1eb6268f091429ce3fac39
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Mon Aug 29 17:44:51 2022 -0700

    Add capabilities to ingest from another stream without disabling the realtime table (#9289)
---
 .../api/resources/PinotRealtimeTableResource.java  |  20 ++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 113 +++++++++++++--------
 .../RealtimeSegmentValidationManager.java          |  10 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   5 +-
 4 files changed, 98 insertions(+), 50 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 7bf4ac4235..715d65bede 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -61,8 +62,7 @@ public class PinotRealtimeTableResource {
   @POST
   @Path("/tables/{tableName}/pauseConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Pause consumption of a realtime table",
-      notes = "Pause the consumption of a realtime table")
+  @ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table")
   public Response pauseConsumption(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
@@ -77,14 +77,22 @@ public class PinotRealtimeTableResource {
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Resume consumption of a realtime table",
-      notes = "Resume the consumption for a realtime table")
+  @ApiOperation(value = "Resume consumption of a realtime table", notes =
+      "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets "
+          + "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the "
+          + "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are "
+          + "picked to minimize the data loss.")
   public Response resumeConsumption(
-      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
+    }
     try {
-      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
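
For reference, the pause/resume endpoints above can be exercised end to end roughly as follows. This is a minimal sketch only: the controller address (http://localhost:9000) and table name (myTable) are placeholders, and updating the table's streamConfigs between the two calls is assumed to happen out of band.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ResumeFromNewStreamExample {
      public static void main(String[] args) throws Exception {
        // Placeholder controller address and table name; adjust for a real deployment.
        String controller = "http://localhost:9000";
        String table = "myTable";
        HttpClient client = HttpClient.newHttpClient();

        // 1. Pause consumption (existing endpoint shown in the diff above).
        HttpRequest pause = HttpRequest.newBuilder(
                URI.create(controller + "/tables/" + table + "/pauseConsumption"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        System.out.println(client.send(pause, HttpResponse.BodyHandlers.ofString()).body());

        // 2. (Out of band) update the table's streamConfigs to point at the new stream.

        // 3. Resume consumption; consumeFrom accepts "smallest" or "largest".
        HttpRequest resume = HttpRequest.newBuilder(
                URI.create(controller + "/tables/" + table + "/resumeConsumption?consumeFrom=smallest"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        System.out.println(client.send(resume, HttpResponse.BodyHandlers.ofString()).body());
      }
    }

Passing consumeFrom=smallest starts the new consuming segments from the earliest offsets available in the newly configured stream, consumeFrom=largest starts from the latest, and omitting the parameter keeps the pre-existing behavior of resuming from the offsets recorded in segment ZK metadata.
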
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5cec6b8c4b..762ffdc421 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -873,7 +873,7 @@ public class PinotLLCRealtimeSegmentManager {
    * which means it's manually triggered by admin not by automatic periodic task)
    */
   public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      boolean recreateDeletedConsumingSegment) {
+      boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -881,17 +881,20 @@ public class PinotLLCRealtimeSegmentManager {
       assert idealState != null;
       boolean isTableEnabled = idealState.isEnabled();
       boolean isTablePaused = isTablePaused(idealState);
+      boolean offsetsHaveToChange = offsetCriteria != null;
       if (isTableEnabled && !isTablePaused) {
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-        // Read the smallest offset when a new partition is detected
+            offsetsHaveToChange
+                ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
+                : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
         OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
-        streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+        // Read the smallest offset when a new partition is detected
+        streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
         streamConfig.setOffsetCriteria(originalOffsetCriteria);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
-            recreateDeletedConsumingSegment);
+            recreateDeletedConsumingSegment, offsetCriteria);
       } else {
         LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
             realtimeTableName, isTableEnabled, isTablePaused);
@@ -1052,7 +1055,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
-      boolean recreateDeletedConsumingSegment) {
+      boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -1074,6 +1077,16 @@ public class PinotLLCRealtimeSegmentManager {
     // Get the latest segment ZK metadata for each partition
     Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
 
+    // create a map of <parition id, start offset> using data already fetched in newPartitionGroupMetadataList
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();
+    for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) {
+      partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    }
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null;
+    if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
+      partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset;
+    }
+
     // Walk over all partitions that we have metadata for, and repair any partitions necessary.
     // Possible things to repair:
     // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
@@ -1144,10 +1157,18 @@ public class PinotLLCRealtimeSegmentManager {
           // 3. we should never end up with some replicas ONLINE and some OFFLINE.
           if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
             LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
-            StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+            if (partitionGroupIdToSmallestStreamOffset == null) {
+              // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+              // This is to prevent fetching the same info for each and every partition group.
+              partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+            }
+            StreamPartitionMsgOffset startOffset =
+                selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+                    partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+                    latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
-                segmentAssignment, instancePartitionsMap, startOffset);
+                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+                instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1155,10 +1176,18 @@ public class PinotLLCRealtimeSegmentManager {
                 // If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
                 // Create a new IN_PROGRESS segment in PROPERTYSTORE,
                 // add it as CONSUMING segment to IDEALSTATE.
-                StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+                if (partitionGroupIdToSmallestStreamOffset == null) {
+                  // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+                  // This is to prevent fetching the same info for each and every partition group.
+                  partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+                }
+                StreamPartitionMsgOffset startOffset =
+                    selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+                        partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+                        latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                    partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
-                    segmentAssignment, instancePartitionsMap, startOffset);
+                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+                    instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
                     latestSegmentName);
@@ -1220,7 +1249,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId,
+      SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
       Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
@@ -1228,23 +1257,6 @@ public class PinotLLCRealtimeSegmentManager {
     int numPartitions = newPartitionGroupMetadataList.size();
     LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
     LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-    StreamPartitionMsgOffset partitionGroupSmallestOffset =
-        getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
-
-    if (partitionGroupSmallestOffset != null) {
-      // Start offset must be higher than the start offset of the stream
-      if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
-        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
-            partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
-        _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-        startOffset = partitionGroupSmallestOffset;
-      }
-    } else {
-      LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId,
-          tableConfig.getTableName(), startOffset);
-      _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-    }
-
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
@@ -1254,21 +1266,39 @@ public class PinotLLCRealtimeSegmentManager {
         instancePartitionsMap);
   }
 
-  @Nullable
-  private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
+  private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
     OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
     streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-    List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+    List<PartitionGroupMetadata> partitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
     streamConfig.setOffsetCriteria(originalOffsetCriteria);
-    StreamPartitionMsgOffset partitionStartOffset = null;
-    for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
-      if (info.getPartitionGroupId() == partitionGroupId) {
-        partitionStartOffset = info.getStartOffset();
-        break;
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
+    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+      partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    }
+    return partitionGroupIdToSmallestOffset;
+  }
+
+  private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName,
+      StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) {
+    if (offsetCriteria != null) {
+      // use the fetched offset according to offset criteria
+      return partitionGroupIdToStartOffset.get(partitionGroupId);
+    } else {
+      // use offset from segment ZK metadata
+      StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr);
+      StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId);
+      // Start offset in ZK must be higher than the start offset of the stream
+      if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) {
+        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
+            streamSmallestOffset, partitionGroupId, tableName);
+        _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+        return streamSmallestOffset;
       }
+      return startOffsetInSegmentZkMetadata;
     }
-    return partitionStartOffset;
   }
 
   private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
@@ -1448,12 +1478,15 @@ public class PinotLLCRealtimeSegmentManager {
    *   1) setting "isTablePaused" in ideal states to false and
    *   2) triggering segment validation job to create new consuming segments in ideal states
    */
-  public PauseStatus resumeConsumption(String tableNameWithType) {
+  public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) {
     IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
 
     // trigger realtime segment validation job to resume consumption
     Map<String, String> taskProperties = new HashMap<>();
     taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    if (offsetCriteria != null) {
+      taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
+    }
     _helixResourceManager
         .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
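
For readers skimming the patch, the start-offset selection introduced above boils down to a small rule. The following is a simplified sketch using plain long offsets and hypothetical names, not the actual Pinot types or API:

    /** Simplified illustration of the start-offset selection in this patch (not the actual Pinot code). */
    final class StartOffsetSelection {
      /**
       * @param criteriaOffset       offset fetched per the operator-supplied consumeFrom criteria, or null if none given
       * @param zkMetadataOffset     start offset recorded in the segment ZK metadata
       * @param streamSmallestOffset earliest offset currently available in the stream
       */
      static long selectStartOffset(Long criteriaOffset, long zkMetadataOffset, long streamSmallestOffset) {
        if (criteriaOffset != null) {
          // consumeFrom was provided: trust the freshly fetched offset (smallest or largest) for the new stream.
          return criteriaOffset;
        }
        if (streamSmallestOffset > zkMetadataOffset) {
          // The offset stored in ZK has already been retained out of the stream; some data is unrecoverable.
          // The real code logs an error and bumps the LLC_STREAM_DATA_LOSS meter here, then falls forward.
          return streamSmallestOffset;
        }
        // Normal case: continue from where the segment ZK metadata says consumption stopped.
        return zkMetadataOffset;
      }
    }
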
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index b772fcabd1..90c23361d1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -34,6 +34,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -55,6 +56,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
 
   public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
+  public static final String OFFSET_CRITERIA = "offsetCriteria";
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -82,7 +84,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     }
     context._recreateDeletedConsumingSegment =
         Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
-
+    String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
+    if (offsetCriteriaStr != null) {
+      context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
+    }
     return context;
   }
 
@@ -106,7 +111,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
 
     if (streamConfig.hasLowLevelConsumerType()) {
       _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
-          context._recreateDeletedConsumingSegment);
+          context._recreateDeletedConsumingSegment, context._offsetCriteria);
     }
   }
 
@@ -178,6 +183,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   public static final class Context {
     private boolean _runSegmentLevelValidation;
     private boolean _recreateDeletedConsumingSegment;
+    private OffsetCriteria _offsetCriteria;
   }
 
   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 14ee8970ec..eee1f2a8a8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -888,7 +888,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
     try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
+          null);
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -1115,7 +1116,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false);
+          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
