wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2666889807
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,118 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+ try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map<String, String> tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ boolean hasOffline = tableConfigNode.has(TableType.OFFLINE.name());
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+ tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+ TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+ if (realtimeTableConfig.getUpsertConfig() != null) {
+ return new CopyTableResponse("fail", "upsert table copy not
supported");
+ }
+ LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+ tableName, realtimeTableConfig.toString());
+
+ URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+ SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+ String watermarkJson = watermarkResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+ WatermarkInductionResult watermarkInductionResult =
+ JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+ List<PartitionGroupInfo> partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+ .map(PartitionGroupInfo::from)
+ .collect(Collectors.toList());
+
+ // Add the table with designated starting kafka offset and segment
sequence number to create consuming segments
+ _pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ if (hasOffline) {
+ return new CopyTableResponse("warn", "detect offline; copy real-time
segments only");
+ }
+ return new CopyTableResponse("success", "");
+ }
+ return new CopyTableResponse("fail", "copying offline table's segments
is not supported yet");
Review Comment:
yeah, throw exception
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]