Skip to content

Commit

Permalink
Move way from ParserProfile: use RDFParserBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Apr 6, 2017
1 parent f953001 commit 22fbc5e
Show file tree
Hide file tree
Showing 12 changed files with 120 additions and 102 deletions.
10 changes: 10 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/riot/RDFParserBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ public RDFParserBuilder source(String uriOrFile) {
return this;
}

/**
* Set the source as the contents of a string.
* This clears any other source setting.
* @param string
* @return this
*/
public RDFParserBuilder fromString(String string) {
return source(new StringReader(string));
}

/**
* Set the source to {@link InputStream}.
* This clears any other source setting.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public abstract class AbstractNLineFileInputFormat<TKey, TValue> extends FileInp
*
* @see FileInputFormat#getSplits(JobContext)
*/
@Override
public final List<InputSplit> getSplits(JobContext job) throws IOException {
boolean debug = LOGGER.isDebugEnabled();
if (debug && FileInputFormat.getInputDirRecursive(job)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,12 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.hadoop.rdf.io.input.util.BlockInputStream;
import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
import org.apache.jena.hadoop.rdf.io.input.util.TrackableInputStream;
import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
import org.apache.jena.hadoop.rdf.io.input.util.* ;
import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.ReaderRIOT;
import org.apache.jena.riot.RDFParserBuilder ;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedRDFStream;
import org.apache.jena.riot.system.ParserProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -145,10 +139,9 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
// Set up background thread for parser
iter = this.getPipedIterator();
this.stream = this.getPipedStream(iter, this.input);
// TODO [RDFParser]
// Use RDFParser builder.
ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
RDFParserBuilder builder = RdfIOUtils.createRDFParserBuilder(context, file);
Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), builder);

