Skip to content

Commit

Permalink
1. Lifetime of tasks in ThreadPool is now managed via shared pointers.
Browse files Browse the repository at this point in the history
2. Code cleanup in IOWrapper and a bit elsewhere.
  • Loading branch information
ugermann committed Mar 21, 2015
1 parent 85d2567 commit 8ca11d9
Show file tree
Hide file tree
Showing 12 changed files with 246 additions and 188 deletions.
13 changes: 7 additions & 6 deletions contrib/server/mosesserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class TranslationTask : public virtual Moses::Task {
stringstream out, graphInfo, transCollOpts;

if (staticData.IsSyntax()) {
TreeInput tinput(NULL);
TreeInput tinput;
const vector<FactorType>&
inputFactorOrder = staticData.GetInputFactorOrder();
stringstream in(source + "\n");
Expand All @@ -324,7 +324,7 @@ class TranslationTask : public virtual Moses::Task {
}
} else {
size_t lineNumber = 0; // TODO: Include sentence request number here?
Sentence sentence(NULL);
Sentence sentence;
sentence.SetTranslationId(lineNumber);

const vector<FactorType> &
Expand Down Expand Up @@ -594,13 +594,14 @@ class Translator : public xmlrpc_c::method
xmlrpc_c::value * const retvalP) {
boost::condition_variable cond;
boost::mutex mut;
::TranslationTask task(paramList,cond,mut);
m_threadPool.Submit(&task);
typedef ::TranslationTask TTask;
boost::shared_ptr<TTask> task(new TTask(paramList,cond,mut));
m_threadPool.Submit(task);
boost::unique_lock<boost::mutex> lock(mut);
while (!task.IsDone()) {
while (!task->IsDone()) {
cond.wait(lock);
}
*retvalP = xmlrpc_c::value_struct(task.GetRetData());
*retvalP = xmlrpc_c::value_struct(task->GetRetData());
}
private:
Moses::ThreadPool m_threadPool;
Expand Down
10 changes: 4 additions & 6 deletions mert/mert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ int main(int argc, char **argv)
startingPoints.back().Randomize();
}

vector<vector<OptimizationTask*> > allTasks(1);
vector<vector<boost::shared_ptr<OptimizationTask> > > allTasks(1);

//optional sharding
vector<Data> shards;
Expand All @@ -466,13 +466,14 @@ int main(int argc, char **argv)
if (option.shard_count)
data_ref = shards[i]; //use the sharded data if it exists

vector<OptimizationTask*>& tasks = allTasks[i];
vector<boost::shared_ptr<OptimizationTask> >& tasks = allTasks[i];
Optimizer *optimizer = OptimizerFactory::BuildOptimizer(option.pdim, to_optimize, positive, start_list[0], option.optimize_type, option.nrandom);
optimizer->SetScorer(data_ref.getScorer());
optimizer->SetFeatureData(data_ref.getFeatureData());
// A task for each start point
for (size_t j = 0; j < startingPoints.size(); ++j) {
OptimizationTask* task = new OptimizationTask(optimizer, startingPoints[j]);
boost::shared_ptr<OptimizationTask>
task(new OptimizationTask(optimizer, startingPoints[j]));
tasks.push_back(task);
#ifdef WITH_THREADS
pool.Submit(task);
Expand Down Expand Up @@ -538,9 +539,6 @@ int main(int argc, char **argv)

for (size_t i = 0; i < allTasks.size(); ++i) {
allTasks[i][0]->resetOptimizer();
for (size_t j = 0; j < allTasks[i].size(); ++j) {
delete allTasks[i][j];
}
}

PrintUserTime("Stopping...");
Expand Down
77 changes: 37 additions & 40 deletions moses-cmd/LatticeMBRGrid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include "moses/StaticData.h"
#include "util/exception.hh"

#include <boost/foreach.hpp>

using namespace std;
using namespace Moses;
Expand Down Expand Up @@ -156,59 +157,55 @@ int main(int argc, char* argv[])
exit(1);
}

StaticData& staticData = const_cast<StaticData&>(StaticData::Instance());
staticData.SetUseLatticeMBR(true);
StaticData& SD = const_cast<StaticData&>(StaticData::Instance());
SD.SetUseLatticeMBR(true);

IOWrapper* ioWrapper = new IOWrapper();
boost::shared_ptr<IOWrapper> ioWrapper(new IOWrapper);
if (!ioWrapper) {
throw runtime_error("Failed to initialise IOWrapper");
}
size_t nBestSize = staticData.GetMBRSize();
size_t nBestSize = SD.GetMBRSize();

if (nBestSize <= 0) {
throw new runtime_error("Non-positive size specified for n-best list");
}

size_t lineCount = 0;
InputType* source = NULL;

const vector<float>& pgrid = grid.getGrid(lmbr_p);
const vector<float>& rgrid = grid.getGrid(lmbr_r);
const vector<float>& prune_grid = grid.getGrid(lmbr_prune);
const vector<float>& scale_grid = grid.getGrid(lmbr_scale);

while(ioWrapper->ReadInput(staticData.GetInputType(),source)) {
++lineCount;
source->SetTranslationId(lineCount);

Manager manager(*source);
manager.Decode();
TrellisPathList nBestList;
manager.CalcNBest(nBestSize, nBestList,true);
//grid search
for (vector<float>::const_iterator pi = pgrid.begin(); pi != pgrid.end(); ++pi) {
float p = *pi;
staticData.SetLatticeMBRPrecision(p);
for (vector<float>::const_iterator ri = rgrid.begin(); ri != rgrid.end(); ++ri) {
float r = *ri;
staticData.SetLatticeMBRPRatio(r);
for (vector<float>::const_iterator prune_i = prune_grid.begin(); prune_i != prune_grid.end(); ++prune_i) {
size_t prune = (size_t)(*prune_i);
staticData.SetLatticeMBRPruningFactor(prune);
for (vector<float>::const_iterator scale_i = scale_grid.begin(); scale_i != scale_grid.end(); ++scale_i) {
float scale = *scale_i;
staticData.SetMBRScale(scale);
cout << lineCount << " ||| " << p << " " << r << " " << prune << " " << scale << " ||| ";
vector<Word> mbrBestHypo = doLatticeMBR(manager,nBestList);
manager.OutputBestHypo(mbrBestHypo, lineCount, staticData.GetReportSegmentation(),
staticData.GetReportAllFactors(),cout);
}
}

}
for (boost::shared_ptr<InputType> source = ioWrapper->ReadInput();
source != NULL; source = ioWrapper->ReadInput())
{
Manager manager(*source);
manager.Decode();
TrellisPathList nBestList;
manager.CalcNBest(nBestSize, nBestList,true);
//grid search
BOOST_FOREACH(float const& p, pgrid)
{
SD.SetLatticeMBRPrecision(p);
BOOST_FOREACH(float const& r, rgrid)
{
SD.SetLatticeMBRPRatio(r);
BOOST_FOREACH(size_t const prune_i, prune_grid)
{
SD.SetLatticeMBRPruningFactor(size_t(prune_i));
BOOST_FOREACH(float const& scale_i, scale_grid)
{
SD.SetMBRScale(scale_i);
size_t lineCount = source->GetTranslationId();
cout << lineCount << " ||| " << p << " "
<< r << " " << size_t(prune_i) << " " << scale_i
<< " ||| ";
vector<Word> mbrBestHypo = doLatticeMBR(manager,nBestList);
manager.OutputBestHypo(mbrBestHypo, lineCount,
SD.GetReportSegmentation(),
SD.GetReportAllFactors(),cout);
}
}
}
}
}


}

}
35 changes: 15 additions & 20 deletions moses-cmd/MainVW.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ int main(int argc, char** argv)
PrintUserTime("Created input-output object");
}

