Skip to content

Commit

Permalink
[HOPSWORKS-1800] prov-core xattr - remove scan operations - add cache (
Browse files Browse the repository at this point in the history
  • Loading branch information
o-alex authored Jun 29, 2020
1 parent 817fe9b commit f01408b
Show file tree
Hide file tree
Showing 23 changed files with 751 additions and 415 deletions.
2 changes: 2 additions & 0 deletions config.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ meta_database = hopsworks
hive_meta_database = metastore
poll_maxTimeToWait = 2000
lru_cap = 10000
prov_file_lru_cap = 10000
prov_core_lru_cap = 100
recovery = false

# log level trace=0, debug=1, info=2, warn=3, error=4, fatal=5
Expand Down
45 changes: 1 addition & 44 deletions include/FileProvenanceConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/classification.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/assign.hpp>
#include "tables/FileProvenanceLogTable.h"
#include "FileProvenanceConstantsRaw.h"

namespace FileProvenanceConstants {

const Int8 XATTRS_USER_NAMESPACE = 5;
const std::string README_FILE = "README.md";

const std::string TYPE_NONE = "NONE";
Expand All @@ -44,7 +43,6 @@ namespace FileProvenanceConstants {

const std::string XATTR = "xattr_prov";

const std::string XATTR_PROV_CORE = "core";
const std::string XATTR_PROJECT_IID = "project_iid"; //part of project core

const std::string PROV_TYPE_STORE_NONE = "NONE";
Expand Down Expand Up @@ -104,51 +102,10 @@ namespace FileProvenanceConstants {
}
};

const std::string H_OP_CREATE = "CREATE";
const std::string H_OP_DELETE = "DELETE";
const std::string H_OP_ACCESS_DATA = "ACCESS_DATA";
const std::string H_OP_MODIFY_DATA = "MODIFY_DATA";
const std::string H_OP_METADATA = "METADATA";
const std::string H_OP_XATTR_ADD = "XATTR_ADD";
const std::string H_OP_XATTR_UPDATE = "XATTR_UPDATE";
const std::string H_OP_XATTR_DELETE = "XATTR_DELETE";
const std::string H_OP_OTHER = "OTHER";

const std::string ELASTIC_NOP = "\n";
const std::string ELASTIC_NOP2 = "\n\n";

enum Operation {
OP_CREATE,
OP_DELETE,
OP_ACCESS_DATA,
OP_MODIFY_DATA,
OP_XATTR_ADD,
OP_XATTR_UPDATE,
OP_XATTR_DELETE,
OP_METADATA,
OP_OTHER
};

const boost::unordered_map<std::string, Operation> ops = boost::assign::map_list_of
(H_OP_CREATE, OP_CREATE)
(H_OP_DELETE, OP_DELETE)
(H_OP_ACCESS_DATA, OP_ACCESS_DATA)
(H_OP_MODIFY_DATA, OP_MODIFY_DATA)
(H_OP_XATTR_ADD, OP_XATTR_ADD)
(H_OP_XATTR_UPDATE, OP_XATTR_UPDATE)
(H_OP_XATTR_DELETE, OP_XATTR_DELETE)
(H_OP_METADATA, OP_METADATA)
(H_OP_OTHER, OP_OTHER);

inline Operation findOp(const FileProvenanceRow row) {
if(ops.find(row.mOperation) == ops.end()) {
LOG_WARN("no such operation:" << row.mOperation);
std::stringstream cause;
cause << "no such operation:" << row.mOperation;
throw std::logic_error(cause.str());
}
return ops.at(row.mOperation);
}
const std::string APP_SUBMITTED_STATE = "SUBMITTED";
const std::string APP_RUNNING_STATE = "RUNNING";

Expand Down
53 changes: 53 additions & 0 deletions include/FileProvenanceConstantsRaw.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#ifndef EPIPE_FILEPROVENANCECONSTANTSRAW_H
#define EPIPE_FILEPROVENANCECONSTANTSRAW_H

#include <boost/assign.hpp>

namespace FileProvenanceConstantsRaw {
// Numeric id of the "user" extended-attribute namespace.
// NOTE(review): the value 5 presumably mirrors the namespace id stored in the xattr tables — confirm against the schema.
const Int8 XATTRS_USER_NAMESPACE = 5;
// Name of the provenance "core" extended attribute.
// NOTE(review): how this name is combined with other xattr name parts is not visible here — verify at the call sites.
const std::string XATTR_PROV_CORE = "core";

// String names of the provenance operations. Each one is a key of the `ops`
// map declared further down in this namespace, where it is paired with the
// matching Operation enumerator.
const std::string H_OP_CREATE = "CREATE";
const std::string H_OP_DELETE = "DELETE";
const std::string H_OP_ACCESS_DATA = "ACCESS_DATA";
const std::string H_OP_MODIFY_DATA = "MODIFY_DATA";
const std::string H_OP_METADATA = "METADATA";
const std::string H_OP_XATTR_ADD = "XATTR_ADD";
const std::string H_OP_XATTR_UPDATE = "XATTR_UPDATE";
const std::string H_OP_XATTR_DELETE = "XATTR_DELETE";
const std::string H_OP_OTHER = "OTHER";

// Provenance operation kinds. The `ops` map in this namespace associates each
// H_OP_* operation name with one of these enumerators, and findOp() returns them.
// This is a plain (unscoped) enum, so the OP_* names are visible directly in
// the enclosing namespace.
enum Operation {
OP_CREATE,
OP_DELETE,
OP_ACCESS_DATA,
OP_MODIFY_DATA,
OP_XATTR_ADD,
OP_XATTR_UPDATE,
OP_XATTR_DELETE,
OP_METADATA,
OP_OTHER
};

// Lookup table from operation name (the H_OP_* strings) to its Operation
// enumerator; findOp() below consults it.
// Being a const namespace-scope object defined in a header, it has internal
// linkage: every translation unit that includes this file builds its own copy.
const boost::unordered_map<std::string, Operation> ops = boost::assign::map_list_of
(H_OP_CREATE, OP_CREATE)
(H_OP_DELETE, OP_DELETE)
(H_OP_ACCESS_DATA, OP_ACCESS_DATA)
(H_OP_MODIFY_DATA, OP_MODIFY_DATA)
(H_OP_XATTR_ADD, OP_XATTR_ADD)
(H_OP_XATTR_UPDATE, OP_XATTR_UPDATE)
(H_OP_XATTR_DELETE, OP_XATTR_DELETE)
(H_OP_METADATA, OP_METADATA)
(H_OP_OTHER, OP_OTHER);

/**
 * Translates an operation name into its Operation enumerator.
 *
 * @param operation operation name; must be one of the keys registered in
 *                  `ops` (the H_OP_* strings).
 * @return the Operation paired with `operation` in `ops`.
 * @throws std::logic_error if `operation` is not a key of `ops`. The same
 *         "no such operation:<name>" message is logged through LOG_WARN
 *         before the exception is thrown.
 */
inline Operation findOp(const std::string& operation) {
  // One hash lookup: keep the iterator instead of calling find() and then at().
  const boost::unordered_map<std::string, Operation>::const_iterator entry = ops.find(operation);
  if (entry == ops.end()) {
    std::stringstream cause;
    cause << "no such operation:" << operation;
    LOG_WARN(cause.str());
    throw std::logic_error(cause.str());
  }
  return entry->second;
}
}
#endif //EPIPE_FILEPROVENANCECONSTANTSRAW_H
3 changes: 2 additions & 1 deletion include/FileProvenanceElastic.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
class FileProvenanceElastic : public ElasticSearchBase {
public:
FileProvenanceElastic(const HttpClientConfig elastic_client_config,int time_to_wait_before_inserting, int bulk_size,
const bool stats, SConn conn);
const bool stats, SConn conn, int file_lru_cap, int xattr_lru_cap);

virtual ~FileProvenanceElastic();
private:
SConn mConn;
FileProvenanceLogTable mFileProvTable;

void intProcessOneByOne(eBulk bulk);
bool intProcessBatch(std::string val, std::vector<eBulk>* bulks, std::vector<const LogHandler*> cleanupHandlers, ptime start_time);
Expand Down
15 changes: 8 additions & 7 deletions include/FileProvenanceElasticDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,38 +33,39 @@ struct ProcessRowResult {
std::list<std::string> mElasticOps;
FileProvenancePK mLogPK;
boost::optional<FPXAttrBufferPK> mCompanionPK;
FileProvenanceConstants::Operation mProvOp;
FileProvenanceConstantsRaw::Operation mProvOp;
};

class FileProvenanceElasticDataReader : public NdbDataReader<FileProvenanceRow, SConn> {
public:
FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int lru_cap);
FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_cap);
virtual ~FileProvenanceElasticDataReader();
protected:

private:
FileProvenanceLogTable mFileLogTable;
FileProvenanceXAttrBufferTable mXAttrBuffer;
INodeTable inodesTable;

void processAddedandDeleted(Pq* data_batch, eBulk& bulk);
ProcessRowResult rowResult(std::list<std::string> elasticOps, FileProvenancePK logPK,
boost::optional<FPXAttrBufferPK> companionPK, FileProvenanceConstants::Operation provOp);
boost::optional<FPXAttrBufferPK> companionPK, FileProvenanceConstantsRaw::Operation provOp);
ProcessRowResult process_row(FileProvenanceRow row);
bool projectExists(Int64 projectIId, Int64 timestamp);
FPXAttrBufferRow readBufferedXAttr(FPXAttrBufferPK xattrBufferKey);
boost::optional<FPXAttrBufferRow> getProvCore(FPXAttrVersionsK versionsKey);
boost::optional<FPXAttrBufferRow> getProvCore(Int64 inodeId, int inodeLogicalTime);
boost::optional<FPXAttrBufferRow> readProvCore(Int64 inodeId, int fromLogicalTime, int toLogicalTime);
ULSet getViewInodes(Pq* data_batch);
std::string getElasticBulkOps(std::list <std::string> bulkOps);
};

