This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f4bad3f5c3 [improve][broker] Migrate remaining broker proto files from protobuf to LightProto (#25337)
9f4bad3f5c3 is described below

commit 9f4bad3f5c3fbf2d0742fde41709c878cd33a6fa
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 19 01:53:12 2026 -0700

    [improve][broker] Migrate remaining broker proto files from protobuf to LightProto (#25337)
---
 pulsar-broker/pom.xml                              |   9 +-
 .../pulsar/PulsarClusterMetadataTeardown.java      |  15 +-
 .../bucket/BookkeeperBucketSnapshotStorage.java    |   7 +-
 .../broker/delayed/bucket/ImmutableBucket.java     |  33 +--
 .../broker/delayed/bucket/MutableBucket.java       |  20 +-
 .../service/schema/BookkeeperSchemaStorage.java    | 227 ++++++++++++---------
 .../service/schema/SchemaRegistryServiceImpl.java  |  75 +++----
 .../BookkeeperBucketSnapshotStorageTest.java       |  90 ++++----
 .../broker/delayed/MockBucketSnapshotStorage.java  |  10 +-
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  15 +-
 10 files changed, 249 insertions(+), 252 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index f2d618a7fee..ec4e10e53fc 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -642,17 +642,11 @@
           <!--suppress UnresolvedMavenProperty -->
           
<protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
           <checkStaleness>true</checkStaleness>
-          <excludes>
-            <exclude>**/ResourceUsage.proto</exclude>
-            <exclude>**/TransactionPendingAck.proto</exclude>
-            <exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
-          </excludes>
         </configuration>
         <executions>
           <execution>
             <phase>generate-sources</phase>
             <goals>
-              <goal>compile</goal>
               <goal>test-compile</goal>
             </goals>
           </execution>
@@ -667,6 +661,9 @@
             
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
             
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
             
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
+            
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketMetadata.proto</source>
+            
<source>${project.basedir}/src/main/proto/SchemaStorageFormat.proto</source>
+            
<source>${project.basedir}/src/main/proto/SchemaRegistryFormat.proto</source>
           </sources>
           <extraProtoPaths>
             <extraProtoPath>${pulsar.basedir}</extraProtoPath>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index bd1c66610e6..0fb5ac8daf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -34,7 +33,7 @@ import 
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.TenantResources;
-import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
+import org.apache.pulsar.broker.service.schema.SchemaLocator;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -265,11 +264,13 @@ public class PulsarClusterMetadataTeardown {
                 metadataStore.getChildren(namespaceRoot).join().forEach(topic 
-> {
                     final String topicRoot = namespaceRoot + "/" + topic;
                     try {
-                        
SchemaLocator.parseFrom(metadataStore.get(topicRoot).join().get().getValue())
-                                .getIndexList().stream()
-                                .map(indexEntry -> 
indexEntry.getPosition().getLedgerId())
-                                .forEach(ledgerId -> deleteLedger(bookKeeper, 
ledgerId));
-                    } catch (InvalidProtocolBufferException e) {
+                        byte[] data = 
metadataStore.get(topicRoot).join().get().getValue();
+                        SchemaLocator locator = new SchemaLocator();
+                        locator.parseFrom(data);
+                        for (int i = 0; i < locator.getIndexsCount(); i++) {
+                            deleteLedger(bookKeeper, 
locator.getIndexAt(i).getPosition().getLedgerId());
+                        }
+                    } catch (Exception e) {
                         log.warn("Invalid data format from {}: {}", topicRoot, 
e);
                     }
                 });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
index fa7408d7e15..b070208ea06 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.delayed.bucket;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
@@ -139,9 +138,9 @@ public class BookkeeperBucketSnapshotStorage implements 
BucketSnapshotStorage {
         ByteBuf entryBuffer = null;
         try {
             entryBuffer = ledgerEntry.getEntryBuffer();
-            return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
-        } catch (InvalidProtocolBufferException e) {
-            throw new BucketSnapshotSerializationException(e);
+            SnapshotMetadata metadata = new SnapshotMetadata();
+            metadata.parseFrom(entryBuffer, entryBuffer.readableBytes());
+            return metadata;
         } finally {
             if (entryBuffer != null) {
                 entryBuffer.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index a1944a21ea7..e9032572348 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -33,8 +33,8 @@ import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
+import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
 import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
-import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
@@ -84,20 +84,23 @@ class ImmutableBucket extends Bucket {
                         }
                     }), BucketSnapshotPersistenceException.class, 
MaxRetryTimes)
                     .thenApply(snapshotMetadata -> {
-                        List<SnapshotSegmentMetadata> metadataList =
-                                snapshotMetadata.getMetadataListList();
+                        int metadataListSize = 
snapshotMetadata.getMetadataListCount();
 
                         // Skip all already reach schedule time snapshot 
segments
                         int nextSnapshotEntryIndex = 0;
-                        while (nextSnapshotEntryIndex < metadataList.size()
-                                && 
metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= 
cutoffTime) {
+                        while (nextSnapshotEntryIndex < metadataListSize
+                                && 
snapshotMetadata.getMetadataAt(nextSnapshotEntryIndex)
+                                        .getMaxScheduleTimestamp() <= 
cutoffTime) {
                             nextSnapshotEntryIndex++;
                         }
 
-                        this.setLastSegmentEntryId(metadataList.size());
-                        
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
-                        List<Long> firstScheduleTimestamps = 
metadataList.stream().map(
-                                
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+                        this.setLastSegmentEntryId(metadataListSize);
+                        
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, 
snapshotMetadata);
+                        List<Long> firstScheduleTimestamps = new ArrayList<>();
+                        for (int i = 0; i < metadataListSize; i++) {
+                            firstScheduleTimestamps.add(
+                                    
snapshotMetadata.getMetadataAt(i).getMinScheduleTimestamp());
+                        }
                         
this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
                         return nextSnapshotEntryIndex + 1;
@@ -142,16 +145,14 @@ class ImmutableBucket extends Bucket {
      * @throws InvalidRoaringFormat invalid bitmap serialization format
      */
     private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-                                                    
List<SnapshotSegmentMetadata> segmentMetaList) {
+                                                    SnapshotMetadata 
snapshotMetadata) {
         delayedIndexBitMap.clear(); // cleanup dirty bm
         final var numberMessages = new MutableLong(0);
-        for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) {
-            for (final var entry : 
segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) {
-                final var ledgerId = entry.getKey();
-                final var bs = entry.getValue();
+        for (int i = startSnapshotIndex; i < 
snapshotMetadata.getMetadataListCount(); i++) {
+            
snapshotMetadata.getMetadataAt(i).forEachDelayedIndexBitMap((ledgerId, bs) -> {
                 final var sbm = new RoaringBitmap();
                 try {
-                    sbm.deserialize(bs.asReadOnlyByteBuffer());
+                    sbm.deserialize(java.nio.ByteBuffer.wrap(bs));
                 } catch (IOException e) {
                     throw new InvalidRoaringFormat(e.getMessage());
                 }
@@ -163,7 +164,7 @@ class ImmutableBucket extends Bucket {
                     bm.or(sbm);
                     return bm;
                 });
-            }
+            });
         }
         // optimize bm
         delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index 1173a401a89..29c98b78a83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.broker.delayed.bucket;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import com.google.protobuf.UnsafeByteOperations;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -78,7 +77,7 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
         Map<Long, RoaringBitmap> bitMap = new HashMap<>();
         SnapshotSegment snapshotSegment = new SnapshotSegment();
-        SnapshotSegmentMetadata.Builder segmentMetadataBuilder = 
SnapshotSegmentMetadata.newBuilder();
+        SnapshotSegmentMetadata segmentMetadata = new 
SnapshotSegmentMetadata();
 
         List<Long> firstScheduleTimestamps = new ArrayList<>();
         long currentTimestampUpperLimit = 0;
@@ -113,8 +112,8 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
             if (delayedIndexQueue.isEmpty() || 
delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
                     || (maxIndexesPerBucketSnapshotSegment != -1
                     && snapshotSegment.getIndexesCount() >= 
maxIndexesPerBucketSnapshotSegment)) {
-                segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
-                
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
+                segmentMetadata.setMaxScheduleTimestamp(timestamp);
+                segmentMetadata.setMinScheduleTimestamp(currentFirstTimestamp);
                 currentTimestampUpperLimit = 0;
 
                 Iterator<Map.Entry<Long, RoaringBitmap>> iterator = 
bitMap.entrySet().iterator();
@@ -126,7 +125,7 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                     ByteBuffer byteBuffer = 
ByteBuffer.allocate(bm.serializedSizeInBytes());
                     bm.serialize(byteBuffer);
                     byteBuffer.flip();
-                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, 
UnsafeByteOperations.unsafeWrap(byteBuffer));
+                    segmentMetadata.putDelayedIndexBitMap(lId, 
byteBuffer.array());
                     immutableBucketBitMap.compute(lId, (__, bm0) -> {
                         if (bm0 == null) {
                             return bm;
@@ -137,8 +136,8 @@ class MutableBucket extends Bucket implements AutoCloseable 
{
                     iterator.remove();
                 }
 
-                segmentMetadataList.add(segmentMetadataBuilder.build());
-                segmentMetadataBuilder.clear();
+                segmentMetadataList.add(segmentMetadata);
+                segmentMetadata = new SnapshotSegmentMetadata();
 
                 bucketSnapshotSegments.add(snapshotSegment);
                 snapshotSegment = new SnapshotSegment();
@@ -149,9 +148,10 @@ class MutableBucket extends Bucket implements 
AutoCloseable {
         immutableBucketBitMap.values().forEach(RoaringBitmap::runOptimize);
         this.delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
 
-        SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder()
-                .addAllMetadataList(segmentMetadataList)
-                .build();
+        SnapshotMetadata bucketSnapshotMetadata = new SnapshotMetadata();
+        for (SnapshotSegmentMetadata sm : segmentMetadataList) {
+            bucketSnapshotMetadata.addMetadata().copyFrom(sm);
+        }
 
         final int lastSegmentEntryId = segmentMetadataList.size();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index e38bf48f1fd..74b41c1be84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.service.schema;
 
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.protobuf.ByteString.copyFrom;
 import static java.util.Objects.isNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
@@ -41,7 +39,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -51,8 +48,6 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.IndexEntry;
-import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.common.policies.data.SchemaMetadata;
@@ -78,7 +73,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage 
{
 
     private final MetadataStoreExtended store;
     private final PulsarService pulsar;
-    private final MetadataCache<SchemaStorageFormat.SchemaLocator> 
locatorEntryCache;
+    private final MetadataCache<SchemaLocator> locatorEntryCache;
 
     private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
@@ -91,16 +86,18 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
         this.pulsar = pulsar;
         this.store = pulsar.getLocalMetadataStore();
         this.config = pulsar.getConfiguration();
-        this.locatorEntryCache = store.getMetadataCache(new 
MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+        this.locatorEntryCache = store.getMetadataCache(new 
MetadataSerde<SchemaLocator>() {
             @Override
-            public byte[] serialize(String path, 
SchemaStorageFormat.SchemaLocator value) {
+            public byte[] serialize(String path, SchemaLocator value) {
                 return value.toByteArray();
             }
 
             @Override
-            public SchemaStorageFormat.SchemaLocator deserialize(String path, 
byte[] content, Stat stat)
+            public SchemaLocator deserialize(String path, byte[] content, Stat 
stat)
                     throws IOException {
-                return SchemaStorageFormat.SchemaLocator.parseFrom(content);
+                SchemaLocator loc = new SchemaLocator();
+                loc.parseFrom(content);
+                return loc;
             }
         });
     }
@@ -185,16 +182,17 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 return Pair.of(locator, Collections.emptyList());
             }
 
-            SchemaStorageFormat.SchemaLocator schemaLocator = 
locator.get().locator;
+            SchemaLocator schemaLocator = locator.get().locator;
             List<CompletableFuture<StoredSchema>> list = new ArrayList<>();
-            schemaLocator.getIndexList().forEach(indexEntry -> 
list.add(readSchemaEntry(indexEntry.getPosition())
-                    .thenApply(entry -> new StoredSchema
-                            (
-                                    entry.getSchemaData().toByteArray(),
-                                    new 
LongSchemaVersion(indexEntry.getVersion())
-                            )
-                    )
-            ));
+            for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+                IndexEntry indexEntry = schemaLocator.getIndexAt(i);
+                list.add(readSchemaEntry(indexEntry.getPosition())
+                        .thenApply(entry -> new StoredSchema(
+                                entry.getSchemaData(),
+                                new LongSchemaVersion(indexEntry.getVersion())
+                        ))
+                );
+            }
             return Pair.of(locator, list);
         });
     }
@@ -213,8 +211,14 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             throw new IOException("Failed to get schema ledger for" + key);
         }
         LocatorEntry entry = locatorEntry.orElse(null);
-        return entry != null ? entry.locator.getIndexList().stream().map(i -> 
i.getPosition().getLedgerId())
-                .collect(Collectors.toList()) : null;
+        if (entry == null) {
+            return null;
+        }
+        List<Long> ledgerIds = new ArrayList<>(entry.locator.getIndexsCount());
+        for (int i = 0; i < entry.locator.getIndexsCount(); i++) {
+            
ledgerIds.add(entry.locator.getIndexAt(i).getPosition().getLedgerId());
+        }
+        return ledgerIds;
     }
 
     @VisibleForTesting
@@ -252,10 +256,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                     return completedFuture(null);
                 }
 
-                SchemaStorageFormat.SchemaLocator schemaLocator = 
locator.get().locator;
+                SchemaLocator schemaLocator = locator.get().locator;
 
                 return readSchemaEntry(schemaLocator.getInfo().getPosition())
-                        .thenApply(entry -> new 
StoredSchema(entry.getSchemaData().toByteArray(),
+                        .thenApply(entry -> new 
StoredSchema(entry.getSchemaData(),
                                 new 
LongSchemaVersion(schemaLocator.getInfo().getVersion())));
             });
         }).whenComplete((res, ex) -> {
@@ -300,15 +304,19 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 return completedFuture(null);
             }
 
-            SchemaStorageFormat.SchemaLocator schemaLocator = 
locator.get().locator;
+            SchemaLocator schemaLocator = locator.get().locator;
             if (version > schemaLocator.getInfo().getVersion()) {
                 return completedFuture(null);
             }
 
-            return findSchemaEntryByVersion(schemaLocator.getIndexList(), 
version)
+            List<IndexEntry> indexList = new 
ArrayList<>(schemaLocator.getIndexsCount());
+            for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+                indexList.add(schemaLocator.getIndexAt(i));
+            }
+            return findSchemaEntryByVersion(indexList, version)
                 .thenApply(entry ->
                         new StoredSchema(
-                            entry.getSchemaData().toByteArray(),
+                            entry.getSchemaData(),
                             new LongSchemaVersion(version)
                         )
                 );
@@ -325,16 +333,20 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                                               Optional<LocatorEntry> 
optLocatorEntry) {
         if (optLocatorEntry.isPresent()) {
 
-            SchemaStorageFormat.SchemaLocator locator = 
optLocatorEntry.get().locator;
+            SchemaLocator locator = optLocatorEntry.get().locator;
 
             if (log.isDebugEnabled()) {
                 log.debug("[{}] findSchemaEntryByHash - hash={}", schemaId, 
hash);
             }
 
             //don't check the schema whether already exist
-            return readSchemaEntry(locator.getIndexList().get(0).getPosition())
+            List<IndexEntry> indexList = new 
ArrayList<>(locator.getIndexsCount());
+            for (int i = 0; i < locator.getIndexsCount(); i++) {
+                indexList.add(locator.getIndexAt(i));
+            }
+            return readSchemaEntry(indexList.get(0).getPosition())
                     .thenCompose(schemaEntry -> 
addNewSchemaEntryToStore(schemaId,
-                            locator.getIndexList(), data).thenCompose(
+                            indexList, data).thenCompose(
                             position -> updateSchemaLocator(schemaId, 
optLocatorEntry.get(), position, hash))
                     );
         } else {
@@ -343,27 +355,22 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     private CompletableFuture<Long> createNewSchema(String schemaId, byte[] 
data, byte[] hash) {
-        SchemaStorageFormat.IndexEntry emptyIndex = 
SchemaStorageFormat.IndexEntry.newBuilder()
-                        .setVersion(0)
-                        .setHash(copyFrom(hash))
-                        
.setPosition(SchemaStorageFormat.PositionInfo.newBuilder()
-                                .setEntryId(-1L)
-                                .setLedgerId(-1L)
-                        ).build();
+        IndexEntry emptyIndex = new IndexEntry();
+        emptyIndex.setVersion(0);
+        emptyIndex.setHash(hash);
+        emptyIndex.setPosition().setEntryId(-1L).setLedgerId(-1L);
 
         return addNewSchemaEntryToStore(schemaId, 
Collections.singletonList(emptyIndex), data).thenCompose(position -> {
             // The schema was stored in the ledger, now update the z-node with 
the pointer to it
-            SchemaStorageFormat.IndexEntry info = 
SchemaStorageFormat.IndexEntry.newBuilder()
-                    .setVersion(0)
-                    .setPosition(position)
-                    .setHash(copyFrom(hash))
-                    .build();
-
-            return createSchemaLocator(getSchemaPath(schemaId), 
SchemaStorageFormat.SchemaLocator.newBuilder()
-                    .setInfo(info)
-                    .addAllIndex(
-                            newArrayList(info))
-                    .build())
+            IndexEntry info = new IndexEntry();
+            info.setVersion(0);
+            info.setPosition().copyFrom(position);
+            info.setHash(hash);
+
+            SchemaLocator locator = new SchemaLocator();
+            locator.setInfo().copyFrom(info);
+            locator.addIndex().copyFrom(info);
+            return createSchemaLocator(getSchemaPath(schemaId), locator)
                             .thenApply(ignore -> 0L);
         });
     }
@@ -386,9 +393,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                             future.complete(null);
                             return;
                         }
-                        List<SchemaStorageFormat.IndexEntry> indexEntryList = 
locator.get().locator.getIndexList();
-                        List<CompletableFuture<Void>> deleteFutures = new 
ArrayList<>(indexEntryList.size());
-                        indexEntryList.forEach(indexEntry -> {
+                        SchemaLocator schemaLocator = locator.get().locator;
+                        List<CompletableFuture<Void>> deleteFutures = new 
ArrayList<>(schemaLocator.getIndexsCount());
+                        for (int i = 0; i < schemaLocator.getIndexsCount(); 
i++) {
+                            IndexEntry indexEntry = 
schemaLocator.getIndexAt(i);
                             final long ledgerId = 
indexEntry.getPosition().getLedgerId();
                             CompletableFuture<Void> deleteFuture = new 
CompletableFuture<>();
                             deleteFutures.add(deleteFuture);
@@ -399,7 +407,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                                 }
                                 deleteFuture.complete(null);
                             }, null);
-                        });
+                        }
                         FutureUtil.waitForAll(deleteFutures).whenComplete((v, 
e) -> {
                             final String path = getSchemaPath(schemaId);
                             store.delete(path, Optional.empty())
@@ -434,12 +442,12 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @NonNull
-    private CompletableFuture<SchemaStorageFormat.PositionInfo> 
addNewSchemaEntryToStore(
+    private CompletableFuture<PositionInfo> addNewSchemaEntryToStore(
         String schemaId,
-        List<SchemaStorageFormat.IndexEntry> index,
+        List<IndexEntry> index,
         byte[] data
     ) {
-        SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, 
data);
+        SchemaEntry schemaEntry = newSchemaEntry(index, data);
         return createLedger(schemaId).thenCompose(ledgerHandle -> {
             final long ledgerId = ledgerHandle.getId();
             return addEntry(ledgerHandle, schemaEntry)
@@ -454,26 +462,24 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     private CompletableFuture<Long> updateSchemaLocator(
         String schemaId,
         LocatorEntry locatorEntry,
-        SchemaStorageFormat.PositionInfo position,
+        PositionInfo position,
         byte[] hash
     ) {
         long nextVersion = locatorEntry.locator.getInfo().getVersion() + 1;
-        SchemaStorageFormat.SchemaLocator locator = locatorEntry.locator;
-        SchemaStorageFormat.IndexEntry info =
-            SchemaStorageFormat.IndexEntry.newBuilder()
-                .setVersion(nextVersion)
-                .setPosition(position)
-                .setHash(copyFrom(hash))
-                .build();
-
-        final ArrayList<SchemaStorageFormat.IndexEntry> indexList = new 
ArrayList<>();
-        indexList.addAll(locator.getIndexList());
-        indexList.add(info);
+        SchemaLocator locator = locatorEntry.locator;
+        IndexEntry info = new IndexEntry();
+        info.setVersion(nextVersion);
+        info.setPosition().copyFrom(position);
+        info.setHash(hash);
+
+        SchemaLocator newLocator = new SchemaLocator();
+        newLocator.setInfo().copyFrom(info);
+        for (int i = 0; i < locator.getIndexsCount(); i++) {
+            newLocator.addIndex().copyFrom(locator.getIndexAt(i));
+        }
+        newLocator.addIndex().copyFrom(info);
         return updateSchemaLocator(getSchemaPath(schemaId),
-                SchemaStorageFormat.SchemaLocator.newBuilder()
-                        .setInfo(info)
-                        .addAllIndex(indexList)
-                        .build()
+                newLocator
                 , locatorEntry.version
         ).thenApply(ignore -> nextVersion).whenComplete((__, ex) -> {
             if (ex != null) {
@@ -495,8 +501,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @NonNull
-    private CompletableFuture<SchemaStorageFormat.SchemaEntry> 
findSchemaEntryByVersion(
-        List<SchemaStorageFormat.IndexEntry> index,
+    private CompletableFuture<SchemaEntry> findSchemaEntryByVersion(
+        List<IndexEntry> index,
         long version
     ) {
 
@@ -504,13 +510,19 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             return completedFuture(null);
         }
 
-        SchemaStorageFormat.IndexEntry lowest = index.get(0);
+        IndexEntry lowest = index.get(0);
         if (version < lowest.getVersion()) {
             return readSchemaEntry(lowest.getPosition())
-                    .thenCompose(entry -> 
findSchemaEntryByVersion(entry.getIndexList(), version));
+                    .thenCompose(entry -> {
+                        List<IndexEntry> entryIndex = new 
ArrayList<>(entry.getIndexsCount());
+                        for (int i = 0; i < entry.getIndexsCount(); i++) {
+                            entryIndex.add(entry.getIndexAt(i));
+                        }
+                        return findSchemaEntryByVersion(entryIndex, version);
+                    });
         }
 
-        for (SchemaStorageFormat.IndexEntry entry : index) {
+        for (IndexEntry entry : index) {
             if (entry.getVersion() == version) {
                 return readSchemaEntry(entry.getPosition());
             } else if (entry.getVersion() > version) {
@@ -522,8 +534,8 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
     }
 
     @NonNull
-    private CompletableFuture<SchemaStorageFormat.SchemaEntry> readSchemaEntry(
-        SchemaStorageFormat.PositionInfo position
+    private CompletableFuture<SchemaEntry> readSchemaEntry(
+        PositionInfo position
     ) {
         if (log.isDebugEnabled()) {
             log.debug("Reading schema entry from {}", position);
@@ -540,12 +552,12 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 
     @NonNull
     private CompletableFuture<Void> updateSchemaLocator(String id,
-                                                        
SchemaStorageFormat.SchemaLocator schema, long version) {
+                                                        SchemaLocator schema, 
long version) {
         return store.put(id, schema.toByteArray(), 
Optional.of(version)).thenApply(__ -> null);
     }
 
     @NonNull
-    private CompletableFuture<LocatorEntry> createSchemaLocator(String id, 
SchemaStorageFormat.SchemaLocator locator) {
+    private CompletableFuture<LocatorEntry> createSchemaLocator(String id, 
SchemaLocator locator) {
         return store.put(id, locator.toByteArray(), Optional.of(-1L))
                 .thenApply(stat -> new LocatorEntry(locator, 
stat.getVersion()));
     }
@@ -567,15 +579,19 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             IndexEntry info = sl.getInfo();
             metadata.info = new 
SchemaMetadata.Entry(info.getPosition().getLedgerId(), 
info.getPosition().getEntryId(),
                     info.getVersion());
-            metadata.index = sl.getIndexList() == null ? null
-                    : sl.getIndexList().stream().map(i -> new 
SchemaMetadata.Entry(i.getPosition().getLedgerId(),
-                            i.getPosition().getEntryId(), 
i.getVersion())).collect(Collectors.toList());
+            List<SchemaMetadata.Entry> indexEntries = new 
ArrayList<>(sl.getIndexsCount());
+            for (int i = 0; i < sl.getIndexsCount(); i++) {
+                IndexEntry idx = sl.getIndexAt(i);
+                indexEntries.add(new 
SchemaMetadata.Entry(idx.getPosition().getLedgerId(),
+                        idx.getPosition().getEntryId(), idx.getVersion()));
+            }
+            metadata.index = indexEntries;
             return metadata;
         });
     }
 
     @NonNull
-    private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, 
SchemaStorageFormat.SchemaEntry entry) {
+    private CompletableFuture<Long> addEntry(LedgerHandle ledgerHandle, 
SchemaEntry entry) {
         final CompletableFuture<Long> future = new CompletableFuture<>();
         ledgerHandle.asyncAddEntry(entry.toByteArray(),
             (rc, handle, entryId, ctx) -> {
@@ -661,8 +677,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
                 return;
             }
             Set<Long> ledgerIds = new HashSet<>();
-            SchemaStorageFormat.SchemaLocator schemaLocator = 
locator.get().locator;
-            schemaLocator.getIndexList().forEach(indexEntry -> 
ledgerIds.add(indexEntry.getPosition().getLedgerId()));
+            SchemaLocator schemaLocator = locator.get().locator;
+            for (int i = 0; i < schemaLocator.getIndexsCount(); i++) {
+                
ledgerIds.add(schemaLocator.getIndexAt(i).getPosition().getLedgerId());
+            }
             ledgerIdsFuture.complete(new ArrayList<>(ledgerIds));
         }).exceptionally(e -> {
             ledgerIdsFuture.completeExceptionally(e);
@@ -688,39 +706,44 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
             return future;
         }
 
-        static CompletableFuture<SchemaStorageFormat.SchemaEntry> 
parseSchemaEntry(LedgerEntry ledgerEntry) {
-            CompletableFuture<SchemaStorageFormat.SchemaEntry> result = new 
CompletableFuture<>();
+        static CompletableFuture<SchemaEntry> parseSchemaEntry(LedgerEntry 
ledgerEntry) {
+            CompletableFuture<SchemaEntry> result = new CompletableFuture<>();
             try {
-                
result.complete(SchemaStorageFormat.SchemaEntry.parseFrom(ledgerEntry.getEntry()));
-            } catch (IOException e) {
+                byte[] data = ledgerEntry.getEntry();
+                SchemaEntry entry = new SchemaEntry();
+                entry.parseFrom(data);
+                result.complete(entry);
+            } catch (Exception e) {
                 result.completeExceptionally(e);
             }
             return result;
         }
 
-        static SchemaStorageFormat.SchemaEntry newSchemaEntry(
-            List<SchemaStorageFormat.IndexEntry> index,
+        static SchemaEntry newSchemaEntry(
+            List<IndexEntry> index,
             byte[] data
         ) {
-            return SchemaStorageFormat.SchemaEntry.newBuilder()
-                .setSchemaData(copyFrom(data))
-                .addAllIndex(index)
-                .build();
+            SchemaEntry entry = new SchemaEntry();
+            entry.setSchemaData(data);
+            for (int i = 0; i < index.size(); i++) {
+                entry.addIndex().copyFrom(index.get(i));
+            }
+            return entry;
         }
 
-        static SchemaStorageFormat.PositionInfo newPositionInfo(long ledgerId, 
long entryId) {
-            return SchemaStorageFormat.PositionInfo.newBuilder()
-                .setLedgerId(ledgerId)
-                .setEntryId(entryId)
-                .build();
+        static PositionInfo newPositionInfo(long ledgerId, long entryId) {
+            PositionInfo pos = new PositionInfo();
+            pos.setLedgerId(ledgerId);
+            pos.setEntryId(entryId);
+            return pos;
         }
     }
 
     static class LocatorEntry {
-        final SchemaStorageFormat.SchemaLocator locator;
+        final SchemaLocator locator;
         final long version;
 
-        LocatorEntry(SchemaStorageFormat.SchemaLocator locator, long version) {
+        LocatorEntry(SchemaLocator locator, long version) {
             this.locator = locator;
             this.version = version;
         }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
index 5c5ed992d24..b722a2df258 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -21,15 +21,12 @@ package org.apache.pulsar.broker.service.schema;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.isNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FORWARD_TRANSITIVE;
 import static 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy.FULL_TRANSITIVE;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import java.time.Clock;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,7 +46,7 @@ import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import 
org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
 import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
-import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import org.apache.pulsar.broker.service.schema.proto.SchemaInfo;
 import 
org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -218,15 +215,14 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
                         }
                         return checkCompatibilityFuture.thenCompose(v -> {
                             byte[] context = 
hashFunction.hashBytes(schema.getData()).asBytes();
-                            SchemaRegistryFormat.SchemaInfo info = 
SchemaRegistryFormat.SchemaInfo.newBuilder()
+                            SchemaInfo info = new SchemaInfo()
                                     
.setType(Functions.convertFromDomainType(schema.getType()))
-                                    
.setSchema(ByteString.copyFrom(schema.getData()))
+                                    .setSchema(schema.getData())
                                     .setSchemaId(schemaId)
                                     .setUser(schema.getUser())
                                     .setDeleted(false)
-                                    .setTimestamp(clock.millis())
-                                    .addAllProps(toPairs(schema.getProps()))
-                                    .build();
+                                    .setTimestamp(clock.millis());
+                            Functions.addProps(info, schema.getProps());
 
                             start.setValue(this.clock.millis());
                             return 
CompletableFuture.completedFuture(Pair.of(info.toByteArray(), context));
@@ -342,15 +338,14 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
         this.stats.close();
     }
 
-    private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String 
user) {
-        return SchemaRegistryFormat.SchemaInfo.newBuilder()
+    private SchemaInfo deleted(String schemaId, String user) {
+        return new SchemaInfo()
             .setSchemaId(schemaId)
-            .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
-            .setSchema(ByteString.EMPTY)
+            .setType(SchemaInfo.SchemaType.NONE)
+            .setSchema(new byte[0])
             .setUser(user)
             .setDeleted(true)
-            .setTimestamp(clock.millis())
-            .build();
+            .setTimestamp(clock.millis());
     }
 
     private void checkCompatible(SchemaAndMetadata existingSchema, SchemaData 
newSchema,
@@ -602,60 +597,58 @@ public class SchemaRegistryServiceImpl implements 
SchemaRegistryService {
     }
 
     interface Functions {
-        static SchemaType 
convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
-            if (type.getNumber() < 0) {
+        static SchemaType convertToDomainType(SchemaInfo.SchemaType type) {
+            if (type.getValue() < 0) {
                 return SchemaType.NONE;
             } else {
                 // the value of type in `SchemaType` is always 1 less than the 
value of type `SchemaInfo.SchemaType`
-                return SchemaType.valueOf(type.getNumber() - 1);
+                return SchemaType.valueOf(type.getValue() - 1);
             }
         }
 
-        static SchemaRegistryFormat.SchemaInfo.SchemaType 
convertFromDomainType(SchemaType type) {
+        static SchemaInfo.SchemaType convertFromDomainType(SchemaType type) {
             if (type.getValue() < 0) {
-                return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+                return SchemaInfo.SchemaType.NONE;
             } else {
-                return 
SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
+                return SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
             }
         }
 
-        static Map<String, String> 
toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
+        static Map<String, String> toMap(SchemaInfo info) {
             Map<String, String> map = new HashMap<>();
-            for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
+            for (int i = 0; i < info.getPropsCount(); i++) {
+                SchemaInfo.KeyValuePair pair = info.getPropAt(i);
                 map.put(pair.getKey(), pair.getValue());
             }
             return map;
         }
 
-        static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> 
toPairs(Map<String, String> map) {
-            if (isNull(map)) {
-                return Collections.emptyList();
-            }
-            List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new 
ArrayList<>(map.size());
-            for (Map.Entry<String, String> entry : map.entrySet()) {
-                SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
-                    SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
-                
pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build());
+        static void addProps(SchemaInfo info, Map<String, String> map) {
+            if (map != null) {
+                for (Map.Entry<String, String> entry : map.entrySet()) {
+                    
info.addProp().setKey(entry.getKey()).setValue(entry.getValue());
+                }
             }
-            return pairs;
         }
 
-        static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo 
info) {
+        static SchemaData schemaInfoToSchema(SchemaInfo info) {
             return SchemaData.builder()
                 .user(info.getUser())
                 .type(convertToDomainType(info.getType()))
-                .data(info.getSchema().toByteArray())
+                .data(info.getSchema())
                 .timestamp(info.getTimestamp())
-                .isDeleted(info.getDeleted())
-                .props(toMap(info.getPropsList()))
+                .isDeleted(info.isDeleted())
+                .props(toMap(info))
                 .build();
         }
 
-        static CompletableFuture<SchemaRegistryFormat.SchemaInfo> 
bytesToSchemaInfo(byte[] bytes) {
-            CompletableFuture<SchemaRegistryFormat.SchemaInfo> future;
+        static CompletableFuture<SchemaInfo> bytesToSchemaInfo(byte[] bytes) {
+            CompletableFuture<SchemaInfo> future;
             try {
-                future = 
completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
-            } catch (InvalidProtocolBufferException e) {
+                SchemaInfo info = new SchemaInfo();
+                info.parseFrom(bytes);
+                future = completedFuture(info);
+            } catch (Exception e) {
                 future = new CompletableFuture<>();
                 future.completeExceptionally(e);
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
index d26f38fa2bc..964b0e40f88 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BookkeeperBucketSnapshotStorageTest.java
@@ -18,12 +18,9 @@
  */
 package org.apache.pulsar.broker.delayed;
 
-import com.google.protobuf.ByteString;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -63,7 +60,7 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testCreateSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotMetadata snapshotMetadata = 
SnapshotMetadata.newBuilder().build();
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -74,16 +71,14 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testGetSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
-                        .setMinScheduleTimestamp(System.currentTimeMillis())
-                        .setMaxScheduleTimestamp(System.currentTimeMillis())
-                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
-
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
+        SnapshotSegmentMetadata segmentMetadata = new 
SnapshotSegmentMetadata();
+        segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+        snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
@@ -106,7 +101,8 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
         List<SnapshotSegment> snapshotSegments = bucketSnapshotSegment.get();
         Assert.assertEquals(2, snapshotSegments.size());
         for (SnapshotSegment segment : snapshotSegments) {
-            for (DelayedIndex index : segment.getIndexesList()) {
+            for (int i = 0; i < segment.getIndexesCount(); i++) {
+                DelayedIndex index = segment.getIndexeAt(i);
                 Assert.assertEquals(100L, index.getLedgerId());
                 Assert.assertEquals(10L, index.getEntryId());
                 Assert.assertEquals(timeMillis, index.getTimestamp());
@@ -118,20 +114,15 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
     public void testGetSnapshotMetadata() throws ExecutionException, 
InterruptedException {
         long timeMillis = System.currentTimeMillis();
 
-        Map<Long, ByteString> map = new HashMap<>();
-        map.put(100L, ByteString.copyFrom("test1", StandardCharsets.UTF_8));
-        map.put(200L, ByteString.copyFrom("test2", StandardCharsets.UTF_8));
+        SnapshotSegmentMetadata segmentMetadata = new 
SnapshotSegmentMetadata();
+        segmentMetadata.setMaxScheduleTimestamp(timeMillis);
+        segmentMetadata.setMinScheduleTimestamp(timeMillis);
+        segmentMetadata.putDelayedIndexBitMap(100L, 
"test1".getBytes(StandardCharsets.UTF_8));
+        segmentMetadata.putDelayedIndexBitMap(200L, 
"test2".getBytes(StandardCharsets.UTF_8));
 
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
-                        .setMaxScheduleTimestamp(timeMillis)
-                        .setMinScheduleTimestamp(timeMillis)
-                        .putAllDelayedIndexBitMap(map).build();
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+        snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
 
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         CompletableFuture<Long> future =
@@ -144,17 +135,16 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
                 
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).get();
 
         SnapshotSegmentMetadata metadata =
-                bucketSnapshotMetadata.getMetadataList(0);
+                bucketSnapshotMetadata.getMetadataAt(0);
 
         Assert.assertEquals(timeMillis, metadata.getMaxScheduleTimestamp());
-        Assert.assertEquals("test1", 
metadata.getDelayedIndexBitMapMap().get(100L).toStringUtf8());
-        Assert.assertEquals("test2", 
metadata.getDelayedIndexBitMapMap().get(200L).toStringUtf8());
+        Assert.assertEquals("test1", new 
String(metadata.getDelayedIndexBitMap(100L), StandardCharsets.UTF_8));
+        Assert.assertEquals("test2", new 
String(metadata.getDelayedIndexBitMap(200L), StandardCharsets.UTF_8));
     }
 
     @Test
     public void testDeleteSnapshot() throws ExecutionException, 
InterruptedException {
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder().build();
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
         CompletableFuture<Long> future =
                 bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata,
@@ -174,16 +164,14 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testGetBucketSnapshotLength() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
-                        .setMinScheduleTimestamp(System.currentTimeMillis())
-                        .setMaxScheduleTimestamp(System.currentTimeMillis())
-                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
-
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
+        SnapshotSegmentMetadata segmentMetadata = new 
SnapshotSegmentMetadata();
+        segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+        snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
@@ -206,16 +194,14 @@ public class BookkeeperBucketSnapshotStorageTest extends 
MockedPulsarServiceBase
 
     @Test
     public void testConcurrencyGet() throws ExecutionException, 
InterruptedException {
-        SnapshotSegmentMetadata segmentMetadata =
-                SnapshotSegmentMetadata.newBuilder()
-                        .setMinScheduleTimestamp(System.currentTimeMillis())
-                        .setMaxScheduleTimestamp(System.currentTimeMillis())
-                        .putDelayedIndexBitMap(100L, ByteString.copyFrom(new 
byte[1])).build();
-
-        SnapshotMetadata snapshotMetadata =
-                SnapshotMetadata.newBuilder()
-                        .addMetadataList(segmentMetadata)
-                        .build();
+        SnapshotSegmentMetadata segmentMetadata = new 
SnapshotSegmentMetadata();
+        segmentMetadata.setMinScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.setMaxScheduleTimestamp(System.currentTimeMillis());
+        segmentMetadata.putDelayedIndexBitMap(100L, new byte[1]);
+
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+        snapshotMetadata.addMetadata().copyFrom(segmentMetadata);
+
         List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
 
         long timeMillis = System.currentTimeMillis();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
index a1f5b554b15..b533c4b156d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.delayed;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -117,12 +116,9 @@ public class MockBucketSnapshotStorage implements 
BucketSnapshotStorage {
         }
         return CompletableFuture.supplyAsync(() -> {
             ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(0);
-            SnapshotMetadata snapshotMetadata;
-            try {
-                snapshotMetadata = 
SnapshotMetadata.parseFrom(byteBuf.nioBuffer());
-            } catch (InvalidProtocolBufferException e) {
-                throw new RuntimeException(e);
-            }
+            SnapshotMetadata snapshotMetadata = new SnapshotMetadata();
+            ByteBuf slice = byteBuf.slice();
+            snapshotMetadata.parseFrom(slice, slice.readableBytes());
             return snapshotMetadata;
         }, executorService);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 95527cf173e..393ed0a84b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -58,10 +58,9 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
+import org.apache.pulsar.broker.service.schema.SchemaLocator;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
-import org.apache.pulsar.broker.service.schema.SchemaStorageFormat;
-import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -1418,17 +1417,19 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         consumer.close();
 
         // (2) Delete schema ledger
-        MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache = 
pulsar.getLocalMetadataStore()
-                .getMetadataCache(new 
MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
+        MetadataCache<SchemaLocator> locatorEntryCache = 
pulsar.getLocalMetadataStore()
+                .getMetadataCache(new MetadataSerde<SchemaLocator>() {
                     @Override
-                    public byte[] serialize(String path, 
SchemaStorageFormat.SchemaLocator value) {
+                    public byte[] serialize(String path, SchemaLocator value) {
                         return value.toByteArray();
                     }
 
                     @Override
-                    public SchemaStorageFormat.SchemaLocator 
deserialize(String path, byte[] content, Stat stat)
+                    public SchemaLocator deserialize(String path, byte[] 
content, Stat stat)
                             throws IOException {
-                        return 
SchemaStorageFormat.SchemaLocator.parseFrom(content);
+                        SchemaLocator loc = new SchemaLocator();
+                        loc.parseFrom(content);
+                        return loc;
                     }
                 });
         String path = "/schemas/public/" + namespace + 
"/test-multi-version-schema-one";

Reply via email to