This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
commit a17e58e5dec158c1862c42367cb13b7190e33706 Author: Martin Desruisseaux <martin.desruisse...@geomatys.com> AuthorDate: Wed Aug 14 14:09:31 2024 +0200 Add read/write lock for databases that do not support concurrent transactions. This is the case of SQLite used for Geopackage. --- .../apache/sis/storage/sql/feature/Database.java | 14 +++++++ .../sis/storage/sql/feature/FeatureStream.java | 4 ++ .../org/apache/sis/util/stream/DeferredStream.java | 49 +++++++++++++++++++--- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/Database.java b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/Database.java index ed49071ea5..75c9a39306 100644 --- a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/Database.java +++ b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/Database.java @@ -28,6 +28,8 @@ import java.util.ArrayList; import java.util.Locale; import java.util.Optional; import java.util.logging.LogRecord; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.sql.Array; import java.sql.Connection; import java.sql.DatabaseMetaData; @@ -202,6 +204,17 @@ public class Database<G> extends Syntax { */ private SelectionClauseWriter filterToSQL; + /** + * The lock for read or write operations in the SQL database, or {@code null} if none. + * The read or write lock should be obtained before getting a connection for executing + * a statement, and released after closing the connection. Locking is assumed unneeded + * for obtaining database metadata. + * + * <p>This field should be null if the database manages concurrent transactions by itself. + * It is non-null only as a workaround for databases that do not support concurrency.</p> + */ + protected final ReadWriteLock transactionLocks; + /** * Where to send warnings. 
* @@ -280,6 +293,7 @@ public class Database<G> extends Syntax { supportsSchemas = metadata.supportsSchemasInDataManipulation(); supportsJavaTime = dialect.supportsJavaTime(); crsEncodings = EnumSet.noneOf(CRSEncoding.class); + transactionLocks = dialect.supportsConcurrency() ? null : new ReentrantReadWriteLock(); } /** diff --git a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/FeatureStream.java b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/FeatureStream.java index 196c9887cd..4b9c31c00e 100644 --- a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/FeatureStream.java +++ b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/storage/sql/feature/FeatureStream.java @@ -327,6 +327,7 @@ final class FeatureStream extends DeferredStream<Feature> { if (selection != null && !selection.isEmpty()) { sql.append(" WHERE ").append(selection.toString()); } + lock(table.database.transactionLocks); try (Connection connection = getConnection()) { makeReadOnly(connection); try (Statement st = connection.createStatement(); @@ -339,6 +340,8 @@ final class FeatureStream extends DeferredStream<Feature> { } } catch (SQLException e) { throw new BackingStoreException(e); + } finally { + unlock(); } return Math.max(super.count() - offset, 0); } @@ -395,6 +398,7 @@ final class FeatureStream extends DeferredStream<Feature> { final String filter = (selection != null && !selection.isEmpty()) ? selection.toString() : null; selection = null; // Let the garbage collector do its work. + lock(table.database.transactionLocks); final Connection connection = getConnection(); setCloseHandler(connection); // Executed only if `FeatureIterator` creation fails, discarded later otherwise. 
makeReadOnly(connection); diff --git a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/util/stream/DeferredStream.java b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/util/stream/DeferredStream.java index 849dbba28a..56894d613b 100644 --- a/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/util/stream/DeferredStream.java +++ b/endorsed/src/org.apache.sis.storage.sql/main/org/apache/sis/util/stream/DeferredStream.java @@ -19,6 +19,8 @@ package org.apache.sis.util.stream; import java.util.Spliterator; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.sis.util.Exceptions; import org.apache.sis.util.collection.BackingStoreException; @@ -66,19 +68,35 @@ public abstract class DeferredStream<T> extends StreamWrapper<T> { */ AutoCloseable handler; + /** + * If there is a read or write lock to unlock, that lock. Otherwise {@code null}. + * This is non-null only with databases that do not support concurrent transactions well. + * + * @see #lock(ReadWriteLock) + */ + Lock lock; + /** * Invoked by the stream for disposing the resources. */ @Override public void run() { final AutoCloseable h = handler; + final Lock c = lock; handler = null; - if (h != null) try { - h.close(); - } catch (RuntimeException e) { - throw e; - } catch (Exception e) { - throw new BackingStoreException(e); + lock = null; + try { + if (h != null) try { + h.close(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new BackingStoreException(e); + } + } finally { + if (c != null) { + c.unlock(); + } } } } @@ -161,4 +179,23 @@ public abstract class DeferredStream<T> extends StreamWrapper<T> { protected final void setCloseHandler(final AutoCloseable handler) { closeHandler.handler = handler; } + + /** + * Sets the read lock. + * Locks are used only with databases that do not support concurrent transactions well. 
+ * + * @param lock the lock, or {@code null} if none. + */ + protected final void lock(final ReadWriteLock lock) { + if (lock != null) { + (closeHandler.lock = lock.readLock()).lock(); + } + } + + /** + * Closes the connection (if any) and releases the lock. + */ + protected final void unlock() { + closeHandler.run(); + } }