This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f4bad3f5c3 [improve][broker] Migrate remaining broker proto files
from protobuf to LightProto (#25337)
9f4bad3f5c3 is described below
commit 9f4bad3f5c3fbf2d0742fde41709c878cd33a6fa
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:53:12 2026 -0700
[improve][broker] Migrate remaining broker proto files from protobuf to
LightProto (#25337)
---
pulsar-broker/pom.xml | 9 +-
.../pulsar/PulsarClusterMetadataTeardown.java | 15 +-
.../bucket/BookkeeperBucketSnapshotStorage.java | 7 +-
.../broker/delayed/bucket/ImmutableBucket.java | 33 +--
.../broker/delayed/bucket/MutableBucket.java | 20 +-
.../service/schema/BookkeeperSchemaStorage.java | 227 ++++++++++++---------
.../service/schema/SchemaRegistryServiceImpl.java | 75 +++----
.../BookkeeperBucketSnapshotStorageTest.java | 90 ++++----
.../broker/delayed/MockBucketSnapshotStorage.java | 10 +-
.../java/org/apache/pulsar/schema/SchemaTest.java | 15 +-
10 files changed, 249 insertions(+), 252 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index f2d618a7fee..ec4e10e53fc 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -642,17 +642,11 @@
<!--suppress UnresolvedMavenProperty -->
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
<checkStaleness>true</checkStaleness>
- <excludes>
- <exclude>**/ResourceUsage.proto</exclude>
- <exclude>**/TransactionPendingAck.proto</exclude>
- <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
- </excludes>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
- <goal>compile</goal>
<goal>test-compile</goal>
</goals>
</execution>
@@ -667,6 +661,9 @@
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
+
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketMetadata.proto</source>
+
<source>${project.basedir}/src/main/proto/SchemaStorageFormat.proto</source>
+
<source>${project.basedir}/src/main/proto/SchemaRegistryFormat.proto</source>
</sources>
<extraProtoPaths>
<extraProtoPath>${pulsar.basedir}</extraProtoPath>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index bd1c66610e6..0fb5ac8daf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -34,7 +33,7 @@ import
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
-import
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
+import org.apache.pulsar.broker.service.schema.SchemaLocator;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -265,11 +264,13 @@ public class PulsarClusterMetadataTeardown {
metadataStore.getChildren(namespaceRoot).join().forEach(topic
-> {
final String topicRoot = namespaceRoot + "/" + topic;
try {
-
SchemaLocator.parseFrom(metadataStore.get(topicRoot).join().get().getValue())
- .getIndexList().stream()
- .map(indexEntry ->
indexEntry.getPosition().getLedgerId())
- .forEach(ledgerId -> deleteLedger(bookKeeper,
ledgerId));
- } catch (InvalidProtocolBufferException e) {
+ byte[] data =
metadataStore.get(topicRoot).join().get().getValue();
+ SchemaLocator locator = new SchemaLocator();
+ locator.parseFrom(data);
+ for (int i = 0; i < locator.getIndexsCount(); i++) {
+ deleteLedger(bookKeeper,
locator.getIndexAt(i).getPosition().getLedgerId());
+ }
+ } catch (Exception e) {
log.warn("Invalid data format from {}: {}", topicRoot,
e);
}
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index fa7408d7e15..b070208ea06 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.delayed.bucket;
-import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
@@ -139,9 +138,9 @@ public class BookkeeperBucketSnapshotStorage implements
BucketSnapshotStorage {
ByteBuf entryBuffer = null;
try {
entryBuffer = ledgerEntry.getEntryBuffer();
- return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
- } catch (InvalidProtocolBufferException e) {
- throw new BucketSnapshotSerializationException(e);
+ SnapshotMetadata metadata = new SnapshotMetadata();
+ metadata.parseFrom(entryBuffer, entryBuffer.readableBytes());
+ return metadata;
} finally {
if (entryBuffer != null) {
entryBuffer.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index a1944a21ea7..e9032572348 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -33,8 +33,8 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
@@ -84,20 +84,23 @@ class ImmutableBucket extends Bucket {
}
}), BucketSnapshotPersistenceException.class,
MaxRetryTimes)
.thenApply(snapshotMetadata -> {
- List<SnapshotSegmentMetadata> metadataList =
- snapshotMetadata.getMetadataListList();
+ int metadataListSize =
snapshotMetadata.getMetadataListCount();
// Skip all already reach schedule time snapshot
segments
int nextSnapshotEntryIndex = 0;
- while (nextSnapshotEntryIndex < metadataList.size()
- &&
metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <=
cutoffTime) {
+ while (nextSnapshotEntryIndex < metadataListSize
+ &&
snapshotMetadata.getMetadataAt(nextSnapshotEntryIndex)
+ .getMaxScheduleTimestamp() <=
cutoffTime) {
nextSnapshotEntryIndex++;
}
- this.setLastSegmentEntryId(metadataList.size());
-
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
- List<Long> firstScheduleTimestamps =
metadataList.stream().map(
-
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+ this.setLastSegmentEntryId(metadataListSize);
+
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex,
snapshotMetadata);
+ List<Long> firstScheduleTimestamps = new ArrayList<>();
+ for (int i = 0; i < metadataListSize; i++) {
+ firstScheduleTimestamps.add(
+
snapshotMetadata.getMetadataAt(i).getMinScheduleTimestamp());
+ }
this.setFirstScheduleTimestamps(firstScheduleTimestamps);
return nextSnapshotEntryIndex + 1;
@@ -142,16 +145,14 @@ class ImmutableBucket extends Bucket {
* @throws InvalidRoaringFormat invalid bitmap serialization format
*/
private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-
List<SnapshotSegmentMetadata> segmentMetaList) {
+ SnapshotMetadata
snapshotMetadata) {
delayedIndexBitMap.clear(); // cleanup dirty bm
final var numberMessages = new MutableLong(0);
- for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) {
- for (final var entry :
segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) {
- final var ledgerId = entry.getKey();
- final var bs = entry.getValue();
+ for (int i = startSnapshotIndex; i <
snapshotMetadata.getMetadataListCount(); i++) {
+
snapshotMetadata.getMetadataAt(i).forEachDelayedIndexBitMap((ledgerId, bs) -> {
final var sbm = new RoaringBitmap();
try {
- sbm.deserialize(bs.asReadOnlyByteBuffer());
+ sbm.deserialize(java.nio.ByteBuffer.wrap(bs));
} catch (IOException e) {
throw new InvalidRoaringFormat(e.getMessage());
}
@@ -163,7 +164,7 @@ class ImmutableBucket extends Bucket {
bm.or(sbm);
return bm;
});
- }
+ });
}
// optimize bm
delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 1173a401a89..29c98b78a83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.broker.delayed.bucket;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.UnsafeByteOperations;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -78,7 +77,7 @@ class MutableBucket extends Bucket implements AutoCloseable {
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
SnapshotSegment snapshotSegment = new SnapshotSegment();
- SnapshotSegmentMetadata.Builder segmentMetadataBuilder =
SnapshotSegmentMetadata.newBuilder();
+ SnapshotSegmentMetadata segmentMetadata = new
SnapshotSegmentMetadata();
List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
@@ -113,8 +112,8 @@ class MutableBucket extends Bucket implements AutoCloseable
{
if (delayedIndexQueue.isEmpty() ||
delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegment.getIndexesCount() >=
maxIndexesPerBucketSnapshotSegment)) {
- segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
-
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
+ segmentMetadata.setMaxScheduleTimestamp(timestamp);
+ segmentMetadata.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Iterator<Map.Entry<Long, RoaringBitmap>> iterator =
bitMap.entrySet().iterator();
@@ -126,7 +125,7 @@ class MutableBucket extends Bucket implements AutoCloseable
{
ByteBuffer byteBuffer =
ByteBuffer.allocate(bm.serializedSizeInBytes());
bm.serialize(byteBuffer);
byteBuffer.flip();
- segmentMetadataBuilder.putDelayedIndexBitMap(lId,
UnsafeByteOperations.unsafeWrap(byteBuffer));
+ segmentMetadata.putDelayedIndexBitMap(lId,
byteBuffer.array());
immutableBucketBitMap.compute(lId, (__, bm0) -> {
if (bm0 == null) {
return bm;
@@ -137,8 +136,8 @@ class MutableBucket extends Bucket implements AutoCloseable
{
iterator.remove();
}
- segmentMetadataList.add(segmentMetadataBuilder.build());
- segmentMetadataBuilder.clear();
+ segmentMetadataList.add(segmentMetadata);
+ segmentMetadata = new SnapshotSegmentMetadata();
bucketSnapshotSegments.add(snapshotSegment);
snapshotSegment = new SnapshotSegment();
@@ -149,9 +148,10 @@ class MutableBucket extends Bucket implements
AutoCloseable {
immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize);
this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
- SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
- .addAllMetadataList(segmentMetadataList)
- .build();
+ SnapshotMetadata bucketSnapshotMetadata = new SnapshotMetadata();
+ for (SnapshotSegmentMetadata sm : segmentMetadataList) {
+ bucketSnapshotMetadata.addMetadata().copyFrom(sm);
+ }
final int lastSegmentEntryId = segmentMetadataList.size();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index e38bf48f1fd..74b41c1be84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.service.schema;
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.protobuf.ByteString.copyFrom;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
@@ -41,7 +39,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -51,8 +48,6 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry;
-import
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.common.policies.data.SchemaMetadata;
@@ -78,7 +73,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage
{
private final MetadataStoreExtended store;
private final PulsarService pulsar;
- private final MetadataCache<SchemaStorageFormat.SchemaLocator>
locatorEntryCache;
+ private final MetadataCache<SchemaLocator> locatorEntryCache;
private final ServiceConfiguration config;
private BookKeeper bookKeeper;
@@ -91,16 +86,18 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
this.pulsar = pulsar;
this.store = pulsar.getLocalMetadataStore();
this.config = pulsar.getConfiguration();
- this.locatorEntryCache = store.getMetadataCache(new
MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+ this.locatorEntryCache = store.getMetadataCache(new
MetadataSerde<SchemaLocator>() {
@Override
- public byte[] serialize(String path,
SchemaStorageFormat.SchemaLocator value) {
+ public byte[] serialize(String path, SchemaLocator value) {
return value.toByteArray();
}
@Override
- public SchemaStorageFormat.SchemaLocator deserialize(String path,
byte[] content, Stat stat)
+ public SchemaLocator deserialize(String path, byte[] content, Stat
stat)
throws IOException {
- return SchemaStorageFormat.SchemaLocator.parseFrom(content);
+ SchemaLocator loc = new SchemaLocator();
+ loc.parseFrom(content);
+ return loc;
}
});
}
@@ -185,16 +182,17 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return Pair.of(locator, Collections.emptyList());
}
- SchemaStorageFormat.SchemaLocator schemaLocator =
locator.get().locator;
+ SchemaLocator schemaLocator = locator.get().locator;
List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
- schemaLocator.getIndexList().forEach(indexEntry ->
list.add(readSchemaEntry(indexEntry.getPosition())
- .thenApply(entry -> new StoredSchema
- (
- entry.getSchemaData().toByteArray(),
- new
LongSchemaVersion(indexEntry.getVersion())
- )
- )
- ));
+ for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+ IndexEntry indexEntry = schemaLocator.getIndexAt(i);
+ list.add(readSchemaEntry(indexEntry.getPosition())
+ .thenApply(entry -> new StoredSchema(
+ entry.getSchemaData(),
+ new LongSchemaVersion(indexEntry.getVersion())
+ ))
+ );
+ }
return Pair.of(locator, list);
});
}
@@ -213,8 +211,14 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
throw new IOException("Failed to get schema ledger for" + key);
}
LocatorEntry entry = locatorEntry.orElse(null);
- return entry != null ? entry.locator.getIndexList().stream().map(i ->
i.getPosition().getLedgerId())
- .collect(Collectors.toList()) : null;
+ if (entry == null) {
+ return null;
+ }
+ List<Long> ledgerIds = new ArrayList<>(entry.locator.getIndexsCount());
+ for (int i = 0; i < entry.locator.getIndexsCount(); i++) {
+
ledgerIds.add(entry.locator.getIndexAt(i).getPosition().getLedgerId());
+ }
+ return ledgerIds;
}
@VisibleForTesting
@@ -252,10 +256,10 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return completedFuture(null);
}
- SchemaStorageFormat.SchemaLocator schemaLocator =
locator.get().locator;
+ SchemaLocator schemaLocator = locator.get().locator;
return readSchemaEntry(schemaLocator.getInfo().getPosition())
- .thenApply(entry -> new
StoredSchema(entry.getSchemaData().toByteArray(),
+ .thenApply(entry -> new
StoredSchema(entry.getSchemaData(),
new
LongSchemaVersion(schemaLocator.getInfo().getVersion())));
});
}).whenComplete((res, ex) -> {
@@ -300,15 +304,19 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return completedFuture(null);
}
- SchemaStorageFormat.SchemaLocator schemaLocator =
locator.get().locator;
+ SchemaLocator schemaLocator = locator.get().locator;
if (version > schemaLocator.getInfo().getVersion()) {
return completedFuture(null);
}
- return findSchemaEntryByVersion(schemaLocator.getIndexList(),
version)
+ List<IndexEntry> indexList = new
ArrayList<>(schemaLocator.getIndexsCount());
+ for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+ indexList.add(schemaLocator.getIndexAt(i));
+ }
+ return findSchemaEntryByVersion(indexList, version)
.thenApply(entry ->
new StoredSchema(
- entry.getSchemaData().toByteArray(),
+ entry.getSchemaData(),
new LongSchemaVersion(version)
)
);
@@ -325,16 +333,20 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
Optional<LocatorEntry>
optLocatorEntry) {
if (optLocatorEntry.isPresent()) {
- SchemaStorageFormat.SchemaLocator locator =
optLocatorEntry.get().locator;
+ SchemaLocator locator = optLocatorEntry.get().locator;
if (log.isDebugEnabled()) {
log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId,
hash);
}
//don't check the schema whether already exist
- return readSchemaEntry(locator.getIndexList().get(0).getPosition())
+ List<IndexEntry> indexList = new
ArrayList<>(locator.getIndexsCount());
+ for (int i = 0; i < locator.getIndexsCount(); i++) {
+ indexList.add(locator.getIndexAt(i));
+ }
+ return readSchemaEntry(indexList.get(0).getPosition())
.thenCompose(schemaEntry ->
addNewSchemaEntryToStore(schemaId,
- locator.getIndexList(), data).thenCompose(
+ indexList, data).thenCompose(
position -> updateSchemaLocator(schemaId,
optLocatorEntry.get(), position, hash))
);
} else {
@@ -343,27 +355,22 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
private CompletableFuture<Long> createNewSchema(String schemaId, byte[]
data, byte[] hash) {
- SchemaStorageFormat.IndexEntry emptyIndex =
SchemaStorageFormat.IndexEntry.newBuilder()
- .setVersion(0)
- .setHash(copyFrom(hash))
-
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
- .setEntryId(-1L)
- .setLedgerId(-1L)
- ).build();
+ IndexEntry emptyIndex = new IndexEntry();
+ emptyIndex.setVersion(0);
+ emptyIndex.setHash(hash);
+ emptyIndex.setPosition().setEntryId(-1L).setLedgerId(-1L);
return addNewSchemaEntryToStore(schemaId,
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
// The schema was stored in the ledger, now update the z-node with
the pointer to it
- SchemaStorageFormat.IndexEntry info =
SchemaStorageFormat.IndexEntry.newBuilder()
- .setVersion(0)
- .setPosition(position)
- .setHash(copyFrom(hash))
- .build();
-
- return createSchemaLocator(getSchemaPath(schemaId),
SchemaStorageFormat.SchemaLocator.newBuilder()
- .setInfo(info)
- .addAllIndex(
- newArrayList(info))
- .build())
+ IndexEntry info = new IndexEntry();
+ info.setVersion(0);
+ info.setPosition().copyFrom(position);
+ info.setHash(hash);
+
+ SchemaLocator locator = new SchemaLocator();
+ locator.setInfo().copyFrom(info);
+ locator.addIndex().copyFrom(info);
+ return createSchemaLocator(getSchemaPath(schemaId), locator)
.thenApply(ignore -> 0L);
});
}
@@ -386,9 +393,10 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
future.complete(null);
return;
}
- List<SchemaStorageFormat.IndexEntry> indexEntryList =
locator.get().locator.getIndexList();
- List<CompletableFuture<Void>> deleteFutures = new
ArrayList<>(indexEntryList.size());
- indexEntryList.forEach(indexEntry -> {
+ SchemaLocator schemaLocator = locator.get().locator;
+ List<CompletableFuture<Void>> deleteFutures = new
ArrayList<>(schemaLocator.getIndexsCount());
+ for (int i = 0; i < schemaLocator.getIndexsCount();
i++) {
+ IndexEntry indexEntry =
schemaLocator.getIndexAt(i);
final long ledgerId =
indexEntry.getPosition().getLedgerId();
CompletableFuture<Void> deleteFuture = new
CompletableFuture<>();
deleteFutures.add(deleteFuture);
@@ -399,7 +407,7 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
deleteFuture.complete(null);
}, null);
- });
+ }
FutureUtil.waitForAll(deleteFutures).whenComplete((v,
e) -> {
final String path = getSchemaPath(schemaId);
store.delete(path, Optional.empty())
@@ -434,12 +442,12 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NonNull
- private CompletableFuture<SchemaStorageFormat.PositionInfo>
addNewSchemaEntryToStore(
+ private CompletableFuture<PositionInfo> addNewSchemaEntryToStore(
String schemaId,
- List<SchemaStorageFormat.IndexEntry> index,
+ List<IndexEntry> index,
byte[] data
) {
- SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index,
data);
+ SchemaEntry schemaEntry = newSchemaEntry(index, data);
return createLedger(schemaId).thenCompose(ledgerHandle -> {
final long ledgerId = ledgerHandle.getId();
return addEntry(ledgerHandle, schemaEntry)
@@ -454,26 +462,24 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
private CompletableFuture<Long> updateSchemaLocator(
String schemaId,
LocatorEntry locatorEntry,
- SchemaStorageFormat.PositionInfo position,
+ PositionInfo position,
byte[] hash
) {
long nextVersion = locatorEntry.locator.getInfo().getVersion() + 1;
- SchemaStorageFormat.SchemaLocator locator = locatorEntry.locator;
- SchemaStorageFormat.IndexEntry info =
- SchemaStorageFormat.IndexEntry.newBuilder()
- .setVersion(nextVersion)
- .setPosition(position)
- .setHash(copyFrom(hash))
- .build();
-
- final ArrayList<SchemaStorageFormat.IndexEntry> indexList = new
ArrayList<>();
- indexList.addAll(locator.getIndexList());
- indexList.add(info);
+ SchemaLocator locator = locatorEntry.locator;
+ IndexEntry info = new IndexEntry();
+ info.setVersion(nextVersion);
+ info.setPosition().copyFrom(position);
+ info.setHash(hash);
+
+ SchemaLocator newLocator = new SchemaLocator();
+ newLocator.setInfo().copyFrom(info);
+ for (int i = 0; i < locator.getIndexsCount(); i++) {
+ newLocator.addIndex().copyFrom(locator.getIndexAt(i));
+ }
+ newLocator.addIndex().copyFrom(info);
return updateSchemaLocator(getSchemaPath(schemaId),
- SchemaStorageFormat.SchemaLocator.newBuilder()
- .setInfo(info)
- .addAllIndex(indexList)
- .build()
+ newLocator
, locatorEntry.version
).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
if (ex != null) {
@@ -495,8 +501,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NonNull
- private CompletableFuture<SchemaStorageFormat.SchemaEntry>
findSchemaEntryByVersion(
- List<SchemaStorageFormat.IndexEntry> index,
+ private CompletableFuture<SchemaEntry> findSchemaEntryByVersion(
+ List<IndexEntry> index,
long version
) {
@@ -504,13 +510,19 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return completedFuture(null);
}
- SchemaStorageFormat.IndexEntry lowest = index.get(0);
+ IndexEntry lowest = index.get(0);
if (version < lowest.getVersion()) {
return readSchemaEntry(lowest.getPosition())
- .thenCompose(entry ->
findSchemaEntryByVersion(entry.getIndexList(), version));
+ .thenCompose(entry -> {
+ List<IndexEntry> entryIndex = new
ArrayList<>(entry.getIndexsCount());
+ for (int i = 0; i < entry.getIndexsCount(); i++) {
+ entryIndex.add(entry.getIndexAt(i));
+ }
+ return findSchemaEntryByVersion(entryIndex, version);
+ });
}
- for (SchemaStorageFormat.IndexEntry entry : index) {
+ for (IndexEntry entry : index) {
if (entry.getVersion() == version) {
return readSchemaEntry(entry.getPosition());
} else if (entry.getVersion() > version) {
@@ -522,8 +534,8 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
}
@NonNull
- private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
- SchemaStorageFormat.PositionInfo position
+ private CompletableFuture<SchemaEntry> readSchemaEntry(
+ PositionInfo position
) {
if (log.isDebugEnabled()) {
log.debug("Reading schema entry from {}", position);
@@ -540,12 +552,12 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
@NonNull
private CompletableFuture<Void> updateSchemaLocator(String id,
-
SchemaStorageFormat.SchemaLocator schema, long version) {
+ SchemaLocator schema,
long version) {
return store.put(id, schema.toByteArray(),
Optional.of(version)).thenApply(__ -> null);
}
@NonNull
- private CompletableFuture<LocatorEntry> createSchemaLocator(String id,
SchemaStorageFormat.SchemaLocator locator) {
+ private CompletableFuture<LocatorEntry> createSchemaLocator(String id,
SchemaLocator locator) {
return store.put(id, locator.toByteArray(), Optional.of(-1L))
.thenApply(stat -> new LocatorEntry(locator,
stat.getVersion()));
}
@@ -567,15 +579,19 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
IndexEntry info = sl.getInfo();
metadata.info = new
SchemaMetadata.Entry(info.getPosition().getLedgerId(),
info.getPosition().getEntryId(),
info.getVersion());
- metadata.index = sl.getIndexList() == null ? null
- : sl.getIndexList().stream().map(i -> new
SchemaMetadata.Entry(i.getPosition().getLedgerId(),
- i.getPosition().getEntryId(),
i.getVersion())).collect(Collectors.toList());
+ List<SchemaMetadata.Entry> indexEntries = new
ArrayList<>(sl.getIndexsCount());
+ for (int i = 0; i < sl.getIndexsCount(); i++) {
+ IndexEntry idx = sl.getIndexAt(i);
+ indexEntries.add(new
SchemaMetadata.Entry(idx.getPosition().getLedgerId(),
+ idx.getPosition().getEntryId(), idx.getVersion()));
+ }
+ metadata.index = indexEntries;
return metadata;
});
}
@NonNull
- private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle,
SchemaStorageFormat.SchemaEntry entry) {
+ private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle,
SchemaEntry entry) {
final CompletableFuture<Long> future = new CompletableFuture<>();
ledgerHandle.asyncAddEntry(entry.toByteArray(),
(rc, handle, entryId, ctx) -> {
@@ -661,8 +677,10 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return;
}
Set<Long> ledgerIds = new HashSet<>();
- SchemaStorageFormat.SchemaLocator schemaLocator =
locator.get().locator;
- schemaLocator.getIndexList().forEach(indexEntry ->
ledgerIds.add(indexEntry.getPosition().getLedgerId()));
+ SchemaLocator schemaLocator = locator.get().locator;
+ for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+
ledgerIds.add(schemaLocator.getIndexAt(i).getPosition().getLedgerId());
+ }
ledgerIdsFuture.complete(new ArrayList<>(ledgerIds));
}).exceptionally(e -> {
ledgerIdsFuture.completeExceptionally(e);
@@ -688,39 +706,44 @@ public class BookkeeperSchemaStorage implements
SchemaStorage {
return future;
}
- static CompletableFuture<SchemaStorageFormat.SchemaEntry>
parseSchemaEntry(LedgerEntry ledgerEntry) {
- CompletableFuture<SchemaStorageFormat.SchemaEntry> result = new
CompletableFuture<>();
+ static CompletableFuture<SchemaEntry> parseSchemaEntry(LedgerEntry
ledgerEntry) {
+ CompletableFuture<SchemaEntry> result = new CompletableFuture<>();
try {
-
result.complete(SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry()));
- } catch (IOException e) {
+ byte[] data = ledgerEntry.getEntry();
+ SchemaEntry entry = new SchemaEntry();
+ entry.parseFrom(data);
+ result.complete(entry);
+ } catch (Exception e) {
result.completeExceptionally(e);
}
return result;
}
- static SchemaStorageFormat.SchemaEntry newSchemaEntry(
- List<SchemaStorageFormat.IndexEntry> index,
+ static SchemaEntry newSchemaEntry(
+ List<IndexEntry> index,
byte[] data
) {
- return SchemaStorageFormat.SchemaEntry.newBuilder()
- .setSchemaData(copyFrom(data))
- .addAllIndex(index)
- .build();
+ SchemaEntry entry = new SchemaEntry();
+ entry.setSchemaData(data);
+ for (int i = 0; i < index.size(); i++) {
+ entry.addIndex().copyFrom(index.get(i));
+ }
+ return entry;
}
- static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId,
long entryId) {
- return SchemaStorageFormat.PositionInfo.newBuilder()
- .setLedgerId(ledgerId)
- .setEntryId(entryId)
- .build();
+ static PositionInfo newPositionInfo(long ledgerId, long entryId) {
+ PositionInfo pos = new PositionInfo();
+ pos.setLedgerId(ledgerId);
+ pos.setEntryId(entryId);
+ return pos;
}
}
static class LocatorEntry {
- final SchemaStorageFormat.SchemaLocator locator;
+ final SchemaLocator locator;
final long version;
- LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) {
+ LocatorEntry(SchemaLocator locator, long version) {
this.locator = locator;
this.version = version;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5c5ed992d24..b722a2df258 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -21,15 +21,12 @@ package org.apache.pulsar.broker.service.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
import static
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,7 +46,7 @@ import org.apache.pulsar.broker.PulsarService;
import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
-import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import org.apache.pulsar.broker.service.schema.proto.SchemaInfo;
import
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -218,15 +215,14 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
return checkCompatibilityFuture.thenCompose(v -> {
byte[] context =
hashFunction.hashBytes(schema.getData()).asBytes();
- SchemaRegistryFormat.SchemaInfo info =
SchemaRegistryFormat.SchemaInfo.newBuilder()
+ SchemaInfo info = new SchemaInfo()
.setType(Functions.convertFromDomainType(schema.getType()))
-
.setSchema(ByteString.copyFrom(schema.getData()))
+ .setSchema(schema.getData())
.setSchemaId(schemaId)
.setUser(schema.getUser())
.setDeleted(false)
- .setTimestamp(clock.millis())
- .addAllProps(toPairs(schema.getProps()))
- .build();
+ .setTimestamp(clock.millis());
+ Functions.addProps(info, schema.getProps());
start.setValue(this.clock.millis());
return
CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
@@ -342,15 +338,14 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
this.stats.close();
}
- private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String
user) {
- return SchemaRegistryFormat.SchemaInfo.newBuilder()
+ private SchemaInfo deleted(String schemaId, String user) {
+ return new SchemaInfo()
.setSchemaId(schemaId)
- .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
- .setSchema(ByteString.EMPTY)
+ .setType(SchemaInfo.SchemaType.NONE)
+ .setSchema(new byte[0])
.setUser(user)
.setDeleted(true)
- .setTimestamp(clock.millis())
- .build();
+ .setTimestamp(clock.millis());
}
private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData
newSchema,
@@ -602,60 +597,58 @@ public class SchemaRegistryServiceImpl implements
SchemaRegistryService {
}
interface Functions {
- static SchemaType
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
- if (type.getNumber() < 0) {
+ static SchemaType convertToDomainType(SchemaInfo.SchemaType type) {
+ if (type.getValue() < 0) {
return SchemaType.NONE;
} else {
// the value of type in `SchemaType` is always 1 less than the
value of type `SchemaInfo.SchemaType`
- return SchemaType.valueOf(type.getNumber() - 1);
+ return SchemaType.valueOf(type.getValue() - 1);
}
}
- static SchemaRegistryFormat.SchemaInfo.SchemaType
convertFromDomainType(SchemaType type) {
+ static SchemaInfo.SchemaType convertFromDomainType(SchemaType type) {
if (type.getValue() < 0) {
- return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+ return SchemaInfo.SchemaType.NONE;
} else {
- return
SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
+ return SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
}
}
- static Map<String, String>
toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
+ static Map<String, String> toMap(SchemaInfo info) {
Map<String, String> map = new HashMap<>();
- for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
+ for (int i = 0; i < info.getPropsCount(); i++) {
+ SchemaInfo.KeyValuePair pair = info.getPropAt(i);
map.put(pair.getKey(), pair.getValue());
}
return map;
}
- static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
toPairs(Map<String, String> map) {
- if (isNull(map)) {
- return Collections.emptyList();
- }
- List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new
ArrayList<>(map.size());
- for (Map.Entry<String, String> entry : map.entrySet()) {
- SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
- SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
-
pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build());
+ static void addProps(SchemaInfo info, Map<String, String> map) {
+ if (map != null) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+
info.addProp().setKey(entry.getKey()).setValue(entry.getValue());
+ }
}
- return pairs;
}
- static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo
info) {
+ static SchemaData schemaInfoToSchema(SchemaInfo info) {
return SchemaData.builder()
.user(info.getUser())
.type(convertToDomainType(info.getType()))
- .data(info.getSchema().toByteArray())
+ .data(info.getSchema())
.timestamp(info.getTimestamp())
- .isDeleted(info.getDeleted())
- .props(toMap(info.getPropsList()))
+ .isDeleted(info.isDeleted())
+ .props(toMap(info))
.build();
}
- static CompletableFuture<SchemaRegistryFormat.SchemaInfo>
bytesToSchemaInfo(byte[] bytes) {
- CompletableFuture<SchemaRegistryFormat.SchemaInfo> future;
+ static CompletableFuture<SchemaInfo> bytesToSchemaInfo(byte[] bytes) {
+ CompletableFuture<SchemaInfo> future;
try {
- future =
completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
- } catch (InvalidProtocolBufferException e) {
+ SchemaInfo info = new SchemaInfo();
+ info.parseFrom(bytes);
+ future = completedFuture(info);
+ } catch (Exception e) {
future = new CompletableFuture<>();
future.completeExceptionally(e);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index d26f38fa2bc..964b0e40f88 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -18,12 +18,9 @@
*/
package org.apache.pulsar.broker.delayed;
-import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -63,7 +60,7 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testCreateSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotMetadata snapshotMetadata =
SnapshotMetadata.newBuilder().build();
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -74,16 +71,14 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testGetSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
- .setMinScheduleTimestamp(System.currentTimeMillis())
- .setMaxScheduleTimestamp(System.currentTimeMillis())
- .putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
-
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
- .addMetadataList(segmentMetadata)
- .build();
+ SnapshotSegmentMetadata segmentMetadata = new
SnapshotSegmentMetadata();
+ segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+ snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
@@ -106,7 +101,8 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
Assert.assertEquals(2, snapshotSegments.size());
for (SnapshotSegment segment : snapshotSegments) {
- for (DelayedIndex index : segment.getIndexesList()) {
+ for (int i = 0; i < segment.getIndexesCount(); i++) {
+ DelayedIndex index = segment.getIndexeAt(i);
Assert.assertEquals(100L, index.getLedgerId());
Assert.assertEquals(10L, index.getEntryId());
Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -118,20 +114,15 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
public void testGetSnapshotMetadata() throws ExecutionException,
InterruptedException {
long timeMillis = System.currentTimeMillis();
- Map<Long, ByteString> map = new HashMap<>();
- map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
- map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
+ SnapshotSegmentMetadata segmentMetadata = new
SnapshotSegmentMetadata();
+ segmentMetadata.setMaxScheduleTimestamp(timeMillis);
+ segmentMetadata.setMinScheduleTimestamp(timeMillis);
+ segmentMetadata.putDelayedIndexBitMap(100L,
"test1".getBytes(StandardCharsets.UTF_8));
+ segmentMetadata.putDelayedIndexBitMap(200L,
"test2".getBytes(StandardCharsets.UTF_8));
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
- .setMaxScheduleTimestamp(timeMillis)
- .setMinScheduleTimestamp(timeMillis)
- .putAllDelayedIndexBitMap(map).build();
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+ snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
- .addMetadataList(segmentMetadata)
- .build();
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
@@ -144,17 +135,16 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
SnapshotSegmentMetadata metadata =
- bucketSnapshotMetadata.getMetadataList(0);
+ bucketSnapshotMetadata.getMetadataAt(0);
Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
- Assert.assertEquals("test1",
metadata.getDelayedIndexBitMapMap().get(100L).toStringUtf8());
- Assert.assertEquals("test2",
metadata.getDelayedIndexBitMapMap().get(200L).toStringUtf8());
+ Assert.assertEquals("test1", new
String(metadata.getDelayedIndexBitMap(100L), StandardCharsets.UTF_8));
+ Assert.assertEquals("test2", new
String(metadata.getDelayedIndexBitMap(200L), StandardCharsets.UTF_8));
}
@Test
public void testDeleteSnapshot() throws ExecutionException,
InterruptedException {
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder().build();
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
CompletableFuture<Long> future =
bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -174,16 +164,14 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testGetBucketSnapshotLength() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
- .setMinScheduleTimestamp(System.currentTimeMillis())
- .setMaxScheduleTimestamp(System.currentTimeMillis())
- .putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
-
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
- .addMetadataList(segmentMetadata)
- .build();
+ SnapshotSegmentMetadata segmentMetadata = new
SnapshotSegmentMetadata();
+ segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+ snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
@@ -206,16 +194,14 @@ public class BookkeeperBucketSnapshotStorageTest extends
MockedPulsarServiceBase
@Test
public void testConcurrencyGet() throws ExecutionException,
InterruptedException {
- SnapshotSegmentMetadata segmentMetadata =
- SnapshotSegmentMetadata.newBuilder()
- .setMinScheduleTimestamp(System.currentTimeMillis())
- .setMaxScheduleTimestamp(System.currentTimeMillis())
- .putDelayedIndexBitMap(100L, ByteString.copyFrom(new
byte[1])).build();
-
- SnapshotMetadata snapshotMetadata =
- SnapshotMetadata.newBuilder()
- .addMetadataList(segmentMetadata)
- .build();
+ SnapshotSegmentMetadata segmentMetadata = new
SnapshotSegmentMetadata();
+ segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+ segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+ snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
long timeMillis = System.currentTimeMillis();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index a1f5b554b15..b533c4b156d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.delayed;
-import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -117,12 +116,9 @@ public class MockBucketSnapshotStorage implements
BucketSnapshotStorage {
}
return CompletableFuture.supplyAsync(() -> {
ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(0);
- SnapshotMetadata snapshotMetadata;
- try {
- snapshotMetadata =
SnapshotMetadata.parseFrom(byteBuf.nioBuffer());
- } catch (InvalidProtocolBufferException e) {
- throw new RuntimeException(e);
- }
+ SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+ ByteBuf slice = byteBuf.slice();
+ snapshotMetadata.parseFrom(slice, slice.readableBytes());
return snapshotMetadata;
}, executorService);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 95527cf173e..393ed0a84b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -58,10 +58,9 @@ import org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaLocator;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
-import org.apache.pulsar.broker.service.schema.SchemaStorageFormat;
-import
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -1418,17 +1417,19 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
consumer.close();
// (2) Delete schema ledger
- MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache =
pulsar.getLocalMetadataStore()
- .getMetadataCache(new
MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+ MetadataCache<SchemaLocator> locatorEntryCache =
pulsar.getLocalMetadataStore()
+ .getMetadataCache(new MetadataSerde<SchemaLocator>() {
@Override
- public byte[] serialize(String path,
SchemaStorageFormat.SchemaLocator value) {
+ public byte[] serialize(String path, SchemaLocator value) {
return value.toByteArray();
}
@Override
- public SchemaStorageFormat.SchemaLocator
deserialize(String path, byte[] content, Stat stat)
+ public SchemaLocator deserialize(String path, byte[]
content, Stat stat)
throws IOException {
- return
SchemaStorageFormat.SchemaLocator.parseFrom(content);
+ SchemaLocator loc = new SchemaLocator();
+ loc.parseFrom(content);
+ return loc;
}
});
String path = "/schemas/public/" + namespace +
"/test-multi-version-schema-one";