This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 83598ce Zookeeper put api (#5949) 83598ce is described below commit 83598ceca137da00087f728338d1de5fc96ced9d Author: Kishore Gopalakrishna <g.kish...@gmail.com> AuthorDate: Sun Sep 13 11:34:54 2020 -0700 Zookeeper put api (#5949) * Adding api to edit ZK path * Adding delete api * Addressing comments --- .../api/resources/ZookeeperResource.java | 62 ++++++++++++++++++++++ .../helix/core/PinotHelixResourceManager.java | 17 ++++-- 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java index 3093052..d367f2c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ZookeeperResource.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Charsets; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -28,13 +29,17 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import javax.inject.Inject; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.helix.AccessOption; import org.apache.helix.ZNRecord; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -75,6 +80,63 @@ public class ZookeeperResource { return null; } + @DELETE + @Path("/zk/delete") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete the znode at this path") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "Success"), // + @ApiResponse(code = 404, message = "ZK Path not found"), // + @ApiResponse(code = 204, message = "No Content"), // + @ApiResponse(code = 500, message = "Internal server error")}) + public SuccessResponse delete( + @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path) { + + path = validateAndNormalizeZKPath(path); + + boolean success = pinotHelixResourceManager.deleteZKPath(path); + if (success) { + return new SuccessResponse("Successfully deleted path: " + path); + } else { + throw new ControllerApplicationException(LOGGER, "Failed to delete path: " + path, + Response.Status.INTERNAL_SERVER_ERROR); + } + } + + @PUT + @Path("/zk/put") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Update the content of the node") + @ApiResponses(value = { // + @ApiResponse(code = 200, message = "Success"), // + @ApiResponse(code = 404, message = "ZK Path not found"), // + @ApiResponse(code = 204, message = "No Content"), // + @ApiResponse(code = 500, message = "Internal server error")}) + public SuccessResponse putData( + @ApiParam(value = "Zookeeper Path, must start with /", required = true, defaultValue = "/") @QueryParam("path") @DefaultValue("") String path, + @ApiParam(value = "Content", required = true) @QueryParam("data") @DefaultValue("") String content, + @ApiParam(value = "expectedVersion", required = true, defaultValue = "-1") @QueryParam("expectedVersion") @DefaultValue("-1") String expectedVersion, + @ApiParam(value = "accessOption", required = true, defaultValue = "1") @QueryParam("accessOption") @DefaultValue("1") String accessOption) { + path = validateAndNormalizeZKPath(path); + ZNRecord record = null; + if (content != null) { + record = (ZNRecord) _znRecordSerializer.deserialize(content.getBytes(Charsets.UTF_8)); + } + try { + boolean result = pinotHelixResourceManager + .setZKData(path, record, Integer.parseInt(expectedVersion), Integer.parseInt(accessOption)); + if (result) { + return new SuccessResponse("Successfully Updated path: " + path); + } else { + throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path, + Response.Status.INTERNAL_SERVER_ERROR); + } + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to update path: " + path, + Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + @GET @Path("/zk/ls") @Produces(MediaType.APPLICATION_JSON) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2bff629..11d2135 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1226,6 +1226,14 @@ public class PinotHelixResourceManager { } } + public boolean setZKData(String path, ZNRecord record, int expectedVersion, int accessOption) { + return _helixDataAccessor.getBaseDataAccessor().set(path, record, expectedVersion, accessOption); + } + + public boolean deleteZKPath(String path) { + return _helixDataAccessor.getBaseDataAccessor().remove(path, -1); + } + public ZNRecord readZKData(String path) { return _helixDataAccessor.getBaseDataAccessor().get(path, null, -1); } @@ -1814,7 +1822,8 @@ public class PinotHelixResourceManager { _helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent - LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType); + LOGGER + .info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType); } else { LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType); } @@ -2474,9 +2483,9 @@ public class PinotHelixResourceManager { } Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); } while (System.currentTimeMillis() < endTimeMs); - throw new TimeoutException(String.format( - "Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", - tableNameWithType, segmentsToCheck)); + throw new TimeoutException(String + .format("Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", + tableNameWithType, segmentsToCheck)); } private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org