From 516ad726672f747acc9367e686f4daed7b035e52 Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Mon, 2 Dec 2024 16:09:20 +0200 Subject: [PATCH 1/4] Implement naive clamping of the chunk cache size for mining worker --- apps/arweave/src/ar_mining_worker.erl | 62 +++++++++++++-------------- apps/arweave/src/ar_util.erl | 22 +++++++++- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 809b35ffb..b756aaefc 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -13,24 +13,24 @@ -include_lib("eunit/include/eunit.hrl"). -record(state, { - name = not_set, - partition_number = not_set, - diff_pair = not_set, - packing_difficulty = 0, - task_queue = gb_sets:new(), - active_sessions = sets:new(), - %% The sub_chunk_cache stores either the first or second sub-chunk for a given nonce. This - %% is because we process both the first and second recall ranges in parallel and don't know - %% which data will be available first. For spora_2_6 packing (aka difficulty 0), sub-chunks - %% and chunks are the same size (256KiB), for composite packing each sub-chunk is 8KiB. - sub_chunk_cache = #{}, - sub_chunk_cache_size = #{}, - sub_chunk_cache_limit = 0, - vdf_queue_limit = 0, - latest_vdf_step_number = 0, - is_pool_client = false, - h1_hashes = #{}, - h2_hashes = #{} + name = not_set :: atom(), + partition_number = not_set :: non_neg_integer() | not_set, + diff_pair = not_set, % :: ?DIFF_PAIR() + packing_difficulty = 0 :: non_neg_integer(), + task_queue = gb_sets:new() :: gb_sets:gb_set(), + active_sessions = sets:new() :: sets:set(), + %% The sub_chunk_cache stores either the first or second sub-chunk for a given nonce. This + %% is because we process both the first and second recall ranges in parallel and don't know + %% which data will be available first. For spora_2_6 packing (aka difficulty 0), sub-chunks + %% and chunks are the same size (256KiB), for composite packing each sub-chunk is 8KiB. + sub_chunk_cache = #{} :: maps:map(), + sub_chunk_cache_size = #{} :: maps:map(), + sub_chunk_cache_limit = 0 :: non_neg_integer(), + vdf_queue_limit = 0 :: non_neg_integer(), + latest_vdf_step_number = 0 :: non_neg_integer(), + is_pool_client = false, + h1_hashes = #{} :: maps:map(), + h2_hashes = #{} :: maps:map() }). -define(TASK_CHECK_FREQUENCY_MS, 200). @@ -63,7 +63,7 @@ add_task(Worker, TaskType, Candidate, ExtraArgs) -> add_delayed_task(Worker, TaskType, Candidate) -> %% Delay task by random amount between ?TASK_CHECK_FREQUENCY_MS and 2*?TASK_CHECK_FREQUENCY_MS - %% The reason for the randomization to avoid a glut tasks to all get added at the same time - + %% The reason for the randomization to avoid a glut tasks to all get added at the same time - %% in particular when the chunk cache fills up it's possible for all queued compute_h0 tasks %% to be delayed at about the same time. Delay = rand:uniform(?TASK_CHECK_FREQUENCY_MS) + ?TASK_CHECK_FREQUENCY_MS, @@ -146,7 +146,7 @@ handle_cast({chunks_read, {WhichChunk, Candidate, RangeStart, ChunkOffsets}}, St {worker, State#state.name}, {active_sessions, ar_mining_server:encode_sessions(State#state.active_sessions)}, - {candidate_session, + {candidate_session, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)}, {partition_number, Candidate#mining_candidate.partition_number}, {step_number, Candidate#mining_candidate.step_number}]), @@ -163,14 +163,14 @@ handle_cast({add_task, {TaskType, Candidate, _ExtraArgs} = Task}, State) -> {task, TaskType}, {active_sessions, ar_mining_server:encode_sessions(State#state.active_sessions)}, - {candidate_session, + {candidate_session, ar_nonce_limiter:encode_session_key(Candidate#mining_candidate.session_key)}, {partition_number, Candidate#mining_candidate.partition_number}, {step_number, Candidate#mining_candidate.step_number}, {nonce, Candidate#mining_candidate.nonce}]), {noreply, State} end; - + handle_cast(handle_task, #state{ task_queue = Q } = State) -> case gb_sets:is_empty(Q) of true -> @@ -305,7 +305,7 @@ process_chunks(WhichChunk, Candidate, RangeStart, Nonce, NoncesPerChunk, NonceMa ChunkOffsets, SubChunkSize, Count + 1, State2). process_all_sub_chunks(_WhichChunk, <<>>, _Candidate, _Nonce, State) -> - State; + State; process_all_sub_chunks(WhichChunk, Chunk, Candidate, Nonce, State) -> {SubChunk, Rest} = extract_sub_chunk(Chunk, Candidate), Candidate2 = Candidate#mining_candidate{ nonce = Nonce }, @@ -344,7 +344,7 @@ process_sub_chunk(chunk2, Candidate, SubChunk, State) -> {{chunk1, H1}, State2} -> ar_mining_hash:compute_h2(self(), Candidate2#mining_candidate{ h1 = H1 }), %% Decrement 1 for chunk2: - %% we're computing h2 for a peer so chunk1 was not previously read or cached + %% we're computing h2 for a peer so chunk1 was not previously read or cached %% on this node update_sub_chunk_cache_size(-1, SessionKey, State2); {do_not_cache, State2} -> @@ -433,7 +433,7 @@ handle_task({computed_h0, Candidate, _ExtraArgs}, State) -> Range2Exists = ar_mining_io:read_recall_range( chunk2, self(), Candidate3, RecallRange2Start), case Range2Exists of - true -> + true -> State; false -> %% Release just the Range2 cache space we reserved with @@ -628,7 +628,7 @@ maybe_warn_about_lag(Q, Name) -> %% Since we sample the queue asynchronously, we expect there to regularly %% be a queue of length 1 (i.e. a task may have just been added to the %% queue when we run this check). - %% + %% %% To further reduce log spam, we'll only warn if the queue is greater %% than 2. We really only care if a queue is consistently long or if %% it's getting longer. Temporary blips are fine. We may incrase @@ -662,7 +662,7 @@ count_h0_tasks(Q) -> end. maybe_warn_about_stale_chunks(State) -> - TotalChunkKeys = + TotalChunkKeys = maps:fold( fun(_SesssionKey, SessionCache, Acc) -> Acc + maps:size(SessionCache) @@ -752,11 +752,9 @@ update_sub_chunk_cache_size(0, _SessionKey, State) -> State; update_sub_chunk_cache_size(Delta, SessionKey, State) -> CacheSize = maps:get(SessionKey, State#state.sub_chunk_cache_size, 0), - prometheus_gauge:inc(mining_server_chunk_cache_size, - [State#state.partition_number], Delta), - State#state{ - sub_chunk_cache_size = maps:put(SessionKey, CacheSize + Delta, - State#state.sub_chunk_cache_size) }. + NewCacheSize = ar_util:clamp_lower(CacheSize + Delta, 0), + prometheus_gauge:inc(mining_server_sub_chunk_cache_size, [State#state.partition_number], NewCacheSize - CacheSize), + State#state{ sub_chunk_cache_size = maps:put(SessionKey, NewCacheSize, State#state.sub_chunk_cache_size) }. try_to_reserve_cache_space(SessionKey, State) -> #state{ packing_difficulty = PackingDifficulty } = State, diff --git a/apps/arweave/src/ar_util.erl b/apps/arweave/src/ar_util.erl index af694ac97..fc1ea7a9b 100644 --- a/apps/arweave/src/ar_util.erl +++ b/apps/arweave/src/ar_util.erl @@ -4,7 +4,7 @@ integer_to_binary/1, binary_to_integer/1, pick_random/1, pick_random/2, encode/1, decode/1, safe_encode/1, safe_decode/1, timestamp_to_seconds/1, parse_peer/1, peer_to_str/1, parse_port/1, safe_parse_peer/1, format_peer/1, - unique/1, count/2, + unique/1, count/2, clamp_lower/2, clamp_upper/2, clamp/3, genesis_wallets/0, pmap/2, pfilter/2, do_until/3, block_index_entry_from_block/1, bytes_to_mb_string/1, cast_after/3, encode_list_indices/1, parse_list_indices/1, @@ -381,3 +381,23 @@ assert_file_exists_and_readable(FilePath) -> io:format("~nThe filepath ~p doesn't exist or isn't readable.~n~n", [FilePath]), erlang:halt(1) end. + +%% @doc Clamp a value to a lower bound. +%% If the value is less than the lower bound, return the lower bound. +clamp_lower(Value, LowerBound) when Value < LowerBound -> + LowerBound; +clamp_lower(Value, _LowerBound) -> + Value. + +%% @doc Clamp a value to an upper bound. +%% If the value is greater than the upper bound, return the upper bound. +clamp_upper(Value, UpperBound) when Value > UpperBound -> + UpperBound; +clamp_upper(Value, _UpperBound) -> + Value. + +%% @doc Clamp a value to a lower and upper bound. +%% If the value is less than the lower bound, return the lower bound. +%% If the value is greater than the upper bound, return the upper bound. +clamp(Value, LowerBound, UpperBound) -> + clamp_lower(clamp_upper(Value, UpperBound), LowerBound). From 44d55edd2965ec90380c888a7ec82b2b39f60cee Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Mon, 2 Dec 2024 16:19:08 +0200 Subject: [PATCH 2/4] Revert the metric name --- apps/arweave/src/ar_mining_worker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index b756aaefc..d7a46f0c6 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -753,7 +753,7 @@ update_sub_chunk_cache_size(0, _SessionKey, State) -> update_sub_chunk_cache_size(Delta, SessionKey, State) -> CacheSize = maps:get(SessionKey, State#state.sub_chunk_cache_size, 0), NewCacheSize = ar_util:clamp_lower(CacheSize + Delta, 0), - prometheus_gauge:inc(mining_server_sub_chunk_cache_size, [State#state.partition_number], NewCacheSize - CacheSize), + prometheus_gauge:inc(mining_server_chunk_cache_size, [State#state.partition_number], NewCacheSize - CacheSize), State#state{ sub_chunk_cache_size = maps:put(SessionKey, NewCacheSize, State#state.sub_chunk_cache_size) }. try_to_reserve_cache_space(SessionKey, State) -> From 675d7a99bd9c3c79503da7f6eed92cd9c7dadfb0 Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Mon, 2 Dec 2024 16:39:08 +0200 Subject: [PATCH 3/4] Fix diff_pair typespec --- apps/arweave/src/ar_mining_worker.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index d7a46f0c6..9255adca8 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -15,7 +15,7 @@ -record(state, { name = not_set :: atom(), partition_number = not_set :: non_neg_integer() | not_set, - diff_pair = not_set, % :: ?DIFF_PAIR() + diff_pair = not_set :: {non_neg_integer(), non_neg_integer()} | not_set, packing_difficulty = 0 :: non_neg_integer(), task_queue = gb_sets:new() :: gb_sets:gb_set(), active_sessions = sets:new() :: sets:set(), From 2389f3fada086d48a519287aa1780a262e262cbd Mon Sep 17 00:00:00 2001 From: Denis Fakhrtdinov Date: Mon, 2 Dec 2024 16:42:22 +0200 Subject: [PATCH 4/4] Add a comment describing the temporary solution --- apps/arweave/src/ar_mining_worker.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/arweave/src/ar_mining_worker.erl b/apps/arweave/src/ar_mining_worker.erl index 9255adca8..909506583 100644 --- a/apps/arweave/src/ar_mining_worker.erl +++ b/apps/arweave/src/ar_mining_worker.erl @@ -752,6 +752,8 @@ update_sub_chunk_cache_size(0, _SessionKey, State) -> State; update_sub_chunk_cache_size(Delta, SessionKey, State) -> CacheSize = maps:get(SessionKey, State#state.sub_chunk_cache_size, 0), + % The clamp_lower function ensures that the cache size does not go below 0. + % This is temporary until we have a more permanent solution for the chunk cache. NewCacheSize = ar_util:clamp_lower(CacheSize + Delta, 0), prometheus_gauge:inc(mining_server_chunk_cache_size, [State#state.partition_number], NewCacheSize - CacheSize), State#state{ sub_chunk_cache_size = maps:put(SessionKey, NewCacheSize, State#state.sub_chunk_cache_size) }.