This is an automated email from the ASF dual-hosted git repository.
ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 52930c50af HDDS-12005. Refactor TestBlockDataStreamOutput (#7716)
52930c50af is described below
commit 52930c50af4f5368ed7191d081dd7ecbba17eaf5
Author: Chia-Chuan Yu <[email protected]>
AuthorDate: Tue Jan 21 23:43:05 2025 +0800
HDDS-12005. Refactor TestBlockDataStreamOutput (#7716)
---
.../client/rpc/TestBlockDataStreamOutput.java | 423 +++++++++++----------
1 file changed, 227 insertions(+), 196 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index c1345207d9..0238f4f498 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -19,15 +19,19 @@
import org.apache.hadoop.hdds.DatanodeVersion;
import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockDataStreamOutput;
import org.apache.hadoop.hdds.scm.storage.ByteBufferStreamOutput;
-import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.ClientConfigForTesting;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -44,19 +48,26 @@
import org.apache.ozone.test.tag.Flaky;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -64,131 +75,157 @@
/**
* Tests BlockDataStreamOutput class.
*/
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Timeout(300)
public class TestBlockDataStreamOutput {
- private static MiniOzoneCluster cluster;
- private static OzoneConfiguration conf = new OzoneConfiguration();
- private static OzoneClient client;
- private static ObjectStore objectStore;
- private static int chunkSize;
- private static int flushSize;
- private static int maxFlushSize;
- private static int blockSize;
- private static String volumeName;
- private static String bucketName;
- private static String keyString;
+ private MiniOzoneCluster cluster;
+ private static final int CHUNK_SIZE = 100;
+ private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+ private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
+ private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
+ private static final String VOLUME_NAME = "testblockoutputstream";
+ private static final String BUCKET_NAME = VOLUME_NAME;
+ private static String keyString = UUID.randomUUID().toString();;
private static final DatanodeVersion DN_OLD_VERSION =
DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE;
- @BeforeAll
- public static void init() throws Exception {
- chunkSize = 100;
- flushSize = 2 * chunkSize;
- maxFlushSize = 2 * flushSize;
- blockSize = 2 * maxFlushSize;
-
+ static MiniOzoneCluster createCluster() throws IOException,
+ InterruptedException, TimeoutException {
+ OzoneConfiguration conf = new OzoneConfiguration();
OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+ clientConfig.setStreamBufferFlushDelay(false);
+ clientConfig.setEnablePutblockPiggybacking(true);
conf.setFromObject(clientConfig);
- conf.setBoolean(OZONE_CHUNK_READ_NETTY_CHUNKED_NIO_FILE_KEY, true);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
conf.setQuietMode(false);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
- StorageUnit.MB);
+ conf.setStorageSize(OZONE_SCM_BLOCK_SIZE, 4, StorageUnit.MB);
+ conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 3);
+
+ conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+ conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+
+ DatanodeRatisServerConfig ratisServerConfig =
+ conf.getObject(DatanodeRatisServerConfig.class);
+ ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+ ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(3));
+ conf.setFromObject(ratisServerConfig);
+
+ RatisClientConfig.RaftConfig raftClientConfig =
+ conf.getObject(RatisClientConfig.RaftConfig.class);
+ raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+ raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(5));
+ conf.setFromObject(raftClientConfig);
+
+ RatisClientConfig ratisClientConfig =
+ conf.getObject(RatisClientConfig.class);
+ ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
+ ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
+ conf.setFromObject(ratisClientConfig);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
- .setBlockSize(blockSize)
- .setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
- .setStreamBufferMaxSize(maxFlushSize)
- .setDataStreamBufferFlushSize(maxFlushSize)
- .setDataStreamMinPacketSize(chunkSize)
- .setDataStreamWindowSize(5 * chunkSize)
+ .setBlockSize(BLOCK_SIZE)
+ .setChunkSize(CHUNK_SIZE)
+ .setStreamBufferFlushSize(FLUSH_SIZE)
+ .setStreamBufferMaxSize(MAX_FLUSH_SIZE)
+ .setDataStreamBufferFlushSize(MAX_FLUSH_SIZE)
+ .setDataStreamMinPacketSize(CHUNK_SIZE)
+ .setDataStreamWindowSize(5 * CHUNK_SIZE)
.applyTo(conf);
- cluster = MiniOzoneCluster.newBuilder(conf)
+ MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(5)
.setDatanodeFactory(UniformDatanodesFactory.newBuilder()
.setCurrentVersion(DN_OLD_VERSION)
.build())
.build();
+ cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
+ 180000);
cluster.waitForClusterToBeReady();
- //the easiest way to create an open container is creating a key
- client = OzoneClientFactory.getRpcClient(conf);
- objectStore = client.getObjectStore();
- keyString = UUID.randomUUID().toString();
- volumeName = "testblockdatastreamoutput";
- bucketName = volumeName;
- objectStore.createVolume(volumeName);
- objectStore.getVolume(volumeName).createBucket(bucketName);
+
+ try (OzoneClient client = cluster.newClient()) {
+ ObjectStore objectStore = client.getObjectStore();
+ objectStore.createVolume(VOLUME_NAME);
+ objectStore.getVolume(VOLUME_NAME).createBucket(BUCKET_NAME);
+ }
+
+ return cluster;
}
- static String getKeyName() {
- return UUID.randomUUID().toString();
+ private static Stream<Arguments> clientParameters() {
+ return Stream.of(
+ Arguments.of(true),
+ Arguments.of(false)
+ );
}
- @AfterAll
- public static void shutdown() {
- IOUtils.closeQuietly(client);
- if (cluster != null) {
- cluster.shutdown();
- }
+ private static Stream<Arguments> dataLengthParameters() {
+ return Stream.of(
+ Arguments.of(CHUNK_SIZE / 2),
+ Arguments.of(CHUNK_SIZE),
+ Arguments.of(CHUNK_SIZE + 50),
+ Arguments.of(BLOCK_SIZE + 50)
+ );
}
- @Test
- public void testHalfChunkWrite() throws Exception {
- testWrite(chunkSize / 2);
- testWriteWithFailure(chunkSize / 2);
+ static OzoneClientConfig newClientConfig(ConfigurationSource source,
+ boolean flushDelay) {
+ OzoneClientConfig clientConfig = source.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+ clientConfig.setStreamBufferFlushDelay(flushDelay);
+ return clientConfig;
}
- @Test
- public void testSingleChunkWrite() throws Exception {
- testWrite(chunkSize);
- testWriteWithFailure(chunkSize);
+ static OzoneClient newClient(OzoneConfiguration conf,
+ OzoneClientConfig config) throws IOException {
+ OzoneConfiguration copy = new OzoneConfiguration(conf);
+ copy.setFromObject(config);
+ return OzoneClientFactory.getRpcClient(copy);
}
- @Test
- public void testMultiChunkWrite() throws Exception {
- testWrite(chunkSize + 50);
- testWriteWithFailure(chunkSize + 50);
+ @BeforeAll
+ public void init() throws Exception {
+ cluster = createCluster();
}
- @Test
- @Flaky("HDDS-12027")
- public void testMultiBlockWrite() throws Exception {
- testWrite(blockSize + 50);
- testWriteWithFailure(blockSize + 50);
+ static String getKeyName() {
+ return UUID.randomUUID().toString();
}
- static void testWrite(int dataLength) throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
- long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
+ @AfterAll
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("dataLengthParameters")
+ @Flaky("HDDS-12027")
+ public void testStreamWrite(int dataLength) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), false);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ testWrite(client, dataLength);
+ testWriteWithFailure(client, dataLength);
+ }
+ }
+ static void testWrite(OzoneClient client, int dataLength) throws Exception {
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, dataLength);
+ client, keyName, dataLength);
final byte[] data = ContainerTestHelper.generateData(dataLength, false);
key.write(ByteBuffer.wrap(data));
// now close the stream, It will update the key length.
key.close();
- validateData(keyName, data);
-
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
+ validateData(client, keyName, data);
}
- private void testWriteWithFailure(int dataLength) throws Exception {
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
- long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
-
+ private void testWriteWithFailure(OzoneClient client, int dataLength) throws Exception {
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, dataLength);
+ client, keyName, dataLength);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
@@ -203,129 +240,123 @@ private void testWriteWithFailure(int dataLength) throws Exception {
key.write(b);
key.close();
String dataString = new String(data, UTF_8);
- validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
-
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- }
-
- @Test
- public void testPutBlockAtBoundary() throws Exception {
- int dataLength = maxFlushSize + 100;
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
- long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock);
- long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
- long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
- long totalOpCount = metrics.getTotalOpCount();
-
- String keyName = getKeyName();
- OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, 0);
- byte[] data =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength)
- .getBytes(UTF_8);
- key.write(ByteBuffer.wrap(data));
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 5);
- key.close();
- // Since data length is 500 , first putBlock will be at 400(flush boundary)
- // and the other at 500
- assertEquals(putBlockCount + 2,
- metrics.getContainerOpCountMetrics(PutBlock));
- // Each chunk is 100 so there will be 500 / 100 = 5 chunks.
- assertEquals(writeChunkCount + 5,
- metrics.getContainerOpCountMetrics(WriteChunk));
- assertEquals(totalOpCount + 7,
- metrics.getTotalOpCount());
- assertEquals(pendingPutBlockCount,
- metrics.getPendingContainerOpCountMetrics(PutBlock));
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
-
- validateData(keyName, data);
+ validateData(client, keyName, dataString.concat(dataString).getBytes(UTF_8));
}
-
- static OzoneDataStreamOutput createKey(String keyName, ReplicationType type,
+ static OzoneDataStreamOutput createKey(OzoneClient client, String keyName,
long size) throws Exception {
- return TestHelper.createStreamKey(
- keyName, type, size, objectStore, volumeName, bucketName);
+ return TestHelper.createStreamKey(keyName, ReplicationType.RATIS, size,
+ client.getObjectStore(), VOLUME_NAME, BUCKET_NAME);
}
- static void validateData(String keyName, byte[] data) throws Exception {
+
+ static void validateData(OzoneClient client, String keyName, byte[] data) throws Exception {
TestHelper.validateData(
- keyName, data, objectStore, volumeName, bucketName);
+ keyName, data, client.getObjectStore(), VOLUME_NAME, BUCKET_NAME);
}
-
- @Test
- public void testMinPacketSize() throws Exception {
- String keyName = getKeyName();
- XceiverClientMetrics metrics =
- XceiverClientManager.getXceiverClientMetrics();
- OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
- long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
- long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
- byte[] data =
- ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
- .getBytes(UTF_8);
- key.write(ByteBuffer.wrap(data));
- // minPacketSize= 100, so first write of 50 wont trigger a writeChunk
- assertEquals(writeChunkCount,
- metrics.getContainerOpCountMetrics(WriteChunk));
- key.write(ByteBuffer.wrap(data));
- assertEquals(writeChunkCount + 1,
- metrics.getContainerOpCountMetrics(WriteChunk));
- // now close the stream, It will update the key length.
- key.close();
- assertEquals(pendingWriteChunkCount,
- metrics.getPendingContainerOpCountMetrics(WriteChunk));
- String dataString = new String(data, UTF_8);
- validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ public void testPutBlockAtBoundary(boolean flushDelay) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ int dataLength = 500;
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ long putBlockCount = metrics.getContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
+ ContainerProtos.Type.PutBlock);
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(
+ client, keyName, 0);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ key.write(ByteBuffer.wrap(data));
+ assertThat(metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+ key.close();
+ // Since data length is 500 , first putBlock will be at 400(flush boundary)
+ // and the other at 500
+ assertEquals(
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock),
+ putBlockCount + 2);
+ validateData(client, keyName, data);
+ }
}
- @Test
- public void testTotalAckDataLength() throws Exception {
- int dataLength = 400;
- String keyName = getKeyName();
- OzoneDataStreamOutput key = createKey(
- keyName, ReplicationType.RATIS, 0);
- byte[] data =
- ContainerTestHelper.getFixedLengthString(keyString, dataLength)
- .getBytes(UTF_8);
- KeyDataStreamOutput keyDataStreamOutput =
- (KeyDataStreamOutput) key.getByteBufStreamOutput();
- BlockDataStreamOutputEntry stream =
- keyDataStreamOutput.getStreamEntries().get(0);
- key.write(ByteBuffer.wrap(data));
- key.close();
- assertEquals(dataLength, stream.getTotalAckDataLength());
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ public void testMinPacketSize(boolean flushDelay) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ String keyName = getKeyName();
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+ OzoneDataStreamOutput key = createKey(client, keyName, 0);
+ long writeChunkCount =
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, CHUNK_SIZE / 2)
+ .getBytes(UTF_8);
+ key.write(ByteBuffer.wrap(data));
+ // minPacketSize= 100, so first write of 50 won't trigger a writeChunk
+ assertEquals(writeChunkCount,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ key.write(ByteBuffer.wrap(data));
+ assertEquals(writeChunkCount + 1,
+ metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ // now close the stream, It will update the key length.
+ key.close();
+ String dataString = new String(data, UTF_8);
+ validateData(client, keyName, dataString.concat(dataString).getBytes(UTF_8));
+ }
}
- @Test
- public void testDatanodeVersion() throws Exception {
- // Verify all DNs internally have versions set correctly
- List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
- for (HddsDatanodeService dn : dns) {
- DatanodeDetails details = dn.getDatanodeDetails();
- assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ public void testTotalAckDataLength(boolean flushDelay) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ int dataLength = 400;
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(
+ client, keyName, 0);
+ byte[] data =
+ ContainerTestHelper.getFixedLengthString(keyString, dataLength)
+ .getBytes(UTF_8);
+ KeyDataStreamOutput keyDataStreamOutput =
+ (KeyDataStreamOutput) key.getByteBufStreamOutput();
+ BlockDataStreamOutputEntry stream =
+ keyDataStreamOutput.getStreamEntries().get(0);
+ key.write(ByteBuffer.wrap(data));
+ key.close();
+ assertEquals(dataLength, stream.getTotalAckDataLength());
}
+ }
- String keyName = getKeyName();
- OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
- KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) key.getByteBufStreamOutput();
- BlockDataStreamOutputEntry stream = keyDataStreamOutput.getStreamEntries().get(0);
-
- // Now check 3 DNs in a random pipeline returns the correct DN versions
- List<DatanodeDetails> streamDnDetails = stream.getPipeline().getNodes();
- for (DatanodeDetails details : streamDnDetails) {
- assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+ @ParameterizedTest
+ @MethodSource("clientParameters")
+ public void testDatanodeVersion(boolean flushDelay) throws Exception {
+ OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay);
+ try (OzoneClient client = newClient(cluster.getConf(), config)) {
+ // Verify all DNs internally have versions set correctly
+ List<HddsDatanodeService> dns = cluster.getHddsDatanodes();
+ for (HddsDatanodeService dn : dns) {
+ DatanodeDetails details = dn.getDatanodeDetails();
+ assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+ }
+
+ String keyName = getKeyName();
+ OzoneDataStreamOutput key = createKey(client, keyName, 0);
+ KeyDataStreamOutput keyDataStreamOutput = (KeyDataStreamOutput) key.getByteBufStreamOutput();
+ BlockDataStreamOutputEntry stream = keyDataStreamOutput.getStreamEntries().get(0);
+
+ // Now check 3 DNs in a random pipeline returns the correct DN versions
+ List<DatanodeDetails> streamDnDetails = stream.getPipeline().getNodes();
+ for (DatanodeDetails details : streamDnDetails) {
+ assertEquals(DN_OLD_VERSION.toProtoValue(), details.getCurrentVersion());
+ }
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]