feat: update ar_doctor_bench to use ar_mining_io
This gives a truer estimate of the miner's expected read performance
JamesPiechota committed Nov 29, 2024
1 parent d33a7f5 · commit 2f72694
Showing 3 changed files with 103 additions and 41 deletions.
71 changes: 52 additions & 19 deletions apps/arweave/src/ar_doctor_bench.erl
@@ -5,7 +5,7 @@
-include_lib("kernel/include/file.hrl").
-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("arweave/include/ar_chunk_storage.hrl").
-include_lib("arweave/include/ar_mining.hrl").
-include_lib("arweave/include/ar_consensus.hrl").

-define(NUM_ITERATIONS, 5).
@@ -41,14 +41,19 @@ bench_read(Args) ->
[DurationString, DataDir | StorageModuleConfigs] = Args,
Duration = list_to_integer(DurationString),

StorageModules = parse_storage_modules(StorageModuleConfigs, []),
Config = #config{data_dir = DataDir, storage_modules = StorageModules},
{StorageModules, Address} = parse_storage_modules(StorageModuleConfigs, [], undefined),
ar:console("Assuming mining address: ~p~n", [ar_util:safe_encode(Address)]),
Config = #config{
data_dir = DataDir,
storage_modules = StorageModules,
mining_addr = Address},
application:set_env(arweave, config, Config),

ar_kv_sup:start_link(),
ar_storage_sup:start_link(),
ar_sync_record_sup:start_link(),
ar_chunk_storage_sup:start_link(),
ar_mining_io:start_link(standalone),

ar:console("~n~nDisk read benchmark will run for ~B seconds.~n", [Duration]),
ar:console("Data will be logged continuously to ~p in the format:~n", [?OUTPUT_FILENAME]),
@@ -74,11 +79,21 @@

true.
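
For context, a hedged sketch of how this entry point can be driven (the duration is in seconds; the storage-module config string and address below are hypothetical, the accepted syntax being whatever ar_config:parse_storage_module/1 parses):

%% Hypothetical invocation: read for 60 seconds from one storage module
%% under /opt/arweave/data. The "<partition>,<address>" format is an
%% assumption, not taken from this diff.
ar_doctor_bench:bench_read(["60", "/opt/arweave/data",
    "22,En2eqsVJARnTVOSh723PBXAKGmKgrGSjQ2YIGwE_ZRI"]).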

parse_storage_modules([], StorageModules) ->
StorageModules;
parse_storage_modules([StorageModuleConfig | StorageModuleConfigs], StorageModules) ->
parse_storage_modules([], StorageModules, Address) ->
{StorageModules, Address};
parse_storage_modules([StorageModuleConfig | StorageModuleConfigs], StorageModules, Address) ->
{ok, StorageModule} = ar_config:parse_storage_module(StorageModuleConfig),
parse_storage_modules(StorageModuleConfigs, StorageModules ++ [StorageModule]).
Address2 = ar_storage_module:address(StorageModule),
case Address2 == Address orelse Address == undefined of
true ->
ok;
false ->
ar:console("Warning: multiple mining addresses specified in storage_modules:~n")
end,
parse_storage_modules(
StorageModuleConfigs,
StorageModules ++ [StorageModule],
Address2).
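
Net effect of the new accumulator: the first module's packing address is adopted and each later module is compared against it, with a mismatch only producing a console warning. A minimal sketch of the expected shape (Configs is a placeholder list of config strings):

%% Sketch: starting from Address = undefined, parse_storage_modules/3
%% returns the parsed modules plus the address they share.
{StorageModules, Addr} = parse_storage_modules(Configs, [], undefined),
%% Addr is the common mining address, or the last one seen if the
%% configs disagree (one warning per mismatch).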

read_storage_module(_DataDir, StorageModule, StopTime) ->
StoreID = ar_storage_module:id(StorageModule),
@@ -87,7 +102,7 @@ read_storage_module(_DataDir, StorageModule, StopTime) ->

OutputFileName = string:replace(?OUTPUT_FILENAME, "<storage_module>", StoreID),

random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName).
random_read(StorageModule, StartOffset, EndOffset, StopTime, OutputFileName).

% random_chunk_pread(DataDir, StoreID),
% random_dev_pread(DataDir, StoreID),
Expand All @@ -96,13 +111,13 @@ read_storage_module(_DataDir, StorageModule, StopTime) ->
% dd_devs_read(DataDir, StoreID),
% dd_dev_read(DataDir, StoreID),

random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName) ->
random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName, 0, 0).
random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName, SumChunks, SumElapsedTime) ->
random_read(StorageModule, StartOffset, EndOffset, StopTime, OutputFileName) ->
random_read(StorageModule, StartOffset, EndOffset, StopTime, OutputFileName, 0, 0).
random_read(StorageModule, StartOffset, EndOffset, StopTime, OutputFileName, SumChunks, SumElapsedTime) ->
StartTime = erlang:monotonic_time(),
case StartTime < StopTime of
true ->
Chunks = read(StoreID, StartOffset, EndOffset, ?RECALL_RANGE_SIZE, ?NUM_FILES),
Chunks = read(StorageModule, StartOffset, EndOffset, ?RECALL_RANGE_SIZE, ?NUM_FILES),
EndTime = erlang:monotonic_time(),
ElapsedTime = erlang:convert_time_unit(EndTime - StartTime, native, millisecond),

@@ -112,21 +127,39 @@ random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName, SumChunks
Line = io_lib:format("~B,~B,~B,~B~n", [
Timestamp, BytesRead, ElapsedTime, BytesRead * 1000 div ElapsedTime]),
file:write_file(OutputFileName, Line, [append]),
random_read(StoreID, StartOffset, EndOffset, StopTime, OutputFileName,
random_read(StorageModule, StartOffset, EndOffset, StopTime, OutputFileName,
SumChunks + Chunks, SumElapsedTime + ElapsedTime);
false ->
StoreID = ar_storage_module:id(StorageModule),
{StoreID, SumChunks, SumElapsedTime}
end.
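
Each iteration appends one CSV row: timestamp, bytes read, elapsed milliseconds, and bytes per second. A worked example with illustrative numbers, assuming 256 KiB (262144-byte) chunks:

%% In an Erlang shell: 400 chunks of 262144 bytes read in 500 ms.
1> BytesRead = 400 * 262144.
104857600
2> BytesRead * 1000 div 500.
209715200
%% i.e. ~200 MiB/s in the last column of the CSV row.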

read(StoreID, StartOffset, EndOffset, Size, NumReads) ->
read(StoreID, StartOffset, EndOffset, Size, 0, NumReads).
read(StorageModule, StartOffset, EndOffset, Size, NumReads) ->
read(StorageModule, StartOffset, EndOffset, Size, 0, NumReads).

read(_StoreID, _StartOffset, _EndOffset, _Size, NumChunks, 0) ->
read(_StorageModule, _StartOffset, _EndOffset, _Size, NumChunks, 0) ->
NumChunks;
read(StoreID, StartOffset, EndOffset, Size, NumChunks, NumReads) ->
read(StorageModule, StartOffset, EndOffset, Size, NumChunks, NumReads) ->
Offset = rand:uniform(EndOffset - Size - StartOffset + 1) + StartOffset,
Chunks = ar_chunk_storage:get_range(Offset, Size, StoreID),
read(StoreID, StartOffset, EndOffset, Size, NumChunks + length(Chunks), NumReads - 1).
% Chunks = ar_chunk_storage:get_range(Offset, Size, StoreID),
% read(StoreID, StartOffset, EndOffset, Size, NumChunks + length(Chunks), NumReads - 1).
MiningAddress = ar_storage_module:address(StorageModule),
PackingDifficulty = ar_storage_module:packing_difficulty(StorageModule),
Candidate = #mining_candidate{
mining_address = MiningAddress,
packing_difficulty = PackingDifficulty
},
RangeExists = ar_mining_io:read_recall_range(chunk1, self(), Candidate, Offset),
case RangeExists of
true ->
receive
{chunks_read, _WhichChunk, _Candidate, _RecallRangeStart, ChunkOffsets} ->
read(StorageModule, StartOffset, EndOffset, Size, NumChunks + length(ChunkOffsets), NumReads - 1)
end;
false ->
read(StorageModule, StartOffset, EndOffset, Size, NumChunks, NumReads)
end.
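
This is the heart of the change: rather than calling ar_chunk_storage:get_range/3 directly (the commented-out lines above), the benchmark asks ar_mining_io to read a recall range exactly as the miner would, then blocks until the io thread replies with the standalone chunks_read message. A self-contained sketch of that protocol, with an illustrative timeout the code above does not have:

%% Hedged sketch; assumes ar_mining_io:start_link(standalone) has run and
%% Candidate is a #mining_candidate{} with address and packing difficulty set.
read_one_range(Candidate, Offset) ->
    case ar_mining_io:read_recall_range(chunk1, self(), Candidate, Offset) of
        false ->
            0; %% no io thread covers this offset; no reply will come
        true ->
            receive
                {chunks_read, chunk1, _Candidate, _RangeStart, ChunkOffsets} ->
                    length(ChunkOffsets)
            after 60000 ->
                0 %% illustrative timeout; the benchmark itself waits indefinitely
            end
    end.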


%% XXX: the following functions are not used, but may be useful in the future to benchmark
%% different read strategies. They can be deleted when they are no longer useful.
50 changes: 31 additions & 19 deletions apps/arweave/src/ar_mining_io.erl
@@ -2,7 +2,7 @@

-behaviour(gen_server).

-export([start_link/0, set_largest_seen_upper_bound/1,
-export([start_link/0, start_link/1, set_largest_seen_upper_bound/1,
get_partitions/0, get_partitions/1, read_recall_range/4, garbage_collect/0]).

-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2]).
@@ -16,6 +16,7 @@
-define(CACHE_TTL_MS, 2000).

-record(state, {
mode = miner,
partition_upper_bound = 0,
io_threads = #{},
io_thread_monitor_refs = #{}
@@ -27,7 +28,10 @@

%% @doc Start the gen_server.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start_link(miner).

start_link(Mode) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Mode, []).
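
start_link/0 preserves the old behaviour for the node's supervision tree, while tools can now boot the server standalone, e.g. (the {ok, _} match is illustrative):

{ok, _Pid} = ar_mining_io:start_link(standalone),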

set_largest_seen_upper_bound(PartitionUpperBound) ->
gen_server:call(?MODULE, {set_largest_seen_upper_bound, PartitionUpperBound}, 60000).
@@ -65,14 +69,14 @@ garbage_collect() ->
%%% Generic server callbacks.
%%%===================================================================

init([]) ->
init(Mode) ->
State =
lists:foldl(
fun ({PartitionNumber, MiningAddress, PackingDifficulty, StoreID}, Acc) ->
start_io_thread(PartitionNumber, MiningAddress,
PackingDifficulty, StoreID, Acc)
end,
#state{},
#state{ mode = Mode },
get_io_channels()
),
{ok, State}.
@@ -209,7 +213,7 @@ start_io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
when is_map_key({PartitionNumber, MiningAddress, PackingDifficulty, StoreID}, Threads) ->
State;
start_io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
#state{ io_threads = Threads, io_thread_monitor_refs = Refs } = State) ->
#state{ mode = Mode, io_threads = Threads, io_thread_monitor_refs = Refs } = State) ->
Now = os:system_time(millisecond),
Thread =
spawn(
Expand All @@ -220,7 +224,7 @@ start_io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
_ ->
ar_chunk_storage:open_files(StoreID)
end,
io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID, #{}, Now)
io_thread(Mode, PartitionNumber, MiningAddress, PackingDifficulty, StoreID, #{}, Now)
end
),
Ref = monitor(process, Thread),
@@ -244,18 +248,23 @@ handle_io_thread_down(Ref, Reason,
start_io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
State#state{ io_threads = Threads2, io_thread_monitor_refs = Refs2 }).

io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID, Cache, LastClearTime) ->
io_thread(Mode, PartitionNumber, MiningAddress, PackingDifficulty, StoreID, Cache, LastClearTime) ->
receive
{WhichChunk, {Worker, Candidate, RecallRangeStart}} ->
{ChunkOffsets, Cache2} =
get_chunks(WhichChunk, Candidate, RecallRangeStart, StoreID, Cache),
ar_mining_worker:chunks_read(
Worker, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets),
get_chunks(Mode, WhichChunk, Candidate, RecallRangeStart, StoreID, Cache),
chunks_read(Mode, Worker, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets),
{Cache3, LastClearTime2} = maybe_clear_cached_chunks(Cache2, LastClearTime),
io_thread(PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
io_thread(Mode, PartitionNumber, MiningAddress, PackingDifficulty, StoreID,
Cache3, LastClearTime2)
end.

chunks_read(miner, Worker, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets) ->
ar_mining_worker:chunks_read(
Worker, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets);
chunks_read(standalone, Worker, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets) ->
Worker ! {chunks_read, WhichChunk, Candidate, RecallRangeStart, ChunkOffsets}.

get_packed_intervals(Start, End, MiningAddress, PackingDifficulty, "default", Intervals) ->
Packing = ar_block:get_packing(PackingDifficulty, MiningAddress),
case ar_sync_record:get_next_synced_interval(Start, End, Packing, ar_data_sync, "default") of
@@ -297,20 +306,20 @@ maybe_clear_cached_chunks(Cache, LastClearTime) ->
%%
%% However if the request is from our local miner there's no need to cache since the H1
%% batch is always handled all at once.
get_chunks(WhichChunk, Candidate, RangeStart, StoreID, Cache) ->
get_chunks(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache) ->
case Candidate#mining_candidate.cm_lead_peer of
not_set ->
ChunkOffsets = read_range(WhichChunk, Candidate, RangeStart, StoreID),
ChunkOffsets = read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID),
{ChunkOffsets, Cache};
_ ->
cached_read_range(WhichChunk, Candidate, RangeStart, StoreID, Cache)
cached_read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache)
end.

cached_read_range(WhichChunk, Candidate, RangeStart, StoreID, Cache) ->
cached_read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID, Cache) ->
Now = os:system_time(millisecond),
case maps:get(RangeStart, Cache, not_found) of
not_found ->
ChunkOffsets = read_range(WhichChunk, Candidate, RangeStart, StoreID),
ChunkOffsets = read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID),
Cache2 = maps:put(RangeStart, {Now, ChunkOffsets}, Cache),
{ChunkOffsets, Cache2};
{_CachedTime, ChunkOffsets} ->
@@ -326,7 +335,7 @@ cached_read_range(WhichChunk, Candidate, RangeStart, StoreID, Cache) ->
{ChunkOffsets, Cache}
end.

read_range(WhichChunk, Candidate, RangeStart, StoreID) ->
read_range(Mode, WhichChunk, Candidate, RangeStart, StoreID) ->
StartTime = erlang:monotonic_time(),
#mining_candidate{ mining_address = MiningAddress,
packing_difficulty = PackingDifficulty } = Candidate,
@@ -335,7 +344,7 @@ read_range(WhichChunk, Candidate, RangeStart, StoreID) ->
MiningAddress, PackingDifficulty, StoreID, ar_intervals:new()),
ChunkOffsets = ar_chunk_storage:get_range(RangeStart, RecallRangeSize, StoreID),
ChunkOffsets2 = filter_by_packing(ChunkOffsets, Intervals, StoreID),
log_read_range(Candidate, WhichChunk, length(ChunkOffsets), StartTime),
log_read_range(Mode, Candidate, WhichChunk, length(ChunkOffsets), StartTime),
ChunkOffsets2.

filter_by_packing([], _Intervals, _StoreID) ->
@@ -350,7 +359,9 @@ filter_by_packing([{EndOffset, Chunk} | ChunkOffsets], Intervals, "default" = St
filter_by_packing(ChunkOffsets, _Intervals, _StoreID) ->
ChunkOffsets.

log_read_range(Candidate, WhichChunk, FoundChunks, StartTime) ->
log_read_range(standalone, _Candidate, _WhichChunk, _FoundChunks, _StartTime) ->
ok;
log_read_range(_Mode, Candidate, WhichChunk, FoundChunks, StartTime) ->
EndTime = erlang:monotonic_time(),
ElapsedTime = erlang:convert_time_unit(EndTime-StartTime, native, millisecond),
ReadRate = case ElapsedTime > 0 of
@@ -378,6 +389,7 @@ log_read_range(Candidate, WhichChunk, FoundChunks, StartTime) ->
find_thread(PartitionNumber, MiningAddress, PackingDifficulty, RangeEnd, RangeStart, Threads) ->
Keys = find_thread2(PartitionNumber, MiningAddress, PackingDifficulty,
maps:iterator(Threads)),

case find_thread3(Keys, RangeEnd, RangeStart, 0, not_found) of
not_found ->
not_found;
23 changes: 20 additions & 3 deletions apps/arweave/src/ar_storage_module.erl
@@ -1,8 +1,8 @@
-module(ar_storage_module).

-export([id/1, label/1, address_label/1, address_label/2, packing_label/1, label_by_id/1,
get_by_id/1, get_range/1, get_packing/1, get_size/1, get/2, get_all/1, get_all/2,
has_any/1, has_range/2, get_cover/3]).
-export([id/1, label/1, address_label/1, address_label/2, address/1, packing_difficulty/1,
packing_label/1, label_by_id/1, get_by_id/1, get_range/1, get_packing/1, get_size/1,
get/2, get_all/1, get_all/2, has_any/1, has_range/2, get_cover/3]).

-export([get_unique_sorted_intervals/1]).

@@ -22,6 +22,9 @@
-define(OVERLAP, (?LEGACY_RECALL_RANGE_SIZE)).
-endif.

-type storage_module() :: {integer(), integer(), {atom(), binary()}}
| {integer(), integer(), {atom(), binary(), integer()}}.

%%%===================================================================
%%% Public interface.
%%%===================================================================
@@ -85,6 +88,20 @@ address_label(Addr, PackingDifficulty) ->
integer_to_list(Label)
end.

-spec address(ar_storage_module:storage_module()) -> binary() | undefined.
address({_, _, {spora_2_6, Addr}}) ->
Addr;
address({_, _, {composite, Addr, _PackingDifficulty}}) ->
Addr;
address(_StorageModule) ->
undefined.

-spec packing_difficulty(ar_storage_module:storage_module()) -> integer().
packing_difficulty({_, _, {composite, _Addr, PackingDifficulty}}) ->
PackingDifficulty;
packing_difficulty(_StorageModule) ->
0.
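
Usage sketch for the two new accessors, with illustrative storage-module tuples (numbers and addresses are placeholders):

%% Two illustrative tuples matching the storage_module() type above.
SM1 = {3600000000000, 0, {spora_2_6, <<"addr">>}},
SM2 = {3600000000000, 1, {composite, <<"addr">>, 2}},
<<"addr">> = ar_storage_module:address(SM1),
0 = ar_storage_module:packing_difficulty(SM1), %% non-composite packing
2 = ar_storage_module:packing_difficulty(SM2),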

packing_label({spora_2_6, Addr}) ->
AddrLabel = ar_storage_module:address_label(Addr),
list_to_atom("spora_2_6_" ++ AddrLabel);
