This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6d73e58fab Segment lifecycle event listener support (#10536) 6d73e58fab is described below commit 6d73e58fabc92287a829e67ee52f66fb1221111f Author: Saurabh Dubey <saurabhd...@gmail.com> AuthorDate: Wed Apr 26 01:15:38 2023 +0530 Segment lifecycle event listener support (#10536) --- .../pinot/controller/BaseControllerStarter.java | 4 + .../helix/core/SegmentDeletionManager.java | 6 ++ .../PinotSegmentLifecycleEventListener.java | 30 +++++++ .../PinotSegmentLifecycleEventListenerManager.java | 96 ++++++++++++++++++++++ .../lifecycle/SegmentLifecycleEventDetails.java | 29 +++++++ .../lifecycle/SegmentLifecycleEventListener.java | 30 +++++++ .../lifecycle/SegmentLifecycleEventType.java | 23 ++++++ .../impl/SegmentDeletionEventDetails.java | 48 +++++++++++ 8 files changed, 266 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 381097a1b6..3467bc05e0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -98,6 +98,7 @@ import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager; import org.apache.pinot.core.periodictask.PeriodicTask; import org.apache.pinot.core.periodictask.PeriodicTaskScheduler; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; +import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager; import org.apache.pinot.core.transport.ListenerConfig; import org.apache.pinot.core.util.ListenerConfigUtil; import org.apache.pinot.spi.crypt.PinotCrypterFactory; @@ -409,6 +410,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper"); _helixResourceManager.start(_helixParticipantManager); + // Initialize segment lifecycle event listeners + PinotSegmentLifecycleEventListenerManager.getInstance().init(_helixParticipantManager); + LOGGER.info("Starting task resource manager"); _helixTaskResourceManager = new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index ecc7d337e8..ebde702b09 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -45,6 +45,8 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.core.segment.processing.lifecycle.PinotSegmentLifecycleEventListenerManager; +import org.apache.pinot.core.segment.processing.lifecycle.impl.SegmentDeletionEventDetails; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.filesystem.PinotFS; @@ -166,6 +168,10 @@ public class SegmentDeletionManager { propStorePathList.add(segmentPropertyStorePath); } + // Notify all active listeners here + PinotSegmentLifecycleEventListenerManager.getInstance() + .notifyListeners(new SegmentDeletionEventDetails(tableName, segmentsToDelete)); + boolean[] deleteSuccessful = _propertyStore.remove(propStorePathList, AccessOption.PERSISTENT); List<String> propStoreFailedSegs = new ArrayList<>(segmentsToDelete.size()); for (int i = 0; i < deleteSuccessful.length; i++) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java new file mode 100644 index 0000000000..c7c510ae2d --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle; + +import org.apache.helix.HelixManager; + + +public interface PinotSegmentLifecycleEventListener { + SegmentLifecycleEventType getType(); + + void init(HelixManager helixManager); + + void onEvent(SegmentLifecycleEventDetails event); +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java new file mode 100644 index 0000000000..5269c3f797 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/PinotSegmentLifecycleEventListenerManager.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixManager; +import org.apache.pinot.spi.utils.PinotReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PinotSegmentLifecycleEventListenerManager { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotSegmentLifecycleEventListenerManager.class); + private static final PinotSegmentLifecycleEventListenerManager INSTANCE = + new PinotSegmentLifecycleEventListenerManager(); + private Map<SegmentLifecycleEventType, List<PinotSegmentLifecycleEventListener>> _eventTypeToListenersMap; + private boolean _initialized = false; + + private PinotSegmentLifecycleEventListenerManager() { + } + + public static PinotSegmentLifecycleEventListenerManager getInstance() { + return INSTANCE; + } + + public synchronized void init(HelixManager helixZkManager) { + if (_initialized) { + LOGGER.warn("Segment lifecycle event listener manager already initialized, skipping it"); + return; + } + _eventTypeToListenersMap = new HashMap<>(); + Set<Class<?>> classes = + PinotReflectionUtils.getClassesThroughReflection(".*\\.plugin\\.segment\\.lifecycle\\.listener\\..*", + SegmentLifecycleEventListener.class); + for (Class<?> clazz : classes) { + SegmentLifecycleEventListener annotation = clazz.getAnnotation(SegmentLifecycleEventListener.class); + if (annotation.enabled()) { + try { + PinotSegmentLifecycleEventListener pinotSegmentLifecycleEventListener = + (PinotSegmentLifecycleEventListener) clazz.newInstance(); + pinotSegmentLifecycleEventListener.init(helixZkManager); + _eventTypeToListenersMap.compute(pinotSegmentLifecycleEventListener.getType(), (key, list) -> { + if (list == null) { + list = new ArrayList<>(); + } + list.add(pinotSegmentLifecycleEventListener); + return list; + }); + } catch (Exception e) { + LOGGER.error("Caught exception while initializing segment lifecyle event listener : {}, skipping it", clazz, + e); + } + } + } + + _initialized = true; + } + + public void notifyListeners(SegmentLifecycleEventDetails event) { + if (!_initialized) { + LOGGER.warn("Segment lifecycle event listener manager not initialized, skipping it"); + return; + } + + List<PinotSegmentLifecycleEventListener> listeners = _eventTypeToListenersMap.get(event.getType()); + if (listeners != null) { + for (PinotSegmentLifecycleEventListener listener : listeners) { + try { + listener.onEvent(event); + } catch (Exception e) { + LOGGER.error("Segment lifecycle listener call failed : ", e); + } + } + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java new file mode 100644 index 0000000000..c59435455c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventDetails.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle; + +import java.util.List; + +public interface SegmentLifecycleEventDetails { + SegmentLifecycleEventType getType(); + + List<String> getSegments(); + + String getTableNameWithType(); +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java new file mode 100644 index 0000000000..0a4225e529 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventListener.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface SegmentLifecycleEventListener { + boolean enabled() default true; +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java new file mode 100644 index 0000000000..9559b9547f --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/SegmentLifecycleEventType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle; + +public enum SegmentLifecycleEventType { + DELETION +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java new file mode 100644 index 0000000000..eb839a26a6 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/lifecycle/impl/SegmentDeletionEventDetails.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.lifecycle.impl; + +import java.util.List; +import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventDetails; +import org.apache.pinot.core.segment.processing.lifecycle.SegmentLifecycleEventType; + +public class SegmentDeletionEventDetails implements SegmentLifecycleEventDetails { + private final List<String> _segmentsDeleted; + private final String _tableName; + + public SegmentDeletionEventDetails(String tableName, List<String> segmentsDeleted) { + _tableName = tableName; + _segmentsDeleted = segmentsDeleted; + } + + @Override + public SegmentLifecycleEventType getType() { + return SegmentLifecycleEventType.DELETION; + } + + @Override + public List<String> getSegments() { + return _segmentsDeleted; + } + + @Override + public String getTableNameWithType() { + return _tableName; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org