This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d24254e146a [feature](journal) Add a method to write a set of journals 
in batch (#30380)
d24254e146a is described below

commit d24254e146a06d8b2b9a65de0fbdb55ea9735a29
Author: walter <w41te...@gmail.com>
AuthorDate: Thu Jan 25 22:26:29 2024 +0800

    [feature](journal) Add a method to write a set of journals in batch (#30380)
---
 .../java/org/apache/doris/journal/Journal.java     |  5 ++
 .../org/apache/doris/journal/JournalBatch.java     | 81 ++++++++++++++++++
 .../apache/doris/journal/bdbje/BDBJEJournal.java   | 78 +++++++++++++++++
 .../apache/doris/journal/local/LocalJournal.java   | 12 +++
 .../doris/persist/EditLogFileOutputStream.java     |  5 ++
 .../apache/doris/persist/EditLogOutputStream.java  |  2 +
 .../doris/journal/bdbje/BDBJEJournalTest.java      | 97 ++++++++++++++++++++++
 7 files changed, 280 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
index 8fca299df19..b5b37a80ef0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java
@@ -49,6 +49,11 @@ public interface Journal {
     // Write a journal and sync to disk
     public long write(short op, Writable writable) throws IOException;
 
+    // Write a set of journal to disk in batch.
+    //
+    // Return the first id of the batched journals.
+    public long write(JournalBatch batch) throws IOException;
+
     // Get current journal number
     public long getJournalNum();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
new file mode 100644
index 00000000000..0fc0ccf9355
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.journal;
+
+import org.apache.doris.common.io.DataOutputBuffer;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.OperationType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+public class JournalBatch {
+    private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
+
+    private ArrayList<Entity> entities;
+
+    public JournalBatch() {
+        entities = new ArrayList<>();
+    }
+
+    public JournalBatch(int cap) {
+        entities = new ArrayList<>(cap);
+    }
+
+    // Add a writable data into journal batch.
+    //
+    // The writable data will be serialized and saved in the journal batch 
with an internal
+    // representation, so it is safety to update the data object once this 
function returned.
+    public void addJournal(short op, Writable data) throws IOException {
+        if (op == OperationType.OP_TIMESTAMP) {
+            // OP_TIMESTAMP is not supported, see `BDBJEJournal.write` for 
details.
+            throw new RuntimeException("JournalBatch.addJournal is not 
supported OP_TIMESTAMP");
+        }
+
+        JournalEntity entity = new JournalEntity();
+        entity.setOpCode(op);
+        entity.setData(data);
+
+        DataOutputBuffer buffer = new 
DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
+        entity.write(buffer);
+
+        entities.add(new Entity(op, buffer));
+    }
+
+    public ArrayList<Entity> getJournalEntities() {
+        return entities;
+    }
+
+    public static class Entity {
+        short op;
+        DataOutputBuffer data;
+
+        Entity(short op, DataOutputBuffer data) {
+            this.op = op;
+            this.data = data;
+        }
+
+        public short getOpCode() {
+            return op;
+        }
+
+        public byte[] getBinaryData() {
+            return data.getData();
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 3698b8f76ca..95efbfa3780 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.metric.MetricRepo;
@@ -121,6 +122,83 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         }
     }
 
+    @Override
+    public synchronized long write(JournalBatch batch) throws IOException {
+        List<JournalBatch.Entity> entities = batch.getJournalEntities();
+        int entitySize = entities.size();
+        long dataSize = 0;
+        long firstId = nextJournalId.getAndAdd(entitySize);
+
+        // Write the journals to bdb.
+        for (int i = 0; i < RETRY_TIME; i++) {
+            Transaction txn = null;
+            try {
+                // The default config is constructed from the configs of 
environment.
+                txn = 
bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, null);
+                for (int j = 0; j < entitySize; ++j) {
+                    JournalBatch.Entity entity = entities.get(j);
+                    DatabaseEntry theKey = idToKey(firstId + j);
+                    DatabaseEntry theData = new 
DatabaseEntry(entity.getBinaryData());
+                    currentJournalDB.put(txn, theKey, theData);  // Put with 
overwrite, it always success
+                    dataSize += theData.getSize();
+                    if (i == 0) {
+                        LOG.debug("opCode = {}, journal size = {}", 
entity.getOpCode(), theData.getSize());
+                    }
+                }
+
+                txn.commit();
+                txn = null;
+
+                if (MetricRepo.isInit) {
+                    MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+                    
MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+                }
+
+                return firstId;
+            } catch (ReplicaWriteException e) {
+                /**
+                 * This exception indicates that an update operation or 
transaction commit
+                 * or abort was attempted while in the
+                 * {@link ReplicatedEnvironment.State#REPLICA} state. The 
transaction is marked
+                 * as being invalid.
+                 * <p>
+                 * The exception is the result of either an error in the 
application logic or
+                 * the result of a transition of the node from Master to 
Replica while a
+                 * transaction was in progress.
+                 * <p>
+                 * The application must abort the current transaction and 
redirect all
+                 * subsequent update operations to the Master.
+                 */
+                LOG.error("catch ReplicaWriteException when writing to 
database, will exit. the first journal id {}",
+                        firstId, e);
+                String msg = "write bdb failed. will exit. the first 
journalId: " + firstId + ", bdb database Name: "
+                        + currentJournalDB.getDatabaseName();
+                LOG.error(msg);
+                Util.stdoutWithTime(msg);
+                System.exit(-1);
+            } catch (DatabaseException e) {
+                LOG.error("catch an exception when writing to database. sleep 
and retry. the first journal id {}",
+                        firstId, e);
+                try {
+                    Thread.sleep(5 * 1000);
+                } catch (InterruptedException e1) {
+                    LOG.warn("", e1);
+                }
+            } finally {
+                if (txn != null) {
+                    txn.abort();
+                }
+            }
+        }
+
+        String msg = "write bdb failed. will exit. the first journalId: " + 
firstId + ", bdb database Name: "
+                + currentJournalDB.getDatabaseName();
+        LOG.error(msg);
+        Util.stdoutWithTime(msg);
+        System.exit(-1);
+        return 0; // unreachable!
+    }
+
     @Override
     public synchronized long write(short op, Writable writable) throws 
IOException {
         JournalEntity entity = new JournalEntity();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
index 9ba3274e0f0..45ae377a401 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
@@ -19,6 +19,7 @@ package org.apache.doris.journal.local;
 
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.persist.EditLogFileOutputStream;
@@ -139,6 +140,17 @@ public class LocalJournal implements Journal {
         return cursor;
     }
 
+    @Override
+    public synchronized long write(JournalBatch batch) throws IOException {
+        List<JournalBatch.Entity> entities = batch.getJournalEntities();
+        for (JournalBatch.Entity entity : entities) {
+            outputStream.write(entity.getOpCode(), entity.getBinaryData());
+        }
+        outputStream.setReadyToFlush();
+        outputStream.flush();
+        return journalId.getAndAdd(entities.size());
+    }
+
     @Override
     public synchronized long write(short op, Writable writable) throws 
IOException {
         outputStream.write(op, writable);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
index 3d8b360704e..d98f66bacd1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
@@ -65,6 +65,11 @@ public class EditLogFileOutputStream extends 
EditLogOutputStream {
         writable.write(bufCurrent);
     }
 
+    public void write(short op, byte[] data) throws IOException {
+        bufCurrent.writeShort(op);
+        bufCurrent.write(data);
+    }
+
     // Create empty edits logs file.
     void create() throws IOException {
         fc.truncate(0);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
index 95b746e0cb2..e7f966b397e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
@@ -51,6 +51,8 @@ public abstract class EditLogOutputStream extends 
OutputStream {
      */
     public abstract void write(short op, Writable writable) throws IOException;
 
+    public abstract void write(short op, byte[] data) throws IOException;
+
     abstract void create() throws IOException;
 
     public abstract void close() throws IOException;
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index ba81d6697ba..edcbf9c033c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.persist.OperationType;
@@ -228,4 +229,100 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS 
LINE: BDBJE should use
         Assertions.assertEquals(21, journal.getDatabaseNames().get(0));
         journal.close();
     }
+
+    @RepeatedTest(1)
+    public void testJournalBatch() throws Exception {
+        int port = findValidPort();
+        Preconditions.checkArgument(((port > 0) && (port < 65535)));
+        String nodeName = Env.genFeNodeName("127.0.0.1", port, false);
+        long replayedJournalId = 0;
+        File tmpDir = createTmpDir();
+        new MockUp<Env>() {
+            HostInfo selfNode = new HostInfo("127.0.0.1", port);
+            @Mock
+            public String getBdbDir() {
+                return tmpDir.getAbsolutePath();
+            }
+
+            @Mock
+            public HostInfo getSelfNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public HostInfo getHelperNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public boolean isElectable() {
+                return true;
+            }
+
+            @Mock
+            public long getReplayedJournalId() {
+                return replayedJournalId;
+            }
+        };
+
+        LOG.info("BdbDir:{}, selfNode:{}, nodeName:{}", 
Env.getServingEnv().getBdbDir(),
+                Env.getServingEnv().getBdbDir(), nodeName);
+        Assertions.assertEquals(tmpDir.getAbsolutePath(), 
Env.getServingEnv().getBdbDir());
+        BDBJEJournal journal = new BDBJEJournal(nodeName);
+        journal.open();
+        // BDBEnvironment need several seconds election from unknown to master
+        for (int i = 0; i < 10; i++) {
+            if 
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+                    .equals(ReplicatedEnvironment.State.MASTER)) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+                
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+
+        journal.rollJournal();
+        JournalBatch batch = new JournalBatch(10);
+        for (int i = 0; i < 10; i++) {
+            String data = "JournalBatch item " + i;
+            Writable writable = new Writable() {
+                @Override
+                public void write(DataOutput out) throws IOException {
+                    Text.writeString(out, data);
+                }
+            };
+            // CREATE_MTMV_JOB is deprecated, and safe to write any data.
+            batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+        }
+        long journalId = journal.write(batch);
+        Assertions.assertEquals(1, journalId);
+
+        Assertions.assertEquals(10, journal.getMaxJournalId());
+        Assertions.assertEquals(10, journal.getJournalNum());
+        Assertions.assertEquals(1, journal.getMinJournalId());
+        Assertions.assertEquals(0, journal.getFinalizedJournalId());
+
+        LOG.debug("journal.getDatabaseNames(): {}", 
journal.getDatabaseNames());
+        Assertions.assertEquals(1, journal.getDatabaseNames().size());
+        Assertions.assertEquals(1, journal.getDatabaseNames().get(0));
+
+        JournalEntity journalEntity = journal.read(1);
+        Assertions.assertEquals(OperationType.OP_CREATE_MTMV_JOB, 
journalEntity.getOpCode());
+
+        batch = new JournalBatch(10);
+        for (int i = 0; i < 10; i++) {
+            String data = "JournalBatch 2 item " + i;
+            Writable writable = new Writable() {
+                @Override
+                public void write(DataOutput out) throws IOException {
+                    Text.writeString(out, data);
+                }
+            };
+            batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+        }
+        journalId = journal.write(batch);
+        Assertions.assertEquals(11, journalId);
+
+        journal.close();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to