fqaiser94 commented on code in PR #10351:
URL: https://github.com/apache/iceberg/pull/10351#discussion_r1621333676


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkTask extends SinkTask {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);
+
+  private IcebergSinkConfig config;
+  private Catalog catalog;
+  private Committer committer;
+
+  @Override
+  public String version() {
+    return IcebergSinkConfig.version();

Review Comment:
   Looks like this method was implemented before this PR, but I have a question about it.
   This method returns a String that looks something like this:
   ```
   IcebergBuild.version() + "-kc-" + kcVersion;
   ```
   
   where `kcVersion = IcebergSinkConfig.class.getPackage().getImplementationVersion()`.
   
   Won't `kcVersion` and `IcebergBuild.version()` be the same value, since AFAIK this connector's releases will be tied to the general Iceberg releases? Correct me if I'm wrong.
   
   Note: when I run the existing unit test locally, I currently get `1.6.0-SNAPSHOT-kc-unknown`.
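   If the two values really are always the same, the simplest shape I could imagine (purely a sketch, not a concrete ask) would be to drop the suffix entirely:
   ```
   @Override
   public String version() {
     // assumes the connector version always tracks the Iceberg release
     return org.apache.iceberg.IcebergBuild.version();
   }
   ```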



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkTask.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkTask extends SinkTask {
+
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkTask.class);
+
+  private IcebergSinkConfig config;
+  private Catalog catalog;

Review Comment:
   As far as I know, Catalog objects are not guaranteed to be thread-safe, and at least on "leader" tasks we can have multiple threads using the same Catalog instance at the same time ("leader" tasks have both a main thread and a CoordinatorThread).
   I haven't heard users report any issues with this, but it would be better to avoid the risk entirely.
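   A minimal sketch of one way to avoid the sharing (assuming `catalogName()` and `catalogProps()` accessors on `IcebergSinkConfig`, which aren't shown in this diff) would be to build a separate Catalog per thread:
   ```
   // hypothetical: one Catalog for the task's main thread and one for the
   // CoordinatorThread, so no Catalog instance is ever used concurrently
   Catalog taskCatalog =
       CatalogUtil.buildIcebergCatalog(config.catalogName(), config.catalogProps(), null);
   Catalog coordinatorCatalog =
       CatalogUtil.buildIcebergCatalog(config.catalogName(), config.catalogProps(), null);
   ```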
   



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offsets that corresponds to the next record to consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {
+      routeRecordDynamically(record);
+    } else {
+      routeRecordStatically(record);
+    }
+  }
+
+  private void routeRecordStatically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+
+    if (routeField == null) {
+      // route to all tables
+      config
+          .tables()
+          .forEach(
+              tableName -> {
+                writerForTable(tableName, record, false).write(record);
+              });
+
+    } else {
+      String routeValue = extractRouteValue(record.value(), routeField);
+      if (routeValue != null) {
+        config
+            .tables()
+            .forEach(
+                tableName -> {
+                  Pattern regex = config.tableConfig(tableName).routeRegex();
+                  if (regex != null && regex.matcher(routeValue).matches()) {
+                    writerForTable(tableName, record, false).write(record);
+                  }
+                });
+      }

Review Comment:
   nit: is this really "static" routing? I guess the list of tables is static, but the table(s) each message is written to is still determined dynamically based on the route value...
   
   More importantly, is this an important enough use case to support within the connector? I would strongly prefer that we didn't support this in the connector itself; users can easily implement it by writing an SMT and using dynamic mode.
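   For illustration, a rough sketch of such an SMT (the class name, the `_route` field name, and the topic-to-table mapping are all made up, and it only handles schemaless `Map` values):
   ```
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.ConnectRecord;
   import org.apache.kafka.connect.transforms.Transformation;

   public class AddRouteField<R extends ConnectRecord<R>> implements Transformation<R> {

     @SuppressWarnings("unchecked")
     @Override
     public R apply(R record) {
       if (!(record.value() instanceof Map)) {
         return record; // this sketch only handles schemaless map values
       }
       Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
       // derive the target table from the topic; any routing logic could go here
       value.put("_route", "db." + record.topic().toLowerCase());
       return record.newRecord(
           record.topic(),
           record.kafkaPartition(),
           record.keySchema(),
           record.key(),
           null, // schemaless, so no value schema
           value,
           record.timestamp());
     }

     @Override
     public ConfigDef config() {
       return new ConfigDef();
     }

     @Override
     public void configure(Map<String, ?> configs) {}

     @Override
     public void close() {}
   }
   ```
   With something like that registered as a transform, the connector itself would only need dynamic mode and a single route-field config.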



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offsets that corresponds to the next record to consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {

Review Comment:
   Same here: we really don't need to check this on every record.
   
   Feels like we're missing an abstraction here, something like the concept of a `Router`, with a `StaticRouter` implementation and a `DynamicRouter` implementation, where only one of them is constructed for the lifetime of a `SinkWriter` based on the `config.dynamicTablesEnabled()` setting.
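   Roughly what I have in mind (names are illustrative; the existing `routeRecordStatically`/`routeRecordDynamically` bodies would stay as they are):
   ```
   import org.apache.kafka.connect.sink.SinkRecord;

   // e.g. a tiny functional interface (new file or nested type)
   @FunctionalInterface
   interface RecordRouter {
     void route(SinkRecord record);
   }

   // inside SinkWriter: decided once at construction time instead of per record
   private final RecordRouter router;

   public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
     this.config = config;
     this.writerFactory = new IcebergWriterFactory(catalog, config);
     this.writers = Maps.newHashMap();
     this.sourceOffsets = Maps.newHashMap();
     this.router =
         config.dynamicTablesEnabled()
             ? this::routeRecordDynamically // method references to the existing routing methods
             : this::routeRecordStatically;
   }

   private void save(SinkRecord record) {
     // ... offset bookkeeping unchanged ...
     router.route(record);
   }
   ```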



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offsets that corresponds to the next record to consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {
+      routeRecordDynamically(record);
+    } else {
+      routeRecordStatically(record);
+    }
+  }
+
+  private void routeRecordStatically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+
+    if (routeField == null) {
+      // route to all tables
+      config
+          .tables()
+          .forEach(
+              tableName -> {
+                writerForTable(tableName, record, false).write(record);
+              });
+
+    } else {
+      String routeValue = extractRouteValue(record.value(), routeField);
+      if (routeValue != null) {
+        config
+            .tables()
+            .forEach(
+                tableName -> {
+                  Pattern regex = config.tableConfig(tableName).routeRegex();
+                  if (regex != null && regex.matcher(routeValue).matches()) {
+                    writerForTable(tableName, record, false).write(record);
+                  }
+                });
+      }
+    }
+  }
+
+  private void routeRecordDynamically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");

Review Comment:
   This should really be checked at config parsing time or at SinkWriter construction time, instead of on every record.
   To be clear, I'm not worried about this from a performance perspective (I'm confident the JVM will optimize it away); it just seems awkward.
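   e.g. something like this in the constructor (or the equivalent validation in `IcebergSinkConfig` itself):
   ```
   // fail fast at construction time rather than on the first record
   if (config.dynamicTablesEnabled()) {
     Preconditions.checkNotNull(
         config.tablesRouteField(), "Route field cannot be null with dynamic routing");
   }
   ```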



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offsets that corresponds to the next record to consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {
+      routeRecordDynamically(record);
+    } else {
+      routeRecordStatically(record);
+    }
+  }
+
+  private void routeRecordStatically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+
+    if (routeField == null) {
+      // route to all tables
+      config
+          .tables()
+          .forEach(
+              tableName -> {
+                writerForTable(tableName, record, false).write(record);
+              });
+
+    } else {
+      String routeValue = extractRouteValue(record.value(), routeField);
+      if (routeValue != null) {
+        config
+            .tables()
+            .forEach(
+                tableName -> {
+                  Pattern regex = config.tableConfig(tableName).routeRegex();
+                  if (regex != null && regex.matcher(routeValue).matches()) {
+                    writerForTable(tableName, record, false).write(record);
+                  }
+                });
+      }
+    }
+  }
+
+  private void routeRecordDynamically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+
+    String routeValue = extractRouteValue(record.value(), routeField);
+    if (routeValue != null) {

Review Comment:
   IMO we should throw an error here instead of silently dropping the record.
   Users can easily filter out messages using an SMT to get the same behaviour, if necessary.
   In the future, I imagine we could allow users to supply custom exception-handler implementations, which would also let them drop records on error.
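   Something along these lines, for example (the exact exception type is debatable; `DataException` is just one option):
   ```
   String routeValue = extractRouteValue(record.value(), routeField);
   if (routeValue == null) {
     throw new org.apache.kafka.connect.errors.DataException(
         "Null route value for field " + routeField + " in record from topic " + record.topic());
   }
   writerForTable(routeValue.toLowerCase(), record, true).write(record);
   ```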



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SinkWriter.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+public class SinkWriter {
+  private final IcebergSinkConfig config;
+  private final IcebergWriterFactory writerFactory;
+  private final Map<String, RecordWriter> writers;
+  private final Map<TopicPartition, Offset> sourceOffsets;
+
+  public SinkWriter(Catalog catalog, IcebergSinkConfig config) {
+    this.config = config;
+    this.writerFactory = new IcebergWriterFactory(catalog, config);
+    this.writers = Maps.newHashMap();
+    this.sourceOffsets = Maps.newHashMap();
+  }
+
+  public void close() {
+    writers.values().forEach(RecordWriter::close);
+  }
+
+  public SinkWriterResult completeWrite() {
+    List<IcebergWriterResult> writerResults =
+        writers.values().stream()
+            .flatMap(writer -> writer.complete().stream())
+            .collect(Collectors.toList());
+    Map<TopicPartition, Offset> offsets = Maps.newHashMap(sourceOffsets);
+
+    writers.clear();
+    sourceOffsets.clear();
+
+    return new SinkWriterResult(writerResults, offsets);
+  }
+
+  public void save(Collection<SinkRecord> sinkRecords) {
+    sinkRecords.forEach(this::save);
+  }
+
+  private void save(SinkRecord record) {
+    // the consumer stores the offsets that corresponds to the next record to consume,
+    // so increment the record offset by one
+    OffsetDateTime timestamp =
+        record.timestamp() == null
+            ? null
+            : OffsetDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneOffset.UTC);
+    sourceOffsets.put(
+        new TopicPartition(record.topic(), record.kafkaPartition()),
+        new Offset(record.kafkaOffset() + 1, timestamp));
+
+    if (config.dynamicTablesEnabled()) {
+      routeRecordDynamically(record);
+    } else {
+      routeRecordStatically(record);
+    }
+  }
+
+  private void routeRecordStatically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+
+    if (routeField == null) {
+      // route to all tables
+      config
+          .tables()
+          .forEach(
+              tableName -> {
+                writerForTable(tableName, record, false).write(record);
+              });
+
+    } else {
+      String routeValue = extractRouteValue(record.value(), routeField);
+      if (routeValue != null) {
+        config
+            .tables()
+            .forEach(
+                tableName -> {
+                  Pattern regex = config.tableConfig(tableName).routeRegex();
+                  if (regex != null && regex.matcher(routeValue).matches()) {
+                    writerForTable(tableName, record, false).write(record);
+                  }
+                });
+      }
+    }
+  }
+
+  private void routeRecordDynamically(SinkRecord record) {
+    String routeField = config.tablesRouteField();
+    Preconditions.checkNotNull(routeField, "Route field cannot be null with dynamic routing");
+
+    String routeValue = extractRouteValue(record.value(), routeField);
+    if (routeValue != null) {
+      String tableName = routeValue.toLowerCase();
+      writerForTable(tableName, record, true).write(record);
+    }

Review Comment:
   Can we support writing to multiple tables when this is a comma-separated list of table names, like we do in "static" mode?
   
   I think this is important because then we could remove the regex-matcher configs entirely, as that behaviour could be implemented with an SMT plus dynamic mode.
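   A minimal sketch of what I mean, reusing the existing helpers in this class:
   ```
   String routeValue = extractRouteValue(record.value(), routeField);
   if (routeValue != null) {
     // treat the route value as a comma-separated list of target tables
     for (String tableName : routeValue.split(",")) {
       writerForTable(tableName.trim().toLowerCase(), record, true).write(record);
     }
   }
   ```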



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

