This is an automated email from the ASF dual-hosted git repository.
ivandika pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3cab749b29b HDDS-14194. Make streaming write SYNC configurable (#9533)
3cab749b29b is described below
commit 3cab749b29b94e59664f7b60080fdad7571e0b7d
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Dec 27 12:43:27 2025 +0800
HDDS-14194. Make streaming write SYNC configurable (#9533)
---
.../java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java | 13 +++++++++++++
.../hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 8 ++++----
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 488b658fce2..0002284b7f9 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -84,6 +84,15 @@ public class OzoneClientConfig {
tags = ConfigTag.CLIENT)
private boolean datastreamPipelineMode = true;
+ @Config(key = "ozone.client.datastream.sync.size",
+ defaultValue = "0B",
+ type = ConfigType.SIZE,
+ description = "The minimum size of written data before forcing the
datanodes " +
+ "in the pipeline to flush the pending data to underlying storage." +
+ " If set to zero or negative, the client will not force the
datanodes to flush.",
+ tags = ConfigTag.CLIENT)
+ private int dataStreamSyncSize = 0;
+
@Config(key = "ozone.client.stream.buffer.increment",
defaultValue = "0B",
type = ConfigType.SIZE,
@@ -570,6 +579,10 @@ public void setDatastreamPipelineMode(boolean
datastreamPipelineMode) {
this.datastreamPipelineMode = datastreamPipelineMode;
}
+ public int getDataStreamSyncSize() {
+ return dataStreamSyncSize;
+ }
+
public void setHBaseEnhancementsAllowed(boolean isHBaseEnhancementsEnabled) {
this.hbaseEnhancementsAllowed = isHBaseEnhancementsEnabled;
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index e398c79ce8d..e570ba30885 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -131,7 +131,7 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
private final DataStreamOutput out;
private CompletableFuture<DataStreamReply> dataStreamCloseReply;
private List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
- private static final long SYNC_SIZE = 0; // TODO: disk sync is disabled for
now
+ private final long syncSize;
private long syncPosition = 0;
private StreamBuffer currentBuffer;
private XceiverClientMetrics metrics;
@@ -157,6 +157,7 @@ public BlockDataStreamOutput(
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
this.isDatastreamPipelineMode = config.isDatastreamPipelineMode();
+ this.syncSize = config.getDataStreamSyncSize();
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
@@ -647,9 +648,8 @@ public boolean isClosed() {
}
private boolean needSync(long position) {
- if (SYNC_SIZE > 0) {
- // TODO: or position >= fileLength
- if (position - syncPosition >= SYNC_SIZE) {
+ if (syncSize > 0) {
+ if (position - syncPosition >= syncSize) {
syncPosition = position;
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]