abhishekbafna commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2664718282


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");

Review Comment:
   Use `ControllerRequestURLBuilder` for building URLs.
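
   For example, something along these lines (a rough sketch; the exact builder method names such as `forTableGet`/`forTableSchemaGet` are assumptions here and should be verified against the actual `ControllerRequestURLBuilder` API):

   ```java
   // Assumed method names; verify against ControllerRequestURLBuilder before using.
   ControllerRequestURLBuilder urlBuilder = ControllerRequestURLBuilder.baseUrl(sourceControllerUri);
   URI schemaUri = new URI(urlBuilder.forTableSchemaGet(tableName));
   URI tableConfigUri = new URI(urlBuilder.forTableGet(tableName));
   ```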



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);
+        TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+        if (realtimeTableConfig.getUpsertConfig() != null) {
+          return new CopyTableResponse("fail", "upsert table copy not supported");
+        }

Review Comment:
   Let's fetch the table config at the beginning and do all the validation up front. As written, we have already added the schema before discovering that the table is an upsert table, which is not desirable.
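
   A minimal sketch of the reordered flow (the `fetchSchema` and `fetchAndTweakRealtimeTableConfig` helpers are hypothetical, shown only to illustrate the ordering):

   ```java
   // Sketch: fetch and validate everything before any write to this cluster.
   Schema schema = fetchSchema(copyTablePayload);                                          // hypothetical helper
   TableConfig realtimeTableConfig = fetchAndTweakRealtimeTableConfig(copyTablePayload);   // hypothetical helper
   if (realtimeTableConfig.getUpsertConfig() != null) {
     return new CopyTableResponse("fail", "upsert table copy not supported");
   }
   // Only after all validations pass, start mutating cluster state.
   _pinotHelixResourceManager.addSchema(schema, true, false);
   _pinotHelixResourceManager.addTable(realtimeTableConfig, partitionGroupInfos);
   ```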



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);

Review Comment:
   Same, use `ControllerRequestURLBuilder`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);

Review Comment:
   Instead, pass the `CopyTablePayload` so that adding a new parameter in the future does not require a method signature change.
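
   Roughly (a sketch only, keeping the existing tweaking logic unchanged):

   ```java
   private void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode, CopyTablePayload payload) {
     String brokerTenant = payload.getBrokerTenant();
     String serverTenant = payload.getServerTenant();
     Map<String, String> tagReplacementMap = payload.getTagPoolReplacementMap();
     // ... existing tweaking logic ...
   }
   ```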



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   At a high level, the API should:
   * validate input parameters
   * get data from the source cluster (schema and table config)
   * modify the table config and schema if needed
   * perform any validation needed before making any write operation on the cluster
   * perform write operations
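
   A rough skeleton of that structure (helper names are illustrative only):

   ```java
   // 1. Validate input parameters.
   CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
   validatePayload(copyTablePayload);                              // hypothetical helper
   // 2. Get data from the source cluster.
   Schema schema = fetchSchema(copyTablePayload);                  // hypothetical helper
   JsonNode tableConfigNode = fetchTableConfig(copyTablePayload);  // hypothetical helper
   // 3. Modify the table config / schema if needed (tenants, tag replacement).
   // 4. Validate before any write operation (e.g. reject upsert tables).
   // 5. Perform write operations (add schema, then add table).
   ```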



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(

Review Comment:
   It would make sense to support a dry-run mode directly on the API so that users can see the final result of the operation before applying it.
   
   Making two or more separate API requests and then manually verifying them is not the same as what this code does, so the dry run should be supported programmatically.
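
   One possible shape (sketch only; the extra `dryRun` parameter and the richer `CopyTableResponse` constructor are assumptions):

   ```java
   public CopyTableResponse copyTable(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
       @ApiParam(value = "Dry run") @QueryParam("dryRun") @DefaultValue("false") boolean dryRun,
       String payload, @Context HttpHeaders headers) {
     // ... fetch and tweak schema / table config ...
     if (dryRun) {
       // Return the final schema and table config without writing anything to the cluster.
       return new CopyTableResponse("success", "dry run", schema, realtimeTableConfig);
     }
     // ... perform the actual writes ...
   }
   ```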



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);

Review Comment:
   Avoid logging the table config: the ingestion config can contain passwords and secrets, which would end up in the logs.
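
   For instance (sketch), log only the table name and keep the raw config out of the logs:

   ```java
   LOGGER.info("[copyTable] Fetched table config for table: {}", tableName);
   ```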



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);
+        TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+        if (realtimeTableConfig.getUpsertConfig() != null) {
+          return new CopyTableResponse("fail", "upsert table copy not supported");
+        }
+        LOGGER.info("[copyTable] Successfully fetched and tweaked table config for table: {}, tableConfig: {}",
+            tableName, realtimeTableConfig.toString());
+
+        URI watermarkUri = new URI(sourceControllerUri + "/tables/" + tableName + "/consumerWatermarks");
+        SimpleHttpResponse watermarkResponse = HttpClient.wrapAndThrowHttpException(
+            HttpClient.getInstance().sendGetRequest(watermarkUri, requestHeaders));
+        String watermarkJson = watermarkResponse.getResponse();
+        LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", tableName, watermarkJson);
+        WatermarkInductionResult watermarkInductionResult =
+            JsonUtils.stringToObject(watermarkJson, WatermarkInductionResult.class);
+
+        List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+            .map(PartitionGroupInfo::from)
+            .collect(Collectors.toList());
+
+        // Add the table with designated starting kafka offset and segment sequence number to create consuming segments
+        _pinotHelixResourceManager.addTable(realtimeTableConfig, partitionGroupInfos);
+        if (hasOffline) {
+          return new CopyTableResponse("warn", "detect offline; copy real-time segments only");
+        }
+        return new CopyTableResponse("success", "");
+      }
+      return new CopyTableResponse("fail", "copying offline table's segments is not supported yet");

Review Comment:
   This would be a 200 OK response. Instead, it should return a failure with a 501 (Not Implemented) status code or similar.
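
   A minimal sketch using the standard JAX-RS exception path (the controller's usual exception type could be used instead, if preferred):

   ```java
   // Surface the unsupported case as an HTTP error instead of a 200 OK with a "fail" body.
   throw new WebApplicationException("copying offline table's segments is not supported yet", 501);
   ```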



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/WatermarkInductionResult.java:
##########
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Represents the result of a watermark induction process, containing a list of watermarks.
+ */
+public class WatermarkInductionResult {
+
+  private List<Watermark> _watermarks;
+
+  /**
+   * The @JsonCreator annotation marks this constructor to be used for deserializing
+   * a JSON array back into a WaterMarks object.
+   *
+   * @param watermarks The list of watermarks.
+   */
+  @JsonCreator
+  public WatermarkInductionResult(@JsonProperty("watermarks") List<Watermark> watermarks) {
+    _watermarks = watermarks;
+  }
+
+  /**
+   * Gets the list of watermarks.
+   *
+   * @return The list of watermarks.
+   */
+  @JsonGetter("watermarks")
+  public List<Watermark> getWatermarks() {
+    return _watermarks;
+  }
+
+  /**
+   * Represents a single watermark with its partitionGroupId, sequence, and offset.
+   */
+  public static class Watermark {
+    private long _partitionGroupId;
+    private long _sequenceNumber;
+    private long _offset;

Review Comment:
   Why can't this be `int`?
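
   For example (sketch), assuming the partition group id and sequence number fit in 32 bits as they do elsewhere in the codebase:

   ```java
   private int _partitionGroupId;
   private int _sequenceNumber;
   private long _offset;  // the offset may still legitimately need long
   ```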



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1789,10 +1807,16 @@ public void addTable(TableConfig tableConfig)
         // Add ideal state
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, idealState);
         LOGGER.info("Adding table {}: Added ideal state for offline table", tableNameWithType);
-      } else {
+      } else if (consumeMeta == null || consumeMeta.isEmpty()) {
         // Add ideal state with the first CONSUMING segment
         _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
         LOGGER.info("Adding table {}: Added ideal state with first consuming segment", tableNameWithType);
+      } else {
+        // Add ideal state with the first CONSUMING segment with designated partition consuming metadata
+        // Add ideal state with the first CONSUMING segment
+        _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, consumeMeta);
+        LOGGER.info("Adding table {}: Added consuming segments ideal state given the designated consuming metadata",

Review Comment:
   Add `[copyTable]`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);
+        TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+        if (realtimeTableConfig.getUpsertConfig() != null) {
+          return new CopyTableResponse("fail", "upsert table copy not supported");
+        }
+        LOGGER.info("[copyTable] Successfully fetched and tweaked table config for table: {}, tableConfig: {}",
+            tableName, realtimeTableConfig.toString());
+
+        URI watermarkUri = new URI(sourceControllerUri + "/tables/" + tableName + "/consumerWatermarks");
+        SimpleHttpResponse watermarkResponse = HttpClient.wrapAndThrowHttpException(
+            HttpClient.getInstance().sendGetRequest(watermarkUri, requestHeaders));
+        String watermarkJson = watermarkResponse.getResponse();
+        LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: {}", tableName, watermarkJson);
+        WatermarkInductionResult watermarkInductionResult =
+            JsonUtils.stringToObject(watermarkJson, WatermarkInductionResult.class);
+
+        List<PartitionGroupInfo> partitionGroupInfos = watermarkInductionResult.getWatermarks().stream()
+            .map(PartitionGroupInfo::from)
+            .collect(Collectors.toList());
+
+        // Add the table with designated starting kafka offset and segment sequence number to create consuming segments
+        _pinotHelixResourceManager.addTable(realtimeTableConfig, partitionGroupInfos);
+        if (hasOffline) {
+          return new CopyTableResponse("warn", "detect offline; copy real-time segments only");

Review Comment:
   Improve the message, e.g.:
   
   `detected offline table too; only the realtime table will be copied.`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -4715,6 +4739,51 @@ public QueryWorkloadManager getQueryWorkloadManager() {
     return _queryWorkloadManager;
   }
 
+  /**
+   * Retrieves the consumer watermark for a given real-time table.
+   * <p>The watermark represents the next offset to be consumed for each partition group.
+   * If the latest segment of a partition is in a DONE state, the watermark is the end offset of the completed segment.
+   * Otherwise, it is the start offset of the current consuming segment.
+   *
+   * @param tableName The name of the real-time table (without type suffix).
+   * @return A {@link WatermarkInductionResult} containing a list of watermarks for each partition group.
+   * @throws TableNotFoundException if the specified real-time table does not exist.
+   * @throws IllegalStateException if the IdealState for the table is not found.
+   */
+  public WatermarkInductionResult getConsumerWatermarks(String tableName) throws TableNotFoundException {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);

Review Comment:
   Move this after the validation check.
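
   i.e. something like this (a sketch; the existing validation in this method is not shown in the hunk):

   ```java
   public WatermarkInductionResult getConsumerWatermarks(String tableName) throws TableNotFoundException {
     // ... existing validation (e.g. table existence check) first ...
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     // ... rest of the method ...
   }
   ```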



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -376,17 +376,30 @@ public void stop() {
    * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
    */
   public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
+    List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
+    List<PartitionGroupInfo> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), idealState).stream().map(
+            x -> new PartitionGroupInfo(x, STARTING_SEQUENCE_NUMBER)
+        ).collect(Collectors.toList());
+    setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+  }
+
+  /**
+   * Sets up the initial segments for a new LLC real-time table.
+   * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
+   */
+  public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
+      List<PartitionGroupInfo> consumeMeta) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
     LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
+    LOGGER.info("[copyTable] Setting up new LLC table: {}", realtimeTableName);

Review Comment:
   Duplicate log statement.
   
   I see these in other places in the code as well; they would create unnecessary log statements for normal table creation operations.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/CopyTableResponse.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CopyTableResponse {

Review Comment:
   Consider adding the schema and table config to the response so that users can verify them if needed. They can be hidden behind a boolean verbose flag. This would also make it easier to implement a dry-run mode for the API in the future.
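
   A rough sketch of what that could look like (field names and the `@JsonInclude` usage are illustrative only):

   ```java
   public class CopyTableResponse {
     private final String _status;
     private final String _message;
     // Populated only for verbose / dry-run responses, omitted from the JSON otherwise.
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Schema _schema;
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private final TableConfig _tableConfig;
   
     public CopyTableResponse(String status, String message, Schema schema, TableConfig tableConfig) {
       _status = status;
       _message = message;
       _schema = schema;
       _tableConfig = tableConfig;
     }
   }
   ```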



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, serverTenant, tagReplacementMap);
+        TableConfig realtimeTableConfig = JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+        if (realtimeTableConfig.getUpsertConfig() != null) {
+          return new CopyTableResponse("fail", "upsert table copy not supported");
+        }
+        LOGGER.info("[copyTable] Successfully fetched and tweaked table config for table: {}, tableConfig: {}",
+            tableName, realtimeTableConfig.toString());
+
+        URI watermarkUri = new URI(sourceControllerUri + "/tables/" + tableName + "/consumerWatermarks");

Review Comment:
   Same, use `ControllerRequestURLBuilder`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + "/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + tableName);
+      SimpleHttpResponse tableConfigResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());

Review Comment:
   Move it inside the if condition. 
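
   i.e. (sketch):

   ```java
   if (tableConfigNode.has(TableType.REALTIME.name())) {
     boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
     // ... rest of the REALTIME handling ...
   }
   ```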



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
