walterddr commented on a change in pull request #8233:
URL: https://github.com/apache/pinot/pull/8233#discussion_r815327278



##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/common/PinotMapRecordConverter.java
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.common;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class PinotMapRecordConverter implements RecordConverter<Map> {

Review comment:
       ```suggestion
   public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
   ```
   to avoid unchecked casts.
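   
   For reference, a minimal sketch of what the typed converter could look like (assuming `RecordConverter<T>` exposes a single `convertToRow(T)` method returning a `GenericRow`, which is how `PinotSinkFunction` uses it):
   
   ```java
   import java.util.Map;
   import org.apache.pinot.spi.data.readers.GenericRow;
   
   public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> {
   
     @Override
     public GenericRow convertToRow(Map<String, Object> record) {
       GenericRow row = new GenericRow();
       // copy every entry of the input map into the Pinot row
       record.forEach(row::putValue);
       return row;
     }
   }
   ```
   
   With the generic bound in place, callers no longer need an unchecked cast when passing the map through the converter.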

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.pinot.connector.flink.common.RecordConverter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The sink function for Pinot.
+ *
+ * <p>This version of the sink function doesn't leverage {@link SegmentWriter} 
API's ability buffer
+ * data and also share that data with checkpoint state. Instead it uses an 
internal buffer within
+ * PinotSinkFunction for checkpoint.
+ *
+ * <p>This should change once we introduce FlinkPinotSegmentWriter

Review comment:
       This javadoc is outdated: the sink now delegates buffering to the FlinkSegmentWriter introduced in this PR, so this description no longer matches the implementation.
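   
   Something along these lines might describe the current wiring better (just a sketch, exact wording up to you):
   
   ```java
   /**
    * The sink function for Pinot.
    *
    * <p>Rows are converted via the configured {@link RecordConverter} and handed to a
    * {@link FlinkSegmentWriter}, which buffers them as Avro records and builds a segment
    * once the configured number of records has been collected or the sink is closed. The
    * resulting segment is then pushed with a {@link FlinkSegmentUploader}.
    *
    * @param <T> type of record supported
    */
   ```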

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentUploader.java
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.spi.auth.AuthContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Default implementation of {@link SegmentUploader} with support for all push 
modes The configs for
+ * push are fetched from batchConfigMaps of tableConfig
+ */
+@SuppressWarnings("NullAway")
+public class FlinkSegmentUploader implements SegmentUploader {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkSegmentUploader.class);
+  private final int _indexOfSubtask;
+
+  private String _tableNameWithType;
+  private BatchConfig _batchConfig;
+  private BatchIngestionConfig _batchIngestionConfig;
+
+  public FlinkSegmentUploader(int indexOfSubtask) {
+    _indexOfSubtask = indexOfSubtask;

Review comment:
       indexOfSubtask is not used in the uploader; I think it only matters in the segment writer.

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link SegmentWriter} implementation that uses a buffer. The {@link 
GenericRow} are written to
+ * the buffer as AVRO records.
+ */
+@SuppressWarnings("NullAway")
+@NotThreadSafe
+public class FlinkSegmentWriter implements SegmentWriter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkSegmentWriter.class);
+  private static final FileFormat BUFFER_FILE_FORMAT = FileFormat.AVRO;
+
+  private final int _indexOfSubtask;
+
+  private TableConfig _tableConfig;
+  private String _tableNameWithType;
+  private BatchIngestionConfig _batchIngestionConfig;
+  private BatchConfig _batchConfig;
+  private String _outputDirURI;
+  private Schema _schema;
+  private Set<String> _fieldsToRead;
+  private RecordTransformer _recordTransformer;
+
+  private File _stagingDir;
+  private File _bufferFile;
+  private int _rowCount;
+  /** A sequence ID that increments each time a segment is flushed */
+  private int _seqId;
+
+  private org.apache.avro.Schema _avroSchema;
+  private DataFileWriter<GenericData.Record> _recordWriter;
+  private GenericData.Record _reusableRecord;
+
+  // metrics
+  private transient Counter _processedRecords;
+  private transient volatile long _lastRecordProcessingTimeMs = 0;
+
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGrp) {
+    _indexOfSubtask = indexOfSubtask;
+    registerMetrics(metricGrp);
+  }
+
+  @Override
+  public void init(TableConfig tableConfig, Schema schema)
+      throws Exception {
+    init(tableConfig, schema, null);
+  }
+
+  @Override
+  public void init(TableConfig tableConfig, Schema schema, Map<String, String> 
batchConfigOverride)
+      throws Exception {
+    _rowCount = 0;
+    _seqId = 0;
+    _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
+    Preconditions.checkState(
+        _tableConfig.getIngestionConfig() != null && 
_tableConfig.getIngestionConfig().getBatchIngestionConfig() != null
+            && CollectionUtils.isNotEmpty(
+            
_tableConfig.getIngestionConfig().getBatchIngestionConfig().getBatchConfigMaps()),
+        "Must provide ingestionConfig->batchIngestionConfig->batchConfigMaps 
in tableConfig for table: %s",
+        _tableNameWithType);
+    _batchIngestionConfig = 
_tableConfig.getIngestionConfig().getBatchIngestionConfig();
+    Preconditions.checkState(_batchIngestionConfig.getBatchConfigMaps().size() 
== 1,
+        "batchConfigMaps must contain only 1 BatchConfig for table: %s", 
_tableNameWithType);
+
+    Map<String, String> batchConfigMap = 
_batchIngestionConfig.getBatchConfigMaps().get(0);
+    String segmentNamePostfixProp = String.format("%s.%s", 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
+        BatchConfigProperties.SEGMENT_NAME_POSTFIX);
+    String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp);
+    segmentSuffix = segmentSuffix == null ? String.valueOf(_indexOfSubtask) : 
segmentSuffix + "_" + _indexOfSubtask;
+    batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);
+
+    _batchConfig = new BatchConfig(_tableNameWithType, batchConfigMap);
+
+    
Preconditions.checkState(StringUtils.isNotBlank(_batchConfig.getOutputDirURI()),
+        "Must provide: %s in batchConfigs for table: %s", 
BatchConfigProperties.OUTPUT_DIR_URI, _tableNameWithType);
+    _outputDirURI = _batchConfig.getOutputDirURI();
+    Files.createDirectories(Paths.get(_outputDirURI));
+
+    _schema = schema;
+    _fieldsToRead = _schema.getColumnNames();
+    _recordTransformer = 
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
+    _avroSchema = 
SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
+    _reusableRecord = new GenericData.Record(_avroSchema);
+
+    // Create tmp dir
+    // TODO staging Dir also need to be subtask separated otherwise there will 
be write conflict.

Review comment:
       clean up TODO (the staging dir does need to be subtask-scoped; a possible fix is sketched below)
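   
   One way to address it (just a sketch; the exact naming is the author's call, and this assumes `_stagingDir` is the directory created right after this TODO):
   
   ```java
   // Include the subtask index (plus a timestamp) in the staging directory name so that
   // parallel subtasks on the same host never write to the same path.
   _stagingDir = new File(FileUtils.getTempDirectory(),
       String.format("pinot_flink_segment_writer_%d_%d", _indexOfSubtask, System.currentTimeMillis()));
   Preconditions.checkState(_stagingDir.mkdirs(), "Failed to create staging dir: %s", _stagingDir);
   ```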

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
##########
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.util.SegmentProcessorAvroUtils;
+import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
+import org.apache.pinot.segment.local.recordtransformer.RecordTransformer;
+import org.apache.pinot.segment.local.utils.IngestionUtils;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.ingestion.batch.spec.Constants;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link SegmentWriter} implementation that uses a buffer. The {@link 
GenericRow} are written to
+ * the buffer as AVRO records.
+ */
+@SuppressWarnings("NullAway")
+@NotThreadSafe
+public class FlinkSegmentWriter implements SegmentWriter {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkSegmentWriter.class);
+  private static final FileFormat BUFFER_FILE_FORMAT = FileFormat.AVRO;
+
+  private final int _indexOfSubtask;
+
+  private TableConfig _tableConfig;
+  private String _tableNameWithType;
+  private BatchIngestionConfig _batchIngestionConfig;
+  private BatchConfig _batchConfig;
+  private String _outputDirURI;
+  private Schema _schema;
+  private Set<String> _fieldsToRead;
+  private RecordTransformer _recordTransformer;
+
+  private File _stagingDir;
+  private File _bufferFile;
+  private int _rowCount;
+  /** A sequence ID that increments each time a segment is flushed */
+  private int _seqId;
+
+  private org.apache.avro.Schema _avroSchema;
+  private DataFileWriter<GenericData.Record> _recordWriter;
+  private GenericData.Record _reusableRecord;
+
+  // metrics
+  private transient Counter _processedRecords;
+  private transient volatile long _lastRecordProcessingTimeMs = 0;
+
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGrp) {

Review comment:
       nit: refactor the common code between FlinkSegmentWriter and FileBasedSegmentWriter; this can be done in a separate PR.

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.pinot.connector.flink.common.RecordConverter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The sink function for Pinot.
+ *
+ * <p>This version of the sink function doesn't leverage {@link SegmentWriter} 
API's ability buffer
+ * data and also share that data with checkpoint state. Instead it uses an 
internal buffer within
+ * PinotSinkFunction for checkpoint.
+ *
+ * <p>This should change once we introduce FlinkPinotSegmentWriter
+ *
+ * @param <T> type of record supported
+ */
+@SuppressWarnings("NullAway")
+public class PinotSinkFunction<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
+
+  public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000;
+  public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
+  public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkFunction.class);
+
+  private final long _segmentFlushMaxNumRecords;
+  private final int _executorPoolSize;
+
+  private final RecordConverter<T> _recordConverter;
+
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  private transient SegmentWriter _segmentWriter;
+  private transient SegmentUploader _segmentUploader;
+  private transient ExecutorService _executor;
+  private transient long _segmentNumRecord;
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema) {
+    this(recordConverter, tableConfig, schema, 
DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
+  }
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize) {
+    _recordConverter = recordConverter;
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
+    _executorPoolSize = executorPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters)
+      throws Exception {
+    int indexOfSubtask = this.getRuntimeContext().getIndexOfThisSubtask();
+    // TODO improve segment uploader to use in-memory buffer then flush to tar 
file.
+    _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, 
getRuntimeContext().getMetricGroup());
+    _segmentWriter.init(_tableConfig, _schema);
+    // TODO improve segment uploader to take in-memory tar
+    // TODO launch segment uploader as separate thread for uploading 
(non-blocking?)
+    _segmentUploader = new FlinkSegmentUploader(indexOfSubtask);
+    _segmentUploader.init(_tableConfig);
+    _segmentNumRecord = 0;
+    _executor = Executors.newFixedThreadPool(_executorPoolSize);
+    LOG.info("Open Pinot Sink with the table {}", _tableConfig.toJsonString());
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    LOG.info("Closing Pinot Sink");
+    try {
+      if (_segmentNumRecord > 0) {
+        flush();
+      }
+    } catch (Exception e) {
+      LOG.error("Error when closing Pinot sink", e);
+    }
+    _executor.shutdown();
+    try {
+      if (!_executor.awaitTermination(DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS, 
TimeUnit.MILLISECONDS)) {
+        _executor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      _executor.shutdownNow();
+    }
+    _segmentWriter.close();
+  }
+
+  @Override
+  public void invoke(T value, Context context)
+      throws Exception {
+    _segmentWriter.collect(_recordConverter.convertToRow(value));
+    _segmentNumRecord++;
+    if (_segmentNumRecord > _segmentFlushMaxNumRecords) {
+      flush();
+    }
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
+      throws Exception {
+    // TODO: not supported yet
+    LOG.error("snapshotState is invoked in Pinot sink");
+    // clear and flush.
+    flush();
+    // snapshot state:
+    // 1. should only work on the boundary of segment uploader.
+    // 2. segmentwriter state should be preserved.
+    // 3.
+    // ...
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext)
+      throws Exception {
+    // no initialization needed
+    // ...
+  }
+
+  private void flush()

Review comment:
       Potential race condition when snapshotState() and invoke() both call flush()?
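   
   If those two code paths can in fact run on different threads, one defensive option (just a sketch; probably unnecessary if Flink invokes both on the task thread under the checkpoint lock, and it assumes flush() roughly does writer.flush() followed by uploader.uploadSegment(), as the TODOs above suggest):
   
   ```java
   private final Object _flushLock = new Object();
   
   private void flush()
       throws Exception {
     synchronized (_flushLock) {
       // Build the segment from the buffered rows and upload it; serializing the two
       // callers prevents them from interleaving on a half-written segment.
       URI segmentTarURI = _segmentWriter.flush();
       _segmentUploader.uploadSegment(segmentTarURI, null);
       _segmentNumRecord = 0;
     }
   }
   ```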

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/HttpClient.java
##########
@@ -0,0 +1,130 @@
+/**

Review comment:
       I might be wrong, but I don't see the `org.apache.pinot.connector.flink.http` package used anywhere outside of the package itself.
   
   Is this module relevant to the Flink connector PR at all, or is it simply a convenience util for integration/e2e tests?
   
   I deleted the entire package and the tests still pass, so if it is not needed I suggest we remove it.

##########
File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.pinot.connector.flink.common.RecordConverter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The sink function for Pinot.
+ *
+ * <p>This version of the sink function doesn't leverage {@link SegmentWriter} 
API's ability buffer
+ * data and also share that data with checkpoint state. Instead it uses an 
internal buffer within
+ * PinotSinkFunction for checkpoint.
+ *
+ * <p>This should change once we introduce FlinkPinotSegmentWriter
+ *
+ * @param <T> type of record supported
+ */
+@SuppressWarnings("NullAway")
+public class PinotSinkFunction<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
+
+  public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000;
+  public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
+  public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkFunction.class);
+
+  private final long _segmentFlushMaxNumRecords;
+  private final int _executorPoolSize;
+
+  private final RecordConverter<T> _recordConverter;
+
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  private transient SegmentWriter _segmentWriter;
+  private transient SegmentUploader _segmentUploader;
+  private transient ExecutorService _executor;
+  private transient long _segmentNumRecord;
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema) {
+    this(recordConverter, tableConfig, schema, 
DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
+  }
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize) {
+    _recordConverter = recordConverter;
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
+    _executorPoolSize = executorPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters)
+      throws Exception {
+    int indexOfSubtask = this.getRuntimeContext().getIndexOfThisSubtask();
+    // TODO improve segment uploader to use in-memory buffer then flush to tar 
file.
+    _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, 
getRuntimeContext().getMetricGroup());
+    _segmentWriter.init(_tableConfig, _schema);
+    // TODO improve segment uploader to take in-memory tar
+    // TODO launch segment uploader as separate thread for uploading 
(non-blocking?)

Review comment:
       clean up TODOs. 

##########
File path: pinot-connectors/pinot-flink-connector/pom.xml
##########
@@ -0,0 +1,173 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pinot</groupId>
+    <artifactId>pinot-connectors</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>flink-pinot-sink</artifactId>
+  <name>Pinot Flink Connector</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../..</pinot.root>
+    <scala.version>2.12</scala.version>
+    <flink.version>1.12.0</flink.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport-native-unix-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.inject</groupId>
+      <artifactId>jersey-hk2</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-json-jackson</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-segment-writer-file-based</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-segment-uploader-default</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+
+    <!-- Test Dependencies -->
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.version}</artifactId>
+      <version>${flink.version}</version>
+    </dependency>

Review comment:
       add `<scope>test</scope>` to these two dependencies (mockito-core and flink-clients).
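   
   For reference, the two entries would then look something like this (sketch, reusing the versions already declared in this pom):
   
   ```xml
   <dependency>
     <groupId>org.mockito</groupId>
     <artifactId>mockito-core</artifactId>
     <scope>test</scope>
   </dependency>
   <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-clients_${scala.version}</artifactId>
     <version>${flink.version}</version>
     <scope>test</scope>
   </dependency>
   ```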

##########
File path: pinot-connectors/pinot-flink-connector/pom.xml
##########
@@ -0,0 +1,173 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pinot</groupId>
+    <artifactId>pinot-connectors</artifactId>
+    <version>0.10.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>flink-pinot-sink</artifactId>

Review comment:
       ```suggestion
     <artifactId>pinot-flink-connector</artifactId>
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


