Merge pull request #265 from martinsumner/mas-i264-cachesize
Mas i264 cachesize
This commit is contained in:
commit
08d071dce8
6 changed files with 167 additions and 115 deletions
|
@ -21,7 +21,7 @@
|
||||||
|
|
||||||
%% @doc The key size of the Bookie's in-memory cache
|
%% @doc The key size of the Bookie's in-memory cache
|
||||||
{mapping, "leveled.cache_size", "leveled.cache_size", [
|
{mapping, "leveled.cache_size", "leveled.cache_size", [
|
||||||
{default, 4000},
|
{default, 2500},
|
||||||
{datatype, integer},
|
{datatype, integer},
|
||||||
hidden
|
hidden
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -2187,21 +2187,13 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
||||||
|
|
||||||
-spec maybe_withjitter(integer(), integer()) -> boolean().
|
-spec maybe_withjitter(integer(), integer()) -> boolean().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Push down randomly, but the closer to the maximum size, the more likely a
|
%% Push down randomly, but the closer to 4 * the maximum size, the more likely
|
||||||
%% push should be
|
%% a push should be
|
||||||
maybe_withjitter(CacheSize, MaxCacheSize) ->
|
maybe_withjitter(CacheSize, MaxCacheSize) when CacheSize > MaxCacheSize ->
|
||||||
if
|
R = leveled_rand:uniform(4 * MaxCacheSize),
|
||||||
CacheSize > MaxCacheSize ->
|
(CacheSize - MaxCacheSize) > R;
|
||||||
R = leveled_rand:uniform(7 * MaxCacheSize),
|
maybe_withjitter(_CacheSize, _MaxCacheSize) ->
|
||||||
if
|
false.
|
||||||
(CacheSize - MaxCacheSize) > R ->
|
|
||||||
true;
|
|
||||||
true ->
|
|
||||||
false
|
|
||||||
end;
|
|
||||||
true ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
-spec get_loadfun(book_state()) -> fun().
|
-spec get_loadfun(book_state()) -> fun().
|
||||||
|
|
|
@ -98,7 +98,7 @@
|
||||||
{"P0008",
|
{"P0008",
|
||||||
{info, "Penciller closing for reason ~w"}},
|
{info, "Penciller closing for reason ~w"}},
|
||||||
{"P0010",
|
{"P0010",
|
||||||
{info, "No level zero action on close of Penciller ~w"}},
|
{info, "discarded=~w level zero on close of Penciller"}},
|
||||||
{"P0011",
|
{"P0011",
|
||||||
{info, "Shutdown complete for Penciller for reason ~w"}},
|
{info, "Shutdown complete for Penciller for reason ~w"}},
|
||||||
{"P0012",
|
{"P0012",
|
||||||
|
|
|
@ -173,7 +173,7 @@
|
||||||
pcl_snapstart/1,
|
pcl_snapstart/1,
|
||||||
pcl_start/1,
|
pcl_start/1,
|
||||||
pcl_pushmem/2,
|
pcl_pushmem/2,
|
||||||
pcl_fetchlevelzero/2,
|
pcl_fetchlevelzero/3,
|
||||||
pcl_fetch/4,
|
pcl_fetch/4,
|
||||||
pcl_fetchkeys/5,
|
pcl_fetchkeys/5,
|
||||||
pcl_fetchkeys/6,
|
pcl_fetchkeys/6,
|
||||||
|
@ -306,6 +306,8 @@
|
||||||
-type iterator() :: list(iterator_entry()).
|
-type iterator() :: list(iterator_entry()).
|
||||||
-type bad_ledgerkey() :: list().
|
-type bad_ledgerkey() :: list().
|
||||||
|
|
||||||
|
-export_type([levelzero_cacheentry/0]).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -348,7 +350,7 @@ pcl_pushmem(Pid, LedgerCache) ->
|
||||||
%% Bookie to dump memory onto penciller
|
%% Bookie to dump memory onto penciller
|
||||||
gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
|
gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
|
||||||
|
|
||||||
-spec pcl_fetchlevelzero(pid(), integer()) -> tuple().
|
-spec pcl_fetchlevelzero(pid(), non_neg_integer(), fun()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Allows a single slot of the penciller's levelzero cache to be fetched. The
|
%% Allows a single slot of the penciller's levelzero cache to be fetched. The
|
||||||
%% levelzero cache can be up to 40K keys - sending this to the process that is
|
%% levelzero cache can be up to 40K keys - sending this to the process that is
|
||||||
|
@ -358,13 +360,13 @@ pcl_pushmem(Pid, LedgerCache) ->
|
||||||
%%
|
%%
|
||||||
%% The return value will be a leveled_skiplist that forms that part of the
|
%% The return value will be a leveled_skiplist that forms that part of the
|
||||||
%% cache
|
%% cache
|
||||||
pcl_fetchlevelzero(Pid, Slot) ->
|
pcl_fetchlevelzero(Pid, Slot, ReturnFun) ->
|
||||||
% Timeout to cause crash of L0 file when it can't get the close signal
|
% Timeout to cause crash of L0 file when it can't get the close signal
|
||||||
% as it is deadlocked making this call.
|
% as it is deadlocked making this call.
|
||||||
%
|
%
|
||||||
% If the timeout gets hit outside of close scenario the Penciller will
|
% If the timeout gets hit outside of close scenario the Penciller will
|
||||||
% be stuck in L0 pending
|
% be stuck in L0 pending
|
||||||
gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
|
gen_server:cast(Pid, {fetch_levelzero, Slot, ReturnFun}).
|
||||||
|
|
||||||
-spec pcl_fetch(pid(), leveled_codec:ledger_key())
|
-spec pcl_fetch(pid(), leveled_codec:ledger_key())
|
||||||
-> leveled_codec:ledger_kv()|not_present.
|
-> leveled_codec:ledger_kv()|not_present.
|
||||||
|
@ -683,15 +685,12 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
|
||||||
{reply, returned, State};
|
{reply, returned, State};
|
||||||
{false, true} ->
|
{false, true} ->
|
||||||
leveled_log:log("P0042", [State#state.levelzero_size]),
|
leveled_log:log("P0042", [State#state.levelzero_size]),
|
||||||
% The cache is full (there are 127 items already in it), so
|
% The cache is full (the maximum line items have been reached), so
|
||||||
% can't accept any more. However, we need to try and roll
|
% can't accept any more. However, we need to try and roll memory
|
||||||
% memory otherwise cache may be permanently full.
|
% otherwise cache may be permanently full.
|
||||||
gen_server:reply(From, returned),
|
gen_server:reply(From, returned),
|
||||||
{L0Pend, L0Constructor, none} =
|
{UpdState, none} = maybe_roll_memory(State, true, false),
|
||||||
maybe_roll_memory(State, false),
|
{noreply, UpdState};
|
||||||
{noreply,
|
|
||||||
State#state{levelzero_pending=L0Pend,
|
|
||||||
levelzero_constructor=L0Constructor}};
|
|
||||||
{false, false} ->
|
{false, false} ->
|
||||||
% leveled_log:log("P0018", [ok, false, false]),
|
% leveled_log:log("P0018", [ok, false, false]),
|
||||||
PushedTree =
|
PushedTree =
|
||||||
|
@ -892,8 +891,6 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
|
||||||
CloneState#state{snapshot_fully_loaded=true,
|
CloneState#state{snapshot_fully_loaded=true,
|
||||||
manifest=ManifestClone}},
|
manifest=ManifestClone}},
|
||||||
State#state{manifest = Manifest0}};
|
State#state{manifest = Manifest0}};
|
||||||
handle_call({fetch_levelzero, Slot}, _From, State) ->
|
|
||||||
{reply, lists:nth(Slot, State#state.levelzero_cache), State};
|
|
||||||
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
handle_call(close, _From, State=#state{is_snapshot=Snap}) when Snap == true ->
|
||||||
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
|
ok = pcl_releasesnapshot(State#state.source_penciller, self()),
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
|
@ -909,20 +906,21 @@ handle_call(close, _From, State) ->
|
||||||
% on the clerk.
|
% on the clerk.
|
||||||
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
||||||
leveled_log:log("P0008", [close]),
|
leveled_log:log("P0008", [close]),
|
||||||
|
L0Empty = State#state.levelzero_size == 0,
|
||||||
|
case (not State#state.levelzero_pending and not L0Empty) of
|
||||||
|
true ->
|
||||||
L0_Left = State#state.levelzero_size > 0,
|
L0_Left = State#state.levelzero_size > 0,
|
||||||
case {State#state.levelzero_pending, L0_Left} of
|
{UpdState, _L0Bloom} = maybe_roll_memory(State, L0_Left, true),
|
||||||
{false, true} ->
|
L0Pid = UpdState#state.levelzero_constructor,
|
||||||
{_L0Pend, L0Pid, _L0Bloom} = maybe_roll_memory(State, true),
|
|
||||||
case is_pid(L0Pid) of
|
case is_pid(L0Pid) of
|
||||||
true ->
|
true ->
|
||||||
ok = leveled_sst:sst_close(L0Pid);
|
ok = leveled_sst:sst_close(L0Pid);
|
||||||
false ->
|
false ->
|
||||||
ok
|
leveled_log:log("P0010", [State#state.levelzero_size])
|
||||||
end;
|
end;
|
||||||
StatusTuple ->
|
false ->
|
||||||
leveled_log:log("P0010", [StatusTuple])
|
leveled_log:log("P0010", [State#state.levelzero_size])
|
||||||
end,
|
end,
|
||||||
|
|
||||||
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
shutdown_manifest(State#state.manifest, State#state.levelzero_constructor),
|
||||||
{stop, normal, ok, State};
|
{stop, normal, ok, State};
|
||||||
handle_call(doom, _From, State) ->
|
handle_call(doom, _From, State) ->
|
||||||
|
@ -958,14 +956,21 @@ handle_call(check_for_work, _From, State) ->
|
||||||
handle_call(persisted_sqn, _From, State) ->
|
handle_call(persisted_sqn, _From, State) ->
|
||||||
{reply, State#state.persisted_sqn, State}.
|
{reply, State#state.persisted_sqn, State}.
|
||||||
|
|
||||||
handle_cast({manifest_change, NewManifest}, State) ->
|
handle_cast({manifest_change, Manifest}, State) ->
|
||||||
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
|
NewManSQN = leveled_pmanifest:get_manifest_sqn(Manifest),
|
||||||
OldManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest),
|
OldManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest),
|
||||||
leveled_log:log("P0041", [OldManSQN, NewManSQN]),
|
leveled_log:log("P0041", [OldManSQN, NewManSQN]),
|
||||||
ok = leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
|
% Only safe to update the manifest if the SQN increments
|
||||||
UpdManifest = leveled_pmanifest:merge_snapshot(State#state.manifest,
|
if NewManSQN > OldManSQN ->
|
||||||
NewManifest),
|
ok =
|
||||||
{noreply, State#state{manifest = UpdManifest, work_ongoing=false}};
|
leveled_pclerk:clerk_promptdeletions(State#state.clerk, NewManSQN),
|
||||||
|
% This is accepted as the new manifest, files may be deleted
|
||||||
|
UpdManifest =
|
||||||
|
leveled_pmanifest:merge_snapshot(State#state.manifest, Manifest),
|
||||||
|
% Need to preserve the penciller's view of snapshots stored in
|
||||||
|
% the manifest
|
||||||
|
{noreply, State#state{manifest=UpdManifest, work_ongoing=false}}
|
||||||
|
end;
|
||||||
handle_cast({release_snapshot, Snapshot}, State) ->
|
handle_cast({release_snapshot, Snapshot}, State) ->
|
||||||
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
|
Manifest0 = leveled_pmanifest:release_snapshot(State#state.manifest,
|
||||||
Snapshot),
|
Snapshot),
|
||||||
|
@ -1044,6 +1049,9 @@ handle_cast(work_for_clerk, State) ->
|
||||||
_ ->
|
_ ->
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
handle_cast({fetch_levelzero, Slot, ReturnFun}, State) ->
|
||||||
|
ReturnFun(lists:nth(Slot, State#state.levelzero_cache)),
|
||||||
|
{noreply, State};
|
||||||
handle_cast({log_level, LogLevel}, State) ->
|
handle_cast({log_level, LogLevel}, State) ->
|
||||||
PC = State#state.clerk,
|
PC = State#state.clerk,
|
||||||
ok = leveled_pclerk:clerk_loglevel(PC, LogLevel),
|
ok = leveled_pclerk:clerk_loglevel(PC, LogLevel),
|
||||||
|
@ -1293,37 +1301,37 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
|
||||||
false ->
|
false ->
|
||||||
true
|
true
|
||||||
end,
|
end,
|
||||||
NoPendingManifestChange = not State#state.work_ongoing,
|
|
||||||
JitterCheck = RandomFactor or CacheMuchTooBig,
|
JitterCheck = RandomFactor or CacheMuchTooBig,
|
||||||
Due = CacheTooBig and JitterCheck,
|
Due = CacheTooBig and JitterCheck,
|
||||||
case {Due, NoPendingManifestChange} of
|
{UpdState0, _L0Bloom} = maybe_roll_memory(UpdState, Due, false),
|
||||||
{true, true} ->
|
LogSubs = [NewL0Size, Due, State#state.work_ongoing],
|
||||||
{L0Pend, L0Constructor, none} =
|
|
||||||
maybe_roll_memory(UpdState, false),
|
|
||||||
LogSubs = [NewL0Size, true, true],
|
|
||||||
leveled_log:log_timer("P0031", LogSubs, SW),
|
leveled_log:log_timer("P0031", LogSubs, SW),
|
||||||
UpdState#state{levelzero_pending=L0Pend,
|
UpdState0
|
||||||
levelzero_constructor=L0Constructor};
|
|
||||||
_ ->
|
|
||||||
LogSubs = [NewL0Size, Due, NoPendingManifestChange],
|
|
||||||
leveled_log:log_timer("P0031", LogSubs, SW),
|
|
||||||
UpdState
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
-spec maybe_roll_memory(pcl_state(), boolean())
|
-spec maybe_roll_memory(pcl_state(), boolean(), boolean())
|
||||||
-> {boolean(), pid()|undefined, leveled_ebloom:bloom()|none}.
|
-> {pcl_state(), leveled_ebloom:bloom()|none}.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Check that no L0 file is present before rolling memory
|
%% Check that no L0 file is present before rolling memory. Returns a boolean
|
||||||
maybe_roll_memory(State, SyncRoll) ->
|
%% to indicate if memory has been rolled, the Pid of the L0 constructor and
|
||||||
|
%% The bloom of the L0 file (or none)
|
||||||
|
maybe_roll_memory(State, false, _SyncRoll) ->
|
||||||
|
{State, none};
|
||||||
|
maybe_roll_memory(State, true, SyncRoll) ->
|
||||||
BlockedByL0 = leveled_pmanifest:levelzero_present(State#state.manifest),
|
BlockedByL0 = leveled_pmanifest:levelzero_present(State#state.manifest),
|
||||||
case BlockedByL0 of
|
PendingManifestChange = State#state.work_ongoing,
|
||||||
|
% It is critical that memory is not rolled if the manifest is due to be
|
||||||
|
% updated by a change by the clerk. When that manifest change is made it
|
||||||
|
% will override the addition of L0 and data will be lost.
|
||||||
|
case (BlockedByL0 or PendingManifestChange) of
|
||||||
true ->
|
true ->
|
||||||
{false, undefined, none};
|
{State, none};
|
||||||
false ->
|
false ->
|
||||||
{L0Constructor, Bloom} = roll_memory(State, SyncRoll),
|
{L0Constructor, Bloom} = roll_memory(State, SyncRoll),
|
||||||
{true, L0Constructor, Bloom}
|
{State#state{levelzero_pending=true,
|
||||||
|
levelzero_constructor=L0Constructor},
|
||||||
|
Bloom}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec roll_memory(pcl_state(), boolean())
|
-spec roll_memory(pcl_state(), boolean())
|
||||||
|
@ -1347,7 +1355,8 @@ roll_memory(State, false) ->
|
||||||
FileName = sst_filename(ManSQN, 0, 0),
|
FileName = sst_filename(ManSQN, 0, 0),
|
||||||
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
|
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
|
||||||
PCL = self(),
|
PCL = self(),
|
||||||
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
FetchFun =
|
||||||
|
fun(Slot, ReturnFun) -> pcl_fetchlevelzero(PCL, Slot, ReturnFun) end,
|
||||||
R = leveled_sst:sst_newlevelzero(RootPath,
|
R = leveled_sst:sst_newlevelzero(RootPath,
|
||||||
FileName,
|
FileName,
|
||||||
length(State#state.levelzero_cache),
|
length(State#state.levelzero_cache),
|
||||||
|
@ -2309,24 +2318,16 @@ create_file_test() ->
|
||||||
ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")),
|
ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")),
|
||||||
KVL = lists:usort(generate_randomkeys({50000, 0})),
|
KVL = lists:usort(generate_randomkeys({50000, 0})),
|
||||||
Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
|
Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
|
||||||
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
|
|
||||||
{ok, SP, noreply} =
|
{ok, SP, noreply} =
|
||||||
leveled_sst:sst_newlevelzero(RP,
|
leveled_sst:sst_newlevelzero(RP,
|
||||||
Filename,
|
Filename,
|
||||||
1,
|
1,
|
||||||
FetchFun,
|
[Tree],
|
||||||
undefined,
|
undefined,
|
||||||
50000,
|
50000,
|
||||||
#sst_options{press_method = native}),
|
#sst_options{press_method = native}),
|
||||||
lists:foreach(fun(X) ->
|
{ok, SrcFN, StartKey, EndKey} = leveled_sst:sst_checkready(SP),
|
||||||
case checkready(SP) of
|
|
||||||
timeout ->
|
|
||||||
timer:sleep(X);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end end,
|
|
||||||
[50, 100, 200, 400, 800]),
|
|
||||||
{ok, SrcFN, StartKey, EndKey} = checkready(SP),
|
|
||||||
io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
|
io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
|
||||||
?assertMatch({o, _, _, _}, StartKey),
|
?assertMatch({o, _, _, _}, StartKey),
|
||||||
?assertMatch({o, _, _, _}, EndKey),
|
?assertMatch({o, _, _, _}, EndKey),
|
||||||
|
@ -2339,14 +2340,6 @@ slow_fetch_test() ->
|
||||||
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)),
|
?assertMatch(not_present, log_slowfetch(2, not_present, "fake", 0, 1)),
|
||||||
?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)).
|
?assertMatch("value", log_slowfetch(2, "value", "fake", 0, 1)).
|
||||||
|
|
||||||
checkready(Pid) ->
|
|
||||||
try
|
|
||||||
leveled_sst:sst_checkready(Pid)
|
|
||||||
catch
|
|
||||||
exit:{timeout, _} ->
|
|
||||||
timeout
|
|
||||||
end.
|
|
||||||
|
|
||||||
timings_test() ->
|
timings_test() ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
timer:sleep(1),
|
timer:sleep(1),
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-define(MAX_CACHE_LINES, 31). % Must be less than 128
|
||||||
|
|
||||||
% -type index_array() :: array:array().
|
% -type index_array() :: array:array().
|
||||||
-type index_array() :: any()|none. % To live with OTP16
|
-type index_array() :: any()|none. % To live with OTP16
|
||||||
|
|
||||||
|
@ -55,9 +57,9 @@
|
||||||
|
|
||||||
-spec cache_full(list()) -> boolean().
|
-spec cache_full(list()) -> boolean().
|
||||||
%% @doc
|
%% @doc
|
||||||
%% If there are already 127 entries in the cache then the cache is full
|
%% If there are already 31 entries in the cache then the cache is full
|
||||||
cache_full(L0Cache) ->
|
cache_full(L0Cache) ->
|
||||||
length(L0Cache) == 127.
|
length(L0Cache) == ?MAX_CACHE_LINES.
|
||||||
|
|
||||||
-spec prepare_for_index(index_array(), leveled_codec:segment_hash())
|
-spec prepare_for_index(index_array(), leveled_codec:segment_hash())
|
||||||
-> index_array().
|
-> index_array().
|
||||||
|
|
|
@ -65,6 +65,7 @@
|
||||||
-ifdef(fsm_deprecated).
|
-ifdef(fsm_deprecated).
|
||||||
-compile({nowarn_deprecated_function,
|
-compile({nowarn_deprecated_function,
|
||||||
[{gen_fsm, start_link, 3},
|
[{gen_fsm, start_link, 3},
|
||||||
|
{gen_fsm, sync_send_event, 2},
|
||||||
{gen_fsm, sync_send_event, 3},
|
{gen_fsm, sync_send_event, 3},
|
||||||
{gen_fsm, send_event, 2},
|
{gen_fsm, send_event, 2},
|
||||||
{gen_fsm, send_all_state_event, 2}]}).
|
{gen_fsm, send_all_state_event, 2}]}).
|
||||||
|
@ -188,7 +189,9 @@
|
||||||
timings = no_timing :: sst_timings(),
|
timings = no_timing :: sst_timings(),
|
||||||
timings_countdown = 0 :: integer(),
|
timings_countdown = 0 :: integer(),
|
||||||
starting_pid :: pid()|undefined,
|
starting_pid :: pid()|undefined,
|
||||||
fetch_cache = array:new([{size, ?CACHE_SIZE}])}).
|
fetch_cache = array:new([{size, ?CACHE_SIZE}]),
|
||||||
|
new_slots :: list()|undefined,
|
||||||
|
deferred_startup_tuple :: tuple()|undefined}).
|
||||||
|
|
||||||
-record(sst_timings,
|
-record(sst_timings,
|
||||||
{sample_count = 0 :: integer(),
|
{sample_count = 0 :: integer(),
|
||||||
|
@ -335,7 +338,7 @@ sst_new(RootPath, Filename,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec sst_newlevelzero(string(), string(),
|
-spec sst_newlevelzero(string(), string(),
|
||||||
integer(), fun(), pid()|undefined, integer(),
|
integer(), fun()|list(), pid()|undefined, integer(),
|
||||||
sst_options()) ->
|
sst_options()) ->
|
||||||
{ok, pid(), noreply}.
|
{ok, pid(), noreply}.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -343,23 +346,34 @@ sst_new(RootPath, Filename,
|
||||||
%% it will be as big as the input. Also the KVList is not passed in, it is
|
%% it will be as big as the input. Also the KVList is not passed in, it is
|
||||||
%% fetched slot by slot using the FetchFun
|
%% fetched slot by slot using the FetchFun
|
||||||
sst_newlevelzero(RootPath, Filename,
|
sst_newlevelzero(RootPath, Filename,
|
||||||
Slots, FetchFun, Penciller,
|
Slots, Fetcher, Penciller,
|
||||||
MaxSQN, OptsSST) ->
|
MaxSQN, OptsSST) ->
|
||||||
PressMethod0 = compress_level(0, OptsSST#sst_options.press_method),
|
PressMethod0 = compress_level(0, OptsSST#sst_options.press_method),
|
||||||
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
|
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
|
||||||
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
|
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
|
||||||
gen_fsm:send_event(Pid,
|
% Initiate the file into the "starting" state
|
||||||
|
ok = gen_fsm:sync_send_event(Pid,
|
||||||
{sst_newlevelzero,
|
{sst_newlevelzero,
|
||||||
RootPath,
|
RootPath,
|
||||||
Filename,
|
Filename,
|
||||||
Slots,
|
|
||||||
FetchFun,
|
|
||||||
Penciller,
|
Penciller,
|
||||||
MaxSQN,
|
MaxSQN,
|
||||||
OptsSST0,
|
OptsSST0,
|
||||||
?INDEX_MODDATE}),
|
?INDEX_MODDATE}),
|
||||||
|
ok =
|
||||||
|
case is_list(Fetcher) of
|
||||||
|
true ->
|
||||||
|
gen_fsm:send_event(Pid, {complete_l0startup, Fetcher});
|
||||||
|
false ->
|
||||||
|
% Fetcher is a function
|
||||||
|
gen_fsm:send_event(Pid, {sst_returnslot, none, Fetcher, Slots})
|
||||||
|
% Start the fetch loop (async). Having the fetch loop running
|
||||||
|
% on async message passing means that the SST file can now be
|
||||||
|
% closed while the fetch loop is still completing
|
||||||
|
end,
|
||||||
{ok, Pid, noreply}.
|
{ok, Pid, noreply}.
|
||||||
|
|
||||||
|
|
||||||
-spec sst_get(pid(), leveled_codec:ledger_key())
|
-spec sst_get(pid(), leveled_codec:ledger_key())
|
||||||
-> leveled_codec:ledger_kv()|not_present.
|
-> leveled_codec:ledger_kv()|not_present.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -415,7 +429,7 @@ sst_setfordelete(Pid, Penciller) ->
|
||||||
%% For this file to be closed and deleted
|
%% For this file to be closed and deleted
|
||||||
sst_clear(Pid) ->
|
sst_clear(Pid) ->
|
||||||
gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity),
|
gen_fsm:sync_send_event(Pid, {set_for_delete, false}, infinity),
|
||||||
gen_fsm:sync_send_event(Pid, close, 1000).
|
gen_fsm:sync_send_event(Pid, close).
|
||||||
|
|
||||||
-spec sst_deleteconfirmed(pid()) -> ok.
|
-spec sst_deleteconfirmed(pid()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -432,13 +446,13 @@ sst_deleteconfirmed(Pid) ->
|
||||||
%% the filename and the {startKey, EndKey} for the manifest.
|
%% the filename and the {startKey, EndKey} for the manifest.
|
||||||
sst_checkready(Pid) ->
|
sst_checkready(Pid) ->
|
||||||
%% Only used in test
|
%% Only used in test
|
||||||
gen_fsm:sync_send_event(Pid, background_complete, 100).
|
gen_fsm:sync_send_event(Pid, background_complete).
|
||||||
|
|
||||||
-spec sst_close(pid()) -> ok.
|
-spec sst_close(pid()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Close the file
|
%% Close the file
|
||||||
sst_close(Pid) ->
|
sst_close(Pid) ->
|
||||||
gen_fsm:sync_send_event(Pid, close, 2000).
|
gen_fsm:sync_send_event(Pid, close).
|
||||||
|
|
||||||
-spec sst_printtimings(pid()) -> ok.
|
-spec sst_printtimings(pid()) -> ok.
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -446,7 +460,7 @@ sst_close(Pid) ->
|
||||||
%% forced to be printed.
|
%% forced to be printed.
|
||||||
%% Used in unit tests to force the printing of timings
|
%% Used in unit tests to force the printing of timings
|
||||||
sst_printtimings(Pid) ->
|
sst_printtimings(Pid) ->
|
||||||
gen_fsm:sync_send_event(Pid, print_timings, 1000).
|
gen_fsm:sync_send_event(Pid, print_timings).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -492,15 +506,29 @@ starting({sst_new,
|
||||||
reader,
|
reader,
|
||||||
UpdState#state{blockindex_cache = BlockIndex,
|
UpdState#state{blockindex_cache = BlockIndex,
|
||||||
starting_pid = StartingPID},
|
starting_pid = StartingPID},
|
||||||
?STARTUP_TIMEOUT}.
|
?STARTUP_TIMEOUT};
|
||||||
|
|
||||||
starting({sst_newlevelzero, RootPath, Filename,
|
starting({sst_newlevelzero, RootPath, Filename,
|
||||||
Slots, FetchFun, Penciller, MaxSQN,
|
Penciller, MaxSQN,
|
||||||
OptsSST, IdxModDate}, State) ->
|
OptsSST, IdxModDate}, _From, State) ->
|
||||||
|
DeferredStartupTuple =
|
||||||
|
{RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
|
||||||
|
{reply, ok, starting,
|
||||||
|
State#state{deferred_startup_tuple = DeferredStartupTuple}};
|
||||||
|
starting(close, _From, State) ->
|
||||||
|
% No file should have been created, so nothing to close.
|
||||||
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
starting({complete_l0startup, Slots}, State) ->
|
||||||
|
starting(complete_l0startup, State#state{new_slots = Slots});
|
||||||
|
starting(complete_l0startup, State) ->
|
||||||
|
{RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate} =
|
||||||
|
State#state.deferred_startup_tuple,
|
||||||
SW0 = os:timestamp(),
|
SW0 = os:timestamp(),
|
||||||
|
FetchedSlots = State#state.new_slots,
|
||||||
leveled_log:save(OptsSST#sst_options.log_options),
|
leveled_log:save(OptsSST#sst_options.log_options),
|
||||||
PressMethod = OptsSST#sst_options.press_method,
|
PressMethod = OptsSST#sst_options.press_method,
|
||||||
KVList = leveled_pmem:to_list(Slots, FetchFun),
|
FetchFun = fun(Slot) -> lists:nth(Slot, FetchedSlots) end,
|
||||||
|
KVList = leveled_pmem:to_list(length(FetchedSlots), FetchFun),
|
||||||
Time0 = timer:now_diff(os:timestamp(), SW0),
|
Time0 = timer:now_diff(os:timestamp(), SW0),
|
||||||
|
|
||||||
SW1 = os:timestamp(),
|
SW1 = os:timestamp(),
|
||||||
|
@ -524,7 +552,12 @@ starting({sst_newlevelzero, RootPath, Filename,
|
||||||
PressMethod, IdxModDate),
|
PressMethod, IdxModDate),
|
||||||
{UpdState, Bloom} =
|
{UpdState, Bloom} =
|
||||||
read_file(ActualFilename,
|
read_file(ActualFilename,
|
||||||
State#state{root_path=RootPath, yield_blockquery=true}),
|
State#state{root_path=RootPath,
|
||||||
|
yield_blockquery=true,
|
||||||
|
% Important to empty this from state rather
|
||||||
|
% than carry it through to the next stage
|
||||||
|
new_slots=undefined,
|
||||||
|
deferred_startup_tuple=undefined}),
|
||||||
Summary = UpdState#state.summary,
|
Summary = UpdState#state.summary,
|
||||||
Time4 = timer:now_diff(os:timestamp(), SW4),
|
Time4 = timer:now_diff(os:timestamp(), SW4),
|
||||||
|
|
||||||
|
@ -547,9 +580,37 @@ starting({sst_newlevelzero, RootPath, Filename,
|
||||||
{next_state,
|
{next_state,
|
||||||
reader,
|
reader,
|
||||||
UpdState#state{blockindex_cache = BlockIndex}}
|
UpdState#state{blockindex_cache = BlockIndex}}
|
||||||
|
end;
|
||||||
|
starting({sst_returnslot, FetchedSlot, FetchFun, SlotCount}, State) ->
|
||||||
|
Self = self(),
|
||||||
|
FetchedSlots =
|
||||||
|
case FetchedSlot of
|
||||||
|
none ->
|
||||||
|
[];
|
||||||
|
_ ->
|
||||||
|
[FetchedSlot|State#state.new_slots]
|
||||||
|
end,
|
||||||
|
case length(FetchedSlots) == SlotCount of
|
||||||
|
true ->
|
||||||
|
gen_fsm:send_event(Self, complete_l0startup),
|
||||||
|
{next_state,
|
||||||
|
starting,
|
||||||
|
% Reverse the slots so that they are back in the expected
|
||||||
|
% order
|
||||||
|
State#state{new_slots = lists:reverse(FetchedSlots)}};
|
||||||
|
false ->
|
||||||
|
ReturnFun =
|
||||||
|
fun(NextSlot) ->
|
||||||
|
gen_fsm:send_event(Self,
|
||||||
|
{sst_returnslot, NextSlot,
|
||||||
|
FetchFun, SlotCount})
|
||||||
|
end,
|
||||||
|
FetchFun(length(FetchedSlots) + 1, ReturnFun),
|
||||||
|
{next_state,
|
||||||
|
starting,
|
||||||
|
State#state{new_slots = FetchedSlots}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
reader({get_kv, LedgerKey, Hash}, _From, State) ->
|
reader({get_kv, LedgerKey, Hash}, _From, State) ->
|
||||||
% Get a KV value and potentially take sample timings
|
% Get a KV value and potentially take sample timings
|
||||||
{Result, UpdState, UpdTimings} =
|
{Result, UpdState, UpdTimings} =
|
||||||
|
@ -3351,6 +3412,10 @@ take_max_lastmoddate_test() ->
|
||||||
% modified dates
|
% modified dates
|
||||||
?assertMatch(1, take_max_lastmoddate(0, 1)).
|
?assertMatch(1, take_max_lastmoddate(0, 1)).
|
||||||
|
|
||||||
|
stopstart_test() ->
|
||||||
|
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
|
||||||
|
% check we can close in the starting state. This may happen due to the
|
||||||
|
% fetcher on new level zero files working in a loop
|
||||||
|
ok = sst_close(Pid).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue