rpuch commented on code in PR #7598: URL: https://github.com/apache/ignite-3/pull/7598#discussion_r2822581525
########## modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schemasync; + +import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions; +import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowCausedBy; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; + +import 
java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.ignite.Ignite; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.lang.NodeStoppingException; +import org.apache.ignite.internal.partition.replicator.ZoneResourcesManager.ZonePartitionResources; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand; +import org.apache.ignite.internal.replicator.PartitionGroupId; +import org.apache.ignite.internal.schema.SchemaSyncInhibitor; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries; +import org.apache.ignite.internal.testframework.ExecutorServiceExtension; +import org.apache.ignite.internal.testframework.log4j2.LogInspector; +import org.apache.ignite.raft.jraft.rpc.WriteActionRequest; +import org.apache.ignite.table.KeyValueView; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ExecutorServiceExtension.class) +class ItBlockedSchemaSyncAndRaftCommandExecutionTest extends ClusterPerTestIntegrationTest { + private static final String ZONE_NAME = "TEST_ZONE"; + private static final String TABLE_NAME = "TEST"; + + private LogInspector inspector; + + @Override + protected int initialNodes() { + return 3; + } + + @BeforeEach + void prepare() { + inspector = LogInspector.create(CheckCatalogVersionOnAppendEntries.class, true); + } + + @AfterEach + 
void cleanup() { + if (inspector != null) { + inspector.stop(); + } + } + + @Test + void operationBlockedOnSchemaSyncDoesNotPreventNodeStop() throws Exception { + InhibitorAndFuture inhibitorAndFuture = producePutHangingDueToSchemaSyncInLeaderStateMachine(); + + assertTimeoutPreemptively( + Duration.of(10, ChronoUnit.SECONDS), + () -> cluster.stopNode(0) + ); + + //noinspection ThrowableNotThrown + assertWillThrowCausedBy(inhibitorAndFuture.future, NodeStoppingException.class); + } + + private InhibitorAndFuture producePutHangingDueToSchemaSyncInLeaderStateMachine() + throws InterruptedException { + Ignite node = cluster.node(0); + + createTableWith1PartitionOnAllNodes(node); + + cluster.transferLeadershipTo(0, cluster.solePartitionId(ZONE_NAME)); + + KeyValueView<Integer, String> kvView = node.tables() + .table(TABLE_NAME) + .keyValueView(Integer.class, String.class); + + CompletableFuture<SchemaSyncInhibitor> inhibitorFuture = startInhibitingSchemaSyncWhenUpdateCommandArrives(); + + CompletableFuture<Void> putFuture = kvView.putAsync(null, 1, "one"); + + waitTillCommandStartsExecutionAndBlocksOnSchemaSync(); + + assertThat(inhibitorFuture, willCompleteSuccessfully()); + + return new InhibitorAndFuture(inhibitorFuture.join(), putFuture); + } + + private static void createTableWith1PartitionOnAllNodes(Ignite node) { + node.sql().executeScript( + "CREATE ZONE " + ZONE_NAME + " (REPLICAS 3, PARTITIONS 1) STORAGE PROFILES ['" + + DEFAULT_AIPERSIST_PROFILE_NAME + "'];" + + "CREATE TABLE " + TABLE_NAME + " (ID INT PRIMARY KEY, VAL VARCHAR) ZONE " + ZONE_NAME + ";" + ); + } + + private CompletableFuture<SchemaSyncInhibitor> startInhibitingSchemaSyncWhenUpdateCommandArrives() { + AtomicBoolean startedInhibiting = new AtomicBoolean(); + CompletableFuture<SchemaSyncInhibitor> future = new CompletableFuture<>(); + + for (Ignite node : cluster.nodes()) { + IgniteImpl igniteImpl = unwrapIgniteImpl(node); + + igniteImpl.dropMessages((recipientName, message) -> { + if (message 
instanceof WriteActionRequest) { + WriteActionRequest actionRequest = (WriteActionRequest) message; + + if (PartitionGroupId.matchesString(actionRequest.groupId()) + && actionRequest.deserializedCommand() instanceof UpdateCommand + && startedInhibiting.compareAndSet(false, true)) { + SchemaSyncInhibitor inhibitor = new SchemaSyncInhibitor(igniteImpl); + inhibitor.startInhibit(); + + // Making sure that commitTs (for which we take partition safe time) will be at least DelayDuration ahead + // of Metastorage safe time, so during schema sync we'll hang until inhibition is over. + waitForAllSafeTimesToReach(igniteImpl.clock().current().tick(), igniteImpl); + + future.complete(inhibitor); + } + } + + return false; + }); + } + + return future; + } + + private void waitForAllSafeTimesToReach(HybridTimestamp current, Ignite nodeToWaitSafeTime) { + ZonePartitionResources zonePartitionResources = unwrapIgniteImpl(nodeToWaitSafeTime) + .partitionReplicaLifecycleManager() + .zonePartitionResources(cluster.solePartitionId(ZONE_NAME)); + + try { + zonePartitionResources.safeTimeTracker().waitFor(current).get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new RuntimeException(e); + } catch (ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + private static void waitTillCommandStartsExecutionAndBlocksOnSchemaSync() throws InterruptedException { + // Current implementation doesn't actually block any threads, but we still give it a chance to get stuck if the implementation + // gets changed. + Thread.sleep(1000); Review Comment: They are stable. This comes from the original PR number 1 (#7500) where the execution would actually block inside the state machine thread. I did not find any way to detect that it already 'blocked', but waiting for 1 second was enough. 
In the worst case, we would sometimes have a false positive (it did not block, the test passed because of this and not because the underlying code is correct), but most of the time it would actually block, so the test would pass for the correct reason. But the test would catch (and it was catching) situations where the command was blocked and the production code worked INcorrectly. But in this PR it does not block in the state machine thread, so this wait is 'just in case' to try to catch wrong behavior if we accidentally return to it in the production code. This is a poor justification for a 1-second wait. If you believe this should be removed, I'll remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
