wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889807


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String 
tableConfigStr,
     }
   }
 
+  @POST
+  @Path("/tables/{tableName}/copy")
+  @Authorize(targetType = TargetType.TABLE, action = 
Actions.Table.CREATE_TABLE)
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Copy a table's schema and config from another 
cluster", notes = "Non upsert table only")
+  public CopyTableResponse copyTable(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName, String payload,
+      @Context HttpHeaders headers) {
+    try {
+      LOGGER.info("[copyTable] received request for table: {}, payload: {}", 
tableName, payload);
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+      CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload, 
CopyTablePayload.class);
+      String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+      Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+      String brokerTenant = copyTablePayload.getBrokerTenant();
+      String serverTenant = copyTablePayload.getServerTenant();
+      Map<String, String> tagReplacementMap = 
copyTablePayload.getTagPoolReplacementMap();
+
+      LOGGER.info("[copyTable] Start copying table: {} from source: {}", 
tableName, sourceControllerUri);
+
+      // Fetch and add schema
+      URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName + 
"/schema");
+      SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+      String schemaJson = schemaResponse.getResponse();
+      Schema schema = Schema.fromString(schemaJson);
+      _pinotHelixResourceManager.addSchema(schema, true, false);
+      LOGGER.info("[copyTable] Successfully added schema for table: {}, 
schema: {}", tableName, schema);
+
+      // Fetch and add table configs
+      URI tableConfigUri = new URI(sourceControllerUri + "/tables/" + 
tableName);
+      SimpleHttpResponse tableConfigResponse = 
HttpClient.wrapAndThrowHttpException(
+          HttpClient.getInstance().sendGetRequest(tableConfigUri, 
requestHeaders));
+      String tableConfigJson = tableConfigResponse.getResponse();
+      LOGGER.info("[copyTable] Fetched table config for table: {}, 
tableConfig: {}", tableName, tableConfigJson);
+      JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+      boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+      if (tableConfigNode.has(TableType.REALTIME.name())) {
+        ObjectNode realtimeTableConfigNode = (ObjectNode) 
tableConfigNode.get(TableType.REALTIME.name());
+        tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant, 
serverTenant, tagReplacementMap);
+        TableConfig realtimeTableConfig = 
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+        if (realtimeTableConfig.getUpsertConfig() != null) {
+          return new CopyTableResponse("fail", "upsert table copy not 
supported");
+        }
+        LOGGER.info("[copyTable] Successfully fetched and tweaked table config 
for table: {}, tableConfig: {}",
+            tableName, realtimeTableConfig.toString());
+
+        URI watermarkUri = new URI(sourceControllerUri + "/tables/" + 
tableName + "/consumerWatermarks");
+        SimpleHttpResponse watermarkResponse = 
HttpClient.wrapAndThrowHttpException(
+            HttpClient.getInstance().sendGetRequest(watermarkUri, 
requestHeaders));
+        String watermarkJson = watermarkResponse.getResponse();
+        LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result: 
{}", tableName, watermarkJson);
+        WatermarkInductionResult watermarkInductionResult =
+            JsonUtils.stringToObject(watermarkJson, 
WatermarkInductionResult.class);
+
+        List<PartitionGroupInfo> partitionGroupInfos = 
watermarkInductionResult.getWatermarks().stream()
+            .map(PartitionGroupInfo::from)
+            .collect(Collectors.toList());
+
+        // Add the table with designated starting kafka offset and segment 
sequence number to create consuming segments
+        _pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+        if (hasOffline) {
+          return new CopyTableResponse("warn", "detect offline; copy real-time 
segments only");
+        }
+        return new CopyTableResponse("success", "");
+      }
+      return new CopyTableResponse("fail", "copying offline table's segments 
is not supported yet");

Review Comment:
   Yeah, throw an exception here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to