Skip to content

Commit

Permalink
Issue steemit#2066 Implemented account and operation filtering option…
Browse files Browse the repository at this point in the history
…s. Needs testing.
  • Loading branch information
vogel76 authored and mvandeberg committed Mar 21, 2018
1 parent 4459c17 commit c54384b
Showing 1 changed file with 146 additions and 28 deletions.
174 changes: 146 additions & 28 deletions libraries/plugins/rocksdb/rocksdb_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,43 @@ const Comparator* by_account_name_storage_id_pair_Comparator()

#define checkStatus(s) FC_ASSERT((s).ok(), "Data access failed: ${m}", ("m", (s).ToString()))

class operation_name_provider
{
public:
typedef void result_type;

static std::string getName(const operation& op)
{
operation_name_provider provider;
op.visit(provider);
return provider._name;
}

template<typename Op>
void operator()( const Op& ) const
{
_name = fc::get_typename<Op>::name();
}

private:
operation_name_provider() = default;

/// Class attributes:
private:
mutable std::string _name;
};

} /// anonymous

class rocksdb_plugin::impl final
{
public:

explicit impl(const rocksdb_plugin& mainPlugin) :
impl(const rocksdb_plugin& mainPlugin, const bpo::variables_map& options) :
_mainDb(appbase::app().get_plugin<steem::plugins::chain::chain_plugin>().db()),
_mainPlugin(mainPlugin)
{
collectOptions(options);
_pre_apply_connection = _mainDb.pre_apply_operation.connect( [&]( const operation_notification& note ){ on_operation(note); } );
}

Expand Down Expand Up @@ -393,19 +420,26 @@ class rocksdb_plugin::impl final

void on_operation(const operation_notification& opNote);

private:
void collectOptions(const boost::program_options::variables_map& options);
void collectOptions(const bpo::variables_map& options);

/** Returns true if given account is tracked.
* Depends on `"account-history-whitelist-ops"`, `account-history-blacklist-ops` option usage.
* Only some accounts can be chosen for tracking operation history.
*/
bool isTrackedAccount(const account_name_type& name) const;

/** Returns a collection of the accounts being impacted by given `op` and
* also tracked, because of passed options.
*
* \see isTrackedAccount.
*/
std::vector<account_name_type> getImpactedAccounts(const operation& op) const;

/** Returns true if given operation should be collected.
* Depends on `account-history-blacklist-ops`, `account-history-whitelist-ops`.
*/
bool isTrackedOperation(const operation& op) const;

void storeOpFilteringParameters(const std::vector<std::string>& opList,
flat_set<std::string>* storage) const;

Expand All @@ -414,6 +448,7 @@ class rocksdb_plugin::impl final
WRITE_BUFFER_FLUSH_LIMIT = 100
};

/// Class attributes:
private:
typedef flat_map< account_name_type, account_name_type > account_name_range_index;

Expand All @@ -427,22 +462,28 @@ class rocksdb_plugin::impl final
/// Helper member to be able to detect another incomming tx and increment tx-counter.
transaction_id_type _lastTx;
size_t _txNo = 0;
/// Total processed ops in this session.
/// Total processed ops in this session (counts every operation, even excluded by filtering).
size_t _totalOps = 0;
/// Total number of ops being skipped by filtering options.
size_t _excludedOps = 0;

/// IDs to be assigned to object.id field.
size_t _operationSeqId = 0;
size_t _accountHistorySeqId = 0;

/// Number of data-chunks for ops being stored inside _writeBuffer. To decide when to flush.
unsigned int _collectedOps = 0;
bool _filter_content = false;
bool _blacklist = false;
bool _prune = true;
/** Limit which value depends on block data source:
* - if blocks come from network, there is no need for delaying write, becasue they appear quite rare (limit == 1)
* - if reindex process or direct import has been spawned, this massive operation can need reduction of direct writes (limit == WRITE_BUFFER_FLUSH_LIMIT).
*/
unsigned int _collectedOpsWriteLimit = WRITE_BUFFER_FLUSH_LIMIT;

account_name_range_index _tracked_accounts;
flat_set< string > _op_list;
flat_set< string > _blacklisted_op_list;
flat_set<std::string> _op_list;
flat_set<std::string> _blacklisted_op_list;

bool _prune = true;
};

void rocksdb_plugin::impl::collectOptions(const boost::program_options::variables_map& options)
Expand Down Expand Up @@ -494,16 +535,81 @@ void rocksdb_plugin::impl::collectOptions(const boost::program_options::variable
_prune = !options["history-disable-pruning"].as<bool>();
}

bool rocksdb_plugin::impl::isTrackedAccount(const account_name_type& name) const
inline bool rocksdb_plugin::impl::isTrackedAccount(const account_name_type& name) const
{
FC_TODO("NOT IMPLEMENTED YET");
return true;
if(_tracked_accounts.empty())
return true;

/// Code below is based on original contents of account_history_plugin_impl::on_operation
auto itr = _tracked_accounts.lower_bound(name);

/*
* The map containing the ranges uses the key as the lower bound and the value as the upper bound.
* Because of this, if a value exists with the range (key, value], then calling lower_bound on
* the map will return the key of the next pair. Under normal circumstances of those ranges not
* intersecting, the value we are looking for will not be present in range that is returned via
* lower_bound.
*
* Consider the following example using ranges ["a","c"], ["g","i"]
* If we are looking for "bob", it should be tracked because it is in the lower bound.
* However, lower_bound( "bob" ) returns an iterator to ["g","i"]. So we need to decrement the iterator
* to get the correct range.
*
* If we are looking for "g", lower_bound( "g" ) will return ["g","i"], so we need to make sure we don't
* decrement.
*
* If the iterator points to the end, we should check the previous (equivalent to rbegin)
*
* And finally if the iterator is at the beginning, we should not decrement it for obvious reasons
*/
if(itr != _tracked_accounts.begin() &&
((itr != _tracked_accounts.end() && itr->first != name ) || itr == _tracked_accounts.end()))
{
--itr;
}

bool inRange = itr != _tracked_accounts.end() && itr->first <= name && name <= itr->second;
return inRange;
}

