Skip to content

Commit

Permalink
[FSTORE-351] epipe - provenance index - handle resource folders corre…
Browse files Browse the repository at this point in the history
…ctly (#79)

* [FSTORE-351] epipe - provenance index - handle resource folders correctly

* wrong resource dir
  • Loading branch information
o-alex authored Oct 7, 2022
1 parent 387c140 commit 8ee239f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
56 changes: 39 additions & 17 deletions include/FileProvenanceConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
#include "tables/INodeTable.h"

namespace FileProvenanceConstants {
const std::string featureViewFolder = ".featureviews";
const std::set<std::string> featureGroupResourceFolders = {"code","storage_connector_resources"};
const std::set<std::string> trainingDatasetResourceFolders = {"code", "transformation_functions"};
const std::set<std::string> featureViewResourceFolders = {};

const std::string README_FILE = "README.md";

Expand Down Expand Up @@ -160,12 +164,6 @@ namespace FileProvenanceConstants {
return row.mDatasetName == part;
}

inline bool isDatasetName2(FileProvenanceRow row, std::string part) {
std::stringstream mlDataset;
mlDataset << row.mProjectName << "_" << part;
return row.mDatasetName == mlDataset.str();
}

inline bool isReadmeFile(FileProvenanceRow row) {
return row.mInodeName == README_FILE;
}
Expand Down Expand Up @@ -256,23 +254,32 @@ namespace FileProvenanceConstants {
}

inline std::string featureViewArtifact(Int64 parentIId, Int64 datasetIId, Int64 featureViewIId, std::string inodeName) {
if(parentIId == featureViewIId) {
return "featureview";
} else if(parentIId == datasetIId) {
if(inodeName == ".featureviews" || inodeName == "code" || inodeName == "transformation_functions") {
if(parentIId == datasetIId) {
const bool isResourceFolder =
trainingDatasetResourceFolders.find(inodeName) != trainingDatasetResourceFolders.end();
if (inodeName == featureViewFolder || isResourceFolder) {
LOG_DEBUG("skipping training dataset basic folder:" << inodeName);
return DONT_EXIST_STR();
} else {
return "trainingdataset";
}
} else if(parentIId == featureViewIId) {
const bool isResourceFolder = featureViewResourceFolders.find(inodeName) != featureViewResourceFolders.end();
if(isResourceFolder) {
LOG_DEBUG("skipping training dataset basic folder:" << inodeName);
return DONT_EXIST_STR();
} else {
return "featureview";
}
} else {
return DONT_EXIST_STR();
}
}

inline std::string featureStoreArtifact(Int64 parentIId, Int64 datasetIId, std::string inodeName) {
if(parentIId == datasetIId) {
if(inodeName == "code" || inodeName == "storage_connector_resources") {
const bool isResourceFolder = featureGroupResourceFolders.find(inodeName) != featureGroupResourceFolders.end();
if(isResourceFolder) {
LOG_DEBUG("skipping feature store basic folder:" << inodeName);
return DONT_EXIST_STR();
} else {
Expand All @@ -289,7 +296,7 @@ namespace FileProvenanceConstants {
}

//we do not know the partitionId
INodeRow row = inodesTable.get(conn, parentIId, ".featureviews", parentIId);
INodeRow row = inodesTable.get(conn, parentIId, featureViewFolder, parentIId);
if(row.mId != 0) {
FeatureViewInodeCache::getInstance().add(parentIId, row.mId);
}
Expand All @@ -310,15 +317,18 @@ namespace FileProvenanceConstants {
}

inline bool typeMLFeature(FileProvenanceRow row) {
return row.mProjectId == -1 && row.mDatasetName == featurestoreName(row.mProjectName);
const bool isHiveDir = row.mProjectId == -1;
const bool isFeaturestoreFolder = row.mDatasetName == featurestoreName(row.mProjectName);
const bool is_resource_folder = featureGroupResourceFolders.find(row.mP1Name) != featureGroupResourceFolders.end();
return isHiveDir && isFeaturestoreFolder && !is_resource_folder;
}

inline bool isMLFeature(FileProvenanceRow row) {
return typeMLFeature(row) && row.mDatasetId == row.mParentId;
return typeMLFeature(row) && oneLvlDeep(row);
}

inline bool partOfMLFeature(FileProvenanceRow row) {
return typeMLFeature(row) && row.mDatasetId != row.mParentId;
return typeMLFeature(row) && onePlusLvlDeep(row);
}

inline std::string getMLFeatureId(FileProvenanceRow row) {
Expand All @@ -329,12 +339,24 @@ namespace FileProvenanceConstants {
return oneNameForPart(row);
}

inline std::string getTrainingDatasetFolder(std::string projectName) {
std::stringstream name;
name << projectName << "_Training_Datasets";\
return name.str();
}

inline bool typeMLTDataset(FileProvenanceRow row) {
const bool isTrainingDatasetFolder = getTrainingDatasetFolder(row.mProjectName) == row.mDatasetName;
const bool isResourceFolder = trainingDatasetResourceFolders.find(row.mP1Name) != trainingDatasetResourceFolders.end();
const bool isFeatureViewFolder = row.mP1Name == featureViewFolder;
return isTrainingDatasetFolder && !isResourceFolder && !isFeatureViewFolder;
}
inline bool isMLTDataset(FileProvenanceRow row) {
return isDatasetName2(row, "Training_Datasets") && oneLvlDeep(row);
return typeMLTDataset(row) && oneLvlDeep(row);
}

inline bool partOfMLTDataset(FileProvenanceRow row) {
return isDatasetName2(row, "Training_Datasets") && onePlusLvlDeep(row);
return typeMLTDataset(row) && onePlusLvlDeep(row);
}

inline std::string getMLTDatasetId(FileProvenanceRow row) {
Expand Down
3 changes: 1 addition & 2 deletions include/Notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ class Notifier : public ClusterConnectionBase {
const std::string elastic_app_provenance_index,
const int elastic_batch_size, const int elastic_issue_time,
const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery, const bool stats,
Barrier barrier, const bool hiveCleaner, const std::string
metricsServer);
Barrier barrier, const bool hiveCleaner, const std::string metricsServer);
void start();
virtual ~Notifier();

Expand Down
3 changes: 1 addition & 2 deletions src/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ Notifier::Notifier(const char* connection_string, const char* database_name,
const std::string elastic_app_provenance_index,
const int elastic_batch_size, const int elastic_issue_time,
const int lru_cap, const int prov_file_lru_cap, const int prov_core_lru_cap, const bool recovery,
const bool stats, Barrier barrier, const bool hiveCleaner, const
std::string metricsServer)
const bool stats, Barrier barrier, const bool hiveCleaner, const std::string metricsServer)
: ClusterConnectionBase(connection_string, database_name, meta_database_name, hive_meta_database_name),
mMutationsTU(mutations_tu), mFileProvenanceTU(elastic_provenance_tu), mAppProvenanceTU(elastic_provenance_tu),
mPollMaxTimeToWait(poll_maxTimeToWait), mElasticClientConfig(elastic_client_config), mHopsworksEnabled(hopsworks),
Expand Down

0 comments on commit 8ee239f

Please sign in to comment.