Skip to content

Commit

Permalink
Merge pull request steemit#2372 from steemit/2347-generate-blocks-bat…
Browse files Browse the repository at this point in the history
…ch-write

generate_block uses batch write queue
  • Loading branch information
Michael Vandeberg authored Apr 20, 2018
2 parents 28ddeaf + 7ecbe6b commit 2340f2a
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 14 deletions.
80 changes: 78 additions & 2 deletions libraries/plugins/chain/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,22 @@ namespace asio = boost::asio;

#define NUM_THREADS 1

typedef fc::static_variant< const signed_block*, const signed_transaction* > write_request_ptr;
struct generate_block_request
{
generate_block_request( const fc::time_point_sec w, const account_name_type& wo, const fc::ecc::private_key& priv_key, uint32_t s ) :
when( w ),
witness_owner( wo ),
block_signing_private_key( priv_key ),
skip( s ) {}

const fc::time_point_sec when;
const account_name_type& witness_owner;
const fc::ecc::private_key& block_signing_private_key;
uint32_t skip;
signed_block block;
};

typedef fc::static_variant< const signed_block*, const signed_transaction*, generate_block_request* > write_request_ptr;
typedef fc::static_variant< boost::promise< void >*, fc::future< void >* > promise_ptr;

struct write_context
Expand Down Expand Up @@ -68,6 +83,7 @@ class chain_plugin_impl
bool running = true;
std::shared_ptr< std::thread > write_processor_thread;
boost::lockfree::queue< write_context* > write_queue;
int16_t write_lock_hold_time = 500;

database db;
};
Expand Down Expand Up @@ -124,6 +140,33 @@ struct write_request_visitor

return result;
}

bool operator()( generate_block_request* req )
{
bool result = false;

try
{
req->block = db->generate_block(
req->when,
req->witness_owner,
req->block_signing_private_key,
req->skip
);
result = true;
}
catch( fc::exception& e )
{
*except = e;
}
catch( ... )
{
*except = fc::unhandled_exception( FC_LOG_MESSAGE( warn, "Unexpected exception while pushing block." ),
std::current_exception() );
}

return result;
}
};

struct request_promise_visitor
Expand Down Expand Up @@ -192,7 +235,7 @@ void chain_plugin_impl::start_write_processing()
is_syncing = false;
}

if( !is_syncing && fc::time_point::now() - start > fc::milliseconds( 500 ) )
if( !is_syncing && write_lock_hold_time >= 0 && fc::time_point::now() - start > fc::milliseconds( write_lock_hold_time ) )
{
break;
}
Expand Down Expand Up @@ -495,6 +538,39 @@ void chain_plugin::accept_transaction( const steem::chain::signed_transaction& t
return;
}

steem::chain::signed_block chain_plugin::generate_block(
const fc::time_point_sec when,
const account_name_type& witness_owner,
const fc::ecc::private_key& block_signing_private_key,
uint32_t skip )
{
generate_block_request req( when, witness_owner, block_signing_private_key, skip );
boost::promise< void > prom;
write_context cxt;
cxt.req_ptr = &req;
cxt.prom_ptr = &prom;

my->write_queue.push( &cxt );

prom.get_future().get();

if( cxt.except ) throw *(cxt.except);

FC_ASSERT( cxt.success, "Block could not be generated" );

return req.block;
}

int16_t chain_plugin::set_write_lock_hold_time( int16_t new_time )
{
FC_ASSERT( get_state() == appbase::abstract_plugin::state::initialized,
"Can only change write_lock_hold_time while chain_plugin is initialized." );

int16_t old_time = my->write_lock_hold_time;
my->write_lock_hold_time = new_time;
return old_time;
}

