ajantha-bhat commented on code in PR #9466:
URL: https://github.com/apache/iceberg/pull/9466#discussion_r1477293784


##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/IcebergSinkConfig.java:
##########
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import org.apache.iceberg.IcebergBuild;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergSinkConfig extends AbstractConfig {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSinkConfig.class.getName());
+
+  public static final String INTERNAL_TRANSACTIONAL_SUFFIX_PROP =
+      "iceberg.coordinator.transactional.suffix";
+  private static final String ROUTE_REGEX = "route-regex";
+  private static final String ID_COLUMNS = "id-columns";
+  private static final String PARTITION_BY = "partition-by";
+  private static final String COMMIT_BRANCH = "commit-branch";
+
+  private static final String CATALOG_PROP_PREFIX = "iceberg.catalog.";
+  private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop.";
+  private static final String KAFKA_PROP_PREFIX = "iceberg.kafka.";
+  private static final String TABLE_PROP_PREFIX = "iceberg.table.";
+  private static final String AUTO_CREATE_PROP_PREFIX = 
"iceberg.tables.auto-create-props.";
+  private static final String WRITE_PROP_PREFIX = "iceberg.table.write-props.";
+
+  private static final String CATALOG_NAME_PROP = "iceberg.catalog";
+  private static final String TABLES_PROP = "iceberg.tables";
+  private static final String TABLES_DYNAMIC_PROP = 
"iceberg.tables.dynamic-enabled";
+  private static final String TABLES_ROUTE_FIELD_PROP = 
"iceberg.tables.route-field";
+  private static final String TABLES_DEFAULT_COMMIT_BRANCH = 
"iceberg.tables.default-commit-branch";
+  private static final String TABLES_DEFAULT_ID_COLUMNS = 
"iceberg.tables.default-id-columns";
+  private static final String TABLES_DEFAULT_PARTITION_BY = 
"iceberg.tables.default-partition-by";
+  // FIXME: add config for CDC and upsert mode
+  private static final String TABLES_AUTO_CREATE_ENABLED_PROP =
+      "iceberg.tables.auto-create-enabled";
+  private static final String TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
+      "iceberg.tables.evolve-schema-enabled";
+  private static final String TABLES_SCHEMA_FORCE_OPTIONAL_PROP =
+      "iceberg.tables.schema-force-optional";
+  private static final String TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
+      "iceberg.tables.schema-case-insensitive";
+  private static final String CONTROL_TOPIC_PROP = "iceberg.control.topic";
+  private static final String CONTROL_GROUP_ID_PROP = 
"iceberg.control.group-id";
+  private static final String COMMIT_INTERVAL_MS_PROP = 
"iceberg.control.commit.interval-ms";
+  private static final int COMMIT_INTERVAL_MS_DEFAULT = 300_000;
+  private static final String COMMIT_TIMEOUT_MS_PROP = 
"iceberg.control.commit.timeout-ms";
+  private static final int COMMIT_TIMEOUT_MS_DEFAULT = 30_000;
+  private static final String COMMIT_THREADS_PROP = 
"iceberg.control.commit.threads";
+  private static final String CONNECT_GROUP_ID_PROP = 
"iceberg.connect.group-id";
+  private static final String HADDOP_CONF_DIR_PROP = "iceberg.hadoop-conf-dir";
+
+  private static final String NAME_PROP = "name";
+  private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";
+
+  private static final String DEFAULT_CATALOG_NAME = "iceberg";
+  private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
+  public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
+
+  public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
+  public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts
+
+  @VisibleForTesting static final String COMMA_NO_PARENS_REGEX = 
",(?![^()]*+\\))";
+
+  public static final ConfigDef CONFIG_DEF = newConfigDef();
+
+  public static String version() {
+    String kcVersion = 
IcebergSinkConfig.class.getPackage().getImplementationVersion();
+    if (kcVersion == null) {
+      kcVersion = "unknown";
+    }
+    return IcebergBuild.version() + "-kc-" + kcVersion;
+  }
+
+  private static ConfigDef newConfigDef() {
+    ConfigDef configDef = new ConfigDef();
+    configDef.define(
+        TABLES_PROP,
+        ConfigDef.Type.LIST,
+        null,
+        Importance.HIGH,
+        "Comma-delimited list of destination tables");
+    configDef.define(
+        TABLES_DYNAMIC_PROP,
+        ConfigDef.Type.BOOLEAN,
+        false,
+        Importance.MEDIUM,
+        "Enable dynamic routing to tables based on a record value");
+    configDef.define(
+        TABLES_ROUTE_FIELD_PROP,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Source record field for routing records to tables");
+    configDef.define(
+        TABLES_DEFAULT_COMMIT_BRANCH,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Default branch for commits");
+    configDef.define(
+        TABLES_DEFAULT_ID_COLUMNS,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Default ID columns for tables, comma-separated");
+    configDef.define(
+        TABLES_DEFAULT_PARTITION_BY,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Default partition spec to use when creating tables, comma-separated");
+    configDef.define(
+        TABLES_AUTO_CREATE_ENABLED_PROP,
+        ConfigDef.Type.BOOLEAN,
+        false,
+        Importance.MEDIUM,
+        "Set to true to automatically create destination tables, false 
otherwise");
+    configDef.define(
+        TABLES_SCHEMA_FORCE_OPTIONAL_PROP,
+        ConfigDef.Type.BOOLEAN,
+        false,
+        Importance.MEDIUM,
+        "Set to true to set columns as optional during table create and 
evolution, false to respect schema");
+    configDef.define(
+        TABLES_SCHEMA_CASE_INSENSITIVE_PROP,
+        ConfigDef.Type.BOOLEAN,
+        false,
+        Importance.MEDIUM,
+        "Set to true to look up table columns by case-insensitive name, false 
for case-sensitive");
+    configDef.define(
+        TABLES_EVOLVE_SCHEMA_ENABLED_PROP,
+        ConfigDef.Type.BOOLEAN,
+        false,
+        Importance.MEDIUM,
+        "Set to true to add any missing record fields to the table schema, 
false otherwise");
+    configDef.define(
+        CATALOG_NAME_PROP,
+        ConfigDef.Type.STRING,
+        DEFAULT_CATALOG_NAME,
+        Importance.MEDIUM,
+        "Iceberg catalog name");
+    configDef.define(
+        CONTROL_TOPIC_PROP,
+        ConfigDef.Type.STRING,
+        DEFAULT_CONTROL_TOPIC,
+        Importance.MEDIUM,
+        "Name of the control topic");
+    configDef.define(
+        CONTROL_GROUP_ID_PROP,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Name of the consumer group to store offsets");
+    configDef.define(
+        CONNECT_GROUP_ID_PROP,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.LOW,
+        "Name of the Connect consumer group, should not be set under normal 
conditions");
+    configDef.define(
+        COMMIT_INTERVAL_MS_PROP,
+        ConfigDef.Type.INT,
+        COMMIT_INTERVAL_MS_DEFAULT,
+        Importance.MEDIUM,
+        "Coordinator interval for performing Iceberg table commits, in 
millis");
+    configDef.define(
+        COMMIT_TIMEOUT_MS_PROP,
+        ConfigDef.Type.INT,
+        COMMIT_TIMEOUT_MS_DEFAULT,
+        Importance.MEDIUM,
+        "Coordinator time to wait for worker responses before committing, in 
millis");
+    configDef.define(
+        COMMIT_THREADS_PROP,
+        ConfigDef.Type.INT,
+        Runtime.getRuntime().availableProcessors() * 2,
+        Importance.MEDIUM,
+        "Coordinator threads to use for table commits, default is (cores * 
2)");
+    configDef.define(
+        HADDOP_CONF_DIR_PROP,
+        ConfigDef.Type.STRING,
+        null,
+        Importance.MEDIUM,
+        "Coordinator threads to use for table commits, default is (cores * 
2)");

Review Comment:
   I think this definition is wrong. 



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.Tasks;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergWriterFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergWriterFactory.class);
+
+  private final Catalog catalog;
+  private final IcebergSinkConfig config;
+
+  public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
+    this.catalog = catalog;
+    this.config = config;
+  }
+
+  public RecordWriter createWriter(
+      String tableName, SinkRecord sample, boolean ignoreMissingTable) {
+    TableIdentifier identifier = TableIdentifier.parse(tableName);
+    Table table;
+    try {
+      table = catalog.loadTable(identifier);
+    } catch (NoSuchTableException nst) {
+      if (config.autoCreateEnabled()) {
+        table = autoCreateTable(tableName, sample);
+      } else if (ignoreMissingTable) {
+        return new NoOpWriter();
+      } else {
+        throw nst;
+      }
+    }
+
+    return new IcebergWriter(table, tableName, config);
+  }
+
+  @VisibleForTesting
+  Table autoCreateTable(String tableName, SinkRecord sample) {
+    StructType structType;
+    if (sample.valueSchema() == null) {
+      Type type = SchemaUtils.inferIcebergType(sample.value(), config);
+      if (type == null) {
+        throw new DataException("Unable to create table from empty object");
+      }
+      structType = type.asStructType();
+    } else {
+      structType = SchemaUtils.toIcebergType(sample.valueSchema(), 
config).asStructType();
+    }
+
+    org.apache.iceberg.Schema schema = new 
org.apache.iceberg.Schema(structType.fields());
+    TableIdentifier identifier = TableIdentifier.parse(tableName);
+
+    List<String> partitionBy = config.tableConfig(tableName).partitionBy();
+    PartitionSpec spec;
+    try {
+      spec = SchemaUtils.createPartitionSpec(schema, partitionBy);
+    } catch (Exception e) {
+      LOG.error(
+          "Unable to create partition spec {}, table {} will be unpartitioned",
+          partitionBy,
+          identifier,
+          e);
+      spec = PartitionSpec.unpartitioned();
+    }
+
+    PartitionSpec partitionSpec = spec;
+    AtomicReference<Table> result = new AtomicReference<>();
+    Tasks.range(1)
+        .retry(IcebergSinkConfig.CREATE_TABLE_RETRIES)
+        .run(
+            notUsed -> {
+              try {
+                result.set(
+                    catalog.createTable(

Review Comment:
   we have to create the namespace also if doesn't exist. 
   
   Many catalogs doesn't support implicit namespaces and expects the namespace 
to exist before table creation. 



##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java:
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.connect.data;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.connect.IcebergSinkConfig;
+import org.apache.iceberg.connect.data.SchemaUpdate.AddColumn;
+import org.apache.iceberg.connect.data.SchemaUpdate.MakeOptional;
+import org.apache.iceberg.connect.data.SchemaUpdate.UpdateType;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Type.PrimitiveType;
+import org.apache.iceberg.types.Type.TypeID;
+import org.apache.iceberg.types.Types.BinaryType;
+import org.apache.iceberg.types.Types.BooleanType;
+import org.apache.iceberg.types.Types.DateType;
+import org.apache.iceberg.types.Types.DecimalType;
+import org.apache.iceberg.types.Types.DoubleType;
+import org.apache.iceberg.types.Types.FloatType;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.LongType;
+import org.apache.iceberg.types.Types.MapType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.types.Types.TimeType;
+import org.apache.iceberg.types.Types.TimestampType;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.Tasks;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SchemaUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class);
+
+  private static final Pattern TRANSFORM_REGEX = 
Pattern.compile("(\\w+)\\((.+)\\)");
+
+  public static PrimitiveType needsDataTypeUpdate(Type currentIcebergType, 
Schema valueSchema) {
+    if (currentIcebergType.typeId() == TypeID.FLOAT && valueSchema.type() == 
Schema.Type.FLOAT64) {
+      return DoubleType.get();
+    }
+    if (currentIcebergType.typeId() == TypeID.INTEGER && valueSchema.type() == 
Schema.Type.INT64) {
+      return LongType.get();
+    }
+    return null;
+  }
+
+  public static void applySchemaUpdates(Table table, SchemaUpdate.Consumer 
updates) {
+    if (updates == null || updates.empty()) {
+      // no updates to apply
+      return;
+    }
+
+    Tasks.range(1)
+        .retry(IcebergSinkConfig.SCHEMA_UPDATE_RETRIES)
+        .run(notUsed -> commitSchemaUpdates(table, updates));
+  }
+
+  private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer 
updates) {
+    // get the latest schema in case another process updated it
+    table.refresh();
+
+    // filter out columns that have already been added
+    List<AddColumn> addColumns =
+        updates.addColumns().stream()
+            .filter(addCol -> !columnExists(table.schema(), addCol))
+            .collect(Collectors.toList());
+
+    // filter out columns that have the updated type
+    List<UpdateType> updateTypes =
+        updates.updateTypes().stream()
+            .filter(updateType -> !typeMatches(table.schema(), updateType))
+            .collect(Collectors.toList());
+
+    // filter out columns that have already been made optional
+    List<MakeOptional> makeOptionals =
+        updates.makeOptionals().stream()
+            .filter(makeOptional -> !isOptional(table.schema(), makeOptional))
+            .collect(Collectors.toList());
+
+    if (addColumns.isEmpty() && updateTypes.isEmpty() && 
makeOptionals.isEmpty()) {
+      // no updates to apply
+      LOG.info("Schema for table {} already up-to-date", table.name());
+      return;
+    }
+
+    // apply the updates
+    UpdateSchema updateSchema = table.updateSchema();
+    addColumns.forEach(
+        update -> updateSchema.addColumn(update.parentName(), update.name(), 
update.type()));
+    updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), 
update.type()));
+    makeOptionals.forEach(update -> 
updateSchema.makeColumnOptional(update.name()));
+    updateSchema.commit();
+    LOG.info("Schema for table {} updated with new columns", table.name());
+  }
+
+  private static boolean columnExists(org.apache.iceberg.Schema schema, 
AddColumn update) {
+    return schema.findType(update.key()) != null;
+  }
+
+  private static boolean typeMatches(org.apache.iceberg.Schema schema, 
UpdateType update) {
+    Type type = schema.findType(update.name());
+    if (type == null) {
+      throw new IllegalArgumentException("Invalid column: " + update.name());
+    }
+    return type.typeId() == update.type().typeId();
+  }
+
+  private static boolean isOptional(org.apache.iceberg.Schema schema, 
MakeOptional update) {
+    NestedField field = schema.findField(update.name());
+    if (field == null) {
+      throw new IllegalArgumentException("Invalid column: " + update.name());
+    }
+    return field.isOptional();
+  }
+
+  public static PartitionSpec createPartitionSpec(
+      org.apache.iceberg.Schema schema, List<String> partitionBy) {
+    if (partitionBy.isEmpty()) {
+      return PartitionSpec.unpartitioned();
+    }
+
+    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
+    partitionBy.forEach(
+        partitionField -> {
+          Matcher matcher = TRANSFORM_REGEX.matcher(partitionField);
+          if (matcher.matches()) {
+            String transform = matcher.group(1);
+            switch (transform) {
+              case "year":
+              case "years":

Review Comment:
   I think plurals was a carry over from spark transforms and it was not as per 
spec. So, recently we added singular to the same spark class. 
   https://iceberg.apache.org/spec/#partition-transforms
   
   I think we don't have to support  `years, months, days, hours` syntax as it 
is not as per the spec and this connector is nothing to do with spark. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to