this.parserThread = new Thread(parserRunnable);
this.parserThread.setDaemon(true);
this.parserThread.start();
Expand Down Expand Up @@ -186,23 +179,17 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
* Stream
* @param lang
* Language to use for parsing
* @param builder
* RDFParser setup
* @return Parser runnable
*/
private Runnable createRunnable(final AbstractBlockBasedNodeTupleReader<?,?> reader, final InputStream input,
final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
private Runnable createRunnable(final AbstractBlockBasedNodeTupleReader<?, ?> reader, final InputStream input,
final PipedRDFStream<TValue> stream, final Lang lang, RDFParserBuilder builder) {
return new Runnable() {
@Override
public void run() {
try {
@SuppressWarnings("deprecation")
// Only needed because of ParserProfile setting errorhandler and label mapping - see RdfIOUtils.createParserProfile

//

ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
riotReader.setParserProfile(profile);
riotReader.read(input, null, lang.getContentType(), stream, null);
//RDFDataMgr.parse(stream, input, null, lang);
builder.lang(lang).source(input).parse(stream);
reader.setParserFinished(null);
} catch (Throwable e) {
reader.setParserFinished(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -37,7 +38,8 @@
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.hadoop.rdf.io.input.util.RdfIOUtils;
import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
import org.apache.jena.riot.system.ParserProfile;
import org.apache.jena.riot.lang.LabelToNode;
import org.apache.jena.riot.system.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -65,7 +67,7 @@ public abstract class AbstractLineBasedNodeTupleReader<TValue, T extends Abstrac
private LongWritable key = null;
private Text value = null;
private T tuple = null;
private ParserProfile profile = null;
private ParserProfile parserProfile = null ;
private boolean ignoreBadTuples = true;

@Override
Expand All @@ -77,9 +79,12 @@ public final void initialize(InputSplit genericSplit, TaskAttemptContext context
throw new IOException("This record reader only supports FileSplit inputs");
FileSplit split = (FileSplit) genericSplit;

// TODO [RDFParser]
// Use RDFParser builder.
profile = RdfIOUtils.createParserProfile(context, split.getPath());
// Intermediate : RDFParser but need to make a Iterator<Quad/Triple>
LabelToNode labelToNode = RdfIOUtils.createLabelToNode(context, split.getPath());
Prologue prologue = new Prologue(PrefixMapFactory.createForInput(), IRIResolver.create());
parserProfile = new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, RiotLib.factoryRDF(labelToNode));


Configuration config = context.getConfiguration();
this.ignoreBadTuples = config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true);
if (this.ignoreBadTuples)
Expand Down Expand Up @@ -135,8 +140,8 @@ public final void initialize(InputSplit genericSplit, TaskAttemptContext context
*
* @param line
* Line
* @param profile
* Parser profile
* @param builder
* Parser setup.
* @return Iterator
*/
protected abstract Iterator<TValue> getIterator(String line, ParserProfile profile);
Expand Down Expand Up @@ -191,7 +196,7 @@ public final boolean nextKeyValue() throws IOException {

// Attempt to read the tuple from current line
try {
Iterator<TValue> iter = this.getIterator(value.toString(), profile);
Iterator<TValue> iter = this.getIterator(value.toString(), parserProfile);
if (iter.hasNext()) {
tuple = this.createInstance(iter.next());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@
import org.apache.jena.hadoop.rdf.io.input.util.TrackedInputStream;
import org.apache.jena.hadoop.rdf.io.input.util.TrackedPipedRDFStream;
import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFDataMgr;
import org.apache.jena.riot.ReaderRIOT;
import org.apache.jena.riot.Lang ;
import org.apache.jena.riot.RDFParserBuilder ;
import org.apache.jena.riot.lang.PipedRDFIterator;
import org.apache.jena.riot.lang.PipedRDFStream;
import org.apache.jena.riot.system.ParserProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,10 +131,8 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
// Set up background thread for parser
iter = this.getPipedIterator();
this.stream = this.getPipedStream(iter, this.input);
// TODO [RDFParser]
// Use RDFParser builder.
ParserProfile profile = RdfIOUtils.createParserProfile(context, file);
Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), profile);
RDFParserBuilder builder = RdfIOUtils.createRDFParserBuilder(context, file);
Runnable parserRunnable = this.createRunnable(this, this.input, stream, this.getRdfLanguage(), builder);
this.parserThread = new Thread(parserRunnable);
this.parserThread.setDaemon(true);
this.parserThread.start();
Expand Down Expand Up @@ -174,20 +170,17 @@ public void initialize(InputSplit genericSplit, TaskAttemptContext context) thro
* Stream
* @param lang
* Language to use for parsing
* @param builder
* RDFParser setup
* @return Parser runnable
*/
private Runnable createRunnable(final AbstractWholeFileNodeTupleReader<?, ?> reader, final InputStream input,
final PipedRDFStream<TValue> stream, final Lang lang, final ParserProfile profile) {
final PipedRDFStream<TValue> stream, final Lang lang, RDFParserBuilder builder) {
return new Runnable() {
@Override
public void run() {
try {
@SuppressWarnings("deprecation")
// Only needed because of ParserProfile setting errorhandler and label mapping - see RdfIOUtils.createParserProfile
ReaderRIOT riotReader = RDFDataMgr.createReader(lang);
// [TRIX]
riotReader.setParserProfile(profile);
riotReader.read(input, null, lang.getContentType(), stream, null);
builder.lang(lang).source(input).parse(stream);
reader.setParserFinished(null);
} catch (Throwable e) {
reader.setParserFinished(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
*
*
*/
@SuppressWarnings("javadoc")
public class TriplesOrQuadsReader extends AbstractRdfReader<Quad, QuadWritable> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
import org.apache.jena.riot.RDFParser ;
import org.apache.jena.riot.RDFParserBuilder ;
import org.apache.jena.riot.lang.LabelToNode;
import org.apache.jena.riot.system.* ;
import org.slf4j.Logger;
Expand Down Expand Up @@ -51,16 +53,30 @@ private RdfIOUtils() {
* @param path
* File path
* @return Parser profile
* @deprecated Use {@link #createRDFParserBuilder} or {@link #createLabelToNode}
*/
@Deprecated
public static ParserProfile createParserProfile(JobContext context, Path path) {
// [RDFParser]
// Replace call sites with RDFParser builder.
Prologue prologue = new Prologue(PrefixMapFactory.createForInput(), IRIResolver.createNoResolve());
UUID seed = RdfIOUtils.getSeed(context, path);
LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed);
LabelToNode labelMapping = createLabelToNode(context, path);
return new ParserProfileBase(prologue, ErrorHandlerFactory.errorHandlerStd, RiotLib.factoryRDF(labelMapping));
}

public static RDFParserBuilder createRDFParserBuilder(JobContext context, Path path) {
LabelToNode labelMapping = createLabelToNode(context, path);
RDFParserBuilder builder = RDFParser.create()
.labelToNode(labelMapping)
.errorHandler(ErrorHandlerFactory.errorHandlerStd) ;
return builder ;
}

public static LabelToNode createLabelToNode(JobContext context, Path path) {
UUID seed = RdfIOUtils.getSeed(context, path);
LabelToNode labelMapping = LabelToNode.createScopeByDocumentHash(seed);
return labelMapping;
}
/**
* Selects a seed for use in generating blank node identifiers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private void writeTuples(Dataset ds, OutputStream output) {
protected abstract Lang getRdfLanguage();

private void writeGoodTuples(OutputStream output, int num) {
Dataset ds = DatasetFactory.createMem();
Dataset ds = DatasetFactory.createGeneral();
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
Property predicate = m.createProperty("http://example.org/predicate");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.jena.riot.RDFLanguages;

/**
* Tests blank node divergence when using the {@link RdfThriftInputFormat}
* Tests blank node divergence when using {@link RDFLanguages#THRIFT}
*/
public class RdfThriftBlankNodeTest extends AbstractTripleBlankNodeTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private void writeTuples(Dataset ds, OutputStream output) {
protected abstract Lang getRdfLanguage();

private void writeGoodTuples(OutputStream output, int num) {
Dataset ds = DatasetFactory.createMem();
Dataset ds = DatasetFactory.createGeneral();
Model m = ModelFactory.createDefaultModel();
Resource currSubj = m.createResource("http://example.org/subjects/0");
Property predicate = m.createProperty("http://example.org/predicate");
Expand Down
Loading

0 comments on commit 22fbc5e

Please sign in to comment.