Skip to content

Commit

Permalink
Ms.mempool improvements (Chia-Network#1823)
Browse files Browse the repository at this point in the history
* Remove overflow from list, and remove useless call to handle_eos

* Unindent

* Changes to mempool

* tests

* progress on tests

* Add tests for new mempool

* Fix lint and revert streamable changes

* Improve logging

* Test level warning

* Fix test

* Increase mempool size to 150x
  • Loading branch information
mariano54 authored Apr 14, 2021
1 parent 9ea6399 commit d2466ee
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 874 deletions.
11 changes: 1 addition & 10 deletions chia/consensus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,13 @@ class ConsensusConstants:
GENESIS_PRE_FARM_POOL_PUZZLE_HASH: bytes32 # The block at height must pay out to this pool puzzle hash
GENESIS_PRE_FARM_FARMER_PUZZLE_HASH: bytes32 # The block at height must pay out to this farmer puzzle hash
MAX_VDF_WITNESS_SIZE: int # The maximum number of classgroup elements within an n-wesolowski proof
# Target tx count per sec
TX_PER_SEC: int
# Size of mempool = 10x the size of block
MEMPOOL_BLOCK_BUFFER: int
# Max coin amount uint(1 << 64). This allows coin amounts to fit in 64 bits. This is around 18M chia.
MAX_COIN_AMOUNT: int
# Raw size per block target = 1,000,000 bytes
# Rax TX (single in, single out) = 219 bytes (not compressed)
# TX = 457 vBytes
# floor(1,000,000 / 219) * 457 = 2086662 (size in vBytes)
# Max block cost in virtual bytes
MAX_BLOCK_COST: int
# MAX block cost in clvm cost units = MAX_BLOCK_COST * CLVM_COST_RATIO_CONSTANT
# 1 vByte = 108 clvm cost units
CLVM_COST_RATIO_CONSTANT: int
# Max block cost in clvm cost units (MAX_BLOCK_COST * CLVM_COST_RATIO_CONSTANT)
# Max block cost in clvm cost units
MAX_BLOCK_COST_CLVM: int

WEIGHT_PROOF_THRESHOLD: uint8
Expand Down
17 changes: 3 additions & 14 deletions chia/consensus/default_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,13 @@
"3d8765d3a597ec1d99663f6c9816d915b9f68613ac94009884c4addaefcce6af"
),
"MAX_VDF_WITNESS_SIZE": 64,
# Target tx count per sec
"TX_PER_SEC": 20,
# Size of mempool = 10x the size of block
"MEMPOOL_BLOCK_BUFFER": 10,
# Size of mempool = 150x the size of block
"MEMPOOL_BLOCK_BUFFER": 150,
# Max coin amount, fits into 64 bits
"MAX_COIN_AMOUNT": uint64((1 << 64) - 1),
# Targeting twice bitcoin's block size of 1.3MB per block
# Raw size per block target = 1,300,000 * 600 / 47 = approx 100 KB
# Rax TX (single in, single out) = 219 bytes (not compressed)
# TX = 457 vBytes
# floor(100 * 1024 / 219) * 457 = 213684 (size in vBytes)
# Max block cost in virtual bytes
"MAX_BLOCK_COST": 213684,
# MAX block cost in clvm cost units = MAX_BLOCK_COST * CLVM_COST_RATIO_CONSTANT
# 1 vByte = 108 clvm cost units
"CLVM_COST_RATIO_CONSTANT": 108,
# Max block cost in clvm cost units (MAX_BLOCK_COST * CLVM_COST_RATIO_CONSTANT)
# "MAX_BLOCK_COST_CLVM": 23077872,
# Max block cost in clvm cost units
"MAX_BLOCK_COST_CLVM": 40000000, # Based on arvid analysis
"WEIGHT_PROOF_THRESHOLD": 2,
"BLOCKS_CACHE_SIZE": 4608 + (128 * 4),
Expand Down
13 changes: 11 additions & 2 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.bech32m import encode_puzzle_hash
from chia.util.db_wrapper import DBWrapper
from chia.util.errors import ConsensusError, Err
from chia.util.ints import uint8, uint32, uint64, uint128
Expand Down Expand Up @@ -88,6 +89,7 @@ def __init__(
self.state_changed_callback: Optional[Callable] = None
self.full_node_peers = None
self.sync_store = None
self.signage_point_times = [time.time() for _ in range(self.constants.NUM_SPS_SUB_SLOT)]

if name:
self.log = logging.getLogger(name)
Expand Down Expand Up @@ -1095,9 +1097,16 @@ async def respond_unfinished_block(

self.full_node_store.add_unfinished_block(height, block, validate_result)
if farmed_block is True:
self.log.info(f"🍀 ️Farmed unfinished_block {block_hash}")
self.log.info(
f"🍀 ️Farmed unfinished_block {block_hash}, SP: {block.reward_chain_block.signage_point_index}"
)
else:
self.log.info(f"Added unfinished_block {block_hash}, not farmed")
self.log.info(
f"Added unfinished_block {block_hash}, not farmed by us,"
f" SP: {block.reward_chain_block.signage_point_index} time: "
f"{time.time() - self.signage_point_times[block.reward_chain_block.signage_point_index]}"
f"Pool pk {encode_puzzle_hash(block.foliage.foliage_block_data.pool_target.puzzle_hash, 'xch')}"
)

sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
self.constants,
Expand Down
1 change: 1 addition & 0 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ async def respond_signage_point(
f"{self.full_node.constants.NUM_SPS_SUB_SLOT}: "
f"{request.challenge_chain_vdf.output.get_hash()} "
)
self.full_node.signage_point_times[request.index_from_challenge] = time.time()
sub_slot_tuple = self.full_node.full_node_store.get_sub_slot(request.challenge_chain_vdf.challenge)
if sub_slot_tuple is not None:
prev_challenge = sub_slot_tuple[0].challenge_chain.challenge_chain_end_of_slot_vdf.challenge
Expand Down
78 changes: 48 additions & 30 deletions chia/full_node/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,39 @@


class Mempool:
spends: Dict[bytes32, MempoolItem]
sorted_spends: SortedDict # Dict[float, Dict[bytes32, MempoolItem]]
additions: Dict[bytes32, MempoolItem]
removals: Dict[bytes32, MempoolItem]
size: int

# if new min fee is added
@staticmethod
def create(size: int):
self = Mempool()
self.spends = {}
self.additions = {}
self.removals = {}
self.sorted_spends = SortedDict()
self.size = size
return self

def get_min_fee_rate(self) -> float:
if self.at_full_capacity():
fee_per_cost, val = self.sorted_spends.peekitem(index=0)
return fee_per_cost
def __init__(self, max_size_in_cost: int):
self.spends: Dict[bytes32, MempoolItem] = {}
self.sorted_spends: SortedDict = SortedDict()
self.additions: Dict[bytes32, MempoolItem] = {}
self.removals: Dict[bytes32, MempoolItem] = {}
self.max_size_in_cost: int = max_size_in_cost
self.total_mempool_cost: int = 0

def get_min_fee_rate(self, cost: int) -> float:
"""
Gets the minimum fpc rate that a transaction with specified cost will need in order to get included.
"""

if self.at_full_capacity(cost):
current_cost = self.total_mempool_cost

# Iterates through all spends in increasing fee per cost
for fee_per_cost, spends_with_fpc in self.sorted_spends.items():
for spend_name, item in spends_with_fpc.items():
current_cost -= item.cost_result.cost
# Removing one at a time, until our transaction of size cost fits
if current_cost + cost <= self.max_size_in_cost:
return fee_per_cost
raise ValueError(
f"Transaction with cost {cost} does not fit in mempool of max cost {self.max_size_in_cost}"
)
else:
return 0

def remove_spend(self, item: MempoolItem):
def remove_from_pool(self, item: MempoolItem):
"""
Removes an item from the mempool.
"""
removals: List[Coin] = item.spend_bundle.removals()
additions: List[Coin] = item.spend_bundle.additions()
for rem in removals:
Expand All @@ -44,32 +52,42 @@ def remove_spend(self, item: MempoolItem):
dic = self.sorted_spends[item.fee_per_cost]
if len(dic.values()) == 0:
del self.sorted_spends[item.fee_per_cost]
self.total_mempool_cost -= item.cost_result.cost
assert self.total_mempool_cost >= 0

def add_to_pool(
self,
item: MempoolItem,
additions: List[Coin],
removals_dic: Dict[bytes32, Coin],
):
if self.at_full_capacity():
"""
Adds an item to the mempool by kicking out transactions (if it doesn't fit), in order of increasing fee per cost
"""

while self.at_full_capacity(item.cost_result.cost):
# Val is Dict[hash, MempoolItem]
fee_per_cost, val = self.sorted_spends.peekitem(index=0)
to_remove = list(val.values())[0]
self.remove_spend(to_remove)
self.remove_from_pool(to_remove)

self.spends[item.name] = item

# sorted_spends is Dict[float, Dict[bytes32, MempoolItem]]
if item.fee_per_cost in self.sorted_spends:
self.sorted_spends[item.fee_per_cost][item.name] = item
else:
if item.fee_per_cost not in self.sorted_spends:
self.sorted_spends[item.fee_per_cost] = {}
self.sorted_spends[item.fee_per_cost][item.name] = item

self.sorted_spends[item.fee_per_cost][item.name] = item

for add in additions:
self.additions[add.name()] = item
for key in removals_dic.keys():
self.removals[key] = item
self.total_mempool_cost += item.cost_result.cost

def at_full_capacity(self, cost: int) -> bool:
"""
Checks whether the mempool is at full capacity and cannot accept a transaction with size cost.
"""

def at_full_capacity(self) -> bool:
return len(self.spends.keys()) >= self.size
return self.total_mempool_cost + cost > self.max_size_in_cost
40 changes: 25 additions & 15 deletions chia/full_node/mempool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,17 @@ def __init__(self, coin_store: CoinStore, consensus_constants: ConsensusConstant

self.coin_store = coin_store

tx_per_sec = self.constants.TX_PER_SEC
sec_per_block = self.constants.SUB_SLOT_TIME_TARGET // self.constants.SLOT_BLOCKS_TARGET
block_buffer_count = self.constants.MEMPOOL_BLOCK_BUFFER

# MEMPOOL_SIZE = 60000
self.mempool_size = int(tx_per_sec * sec_per_block * block_buffer_count)
self.potential_cache_size = 300
self.mempool_max_total_cost = int(self.constants.MAX_BLOCK_COST_CLVM * self.constants.MEMPOOL_BLOCK_BUFFER)
self.potential_cache_max_total_cost = int(
self.constants.MAX_BLOCK_COST_CLVM * self.constants.MEMPOOL_BLOCK_BUFFER
)
self.potential_cache_cost: int = 0
self.seen_cache_size = 10000
self.pool = ProcessPoolExecutor(max_workers=1)

# The mempool will correspond to a certain peak
self.peak: Optional[BlockRecord] = None
self.mempool: Mempool = Mempool.create(self.mempool_size)
self.mempool: Mempool = Mempool(self.mempool_max_total_cost)

def shut_down(self):
self.pool.shutdown(wait=True)
Expand All @@ -93,8 +91,13 @@ async def create_bundle_from_mempool(
spend_bundles: List[SpendBundle] = []
removals = []
additions = []
broke_from_inner_loop = False
log.info(f"Starting to make block, max cost: {self.constants.MAX_BLOCK_COST_CLVM}")
for dic in self.mempool.sorted_spends.values():
if broke_from_inner_loop:
break
for item in dic.values():
log.info(f"Cumulative cost: {cost_sum}")
if (
item.cost_result.cost + cost_sum <= self.constants.MAX_BLOCK_COST_CLVM
and item.fee + fee_sum <= self.constants.MAX_COIN_AMOUNT
Expand All @@ -105,6 +108,7 @@ async def create_bundle_from_mempool(
removals.extend(item.removals)
additions.extend(item.additions)
else:
broke_from_inner_loop = True
break
if len(spend_bundles) > 0:
return SpendBundle.aggregate(spend_bundles), additions, removals
Expand All @@ -130,7 +134,7 @@ def is_fee_enough(self, fees: uint64, cost: uint64) -> bool:
if cost == 0:
return False
fees_per_cost = fees / cost
if not self.mempool.at_full_capacity() or fees_per_cost >= self.mempool.get_min_fee_rate():
if not self.mempool.at_full_capacity(cost) or fees_per_cost > self.mempool.get_min_fee_rate(cost):
return True
return False

Expand Down Expand Up @@ -272,10 +276,10 @@ async def add_spendbundle(

fees_per_cost: float = fees / cost
# If pool is at capacity check the fee, if not then accept even without the fee
if self.mempool.at_full_capacity():
if self.mempool.at_full_capacity(cost):
if fees == 0:
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_LOW_FEE
if fees_per_cost < self.mempool.get_min_fee_rate():
if fees_per_cost <= self.mempool.get_min_fee_rate(cost):
return None, MempoolInclusionStatus.FAILED, Err.INVALID_FEE_LOW_FEE
# Check removals against UnspentDB + DiffStore + Mempool + SpendBundle
# Use this information later when constructing a block
Expand Down Expand Up @@ -344,7 +348,7 @@ async def add_spendbundle(
if fail_reason:
mempool_item: MempoolItem
for mempool_item in conflicting_pool_items.values():
self.mempool.remove_spend(mempool_item)
self.mempool.remove_from_pool(mempool_item)

removals: List[Coin] = [coin for coin in removal_coin_dict.values()]
new_item = MempoolItem(new_spend, uint64(fees), cost_result, spend_name, additions, removals)
Expand Down Expand Up @@ -381,10 +385,15 @@ def add_to_potential_tx_set(self, spend: SpendBundle, spend_name: bytes32, cost_
Adds SpendBundles that have failed to be added to the pool in potential tx set.
This is later used to retry to add them.
"""
if spend_name in self.potential_txs:
return

self.potential_txs[spend_name] = spend, cost_result, spend_name
self.potential_cache_cost += cost_result.cost

while len(self.potential_txs) > self.potential_cache_size:
while self.potential_cache_cost > self.potential_cache_max_total_cost:
first_in = list(self.potential_txs.keys())[0]
self.potential_cache_max_total_cost -= self.potential_txs[first_in][1].cost
self.potential_txs.pop(first_in)

def get_spendbundle(self, bundle_hash: bytes32) -> Optional[SpendBundle]:
Expand Down Expand Up @@ -413,7 +422,7 @@ async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBun
self.peak = new_peak

old_pool = self.mempool
self.mempool = Mempool.create(self.mempool_size)
self.mempool = Mempool(self.mempool_max_total_cost)

for item in old_pool.spends.values():
await self.add_spendbundle(item.spend_bundle, item.cost_result, item.spend_bundle_name, False)
Expand All @@ -426,7 +435,8 @@ async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBun
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((tx, cached_result, cached_name))
log.debug(
f"Size of mempool: {len(self.mempool.spends)}, minimum fee to get in: {self.mempool.get_min_fee_rate()}"
f"Size of mempool: {len(self.mempool.spends)} spends, cost: {self.mempool.total_mempool_cost} "
f"minimum fee to get in: {self.mempool.get_min_fee_rate(100000)}"
)
return txs_added

Expand Down
2 changes: 2 additions & 0 deletions chia/timelord/timelord_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async def new_unfinished_block_timelord(self, new_unfinished_block: timelord_pro
last_ip_iters = self.timelord.last_state.get_last_ip()
if sp_iters > ip_iters:
self.timelord.overflow_blocks.append(new_unfinished_block)
log.warning(f"Overflow unfinished block, total {self.timelord.total_unfinished}")
elif ip_iters > last_ip_iters:
new_block_iters: Optional[uint64] = self.timelord._can_infuse_unfinished_block(new_unfinished_block)
if new_block_iters:
Expand All @@ -72,6 +73,7 @@ async def new_unfinished_block_timelord(self, new_unfinished_block: timelord_pro
self.timelord.iters_to_submit[Chain.INFUSED_CHALLENGE_CHAIN].append(new_block_iters)
self.timelord.iteration_to_proof_type[new_block_iters] = IterationType.INFUSION_POINT
self.timelord.total_unfinished += 1
log.warning(f"Non-overflow unfinished block, total {self.timelord.total_unfinished}")

@api_request
async def request_compact_proof_of_time(self, vdf_info: timelord_protocol.RequestCompactProofOfTime):
Expand Down
1 change: 0 additions & 1 deletion chia/timelord/timelord_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ def set_state(self, state: Union[timelord_protocol.NewPeakTimelord, EndOfSubSlot
self.passed_ses_height_but_not_yet_included = True
else:
self.passed_ses_height_but_not_yet_included = state.passes_ses_height_but_not_yet_included
log.warning(f"Signage point index: {self.peak.reward_chain_block.signage_point_index}")
elif isinstance(state, EndOfSubSlotBundle):
self.state_type = StateType.END_OF_SUB_SLOT
if self.peak is not None:
Expand Down
1 change: 0 additions & 1 deletion chia/util/block_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@
* 24
* 10, # Allows creating blockchains with timestamps up to 10 days in the future, for testing
"MEMPOOL_BLOCK_BUFFER": 6,
"TX_PER_SEC": 1,
"CLVM_COST_RATIO_CONSTANT": 108,
"INITIAL_FREEZE_PERIOD": 0,
"NETWORK_TYPE": 1,
Expand Down
2 changes: 1 addition & 1 deletion run-py-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
python3 -m venv venv
# shellcheck disable=SC1091
. ./activate
pip3 install .
pip3 install ".[dev]"

py.test ./tests/blockchain -s -v
py.test ./tests/core -s -v
Expand Down
Loading

0 comments on commit d2466ee

Please sign in to comment.