wirybeaver commented on code in PR #17235:
URL: https://github.com/apache/pinot/pull/17235#discussion_r2629951634
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -283,6 +289,109 @@ public ConfigSuccessResponse addTable(String
tableConfigStr,
}
}
+ @POST
+ @Path("/tables/{tableName}/copy")
+ @Authorize(targetType = TargetType.TABLE, action =
Actions.Table.CREATE_TABLE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Copy a table's schema and config from another
cluster", notes = "Non upsert table only")
+ public CopyTableResponse copyTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName, String payload,
+ @Context HttpHeaders headers) {
+ try {
+ LOGGER.info("[copyTable] received request for table: {}, payload: {}",
tableName, payload);
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ CopyTablePayload copyTablePayload = JsonUtils.stringToObject(payload,
CopyTablePayload.class);
+ String sourceControllerUri = copyTablePayload.getSourceClusterUri();
+ Map<String, String> requestHeaders = copyTablePayload.getHeaders();
+ String brokerTenant = copyTablePayload.getBrokerTenant();
+ String serverTenant = copyTablePayload.getServerTenant();
+ Map<String, String> tagReplacementMap =
copyTablePayload.getTagPoolReplacementMap();
+
+ LOGGER.info("[copyTable] Start copying table: {} from source: {}",
tableName, sourceControllerUri);
+
+ // Fetch and add schema
+ URI schemaUri = new URI(sourceControllerUri + "/tables/" + tableName +
"/schema");
+ SimpleHttpResponse schemaResponse = HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(schemaUri, requestHeaders));
+ String schemaJson = schemaResponse.getResponse();
+ Schema schema = Schema.fromString(schemaJson);
+ _pinotHelixResourceManager.addSchema(schema, true, false);
+ LOGGER.info("[copyTable] Successfully added schema for table: {},
schema: {}", tableName, schema);
+
+ // Fetch and add table configs
+ URI tableConfigUri = new URI(sourceControllerUri + "/tables/" +
tableName);
+ SimpleHttpResponse tableConfigResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(tableConfigUri,
requestHeaders));
+ String tableConfigJson = tableConfigResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched table config for table: {},
tableConfig: {}", tableName, tableConfigJson);
+ JsonNode tableConfigNode = JsonUtils.stringToJsonNode(tableConfigJson);
+
+ if (tableConfigNode.has(TableType.REALTIME.name())) {
+ ObjectNode realtimeTableConfigNode = (ObjectNode)
tableConfigNode.get(TableType.REALTIME.name());
+ tweakRealtimeTableConfig(realtimeTableConfigNode, brokerTenant,
serverTenant, tagReplacementMap);
+ TableConfig realtimeTableConfig =
JsonUtils.jsonNodeToObject(realtimeTableConfigNode, TableConfig.class);
+ LOGGER.info("[copyTable] Successfully fetched and tweaked table config
for table: {}, tableConfig: {}",
+ tableName, realtimeTableConfig.toString());
+
+ URI watermarkUri = new URI(sourceControllerUri + "/tables/" +
tableName + "/consumerWatermarks");
+ SimpleHttpResponse watermarkResponse =
HttpClient.wrapAndThrowHttpException(
+ HttpClient.getInstance().sendGetRequest(watermarkUri,
requestHeaders));
+ String watermarkJson = watermarkResponse.getResponse();
+ LOGGER.info("[copyTable] Fetched watermarks for table: {}. Result:
{}", tableName, watermarkJson);
+ WatermarkInductionResult watermarkInductionResult =
+ JsonUtils.stringToObject(watermarkJson,
WatermarkInductionResult.class);
+
+ List<PartitionGroupInfo> partitionGroupInfos =
watermarkInductionResult.getWatermarks().stream()
+ .map(PartitionGroupInfo::from)
+ .collect(Collectors.toList());
+
+ _pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ }
+ LOGGER.info("[copyTable] Finished Table Config copy: {}", tableName);
+ return new CopyTableResponse("success");
+ } catch (Exception e) {
+ LOGGER.error("[copyTable] Error copying table: {}", tableName, e);
+ throw new ControllerApplicationException(LOGGER, "Error copying table: "
+ e.getMessage(),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ /**
+ * Tweaks the realtime table config with the given broker and server tenants.
+ *
+ * @param realtimeTableConfigNode The JSON object representing the realtime
table config.
+ * @param brokerTenant The broker tenant to set in the config.
+ * @param serverTenant The server tenant to set in the config.
+ */
+ @VisibleForTesting
+ static void tweakRealtimeTableConfig(ObjectNode realtimeTableConfigNode,
String brokerTenant, String serverTenant,
+ @Nullable Map<String, String> tagPoolReplacementMap) {
+ ObjectNode tenantConfig = (ObjectNode)
realtimeTableConfigNode.get("tenants");
+ tenantConfig.put("broker", brokerTenant);
+ tenantConfig.put("server", serverTenant);
+ if (tagPoolReplacementMap == null || tagPoolReplacementMap.isEmpty()) {
+ return;
+ }
+ JsonNode instanceAssignmentConfigMap =
realtimeTableConfigNode.get("instanceAssignmentConfigMap");
+ if (instanceAssignmentConfigMap == null) {
+ return;
+ }
+ java.util.Iterator<Map.Entry<String, JsonNode>> iterator =
instanceAssignmentConfigMap.fields();
+ while (iterator.hasNext()) {
+ Map.Entry<String, JsonNode> entry = iterator.next();
+ JsonNode instanceAssignmentConfig = entry.getValue();
+ // tagPoolConfig is a required field
+ ObjectNode tagPoolConfig = (ObjectNode)
instanceAssignmentConfig.get("tagPoolConfig");
+ // tag is a required json field
+ if (tagPoolConfig != null) {
+ String srcTag = tagPoolConfig.get("tag").asText();
Review Comment:
https://sourcegraph.com/github.com/apache/pinot@2962dd445fbd25e70d627afca14c96e4b50df5fd/-/blob/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceTagPoolConfig.java?L49
tag is required in the TagPoolConfig constructor. Should be safe to skip the
null check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]