Skip to content

Commit

Permalink
Clean up DBOE/TDB2 transaction code
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Jun 17, 2019
1 parent dd6ff3c commit c8cd5ad
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.jena.dboe.transaction;

import org.apache.jena.query.ReadWrite;
import org.apache.jena.query.TxnType;

/** Wrapper for {@link Transactional} */
public class TransactionalWrapper implements Transactional {

private Transactional other;
protected Transactional get() { return other; }

public TransactionalWrapper(Transactional other) {
this.other = other;
}

@Override
public void begin(TxnType type) {
get().begin(type);
}

@Override
public void begin(ReadWrite readWrite) {
get().begin(readWrite);
}

@Override
public boolean promote(Promote mode) {
return get().promote(mode);
}

@Override
public void commit() {
get().commit();
}

@Override
public void abort() {
get().abort();
}

@Override
public void end() {
get().end();
}

@Override
public ReadWrite transactionMode() {
return get().transactionMode();
}

@Override
public TxnType transactionType() {
return get().transactionType();
}

@Override
public boolean isInTransaction() {
return get().isInTransaction();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ public Journal getJournal() {
return journal;
}

public Location getLocation() {
return getJournal().getLocation();
}

public TransactionCoordinatorState detach(Transaction txn) {
txn.detach();
TransactionCoordinatorState coordinatorState = new TransactionCoordinatorState(txn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@

import org.apache.jena.atlas.lib.Lib;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.Transactional;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.query.TxnType;

/**
* Framework for implementing a Transactional.
* Framework for implementing a {@link Transactional} via {@link TransactionalSystem}.
* This base class provides the "per thread" aspect - the {@link TransactionCoordinator} itself
* is not thread aware.
*/

public class TransactionalBase implements TransactionalSystem {
// Help debugging by generating names for Transactionals.
// Optional labelling - development/debugging aid.
private final String label;
protected boolean isShutdown = false;
protected final TransactionCoordinator txnMgr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,12 @@ public void reset() {
sync();
}

// public void append() { position(size()); }

public long position() { return channel.position(); }
public long position() { return channel.position(); }

// public void position(long posn) { channel.position(posn); }
// public void append() { position(size()); }

public Location getLocation() { return location; }

public String getFilename() { return channel.getFilename(); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.jena.sys.JenaSystem;
import org.apache.jena.tdb2.TDBException;
import org.apache.jena.tdb2.params.StoreParams;
import org.apache.jena.tdb2.params.StoreParamsDynamic;
import org.apache.jena.tdb2.store.DatasetGraphSwitchable;

// StoreConnection, DatabaseConnection < Connection<X>
Expand Down Expand Up @@ -67,28 +68,40 @@ public synchronized static DatabaseConnection connectCreate(Location location, S
* creating it if it does not exist in storage.
*/
private synchronized static DatabaseConnection make(Location location, StoreParams params) {
if ( location.isMemUnique() ) {
// Uncached, in-memory.
DatasetGraph dsg = DatabaseOps.create(location, params);
DatabaseConnection dbConn = new DatabaseConnection(dsg, location, null);
return dbConn;
}
if ( location.isMemUnique() )
return buildUniqueMem(location, params);
// Cached by Location. Named in-memory or on-disk.
DatabaseConnection dbConn = cache.computeIfAbsent(location, (loc)->buildForCache(loc, params));
DatabaseConnection dbConn = cache.computeIfAbsent(location, (loc)->build(loc, params));
return dbConn;
}

/**
* Create a fresh {@link DatabaseConnection}. This new object is independent of
* any other {@link DatabaseConnection} for the same location. This function must
* be used with care. If any other {@link DatabaseConnection} for this location has been created,
* only {@link StoreParamsDynamic} will apply.
*/
public static DatabaseConnection createDirect(Location location, StoreParams params) {
if ( location.isMemUnique() )
return buildUniqueMem(location, params);
return build(location, params);
}

private static DatabaseConnection buildForCache(Location location, StoreParams params) {
private static DatabaseConnection build(Location location, StoreParams params) {
if ( location.isMemUnique() ) {
throw new TDBException("Can't buildForCache a memory-unique location");
}
ProcessFileLock lock = null;
if (SystemTDB.DiskLocationMultiJvmUsagePrevention && ! location.isMem() ) {
// Take the lock for the swithable.
// StoreConnection will take a lock for the storage.
lock = lockForLocation(location);
// Take the lock. This is atomic.
// Take the lock. This is atomic and non-reentrant.
lock.lockEx();
}
// c.f. StoreConnection.make
DatasetGraph dsg = DatabaseOps.create(location, params);

return new DatabaseConnection(dsg, location, lock);
}

Expand All @@ -100,6 +113,13 @@ private static DatabaseConnection buildForCache(Location location, StoreParams p
// return DatabaseOps.create(location);
// }

private static DatabaseConnection buildUniqueMem(Location location, StoreParams params) {
// Uncached, in-memory.
DatasetGraph dsg = DatabaseOps.create(location, params);
DatabaseConnection dbConn = new DatabaseConnection(dsg, location, null);
return dbConn;
}

// DRY
/** Create or fetch a {@link ProcessFileLock} for a Location */
public static ProcessFileLock lockForLocation(Location location) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,16 @@ private synchronized static StoreConnection make(Location location, StoreParams
StoreConnection sConn = cache.get(location);
if ( sConn == null ) {
ProcessFileLock lock = null;
// This is not duplicating DatabaseConnection.build.
// This is a tdb.lock file in the storage database, not the switchable.
if (SystemTDB.DiskLocationMultiJvmUsagePrevention && ! location.isMem() ) {
lock = lockForLocation(location);
// Take the lock. This is atomic.
// Take the lock. This is atomic and non-reentrant.
lock.lockEx();
}

// Recovery happens when TransactionCoordinator.start is called
// during the building of the DatasetGraphTxn.

//DatasetGraphTDB dsg = (DatasetGraphTDB)TDBBuilder.build(location, params);
// during the building of the DatasetGraphTDB
DatasetGraphTDB dsg = TDB2StorageBuilder.build(location, params);

sConn = new StoreConnection(dsg, lock);
if (!location.isMemUnique())
cache.put(location, sConn);
Expand Down

0 comments on commit c8cd5ad

Please sign in to comment.