Skip to content

Commit

Permalink
Revert "Better error message on parallel turtle parsing ... (#1807)"
Browse files Browse the repository at this point in the history
This reverts commit 8678731, which
breaks the index build, see ad-freiburg/qlever-control#139
  • Loading branch information
Hannah Bast committed Feb 20, 2025
1 parent cc0e35b commit 42fcbb7
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 134 deletions.
1 change: 0 additions & 1 deletion src/engine/GraphStoreProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#include "engine/GraphStoreProtocol.h"

#include "parser/Tokenizer.h"
#include "util/http/beast.h"

// ____________________________________________________________________________
Expand Down
2 changes: 0 additions & 2 deletions src/index/IndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
#include "index/IndexFormatVersion.h"
#include "index/VocabularyMerger.h"
#include "parser/ParallelParseBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "util/BatchedPipeline.h"
#include "util/CachingMemoryResource.h"
#include "util/HashMap.h"
Expand Down
3 changes: 3 additions & 0 deletions src/parser/RdfEscaping.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
#ifndef QLEVER_RDFESCAPING_H
#define QLEVER_RDFESCAPING_H

#include <unicode/ustream.h>

#include <sstream>
#include <string>

#include "global/TypedIndex.h"
#include "parser/NormalizedString.h"
#include "util/Exception.h"
#include "util/HashSet.h"
#include "util/StringUtils.h"

