This is an automated email from the ASF dual-hosted git repository. mayanks pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ac7e5f9 Fixed pinot java client to add zkClient close (#7196) ac7e5f9 is described below commit ac7e5f9c298d13c90ff31521e128a981543cfb42 Author: ramabme <84807309+rama...@users.noreply.github.com> AuthorDate: Sat Jul 24 12:13:54 2021 -0700 Fixed pinot java client to add zkClient close (#7196) Fixed the issue where zkClient connection was not being closed in pinot java client's connection close --- .../org/apache/pinot/client/BrokerSelector.java | 5 +++++ .../java/org/apache/pinot/client/Connection.java | 1 + .../apache/pinot/client/DynamicBrokerSelector.java | 24 ++++++++++++++-------- .../apache/pinot/client/SimpleBrokerSelector.java | 4 ++++ 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java index 329a1ca..4356435 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/BrokerSelector.java @@ -25,4 +25,9 @@ public interface BrokerSelector { * @return */ String selectBroker(String table); + + /** + * Close any resources + */ + void close(); } diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java index d408ad5..e7e9c16 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java @@ -182,6 +182,7 @@ public class Connection { public void close() throws PinotClientException { _transport.close(); + _brokerSelector.close(); } private static class ResultSetGroupFuture implements Future<ResultSetGroup> { diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java index 2cccffb..ec8b5cd 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/DynamicBrokerSelector.java @@ -39,17 +39,18 @@ import static org.apache.pinot.client.ExternalViewReader.REALTIME_SUFFIX; * Maintains a mapping between table name and list of brokers */ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener { - AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<Map<String, List<String>>>(); - AtomicReference<List<String>> allBrokerListRef = new AtomicReference<List<String>>(); + AtomicReference<Map<String, List<String>>> tableToBrokerListMapRef = new AtomicReference<>(); + AtomicReference<List<String>> allBrokerListRef = new AtomicReference<>(); + private final ZkClient _zkClient; private final Random _random = new Random(); - private ExternalViewReader evReader; + private final ExternalViewReader _evReader; public DynamicBrokerSelector(String zkServers) { - ZkClient zkClient = getZkClient(zkServers); - zkClient.setZkSerializer(new BytesPushThroughSerializer()); - zkClient.waitUntilConnected(60, TimeUnit.SECONDS); - zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this); - evReader = getEvReader(zkClient); + _zkClient = getZkClient(zkServers); + _zkClient.setZkSerializer(new BytesPushThroughSerializer()); + _zkClient.waitUntilConnected(60, TimeUnit.SECONDS); + _zkClient.subscribeDataChanges(ExternalViewReader.BROKER_EXTERNAL_VIEW_PATH, this); + _evReader = getEvReader(_zkClient); refresh(); } @@ -62,7 +63,7 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener { } private void refresh() { - Map<String, List<String>> tableToBrokerListMap = evReader.getTableToBrokersMap(); + Map<String, List<String>> tableToBrokerListMap = _evReader.getTableToBrokersMap(); tableToBrokerListMapRef.set(tableToBrokerListMap); Set<String> brokerSet = new HashSet<>(); for (List<String> brokerList : tableToBrokerListMap.values()) { @@ -91,6 +92,11 @@ public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener { } @Override + public void close() { + _zkClient.close(); + } + + @Override public void handleDataChange(String dataPath, Object data) throws Exception { refresh(); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java index d171c31..e5ab363 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/SimpleBrokerSelector.java @@ -39,4 +39,8 @@ public class SimpleBrokerSelector implements BrokerSelector { public String selectBroker(String table) { return _brokerList.get(_random.nextInt(_brokerList.size())); } + + @Override + public void close() { + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org