This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 90b480ef4d6 MINOR: Remove unused ReadonlyXxFacade classes (#21788)
90b480ef4d6 is described below
commit 90b480ef4d6ea826ac6fb5927dfdeb9e6abc2ae2
Author: Alieh Saeedi <[email protected]>
AuthorDate: Tue Mar 17 20:31:07 2026 +0100
MINOR: Remove unused ReadonlyXxFacade classes (#21788)
This PR removes the unused ReadonlyXxFacade classes, as they have
already been replaced with Generic*Facades but were mistakenly not removed.
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../ReadOnlyKeyValueStoreWithHeadersFacade.java | 110 --------------
...yTimestampedKeyValueStoreWithHeadersFacade.java | 121 ---------------
...nlyTimestampedWindowStoreWithHeadersFacade.java | 165 ---------------------
.../ReadOnlyWindowStoreWithHeadersFacade.java | 161 --------------------
4 files changed, 557 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreWithHeadersFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreWithHeadersFacade.java
deleted file mode 100644
index 887a1bbab91..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreWithHeadersFacade.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-
-/**
- * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
- * {@link ReadOnlyKeyValueStore} interface. This facade extracts the plain
value
- * from {@link ValueTimestampHeaders}, discarding timestamp and headers.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class ReadOnlyKeyValueStoreWithHeadersFacade<K, V> implements
ReadOnlyKeyValueStore<K, V> {
- protected final TimestampedKeyValueStoreWithHeaders<K, V> inner;
-
- protected ReadOnlyKeyValueStoreWithHeadersFacade(final
TimestampedKeyValueStoreWithHeaders<K, V> store) {
- inner = store;
- }
-
- @Override
- public V get(final K key) {
- final ValueTimestampHeaders<V> valueTimestampHeaders = inner.get(key);
- return valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
- }
-
- @Override
- public KeyValueIterator<K, V> range(final K from, final K to) {
- return new KeyValueIteratorWithHeadersFacade<>(inner.range(from, to));
- }
-
- @Override
- public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
- return new
KeyValueIteratorWithHeadersFacade<>(inner.reverseRange(from, to));
- }
-
- @Override
- public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix,
-
final PS prefixKeySerializer) {
- return new
KeyValueIteratorWithHeadersFacade<>(inner.prefixScan(prefix,
prefixKeySerializer));
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new KeyValueIteratorWithHeadersFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<K, V> reverseAll() {
- return new KeyValueIteratorWithHeadersFacade<>(inner.reverseAll());
- }
-
- @Override
- public long approximateNumEntries() {
- return inner.approximateNumEntries();
- }
-
- /**
- * Iterator facade that extracts plain values from ValueTimestampHeaders.
- */
- private static class KeyValueIteratorWithHeadersFacade<K, V> implements
KeyValueIterator<K, V> {
- private final KeyValueIterator<K, ValueTimestampHeaders<V>>
innerIterator;
-
- KeyValueIteratorWithHeadersFacade(final KeyValueIterator<K,
ValueTimestampHeaders<V>> innerIterator) {
- this.innerIterator = innerIterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public K peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<K, V> next() {
- final KeyValue<K, ValueTimestampHeaders<V>> next =
innerIterator.next();
- final V value = next.value == null ? null : next.value.value();
- return KeyValue.pair(next.key, value);
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedKeyValueStoreWithHeadersFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedKeyValueStoreWithHeadersFacade.java
deleted file mode 100644
index acd648b3a27..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedKeyValueStoreWithHeadersFacade.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-
-/**
- * A facade that wraps {@link TimestampedKeyValueStoreWithHeaders} to provide a
- * {@link ReadOnlyKeyValueStore} interface with {@link ValueAndTimestamp}
values.
- * This facade converts {@link ValueTimestampHeaders} to {@link
ValueAndTimestamp}, discarding headers.
- *
- * Similar to {@link ReadOnlyKeyValueStoreWithHeadersFacade} but for
timestamped queries.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class ReadOnlyTimestampedKeyValueStoreWithHeadersFacade<K, V>
implements ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> {
- protected final TimestampedKeyValueStoreWithHeaders<K, V> inner;
-
- public ReadOnlyTimestampedKeyValueStoreWithHeadersFacade(final
TimestampedKeyValueStoreWithHeaders<K, V> store) {
- inner = store;
- }
-
- @Override
- public ValueAndTimestamp<V> get(final K key) {
- final ValueTimestampHeaders<V> valueTimestampHeaders = inner.get(key);
- if (valueTimestampHeaders == null) {
- return null;
- }
- return ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
- }
-
- @Override
- public KeyValueIterator<K, ValueAndTimestamp<V>> range(final K from, final
K to) {
- return new TimestampedKeyValueIteratorFacade<>(inner.range(from, to));
- }
-
- @Override
- public KeyValueIterator<K, ValueAndTimestamp<V>> reverseRange(final K
from, final K to) {
- return new
TimestampedKeyValueIteratorFacade<>(inner.reverseRange(from, to));
- }
-
- @Override
- public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueAndTimestamp<V>> prefixScan(final P prefix,
-
final PS prefixKeySerializer) {
- return new
TimestampedKeyValueIteratorFacade<>(inner.prefixScan(prefix,
prefixKeySerializer));
- }
-
- @Override
- public KeyValueIterator<K, ValueAndTimestamp<V>> all() {
- return new TimestampedKeyValueIteratorFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<K, ValueAndTimestamp<V>> reverseAll() {
- return new TimestampedKeyValueIteratorFacade<>(inner.reverseAll());
- }
-
- @Override
- public long approximateNumEntries() {
- return inner.approximateNumEntries();
- }
-
- /**
- * Iterator facade that converts ValueTimestampHeaders to
ValueAndTimestamp.
- */
- private static class TimestampedKeyValueIteratorFacade<K, V> implements
KeyValueIterator<K, ValueAndTimestamp<V>> {
- private final KeyValueIterator<K, ValueTimestampHeaders<V>>
innerIterator;
-
- TimestampedKeyValueIteratorFacade(final KeyValueIterator<K,
ValueTimestampHeaders<V>> innerIterator) {
- this.innerIterator = innerIterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public K peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<K, ValueAndTimestamp<V>> next() {
- final KeyValue<K, ValueTimestampHeaders<V>> next =
innerIterator.next();
- final ValueAndTimestamp<V> valueAndTimestamp;
- if (next.value == null) {
- valueAndTimestamp = null;
- } else {
- valueAndTimestamp = ValueAndTimestamp.make(next.value.value(),
next.value.timestamp());
- }
- return KeyValue.pair(next.key, valueAndTimestamp);
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedWindowStoreWithHeadersFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedWindowStoreWithHeadersFacade.java
deleted file mode 100644
index 27079dd25c9..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyTimestampedWindowStoreWithHeadersFacade.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.time.Instant;
-
-/**
- * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
- * {@link ReadOnlyWindowStore} interface with {@link ValueAndTimestamp} values.
- * This facade converts {@link ValueTimestampHeaders} to {@link
ValueAndTimestamp}, discarding headers.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class ReadOnlyTimestampedWindowStoreWithHeadersFacade<K, V> implements
ReadOnlyWindowStore<K, ValueAndTimestamp<V>> {
- protected final TimestampedWindowStoreWithHeaders<K, V> inner;
-
- protected ReadOnlyTimestampedWindowStoreWithHeadersFacade(final
TimestampedWindowStoreWithHeaders<K, V> store) {
- inner = store;
- }
-
- @Override
- public ValueAndTimestamp<V> fetch(final K key, final long time) {
- final ValueTimestampHeaders<V> valueTimestampHeaders =
inner.fetch(key, time);
- return valueTimestampHeaders == null ? null :
- ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
- }
-
- @Override
- public WindowStoreIterator<ValueAndTimestamp<V>> fetch(final K key,
- final Instant
timeFrom,
- final Instant
timeTo) throws IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom,
timeTo));
- }
-
- @Override
- public WindowStoreIterator<ValueAndTimestamp<V>> backwardFetch(final K key,
- final
Instant timeFrom,
- final
Instant timeTo) throws IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.backwardFetch(key,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetch(final K
keyFrom,
- final K
keyTo,
- final
Instant timeFrom,
- final
Instant timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>>
backwardFetch(final K keyFrom,
-
final K keyTo,
-
final Instant timeFrom,
-
final Instant timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetch(keyFrom,
keyTo, timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAll(final
Instant timeFrom,
- final
Instant timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>>
backwardFetchAll(final Instant timeFrom,
-
final Instant timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom,
timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> all() {
- return new KeyValueIteratorFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> backwardAll() {
- return new KeyValueIteratorFacade<>(inner.backwardAll());
- }
-
- private static class WindowStoreIteratorFacade<V> implements
WindowStoreIterator<ValueAndTimestamp<V>> {
- final KeyValueIterator<Long, ValueTimestampHeaders<V>> innerIterator;
-
- WindowStoreIteratorFacade(final KeyValueIterator<Long,
ValueTimestampHeaders<V>> iterator) {
- innerIterator = iterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public Long peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<Long, ValueAndTimestamp<V>> next() {
- final KeyValue<Long, ValueTimestampHeaders<V>> innerKeyValue =
innerIterator.next();
- final ValueAndTimestamp<V> valueAndTimestamp = innerKeyValue.value
== null ? null :
- ValueAndTimestamp.make(innerKeyValue.value.value(),
innerKeyValue.value.timestamp());
- return KeyValue.pair(innerKeyValue.key, valueAndTimestamp);
- }
- }
-
- private static class KeyValueIteratorFacade<K, V> implements
KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> {
- private final KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
innerIterator;
-
- KeyValueIteratorFacade(final KeyValueIterator<Windowed<K>,
ValueTimestampHeaders<V>> iterator) {
- innerIterator = iterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public Windowed<K> peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<Windowed<K>, ValueAndTimestamp<V>> next() {
- final KeyValue<Windowed<K>, ValueTimestampHeaders<V>>
innerKeyValue = innerIterator.next();
- final ValueAndTimestamp<V> valueAndTimestamp = innerKeyValue.value
== null ? null :
- ValueAndTimestamp.make(innerKeyValue.value.value(),
innerKeyValue.value.timestamp());
- return KeyValue.pair(innerKeyValue.key, valueAndTimestamp);
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreWithHeadersFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreWithHeadersFacade.java
deleted file mode 100644
index e100ba8f42b..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreWithHeadersFacade.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.time.Instant;
-
-/**
- * A facade that wraps {@link TimestampedWindowStoreWithHeaders} to provide a
- * {@link ReadOnlyWindowStore} interface with plain values.
- * This facade converts {@link ValueTimestampHeaders} to plain values,
discarding both timestamps and headers.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class ReadOnlyWindowStoreWithHeadersFacade<K, V> implements
ReadOnlyWindowStore<K, V> {
- protected final TimestampedWindowStoreWithHeaders<K, V> inner;
-
- protected ReadOnlyWindowStoreWithHeadersFacade(final
TimestampedWindowStoreWithHeaders<K, V> store) {
- inner = store;
- }
-
- @Override
- public V fetch(final K key, final long time) {
- final ValueTimestampHeaders<V> valueTimestampHeaders =
inner.fetch(key, time);
- return valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
- }
-
- @Override
- public WindowStoreIterator<V> fetch(final K key,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom,
timeTo));
- }
-
- @Override
- public WindowStoreIterator<V> backwardFetch(final K key,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.backwardFetch(key,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
- final K keyTo,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
- final K keyTo,
- final Instant
timeFrom,
- final Instant
timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetch(keyFrom,
keyTo, timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom,
- final Instant timeTo)
throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant
timeFrom,
- final Instant
timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom,
timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> all() {
- return new KeyValueIteratorFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardAll() {
- return new KeyValueIteratorFacade<>(inner.backwardAll());
- }
-
- private static class WindowStoreIteratorFacade<V> implements
WindowStoreIterator<V> {
- final KeyValueIterator<Long, ValueTimestampHeaders<V>> innerIterator;
-
- WindowStoreIteratorFacade(final KeyValueIterator<Long,
ValueTimestampHeaders<V>> iterator) {
- innerIterator = iterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public Long peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<Long, V> next() {
- final KeyValue<Long, ValueTimestampHeaders<V>> innerKeyValue =
innerIterator.next();
- final V value = innerKeyValue.value == null ? null :
innerKeyValue.value.value();
- return KeyValue.pair(innerKeyValue.key, value);
- }
- }
-
- private static class KeyValueIteratorFacade<K, V> implements
KeyValueIterator<Windowed<K>, V> {
- private final KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
innerIterator;
-
- KeyValueIteratorFacade(final KeyValueIterator<Windowed<K>,
ValueTimestampHeaders<V>> iterator) {
- innerIterator = iterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public Windowed<K> peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<Windowed<K>, V> next() {
- final KeyValue<Windowed<K>, ValueTimestampHeaders<V>>
innerKeyValue = innerIterator.next();
- final V value = innerKeyValue.value == null ? null :
innerKeyValue.value.value();
- return KeyValue.pair(innerKeyValue.key, value);
- }
- }
-}
\ No newline at end of file