jbampton commented on a change in pull request #2148:
URL: https://github.com/apache/lucene-solr/pull/2148#discussion_r544307457
########## File path: solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java ########## @@ -107,7 +111,21 @@ public ZkWriteCommand modifyCollection(final ClusterState clusterState, ZkNodePr DocCollection coll = clusterState.getCollection(message.getStr(COLLECTION_PROP)); Map<String, Object> m = coll.shallowCopy(); boolean hasAnyOps = false; + PerReplicaStates.WriteOps replicaOps = null; for (String prop : CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) { + if (prop.equals(DocCollection.PER_REPLICA_STATE)) { + String val = message.getStr(DocCollection.PER_REPLICA_STATE); + if (val == null) continue; + boolean enable = Boolean.parseBoolean(val); + if (enable == coll.isPerReplicaState()) { + //already enabled + log.error("trying to set perReplicaState to {} from {}", val, coll.isPerReplicaState()); + continue; + } + replicaOps = PerReplicaStates.WriteOps.modifyCollection(coll, enable, PerReplicaStates.fetch(coll.getZNode(), zkClient, null)); + } + + if (message.containsKey(prop)) { hasAnyOps = true; if (message.get(prop) == null) { Review comment: ```suggestion if (message.get(prop) == null) { ``` ########## File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java ########## @@ -474,6 +475,7 @@ public boolean isClosed() { zkStateReader = new ZkStateReader(zkClient, () -> { if (cc != null) cc.securityNodeChanged(); }); + zkStateReader.nodeName = nodeName; Review comment: ```suggestion zkStateReader.nodeName = nodeName; ``` ########## File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java ########## @@ -165,7 +198,15 @@ private boolean maybeFlushAfter() { public boolean hasPendingUpdates() { return numUpdates != 0 || isClusterStateModified; } + public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException { + Map<String, ZkWriteCommand> commands = new HashMap<>(); Review comment: ```suggestion Map<String, ZkWriteCommand> commands = new 
HashMap<>(); ``` ########## File path: solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java ########## @@ -489,6 +492,7 @@ private Create(String collection, String config, String routerName, Integer numS public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; } public Create setRule(String... s){ this.rule = s; return this; } public Create setSnitch(String... s){ this.snitch = s; return this; } + public Create setPerReplicaState(Boolean b) {this.perReplicaState = b; return this; } Review comment: ```suggestion public Create setPerReplicaState(Boolean b) {this.perReplicaState = b; return this; } ``` ########## File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java ########## @@ -113,20 +114,52 @@ public ClusterState enqueueUpdate(ClusterState prevState, List<ZkWriteCommand> c if (cmds.isEmpty()) return prevState; if (isNoOps(cmds)) return prevState; + boolean forceFlush = false; + if (cmds.size() == 1) { + //most messages result in only one command. let's deal with it right away + ZkWriteCommand cmd = cmds.get(0); + if (cmd.collection != null && cmd.collection.isPerReplicaState()) { + //we do not wish to batch any updates for collections with per-replica state because + // these changes go to individual ZK nodes and there is zero advantage to batching + //now check if there are any updates for the same collection already present + if (updates.containsKey(cmd.name)) { + //this should not happen + // but let's get those updates out anyway + writeUpdate(updates.remove(cmd.name)); + } + //now let's write the current message + try { + return writeUpdate(cmd); + } finally { + if (callback !=null) callback.onWrite(); + } + } + } else { + //there are more than one commands created as a result of this message + for (ZkWriteCommand cmd : cmds) { + if (cmd.collection != null && cmd.collection.isPerReplicaState()) { + // we don't try to optimize for this case. 
let's flush out all after this + forceFlush = true; + break; + } + } + } + + for (ZkWriteCommand cmd : cmds) { if (cmd == NO_OP) continue; if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) { isClusterStateModified = true; } prevState = prevState.copyWith(cmd.name, cmd.collection); if (cmd.collection == null || cmd.collection.getStateFormat() != 1) { - updates.put(cmd.name, cmd.collection); + updates.put(cmd.name, cmd); numUpdates++; } } clusterState = prevState; - if (maybeFlushAfter()) { + if (forceFlush || maybeFlushAfter()) { Review comment: ```suggestion if (forceFlush || maybeFlushAfter()) { ``` ########## File path: solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java ########## @@ -182,6 +182,7 @@ public void call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnin if(created) break; Review comment: ```suggestion if (created) break; ``` ########## File path: solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java ########## @@ -281,13 +290,15 @@ private ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps mes } sliceName = Assign.assignShard(collection, numShards); log.info("Assigning new node to shard shard={}", sliceName); + persistCollectionState = true; } Slice slice = collection != null ? collection.getSlice(sliceName) : null; Review comment: ```suggestion Slice slice = collection != null ? 
collection.getSlice(sliceName) : null; ``` ########## File path: solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java ########## @@ -100,6 +101,7 @@ public void testTimeCat() throws Exception { CollectionAdminRequest.DimensionalRoutedAlias dra = CollectionAdminRequest.createDimensionalRoutedAlias(getAlias(), CollectionAdminRequest.createCollection("_unused_", configName, 2, 2) + .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE) .setMaxShardsPerNode(2), TRA_Dim, CRA_Dim); Review comment: ```suggestion .setMaxShardsPerNode(2), TRA_Dim, CRA_Dim); ``` ########## File path: solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java ########## @@ -208,6 +234,40 @@ public String getProperty(String propertyName) { return propertyValue; } + public Replica copyWith(PerReplicaStates.State state) { + log.debug("A replica is updated with new state : {}", state); + Map<String, Object> props = new LinkedHashMap<>(propMap); + if (state == null) { + props.put(ZkStateReader.STATE_PROP, State.DOWN.toString()); + props.remove(Slice.LEADER); + } else { + props.put(ZkStateReader.STATE_PROP, state.state.toString()); + if (state.isLeader) props.put(Slice.LEADER, "true"); + } + Replica r = new Replica(name, props, collection, slice); + r.replicaState = state; + return r; + } + + public PerReplicaStates.State getReplicaState() { + return replicaState; + } + + private static final Map<String, State> STATES = new HashMap<>(); + static { + STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE); + STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN); + STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING); + STATES.put(Replica.State.RECOVERY_FAILED.shortName, Replica.State.RECOVERY_FAILED); + } + public static State getState(String c) { Review comment: ```suggestion public static State getState(String c) { ``` ---------------------------------------------------------------- This is an automated 
message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org