ascherbakoff commented on code in PR #7598:
URL: https://github.com/apache/ignite-3/pull/7598#discussion_r2822120368
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateAllCommand.java:
##########
@@ -20,32 +20,20 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.network.annotations.PropertyName;
import org.apache.ignite.internal.network.annotations.Transferable;
import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import org.apache.ignite.internal.partition.replicator.network.TimedBinaryRow;
-import org.apache.ignite.internal.replicator.message.ReplicationGroupIdMessage;
import org.apache.ignite.internal.util.CollectionUtils;
-import org.jetbrains.annotations.Nullable;
/**
* State machine command for updating a batch of entries.
*
* <p>This command is replaced with {@link UpdateAllCommandV2} and only exists
in the source code for backward compatibility.</p>
*/
@Transferable(PartitionReplicationMessageGroup.Commands.UPDATE_ALL_V1)
-public interface UpdateAllCommand extends PartitionCommand {
- @PropertyName("tablePartitionId")
- ReplicationGroupIdMessage commitPartitionId();
-
+public interface UpdateAllCommand extends UpdateCommandBase {
Review Comment:
I believe the current solution is too complex. Instead, we can pass a safe
timestamp validator as a volatile command field.
1. Introduce a validation closure on WriteCommand:
```
public interface WriteCommand extends Command {
    /**
     * Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
     *
     * @return The timestamp.
     */
    default @Nullable HybridTimestamp initiatorTime() {
        return null;
    }

    /**
     * Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
     *
     * @return The timestamp.
     */
    default @Nullable HybridTimestamp safeTime() {
        return null;
    }

    /**
     * Gets the optional safe timestamp validator.
     *
     * @return The validator, or {@code null} if no validation is required.
     */
    default @Nullable Predicate<HybridTimestamp> safeTimeValidator() {
        return null;
    }
}
```
2. Set it in the PartitionReplicaListener (only for full operations):
```
UpdateCommandV2Builder bldr = PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
        .tableId(tableId)
        .commitPartitionId(replicationGroupIdMessage(commitPartitionId))
        .rowUuid(rowUuid)
        .txId(txId)
        .full(full)
        .initiatorTime(initiatorTime)
        .txCoordinatorId(txCoordinatorId)
        .requiredCatalogVersion(catalogVersion)
        .leaseStartTime(leaseStartTime)
        .safeTimeValidator(this::validateTimestamp);
3. Apply validation:
```
node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() {
    private HybridTimestamp safeTs;

    @Override
    public void result(Serializable res) {
        sendResponse(res, rpcCtx);
    }

    @Override
    public WriteCommand command() {
        return command;
    }

    @Override
    public @Nullable HybridTimestamp safeTimestamp() {
        return safeTs;
    }

    @Override
    public void run(Status status) {
        assert !status.isOk() : status;

        sendRaftError(rpcCtx, status, node);
    }

    @Override
    public void safeTimestamp(HybridTimestamp safeTs) {
        assert this.safeTs == null : "Safe time can be set only once";

        // Apply binary patch.
        node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs);

        // Avoid modifying WriteCommand object, because it's shared between raft pipeline threads.
        if (command.safeTimeValidator() != null) {
            if (!command.safeTimeValidator().test(safeTs)) {
                throw new ValidationException();
            }
        }

        this.safeTs = safeTs;
    }
}));
```
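To make the three steps easier to try out in isolation, here is a minimal self-contained sketch of the same flow. Everything in it is a stand-in, not Ignite code: `long` replaces `HybridTimestamp`, the nested `WriteCommand`, `ValidationException` and `SafeTimeClosure` types only mimic the shapes used above, and the rule "safe time must not precede the lease start" is an assumed example of what `validateTimestamp` might check.

```java
import java.util.function.Predicate;

public class SafeTimeValidatorSketch {
    /** Stand-in for WriteCommand: the validator is optional and defaults to null. */
    interface WriteCommand {
        default Predicate<Long> safeTimeValidator() {
            return null;
        }
    }

    /** Stand-in for the ValidationException thrown in step 3. */
    static class ValidationException extends RuntimeException {
        ValidationException(String message) {
            super(message);
        }
    }

    /** Stand-in for the closure from step 3: validates before storing the safe time. */
    static class SafeTimeClosure {
        private final WriteCommand command;
        private Long safeTs;

        SafeTimeClosure(WriteCommand command) {
            this.command = command;
        }

        void safeTimestamp(long ts) {
            if (safeTs != null) {
                throw new IllegalStateException("Safe time can be set only once");
            }

            // Read the validator once; the command object itself is never modified.
            Predicate<Long> validator = command.safeTimeValidator();
            if (validator != null && !validator.test(ts)) {
                throw new ValidationException("Safe timestamp rejected: " + ts);
            }

            safeTs = ts;
        }

        Long safeTimestamp() {
            return safeTs;
        }
    }

    /** Stand-in for step 2: a validator is attached only for full operations. */
    static WriteCommand command(boolean full, long leaseStartTime) {
        final Predicate<Long> validator;
        if (full) {
            // Hypothetical rule, assumed for illustration only.
            validator = ts -> ts >= leaseStartTime;
        } else {
            validator = null;
        }

        return new WriteCommand() {
            @Override
            public Predicate<Long> safeTimeValidator() {
                return validator;
            }
        };
    }

    public static void main(String[] args) {
        SafeTimeClosure accepted = new SafeTimeClosure(command(true, 100L));
        accepted.safeTimestamp(150L);
        System.out.println("accepted: " + accepted.safeTimestamp());

        try {
            new SafeTimeClosure(command(true, 100L)).safeTimestamp(50L);
        } catch (ValidationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this sketch a non-full command carries no validator, so any timestamp is accepted for it, which matches the "only for full operations" note in step 2.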