This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 449536bed0 Revert "[ISSUE# 9333] Use fastjson2 in broker module
(#9334)" (#9387)
449536bed0 is described below
commit 449536bed023ef57713ecb097433bab544a3ae67
Author: lizhimins <[email protected]>
AuthorDate: Tue May 6 17:55:53 2025 +0800
Revert "[ISSUE# 9333] Use fastjson2 in broker module (#9334)" (#9387)
This reverts commit b1daa3c0f0e8dd85f181331e4ec4ff276fb361c0.
---
broker/BUILD.bazel | 2 +
.../rocketmq/broker/RocksDBConfigManager.java | 7 +-
.../broker/offset/ConsumerOrderInfoManager.java | 19 +--
.../rocketmq/broker/pop/PopConsumerRecord.java | 8 +-
.../rocketmq/broker/pop/PopConsumerService.java | 35 ++--
.../broker/processor/AckMessageProcessor.java | 7 +-
.../broker/processor/AdminBrokerProcessor.java | 49 +++---
.../processor/ChangeInvisibleTimeProcessor.java | 9 +-
.../broker/processor/PopBufferMergeService.java | 19 +--
.../broker/processor/PopMessageProcessor.java | 31 ++--
.../broker/processor/PopReviveService.java | 25 ++-
.../broker/topic/TopicQueueMappingManager.java | 21 +--
.../broker/transaction/TransactionMetrics.java | 44 ++---
.../rocketmq/broker/RocksDBConfigManagerTest.java | 75 ---------
.../broker/processor/AdminBrokerProcessorTest.java | 79 +--------
.../ChangeInvisibleTimeProcessorTest.java | 57 +------
.../processor/PopBufferMergeServiceTest.java | 182 +++++----------------
.../broker/processor/PopMessageProcessorTest.java | 50 +-----
.../broker/processor/PopReviveServiceTest.java | 83 ++--------
.../broker/topic/TopicQueueMappingManagerTest.java | 68 ++------
.../transaction/queue/TransactionMetricsTest.java | 59 +------
.../apache/rocketmq/store/pop/PopCheckPoint.java | 5 +-
22 files changed, 219 insertions(+), 715 deletions(-)
diff --git a/broker/BUILD.bazel b/broker/BUILD.bazel
index 6ee2c8635f..9d61c0a1ff 100644
--- a/broker/BUILD.bazel
+++ b/broker/BUILD.bazel
@@ -31,6 +31,7 @@ java_library(
"//tieredstore",
"@maven//:org_slf4j_slf4j_api",
"@maven//:ch_qos_logback_logback_classic",
+ "@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:com_github_luben_zstd_jni",
"@maven//:com_google_guava_guava",
@@ -81,6 +82,7 @@ java_library(
"//remoting",
"//store",
"//tieredstore",
+ "@maven//:com_alibaba_fastjson",
"@maven//:com_alibaba_fastjson2_fastjson2",
"@maven//:org_slf4j_slf4j_api",
"@maven//:com_google_guava_guava",
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
index c59c00c040..ee2d4e54a6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/RocksDBConfigManager.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.broker;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -29,9 +31,6 @@ import org.rocksdb.RocksIterator;
import org.rocksdb.Statistics;
import org.rocksdb.WriteBatch;
-import java.nio.charset.StandardCharsets;
-import java.util.function.BiConsumer;
-
public class RocksDBConfigManager {
protected static final Logger BROKER_LOG =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public volatile boolean isStop = false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 9f173daf46..120f5b104c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -16,9 +16,17 @@
*/
package org.apache.rocketmq.broker.offset;
-import com.alibaba.fastjson2.annotation.JSONField;
+import com.alibaba.fastjson.annotation.JSONField;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -29,15 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
public class ConsumerOrderInfoManager extends ConfigManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
index 661ace9bcb..1ee01fea1c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerRecord.java
@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.broker.pop;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.annotation.JSONField;
-
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -119,7 +119,7 @@ public class PopConsumerRecord {
}
public static PopConsumerRecord decode(byte[] body) {
- return JSON.parseObject(body, PopConsumerRecord.class);
+ return JSONObject.parseObject(body, PopConsumerRecord.class);
}
public long getPopTime() {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
index a2198f2560..1138ff4afe 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java
@@ -16,9 +16,25 @@
*/
package org.apache.rocketmq.broker.pop;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
@@ -49,23 +65,6 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
public class PopConsumerService extends ServiceThread {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 06a531552a..23a4f6167c 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
+import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -50,9 +52,6 @@ import
org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
-import java.nio.charset.StandardCharsets;
-import java.util.BitSet;
-
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4d45730a3c..812ca90e82 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,12 +16,33 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.api.common.Attributes;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.auth.authentication.enums.UserType;
@@ -206,28 +227,6 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_INVOCATION_STATUS;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
@@ -2733,7 +2732,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
} else {
ConsumerFilterData filterData =
this.brokerController.getConsumerFilterManager()
.get(requestHeader.getTopic(),
requestHeader.getConsumerGroup());
- body.setFilterData(JSON.toJSONString(filterData));
+ body.setFilterData(JSON.toJSONString(filterData, true));
messageFilter = new ExpressionMessageFilter(subscriptionData,
filterData,
this.brokerController.getConsumerFilterManager());
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index f288c001b8..de72ee7baf 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -16,9 +16,12 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -47,10 +50,6 @@ import
org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 7c309ec5c4..820388b18d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -16,7 +16,15 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
@@ -36,15 +44,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-
public class PopBufferMergeService extends ServiceThread {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
ConcurrentHashMap<String/*mergeKey*/, PopCheckPointWrapper>
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 9f55269f39..d73acc84df 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -16,13 +16,27 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion;
import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
@@ -77,21 +91,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Random;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index dcffaf50cc..e1ead86169 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -16,8 +16,18 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
import io.opentelemetry.api.common.Attributes;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
@@ -27,12 +37,12 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -51,17 +61,6 @@ import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
index dfbe5d347a..6b9cf15938 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManager.java
@@ -16,8 +16,13 @@
*/
package org.apache.rocketmq.broker.topic;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -35,13 +40,6 @@ import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import static
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
public class TopicQueueMappingManager extends ConfigManager {
@@ -151,10 +149,7 @@ public class TopicQueueMappingManager extends
ConfigManager {
TopicQueueMappingSerializeWrapper wrapper = new
TopicQueueMappingSerializeWrapper();
wrapper.setTopicQueueMappingInfoMap(topicQueueMappingTable);
wrapper.setDataVersion(this.dataVersion);
- if (pretty) {
- return JSON.toJSONString(wrapper, JSONWriter.Feature.PrettyFormat);
- }
- return JSON.toJSONString(wrapper);
+ return JSON.toJSONString(wrapper, pretty);
}
@Override
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
index d8dd811db2..ad30c73c60 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetrics.java
@@ -16,21 +16,16 @@
*/
package org.apache.rocketmq.broker.transaction;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
import com.google.common.io.Files;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -38,6 +33,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.DataVersion;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
public class TransactionMetrics extends ConfigManager {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -87,11 +90,11 @@ public class TransactionMetrics extends ConfigManager {
this.transactionCounts = transactionCounts;
}
- protected void write0(OutputStream out) {
+ protected void write0(Writer writer) {
TransactionMetricsSerializeWrapper wrapper = new
TransactionMetricsSerializeWrapper();
wrapper.setTransactionCount(transactionCounts);
wrapper.setDataVersion(dataVersion);
- JSON.writeTo(out, wrapper, JSONWriter.Feature.BrowserCompatible);
+ JSON.writeJSONString(writer, wrapper,
SerializerFeature.BrowserCompatible);
}
@Override
@@ -179,7 +182,7 @@ public class TransactionMetrics extends ConfigManager {
String config = configFilePath();
String temp = config + ".tmp";
String backup = config + ".bak";
- FileOutputStream outputStream = null;
+ BufferedWriter bufferedWriter = null;
try {
File tmpFile = new File(temp);
File parentDirectory = tmpFile.getParentFile();
@@ -196,10 +199,11 @@ public class TransactionMetrics extends ConfigManager {
return;
}
}
- outputStream = new FileOutputStream(tmpFile, false);
- write0(outputStream);
- outputStream.flush();
- outputStream.close();
+ bufferedWriter = new BufferedWriter(new OutputStreamWriter(new
FileOutputStream(tmpFile, false),
+ StandardCharsets.UTF_8));
+ write0(bufferedWriter);
+ bufferedWriter.flush();
+ bufferedWriter.close();
log.debug("Finished writing tmp file: {}", temp);
File configFile = new File(config);
@@ -212,9 +216,9 @@ public class TransactionMetrics extends ConfigManager {
} catch (IOException e) {
log.error("Failed to persist {}", temp, e);
} finally {
- if (null != outputStream) {
+ if (null != bufferedWriter) {
try {
- outputStream.close();
+ bufferedWriter.close();
} catch (IOException ignore) {
}
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
deleted file mode 100644
index d9feb6a782..0000000000
---
a/broker/src/test/java/org/apache/rocketmq/broker/RocksDBConfigManagerTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.broker;
-
-import com.alibaba.fastjson2.JSON;
-import org.apache.rocketmq.common.config.ConfigRocksDBStorage;
-import org.apache.rocketmq.remoting.protocol.DataVersion;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-
-public class RocksDBConfigManagerTest {
-
- private ConfigRocksDBStorage configRocksDBStorage;
-
- private RocksDBConfigManager rocksDBConfigManager;
-
- @Before
- public void setUp() throws IllegalAccessException {
- configRocksDBStorage = mock(ConfigRocksDBStorage.class);
- rocksDBConfigManager = spy(new RocksDBConfigManager("testPath", 1000L,
null));
- rocksDBConfigManager.configRocksDBStorage = configRocksDBStorage;
- }
-
- @Test
- public void testLoadDataVersion() throws Exception {
- DataVersion expected = new DataVersion();
- expected.nextVersion();
- String jsonData = JSON.toJSONString(expected);
- byte[] mockDataVersion = jsonData.getBytes(StandardCharsets.UTF_8);
-
-
when(rocksDBConfigManager.configRocksDBStorage.getKvDataVersion()).thenReturn(mockDataVersion);
-
- boolean result = rocksDBConfigManager.loadDataVersion();
-
- assertTrue(result);
- assertEquals(expected.getCounter().get(),
rocksDBConfigManager.getKvDataVersion().getCounter().get());
- assertEquals(expected.getTimestamp(),
rocksDBConfigManager.getKvDataVersion().getTimestamp());
- }
-
- @Test
- public void testUpdateKvDataVersion() throws Exception {
- rocksDBConfigManager.updateKvDataVersion();
-
- DataVersion expectedDataVersion =
rocksDBConfigManager.getKvDataVersion();
- verify(rocksDBConfigManager.configRocksDBStorage,
times(1)).updateKvDataVersion(
-
eq(JSON.toJSONString(expectedDataVersion).getBytes(StandardCharsets.UTF_8))
- );
- }
-}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index 8418781b6b..a6bcca954d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -16,8 +16,7 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.fastjson.JSON;
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -35,10 +34,10 @@ import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
-import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
-import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
+import org.apache.rocketmq.broker.config.v1.RocksDBSubscriptionGroupManager;
+import org.apache.rocketmq.broker.config.v1.RocksDBTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
@@ -74,7 +73,6 @@ import
org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UserInfo;
-import
org.apache.rocketmq.remoting.protocol.header.CheckRocksdbCqWriteProgressRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
@@ -89,13 +87,11 @@ import
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHead
import
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.GetSubscriptionGroupConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.QueryConsumeQueueRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
import
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
@@ -108,7 +104,6 @@ import
org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -116,7 +111,6 @@ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
@@ -151,8 +145,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -727,7 +719,7 @@ public class AdminBrokerProcessorTest {
consumerOffsetManager = mock(ConsumerOffsetManager.class);
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
-
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset));
+
when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset,
false));
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
@@ -1338,69 +1330,6 @@ public class AdminBrokerProcessorTest {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
- @Test
- public void testGetSubscriptionGroup() throws RemotingCommandException {
-
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group",
new SubscriptionGroupConfig());
- GetSubscriptionGroupConfigRequestHeader requestHeader = new
GetSubscriptionGroupConfigRequestHeader();
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_SUBSCRIPTIONGROUP_CONFIG,
requestHeader);
- requestHeader.setGroup("group");
- request.makeCustomHeaderToNet();
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertEquals(ResponseCode.SUCCESS, response.getCode());
- }
-
- @Test
- public void testCheckRocksdbCqWriteProgress() throws
RemotingCommandException {
- CheckRocksdbCqWriteProgressRequestHeader requestHeader = new
CheckRocksdbCqWriteProgressRequestHeader();
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_ROCKSDB_CQ_WRITE_PROGRESS,
requestHeader);
- requestHeader.setTopic("topic");
- request.makeCustomHeaderToNet();
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertEquals(ResponseCode.SUCCESS, response.getCode());
- }
-
- @Test
- public void testQueryConsumeQueue() throws RemotingCommandException {
- messageStore = mock(MessageStore.class);
- ConsumeQueueInterface consumeQueue = mock(ConsumeQueueInterface.class);
- when(consumeQueue.getMinOffsetInQueue()).thenReturn(0L);
- when(consumeQueue.getMaxOffsetInQueue()).thenReturn(1L);
- when(messageStore.getConsumeQueue(anyString(),
anyInt())).thenReturn(consumeQueue);
- when(brokerController.getMessageStore()).thenReturn(messageStore);
- QueryConsumeQueueRequestHeader requestHeader = new
QueryConsumeQueueRequestHeader();
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_QUEUE,
requestHeader);
- requestHeader.setTopic("topic");
- requestHeader.setQueueId(0);
- request.makeCustomHeaderToNet();
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
- assertEquals(ResponseCode.SUCCESS, response.getCode());
- }
-
- @Test
- public void testProcessRequest_GetTopicConfig() throws Exception {
- GetTopicConfigRequestHeader requestHeader = new
GetTopicConfigRequestHeader();
- requestHeader.setTopic("testTopic");
-
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG,
requestHeader);
- request.makeCustomHeaderToNet();
-
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setTopicName("testTopic");
- TopicConfigManager topicConfigManager = mock(TopicConfigManager.class);
-
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
- when(topicConfigManager.selectTopicConfig("testTopic"))
- .thenReturn(topicConfig);
-
- RemotingCommand response =
adminBrokerProcessor.processRequest(handlerContext, request);
-
- assertNotNull(response);
- assertEquals(ResponseCode.SUCCESS, response.getCode());
-
- String responseBody = new String(response.getBody(),
StandardCharsets.UTF_8);
- TopicConfigAndQueueMapping result =
JSONObject.parseObject(responseBody, TopicConfigAndQueueMapping.class);
- assertEquals("testTopic", result.getTopicName());
- }
-
private ResetOffsetRequestHeader createRequestHeader(String topic,String
group,long timestamp,boolean force,long offset,int queueId) {
ResetOffsetRequestHeader requestHeader = new
ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
index 77490dbd69..e15d51b4a8 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
@@ -18,11 +18,12 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.failover.EscapeBridge;
-import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
@@ -40,12 +41,10 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,13 +52,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import java.lang.reflect.Field;
-import java.util.concurrent.CompletableFuture;
-
import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -168,51 +162,4 @@ public class ChangeInvisibleTimeProcessorTest {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
}
-
- @Test
- public void testProcessRequestAsync_JsonParsing() throws Exception {
- Channel mockChannel = mock(Channel.class);
- RemotingCommand mockRequest = mock(RemotingCommand.class);
- BrokerController mockBrokerController = mock(BrokerController.class);
- TopicConfigManager mockTopicConfigManager =
mock(TopicConfigManager.class);
- MessageStore mockMessageStore = mock(MessageStore.class);
- BrokerConfig mockBrokerConfig = mock(BrokerConfig.class);
- BrokerStatsManager mockBrokerStatsManager =
mock(BrokerStatsManager.class);
- PopMessageProcessor mockPopMessageProcessor =
mock(PopMessageProcessor.class);
- PopBufferMergeService mockPopBufferMergeService =
mock(PopBufferMergeService.class);
-
-
when(mockBrokerController.getTopicConfigManager()).thenReturn(mockTopicConfigManager);
-
when(mockBrokerController.getMessageStore()).thenReturn(mockMessageStore);
-
when(mockBrokerController.getBrokerConfig()).thenReturn(mockBrokerConfig);
-
when(mockBrokerController.getBrokerStatsManager()).thenReturn(mockBrokerStatsManager);
-
when(mockBrokerController.getPopMessageProcessor()).thenReturn(mockPopMessageProcessor);
-
when(mockPopMessageProcessor.getPopBufferMergeService()).thenReturn(mockPopBufferMergeService);
- when(mockPopBufferMergeService.addAk(anyInt(),
any())).thenReturn(false);
- when(mockBrokerController.getEscapeBridge()).thenReturn(escapeBridge);
- PutMessageResult mockPutMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null, true);
-
when(mockBrokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenReturn(CompletableFuture.completedFuture(mockPutMessageResult));
-
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setReadQueueNums(4);
-
when(mockTopicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
- when(mockMessageStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(0L);
- when(mockMessageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(10L);
-
when(mockBrokerConfig.isPopConsumerKVServiceEnable()).thenReturn(false);
-
- ChangeInvisibleTimeRequestHeader requestHeader = new
ChangeInvisibleTimeRequestHeader();
- requestHeader.setTopic("TestTopic");
- requestHeader.setQueueId(1);
- requestHeader.setOffset(5L);
- requestHeader.setConsumerGroup("TestGroup");
- requestHeader.setExtraInfo("0 10000 10000 0 TestBroker 1");
- requestHeader.setInvisibleTime(60000L);
-
when(mockRequest.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class)).thenReturn(requestHeader);
-
- ChangeInvisibleTimeProcessor processor = new
ChangeInvisibleTimeProcessor(mockBrokerController);
- CompletableFuture<RemotingCommand> futureResponse =
processor.processRequestAsync(mockChannel, mockRequest, true);
-
- RemotingCommand response = futureResponse.get();
- assertNotNull(response);
- assertEquals(ResponseCode.SUCCESS, response.getCode());
- }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
index 33d6820a7e..acc7a3da74 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
@@ -16,20 +16,19 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.ConsumerManager;
-import org.apache.rocketmq.broker.failover.EscapeBridge;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
-import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
@@ -38,82 +37,56 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import java.lang.reflect.Method;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.Silent.class)
public class PopBufferMergeServiceTest {
-
+ @Spy
+ private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
@Mock
- private BrokerController brokerController;
-
private PopMessageProcessor popMessageProcessor;
-
- @Mock
- private ScheduleMessageService scheduleMessageService;
-
@Mock
- private TopicConfigManager topicConfigManager;
-
- @Mock
- private ConsumerManager consumerManager;
-
+ private ChannelHandlerContext handlerContext;
@Mock
private DefaultMessageStore messageStore;
-
- @Mock
- private MessageStoreConfig messageStoreConfig;
-
- private String defaultGroup = "defaultGroup";
-
- private String defaultTopic = "defaultTopic";
-
- private PopBufferMergeService popBufferMergeService;
-
- @Mock
- private BrokerConfig brokerConfig;
-
- @Mock
- private EscapeBridge escapeBridge;
+ private ScheduleMessageService scheduleMessageService;
+ private ClientChannelInfo clientChannelInfo;
+ private String group = "FooBarGroup";
+ private String topic = "FooBar";
@Before
public void init() throws Exception {
- when(brokerConfig.getBrokerIP1()).thenReturn("127.0.0.1");
- when(brokerConfig.isEnablePopBufferMerge()).thenReturn(true);
- when(brokerConfig.getPopCkStayBufferTime()).thenReturn(10 * 1000);
- when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
- when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
- when(brokerController.getMessageStore()).thenReturn(messageStore);
-
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
-
when(brokerController.getScheduleMessageService()).thenReturn(scheduleMessageService);
-
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+ FieldUtils.writeField(brokerController.getBrokerConfig(),
"enablePopBufferMerge", true, true);
+ brokerController.setMessageStore(messageStore);
popMessageProcessor = new PopMessageProcessor(brokerController);
- popBufferMergeService = new PopBufferMergeService(brokerController,
popMessageProcessor);
- FieldUtils.writeDeclaredField(popBufferMergeService,
"brokerController", brokerController, true);
- ConcurrentMap<String, TopicConfig> topicConfigTable = new
ConcurrentHashMap<>();
- topicConfigTable.put(defaultTopic, new TopicConfig());
-
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
+ scheduleMessageService = new ScheduleMessageService(brokerController);
+ scheduleMessageService.parseDelayLevel();
+ Channel mockChannel = mock(Channel.class);
+
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
+ clientChannelInfo = new ClientChannelInfo(mockChannel);
+ ConsumerData consumerData = createConsumerData(group, topic);
+ brokerController.getConsumerManager().registerConsumer(
+ consumerData.getGroupName(),
+ clientChannelInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
}
- @Test(timeout = 15_000)
+ @Test(timeout = 10_000)
public void testBasic() throws Exception {
// This test case fails on Windows in CI pipeline
// Disable it for later fix
Assume.assumeFalse(MixAll.isWindows());
+ PopBufferMergeService popBufferMergeService = new
PopBufferMergeService(brokerController, popMessageProcessor);
+ popBufferMergeService.start();
PopCheckPoint ck = new PopCheckPoint();
ck.setBitMap(0);
int msgCnt = 1;
@@ -124,8 +97,8 @@ public class PopBufferMergeServiceTest {
ck.setInvisibleTime(invisibleTime);
int offset = 100;
ck.setStartOffset(offset);
- ck.setCId(defaultGroup);
- ck.setTopic(defaultTopic);
+ ck.setCId(group);
+ ck.setTopic(topic);
int queueId = 0;
ck.setQueueId(queueId);
@@ -135,93 +108,18 @@ public class PopBufferMergeServiceTest {
AckMsg ackMsg = new AckMsg();
ackMsg.setAckOffset(ackOffset);
ackMsg.setStartOffset(offset);
- ackMsg.setConsumerGroup(defaultGroup);
- ackMsg.setTopic(defaultTopic);
+ ackMsg.setConsumerGroup(group);
+ ackMsg.setTopic(topic);
ackMsg.setQueueId(queueId);
ackMsg.setPopTime(popTime);
try {
assertThat(popBufferMergeService.addCk(ck, reviveQid, ackOffset,
nextBeginOffset)).isTrue();
- assertThat(popBufferMergeService.getLatestOffset(defaultTopic,
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
+ assertThat(popBufferMergeService.getLatestOffset(topic, group,
queueId)).isEqualTo(nextBeginOffset);
Thread.sleep(1000); // wait background threads of
PopBufferMergeService run for some time
assertThat(popBufferMergeService.addAk(reviveQid,
ackMsg)).isTrue();
- assertThat(popBufferMergeService.getLatestOffset(defaultTopic,
defaultGroup, queueId)).isEqualTo(nextBeginOffset);
+ assertThat(popBufferMergeService.getLatestOffset(topic, group,
queueId)).isEqualTo(nextBeginOffset);
} finally {
popBufferMergeService.shutdown(true);
}
}
-
- @Test
- public void testAddCkJustOffset_MergeKeyConflict() {
- PopCheckPoint point = mock(PopCheckPoint.class);
- String mergeKey = "testMergeKey";
- when(point.getTopic()).thenReturn(mergeKey);
- when(point.getCId()).thenReturn("");
- when(point.getQueueId()).thenReturn(0);
- when(point.getStartOffset()).thenReturn(0L);
- when(point.getPopTime()).thenReturn(0L);
- when(point.getBrokerName()).thenReturn("");
- popBufferMergeService.buffer.put(mergeKey + "000",
mock(PopBufferMergeService.PopCheckPointWrapper.class));
-
- assertFalse(popBufferMergeService.addCkJustOffset(point, 0, 0, 0));
- }
-
- @Test
- public void testAddCkMock() {
- int queueId = 0;
- long startOffset = 100L;
- long invisibleTime = 30_000L;
- long popTime = System.currentTimeMillis();
- int reviveQueueId = 0;
- long nextBeginOffset = 101L;
- String brokerName = "brokerName";
- popBufferMergeService.addCkMock(defaultGroup, defaultTopic, queueId,
startOffset, invisibleTime, popTime, reviveQueueId, nextBeginOffset,
brokerName);
- verify(brokerConfig, times(1)).isEnablePopLog();
- }
-
- @Test
- public void testPutAckToStore() throws Exception {
- PopCheckPoint point = new PopCheckPoint();
- point.setStartOffset(100L);
- point.setCId("testGroup");
- point.setTopic("testTopic");
- point.setQueueId(1);
- point.setPopTime(System.currentTimeMillis());
- point.setBrokerName("testBroker");
-
- PopBufferMergeService.PopCheckPointWrapper pointWrapper =
mock(PopBufferMergeService.PopCheckPointWrapper.class);
- when(pointWrapper.getCk()).thenReturn(point);
- when(pointWrapper.getReviveQueueId()).thenReturn(0);
-
- AtomicInteger toStoreBits = new AtomicInteger(0);
- when(pointWrapper.getToStoreBits()).thenReturn(toStoreBits);
-
- byte msgIndex = 0;
- AtomicInteger count = new AtomicInteger(0);
-
- EscapeBridge escapeBridge = mock(EscapeBridge.class);
- when(brokerController.getEscapeBridge()).thenReturn(escapeBridge);
-
when(brokerController.getBrokerConfig().isAppendAckAsync()).thenReturn(false);
-
-
when(escapeBridge.putMessageToSpecificQueue(any())).thenAnswer(invocation -> {
- MessageExtBrokerInner capturedMessage = invocation.getArgument(0);
- AckMsg ackMsg = JSON.parseObject(capturedMessage.getBody(),
AckMsg.class);
-
- assertEquals(point.ackOffsetByIndex(msgIndex),
ackMsg.getAckOffset());
- assertEquals(point.getStartOffset(), ackMsg.getStartOffset());
- assertEquals(point.getCId(), ackMsg.getConsumerGroup());
- assertEquals(point.getTopic(), ackMsg.getTopic());
- assertEquals(point.getQueueId(), ackMsg.getQueueId());
- assertEquals(point.getPopTime(), ackMsg.getPopTime());
- assertEquals(point.getBrokerName(), ackMsg.getBrokerName());
-
- PutMessageResult result = mock(PutMessageResult.class);
-
when(result.getPutMessageStatus()).thenReturn(PutMessageStatus.PUT_OK);
- return result;
- });
-
- Method method =
PopBufferMergeService.class.getDeclaredMethod("putAckToStore",
PopBufferMergeService.PopCheckPointWrapper.class, byte.class,
AtomicInteger.class);
- method.setAccessible(true);
- method.invoke(popBufferMergeService, pointWrapper, msgIndex, count);
- verify(escapeBridge,
times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class));
- }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
index 28476149ab..fdb0690e5d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
@@ -16,9 +16,10 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.common.BrokerConfig;
@@ -26,7 +27,6 @@ import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.ConsumeInitMode;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -42,7 +42,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
-import org.apache.rocketmq.store.pop.PopCheckPoint;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -50,13 +50,8 @@ import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-
import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -173,17 +168,17 @@ public class PopMessageProcessorTest {
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- assertEquals(-1, offset);
+ Assert.assertEquals(-1, offset);
RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- assertEquals(minOffset, offset);
+ Assert.assertEquals(minOffset, offset);
when(messageStore.getMinOffsetInQueue(retryTopic,
0)).thenReturn(minOffset * 2);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, retryTopic,
0);
- assertEquals(minOffset, offset); // will not entry getInitOffset()
again
+ Assert.assertEquals(minOffset, offset); // will not entry
getInitOffset() again
messageStore.getMinOffsetInQueue(retryTopic, 0); // prevent
UnnecessaryStubbingException
}
@@ -198,17 +193,17 @@ public class PopMessageProcessorTest {
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
long offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- assertEquals(-1, offset);
+ Assert.assertEquals(-1, offset);
RemotingCommand request = createPopMsgCommand(newGroup, topic, 0,
ConsumeInitMode.MAX);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- assertEquals(maxOffset - 1, offset); // checkInMem return false
+ Assert.assertEquals(maxOffset - 1, offset); // checkInMem return false
when(messageStore.getMaxOffsetInQueue(topic, 0)).thenReturn(maxOffset
* 2);
popMessageProcessor.processRequest(handlerContext, request);
offset =
brokerController.getConsumerOffsetManager().queryOffset(newGroup, topic, 0);
- assertEquals(maxOffset - 1, offset); // will not entry getInitOffset()
again
+ Assert.assertEquals(maxOffset - 1, offset); // will not entry
getInitOffset() again
messageStore.getMaxOffsetInQueue(topic, 0); // prevent
UnnecessaryStubbingException
}
@@ -245,31 +240,4 @@ public class PopMessageProcessorTest {
}
return getMessageResult;
}
-
- @Test
- public void testBuildCkMsgJsonParsing() {
- PopCheckPoint ck = new PopCheckPoint();
- ck.setTopic("TestTopic");
- ck.setQueueId(1);
- ck.setStartOffset(100L);
- ck.setCId("TestConsumer");
- ck.setPopTime(System.currentTimeMillis());
- ck.setBrokerName("TestBroker");
-
- int reviveQid = 0;
- PopMessageProcessor processor = new
PopMessageProcessor(brokerController);
-
- MessageExtBrokerInner result = processor.buildCkMsg(ck, reviveQid);
-
- String jsonBody = new String(result.getBody(), StandardCharsets.UTF_8);
- PopCheckPoint actual = JSON.parseObject(jsonBody, PopCheckPoint.class);
-
- assertEquals(ck.getTopic(), actual.getTopic());
- assertEquals(ck.getQueueId(), actual.getQueueId());
- assertEquals(ck.getStartOffset(), actual.getStartOffset());
- assertEquals(ck.getCId(), actual.getCId());
- assertEquals(ck.getPopTime(), actual.getPopTime());
- assertEquals(ck.getBrokerName(), actual.getBrokerName());
- assertEquals(ck.getReviveTime(), actual.getReviveTime());
- }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index e6a2cdb6cd..3010e83610 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -16,7 +16,13 @@
*/
package org.apache.rocketmq.broker.processor;
-import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson.JSON;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.failover.EscapeBridge;
@@ -34,13 +40,12 @@ import
org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.DataConverter;
import org.apache.rocketmq.common.utils.NetworkUtil;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.store.AppendMessageResult;
-import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
-import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.junit.Assert;
@@ -51,27 +56,18 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
@RunWith(MockitoJUnitRunner.Silent.class)
public class PopReviveServiceTest {
@@ -409,59 +405,6 @@ public class PopReviveServiceTest {
verify(messageStore,
times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK
}
- @Test
- public void testReviveMsgFromBatchAck() throws Throwable {
- brokerConfig.setEnableSkipLongAwaitingAck(true);
- when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP,
REVIVE_TOPIC, REVIVE_QUEUE_ID)).thenReturn(0L);
- List<MessageExt> reviveMessageExtList = new ArrayList<>();
- long basePopTime = System.currentTimeMillis();
-
reviveMessageExtList.add(buildBatchAckMsg(buildBatchAckMsg(Arrays.asList(1L,
2L, 3L), basePopTime), 1, 1, basePopTime));
- doReturn(reviveMessageExtList, new
ArrayList<>()).when(popReviveService).getReviveMessage(anyLong(), anyInt());
-
- PopReviveService.ConsumeReviveObj consumeReviveObj = new
PopReviveService.ConsumeReviveObj();
- popReviveService.consumeReviveMessage(consumeReviveObj);
- assertEquals(1, consumeReviveObj.map.size());
-
- ArgumentCaptor<Long> commitOffsetCaptor =
ArgumentCaptor.forClass(Long.class);
- doNothing().when(consumerOffsetManager).commitOffset(anyString(),
anyString(), anyString(), anyInt(), commitOffsetCaptor.capture());
- popReviveService.mergeAndRevive(consumeReviveObj);
- assertEquals(1, commitOffsetCaptor.getValue().longValue());
- }
-
- public static MessageExtBrokerInner buildBatchAckMsg(BatchAckMsg
batchAckMsg, long deliverMs, long reviveOffset, long deliverTime) {
- MessageExtBrokerInner result = buildBatchAckInnerMessage(REVIVE_TOPIC,
batchAckMsg, REVIVE_QUEUE_ID, STORE_HOST, deliverMs,
PopMessageProcessor.genAckUniqueId(batchAckMsg));
- result.setQueueOffset(reviveOffset);
- result.setDeliverTimeMs(deliverMs);
- result.setStoreTimestamp(deliverTime);
- return result;
- }
-
- public static BatchAckMsg buildBatchAckMsg(Collection<Long> offsets, long
popTime) {
- BatchAckMsg result = new BatchAckMsg();
- result.setConsumerGroup(GROUP);
- result.setTopic(TOPIC);
- result.setQueueId(0);
- result.setPopTime(popTime);
- result.setBrokerName("broker-a");
- result.getAckOffsetList().addAll(offsets);
- return result;
- }
-
- public static MessageExtBrokerInner buildBatchAckInnerMessage(String
reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs,
String ackUniqueId) {
- MessageExtBrokerInner result = new MessageExtBrokerInner();
- result.setTopic(reviveTopic);
-
result.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.CHARSET_UTF8));
- result.setQueueId(reviveQid);
- result.setTags(PopAckConstants.BATCH_ACK_TAG);
- result.setBornTimestamp(System.currentTimeMillis());
- result.setBornHost(host);
- result.setStoreHost(host);
- result.setDeliverTimeMs(deliverMs);
-
result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
ackUniqueId);
-
result.setPropertiesString(MessageDecoder.messageProperties2String(result.getProperties()));
- return result;
- }
-
public static PopCheckPoint buildPopCheckPoint(long startOffset, long
popTime, long reviveOffset) {
PopCheckPoint ck = new PopCheckPoint();
ck.setStartOffset(startOffset);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
index 9b25e0134c..b74e57ab93 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicQueueMappingManagerTest.java
@@ -17,11 +17,15 @@
package org.apache.rocketmq.broker.topic;
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
-import
org.apache.rocketmq.remoting.protocol.body.TopicQueueMappingSerializeWrapper;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingUtils;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicRemappingDetailWrapper;
@@ -33,16 +37,6 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -85,9 +79,9 @@ public class TopicQueueMappingManagerTest {
String topic = UUID.randomUUID().toString();
int queueNum = 10;
TopicRemappingDetailWrapper topicRemappingDetailWrapper =
TopicQueueMappingUtils.createTopicConfigMapping(topic, queueNum, brokers, new
HashMap<>());
- assertEquals(1,
topicRemappingDetailWrapper.getBrokerConfigMap().size());
+ Assert.assertEquals(1,
topicRemappingDetailWrapper.getBrokerConfigMap().size());
TopicQueueMappingDetail topicQueueMappingDetail =
topicRemappingDetailWrapper.getBrokerConfigMap().values().iterator().next().getMappingDetail();
- assertEquals(queueNum,
topicQueueMappingDetail.getHostedQueues().size());
+ Assert.assertEquals(queueNum,
topicQueueMappingDetail.getHostedQueues().size());
mappingDetailMap.put(topic, topicQueueMappingDetail);
}
}
@@ -95,7 +89,7 @@ public class TopicQueueMappingManagerTest {
{
topicQueueMappingManager = new
TopicQueueMappingManager(brokerController);
Assert.assertTrue(topicQueueMappingManager.load());
- assertEquals(0,
topicQueueMappingManager.getTopicQueueMappingTable().size());
+ Assert.assertEquals(0,
topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail mappingDetail :
mappingDetailMap.values()) {
for (int i = 0; i < 10; i++) {
topicQueueMappingManager.updateTopicQueueMapping(mappingDetail, false, false,
true);
@@ -107,49 +101,11 @@ public class TopicQueueMappingManagerTest {
{
topicQueueMappingManager = new
TopicQueueMappingManager(brokerController);
Assert.assertTrue(topicQueueMappingManager.load());
- assertEquals(mappingDetailMap.size(),
topicQueueMappingManager.getTopicQueueMappingTable().size());
+ Assert.assertEquals(mappingDetailMap.size(),
topicQueueMappingManager.getTopicQueueMappingTable().size());
for (TopicQueueMappingDetail topicQueueMappingDetail:
topicQueueMappingManager.getTopicQueueMappingTable().values()) {
- assertEquals(topicQueueMappingDetail,
mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
+ Assert.assertEquals(topicQueueMappingDetail,
mappingDetailMap.get(topicQueueMappingDetail.getTopic()));
}
}
delete(topicQueueMappingManager);
}
-
- @Test
- public void testEncodePretty() {
- TopicQueueMappingManager topicQueueMappingManager = new
TopicQueueMappingManager(null);
- TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
- detail.setTopic("testTopic");
- detail.setBname("testBroker");
-
- topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic",
detail);
- topicQueueMappingManager.getDataVersion().nextVersion();
-
- String actual = topicQueueMappingManager.encode(true);
- TopicQueueMappingSerializeWrapper expectedWrapper = new
TopicQueueMappingSerializeWrapper();
- expectedWrapper.setTopicQueueMappingInfoMap(new
ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
-
expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
- String expected = JSON.toJSONString(expectedWrapper,
JSONWriter.Feature.PrettyFormat);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testEncodeNonPretty() {
- TopicQueueMappingManager topicQueueMappingManager = new
TopicQueueMappingManager(null);
- TopicQueueMappingDetail detail = new TopicQueueMappingDetail();
- detail.setTopic("testTopic");
- detail.setBname("testBroker");
-
- topicQueueMappingManager.getTopicQueueMappingTable().put("testTopic",
detail);
- topicQueueMappingManager.getDataVersion().nextVersion();
-
- String actual = topicQueueMappingManager.encode(false);
- TopicQueueMappingSerializeWrapper expectedWrapper = new
TopicQueueMappingSerializeWrapper();
- expectedWrapper.setTopicQueueMappingInfoMap(new
ConcurrentHashMap<>(topicQueueMappingManager.getTopicQueueMappingTable()));
-
expectedWrapper.setDataVersion(topicQueueMappingManager.getDataVersion());
- String expected = JSON.toJSONString(expectedWrapper);
-
- assertEquals(expected, actual);
- }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
index 62a6ad8b5b..690b4eabb5 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionMetricsTest.java
@@ -19,40 +19,23 @@ package org.apache.rocketmq.broker.transaction.queue;
import org.apache.rocketmq.broker.transaction.TransactionMetrics;
import org.apache.rocketmq.broker.transaction.TransactionMetrics.Metric;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.Collections;
-import java.util.UUID;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public class TransactionMetricsTest {
private TransactionMetrics transactionMetrics;
private String configPath;
- private Path path;
@Before
- public void before() throws Exception {
- configPath = createBaseDir();
- path = Paths.get(configPath);
- transactionMetrics = spy(new TransactionMetrics(configPath));
- }
-
- @After
- public void after() throws Exception {
- deleteFile(configPath);
- assertFalse(path.toFile().exists());
+ public void setUp() throws Exception {
+ configPath = "configPath";
+ transactionMetrics = new TransactionMetrics(configPath);
}
/**
@@ -97,40 +80,4 @@ public class TransactionMetricsTest {
transactionMetrics.cleanMetrics(Collections.singleton(topic));
assert transactionMetrics.getTransactionCount(topic) == 0;
}
-
- @Test
- public void testPersist() {
- assertFalse(path.toFile().exists());
- transactionMetrics.persist();
- assertTrue(path.toFile().exists());
- verify(transactionMetrics).persist();
- }
-
- private String createBaseDir() {
- String baseDir = System.getProperty("java.io.tmpdir") + File.separator
+ "unitteststore-" + UUID.randomUUID();
- final File file = new File(baseDir);
- if (file.exists()) {
- System.exit(1);
- }
- return baseDir;
- }
-
- private void deleteFile(String fileName) {
- deleteFile(new File(fileName));
- }
-
- private void deleteFile(File file) {
- if (!file.exists()) {
- return;
- }
- if (file.isFile()) {
- file.delete();
- } else if (file.isDirectory()) {
- File[] files = file.listFiles();
- for (File file1 : files) {
- deleteFile(file1);
- }
- file.delete();
- }
- }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
index 67cf045cfb..38e0a20752 100644
--- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java
@@ -16,7 +16,7 @@
*/
package org.apache.rocketmq.store.pop;
-import com.alibaba.fastjson2.annotation.JSONField;
+import com.alibaba.fastjson.annotation.JSONField;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +35,7 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
private int queueId;
@JSONField(name = "t")
private String topic;
+ @JSONField(name = "c")
private String cid;
@JSONField(name = "ro")
private long reviveOffset;
@@ -113,12 +114,10 @@ public class PopCheckPoint implements
Comparable<PopCheckPoint> {
this.topic = topic;
}
- @JSONField(name = "c")
public String getCId() {
return cid;
}
- @JSONField(name = "c")
public void setCId(String cid) {
this.cid = cid;
}