yupeng9 commented on a change in pull request #8233:
URL: https://github.com/apache/pinot/pull/8233#discussion_r815369593



##########
File path: 
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/HttpClient.java
##########
@@ -0,0 +1,130 @@
+/**

Review comment:
       It's used in the quickstart to show how to use REST API to fetch the 
schema and table config to config the connector. Otherwise developer needs to 
manually craft those configs. 
   
   Also, you can take a look at `PinotConnectionUtils` for the useful config 
management.

##########
File path: 
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
##########
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.flink.sink;
+
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.pinot.connector.flink.common.RecordConverter;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader;
+import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The sink function for Pinot.
+ *
+ * <p>This version of the sink function doesn't leverage {@link SegmentWriter} 
API's ability buffer
+ * data and also share that data with checkpoint state. Instead it uses an 
internal buffer within
+ * PinotSinkFunction for checkpoint.
+ *
+ * <p>This should change once we introduce FlinkPinotSegmentWriter
+ *
+ * @param <T> type of record supported
+ */
+@SuppressWarnings("NullAway")
+public class PinotSinkFunction<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
+
+  public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000;
+  public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5;
+  public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000;
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkFunction.class);
+
+  private final long _segmentFlushMaxNumRecords;
+  private final int _executorPoolSize;
+
+  private final RecordConverter<T> _recordConverter;
+
+  private TableConfig _tableConfig;
+  private Schema _schema;
+
+  private transient SegmentWriter _segmentWriter;
+  private transient SegmentUploader _segmentUploader;
+  private transient ExecutorService _executor;
+  private transient long _segmentNumRecord;
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema) {
+    this(recordConverter, tableConfig, schema, 
DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE);
+  }
+
+  public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig 
tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize) {
+    _recordConverter = recordConverter;
+    _tableConfig = tableConfig;
+    _schema = schema;
+    _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
+    _executorPoolSize = executorPoolSize;
+  }
+
+  @Override
+  public void open(Configuration parameters)
+      throws Exception {
+    int indexOfSubtask = this.getRuntimeContext().getIndexOfThisSubtask();
+    // TODO improve segment uploader to use in-memory buffer then flush to tar 
file.
+    _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, 
getRuntimeContext().getMetricGroup());
+    _segmentWriter.init(_tableConfig, _schema);
+    // TODO improve segment uploader to take in-memory tar
+    // TODO launch segment uploader as separate thread for uploading 
(non-blocking?)
+    _segmentUploader = new FlinkSegmentUploader(indexOfSubtask);
+    _segmentUploader.init(_tableConfig);
+    _segmentNumRecord = 0;
+    _executor = Executors.newFixedThreadPool(_executorPoolSize);
+    LOG.info("Open Pinot Sink with the table {}", _tableConfig.toJsonString());
+  }
+
+  @Override
+  public void close()
+      throws Exception {
+    LOG.info("Closing Pinot Sink");
+    try {
+      if (_segmentNumRecord > 0) {
+        flush();
+      }
+    } catch (Exception e) {
+      LOG.error("Error when closing Pinot sink", e);
+    }
+    _executor.shutdown();
+    try {
+      if (!_executor.awaitTermination(DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS, 
TimeUnit.MILLISECONDS)) {
+        _executor.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      _executor.shutdownNow();
+    }
+    _segmentWriter.close();
+  }
+
+  @Override
+  public void invoke(T value, Context context)
+      throws Exception {
+    _segmentWriter.collect(_recordConverter.convertToRow(value));
+    _segmentNumRecord++;
+    if (_segmentNumRecord > _segmentFlushMaxNumRecords) {
+      flush();
+    }
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
+      throws Exception {
+    // TODO: not supported yet
+    LOG.error("snapshotState is invoked in Pinot sink");
+    // clear and flush.
+    flush();
+    // snapshot state:
+    // 1. should only work on the boundary of segment uploader.
+    // 2. segmentwriter state should be preserved.
+    // 3.
+    // ...
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext 
functionInitializationContext)
+      throws Exception {
+    // no initialization needed
+    // ...
+  }
+
+  private void flush()

Review comment:
       Is it possible to have a race conditon? Only `invoke` can call it, since 
`snapshotState` is not used in batch processing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to