Skip to content

Commit

Permalink
Merge branch 'master' into 2.0
Browse files Browse the repository at this point in the history
Conflicts:
	community/kernel/src/test/java/org/neo4j/test/impl/EphemeralFileSystemAbstraction.java
	enterprise/cluster/src/main/java/org/neo4j/cluster/protocol/atomicbroadcast/multipaxos/InMemoryAcceptorInstanceStore.java
	manual/pom.xml
	manual/src/main/resources/community/embedded-drivers.asciidoc
	manual/src/reference/index.asciidoc
	packaging/pom.xml
	packaging/standalone/pom.xml
	python-embedded/pom.xml
  • Loading branch information
tinwelint committed Feb 4, 2013
2 parents 92d40a3 + 22da14f commit 49b8280
Show file tree
Hide file tree
Showing 435 changed files with 513 additions and 46,652 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@
* Transaction tx = graphDb.tx().begin();
*
* // To begin a transaction with relaxed force
* Transaction tx = graphDb.tx().relaxed().begin();
* Transaction tx = graphDb.tx().unforced().begin();
*
* // To have relaxed force optionally set by a condition
* TransactionBuilder txBuilder = graphDb.tx();
* if ( condition )
* {
* txBuilder = txBuilder.relaxed();
* txBuilder = txBuilder.unforced();
* }
* Transaction tx = txBuilder.begin();
* </pre>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/**
* Copyright (c) 2002-2013 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.kernel.impl.core;

import static org.junit.Assert.assertEquals;
import static org.neo4j.test.subprocess.DebuggerDeadlockCallback.RESUME_THREAD;

import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.neo4j.graphdb.DynamicRelationshipType;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.factory.GraphDatabaseBuilder;
import org.neo4j.graphdb.factory.GraphDatabaseSettings;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.test.EmbeddedDatabaseRule;
import org.neo4j.test.subprocess.BreakPoint;
import org.neo4j.test.subprocess.BreakpointHandler;
import org.neo4j.test.subprocess.BreakpointTrigger;
import org.neo4j.test.subprocess.DebugInterface;
import org.neo4j.test.subprocess.DebuggedThread;
import org.neo4j.test.subprocess.EnabledBreakpoints;
import org.neo4j.test.subprocess.ForeignBreakpoints;
import org.neo4j.test.subprocess.SubProcessTestRunner;

/**
* This test tests the exact same issue as {@link TestConcurrentModificationOfRelationshipChains}. The difference is
* that it tries to cut it as close as possible by doing the relationship cache load right after the removal of the
* relationship from the cache. It really doesn't make a difference but it's a shame to throw it out.
*/
@ForeignBreakpoints( {
@ForeignBreakpoints.BreakpointDef( type = "org.neo4j.kernel.impl.nioneo.xa.WriteTransaction",
method = "doPrepare", on = BreakPoint.Event.EXIT ) } )
@RunWith( SubProcessTestRunner.class )
@Ignore("currently fails, awaiting fix")
public class TestRelationshipConcurrentDeleteAndLoadCachePoisoning
{
private static final int RelationshipGrabSize = 2;
@ClassRule
public static EmbeddedDatabaseRule database = new EmbeddedDatabaseRule()
{
@Override
protected void configure( GraphDatabaseBuilder builder )
{
builder.setConfig( GraphDatabaseSettings.relationship_grab_size, "" + RelationshipGrabSize );
}
};

private static DebuggedThread committer;
private static DebuggedThread reader;

@Test
@EnabledBreakpoints( {"doPrepare", "waitForPrepare", "readDone"} )
public void theTest() throws InterruptedException
{
final GraphDatabaseAPI db = database.getGraphDatabaseAPI();

Transaction tx = db.beginTx();
final Node first = db.createNode();
final Relationship theOneAfterTheGap =
first.createRelationshipTo( db.createNode(), DynamicRelationshipType.withName( "AC" ) );
// The gap
for ( int i = 0; i < RelationshipGrabSize; i++)
{
first.createRelationshipTo( db.createNode(), DynamicRelationshipType.withName( "AC" ) );
}
tx.success();
tx.finish();

// This is required, otherwise relChainPosition is never consulted, everything will already be in mem.
db.getNodeManager().clearCache();

Runnable writer = new Runnable()
{
@Override
public void run()
{
Transaction tx = db.beginTx();
theOneAfterTheGap.delete();
tx.success();
tx.finish();
}
};

Runnable reader = new Runnable()
{
@Override
public void run()
{
waitForPrepare();
// Get the first batch into the cache - relChainPosition shows to theOneAfterTheGap
first.getRelationships().iterator().next();
readDone();
}
};

Thread writerThread = new Thread( writer );
Thread readerThread = new Thread( reader );

// Start order matters - suspend the reader first, then start the writes.
readerThread.start();
writerThread.start();

readerThread.join();
writerThread.join();

// This should pass without any problems.
int count = 0;
for ( Relationship rel : first.getRelationships() )
{
count++;
}
assertEquals("Should have read relationships created minus one", RelationshipGrabSize - 1, count);
}

@BreakpointHandler( "doPrepare" )
public static void onDoPrepare( BreakPoint self, DebugInterface di )
{
if ( self.invocationCount() < 3 )
{
// One for the rel type, one for the setup
return;
}
self.disable();
committer = di.thread();
committer.suspend( RESUME_THREAD );
System.out.println("suspended writer");
reader.resume();
System.out.println("resumed reader");
}

@BreakpointTrigger("waitForPrepare")
public void waitForPrepare()
{
}

@BreakpointHandler( "waitForPrepare" )
public static void onWaitForPrepare( BreakPoint self, DebugInterface di )
{
self.disable();
reader = di.thread();
reader.suspend( RESUME_THREAD );
System.out.println("Suspended reader");
}

@BreakpointTrigger("readDone")
public void readDone()
{
}

@BreakpointHandler( "readDone" )
public static void onReadDone( BreakPoint self, DebugInterface di )
{
self.disable();
committer.resume();
System.out.println("Resumed writer");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
Expand Down Expand Up @@ -75,8 +73,6 @@ public void shutdown()
{
for (EphemeralFileData file : files.values()) free(file);
files.clear();

DynamicByteBuffer.dispose();
}

@Override
Expand Down Expand Up @@ -488,62 +484,30 @@ public void release() throws IOException
private static class DynamicByteBuffer
{
private static final int[] SIZES;
private static volatile AtomicReferenceArray<Queue<Reference<ByteBuffer>>> POOL;
private static final byte[] zeroBuffer = new byte[1024];

static void dispose()
{
for (int i = POOL.length(); i < POOL.length(); i++)
{
for( Reference<ByteBuffer> byteBufferReference : POOL.get( i ) )
{
ByteBuffer byteBuffer = byteBufferReference.get();
if ( byteBuffer != null)
{
try
{
destroyDirectByteBuffer( byteBuffer );
}
catch( Throwable e )
{
e.printStackTrace();
}
}
}
}

init();
}
/**
* Holds a set of pools of unused BytBuffers, where pools are implemented by {@link Queue}s.
* Each pool contains only {@link ByteBuffer} of the same size. This way, we have pools for
* different sized {@link ByteBuffer}, and can pick an available byte buffer that suits what
* we want to store quickly.
*/
private static volatile AtomicReferenceArray<Queue<Reference<ByteBuffer>>> POOLS;
private static final byte[] zeroBuffer = new byte[1024];

@Override
public DynamicByteBuffer clone()
{
return new DynamicByteBuffer( buf );
}

private static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
throws IllegalArgumentException, IllegalAccessException,
InvocationTargetException, SecurityException, NoSuchMethodException
{
Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
cleanerMethod.setAccessible(true);
Object cleaner = cleanerMethod.invoke(toBeDestroyed);
Method cleanMethod = cleaner.getClass().getMethod("clean");
cleanMethod.setAccessible(true);
cleanMethod.invoke(cleaner);
}

private static void init()
{
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pool = POOL = new AtomicReferenceArray<Queue<Reference<ByteBuffer>>>( SIZES.length );
for ( int i = 0; i < SIZES.length; i++ ) pool.set( i, new ConcurrentLinkedQueue<Reference<ByteBuffer>>() );
}

static
{
int K = 1024;
SIZES = new int[] { 64 * K, 128 * K, 256 * K, 512 * K, 1024 * K };
init();

POOLS = new AtomicReferenceArray<Queue<Reference<ByteBuffer>>>( SIZES.length );
for ( int sizeIndex = 0; sizeIndex < SIZES.length; sizeIndex++ )
POOLS.set( sizeIndex, new ConcurrentLinkedQueue<Reference<ByteBuffer>>() );
}

private ByteBuffer buf;
Expand Down Expand Up @@ -593,9 +557,9 @@ private void copyByteBufferContents( ByteBuffer from, ByteBuffer to )
private ByteBuffer allocate( int sizeIndex )
{
for (int enlargement = 0; enlargement < 2; enlargement++) {
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pool = POOL;
if (sizeIndex + enlargement < pool.length()) {
Queue<Reference<ByteBuffer>> queue = pool.get( sizeIndex+enlargement );
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pools = POOLS;
if (sizeIndex + enlargement < pools.length()) {
Queue<Reference<ByteBuffer>> queue = pools.get( sizeIndex+enlargement );
if ( queue != null )
{
for (;;)
Expand Down Expand Up @@ -626,52 +590,34 @@ void free()
{
sizeIndex += SIZES.length - 1;
}
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pool = POOL;
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pools = POOLS;
// Use soft references to the buffers to allow the GC to reclaim
// unused buffers if memory gets scarce.
SoftReference<ByteBuffer> ref = new SoftReference<ByteBuffer>( buf );

// Get or create the queue of references to add this new buffer reference to
Queue<Reference<ByteBuffer>> references = null;
if(sizeIndex < pool.length())
{
references = pool.get( sizeIndex );
// Temp check to catch a flighty bug
if(references == null)
{
throw new RuntimeException( "Expected a queue to put references in, got null." );
}
} else {
references = growPool( sizeIndex );
// Temp check to catch a flighty bug
if(references == null)
{
throw new RuntimeException( "Expected a queue to put references in, got null." );
}
}

references.add( ref );
// Put our buffer into a pool, create a pool for the buffer size if one does not exist
( sizeIndex < pools.length() ? pools.get( sizeIndex ) : getOrCreatePoolForSize( sizeIndex ) ).add( ref );
}
finally
{
buf = null;
}
}

private static synchronized Queue<Reference<ByteBuffer>> growPool( int sizeIndex )
private static synchronized Queue<Reference<ByteBuffer>> getOrCreatePoolForSize( int sizeIndex )
{
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pool = POOL;
if ( sizeIndex >= pool.length()) {
int newSize = pool.length();
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> pools = POOLS;
if ( sizeIndex >= pools.length()) {
int newSize = pools.length();
while ( sizeIndex >= newSize ) newSize <<= 1;
AtomicReferenceArray<Queue<Reference<ByteBuffer>>> newPool = new AtomicReferenceArray<Queue<Reference<ByteBuffer>>>( newSize );
for ( int i = 0; i < pool.length(); i++ )
newPool.set( i, pool.get( i ) );
for ( int i = pool.length(); i < newPool.length(); i++ )
for ( int i = 0; i < pools.length(); i++ )
newPool.set( i, pools.get( i ) );
for ( int i = pools.length(); i < newPool.length(); i++ )
newPool.set( i, new ConcurrentLinkedQueue<Reference<ByteBuffer>>() );
POOL = pool = newPool;
POOLS = pools = newPool;
}
return pool.get( sizeIndex );
return pools.get( sizeIndex );
}

void put(int pos, byte[] bytes, int offset, int length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ProtocolServer newProtocolServer( TimeoutStrategy timeoutStrategy, Messag
connectedStateMachines.addMessageProcessor( latencyCalculator );

AcceptorContext acceptorContext = new AcceptorContext( logging, acceptorInstanceStore );
LearnerContext learnerContext = new LearnerContext();
LearnerContext learnerContext = new LearnerContext(acceptorInstanceStore);
ProposerContext proposerContext = new ProposerContext();
final ClusterContext clusterContext = new ClusterContext( proposerContext, learnerContext,
new ClusterConfiguration( initialConfig.getName(), initialConfig.getMembers() ), timeouts, executor,
Expand Down
Loading

0 comments on commit 49b8280

Please sign in to comment.