abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2668973896


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTablePayload.java:
##########
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyTablePayload {
+
+  private String _sourceClusterUri;
+  private Map<String, String> _headers;
+  /**
+   * Broker tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _BROKER.
+   */
+  private String _brokerTenant;
+  /**
+   * Server tenant for the new table.
+   * MUST NOT contain the tenant type suffix, i.e. _REALTIME or _OFFLINE.
+   */
+  private String _serverTenant;
+
+  /**
+   * The instanceAssignmentConfig's tagPoolConfig contains full tenant name. 
We will use this field to let user specify
+   * the replacement relation from source cluster's full tenant to target 
cluster's full tenant.
+   */
+  private Map<String, String> _tagPoolReplacementMap;
+
+  private boolean _verbose = false;
+  private boolean _dryRun = true;

Review Comment:
   These should be API parameters, not part of the copy payload. The copy payload
should contain only table-config-specific key-value pairs.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if 
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
 != null
+          || 
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
 != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table 
to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      URI watermarkUri = new 
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+      SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+      String watermarkJson = watermarkResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", 
tableName, watermarkJson);
+      WatermarkInductionResult watermarkInductionResult =
+          JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+      if (hasOffline && !hasRealtime) {
+        throw new IllegalArgumentException("pure offline table copy not 
supported yet");

Review Comment:
   Throw IllegalStateException here instead of IllegalArgumentException.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if 
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
 != null
+          || 
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
 != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table 
to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      URI watermarkUri = new 
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+      SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+      String watermarkJson = watermarkResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", 
tableName, watermarkJson);
+      WatermarkInductionResult watermarkInductionResult =
+          JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+      if (hasOffline && !hasRealtime) {
+        throw new IllegalArgumentException("pure offline table copy not 
supported yet");
+      }
+
+      ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+      tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
+      TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+      if (realtimeTableConfig.getUpsertConfig() != null) {
+        throw new IllegalArgumentException("upsert table copy not supported");
+      }
+      LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}", tableName);
+
+      if (copyTablePayload.isDryRun()) {
+        return new CopyTableResponse("dryrun",

Review Comment:
   The status cannot be "dryrun"; it should be either success or failure.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if 
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
 != null
+          || 
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
 != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table 
to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      URI watermarkUri = new 
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+      SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+      String watermarkJson = watermarkResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", 
tableName, watermarkJson);
+      WatermarkInductionResult watermarkInductionResult =
+          JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+      if (hasOffline && !hasRealtime) {
+        throw new IllegalArgumentException("pure offline table copy not 
supported yet");
+      }
+
+      ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+      tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
+      TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+      if (realtimeTableConfig.getUpsertConfig() != null) {
+        throw new IllegalArgumentException("upsert table copy not supported");
+      }
+      LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}", tableName);
+
+      if (copyTablePayload.isDryRun()) {
+        return new CopyTableResponse("dryrun",
+            "Dry run successful. Fetched schema, table config and watermarks 
from source cluster.", schema,
+            realtimeTableConfig, watermarkInductionResult);
+      }
+
+      List<PartitionGroupInfo> partitionGroupInfos = 
watermarkInductionResult.getWatermarks().stream()
+          .map(PartitionGroupInfo::from)
+          .collect(Collectors.toList());
+
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}", 
tableName);
+      // Add the table with designated starting kafka offset and segment 
sequence number to create consuming segments
+      _pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+      LOGGER.info("[copyTable] Successfully added table config: {} with 
designated high watermark", tableName);
+      // hybrid table
+      if (hasOffline) {
+        return copyTablePayload.isVerbose() ? new CopyTableResponse("warn",
+            "detect offline too; it will only copy real-time segments", schema,
+            realtimeTableConfig, watermarkInductionResult)
+            : new CopyTableResponse("warn", "detect offline too; it will only 
copy real-time segments", null, null,
+                null);

Review Comment:
   Instead, create the response object early on, set specific fields as the logic
proceeds, and populate the verbose fields at the end, just before returning the
response. This would simplify all of the verbose-handling logic.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if 
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
 != null
+          || 
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
 != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table 
to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      URI watermarkUri = new 
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+      SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+      String watermarkJson = watermarkResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", 
tableName, watermarkJson);
+      WatermarkInductionResult watermarkInductionResult =
+          JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+      if (hasOffline && !hasRealtime) {
+        throw new IllegalArgumentException("pure offline table copy not 
supported yet");
+      }
+
+      ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+      tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
+      TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+      if (realtimeTableConfig.getUpsertConfig() != null) {
+        throw new IllegalArgumentException("upsert table copy not supported");
+      }
+      LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}", tableName);
+
+      if (copyTablePayload.isDryRun()) {
+        return new CopyTableResponse("dryrun",
+            "Dry run successful. Fetched schema, table config and watermarks 
from source cluster.", schema,
+            realtimeTableConfig, watermarkInductionResult);
+      }
+
+      List<PartitionGroupInfo> partitionGroupInfos = 
watermarkInductionResult.getWatermarks().stream()
+          .map(PartitionGroupInfo::from)
+          .collect(Collectors.toList());
+
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}", 
tableName);
+      // Add the table with designated starting kafka offset and segment 
sequence number to create consuming segments
+      _pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+      LOGGER.info("[copyTable] Successfully added table config: {} with 
designated high watermark", tableName);
+      // hybrid table
+      if (hasOffline) {
+        return copyTablePayload.isVerbose() ? new CopyTableResponse("warn",
+            "detect offline too; it will only copy real-time segments", schema,
+            realtimeTableConfig, watermarkInductionResult)
+            : new CopyTableResponse("warn", "detect offline too; it will only 
copy real-time segments", null, null,
+                null);
+      }
+      return copyTablePayload.isVerbose() ? new CopyTableResponse("success", 
"", schema,

Review Comment:
   For the `msg` parameter, instead of an empty string, can we put a simple success message?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1903,17 +1915,31 @@ private LLCSegmentName 
getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName,
   private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig 
streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, 
InstancePartitions instancePartitions,
       int numPartitions, int numReplicas) {
+    return setupNewPartitionGroup(tableConfig, streamConfig, new 
PartitionGroupInfo(partitionGroupMetadata,
+            STARTING_SEQUENCE_NUMBER), creationTimeMs, instancePartitions, 
numPartitions, numReplicas);
+  }
+
+  /**
+   * Sets up a new partition group.
+   * <p>Persists the ZK metadata for the first CONSUMING segment, and returns 
the segment name.
+   */
+  private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig 
streamConfig,
+      PartitionGroupInfo wrapper, long creationTimeMs, InstancePartitions 
instancePartitions,
+      int numPartitions, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
-    int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
-    String startOffset = partitionGroupMetadata.getStartOffset().toString();
+    int partitionGroupId = wrapper.getPartitionGroupId();
+    String startOffset = wrapper.getStartOffset().toString();
     LOGGER.info("Setting up new partition group: {} for table: {}", 
partitionGroupId, realtimeTableName);
+    LOGGER.info("[copyTable] Setting up new partition group: {} for table: {} 
with sequence: {} and startOffset: {}",

Review Comment:
   Remove this log.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +290,136 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+
+      if 
(_pinotHelixResourceManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(tableName))
 != null
+          || 
_pinotHelixResourceManager.getTableConfig(TableNameBuilder.OFFLINE.tableNameWithType(tableName))
 != null) {
+        throw new TableAlreadyExistsException("Table config for " + tableName
+            + " already exists. If this is unexpected, try deleting the table 
to remove all metadata associated"
+            + " with it before attempting to recreate.");
+      }
+
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      ControllerRequestURLBuilder urlBuilder = 
ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
+
+      URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+
+      URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      URI watermarkUri = new 
URI(urlBuilder.forConsumerWatermarksGet(tableName));
+      SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+      String watermarkJson = watermarkResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", 
tableName, watermarkJson);
+      WatermarkInductionResult watermarkInductionResult =
+          JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      boolean hasRealtime = tableConfigNode.has(TableType.REALTIME.name());
+      if (hasOffline && !hasRealtime) {
+        throw new IllegalArgumentException("pure offline table copy not 
supported yet");
+      }
+
+      ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+      tweakRealtimeTableConfig(realtimeTableConfigNode, copyTablePayload);
+      TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+      if (realtimeTableConfig.getUpsertConfig() != null) {
+        throw new IllegalArgumentException("upsert table copy not supported");

Review Comment:
   Throw IllegalStateException here instead of IllegalArgumentException.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -376,17 +376,29 @@ public void stop() {
    * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC 
and LLC are configured.
    */
   public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
+    List<PartitionGroupInfo> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), idealState).stream().map(
+            x -> new PartitionGroupInfo(x, STARTING_SEQUENCE_NUMBER)
+        ).collect(Collectors.toList());
+    setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+  }
+
+  /**
+   * Sets up the initial segments for a new LLC real-time table.
+   * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC 
and LLC are configured.
+   */
+  public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
+      List<PartitionGroupInfo> consumeMeta) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
+    LOGGER.info("[copyTable] Setting up new LLC table: {}", realtimeTableName);

