npawar commented on a change in pull request #8233: URL: https://github.com/apache/pinot/pull/8233#discussion_r818253261
########## File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.flink.sink; + +import java.net.URI; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.pinot.connector.flink.common.RecordConverter; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.ingestion.segment.uploader.SegmentUploader; +import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The sink function for Pinot. 
+ * + * @param <T> type of record supported + */ +@SuppressWarnings("NullAway") +public class PinotSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction { + + public static final long DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS = 500000; + public static final int DEFAULT_EXECUTOR_POOL_SIZE = 5; + public static final long DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS = 3000; + + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(PinotSinkFunction.class); + + private final long _segmentFlushMaxNumRecords; + private final int _executorPoolSize; + + private final RecordConverter<T> _recordConverter; + + private TableConfig _tableConfig; + private Schema _schema; + + private transient SegmentWriter _segmentWriter; + private transient SegmentUploader _segmentUploader; + private transient ExecutorService _executor; + private transient long _segmentNumRecord; + + public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig tableConfig, Schema schema) { + this(recordConverter, tableConfig, schema, DEFAULT_SEGMENT_FLUSH_MAX_NUM_RECORDS, DEFAULT_EXECUTOR_POOL_SIZE); + } + + public PinotSinkFunction(RecordConverter<T> recordConverter, TableConfig tableConfig, Schema schema, + long segmentFlushMaxNumRecords, int executorPoolSize) { + _recordConverter = recordConverter; + _tableConfig = tableConfig; + _schema = schema; + _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords; + _executorPoolSize = executorPoolSize; + } + + @Override + public void open(Configuration parameters) + throws Exception { + int indexOfSubtask = this.getRuntimeContext().getIndexOfThisSubtask(); + _segmentWriter = new FlinkSegmentWriter(indexOfSubtask, getRuntimeContext().getMetricGroup()); + _segmentWriter.init(_tableConfig, _schema); + _segmentUploader = new FlinkSegmentUploader(); + _segmentUploader.init(_tableConfig); + _segmentNumRecord = 0; + _executor = Executors.newFixedThreadPool(_executorPoolSize); + LOG.info("Open Pinot Sink with the table {}", _tableConfig.toJsonString()); + } + + @Override + public void close() + throws Exception { + LOG.info("Closing Pinot Sink"); + try { + if (_segmentNumRecord > 0) { + flush(); + } + } catch (Exception e) { + LOG.error("Error when closing Pinot sink", e); + } + _executor.shutdown(); + try { + if (!_executor.awaitTermination(DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) { + _executor.shutdownNow(); + } + } catch (InterruptedException e) { + _executor.shutdownNow(); + } + _segmentWriter.close(); Review comment: do this in finally? ########## File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/common/PinotRowRecordConverter.java ########## @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.flink.common; + +import java.util.Map; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.pinot.spi.data.readers.GenericRow; + + +/** Converts {@link Row} type data into {@link GenericRow} format. */ +public class PinotRowRecordConverter implements RecordConverter<Row> { + + private final RowTypeInfo _rowTypeInfo; + private final String[] _fieldNames; + private final TypeInformation<?>[] _fieldTypes; + + public PinotRowRecordConverter(RowTypeInfo rowTypeInfo) { + _rowTypeInfo = rowTypeInfo; + _fieldNames = rowTypeInfo.getFieldNames(); + _fieldTypes = rowTypeInfo.getFieldTypes(); Review comment: this is unused. remove? ########## File path: pinot-connectors/pinot-flink-connector/README.md ########## @@ -0,0 +1,47 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +# Flink-Pinot Connector + +Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, +including the upsert tables. You can read more about the motivation and design in this [design proposal](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177045634). + +## Quick Start +```java +StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); +execEnv.setParallelism(2); +DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0)); + +PinotControllerClient client = new PinotControllerClient(); +// fetch Pinot schema +Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores"); +// fetch Pinot table config +TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE"); +// create Flink Pinot Sink +srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema)); +execEnv.execute(); +``` + +For more examples, please see `src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java` + +## Notes for backfilling upsert table + - To correctly partition the output segments by the primary key, the Flink job *must* also include the partitionBeyKey operator before the Sink operator Review comment: typo s/partitionBeyKey/partitionByKey ########## File path: pinot-connectors/pinot-flink-connector/README.md ########## @@ -0,0 +1,47 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +# Flink-Pinot Connector + +Flink connector to write data to Pinot directly. This is useful for backfilling or bootstrapping tables, +including the upsert tables. You can read more about the motivation and design in this [design proposal](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177045634). + +## Quick Start +```java +StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); +execEnv.setParallelism(2); +DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO).keyBy(r -> r.getField(0)); + +PinotControllerClient client = new PinotControllerClient(); +// fetch Pinot schema +Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores"); +// fetch Pinot table config +TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE"); +// create Flink Pinot Sink +srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO), tableConfig, schema)); +execEnv.execute(); +``` + +For more examples, please see `src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java` + +## Notes for backfilling upsert table + - To correctly partition the output segments by the primary key, the Flink job *must* also include the partitionBeyKey operator before the Sink operator + - The parallelism of the job *must* be set the same as the number of partitions of the Pinot table, so that the sink in each task executor can generate the segment of same partitions. + - It’s important to plan the resource usage to avoid capacity issues such as out of memory. In particular, Pinot sink has an in-memory buffer of records, and it flushes when the threshold is reached. Currently, the threshold on the number of records is supported via the config of `segmentFlushMaxNumRecords`. In the future, we could add other types of threshold such as the memory usage of the buffer. Review comment: it's not an in-memory buffer right? collect writes each record to file. the capacity issue to call out is out of disk space ########## File path: pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.flink.util; + +import java.io.File; +import java.io.IOException; +import javax.annotation.Nonnull; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; + + +public final class TestUtils { Review comment: can the integration test move to pinot-integration-tests? 1. it should ideally be there with all others 2. you won't have to write this util class just for flink connector, as it's already there for use in ITs ########## File path: pinot-connectors/pinot-flink-connector/src/test/resources/fixtures/pinotTableSchema.json ########## @@ -0,0 +1,59 @@ +{ + "schemaName": "demand", + "dimensionFieldSpecs": [ + { + "name": "demand_uuid", + "singleValueField": true, + "dataType": "STRING", + "virtualColumnProvider": null, + "defaultNullValue": "null", + "transformFunction": null, + "maxLength" : 4096 + }, + { + "name": "geofence", + "singleValueField": true, + "dataType": "STRING", + "virtualColumnProvider": null, + "defaultNullValue": "null", + "transformFunction": null + } + ], + "metricFieldSpecs": [ + { + "dataType": "DOUBLE", + "singleValueField": true, + "fieldSize": 8, + "derivedMetricType": null, + "name": "surge_multiplier", + "virtualColumnProvider": null, + "defaultNullValue": 0.0, + "transformFunction": null + } + ], + "dateTimeFieldSpecs": [ + ], + "timeFieldSpec": { Review comment: nit: use the dateTimeFieldSpec. often our oss users look at the quickstarts and copy the configs. we don't want anyone to be using timeFieldSpec anymore ########## File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/FlinkQuickStart.java ########## @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.connector.flink; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.types.Row; +import org.apache.pinot.connector.flink.common.PinotRowRecordConverter; +import org.apache.pinot.connector.flink.http.PinotConnectionUtils; +import org.apache.pinot.connector.flink.http.PinotControllerClient; +import org.apache.pinot.connector.flink.sink.PinotSinkFunction; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; + + +/** + * A quick start to populate a segment into Pinot Table using the connector. Please run the GenericQuickStart to create + * the offline table of all Starbucks store locations in US, and then run this quick start to populate other Starbucks + * stores in the rest of the world. + */ +public final class FlinkQuickStart { Review comment: can this just be in pinot-tools with the rest of the Quickstarts? ########## File path: pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/common/PinotMapRecordConverter.java ########## @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.flink.common; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.data.readers.GenericRow; + + +public class PinotMapRecordConverter implements RecordConverter<Map<String, Object>> { Review comment: (optional) this name is not very intuitive. How about PinotGenericRowConverter as interface, and MapGenericRowConverter and FlinkRowGenericRowConverter as impls? ########## File path: pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/util/TestUtils.java ########## @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.connector.flink.util; + +import java.io.File; +import java.io.IOException; +import javax.annotation.Nonnull; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; + + +public final class TestUtils { + + private TestUtils() { + } + + public static void ensureDirectoriesExistAndEmpty(@Nonnull File... dirs) + throws IOException { + File[] var1 = dirs; + int var2 = dirs.length; + + for (int var3 = 0; var3 < var2; var3++) { Review comment: if you do decide to keep this class, change the var names
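For the `TestUtils` comment above ("change the var names"): a minimal sketch of how the loop might read with descriptive names. The rest of the method body is cut off in this excerpt, so the delete-and-recreate behavior shown here is an assumption modeled on the equivalent helper in pinot-integration-tests; imports are the ones already declared in the quoted class.

```java
public static void ensureDirectoriesExistAndEmpty(@Nonnull File... dirs)
    throws IOException {
  for (File dir : dirs) {
    // Assumed behavior: remove any leftover contents, then recreate the directory.
    FileUtils.deleteDirectory(dir);
    Assert.assertTrue(dir.mkdirs(), "Failed to create directory: " + dir);
  }
}
```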
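For the earlier `PinotSinkFunction.close()` comment ("do this in finally?"): one way that suggestion could look. This is a sketch only, reusing the fields and constants from the diff above; moving the cleanup into `finally` ensures the executor shutdown and `_segmentWriter.close()` run even if the final flush throws.

```java
@Override
public void close()
    throws Exception {
  LOG.info("Closing Pinot Sink");
  try {
    if (_segmentNumRecord > 0) {
      flush();
    }
  } catch (Exception e) {
    LOG.error("Error when closing Pinot sink", e);
  } finally {
    // Release resources regardless of whether the final flush succeeded.
    _executor.shutdown();
    try {
      if (!_executor.awaitTermination(DEFAULT_EXECUTOR_SHUTDOWN_WAIT_MS, TimeUnit.MILLISECONDS)) {
        _executor.shutdownNow();
      }
    } catch (InterruptedException e) {
      _executor.shutdownNow();
      Thread.currentThread().interrupt();
    }
    _segmentWriter.close();
  }
}
```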
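For the `PinotMapRecordConverter` naming suggestion (PinotGenericRowConverter as the interface, MapGenericRowConverter and FlinkRowGenericRowConverter as impls): a rough illustration of that shape. The single `convertToRow` method and the use of `GenericRow.putValue` are illustrative assumptions, since the `RecordConverter` interface body is not shown in this excerpt.

```java
import java.io.Serializable;
import java.util.Map;
import org.apache.pinot.spi.data.readers.GenericRow;

/** Hypothetical interface name: converts a record of type T into a Pinot GenericRow. */
public interface PinotGenericRowConverter<T> extends Serializable {
  GenericRow convertToRow(T record);
}

/** Hypothetical implementation name for Map inputs (would live in its own file). */
class MapGenericRowConverter implements PinotGenericRowConverter<Map<String, Object>> {
  @Override
  public GenericRow convertToRow(Map<String, Object> record) {
    GenericRow row = new GenericRow();
    // Copy each map entry into the GenericRow under the same column name.
    record.forEach(row::putValue);
    return row;
  }
}
```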