bool rocksdb_plugin::impl::isTrackedOperation(const operation& op) const
std::vector<account_name_type> rocksdb_plugin::impl::getImpactedAccounts(const operation& op) const
{
FC_TODO("NOT IMPLEMENTED YET");
return true;
flat_set<account_name_type> impacted;
steem::app::operation_get_impacted_accounts(op, impacted);
std::vector<account_name_type> retVal;

if(impacted.empty())
return retVal;

if(_tracked_accounts.empty())
{
retVal.insert(retVal.end(), impacted.begin(), impacted.end());
return retVal;
}

retVal.reserve(impacted.size());

for(const auto& name : impacted)
{
if(isTrackedAccount(name))
retVal.push_back(name);
}

return retVal;
}

inline bool rocksdb_plugin::impl::isTrackedOperation(const operation& op) const
{
if(_op_list.empty() && _blacklisted_op_list.empty())
return true;

auto opName = operation_name_provider::getName(op);

if(_blacklisted_op_list.find(opName) != _blacklisted_op_list.end())
return false;

bool isTracked = (_op_list.empty() || (_op_list.find(opName) != _op_list.end()));
return isTracked;
}

void rocksdb_plugin::impl::storeOpFilteringParameters(const std::vector<std::string>& opList,
Expand Down Expand Up @@ -704,14 +810,16 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi

if(blockNum % 10000 == 0 && txInBlock == 0 && opInTx == 0)
{
ilog( "RocksDb data import processed blocks: ${n}, containing: ${tx} transactions and ${op} operations.",
ilog("RocksDb data import processed blocks: ${n}, containing: ${tx} transactions and ${op} operations."
" ${ep} operations have been filtered out due to configured options.",
("n", blockNum)
("tx", _txNo)
("op", _totalOps));
("op", _totalOps)
("ep", _excludedOps)
);
}

flat_set<account_name_type> impacted;
steem::app::operation_get_impacted_accounts(op, impacted);
auto impacted = getImpactedAccounts(op);

if(impacted.empty())
return; /// Ignore operations not impacting any account (according to original implementation)
Expand Down Expand Up @@ -745,15 +853,22 @@ void rocksdb_plugin::impl::importOperation(uint32_t blockNum, const fc::time_poi
s = _writeBuffer.Put(_columnHandles[2], blockNoIdSlice, idSlice);
checkStatus(s);

for(const auto& name : impacted)
if(isTrackedOperation(op))
{
account_name_storage_id_pair nameIdPair(name.data, _accountHistorySeqId++);
PrimitiveTypeSlice<account_name_storage_id_pair> nameIdPairSlice(nameIdPair);
s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice);
checkStatus(s);
for(const auto& name : impacted)
{
account_name_storage_id_pair nameIdPair(name.data, _accountHistorySeqId++);
PrimitiveTypeSlice<account_name_storage_id_pair> nameIdPairSlice(nameIdPair);
s = _writeBuffer.Put(_columnHandles[3], nameIdPairSlice, idSlice);
checkStatus(s);
}
}
else
{
++_excludedOps;
}

if(++_collectedOps >= WRITE_BUFFER_FLUSH_LIMIT)
if(++_collectedOps >= _collectedOpsWriteLimit)
flushWriteBuffer();

++_totalOps;
Expand All @@ -775,6 +890,7 @@ void rocksdb_plugin::impl::importData(unsigned int blockLimit)
_lastTx = transaction_id_type();
_txNo = 0;
_totalOps = 0;
_excludedOps = 0;

benchmark_dumper dumper;
dumper.initialize();
Expand Down Expand Up @@ -812,10 +928,12 @@ void rocksdb_plugin::impl::importData(unsigned int blockLimit)
("cm", measure.current_mem)
("pm", measure.peak_mem) );

ilog( "RocksDb data import finished. Processed blocks: ${n}, containing: ${tx} transactions and ${op} operations.",
ilog( "RocksDb data import finished. Processed blocks: ${n}, containing: ${tx} transactions and ${op} operations. ${ep} operations have been filtered out due to configured options.",
("n", blockNo)
("tx", _txNo)
("op", _totalOps));
("op", _totalOps)
("ep", _excludedOps)
);
}

void rocksdb_plugin::impl::on_operation(const operation_notification& n)
Expand Down Expand Up @@ -862,7 +980,7 @@ void rocksdb_plugin::set_program_options(

void rocksdb_plugin::plugin_initialize(const boost::program_options::variables_map& options)
{
_my = std::make_unique<impl>(*this);
_my = std::make_unique<impl>(*this, options);

if(options.count("rocksdb-path"))
_dbPath = options.at("rocksdb-path").as<bfs::path>();
Expand Down

0 comments on commit c54384b

Please sign in to comment.