adutra commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2152255806
##########
polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java:
##########
@@ -59,7 +59,7 @@ public interface PolarisConfigurationStore {
* @param <T> the type of the configuration value
*/
default <T> @Nonnull T getConfiguration(
- @Nonnull RealmContext realmContext, String configName, @Nonnull T defaultValue) {
+ RealmContext realmContext, String configName, @Nonnull T defaultValue) {
Review Comment:
Why this change?
##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {
+ public static final String EMPTY_MAP_STRING = "{}";
+
+ // to serialize/deserialize properties
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ // catalog id
+ private String catalogId;
+
+ // event id
+ private String id;
+
+ // id of the request that generated this event
+ private String requestId;
+
+ // event type that was fired
+ private String eventType;
+
+ // timestamp in epoch milliseconds of when this event was emitted
+ private long timestampMs;
Review Comment:
I'm looking at `org.apache.polaris.service.events.PolarisEvent` and I don't
see how the `Clock` is being injected, since this class hierarchy is not
managed by CDI. So it's unclear to me how this field is going to be populated.
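One possible direction, sketched under assumptions rather than taken from the PR: because the event classes are plain objects outside CDI, the timestamp could be read from an injected `Clock` at the point where the event is emitted and passed into the event's constructor. `TableCommittedEvent` and `EventEmitter` are hypothetical names, not Polaris classes.

```java
import java.time.Clock;

// Hypothetical event type: the caller supplies the timestamp,
// so the event itself never needs access to a Clock.
record TableCommittedEvent(String tableName, long timestampMs) {}

// Hypothetical emitter. In a CDI container the Clock would be
// constructor-injected; here it is passed in explicitly.
final class EventEmitter {
  private final Clock clock;

  EventEmitter(Clock clock) {
    this.clock = clock;
  }

  TableCommittedEvent tableCommitted(String tableName) {
    // The managed component reads the clock; the event only stores the value.
    return new TableCommittedEvent(tableName, clock.millis());
  }
}
```

A side benefit is testability: passing `Clock.fixed(...)` makes event timestamps deterministic.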
##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java:
##########
@@ -225,6 +228,39 @@ public void writeToGrantRecords(
}
}
+ @Override
+ public void writeEvents(@Nonnull List<PolarisEvent> events) {
+ int batchSize = 10;
+
+ try {
+ datasourceOperations.runWithinTransaction(
Review Comment:
If I understand this correctly, events are persisted in their own
transaction, and only after a buffer flush. This means that events, even
"before" events, might only show up in queries long after the business operation
they refer to has been persisted. Are we OK with that?
It also means that if a transaction writing events fails, we could end up
with dozens of missing events in the database. Are we OK with that?
I guess my underlying question is: do we have a clear understanding of the
minimal consistency and ordering guarantees users can expect from the events
subsystem?
##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java:
##########
@@ -135,6 +135,47 @@ public static PreparedQuery generateInsertQuery(
return new PreparedQuery(sql, finalValues);
}
+ /**
+ * Generates an INSERT query for multiple values into a given table.
+ *
+ * @param allColumns Columns to insert values into.
+ * @param tableName Target table name.
+ * @param values Values for each column (must match order of columns).
+ * @param realmId Realm value to append.
+ * @return INSERT query with value bindings.
+ */
+ public static PreparedQuery generateMultipleInsertQuery(
Review Comment:
This is imo very risky and not the right way to do batch updates. First off,
Postgres has a limit of 65,535 query parameters, which users could hit here
depending on the number of rows to persist. Second, you should probably look
into JDBC batch updates instead.
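To make both points concrete, here is a minimal sketch that is not the PR's code. The first helper shows the arithmetic behind the limit: a multi-row `INSERT` with `c` columns per row can carry at most floor(65,535 / c) rows. With 10 columns, for example, one statement tops out at 6,553 rows. The second method shows the JDBC batch approach: one single-row parameterized statement is reused through `addBatch()` and `executeBatch()`, so the parameter count per statement stays fixed. The `events` table, its columns, and the `EventBatchWriter` class are hypothetical.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

final class EventBatchWriter {
  // Per-statement bind-parameter limit for PostgreSQL cited in the review comment.
  static final int MAX_BIND_PARAMS = 65_535;

  // Largest number of rows that a single multi-row INSERT can hold.
  static int maxRowsPerMultiRowInsert(int columnsPerRow) {
    if (columnsPerRow <= 0) {
      throw new IllegalArgumentException("columnsPerRow must be positive");
    }
    return MAX_BIND_PARAMS / columnsPerRow;
  }

  // JDBC batch update: the statement always has exactly 3 parameters,
  // no matter how many rows are queued. Each row is assumed to be a
  // 3-element array (realm id, event id, event type).
  static int[] insertEvents(Connection conn, List<String[]> rows) throws SQLException {
    String sql = "INSERT INTO events (realm_id, event_id, event_type) VALUES (?, ?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      for (String[] row : rows) {
        ps.setString(1, row[0]);
        ps.setString(2, row[1]);
        ps.setString(3, row[2]);
        ps.addBatch();
      }
      return ps.executeBatch();
    }
  }
}
```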
##########
service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java:
##########
@@ -21,14 +21,35 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
-/**
- * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception
- * while committing.
- *
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
-public record AfterTableCommitedEvent(
- TableIdentifier identifier, TableMetadata base, TableMetadata metadata)
- implements PolarisEvent {}
+public final class AfterTableCommitedEvent extends PolarisEvent {
Review Comment:
Why this change? Imo a record was better.
##########
service/common/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java:
##########
@@ -0,0 +1,58 @@
+
+package org.apache.polaris.service.events;
+
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+
+/** Emitted when Polaris intends to create a catalog. */
+public final class AfterCatalogCreatedEvent extends PolarisEvent {
Review Comment:
These classes could all be records and save thousands of lines.
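For comparison, here is a sketch of the record style the original code used (see the removed `AfterTableCommitedEvent` above). Each event is a single declaration, and the compiler generates the constructor, accessors, `equals`, `hashCode`, and `toString`. The interface and the `catalogName` field are simplified, hypothetical stand-ins, not the actual Polaris definitions.

```java
// Simplified stand-in for the marker interface the original records implemented.
interface PolarisEvent {}

// One declaration per event. The constructor, accessors, equals,
// hashCode, and toString are all generated by the compiler.
record BeforeCatalogCreatedEvent(String catalogName) implements PolarisEvent {}

record AfterCatalogCreatedEvent(String catalogName) implements PolarisEvent {}
```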
##########
service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java:
##########
@@ -21,13 +21,36 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.view.ViewMetadata;
-/**
- * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception
- * while committing.
- *
- * @param identifier The identifier.
- * @param base The old metadata.
- * @param metadata The new metadata.
- */
-public record AfterViewCommitedEvent(
- TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata)
- implements PolarisEvent {}
+public final class AfterViewCommitedEvent extends PolarisEvent {
Review Comment:
I really am -1 on these changes.
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+package org.apache.polaris.service.events.listeners;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.time.Clock;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
Review Comment:
A non-concurrent map as a final field in an application-scoped bean??? 🤔
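Here is a minimal sketch of a thread-safe alternative, assuming the realm, then shard, then buffer layout described in the code comment above. On `ConcurrentHashMap`, `computeIfAbsent` is atomic per key, so concurrent callers asking for the same realm and shard always get the same buffer. `ShardBuffers` and the use of a `ConcurrentLinkedQueue<String>` in place of the PR's `BufferShard` are hypothetical simplifications.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

// Hypothetical holder: realm id -> shard number -> buffer of events.
final class ShardBuffers {
  private final ConcurrentMap<String, ConcurrentMap<Integer, Queue<String>>> buffers =
      new ConcurrentHashMap<>();

  // Both levels are created atomically, so two threads can never install
  // two different buffers for the same realm/shard pair.
  Queue<String> bufferFor(String realmId, int shard) {
    return buffers
        .computeIfAbsent(realmId, r -> new ConcurrentHashMap<>())
        .computeIfAbsent(shard, s -> new ConcurrentLinkedQueue<>());
  }

  void add(String realmId, int shard, String event) {
    bufferFor(realmId, shard).add(event);
  }
}
```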
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
+ ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>();
+
+ ScheduledExecutorService threadPool;
+ private static int shardCount;
+ private static int maxBufferSize;
+ private final Kryo kryo = new Kryo();
+ private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-";
+
+ @Inject
+ public FileBufferPolarisPersistenceEventListener(
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore polarisConfigurationStore,
+ Clock clock) {
+ this.metaStoreManagerFactory = metaStoreManagerFactory;
+ this.polarisConfigurationStore = polarisConfigurationStore;
+ this.clock = clock;
+ shardCount =
Review Comment:
Can you explain why we need sharding here, and how it improves throughput
or latency? Do we have benchmarks? As it stands, this looks
over-engineered.
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
Review Comment:
Why aren't these fields private and final?
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
+ ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>();
+
+ ScheduledExecutorService threadPool;
+ private static int shardCount;
Review Comment:
Why are these fields static? And why aren't they final? This class looks
badly designed imho.
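To show why the `static` modifier matters here, consider a minimal sketch with hypothetical classes. A `static` field assigned in a constructor is shared by every instance, so the last instance constructed silently overwrites the configuration seen by all earlier ones. This can bite in tests that create several listeners. Using `private final` instance fields gives each instance its own value and lets the compiler check that the field is assigned exactly once.

```java
// Problematic pattern: configuration stored in a static field from a constructor.
final class StaticConfigListener {
  private static int shardCount;

  StaticConfigListener(int configuredShards) {
    shardCount = configuredShards; // overwrites the value seen by every instance
  }

  int shardCount() {
    return shardCount;
  }
}

// Suggested pattern: private final instance fields, assigned once in the constructor.
final class FinalConfigListener {
  private final int shardCount;

  FinalConfigListener(int configuredShards) {
    this.shardCount = configuredShards;
  }

  int shardCount() {
    return shardCount;
  }
}
```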
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
+ ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>();
+
+ ScheduledExecutorService threadPool;
+ private static int shardCount;
+ private static int maxBufferSize;
+ private final Kryo kryo = new Kryo();
+ private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-";
+
+ @Inject
+ public FileBufferPolarisPersistenceEventListener(
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore polarisConfigurationStore,
+ Clock clock) {
+ this.metaStoreManagerFactory = metaStoreManagerFactory;
+ this.polarisConfigurationStore = polarisConfigurationStore;
+ this.clock = clock;
+ shardCount =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS);
+ maxBufferSize =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE);
+ int timeToFlush =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS);
+ int numThreads =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS);
+ threadPool = Executors.newScheduledThreadPool(numThreads);
Review Comment:
This thread pool is being created outside of Quarkus, which isn't great. You
should probably inject it and use a managed thread pool.
But even more importantly: **it is not being closed at all**.
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
+ ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>();
+
+ ScheduledExecutorService threadPool;
+ private static int shardCount;
+ private static int maxBufferSize;
+ private final Kryo kryo = new Kryo();
+ private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-";
+
+ @Inject
+ public FileBufferPolarisPersistenceEventListener(
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore polarisConfigurationStore,
+ Clock clock) {
+ this.metaStoreManagerFactory = metaStoreManagerFactory;
+ this.polarisConfigurationStore = polarisConfigurationStore;
+ this.clock = clock;
+ shardCount =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS);
+ maxBufferSize =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE);
+ int timeToFlush =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS);
+ int numThreads =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS);
+ threadPool = Executors.newScheduledThreadPool(numThreads);
+ kryo.register(PolarisEvent.class);
+ kryo.register(PolarisEvent.ResourceType.class);
+
+ // Start BufferListingTask
+ Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction =
+ input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS);
+ BiConsumer<String, List<PolarisEvent>> eventWriter =
+ (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents);
+ var future =
Review Comment:
This future is being ignored. At the very least, this class should have a
destroy method that cancels this future (and closes the thread pool).
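Here is a minimal sketch of that cleanup, which is not the PR's code. The idea is to keep the scheduled `Future` in a field and release both it and the executor in a shutdown method. In a CDI bean this method would typically be annotated with `jakarta.annotation.PreDestroy`. The sketch leaves the annotation out so that it compiles with the JDK alone. The `FlushingListener` class name and the 5-second grace period are assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical listener that owns a periodic background flush task.
final class FlushingListener implements AutoCloseable {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final ScheduledFuture<?> flushTask;

  FlushingListener(Runnable flush, long periodMs) {
    // Keep the handle instead of discarding it, so that it can be cancelled later.
    this.flushTask = scheduler.scheduleAtFixedRate(flush, periodMs, periodMs, TimeUnit.MILLISECONDS);
  }

  // In a CDI bean, annotate with @PreDestroy so the container calls this on shutdown.
  @Override
  public void close() {
    flushTask.cancel(false); // stop future runs; let an in-flight flush finish
    scheduler.shutdown();
    try {
      if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
        scheduler.shutdownNow();
      }
    } catch (InterruptedException e) {
      scheduler.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }

  boolean isClosed() {
    return scheduler.isShutdown() && flushTask.isCancelled();
  }
}
```

A real listener would most likely also flush any events that are still buffered before shutting the executor down, so that they are not lost on shutdown.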
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-file-buffer")
+public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class);
+ MetaStoreManagerFactory metaStoreManagerFactory;
+ PolarisConfigurationStore polarisConfigurationStore;
+ Clock clock;
+
+ // Key: str - Realm
+ // Value:
+ // Key: int - shard number
+ // Value: BufferShard - an object representing the directory and file where events are
+ // persisted on the filesystem
+ private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>();
+ ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>();
+
+ ScheduledExecutorService threadPool;
+ private static int shardCount;
+ private static int maxBufferSize;
+ private final Kryo kryo = new Kryo();
+ private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-";
+
+ @Inject
+ public FileBufferPolarisPersistenceEventListener(
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ PolarisConfigurationStore polarisConfigurationStore,
+ Clock clock) {
+ this.metaStoreManagerFactory = metaStoreManagerFactory;
+ this.polarisConfigurationStore = polarisConfigurationStore;
+ this.clock = clock;
+ shardCount =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS);
+ maxBufferSize =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE);
+ int timeToFlush =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS);
+ int numThreads =
+ polarisConfigurationStore.getConfiguration(
+ null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS);
+ threadPool = Executors.newScheduledThreadPool(numThreads);
+ kryo.register(PolarisEvent.class);
+ kryo.register(PolarisEvent.ResourceType.class);
+
+ // Start BufferListingTask
+ Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction =
+ input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS);
+ BiConsumer<String, List<PolarisEvent>> eventWriter =
+ (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents);
+ var future =
+ threadPool.schedule(
+ new FileBufferListingTask(
+ getBufferDirectory(),
+ taskSubmissionFunction,
+ activeFlushFutures,
+ timeToFlush,
+ clock,
+ this::rotateShard,
+ eventWriter),
+ timeToFlush,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+ int shardNum = polarisEvent.hashCode() % shardCount;
+ String realmId = callCtx.getRealmContext().getRealmIdentifier();
+ if (!buffers.containsKey(realmId)) {
+ createBuffersForRealm(realmId);
+ }
+ BufferShard bufferShard = buffers.get(realmId).get(shardNum);
+ if (bufferShard == null) {
+ LOGGER.error(
+ "No buffer shard found for realm: #{}, shard #{}. Event dropped: {}",
+ realmId,
+ shardNum,
+ polarisEvent);
+ return;
+ }
+
+ kryo.writeObject(bufferShard.output, polarisEvent);
+ bufferShard.output.flush();
+
+ // If too many events in this buffer shard, start a new shard
+ int bufferEventCount = bufferShard.eventCount.getAndIncrement() + 1;
+ if (bufferEventCount >= maxBufferSize) {
+ rotateShard(realmId, shardNum);
+ }
+ }
+
+ private void createBuffersForRealm(String realmId) {
+ HashMap<Integer, BufferShard> bufferShardsForRealm = new HashMap<>();
+ buffers.put(realmId, bufferShardsForRealm);
+ for (int i = 0; i < shardCount; i++) {
+ bufferShardsForRealm.put(i, createShard(realmId, i));
+ }
+ }
+
+ private BufferShard createShard(String realmId, int shardNum) {
+ String bufferDirName =
+ getBufferDirectory() + realmId + "/" + BUFFER_SHARD_PREFIX + shardNum + "/";
+ File file = new File(bufferDirName + "buffer_timestamp-" + clock.millis());
+ File parent = file.getParentFile();
+ if (parent != null && !parent.exists()) {
+ parent.mkdirs(); // Creates all missing parent directories
+ }
+ try {
+ file.createNewFile();
+ Output output = new Output(new FileOutputStream(file));
Review Comment:
Unclosed output stream.
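A minimal sketch of one way to address this, with hypothetical names that do not come from the PR: the `ShardFile` class below owns its stream and implements `AutoCloseable`, and `ShardRotation.rotate` opens the replacement file before closing the old one, so every rotation releases its file handle. A plain `BufferedOutputStream` stands in for Kryo's `Output` (which is itself an `OutputStream`).

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical shard holder that owns (and therefore closes) its output stream.
final class ShardFile implements AutoCloseable {
  final Path file;
  final OutputStream output; // stand-in for Kryo's Output, which extends OutputStream
  final AtomicInteger eventCount = new AtomicInteger();

  ShardFile(Path file) throws IOException {
    Files.createDirectories(file.getParent());
    this.file = file;
    // Opening the stream creates the file if it does not exist yet
    this.output = new BufferedOutputStream(Files.newOutputStream(file));
  }

  void write(byte[] record) throws IOException {
    output.write(record);
    output.flush();
    eventCount.incrementAndGet();
  }

  @Override
  public void close() throws IOException {
    output.close(); // flushes and releases the underlying file handle
  }
}

// Hypothetical rotation helper: never drops a ShardFile without closing it.
final class ShardRotation {
  static ShardFile rotate(ShardFile current, Path next) throws IOException {
    ShardFile replacement = new ShardFile(next); // open the new shard first
    current.close(); // then release the old one
    return replacement;
  }
}
```

The listener would also need to close whatever shards are still open when it is disposed, for example from a `@PreDestroy` method.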
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+ // Start BufferListingTask
+ Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction =
+ input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS);
Review Comment:
There is no way here to cancel the `ScheduledFuture`. Are we OK with that?
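One possible shape, sketched under the assumption that the listing task may run at a fixed delay instead of resubmitting itself through `taskSubmissionFunction` (a swapped-in technique, not what the PR does): keep the `ScheduledFuture` returned by `scheduleWithFixedDelay` in a field, then cancel it and shut the pool down when the bean is disposed. `FlushScheduler` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical owner of the periodic listing task that keeps a handle to it.
final class FlushScheduler implements AutoCloseable {
  private final ScheduledExecutorService threadPool;
  private final ScheduledFuture<?> listingTask;

  FlushScheduler(int numThreads, Runnable task, long delayMs) {
    this.threadPool = Executors.newScheduledThreadPool(numThreads);
    // Keep the returned ScheduledFuture instead of discarding it
    this.listingTask =
        threadPool.scheduleWithFixedDelay(task, delayMs, delayMs, TimeUnit.MILLISECONDS);
  }

  boolean isCancelled() {
    return listingTask.isCancelled();
  }

  @Override
  public void close() throws InterruptedException {
    listingTask.cancel(false); // prevent further runs; an in-flight run may finish
    threadPool.shutdown();
    if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
      threadPool.shutdownNow(); // interrupt anything still running
    }
  }
}
```

In a CDI bean, `close()` could be called from a `@PreDestroy` method. If the self-resubmitting design is kept, the most recent future would have to be tracked (for example in an `AtomicReference`); after `shutdown()`, further `schedule` calls are rejected with a `RejectedExecutionException`.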
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+ private BufferShard createShard(String realmId, int shardNum) {
+ String bufferDirName =
Review Comment:
Instead of concatenating path segments, you should rely on the `Path` API.
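A sketch of a `Path`-based equivalent, assuming the buffer directory is available as a `Path` and keeping the layout that `createShard` currently builds by string concatenation. `ShardPaths` is a hypothetical helper. A side benefit: `Files.createDirectories` and `Files.createFile` report failures as `IOException`s, whereas `mkdirs()` and `createNewFile()` return booleans that the current code ignores.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: <bufferDir>/<realm>/<prefix><shard>/buffer_timestamp-<millis>
final class ShardPaths {
  static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-";

  static Path shardFile(Path bufferDir, String realmId, int shardNum, long millis) {
    // resolve() inserts the platform separator; no manual "/" handling needed
    return bufferDir
        .resolve(realmId)
        .resolve(BUFFER_SHARD_PREFIX + shardNum)
        .resolve("buffer_timestamp-" + millis);
  }

  static Path createShardFile(Path bufferDir, String realmId, int shardNum, long millis)
      throws IOException {
    Path file = shardFile(bufferDir, realmId, shardNum, millis);
    Files.createDirectories(file.getParent()); // throws on failure instead of returning false
    return Files.createFile(file); // throws FileAlreadyExistsException if the file exists
  }
}
```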
##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,189 @@
+ @Override
+ void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+ int shardNum = polarisEvent.hashCode() % shardCount;
+ String realmId = callCtx.getRealmContext().getRealmIdentifier();
+ if (!buffers.containsKey(realmId)) {
+ createBuffersForRealm(realmId);
+ }
+ BufferShard bufferShard = buffers.get(realmId).get(shardNum);
+ if (bufferShard == null) {
+ LOGGER.error(
+ "No buffer shard found for realm: #{}, shard #{}. Event dropped: {}",
Review Comment:
This doesn't look right?
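Independently of the wording of this message, note that this branch is reachable in practice: `hashCode()` can be negative and Java's `%` keeps the sign of the dividend, so `polarisEvent.hashCode() % shardCount` can yield a negative shard number, for which `buffers.get(realmId).get(shardNum)` returns `null` and the event is dropped. `Math.floorMod` always returns a value in `[0, shardCount)` for a positive `shardCount` (unlike `Math.abs`, which leaves `Integer.MIN_VALUE` negative). `ShardSelector` is a hypothetical name.

```java
// Hypothetical helper: maps any hash value to a shard index in [0, shardCount).
final class ShardSelector {
  static int shardFor(int hash, int shardCount) {
    // Math.floorMod takes the sign of the divisor, so the result is never negative here
    return Math.floorMod(hash, shardCount);
  }
}
```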
--
This is an automated message from the Apache Git Service.