Review Comment:
   We should revert this change. The "[copyTable]" log prefix optimizes for the table
copy path, while most calls here will come from regular table creation.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -4715,6 +4739,51 @@ public QueryWorkloadManager getQueryWorkloadManager() {
     return _queryWorkloadManager;
   }
 
+  /**
+   * Retrieves the consumer watermark for a given real-time table.
+   * <p>The watermark represents the next offset to be consumed for each 
partition group.
+   * If the latest segment of a partition is in a DONE state, the watermark is 
the end offset of the completed segment.
+   * Otherwise, it is the start offset of the current consuming segment.
+   *
+   * @param tableName The name of the real-time table (without type suffix).
+   * @return A {@link WatermarkInductionResult} containing a list of 
watermarks for each partition group.
+   * @throws TableNotFoundException if the specified real-time table does not 
exist.
+   * @throws IllegalStateException if the IdealState for the table is not 
found.
+   */
+  public WatermarkInductionResult getConsumerWatermarks(String tableName) 
throws TableNotFoundException {
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    if (!hasRealtimeTable(tableName)) {
+      throw new TableNotFoundException("Table " + tableNameWithType + " does 
not exist");
+    }
+    TableConfig tableConfig = getTableConfig(tableNameWithType);
+    Preconditions.checkNotNull(tableConfig, "Table " + tableNameWithType + 
"exists but null tableConfig");
+    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
+    IdealState idealState = _helixAdmin
+        .getResourceIdealState(getHelixClusterName(), tableNameWithType);
+    if (idealState == null) {
+      throw new IllegalStateException("Null IdealState of the table " + 
tableNameWithType);
+    }
+    List<PartitionGroupConsumptionStatus> lst = _pinotLLCRealtimeSegmentManager
+        .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
+    List<WatermarkInductionResult.Watermark> watermarks = 
lst.stream().map(status -> {
+      int seq = status.getSequenceNumber();
+      long startOffset;
+      try {
+        if ("DONE".equalsIgnoreCase(status.getStatus())) {
+          Preconditions.checkNotNull(status.getEndOffset());
+          startOffset = 
NumberUtils.parseLong(status.getEndOffset().toString());
+          seq++;
+        } else {

Review Comment:
   Add a test case for the else path too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to