IOWrapper* ioWrapper = new IOWrapper();
boost::shared_ptr<IOWrapper> ioWrapper(new IOWrapper());
if (ioWrapper == NULL) {
cerr << "Error; Failed to create IO object" << endl;
exit(1);
Expand All @@ -143,37 +143,32 @@ int main(int argc, char** argv)
#endif

// main loop over set of input sentences
InputType* source = NULL;
size_t lineCount = staticData.GetStartTranslationId();
while(ioWrapper->ReadInput(staticData.GetInputType(),source)) {
source->SetTranslationId(lineCount);
IFVERBOSE(1) {
ResetUserTime();
}

boost::shared_ptr<InputType> source;
while ((source = ioWrapper->ReadInput()) != NULL)
{
IFVERBOSE(1) { ResetUserTime(); }

FeatureFunction::CallChangeSource(source);
InputType* foo = source.get();
FeatureFunction::CallChangeSource(foo);

// set up task of training one sentence
TrainingTask* task = new TrainingTask(source, *ioWrapper);
// set up task of training one sentence
boost::shared_ptr<TrainingTask>
task(new TrainingTask(source.get(), *ioWrapper));

// execute task
// execute task
#ifdef WITH_THREADS
pool.Submit(task);
pool.Submit(task);
#else
task->Run();
delete task;
task->Run();
#endif

source = NULL; //make sure it doesn't get deleted
++lineCount;
}
}

// we are done, finishing up
#ifdef WITH_THREADS
pool.Stop(true); //flush remaining jobs
#endif

delete ioWrapper;
FeatureFunction::Destroy();

} catch (const std::exception &e) {
Expand Down
64 changes: 27 additions & 37 deletions moses/ExportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,36 +101,31 @@ SimpleTranslationInterface::~SimpleTranslationInterface()
//the simplified version of string input/output translation
string SimpleTranslationInterface::translate(const string &inputString)
{
Moses::IOWrapper ioWrapper;
long lineCount = Moses::StaticData::Instance().GetStartTranslationId();
boost::shared_ptr<Moses::IOWrapper> ioWrapper(new IOWrapper);
// main loop over set of input sentences
InputType* source = NULL;
size_t sentEnd = inputString.rfind('\n'); //find the last \n, the input stream has to be appended with \n to be translated
const string &newString = sentEnd != string::npos ? inputString : inputString + '\n';

istringstream inputStream(newString); //create the stream for the interface
ioWrapper.SetInputStreamFromString(inputStream);
ioWrapper->SetInputStreamFromString(inputStream);
ostringstream outputStream;
ioWrapper.SetOutputStream2SingleBestOutputCollector(&outputStream);
ioWrapper.ReadInput(SimpleTranslationInterface::m_staticData.GetInputType(),source);
if (source)
source->SetTranslationId(lineCount);
else
return "Error: Source==null!!!";
IFVERBOSE(1) {
ResetUserTime();
}

FeatureFunction::CallChangeSource(source);
ioWrapper->SetOutputStream2SingleBestOutputCollector(&outputStream);

// set up task of translating one sentence
TranslationTask task = TranslationTask(source, ioWrapper);
task.Run();
boost::shared_ptr<InputType> source = ioWrapper->ReadInput();
if (!source) return "Error: Source==null!!!";
IFVERBOSE(1) { ResetUserTime(); }

FeatureFunction::CallChangeSource(&*source);

string output = outputStream.str();
//now trim the end whitespace
const string whitespace = " \t\f\v\n\r";
size_t end = output.find_last_not_of(whitespace);
// set up task of translating one sentence
boost::shared_ptr<TranslationTask> task
= TranslationTask::create(source, ioWrapper);
task->Run();

string output = outputStream.str();
//now trim the end whitespace
const string whitespace = " \t\f\v\n\r";
size_t end = output.find_last_not_of(whitespace);
return output.erase(end + 1);
}

Expand Down Expand Up @@ -174,9 +169,9 @@ run_as_server()
else myAbyssServer.run();

std::cerr << "xmlrpc_c::serverAbyss.run() returned but should not." << std::endl;
#pragma message("BUILDING MOSES WITH SERVER SUPPORT")
// #pragma message("BUILDING MOSES WITH SERVER SUPPORT")
#else
#pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
// #pragma message("BUILDING MOSES WITHOUT SERVER SUPPORT")
std::cerr << "Moses was compiled without server support." << endl;
#endif
return 1;
Expand All @@ -194,7 +189,8 @@ batch_run()

IFVERBOSE(1) PrintUserTime("Created input-output object");

IOWrapper* ioWrapper = new IOWrapper(); // set up read/writing class
// set up read/writing class:
boost::shared_ptr<IOWrapper> ioWrapper(new IOWrapper);
UTIL_THROW_IF2(ioWrapper == NULL, "Error; Failed to create IO object"
<< " [" << HERE << "]");

Expand All @@ -212,17 +208,17 @@ batch_run()
#endif

// main loop over set of input sentences
InputType* source = NULL;
size_t lineCount = staticData.GetStartTranslationId();
while(ioWrapper->ReadInput(staticData.GetInputType(), source))

boost::shared_ptr<InputType> source;
while ((source = ioWrapper->ReadInput()) != NULL)
{
source->SetTranslationId(lineCount);
IFVERBOSE(1) ResetUserTime();

FeatureFunction::CallChangeSource(source);
FeatureFunction::CallChangeSource(source.get());

// set up task of translating one sentence
TranslationTask* task = new TranslationTask(source, *ioWrapper);
boost::shared_ptr<TranslationTask>
task = TranslationTask::create(source, ioWrapper);

// execute task
#ifdef WITH_THREADS
Expand All @@ -234,7 +230,6 @@ batch_run()
{
// simulated post-editing: always run single-threaded!
task->Run();
delete task;
string src,trg,aln;
UTIL_THROW_IF2(!getline(*ioWrapper->spe_src,src), "[" << HERE << "] "
<< "missing update data for simulated post-editing.");
Expand All @@ -258,19 +253,14 @@ batch_run()
#endif
#else
task->Run();
delete task;
#endif

source = NULL; //make sure it doesn't get deleted
++lineCount;
}

// we are done, finishing up
#ifdef WITH_THREADS
pool.Stop(true); //flush remaining jobs
#endif

delete ioWrapper;
FeatureFunction::Destroy();

IFVERBOSE(1) util::PrintUsage(std::cerr);
Expand Down
Loading

0 comments on commit 8ca11d9

Please sign in to comment.