jbampton commented on a change in pull request #2148:
URL: https://github.com/apache/lucene-solr/pull/2148#discussion_r544307457



##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
##########
@@ -107,7 +111,21 @@ public ZkWriteCommand modifyCollection(final ClusterState 
clusterState, ZkNodePr
     DocCollection coll = 
clusterState.getCollection(message.getStr(COLLECTION_PROP));
     Map<String, Object> m = coll.shallowCopy();
     boolean hasAnyOps = false;
+    PerReplicaStates.WriteOps replicaOps = null;
     for (String prop : 
CollectionAdminRequest.MODIFIABLE_COLLECTION_PROPERTIES) {
+      if (prop.equals(DocCollection.PER_REPLICA_STATE)) {
+         String val = message.getStr(DocCollection.PER_REPLICA_STATE);
+         if (val == null) continue;
+        boolean enable = Boolean.parseBoolean(val);
+        if (enable == coll.isPerReplicaState()) {
+          //already enabled
+          log.error("trying to set perReplicaState to {} from {}", val, 
coll.isPerReplicaState());
+          continue;
+        }
+        replicaOps = PerReplicaStates.WriteOps.modifyCollection(coll, enable, 
PerReplicaStates.fetch(coll.getZNode(), zkClient, null));
+      }
+
+
       if (message.containsKey(prop)) {
         hasAnyOps = true;
         if (message.get(prop) == null)  {

Review comment:
       ```suggestion
           if (message.get(prop) == null) {
   ```

##########
File path: solr/core/src/java/org/apache/solr/cloud/ZkController.java
##########
@@ -474,6 +475,7 @@ public boolean isClosed() {
     zkStateReader = new ZkStateReader(zkClient, () -> {
       if (cc != null) cc.securityNodeChanged();
     });
+    zkStateReader.nodeName =  nodeName;

Review comment:
       ```suggestion
       zkStateReader.nodeName = nodeName;
   ```

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -165,7 +198,15 @@ private boolean maybeFlushAfter() {
   public boolean hasPendingUpdates() {
     return numUpdates != 0 || isClusterStateModified;
   }
+  public ClusterState writeUpdate(ZkWriteCommand command) throws 
IllegalStateException, KeeperException, InterruptedException {
+    Map<String, ZkWriteCommand> commands =  new HashMap<>();

Review comment:
       ```suggestion
       Map<String, ZkWriteCommand> commands = new HashMap<>();
   ```

##########
File path: solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
##########
@@ -489,6 +492,7 @@ private Create(String collection, String config, String 
routerName, Integer numS
     public Create setStateFormat(Integer stateFormat) { this.stateFormat = 
stateFormat; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
+    public Create setPerReplicaState(Boolean b) {this.perReplicaState =  b; 
return this; }

Review comment:
       ```suggestion
       public Create setPerReplicaState(Boolean b) {this.perReplicaState = b; return this; }
   ```

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
##########
@@ -113,20 +114,52 @@ public ClusterState enqueueUpdate(ClusterState prevState, 
List<ZkWriteCommand> c
     if (cmds.isEmpty()) return prevState;
     if (isNoOps(cmds)) return prevState;
 
+    boolean forceFlush = false;
+    if (cmds.size() == 1) {
+      //most messages result in only one command. let's deal with it right away
+      ZkWriteCommand cmd = cmds.get(0);
+      if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+        //we do not wish to batch any updates for collections with per-replica 
state because
+        // these changes go to individual ZK nodes and there is zero advantage 
to batching
+        //now check if there are any updates for the same collection already 
present
+        if (updates.containsKey(cmd.name)) {
+          //this should not happen
+          // but let's get those updates out anyway
+          writeUpdate(updates.remove(cmd.name));
+        }
+        //now let's write the current message
+        try {
+          return writeUpdate(cmd);
+        } finally {
+          if (callback !=null) callback.onWrite();
+        }
+      }
+    } else {
+      //there are more than one commands created as a result of this message
+      for (ZkWriteCommand cmd : cmds) {
+        if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+          // we don't try to optimize for this case. let's flush out all after 
this
+          forceFlush = true;
+          break;
+        }
+      }
+    }
+
+
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
       if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, 
prevState)) {
         isClusterStateModified = true;
       }
       prevState = prevState.copyWith(cmd.name, cmd.collection);
       if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
-        updates.put(cmd.name, cmd.collection);
+        updates.put(cmd.name, cmd);
         numUpdates++;
       }
     }
     clusterState = prevState;
 
-    if (maybeFlushAfter()) {
+    if (forceFlush ||  maybeFlushAfter()) {

Review comment:
       ```suggestion
       if (forceFlush || maybeFlushAfter()) {
   ```

##########
File path: solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
##########
@@ -182,6 +182,7 @@ public void call(ClusterState clusterState, ZkNodeProps 
message, @SuppressWarnin
         if(created) break;

Review comment:
       ```suggestion
           if (created) break;
   ```

##########
File path: solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
##########
@@ -281,13 +290,15 @@ private ZkWriteCommand updateState(final ClusterState 
prevState, ZkNodeProps mes
       }
       sliceName = Assign.assignShard(collection, numShards);
       log.info("Assigning new node to shard shard={}", sliceName);
+      persistCollectionState = true;
     }
 
     Slice slice = collection != null ?  collection.getSlice(sliceName) : null;

Review comment:
       ```suggestion
       Slice slice = collection != null ? collection.getSlice(sliceName) : null;
   ```

##########
File path: solr/core/src/test/org/apache/solr/update/processor/DimensionalRoutedAliasUpdateProcessorTest.java
##########
@@ -100,6 +101,7 @@ public void testTimeCat() throws Exception {
 
     CollectionAdminRequest.DimensionalRoutedAlias dra = 
CollectionAdminRequest.createDimensionalRoutedAlias(getAlias(),
         CollectionAdminRequest.createCollection("_unused_", configName, 2, 2)
+            .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
             .setMaxShardsPerNode(2), TRA_Dim,  CRA_Dim);

Review comment:
       ```suggestion
               .setMaxShardsPerNode(2), TRA_Dim, CRA_Dim);
   ```

##########
File path: solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
##########
@@ -208,6 +234,40 @@ public String getProperty(String propertyName) {
     return propertyValue;
   }
 
+  public Replica copyWith(PerReplicaStates.State state) {
+    log.debug("A replica is updated with new state : {}", state);
+    Map<String, Object> props = new LinkedHashMap<>(propMap);
+    if (state == null) {
+      props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+      props.remove(Slice.LEADER);
+    } else {
+      props.put(ZkStateReader.STATE_PROP, state.state.toString());
+      if (state.isLeader) props.put(Slice.LEADER, "true");
+    }
+    Replica r = new Replica(name, props, collection, slice);
+    r.replicaState = state;
+    return r;
+  }
+
+  public PerReplicaStates.State getReplicaState() {
+    return replicaState;
+  }
+
+  private static final Map<String, State> STATES = new HashMap<>();
+  static {
+    STATES.put(Replica.State.ACTIVE.shortName, Replica.State.ACTIVE);
+    STATES.put(Replica.State.DOWN.shortName, Replica.State.DOWN);
+    STATES.put(Replica.State.RECOVERING.shortName, Replica.State.RECOVERING);
+    STATES.put(Replica.State.RECOVERY_FAILED.shortName, 
Replica.State.RECOVERY_FAILED);
+  }
+  public static State getState(String  c) {

Review comment:
       ```suggestion
     public static State getState(String c) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to