This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d24254e146a [feature](journal) Add a method to write a set of journals in batch (#30380) d24254e146a is described below commit d24254e146a06d8b2b9a65de0fbdb55ea9735a29 Author: walter <w41te...@gmail.com> AuthorDate: Thu Jan 25 22:26:29 2024 +0800 [feature](journal) Add a method to write a set of journals in batch (#30380) --- .../java/org/apache/doris/journal/Journal.java | 5 ++ .../org/apache/doris/journal/JournalBatch.java | 81 ++++++++++++++++++ .../apache/doris/journal/bdbje/BDBJEJournal.java | 78 +++++++++++++++++ .../apache/doris/journal/local/LocalJournal.java | 12 +++ .../doris/persist/EditLogFileOutputStream.java | 5 ++ .../apache/doris/persist/EditLogOutputStream.java | 2 + .../doris/journal/bdbje/BDBJEJournalTest.java | 97 ++++++++++++++++++++++ 7 files changed, 280 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java index 8fca299df19..b5b37a80ef0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/Journal.java @@ -49,6 +49,11 @@ public interface Journal { // Write a journal and sync to disk public long write(short op, Writable writable) throws IOException; + // Write a set of journals to disk in batch. + // + // Return the id of the first journal in the batch; the other journals follow with consecutive ids. + public long write(JournalBatch batch) throws IOException; + // Get current journal number public long getJournalNum(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java new file mode 100644 index 00000000000..0fc0ccf9355 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalBatch.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.journal;
+
+import org.apache.doris.common.io.DataOutputBuffer;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.OperationType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * A group of journals that are serialized as soon as they are added and are persisted
+ * together by {@link Journal#write(JournalBatch)}.
+ */
+public class JournalBatch {
+    private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
+
+    private final ArrayList<Entity> entities;
+
+    public JournalBatch() {
+        entities = new ArrayList<>();
+    }
+
+    // Create an empty batch whose backing list is pre-sized for `cap` journals.
+    public JournalBatch(int cap) {
+        entities = new ArrayList<>(cap);
+    }
+
+    // Add a writable data into journal batch.
+    //
+    // The writable data is serialized immediately and only the serialized bytes are kept, so it
+    // is safe to modify the data object once this function returns.
+    //
+    // Throws IllegalArgumentException for OP_TIMESTAMP, which cannot be written in a batch.
+    public void addJournal(short op, Writable data) throws IOException {
+        if (op == OperationType.OP_TIMESTAMP) {
+            // OP_TIMESTAMP is not supported, see `BDBJEJournal.write` for details.
+            throw new IllegalArgumentException("JournalBatch.addJournal does not support OP_TIMESTAMP");
+        }
+
+        JournalEntity entity = new JournalEntity();
+        entity.setOpCode(op);
+        entity.setData(data);
+
+        DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
+        entity.write(buffer);
+
+        entities.add(new Entity(op, buffer));
+    }
+
+    // Return the journals in the order they were added. The live list is returned, not a copy.
+    public ArrayList<Entity> getJournalEntities() {
+        return entities;
+    }
+
+    // One journal of the batch: its op code plus the serialized JournalEntity.
+    public static class Entity {
+        short op;
+        DataOutputBuffer data;
+
+        Entity(short op, DataOutputBuffer data) {
+            this.op = op;
+            this.data = data;
+        }
+
+        public short getOpCode() {
+            return op;
+        }
+
+        // Return exactly the bytes produced by serializing the JournalEntity.
+        //
+        // DataOutputBuffer.getData() hands out its backing array, which is only valid up to
+        // getLength() and is normally longer (it starts at OUTPUT_BUFFER_INIT_SIZE bytes and grows).
+        // Without trimming, that padding would be persisted and counted as journal data.
+        // A new array is returned on every call.
+        public byte[] getBinaryData() {
+            return Arrays.copyOf(data.getData(), data.getLength());
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 3698b8f76ca..95efbfa3780 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -24,6 +24,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.metric.MetricRepo;
@@ -121,6 +122,83 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
         }
     }
 
+    /**
+     * Write every journal of {@code batch} to bdb inside a single transaction.
+     *
+     * <p>The ids {@code [firstId, firstId + batch size)} are reserved before writing. The write is
+     * attempted up to {@code RETRY_TIME} times, sleeping 5 seconds after each
+     * {@link DatabaseException}. A {@link ReplicaWriteException}, or running out of attempts,
+     * terminates the process.
+     *
+     * @param batch the serialized journals to persist
+     * @return the id assigned to the first journal of the batch
+     */
+    @Override
+    public synchronized long write(JournalBatch batch) throws IOException {
+        List<JournalBatch.Entity> entities = batch.getJournalEntities();
+        int entitySize = entities.size();
+        long firstId = nextJournalId.getAndAdd(entitySize);
+
+        // Write the journals to bdb.
+        for (int i = 0; i < RETRY_TIME; i++) {
+            Transaction txn = null;
+            try {
+                // Reset for every attempt, so that bytes put by an aborted attempt are not added
+                // to the metrics of the attempt that finally commits.
+                long dataSize = 0;
+                // The default config is constructed from the configs of environment.
+                txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, null);
+                for (int j = 0; j < entitySize; ++j) {
+                    JournalBatch.Entity entity = entities.get(j);
+                    DatabaseEntry theKey = idToKey(firstId + j);
+                    DatabaseEntry theData = new DatabaseEntry(entity.getBinaryData());
+                    currentJournalDB.put(txn, theKey, theData); // Put with overwrite, it always success
+                    dataSize += theData.getSize();
+                    if (i == 0) {
+                        LOG.debug("opCode = {}, journal size = {}", entity.getOpCode(), theData.getSize());
+                    }
+                }
+
+                txn.commit();
+                txn = null;
+
+                if (MetricRepo.isInit) {
+                    MetricRepo.COUNTER_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+                    MetricRepo.COUNTER_CURRENT_EDIT_LOG_SIZE_BYTES.increase(dataSize);
+                }
+
+                return firstId;
+            } catch (ReplicaWriteException e) {
+                /**
+                 * This exception indicates that an update operation or transaction commit
+                 * or abort was attempted while in the
+                 * {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked
+                 * as being invalid.
+                 * <p>
+                 * The exception is the result of either an error in the application logic or
+                 * the result of a transition of the node from Master to Replica while a
+                 * transaction was in progress.
+                 * <p>
+                 * The application must abort the current transaction and redirect all
+                 * subsequent update operations to the Master.
+                 */
+                LOG.error("catch ReplicaWriteException when writing to database, will exit. the first journal id {}",
+                        firstId, e);
+                String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+                        + currentJournalDB.getDatabaseName();
+                LOG.error(msg);
+                Util.stdoutWithTime(msg);
+                System.exit(-1);
+            } catch (DatabaseException e) {
+                LOG.error("catch an exception when writing to database. sleep and retry. the first journal id {}",
+                        firstId, e);
+                try {
+                    Thread.sleep(5 * 1000);
+                } catch (InterruptedException e1) {
+                    LOG.warn("", e1);
+                }
+            } finally {
+                // A non-null txn here means the attempt did not commit; abort it before retrying.
+                if (txn != null) {
+                    txn.abort();
+                }
+            }
+        }
+
+        String msg = "write bdb failed. will exit. the first journalId: " + firstId + ", bdb database Name: "
+                + currentJournalDB.getDatabaseName();
+        LOG.error(msg);
+        Util.stdoutWithTime(msg);
+        System.exit(-1);
+        return 0; // unreachable!
+    }
+
     @Override
     public synchronized long write(short op, Writable writable) throws IOException {
         JournalEntity entity = new JournalEntity();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
index 9ba3274e0f0..45ae377a401 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/local/LocalJournal.java
@@ -19,6 +19,7 @@ package org.apache.doris.journal.local;
 
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.journal.Journal;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.persist.EditLogFileOutputStream;
@@ -139,6 +140,17 @@ public class LocalJournal implements Journal {
         return cursor;
     }
 
+    /**
+     * Append every journal of {@code batch} to the edit log, then flush once for the whole batch.
+     *
+     * @param batch the serialized journals to persist
+     * @return the value of {@code journalId} before it is advanced by the batch size
+     */
+    @Override
+    public synchronized long write(JournalBatch batch) throws IOException {
+        List<JournalBatch.Entity> entities = batch.getJournalEntities();
+        for (JournalBatch.Entity entity : entities) {
+            // JournalBatch keeps the whole serialized JournalEntity, which includes the op code,
+            // while outputStream.write(op, bytes) emits the op code itself. Drop the leading op
+            // code so the record has the same layout as one written by write(op, writable).
+            // NOTE(review): assumes JournalEntity serializes the op code as a leading 2-byte short.
+            byte[] serialized = entity.getBinaryData();
+            byte[] payload = java.util.Arrays.copyOfRange(serialized, Short.BYTES, serialized.length);
+            outputStream.write(entity.getOpCode(), payload);
+        }
+        outputStream.setReadyToFlush();
+        outputStream.flush();
+        // NOTE(review): confirm whether journalId holds the last written id or the next id; if it
+        // is the last written id, the first id of this batch is the returned value + 1.
+        return journalId.getAndAdd(entities.size());
+    }
+
     @Override
     public synchronized long write(short op, Writable writable) throws IOException {
         outputStream.write(op, writable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
index 3d8b360704e..d98f66bacd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogFileOutputStream.java
@@ -65,6 +65,11 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
         writable.write(bufCurrent);
     }
 
+    /**
+     * Append {@code op} followed by every byte of {@code data} to the current buffer; nothing is
+     * flushed here. {@code data} is copied verbatim, so it should hold only the payload that
+     * follows the op code.
+     */
+    @Override
+    public void write(short op, byte[] data) throws IOException {
+        bufCurrent.writeShort(op);
+        bufCurrent.write(data);
+    }
+
     // Create empty edits logs file.
     void create() throws IOException {
         fc.truncate(0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
index 95b746e0cb2..e7f966b397e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLogOutputStream.java
@@ -51,6 +51,8 @@ public abstract class EditLogOutputStream extends OutputStream {
      */
     public abstract void write(short op, Writable writable) throws IOException;
 
+    /**
+     * Write an operation whose payload has already been serialized into {@code data}.
+     */
+    public abstract void write(short op, byte[] data) throws IOException;
+
     abstract void create() throws IOException;
 
     public abstract void close() throws IOException;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index ba81d6697ba..edcbf9c033c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.journal.JournalBatch;
 import org.apache.doris.journal.JournalCursor;
 import org.apache.doris.journal.JournalEntity;
 import org.apache.doris.persist.OperationType;
@@ -228,4 +229,100 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use
         Assertions.assertEquals(21, journal.getDatabaseNames().get(0));
         journal.close();
     }
+
+    @RepeatedTest(1)
+    public void testJournalBatch() throws Exception {
+        int port = findValidPort();
+        Preconditions.checkArgument(((port > 0) && (port < 65535)));
+        String nodeName = Env.genFeNodeName("127.0.0.1", port, false);
+        long replayedJournalId = 0;
+        File tmpDir = createTmpDir();
+        new MockUp<Env>() {
+            HostInfo selfNode = new HostInfo("127.0.0.1", port);
+            @Mock
+            public String getBdbDir() {
+                return tmpDir.getAbsolutePath();
+            }
+
+            @Mock
+            public HostInfo getSelfNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public HostInfo getHelperNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public boolean isElectable() {
+                return true;
+            }
+
+            @Mock
+            public long getReplayedJournalId() {
+                return replayedJournalId;
+            }
+        };
+
+        LOG.info("BdbDir:{}, selfNode:{}, nodeName:{}", Env.getServingEnv().getBdbDir(),
+                Env.getServingEnv().getSelfNode(), nodeName);
+        Assertions.assertEquals(tmpDir.getAbsolutePath(), Env.getServingEnv().getBdbDir());
+        BDBJEJournal journal = new BDBJEJournal(nodeName);
+        journal.open();
+        // BDBEnvironment needs several seconds to be elected from UNKNOWN to MASTER.
+        for (int i = 0; i < 10; i++) {
+            if (journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+                    .equals(ReplicatedEnvironment.State.MASTER)) {
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+                journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+
+        journal.rollJournal();
+        JournalBatch batch = new JournalBatch(10);
+        for (int i = 0; i < 10; i++) {
+            String data = "JournalBatch item " + i;
+            Writable writable = new Writable() {
+                @Override
+                public void write(DataOutput out) throws IOException {
+                    Text.writeString(out, data);
+                }
+            };
+            // OP_CREATE_MTMV_JOB is deprecated, so it is safe to write arbitrary data with it.
+            batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+        }
+        long journalId = journal.write(batch);
+        Assertions.assertEquals(1, journalId);
+
+        Assertions.assertEquals(10, journal.getMaxJournalId());
+        Assertions.assertEquals(10, journal.getJournalNum());
+        Assertions.assertEquals(1, journal.getMinJournalId());
+        Assertions.assertEquals(0, journal.getFinalizedJournalId());
+
+        LOG.debug("journal.getDatabaseNames(): {}", journal.getDatabaseNames());
+        Assertions.assertEquals(1, journal.getDatabaseNames().size());
+        Assertions.assertEquals(1, journal.getDatabaseNames().get(0));
+
+        JournalEntity journalEntity = journal.read(1);
+        Assertions.assertEquals(OperationType.OP_CREATE_MTMV_JOB, journalEntity.getOpCode());
+
+        batch = new JournalBatch(10);
+        for (int i = 0; i < 10; i++) {
+            String data = "JournalBatch 2 item " + i;
+            Writable writable = new Writable() {
+                @Override
+                public void write(DataOutput out) throws IOException {
+                    Text.writeString(out, data);
+                }
+            };
+            batch.addJournal(OperationType.OP_CREATE_MTMV_JOB, writable);
+        }
+        journalId = journal.write(batch);
+        Assertions.assertEquals(11, journalId);
+        // The second batch continues the id sequence in the same database.
+        Assertions.assertEquals(20, journal.getMaxJournalId());
+        journalEntity = journal.read(11);
+        Assertions.assertEquals(OperationType.OP_CREATE_MTMV_JOB, journalEntity.getOpCode());
+
+        journal.close();
+    }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org