WIP
JamesPiechota committed Jan 17, 2025
1 parent 7c34926 commit 89d5b1f
Showing 9 changed files with 187 additions and 486 deletions.
5 changes: 0 additions & 5 deletions apps/arweave/include/ar_config.hrl
@@ -119,10 +119,6 @@
%% The default rocksdb WAL sync interval, 1 minute.
-define(DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S, 60).

%% The maximum allowed total size (in bytes) of the entropies generated
%% for the 2.9 replication.
-define(DEFAULT_MAX_REPLICA_2_9_ENTROPY_CACHE_SIZE, 33_554_432). % 8_388_608 * 4.

%% The number of 2.9 storage modules allowed to prepare the storage at a time.
-ifdef(AR_TEST).
-define(DEFAULT_REPLICA_2_9_WORKERS, 2).
@@ -227,7 +223,6 @@
pool_api_key = not_set,
pool_worker_name = not_set,
replica_2_9_workers = ?DEFAULT_REPLICA_2_9_WORKERS,
replica_2_9_entropy_cache_size = ?DEFAULT_MAX_REPLICA_2_9_ENTROPY_CACHE_SIZE,
%% Undocumented/unsupported options
chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
rocksdb_flush_interval_s = ?DEFAULT_ROCKSDB_FLUSH_INTERVAL_S,
52 changes: 31 additions & 21 deletions apps/arweave/src/ar_chunk_storage.erl
@@ -580,27 +580,37 @@ do_prepare_replica_2_9(State) ->
is_recorded;
false ->
%% Get all the entropies needed to encipher the chunk at BucketEndOffset.
Entropies = ar_entropy_storage:generate_entropies(
RewardAddr, BucketEndOffset, SubChunkStart),
EntropyKeys = ar_entropy_storage:generate_entropy_keys(
RewardAddr, BucketEndOffset, SubChunkStart),
SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset),
%% If we are not at the beginning of the entropy, shift the offset to
%% the left. store_entropy will traverse the entire 2.9 partition shifting
%% the offset by sector size. It may happen some sub-chunks will be written
%% to the neighbouring storage module(s) on the left or on the right
%% since the storage module may be configured to be smaller than the
%% partition.
BucketEndOffset2 = ar_entropy_storage:shift_entropy_offset(
BucketEndOffset, -SliceIndex),
%% The end of a recall partition (3.6TB) may fall in the middle of a chunk, so
%% we'll use the padded offset to end the store_entropy iteration.
PartitionEnd = (Partition + 1) * ?PARTITION_SIZE,
PaddedPartitionEnd =
get_chunk_bucket_end(ar_block:get_chunk_padded_offset(PartitionEnd)),
ar_entropy_storage:store_entropy(
Entropies, BucketEndOffset2, SubChunkStart, PaddedPartitionEnd,
EntropyKeys, RewardAddr, 0, 0)
Entropies = prometheus_histogram:observe_duration(
replica_2_9_entropy_duration_milliseconds, ["32"],
fun() ->
ar_entropy_storage:generate_entropies(
RewardAddr, BucketEndOffset, SubChunkStart)
end),

case Entropies of
{error, Reason} ->
{error, Reason};
_ ->
EntropyKeys = ar_entropy_storage:generate_entropy_keys(
RewardAddr, BucketEndOffset, SubChunkStart),
SliceIndex = ar_replica_2_9:get_slice_index(BucketEndOffset),
%% If we are not at the beginning of the entropy, shift the offset to
%% the left. store_entropy will traverse the entire 2.9 partition shifting
%% the offset by sector size. It may happen some sub-chunks will be written
%% to the neighbouring storage module(s) on the left or on the right
%% since the storage module may be configured to be smaller than the
%% partition.
BucketEndOffset2 = ar_entropy_storage:shift_entropy_offset(
BucketEndOffset, -SliceIndex),
%% The end of a recall partition (3.6TB) may fall in the middle of a chunk, so
%% we'll use the padded offset to end the store_entropy iteration.
PartitionEnd = (Partition + 1) * ?PARTITION_SIZE,
PaddedPartitionEnd =
get_chunk_bucket_end(ar_block:get_chunk_padded_offset(PartitionEnd)),
ar_entropy_storage:store_entropy(
Entropies, BucketEndOffset2, SubChunkStart, PaddedPartitionEnd,
EntropyKeys, RewardAddr, 0, 0)
end
end,
?LOG_DEBUG([{event, do_prepare_replica_2_9}, {store_id, StoreID},
{start, Start}, {padded_end_offset, BucketEndOffset},
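
Note on the change above: generate_entropies is now timed with a Prometheus histogram and may return {error, Reason}, which the new case clause propagates instead of letting the caller crash. A minimal sketch of the same pattern, assuming the replica_2_9_entropy_duration_milliseconds histogram is registered elsewhere (e.g. in ar_metrics); the helper name timed_generate_entropies is hypothetical:

timed_generate_entropies(RewardAddr, BucketEndOffset, SubChunkStart) ->
    %% observe_duration/3 runs the fun, records its duration in the histogram,
    %% and returns the fun's result unchanged, so errors pass straight through.
    Result = prometheus_histogram:observe_duration(
        replica_2_9_entropy_duration_milliseconds, ["32"],
        fun() ->
            ar_entropy_storage:generate_entropies(
                RewardAddr, BucketEndOffset, SubChunkStart)
        end),
    case Result of
        {error, _Reason} = Error -> Error;
        Entropies -> {ok, Entropies}
    end.
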
10 changes: 8 additions & 2 deletions apps/arweave/src/ar_data_sync.erl
@@ -3006,9 +3006,15 @@ process_store_chunk_queue(State, StartLen) ->
orelse Now - Timestamp > ?STORE_CHUNK_QUEUE_FLUSH_TIME_THRESHOLD of
true ->
{{_Offset, _Timestamp, _Ref, ChunkArgs, Args}, Q2} = gb_sets:take_smallest(Q),
{_Packing, _Chunk, _AbsoluteOffset, _TXRoot, ChunkSize} = ChunkArgs,

StartTime = erlang:monotonic_time(),

store_chunk2(ChunkArgs, Args, State),

ar_metrics:record_rate_metric(
StartTime, ChunkSize, chunk_store_rate, [StoreID]),

prometheus_histogram:observe_duration(chunk_store_duration_milliseconds, [],
fun() -> store_chunk2(ChunkArgs, Args, State) end),
decrement_chunk_cache_size(),
State2 = State#sync_data_state{ store_chunk_queue = Q2,
store_chunk_queue_len = Len - 1,
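
The inline rate computation previously done in ar_entropy_storage (shown removed further down) is now centralized behind ar_metrics:record_rate_metric/4. The helper's body is not part of this diff; the following is a plausible sketch of it, inferred from the code it replaces, and the real implementation may differ:

record_rate_metric(StartTime, SizeBytes, Metric, Labels) ->
    %% Derive a bytes-per-second rate from the monotonic start time captured
    %% by the caller and the number of bytes processed since then.
    ElapsedMicroseconds = erlang:convert_time_unit(
        erlang:monotonic_time() - StartTime, native, microsecond),
    Rate =
        case ElapsedMicroseconds > 0 of
            true -> 1000000 * SizeBytes div ElapsedMicroseconds;
            false -> 0
        end,
    %% Assumption: the rate is published as a gauge, mirroring the removed
    %% replica_2_9_entropy_store_rate code; the real helper may use a
    %% histogram instead.
    prometheus_gauge:set(Metric, Labels, Rate).
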
19 changes: 13 additions & 6 deletions apps/arweave/src/ar_device_lock.erl
@@ -168,7 +168,7 @@ do_acquire_lock(Mode, StoreID, State) ->
Device = maps:get(StoreID, State#state.store_id_to_device),
DeviceLock = maps:get(Device, State#state.device_locks, sync),
PrepareLocks = count_prepare_locks(State),
MaxPrepareLocks = 128,
MaxPrepareLocks = 4,
{Acquired, NewDeviceLock} = case Mode of
sync ->
%% Can only acquire a sync lock if the device is in sync mode
@@ -254,19 +254,26 @@ count_prepare_locks(State) ->
log_device_locks(State) ->
StoreIDToDevice = State#state.store_id_to_device,
DeviceLocks = State#state.device_locks,
maps:fold(
fun(StoreID, Device, _) ->
SortedStoreIDList = lists:sort(
fun({StoreID1, Device1}, {StoreID2, Device2}) ->
case Device1 =:= Device2 of
true -> StoreID1 =< StoreID2;
false -> Device1 < Device2
end
end,
maps:to_list(StoreIDToDevice)),
lists:foreach(
fun({StoreID, Device}) ->
DeviceLock = maps:get(Device, DeviceLocks, sync),
Status = case DeviceLock of
sync -> sync;
{prepare, StoreID} -> prepare;
{repack, StoreID} -> repack;
_ -> paused
end,
?LOG_INFO([{event, device_lock_status}, {store_id, StoreID}, {status, Status}])
?LOG_INFO([{event, device_lock_status}, {device, Device}, {store_id, StoreID}, {status, Status}])
end,
ok,
StoreIDToDevice
SortedStoreIDList
).

%%%===================================================================
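
The logging change above sorts store IDs by device first and by store ID within a device, so entries for the same device appear together in the log. The same ordering can be obtained (a sketch, not from the commit) by sorting swapped key tuples and relying on Erlang's element-by-element term ordering:

sorted_store_ids(StoreIDToDevice) ->
    %% Build {Device, StoreID} pairs so the default tuple ordering sorts by
    %% device first, then by store ID, matching the explicit comparator above.
    Pairs = [{Device, StoreID} || {StoreID, Device} <- maps:to_list(StoreIDToDevice)],
    [{StoreID, Device} || {Device, StoreID} <- lists:sort(Pairs)].
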
117 changes: 72 additions & 45 deletions apps/arweave/src/ar_entropy_storage.erl
@@ -90,19 +90,60 @@ delete_record(PaddedEndOffset, StoreID) ->

generate_missing_entropy(PaddedEndOffset, RewardAddr) ->
Entropies = generate_entropies(RewardAddr, PaddedEndOffset, 0),
EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset),
take_combined_entropy_by_index(Entropies, EntropyIndex).
case Entropies of
{error, Reason} ->
{error, Reason};
_ ->
EntropyIndex = ar_replica_2_9:get_slice_index(PaddedEndOffset),
take_combined_entropy_by_index(Entropies, EntropyIndex)
end.

%% @doc Returns all the entropies needed to encipher the chunk at PaddedEndOffset.
%% ar_packing_server:get_replica_2_9_entropy/3 will query a cached entropy, or generate it
%% if it is not cached.
generate_entropies(_RewardAddr, _PaddedEndOffset, SubChunkStart)
when SubChunkStart == ?DATA_CHUNK_SIZE ->
[];
generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart) ->
SubChunkSize = ?COMPOSITE_PACKING_SUB_CHUNK_SIZE,
[ar_packing_server:get_replica_2_9_entropy(RewardAddr, PaddedEndOffset, SubChunkStart)
| generate_entropies(RewardAddr, PaddedEndOffset, SubChunkStart + SubChunkSize)].
EntropyTasks = lists:map(
fun(Offset) ->
Ref = make_ref(),
ar_packing_server:request_entropy_generation(
Ref, self(), {RewardAddr, PaddedEndOffset, Offset}),
Ref
end,
lists:seq(SubChunkStart, ?DATA_CHUNK_SIZE - SubChunkSize, SubChunkSize)
),
Entropies = collect_entropies(EntropyTasks, []),
case Entropies of
{error, _Reason} ->
flush_entropy_messages();
_ ->
ok
end,
Entropies.

collect_entropies([], Acc) ->
lists:reverse(Acc);
collect_entropies([Ref | Rest], Acc) ->
receive
{entropy_generated, Ref, {error, Reason}} ->
?LOG_ERROR([{event, failed_to_generate_replica_2_9_entropy}, {error, Reason}]),
{error, Reason};
{entropy_generated, Ref, Entropy} ->
collect_entropies(Rest, [Entropy | Acc])
after 60000 ->
?LOG_ERROR([{event, entropy_generation_timeout}, {ref, Ref}]),
{error, timeout}
end.

flush_entropy_messages() ->
?LOG_INFO([{event, flush_entropy_messages}]),
receive
{entropy_generated, _, _} ->
flush_entropy_messages()
after 0 ->
ok
end.
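
With this change, generate_entropies/3 fans the sub-chunk work out to ar_packing_server by reference and gathers the replies in collect_entropies/2, returning {error, timeout} if any reply takes longer than 60 seconds; flush_entropy_messages/0 then drains any late {entropy_generated, _, _} replies from the caller's mailbox. A minimal caller sketch (the function name prepare_entropies is hypothetical) showing how the new error return is handled:

prepare_entropies(RewardAddr, BucketEndOffset) ->
    case ar_entropy_storage:generate_entropies(RewardAddr, BucketEndOffset, 0) of
        {error, _Reason} = Error ->
            %% A worker failed or timed out; stale replies were already
            %% flushed from this process's mailbox by flush_entropy_messages/0.
            Error;
        Entropies when is_list(Entropies) ->
            {ok, Entropies}
    end.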

generate_entropy_keys(_RewardAddr, _Offset, SubChunkStart)
when SubChunkStart == ?DATA_CHUNK_SIZE ->
@@ -179,20 +220,10 @@ store_entropy(Entropies,
StoreID2,
RewardAddr),

EndTime = erlang:monotonic_time(),
ElapsedTime =
erlang:convert_time_unit(EndTime - StartTime,
native,
microsecond),
%% bytes per second
WriteRate =
case ElapsedTime > 0 of
true -> 1000000 * byte_size(ChunkEntropy) div ElapsedTime;
false -> 0
end,
prometheus_gauge:set(replica_2_9_entropy_store_rate,
[StoreID2],
WriteRate),
ar_metrics:record_rate_metric(StartTime,
byte_size(ChunkEntropy),
replica_2_9_entropy_write_rate,
[StoreID2]),
From ! {store_entropy_sub_chunk_written, WaitNAcc + 1}
end),
WaitNAcc + 1
@@ -240,34 +271,33 @@ record_chunk(PaddedEndOffset, Chunk, RewardAddr, StoreID, FileIndex, IsPrepared)
true ->
ar_chunk_storage:get(StartOffset, StartOffset, StoreID)
end,
case ReadEntropy of
RecordChunk = case ReadEntropy of
{error, _} = Error2 ->
release_semaphore(Filepath),
Error2;
not_found ->
release_semaphore(Filepath),
{error, not_prepared_yet2};
missing_entropy ->
Packing = {replica_2_9, RewardAddr},
Entropy = generate_missing_entropy(PaddedEndOffset, RewardAddr),
PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
Result = ar_chunk_storage:record_chunk(
PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex),
release_semaphore(Filepath),
Result;
case Entropy of
{error, Reason} ->
{error, Reason};
_ ->
PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
ar_chunk_storage:record_chunk(
PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex)
end;
no_entropy_yet ->
Result = ar_chunk_storage:record_chunk(
PaddedEndOffset, Chunk, unpacked_padded, StoreID, FileIndex),
release_semaphore(Filepath),
Result;
ar_chunk_storage:record_chunk(
PaddedEndOffset, Chunk, unpacked_padded, StoreID, FileIndex);
{_EndOffset, Entropy} ->
Packing = {replica_2_9, RewardAddr},
PackedChunk = ar_packing_server:encipher_replica_2_9_chunk(Chunk, Entropy),
Result = ar_chunk_storage:record_chunk(
PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex),
release_semaphore(Filepath),
Result
end.
ar_chunk_storage:record_chunk(
PaddedEndOffset, PackedChunk, Packing, StoreID, FileIndex)
end,
release_semaphore(Filepath),
RecordChunk.

%% @doc Return the byte (>= ChunkStartOffset, < ChunkEndOffset)
%% that necessarily belongs to the chunk stored
@@ -316,6 +346,7 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->
{error, _} = Error ->
Error;
{_, UnpackedChunk} ->
ar_sync_record:delete(EndOffset, EndOffset - ?DATA_CHUNK_SIZE, ar_data_sync, StoreID),
ar_packing_server:encipher_replica_2_9_chunk(UnpackedChunk, ChunkEntropy)
end;
false ->
@@ -328,12 +359,6 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->
{error, _} = Error2 ->
Error2;
_ ->
case IsUnpackedChunkRecorded of
true ->
ar_sync_record:delete(EndOffset, EndOffset - ?DATA_CHUNK_SIZE, ar_data_sync, StoreID);
false ->
ok
end,
case ar_chunk_storage:write_chunk(EndOffset, Chunk, #{}, StoreID) of
{ok, Filepath} ->
ets:insert(chunk_storage_file_index,
Expand All @@ -347,8 +372,9 @@ record_entropy(ChunkEntropy, BucketEndOffset, StoreID, RewardAddr) ->

case Result of
{error, Reason} ->
?LOG_ERROR([{event, failed_to_store_replica_2_9_sub_chunk_entropy},
?LOG_ERROR([{event, failed_to_store_replica_2_9_chunk_entropy},
{filepath, Filepath},
{byte, Byte},
{padded_end_offset, EndOffset},
{bucket_end_offset, BucketEndOffset},
{store_id, StoreID},
@@ -423,7 +449,8 @@ shift_entropy_offset(Offset, SectorCount) ->
acquire_semaphore(Filepath) ->
case ets:insert_new(ar_entropy_storage, {{semaphore, Filepath}}) of
false ->
?LOG_DEBUG([{event, details_store_chunk}, {section, waiting_on_semaphore}, {filepath, Filepath}]),
?LOG_DEBUG([
{event, details_store_chunk}, {section, waiting_on_semaphore}, {filepath, Filepath}]),
timer:sleep(20),
acquire_semaphore(Filepath);
true ->
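
The record_chunk/6 refactor in this file computes its result first and calls release_semaphore/1 once at the end instead of in every branch. A sketch of the same per-file semaphore pattern wrapped in try/after, which additionally releases the lock if the wrapped work throws; release_semaphore/1 is assumed to simply ets:delete the marker key, since its body is outside this diff:

with_file_semaphore(Filepath, Fun) ->
    acquire_semaphore(Filepath),
    try
        Fun()
    after
        release_semaphore(Filepath)
    end.

acquire_semaphore(Filepath) ->
    %% Spin with a short sleep until this process manages to claim the
    %% {semaphore, Filepath} key in the ar_entropy_storage ETS table.
    case ets:insert_new(ar_entropy_storage, {{semaphore, Filepath}}) of
        false ->
            timer:sleep(20),
            acquire_semaphore(Filepath);
        true ->
            ok
    end.

release_semaphore(Filepath) ->
    %% Assumption: releasing just deletes the marker row.
    ets:delete(ar_entropy_storage, {semaphore, Filepath}).
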
(The remaining 4 changed files in this commit are not shown here.)