class FileProvenanceElasticDataReaders : public NdbDataReaders<FileProvenanceRow, SConn>{
public:
FileProvenanceElasticDataReaders(SConn* hopsConns, int num_readers,const bool hopsworks,
TimedRestBatcher* restEndpoint, int lru_cap) :
TimedRestBatcher* restEndpoint, int prov_file_lru_cap, int prov_core_lru_cap, int inodes_lru_ca) :
NdbDataReaders(restEndpoint){
for(int i=0; i<num_readers; i++){
FileProvenanceElasticDataReader* dr
= new FileProvenanceElasticDataReader(hopsConns[i], hopsworks, lru_cap);
= new FileProvenanceElasticDataReader(hopsConns[i], hopsworks, prov_file_lru_cap, prov_core_lru_cap, inodes_lru_ca);
dr->start(i, this);
mDataReaders.push_back(dr);
}
Expand Down
2 changes: 1 addition & 1 deletion include/FileProvenanceTableTailer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class FileProvenanceTableTailer : public RCTableTailer<FileProvenanceRow> {
public:
FileProvenanceTableTailer(Ndb* ndb, Ndb* ndbRecovery, const int poll_maxTimeToWait, const Barrier barrier);
FileProvenanceTableTailer(Ndb* ndb, Ndb* ndbRecovery, const int poll_maxTimeToWait, const Barrier barrier, int prov_file_lru_cap, int prov_core_lru_ca);
FileProvenanceRow consume();
virtual ~FileProvenanceTableTailer();

Expand Down
5 changes: 4 additions & 1 deletion include/Notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class Notifier : public ClusterConnectionBase {
const int poll_maxTimeToWait, const HttpClientConfig elastic_client_config, const bool hopsworks,
const std::string elastic_search_index, const std::string elastic_featurestore_index,
const std::string elastic_app_provenance_index,
const int elastic_batch_size, const int elastic_issue_time, const int lru_cap, const bool recovery, const bool stats,
const int elastic_batch_size, const int elastic_issue_time,
const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery, const bool stats,
Barrier barrier, const bool hiveCleaner, const std::string
metricsServer);
void start();
Expand All @@ -68,6 +69,8 @@ class Notifier : public ClusterConnectionBase {
const int mElasticBatchsize;
const int mElasticIssueTime;
const int mLRUCap;
const int mProvFileLRUCap;
const int mProvCoreLRUCap;
const bool mRecovery;
const bool mStats;
const Barrier mBarrier;
Expand Down
18 changes: 9 additions & 9 deletions include/TimedRestBatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ struct eBulk {

};

// Outcome of an HTTP request / response-parsing step. In this header it is the
// return type of httpPostRequest, httpDeleteRequest, handleHttpRequestWithRetry
// and the pure virtual parseResponse of TimedRestBatcher.
// The members carry no default initializers, so code creating an instance
// must set every field it intends to read.
struct ParsingResponse{
// NOTE(review): presumably true when the request/response was handled without error — confirm in the .cpp.
bool mSuccess;
// NOTE(review): presumably tells the caller whether a failed request may be retried — confirm in the .cpp.
bool mRetryable;
// NOTE(review): presumably a description of the failure when mSuccess is false — confirm in the .cpp.
std::string errorMsg;
};

class TimedRestBatcher : public Batcher {
public:
TimedRestBatcher(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size);
Expand All @@ -213,16 +219,11 @@ class TimedRestBatcher : public Batcher {
ptime mTimeElasticConnectionFailed;
Uint32 mCurrentQueueSize;

bool httpPostRequest(std::string requestUrl, std::string json);
bool httpDeleteRequest(std::string requestUrl);
ParsingResponse httpPostRequest(std::string requestUrl, std::string json);
ParsingResponse httpDeleteRequest(std::string requestUrl);

virtual void process(std::vector<eBulk>* data) = 0;

struct ParsingResponse{
bool mSuccess;
bool mRetryable;
};

virtual ParsingResponse parseResponse(std::string response) = 0;

private:
Expand All @@ -242,8 +243,7 @@ class TimedRestBatcher : public Batcher {
DELETE
};

bool handleHttpRequestWithRetry(HttpVerb verb,std::string
requestUrl, std::string json);
ParsingResponse handleHttpRequestWithRetry(HttpVerb verb, std::string requestUrl, std::string json);

};
#endif //TIMEDRESTBATCHER_H
9 changes: 5 additions & 4 deletions include/tables/DBTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ template<typename TableRow>
class DBTable : public DBTableBase {
public:
DBTable(const std::string table);
DBTable(const std::string table, const DBTableBase* companionTableBase);
DBTable(const std::string table, DBTableBase* companionTableBase);

void getAll(Ndb* connection);
bool next();
Expand All @@ -55,14 +55,16 @@ class DBTable : public DBTableBase {
NdbTransaction* mCurrentTransaction;
NdbOperation* mCurrentOperation;
NdbRecAttr** mCurrentRow;
const DBTableBase* mCompanionTableBase;

const NdbDictionary::Table* mCompanionTable;

void close();
void applyConditionOnOperation(NdbOperation* operation, AnyMap& any);
void applyConditionOnOperationOnCompanion(NdbOperation* operation, AnyMap& any);

protected:
DBTableBase* mCompanionTableBase;

NdbRecAttr** getColumnValues(NdbOperation* op);
void start(Ndb* connection);
void start(Ndb* connection, boost::optional<Int64> partitionId);
Expand Down Expand Up @@ -101,7 +103,7 @@ DBTable<TableRow>::DBTable(const std::string table)
}

template<typename TableRow>
DBTable<TableRow>::DBTable(const std::string table, const DBTableBase* companionTableBase)
DBTable<TableRow>::DBTable(const std::string table, DBTableBase* companionTableBase)
: DBTableBase(table), mReadEpoch(false), mCompanionTableBase(companionTableBase) {
}

Expand All @@ -110,7 +112,6 @@ void DBTable<TableRow>::setReadEpoch(bool readEpoch) {
mReadEpoch = readEpoch;
LOG_DEBUG(getName() << " -- ReadEpoch : " << mReadEpoch);
}

template<typename TableRow>
NdbRecAttr** DBTable<TableRow>::getColumnValues(NdbOperation* op) {
int numCols = mReadEpoch ? getNoColumns() + 1 : getNoColumns();
Expand Down
4 changes: 2 additions & 2 deletions include/tables/DBWatchTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ template<typename TableRow>
class DBWatchTable : public DBTable<TableRow> {
public:
DBWatchTable(const std::string table);
DBWatchTable(const std::string table, const DBTableBase* companionTable);
DBWatchTable(const std::string table, DBTableBase* companionTable);
evtvec_size_type getNoEvents() const;
NdbDictionary::Event::TableEvent getEvent(evtvec_size_type index) const;
EpochsRowsMap<TableRow> getAllForRecovery(Ndb* connection);
Expand All @@ -73,7 +73,7 @@ DBWatchTable<TableRow>::DBWatchTable(const std::string table) : DBTable<TableRow
}

template<typename TableRow>
DBWatchTable<TableRow>::DBWatchTable(const std::string table, const DBTableBase* companionTable) :
DBWatchTable<TableRow>::DBWatchTable(const std::string table, DBTableBase* companionTable) :
DBTable<TableRow>(table, companionTable) {
}

Expand Down
Loading

0 comments on commit f01408b

Please sign in to comment.