stevenzwu commented on code in PR #6160: URL: https://github.com/apache/iceberg/pull/6160#discussion_r1093366709
########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java: ########## @@ -94,14 +94,17 @@ private FlinkConfigOptions() {} public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE = ConfigOptions.key("table.exec.iceberg.split-assigner-type") .enumType(SplitAssignerType.class) - .defaultValue(SplitAssignerType.SIMPLE) + .noDefaultValue() Review Comment: why do we change the default? I thought it is good to set a default for the more common scenario which simple assigner is probably good. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java: ########## @@ -94,14 +94,17 @@ private FlinkConfigOptions() {} public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE = ConfigOptions.key("table.exec.iceberg.split-assigner-type") .enumType(SplitAssignerType.class) - .defaultValue(SplitAssignerType.SIMPLE) + .noDefaultValue() .withDescription( Description.builder() .text("Split assigner type that determine how splits are assigned to readers.") .linebreak() .list( TextElement.text( SplitAssignerType.SIMPLE - + ": simple assigner that doesn't provide any guarantee on order or locality.")) + + ": simple assigner that doesn't provide any guarantee on order or locality."), + TextElement.text( + SplitAssignerType.LOCALITY + + ": locality assigner that provide guarantee on locality.")) Review Comment: we shouldn't use `guarantee`, which is not very accurate. my suggestion would be `that assign splits with locality affinity preference`. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/SourceUtil.java: ########## @@ -68,6 +67,24 @@ static boolean isLocalityEnabled( return false; } + static SplitAssignerFactory assignerFactory( Review Comment: this is not a getter method. probably `createAssignerFactory` is more appropriate? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java: ########## @@ -236,8 +236,6 @@ public FlinkInputFormat buildFormat() { contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema)); } - contextBuilder.exposeLocality( Review Comment: why removing this part? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { Review Comment: please check SimpleSplitAssigner. some/most methods may require `synchronized` to be safe. also maybe add a short description on what this assigner does. similar to the config doc. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); Review Comment: collect copies the entire collection to a list. the state can be big. why don't we just do a `forEach` on the input collection to avoid the copy. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssignerType.java: ########## @@ -27,6 +27,12 @@ public enum SplitAssignerType { public SplitAssignerFactory factory() { return new SimpleSplitAssignerFactory(); } + }, + LOCALITY { Review Comment: nit: should have an empty line btw ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceReader.java: ########## @@ -69,6 +74,13 @@ protected IcebergSourceSplit toSplitType(String splitId, IcebergSourceSplit spli } private void requestSplit(Collection<String> finishedSplitIds) { - context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds)); + String hostname = null; + try { + hostname = InetAddress.getLocalHost().getHostName(); Review Comment: this shouldn't need to be done in per requestSplit step. it can be done once during init, right? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); + } + + @Override + public GetSplitResult getNext(@Nullable String hostname) { + if (pendingSplits.isEmpty()) { + return GetSplitResult.unavailable(); + } + + Deque<IcebergSourceSplit> icebergSourceSplits = + hostname == null + ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits) + : getIcebergSourceSplits(hostname, pendingSplits); + LOG.info("Get Iceberg source splits for: {}", hostname); + + if (!icebergSourceSplits.isEmpty()) { + IcebergSourceSplit split = icebergSourceSplits.poll(); + return GetSplitResult.forSplit(split); + } + + return GetSplitResult.unavailable(); + } + + private Deque<IcebergSourceSplit> getIcebergSourceSplits( + String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) { + if (splitsDeque.isEmpty()) { + return new ArrayDeque<>(); + } + + Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator = + splitsDeque.entrySet().iterator(); + while (splitsIterator.hasNext()) { Review Comment: I am not sure this is the right approach. it can make get split very slow if there are a lot of splits with different hosts. Seems to me that we shouldn't use `Set<String>` as the Map key. I think in the split planning phase maybe we shouldn't use CombinedScanTask directly. We can create one task per file so that we can ensure a single hostname for the task. Then this can be a single string map key. if we want to further optimize, we can group the files for the same host back to CombinedScanTask in the planning phase. cc @pvary ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSplitAssignerBase.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.junit.Assert; +import org.mockito.internal.util.collections.Sets; + +public abstract class TestSplitAssignerBase { + + protected void assertAvailableFuture( + SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) { + assertAvailableFuture(assigner, splitCount, addSplitsRunnable, null); + } + + protected void assertAvailableFuture( + SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable, String hostname) { + // register callback + AtomicBoolean futureCompleted = new AtomicBoolean(); + CompletableFuture<Void> future = assigner.isAvailable(); + future.thenAccept(ignored -> futureCompleted.set(true)); + // calling isAvailable again should return the same object reference + // note that thenAccept will return a new future. + // we want to assert the same instance on the assigner returned future + Assert.assertSame(future, assigner.isAvailable()); + + // now add some splits + addSplitsRunnable.run(); + Assert.assertEquals(true, futureCompleted.get()); + + for (int i = 0; i < splitCount; ++i) { + assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, hostname); + } + assertGetNext(assigner, GetSplitResult.Status.UNAVAILABLE); Review Comment: nit: use the hostname version here too ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestSplitAssignerBase.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.junit.Assert; +import org.mockito.internal.util.collections.Sets; + +public abstract class TestSplitAssignerBase { + + protected void assertAvailableFuture( + SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable) { + assertAvailableFuture(assigner, splitCount, addSplitsRunnable, null); + } + + protected void assertAvailableFuture( + SplitAssigner assigner, int splitCount, Runnable addSplitsRunnable, String hostname) { + // register callback + AtomicBoolean futureCompleted = new AtomicBoolean(); + CompletableFuture<Void> future = assigner.isAvailable(); + future.thenAccept(ignored -> futureCompleted.set(true)); + // calling isAvailable again should return the same object reference + // note that thenAccept will return a new future. + // we want to assert the same instance on the assigner returned future + Assert.assertSame(future, assigner.isAvailable()); + + // now add some splits + addSplitsRunnable.run(); + Assert.assertEquals(true, futureCompleted.get()); + + for (int i = 0; i < splitCount; ++i) { + assertGetNext(assigner, GetSplitResult.Status.AVAILABLE, hostname); Review Comment: can we also assert on the pendingSplitsCount too? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java: ########## @@ -39,24 +39,37 @@ public class IcebergSourceSplit implements SourceSplit, Serializable { private int fileOffset; private long recordOffset; + private final String[] hostname; Review Comment: the variable name should be plural since it is an array. applied to everywhere when this is used. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java: ########## @@ -434,6 +436,9 @@ public IcebergSource<T> build() { } } + splitAssignerFactory = + SourceUtil.assignerFactory(flinkConfig, splitAssignerFactory, context.exposeLocality()); Review Comment: this function behavior is a little weird. we pass in `splitAssignerFactory` and return another `splitAssignerFactory`. it is more clear to do the null check outside the method. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); + } + + @Override + public GetSplitResult getNext(@Nullable String hostname) { + if (pendingSplits.isEmpty()) { + return GetSplitResult.unavailable(); + } + + Deque<IcebergSourceSplit> icebergSourceSplits = + hostname == null + ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits) + : getIcebergSourceSplits(hostname, pendingSplits); + LOG.info("Get Iceberg source splits for: {}", hostname); + + if (!icebergSourceSplits.isEmpty()) { + IcebergSourceSplit split = icebergSourceSplits.poll(); + return GetSplitResult.forSplit(split); + } + + return GetSplitResult.unavailable(); + } + + private Deque<IcebergSourceSplit> getIcebergSourceSplits( + String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) { + if (splitsDeque.isEmpty()) { + return new ArrayDeque<>(); + } + + Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator = + splitsDeque.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next(); + Deque<IcebergSourceSplit> splits = splitsEntry.getValue(); + if (splits.isEmpty()) { + splitsIterator.remove(); + continue; + } + + if (splitsEntry.getKey().contains(hostname)) { + return splits; + } + } + + if (!splitsDeque.isEmpty()) { + return splitsDeque.values().stream().findAny().get(); + } + + return new ArrayDeque<>(); + } + + @Override + public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + @Override + public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + private void addSplits(Collection<IcebergSourceSplit> splits) { + if (splits.isEmpty()) { + return; + } + + for (IcebergSourceSplit split : splits) { + String[] hostname = split.hostname(); + if (hostname == null) { + hostname = new String[] {DEFAULT_HOSTNAME}; + } + + Set<String> hosts = Sets.newHashSet(hostname); + Deque<IcebergSourceSplit> icebergSourceSplits = + pendingSplits.computeIfAbsent(hosts, key -> new 
ArrayDeque<>()); + icebergSourceSplits.add(split); + } + + // only complete pending future if new splits are discovered + completeAvailableFuturesIfNeeded(); + } + + /** Simple assigner only tracks unassigned splits */ + @Override + public Collection<IcebergSourceSplitState> state() { + return pendingSplits.values().stream() + .flatMap(Collection::stream) + .map(split -> new IcebergSourceSplitState(split, IcebergSourceSplitStatus.UNASSIGNED)) + .collect(Collectors.toList()); + } + + @Override + public synchronized CompletableFuture<Void> isAvailable() { + if (availableFuture == null) { + availableFuture = new CompletableFuture<>(); + } + return availableFuture; + } + + @Override + public int pendingSplitCount() { + return (int) pendingSplits.values().stream().mapToInt(Deque::size).count(); Review Comment: last step should be `sum` right? maybe we should add some assertions in unit test ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/split/SplitRequestEvent.java: ########## @@ -32,7 +32,7 @@ public class SplitRequestEvent implements SourceEvent { private final String requesterHostname; public SplitRequestEvent() { - this(Collections.emptyList()); + this(Collections.emptyList(), null); Review Comment: this change seems not necessary? ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); + } + + @Override + public GetSplitResult getNext(@Nullable String hostname) { + if (pendingSplits.isEmpty()) { + return GetSplitResult.unavailable(); + } + + Deque<IcebergSourceSplit> icebergSourceSplits = + hostname == null + ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits) + : getIcebergSourceSplits(hostname, pendingSplits); + LOG.info("Get Iceberg source splits for: {}", hostname); + + if (!icebergSourceSplits.isEmpty()) { + IcebergSourceSplit split = icebergSourceSplits.poll(); + return GetSplitResult.forSplit(split); + } + + return GetSplitResult.unavailable(); + } + + private Deque<IcebergSourceSplit> getIcebergSourceSplits( + String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) { + if (splitsDeque.isEmpty()) { + return new ArrayDeque<>(); + } + + Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator = + splitsDeque.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next(); + Deque<IcebergSourceSplit> splits = splitsEntry.getValue(); + if (splits.isEmpty()) { + splitsIterator.remove(); + continue; + } + + if (splitsEntry.getKey().contains(hostname)) { + return splits; + } + } + + if (!splitsDeque.isEmpty()) { + return splitsDeque.values().stream().findAny().get(); + } + + return new ArrayDeque<>(); + } + + @Override + public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + @Override + public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + private void addSplits(Collection<IcebergSourceSplit> splits) { + if (splits.isEmpty()) { + return; + } + + for (IcebergSourceSplit split : splits) { + String[] hostname = split.hostname(); + if (hostname == null) { + hostname = new String[] {DEFAULT_HOSTNAME}; + } + + Set<String> hosts = Sets.newHashSet(hostname); + Deque<IcebergSourceSplit> icebergSourceSplits = + pendingSplits.computeIfAbsent(hosts, key -> new 
ArrayDeque<>()); + icebergSourceSplits.add(split); + } + + // only complete pending future if new splits are discovered + completeAvailableFuturesIfNeeded(); + } + + /** Simple assigner only tracks unassigned splits */ Review Comment: paste error ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); + } + + @Override + public GetSplitResult getNext(@Nullable String hostname) { + if (pendingSplits.isEmpty()) { + return GetSplitResult.unavailable(); + } + + Deque<IcebergSourceSplit> icebergSourceSplits = + hostname == null + ? 
getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits) + : getIcebergSourceSplits(hostname, pendingSplits); + LOG.info("Get Iceberg source splits for: {}", hostname); + + if (!icebergSourceSplits.isEmpty()) { + IcebergSourceSplit split = icebergSourceSplits.poll(); + return GetSplitResult.forSplit(split); + } + + return GetSplitResult.unavailable(); + } + + private Deque<IcebergSourceSplit> getIcebergSourceSplits( + String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) { + if (splitsDeque.isEmpty()) { + return new ArrayDeque<>(); + } + + Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator = + splitsDeque.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next(); + Deque<IcebergSourceSplit> splits = splitsEntry.getValue(); + if (splits.isEmpty()) { + splitsIterator.remove(); + continue; + } + + if (splitsEntry.getKey().contains(hostname)) { + return splits; + } + } + + if (!splitsDeque.isEmpty()) { + return splitsDeque.values().stream().findAny().get(); + } + + return new ArrayDeque<>(); Review Comment: I also think this shouldn't guarantee locality. Instead, it can be the preference behavior. if there are splits matching the request host, prefer those splits. Otherwise, it can be free to get splits from any other hosts. ########## flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/assigner/LocalitySplitAssigner.java: ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.source.assigner; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.iceberg.flink.source.split.IcebergSourceSplit; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitState; +import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public class LocalitySplitAssigner implements SplitAssigner { + private static final Logger LOG = LoggerFactory.getLogger(LocalitySplitAssigner.class); + + private static final String DEFAULT_HOSTNAME = "hostname"; + private final Map<Set<String>, Deque<IcebergSourceSplit>> pendingSplits; + private CompletableFuture<Void> availableFuture; + + public LocalitySplitAssigner() { + this.pendingSplits = Maps.newHashMap(); + } + + public LocalitySplitAssigner(Collection<IcebergSourceSplitState> assignerState) { + this.pendingSplits = Maps.newHashMap(); + Stream<IcebergSourceSplit> splits = assignerState.stream().map(IcebergSourceSplitState::split); + addSplits(splits.collect(Collectors.toList())); + } + + @Override + public GetSplitResult getNext(@Nullable String hostname) { + if (pendingSplits.isEmpty()) { + return GetSplitResult.unavailable(); + } + + Deque<IcebergSourceSplit> icebergSourceSplits = + hostname == null + ? getIcebergSourceSplits(DEFAULT_HOSTNAME, pendingSplits) + : getIcebergSourceSplits(hostname, pendingSplits); + LOG.info("Get Iceberg source splits for: {}", hostname); + + if (!icebergSourceSplits.isEmpty()) { + IcebergSourceSplit split = icebergSourceSplits.poll(); + return GetSplitResult.forSplit(split); + } + + return GetSplitResult.unavailable(); + } + + private Deque<IcebergSourceSplit> getIcebergSourceSplits( + String hostname, Map<Set<String>, Deque<IcebergSourceSplit>> splitsDeque) { + if (splitsDeque.isEmpty()) { + return new ArrayDeque<>(); + } + + Iterator<Map.Entry<Set<String>, Deque<IcebergSourceSplit>>> splitsIterator = + splitsDeque.entrySet().iterator(); + while (splitsIterator.hasNext()) { + Map.Entry<Set<String>, Deque<IcebergSourceSplit>> splitsEntry = splitsIterator.next(); + Deque<IcebergSourceSplit> splits = splitsEntry.getValue(); + if (splits.isEmpty()) { + splitsIterator.remove(); + continue; + } + + if (splitsEntry.getKey().contains(hostname)) { + return splits; + } + } + + if (!splitsDeque.isEmpty()) { + return splitsDeque.values().stream().findAny().get(); + } + + return new ArrayDeque<>(); + } + + @Override + public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + @Override + public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) { + addSplits(splits); + } + + private void addSplits(Collection<IcebergSourceSplit> splits) { + if (splits.isEmpty()) { + return; + } + + for (IcebergSourceSplit split : splits) { + String[] hostname = split.hostname(); Review Comment: should be plural -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
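[Editor's note, illustrative only] For the pendingSplitCount comment above ("last step should be `sum` right?"), a tiny self-contained demo of the difference: `count()` returns the number of host groups in the stream, while `sum()` adds up the queue sizes, i.e. the number of pending splits. The class name and sample data are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Demo of count() vs sum() on a host -> pending-splits map.
public class PendingSplitCountDemo {
  public static void main(String[] args) {
    Map<String, Deque<Integer>> pending = new HashMap<>();
    pending.put("host-a", new ArrayDeque<>(List.of(1, 2, 3)));
    pending.put("host-b", new ArrayDeque<>(List.of(4)));

    // count() counts the number of queues (host groups): 2
    long groups = pending.values().stream().mapToInt(Deque::size).count();

    // sum() adds up the queue sizes, i.e. the pending split count: 4
    int splits = pending.values().stream().mapToInt(Deque::size).sum();

    System.out.println("groups=" + groups + ", pending splits=" + splits);
  }
}
```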