namespace RdfEscaping {
Expand Down
62 changes: 18 additions & 44 deletions src/parser/RdfParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
#include "global/Constants.h"
#include "parser/GeoPoint.h"
#include "parser/NormalizedString.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "parser/RdfEscaping.h"
#include "util/Conversions.h"
#include "util/DateYearDuration.h"
#include "util/OnDestructionDontThrowDuringStackUnwinding.h"
#include "util/TransparentFunctors.h"

using namespace std::chrono_literals;
// _______________________________________________________________
Expand All @@ -32,17 +31,7 @@ bool TurtleParser<T>::statement() {
// ______________________________________________________________
template <class T>
bool TurtleParser<T>::directive() {
bool successfulParse = prefixID() || base() || sparqlPrefix() || sparqlBase();
if (successfulParse && prefixAndBaseDisabled_) {
raise(
"@prefix or @base directives need to be at the beginning of the file "
"when using the parallel parser. Use '--parse-parallel false' if you "
"can't guarantee this. If the reason for this error is that the input "
"is a concatenation of Turtle files, each of which has the prefixes at "
"the beginning, you should feed the files to QLever separately instead "
"of concatenated");
}
return successfulParse;
return prefixID() || base() || sparqlPrefix() || sparqlBase();
}

// ________________________________________________________________
Expand Down Expand Up @@ -641,7 +630,7 @@ bool TurtleParser<T>::iri() {
// _____________________________________________________________________
template <class T>
bool TurtleParser<T>::prefixedName() {
if constexpr (T::UseRelaxedParsing) {
if constexpr (UseRelaxedParsing) {
if (!(pnameLnRelaxed() || pnameNS())) {
return false;
}
Expand Down Expand Up @@ -756,7 +745,7 @@ bool TurtleParser<T>::iriref() {
// In relaxed mode, that is all we check. Otherwise, we check if the IRI is
// standard-compliant. If not, we output a warning and try to parse it in a
// more relaxed way.
if constexpr (T::UseRelaxedParsing) {
if constexpr (UseRelaxedParsing) {
tok_.remove_prefix(endPos + 1);
lastParseResult_ = TripleComponent::Iri::fromIrirefConsiderBase(
view.substr(0, endPos + 1), baseForRelativeIri(), baseForAbsoluteIri());
Expand Down Expand Up @@ -959,20 +948,20 @@ bool RdfStreamParser<T>::getLineImpl(TurtleTriple* triple) {
// `parallelParser_` have been fully processed. After the last batch we will
// push another call to this lambda to the `parallelParser_` which will then
// finish the `tripleCollector_` as soon as all batches have been computed.
template <typename T>
void RdfParallelParser<T>::finishTripleCollectorIfLastBatch() {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::finishTripleCollectorIfLastBatch() {
if (batchIdx_.fetch_add(1) == numBatchesTotal_) {
tripleCollector_.finish();
}
}

// __________________________________________________________________________________
template <typename T>
void RdfParallelParser<T>::parseBatch(size_t parsePosition, auto batch) {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::parseBatch(size_t parsePosition,
auto batch) {
try {
RdfStringParser<T> parser{defaultGraphIri_};
RdfStringParser<Tokenizer_T> parser{defaultGraphIri_};
parser.prefixMap_ = this->prefixMap_;
parser.disablePrefixParsing();
parser.setPositionOffset(parsePosition);
parser.setInputStream(std::move(batch));
// TODO: raise an error message if parsing a prefix fails.
Expand All @@ -983,15 +972,14 @@ void RdfParallelParser<T>::parseBatch(size_t parsePosition, auto batch) {
});
finishTripleCollectorIfLastBatch();
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
parallelParser_.finish();
}
};

// _______________________________________________________________________
template <typename T>
void RdfParallelParser<T>::feedBatchesToParser(
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::feedBatchesToParser(
auto remainingBatchFromInitialization) {
bool first = true;
size_t parsePosition = 0;
Expand Down Expand Up @@ -1031,15 +1019,14 @@ void RdfParallelParser<T>::feedBatchesToParser(
}
}
} catch (std::exception& e) {
errorMessages_.wlock()->emplace_back(parsePosition, e.what());
tripleCollector_.pushException(std::current_exception());
}
};

// _______________________________________________________________________
template <typename T>
void RdfParallelParser<T>::initialize(const string& filename,
ad_utility::MemorySize bufferSize) {
template <typename Tokenizer_T>
void RdfParallelParser<Tokenizer_T>::initialize(
const string& filename, ad_utility::MemorySize bufferSize) {
fileBuffer_ = std::make_unique<ParallelBufferWithEndRegex>(
bufferSize.getBytes(), "\\.[\\t ]*([\\r\\n]+)");
ParallelBuffer::BufferType remainingBatchFromInitialization;
Expand All @@ -1048,7 +1035,7 @@ void RdfParallelParser<T>::initialize(const string& filename,
LOG(WARN) << "Empty input to the TURTLE parser, is this what you intended?"
<< std::endl;
} else {
RdfStringParser<T> declarationParser{};
RdfStringParser<Tokenizer_T> declarationParser{};
declarationParser.setInputStream(std::move(batch.value()));
while (declarationParser.parseDirectiveManually()) {
}
Expand All @@ -1075,20 +1062,7 @@ bool RdfParallelParser<T>::getLineImpl(TurtleTriple* triple) {
// contains no triples. (Theoretically this might happen, and it is safer this
// way)
while (triples_.empty()) {
auto optionalTripleTask = [&]() {
try {
return tripleCollector_.pop();
} catch (const std::exception&) {
// In case of multiple errors in parallel batches, we always report the
// first error.
parallelParser_.waitUntilFinished();
auto errors = std::move(*errorMessages_.wlock());
const auto& firstError =
ql::ranges::min_element(errors, {}, ad_utility::first);
AD_CORRECTNESS_CHECK(firstError != errors.end());
throw std::runtime_error{firstError->second};
}
}();
auto optionalTripleTask = tripleCollector_.pop();
if (!optionalTripleTask) {
// Everything has been parsed
return false;
Expand Down
57 changes: 27 additions & 30 deletions src/parser/RdfParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,35 @@

#pragma once

#include <absl/strings/str_cat.h>
#include <gtest/gtest_prod.h>
#include <sys/mman.h>

#include <codecvt>
#include <exception>
#include <future>
#include <locale>
#include <stdexcept>
#include <string_view>

#include "absl/strings/str_cat.h"
#include "global/Constants.h"
#include "global/SpecialIds.h"
#include "index/ConstantsIndexBuilding.h"
#include "index/InputFileSpecification.h"
#include "parser/ParallelBuffer.h"
#include "parser/Tokenizer.h"
#include "parser/TokenizerCtre.h"
#include "parser/TripleComponent.h"
#include "parser/TurtleTokenId.h"
#include "parser/data/BlankNode.h"
#include "util/Exception.h"
#include "util/File.h"
#include "util/HashMap.h"
#include "util/Log.h"
#include "util/ParseException.h"
#include "util/TaskQueue.h"
#include "util/ThreadSafeQueue.h"

using std::string;

enum class TurtleParserIntegerOverflowBehavior {
Error,
OverflowingToDouble,
Expand Down Expand Up @@ -120,6 +126,10 @@ class TurtleParser : public RdfParserBase {
public:
using ParseException = ::ParseException;

// The CTRE Tokenizer implies relaxed parsing.
static constexpr bool UseRelaxedParsing =
std::is_same_v<Tokenizer_T, TokenizerCtre>;

// Get the result of the single rule that was parsed most recently. Used for
// testing.
const TripleComponent& getLastParseResult() const { return lastParseResult_; }
Expand Down Expand Up @@ -194,10 +204,10 @@ class TurtleParser : public RdfParserBase {

// Getters for the two base prefixes. Without a BASE declaration, these will
// both return the empty IRI.
const TripleComponent::Iri& baseForRelativeIri() const {
const TripleComponent::Iri& baseForRelativeIri() {
return prefixMap_.at(baseForRelativeIriKey_);
}
const TripleComponent::Iri& baseForAbsoluteIri() const {
const TripleComponent::Iri& baseForAbsoluteIri() {
return prefixMap_.at(baseForAbsoluteIriKey_);
}

Expand All @@ -216,8 +226,6 @@ class TurtleParser : public RdfParserBase {
static inline std::atomic<size_t> numParsers_ = 0;
size_t blankNodePrefix_ = numParsers_.fetch_add(1);

bool prefixAndBaseDisabled_ = false;

public:
TurtleParser() = default;
explicit TurtleParser(TripleComponent defaultGraphIri)
Expand Down Expand Up @@ -392,7 +400,7 @@ class TurtleParser : public RdfParserBase {
}

// create a new, unused, unique blank node string
std::string createAnonNode() {
string createAnonNode() {
return BlankNode{true,
absl::StrCat(blankNodePrefix_, "_", numBlankNodes_++)}
.toSparql();
Expand Down Expand Up @@ -471,7 +479,9 @@ CPP_template(typename Parser)(
return positionOffset_ + tmpToParse_.size() - this->tok_.data().size();
}

void initialize(const std::string&, ad_utility::MemorySize) {
void initialize(const string& filename, ad_utility::MemorySize bufferSize) {
(void)filename;
(void)bufferSize;
throw std::runtime_error(
"RdfStringParser doesn't support calls to initialize. Only use "
"parseUtf8String() for unit tests\n");
Expand Down Expand Up @@ -524,7 +534,7 @@ CPP_template(typename Parser)(
// Testing interface for reusing a parser.
// Only specifies the tokenizer's input stream.
// Does not alter the tokenizer's state.
void setInputStream(const std::string& toParse) {
void setInputStream(const string& toParse) {
tmpToParse_.clear();
tmpToParse_.reserve(toParse.size());
tmpToParse_.insert(tmpToParse_.end(), toParse.begin(), toParse.end());
Expand All @@ -545,9 +555,6 @@ CPP_template(typename Parser)(
// as expected
size_t getPosition() const { return this->tok_.begin() - tmpToParse_.data(); }

// Disable prefix parsing for turtle parsers during parallel parsing.
void disablePrefixParsing() { this->prefixAndBaseDisabled_ = true; }

FRIEND_TEST(RdfParserTest, prefixedName);
FRIEND_TEST(RdfParserTest, prefixID);
FRIEND_TEST(RdfParserTest, stringParse);
Expand Down Expand Up @@ -583,7 +590,7 @@ class RdfStreamParser : public Parser {
// Default construction needed for tests
RdfStreamParser() = default;
explicit RdfStreamParser(
const std::string& filename,
const string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
TripleComponent defaultGraphIri =
qlever::specialIds().at(DEFAULT_GRAPH_IRI))
Expand All @@ -595,8 +602,7 @@ class RdfStreamParser : public Parser {

bool getLineImpl(TurtleTriple* triple) override;

void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);
void initialize(const string& filename, ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
return numBytesBeforeCurrentBatch_ + (tok_.data().data() - byteVec_.data());
Expand Down Expand Up @@ -638,15 +644,15 @@ class RdfStreamParser : public Parser {
template <typename Parser>
class RdfParallelParser : public Parser {
public:
using Triple = std::array<std::string, 3>;
using Triple = std::array<string, 3>;
// Default construction needed for tests
RdfParallelParser() = default;

// If the `sleepTimeForTesting` is set, then after the initialization the
// parser will sleep for the specified time before parsing each batch s.t.
// certain corner cases can be tested.
explicit RdfParallelParser(
const std::string& filename,
const string& filename,
ad_utility::MemorySize bufferSize = DEFAULT_PARSER_BUFFER_SIZE,
std::chrono::milliseconds sleepTimeForTesting =
std::chrono::milliseconds{0})
Expand All @@ -659,8 +665,7 @@ class RdfParallelParser : public Parser {
}

// Construct a parser from a file and a given default graph iri.
RdfParallelParser(const std::string& filename,
ad_utility::MemorySize bufferSize,
RdfParallelParser(const string& filename, ad_utility::MemorySize bufferSize,
const TripleComponent& defaultGraphIri)
: Parser{defaultGraphIri}, defaultGraphIri_{defaultGraphIri} {
initialize(filename, bufferSize);
Expand All @@ -678,8 +683,7 @@ class RdfParallelParser : public Parser {
parallelParser_.resetTimers();
}

void initialize(const std::string& filename,
ad_utility::MemorySize bufferSize);
void initialize(const string& filename, ad_utility::MemorySize bufferSize);

size_t getParsePosition() const override {
// TODO: can we really define this position here?
Expand Down Expand Up @@ -716,12 +720,6 @@ class RdfParallelParser : public Parser {
QUEUE_SIZE_BEFORE_PARALLEL_PARSING, NUM_PARALLEL_PARSER_THREADS,
"parallel parser"};
std::future<void> parseFuture_;

// Collect error messages in case of multiple failures. The `size_t` is the
// start position of the corresponding batch, used to order the errors in case
// the batches are finished out of order.
ad_utility::Synchronized<std::vector<std::pair<size_t, std::string>>>
errorMessages_;
// The parallel parsers need to know when the last batch has been parsed, s.t.
// the parser threads can be destroyed. The following two members are needed
// for keeping track of this condition.
Expand Down Expand Up @@ -781,8 +779,7 @@ class RdfMultifileParser : public RdfParserBase {
// `parsingQueue_` is declared *after* the `finishedBatchQueue_`, s.t. when
// destroying the parser, the threads from the `parsingQueue_` are all joined
// before the `finishedBatchQueue_` (which they are using!) is destroyed.
ad_utility::TaskQueue<false> parsingQueue_{QUEUE_SIZE_BEFORE_PARALLEL_PARSING,
NUM_PARALLEL_PARSER_THREADS};
ad_utility::TaskQueue<false> parsingQueue_{10, NUM_PARALLEL_PARSER_THREADS};

// The number of parsers that have started, but not yet finished. This is
// needed to detect the complete parsing.
Expand Down
7 changes: 4 additions & 3 deletions src/parser/Tokenizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
#include <gtest/gtest_prod.h>
#include <re2/re2.h>

#include <regex>

#include "parser/TurtleTokenId.h"
#include "util/Exception.h"
#include "util/Log.h"

using re2::RE2;
Expand Down Expand Up @@ -237,7 +240,7 @@ struct SkipWhitespaceAndCommentsMixin {
auto v = self().view();
if (v.starts_with('#')) {
auto pos = v.find('\n');
if (pos == std::string::npos) {
if (pos == string::npos) {
// TODO<joka921>: This should rather yield an error.
LOG(INFO) << "Warning, unfinished comment found while parsing"
<< std::endl;
Expand Down Expand Up @@ -270,8 +273,6 @@ class Tokenizer : public SkipWhitespaceAndCommentsMixin<Tokenizer> {
Tokenizer(std::string_view input)
: _tokens(), _data(input.data(), input.size()) {}

static constexpr bool UseRelaxedParsing = false;

// if a prefix of the input stream matches the regex argument,
// return true and that prefix and move the input stream forward
// by the length of the match. If no match is found,
Expand Down
Loading

0 comments on commit 42fcbb7

Please sign in to comment.