diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalWrapper.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalWrapper.java new file mode 100644 index 00000000000..81f944a4a9d --- /dev/null +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/TransactionalWrapper.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.dboe.transaction; + +import org.apache.jena.query.ReadWrite; +import org.apache.jena.query.TxnType; + +/** Wrapper for {@link Transactional} */ +public class TransactionalWrapper implements Transactional { + + private Transactional other; + protected Transactional get() { return other; } + + public TransactionalWrapper(Transactional other) { + this.other = other; + } + + @Override + public void begin(TxnType type) { + get().begin(type); + } + + @Override + public void begin(ReadWrite readWrite) { + get().begin(readWrite); + } + + @Override + public boolean promote(Promote mode) { + return get().promote(mode); + } + + @Override + public void commit() { + get().commit(); + } + + @Override + public void abort() { + get().abort(); + } + + @Override + public void end() { + get().end(); + } + + @Override + public ReadWrite transactionMode() { + return get().transactionMode(); + } + + @Override + public TxnType transactionType() { + return get().transactionType(); + } + + @Override + public boolean isInTransaction() { + return get().isInTransaction(); + } +} diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java index 522c036bd78..7b535a11cf6 100644 --- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionCoordinator.java @@ -277,6 +277,10 @@ public Journal getJournal() { return journal; } + public Location getLocation() { + return getJournal().getLocation(); + } + public TransactionCoordinatorState detach(Transaction txn) { txn.detach(); TransactionCoordinatorState coordinatorState = new TransactionCoordinatorState(txn); diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java index 3aa52b9fe2b..2d50656db1c 100644 --- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/TransactionalBase.java @@ -22,15 +22,17 @@ import org.apache.jena.atlas.lib.Lib; import org.apache.jena.atlas.logging.Log; +import org.apache.jena.dboe.transaction.Transactional; import org.apache.jena.query.ReadWrite; import org.apache.jena.query.TxnType; /** - * Framework for implementing a Transactional. + * Framework for implementing a {@link Transactional} via {@link TransactionalSystem}. + * This base class provides the "per thread" aspect - the {@link TransactionCoordinator} itself + * is not thread aware. */ - public class TransactionalBase implements TransactionalSystem { - // Help debugging by generating names for Transactionals. + // Optional labelling - development/debugging aid. private final String label; protected boolean isShutdown = false; protected final TransactionCoordinator txnMgr; diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java index 82955707804..b76556331ac 100644 --- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/journal/Journal.java @@ -397,11 +397,12 @@ public void reset() { sync(); } -// public void append() { position(size()); } - - public long position() { return channel.position(); } + public long position() { return channel.position(); } // public void position(long posn) { channel.position(posn); } +// public void append() { position(size()); } + + public Location getLocation() { return location; } public String getFilename() { return channel.getFilename(); } } diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/DatabaseConnection.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/DatabaseConnection.java index 8a0d314b8e6..9280b335f31 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/DatabaseConnection.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/DatabaseConnection.java @@ -36,6 +36,7 @@ import org.apache.jena.sys.JenaSystem; import org.apache.jena.tdb2.TDBException; import org.apache.jena.tdb2.params.StoreParams; +import org.apache.jena.tdb2.params.StoreParamsDynamic; import org.apache.jena.tdb2.store.DatasetGraphSwitchable; // StoreConnection, DatabaseConnection < Connection @@ -67,28 +68,40 @@ public synchronized static DatabaseConnection connectCreate(Location location, S * creating it if it does not exist in storage. */ private synchronized static DatabaseConnection make(Location location, StoreParams params) { - if ( location.isMemUnique() ) { - // Uncached, in-memory. - DatasetGraph dsg = DatabaseOps.create(location, params); - DatabaseConnection dbConn = new DatabaseConnection(dsg, location, null); - return dbConn; - } + if ( location.isMemUnique() ) + return buildUniqueMem(location, params); // Cached by Location. Named in-memory or on-disk. - DatabaseConnection dbConn = cache.computeIfAbsent(location, (loc)->buildForCache(loc, params)); + DatabaseConnection dbConn = cache.computeIfAbsent(location, (loc)->build(loc, params)); return dbConn; } + + /** + * Create a fresh {@link DatabaseConnection}. This new object is independent of + * any other {@link DatabaseConnection} for the same location. This function must + * be used with care. If any other {@link DatabaseConnection} for this location has been created, + * only {@link StoreParamsDynamic} will apply. + */ + public static DatabaseConnection createDirect(Location location, StoreParams params) { + if ( location.isMemUnique() ) + return buildUniqueMem(location, params); + return build(location, params); + } - private static DatabaseConnection buildForCache(Location location, StoreParams params) { + private static DatabaseConnection build(Location location, StoreParams params) { if ( location.isMemUnique() ) { throw new TDBException("Can't buildForCache a memory-unique location"); } ProcessFileLock lock = null; if (SystemTDB.DiskLocationMultiJvmUsagePrevention && ! location.isMem() ) { + // Take the lock for the swithable. + // StoreConnection will take a lock for the storage. lock = lockForLocation(location); - // Take the lock. This is atomic. + // Take the lock. This is atomic and non-reentrant. lock.lockEx(); } + // c.f. StoreConnection.make DatasetGraph dsg = DatabaseOps.create(location, params); + return new DatabaseConnection(dsg, location, lock); } @@ -100,6 +113,13 @@ private static DatabaseConnection buildForCache(Location location, StoreParams p // return DatabaseOps.create(location); // } + private static DatabaseConnection buildUniqueMem(Location location, StoreParams params) { + // Uncached, in-memory. + DatasetGraph dsg = DatabaseOps.create(location, params); + DatabaseConnection dbConn = new DatabaseConnection(dsg, location, null); + return dbConn; + } + // DRY /** Create or fetch a {@link ProcessFileLock} for a Location */ public static ProcessFileLock lockForLocation(Location location) { diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/StoreConnection.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/StoreConnection.java index 75476947010..cb38cd22a3b 100644 --- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/StoreConnection.java +++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/sys/StoreConnection.java @@ -81,18 +81,16 @@ private synchronized static StoreConnection make(Location location, StoreParams StoreConnection sConn = cache.get(location); if ( sConn == null ) { ProcessFileLock lock = null; + // This is not duplicating DatabaseConnection.build. + // This is a tdb.lock file in the storage database, not the switchable. if (SystemTDB.DiskLocationMultiJvmUsagePrevention && ! location.isMem() ) { lock = lockForLocation(location); - // Take the lock. This is atomic. + // Take the lock. This is atomic and non-reentrant. lock.lockEx(); } - // Recovery happens when TransactionCoordinator.start is called - // during the building of the DatasetGraphTxn. - - //DatasetGraphTDB dsg = (DatasetGraphTDB)TDBBuilder.build(location, params); + // during the building of the DatasetGraphTDB DatasetGraphTDB dsg = TDB2StorageBuilder.build(location, params); - sConn = new StoreConnection(dsg, lock); if (!location.isMemUnique()) cache.put(location, sConn);