bool chain_plugin::block_is_on_preferred_chain(const steem::chain::block_id_type& block_id )
{
// If it's not known, it's not preferred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class chain_plugin : public plugin< chain_plugin >
virtual ~chain_plugin();

bfs::path state_storage_dir() const;

static const std::string& name() { static std::string name = STEEM_CHAIN_PLUGIN_NAME; return name; }

virtual void set_program_options( options_description& cli, options_description& cfg ) override;
Expand All @@ -35,6 +35,25 @@ class chain_plugin : public plugin< chain_plugin >

bool accept_block( const steem::chain::signed_block& block, bool currently_syncing, uint32_t skip );
void accept_transaction( const steem::chain::signed_transaction& trx );
steem::chain::signed_block generate_block(
const fc::time_point_sec when,
const account_name_type& witness_owner,
const fc::ecc::private_key& block_signing_private_key,
uint32_t skip = database::skip_nothing
);

/**
* Sets the time (in ms) that the write thread will hold the lock for.
* A time of -1 is no limit and pre-empts all readers. A time of 0 will
* only ever hold to lock for a single write before returning to readers.
* By default, this value is 500 ms.
*
* This value cannot be changed once the plugin is started.
*
* The old value is returned so that plugins can respect overrides from
* other plugins.
*/
int16_t set_write_lock_hold_time( int16_t new_time );

bool block_is_on_preferred_chain( const steem::chain::block_id_type& block_id );

Expand Down
26 changes: 15 additions & 11 deletions libraries/plugins/witness/witness_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace detail {
public:
witness_plugin_impl( boost::asio::io_service& io ) :
_timer(io),
_chain_plugin( appbase::app().get_plugin< steem::plugins::chain::chain_plugin >() ),
_db( appbase::app().get_plugin< steem::plugins::chain::chain_plugin >().db() ) {}

void on_pre_apply_transaction( const chain::transaction_notification& note );
Expand All @@ -72,7 +73,8 @@ namespace detail {
std::set< steem::protocol::account_name_type > _witnesses;
boost::asio::deadline_timer _timer;

chain::database& _db;
plugins::chain::chain_plugin& _chain_plugin;
chain::database& _db;
boost::signals2::connection _pre_apply_operation_conn;
boost::signals2::connection _post_apply_block_conn;
boost::signals2::connection _pre_apply_transaction_conn;
Expand Down Expand Up @@ -464,24 +466,23 @@ namespace detail {

block_production_condition::block_production_condition_enum witness_plugin_impl::maybe_produce_block(fc::mutable_variant_object& capture)
{
chain::database& db = appbase::app().get_plugin< steem::plugins::chain::chain_plugin >().db();
fc::time_point now_fine = fc::time_point::now();
fc::time_point_sec now = now_fine + fc::microseconds( 500000 );

// If the next block production opportunity is in the present or future, we're synced.
if( !_production_enabled )
{
if( db.get_slot_time(1) >= now )
if( _db.get_slot_time(1) >= now )
_production_enabled = true;
else
return block_production_condition::not_synced;
}

// is anyone scheduled to produce now or one second in the future?
uint32_t slot = db.get_slot_at_time( now );
uint32_t slot = _db.get_slot_at_time( now );
if( slot == 0 )
{
capture("next_time", db.get_slot_time(1));
capture("next_time", _db.get_slot_time(1));
return block_production_condition::not_time_yet;
}

Expand All @@ -493,18 +494,18 @@ namespace detail {
// which would result in allowing a later block to have a timestamp
// less than or equal to the previous block
//
assert( now > db.head_block_time() );
assert( now > _db.head_block_time() );

chain::account_name_type scheduled_witness = db.get_scheduled_witness( slot );
chain::account_name_type scheduled_witness = _db.get_scheduled_witness( slot );
// we must control the witness scheduled to produce the next block.
if( _witnesses.find( scheduled_witness ) == _witnesses.end() )
{
capture("scheduled_witness", scheduled_witness);
return block_production_condition::not_my_turn;
}

fc::time_point_sec scheduled_time = db.get_slot_time( slot );
chain::public_key_type scheduled_key = db.get< chain::witness_object, chain::by_name >(scheduled_witness).signing_key;
fc::time_point_sec scheduled_time = _db.get_slot_time( slot );
chain::public_key_type scheduled_key = _db.get< chain::witness_object, chain::by_name >(scheduled_witness).signing_key;
auto private_key_itr = _private_keys.find( scheduled_key );

if( private_key_itr == _private_keys.end() )
Expand All @@ -514,7 +515,7 @@ namespace detail {
return block_production_condition::no_private_key;
}

uint32_t prate = db.witness_participation_rate();
uint32_t prate = _db.witness_participation_rate();
if( prate < _required_witness_participation )
{
capture("pct", uint32_t(100*uint64_t(prate) / STEEM_1_PERCENT));
Expand All @@ -527,7 +528,7 @@ namespace detail {
return block_production_condition::lag;
}

auto block = db.generate_block(
auto block = _chain_plugin.generate_block(
scheduled_time,
scheduled_witness,
private_key_itr->second,
Expand Down Expand Up @@ -594,6 +595,9 @@ void witness_plugin::plugin_initialize(const boost::program_options::variables_m
add_plugin_index< reserve_ratio_index >( my->_db );

appbase::app().get_plugin< steem::plugins::p2p::p2p_plugin >().set_block_production( true );

if( my->_witnesses.size() && my->_private_keys.size() )
my->_chain_plugin.set_write_lock_hold_time( -1 );
} FC_LOG_AND_RETHROW() }

void witness_plugin::plugin_startup()
Expand Down

0 comments on commit 2340f2a

Please sign in to comment.