Skip to content

Commit

Permalink
IteratorCloseable and code clean-up
Browse files Browse the repository at this point in the history
Remove IteratorResourceClosing in favour of IteratorOnClose.

Remove unused WrapperIterator.

Use IteratorCloseable.
  • Loading branch information
afs committed Aug 22, 2021
1 parent d1111d7 commit 1d6648b
Show file tree
Hide file tree
Showing 20 changed files with 294 additions and 489 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.UUID ;

import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.lib.Closeable ;
import org.apache.jena.atlas.lib.FileOps ;

Expand All @@ -44,13 +45,13 @@ public abstract class AbstractDataBag<E> implements DataBag<E>
{
private final List<File> spillFiles = new ArrayList<>();
protected Collection<E> memory = new ArrayList<>();

private final List<WeakReference<Closeable>> closeableIterators = new ArrayList<>();

// Total size, including tuples on disk.
protected long size = 0;


public boolean isEmpty()
{
return (size == 0);
Expand All @@ -67,10 +68,10 @@ public void send(E item)
{
add(item);
}

/**
* Returns a handle to a temporary file. Does not actually create the file on disk.
*
*
* TODO Improve this by getting the directory from a config file
*/
protected File getNewTemporaryFile()
Expand All @@ -79,26 +80,26 @@ protected File getNewTemporaryFile()
File tmpFile = new File(sysTempDir, "DataBag-" + UUID.randomUUID().toString() + ".tmp") ;
return tmpFile ;
}

/**
* Register the spill file handle for use later in the iterator.
*/
protected void registerSpillFile(File spillFile)
{
spillFiles.add(spillFile);
}

protected static OutputStream getOutputStream(File file) throws FileNotFoundException
{
return new BufferedOutputStream(new FileOutputStream(file));
}

protected static InputStream getInputStream(File file) throws FileNotFoundException
{
return new BufferedInputStream(new FileInputStream(file));
}
/**

/**
* Get a stream to spill contents to. The file that backs this stream will be registered in the spillFiles array.
* @return stream to write tuples to
*/
Expand All @@ -107,21 +108,21 @@ protected OutputStream getSpillStream() throws IOException
File outputFile = getNewTemporaryFile();
OutputStream toReturn = getOutputStream(outputFile);
registerSpillFile(outputFile);

return toReturn;
}

/**
* Register an iterator to be closed when this data bag is closed. The iterator
* is held via a weak reference, and is meant as a backup if the user does not
* close it themselves.
* @param c the Closeable iterator to register
*/
protected void registerCloseableIterator(Closeable c)
protected void registerCloseableIterator(IteratorCloseable<?> c)
{
closeableIterators.add(new WeakReference<>(c)) ;
}

/**
* Users should either exhaust or close any iterators they get, but if they don't we
* should forcibly close them so that we can delete any temporary files. Any further
Expand All @@ -131,12 +132,12 @@ protected void closeIterators()
{
closeableIterators.stream().map(WeakReference::get).filter(Objects::nonNull).forEach(Closeable::close);
}

protected List<File> getSpillFiles()
{
return spillFiles;
}

protected void deleteSpillFiles()
{
for (File file : spillFiles)
Expand All @@ -145,7 +146,7 @@ protected void deleteSpillFiles()
}
spillFiles.clear();
}

@Override
protected void finalize() throws Throwable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.jena.atlas.AtlasException ;
import org.apache.jena.atlas.io.IO ;
import org.apache.jena.atlas.iterator.Iter ;
import org.apache.jena.atlas.iterator.IteratorResourceClosing ;
import org.apache.jena.atlas.iterator.IteratorCloseable;
import org.apache.jena.atlas.lib.Sink ;

/**
Expand Down Expand Up @@ -59,32 +59,32 @@ public class DefaultDataBag<E> extends AbstractDataBag<E>
{
private final ThresholdPolicy<E> policy;
private final SerializationFactory<E> serializationFactory;

protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;

private Sink<E> serializer;
private OutputStream out;

public DefaultDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory)
{
this.policy = policy;
this.serializationFactory = serializerFactory;
}

private void checkClosed()
{
if (closed) throw new AtlasException("DefaultDataBag is closed, no operations can be performed on it.") ;
}

@Override
public void add(E item)
{
checkClosed();
if (finishedAdding)
throw new AtlasException("DefaultDataBag: Cannot add any more items after the writing phase is complete.");

if (!policy.isThresholdExceeded())
{
memory.add(item);
Expand All @@ -96,15 +96,15 @@ public void add(E item)
spill();
spilled = true;
}

// Write to disk
serializer.send(item);
}

policy.increment(item);
size++;
}

private void spill()
{
// In the case where we've just hit the threshold, set up the serializer and transfer all existing content to disk.
Expand All @@ -119,14 +119,14 @@ private void spill()
throw new AtlasException(e);
}
serializer = serializationFactory.createSerializer(out);

for (E e : memory)
{
serializer.send(e);
}
memory = null;
}

@Override
public boolean isSorted()
{
Expand All @@ -138,7 +138,7 @@ public boolean isDistinct()
{
return false;
}

@Override
public void flush()
{
Expand All @@ -147,22 +147,22 @@ public void flush()
serializer.flush();
}
}

@Override
public Iterator<E> iterator()
{
Iterator<E> toReturn;

checkClosed();

// Close the writer
closeWriter();

// Create a new reader
if (policy.isThresholdExceeded())
{
File spillFile = getSpillFiles().get(0);

InputStream in;
try
{
Expand All @@ -173,18 +173,18 @@ public Iterator<E> iterator()
throw new AtlasException(ex) ;
}
Iterator<E> deserializer = serializationFactory.createDeserializer(in) ;
IteratorResourceClosing<E> irc = new IteratorResourceClosing<>(deserializer, in) ;
IteratorCloseable<E> irc = Iter.onCloseIO(deserializer, in) ;
registerCloseableIterator(irc);
toReturn = irc;
}
else
{
toReturn = memory.iterator();
}

return toReturn;
}

protected void closeWriter()
{
if (!finishedAdding)
Expand All @@ -205,7 +205,7 @@ protected void closeWriter()
finishedAdding = true;
}
}

@Override
public void close()
{
Expand All @@ -214,7 +214,7 @@ public void close()
closeWriter();
closeIterators();
deleteSpillFiles();

memory = null;
closed = true;
}
Expand Down
Loading

0 comments on commit 1d6648b

Please sign in to comment.