frankvicky commented on code in PR #21444: URL: https://github.com/apache/kafka/pull/21444#discussion_r2789234214
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java:
##########
@@ -0,0 +1,461 @@
+/**
+ * A generic implementation of {@link RocksDBStore.ColumnFamilyAccessor} that supports dual-column-family
+ * upgrade scenarios. This class manages two column families:
+ * <ul>
+ *     <li>oldColumnFamily: contains legacy data in the old format</li>
+ *     <li>newColumnFamily: contains data in the new format</li>
+ * </ul>
+ *
+ * When reading, it first checks the new column family, then falls back to the old column family
+ * and converts values on-the-fly using the provided conversion function.
+ */
+class DualColumnFamilyAccessor implements RocksDBStore.ColumnFamilyAccessor {
+
+    private final ColumnFamilyHandle oldColumnFamily;
+    private final ColumnFamilyHandle newColumnFamily;
+    private final Function<byte[], byte[]> valueConverter;
+    private final RocksDBStore store;
+
+    /**
+     * Constructs a DualColumnFamilyAccessor.
+     *
+     * @param oldColumnFamily the column family containing legacy data
+     * @param newColumnFamily the column family for new format data
+     * @param valueConverter function to convert old format values to new format
+     * @param store the RocksDBStore instance (for accessing position, context, and name)
+     */

Review Comment:
   style nit:
   ```suggestion
       /**
        * Constructs a DualColumnFamilyAccessor.
        *
        * @param oldColumnFamily the column family containing legacy data
        * @param newColumnFamily the column family for new format data
        * @param valueConverter function to convert old format values to new format
        * @param store the RocksDBStore instance (for accessing position, context, and name)
        */
   ```

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java:
##########
@@ -0,0 +1,461 @@
+    DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+                             final ColumnFamilyHandle newColumnFamily,
+                             final Function<byte[], byte[]> valueConverter,
+                             final RocksDBStore store) {
+        this.oldColumnFamily = oldColumnFamily;
+        this.newColumnFamily = newColumnFamily;
+        this.valueConverter = valueConverter;
+        this.store = store;
+    }
+
+    @Override
+    public void put(final RocksDBStore.DBAccessor accessor,
+                    final byte[] key,
+                    final byte[] value) {
+        synchronized (store.position) {
+            if (value == null) {
+                try {
+                    accessor.delete(oldColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+                try {
+                    accessor.delete(newColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+            } else {
+                try {
+                    accessor.delete(oldColumnFamily, key);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while removing key from store " + store.name(), e);
+                }
+                try {
+                    accessor.put(newColumnFamily, key, value);
+                    StoreQueryUtils.updatePosition(store.position, store.context);
+                } catch (final RocksDBException e) {
+                    throw new ProcessorStateException("Error while putting key/value into store " + store.name(), e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                             final WriteBatchInterface batch) throws RocksDBException {
+        for (final KeyValue<Bytes, byte[]> entry : entries) {
+            Objects.requireNonNull(entry.key, "key cannot be null");
+            addToBatch(entry.key.get(), entry.value, batch);
+        }
+    }
+
+    @Override
+    public byte[] get(final RocksDBStore.DBAccessor accessor, final byte[] key) throws RocksDBException {
+        return get(accessor, key, Optional.empty());
+    }
+
+    @Override
+    public byte[] get(final RocksDBStore.DBAccessor accessor, final byte[] key,
+                      final ReadOptions readOptions) throws RocksDBException {
+        return get(accessor, key, Optional.of(readOptions));
+    }
+
+    private byte[] get(final RocksDBStore.DBAccessor accessor, final byte[] key,
+                       final Optional<ReadOptions> readOptions) throws RocksDBException {
+        final byte[] newValue = readOptions.isPresent()
+            ? accessor.get(newColumnFamily, readOptions.get(), key)
+            : accessor.get(newColumnFamily, key);
+        if (newValue != null) {
+            return newValue;
+        }
+
+        final byte[] oldValue = readOptions.isPresent()

Review Comment:
   nit: maybe `valueInOldFormat`?
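   For illustration, here is a sketch of the fallback tail of this helper with the suggested name applied. The hunk is cut off at this line, so the old-column-family read and the conversion step below are inferred from the class javadoc ("falls back to the old column family and converts values on-the-fly"), not copied from the PR:

   ```java
   // Inferred continuation, not shown in the hunk above: read the old column
   // family and convert the value on the fly with the provided converter.
   final byte[] valueInOldFormat = readOptions.isPresent()
       ? accessor.get(oldColumnFamily, readOptions.get(), key)
       : accessor.get(oldColumnFamily, key);
   return valueInOldFormat == null ? null : valueConverter.apply(valueInOldFormat);
   ```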
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessor.java:
##########
@@ -0,0 +1,461 @@
+    private byte[] get(final RocksDBStore.DBAccessor accessor, final byte[] key,
+                       final Optional<ReadOptions> readOptions) throws RocksDBException {
+        final byte[] newValue = readOptions.isPresent()

Review Comment:
   nit: maybe `valueInNewFormat`?
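   Taken together with the `valueInOldFormat` nit above, the head of the helper would then read as follows (a sketch of the two renames applied to the code shown in the hunk):

   ```java
   final byte[] valueInNewFormat = readOptions.isPresent()
       ? accessor.get(newColumnFamily, readOptions.get(), key)
       : accessor.get(newColumnFamily, key);
   if (valueInNewFormat != null) {
       // The new column family wins; no conversion needed.
       return valueInNewFormat;
   }
   ```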
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/DualColumnFamilyAccessorTest.java:
##########
@@ -0,0 +1,437 @@
+@ExtendWith(MockitoExtension.class)
+public class DualColumnFamilyAccessorTest {
+
+    @Mock
+    private ColumnFamilyHandle oldCF;
+
+    @Mock
+    private ColumnFamilyHandle newCF;
+
+    @Mock
+    private RocksDBStore.DBAccessor dbAccessor;
+
+    private RocksDBStore store;
+    private Position position;

Review Comment:
   It seems that these two members could be local variables in `setup()`?
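   A sketch of that suggestion, assuming neither field is referenced outside `setup()`; the `accessor` field and the `createTestStore` helper below are hypothetical stand-ins, since the rest of the test class is not shown in the hunk:

   ```java
   @BeforeEach
   public void setup() {
       // Locals instead of fields: nothing outside setup() needs them (assumption).
       final Position position = Position.emptyPosition();
       // createTestStore: hypothetical stand-in for the test's existing store construction.
       final RocksDBStore store = createTestStore(position);
       accessor = new DualColumnFamilyAccessor(oldCF, newCF, Function.identity(), store);
   }
   ```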
