Handle L0 cache being full

A test thta will cause leveled to crash due to a low cache size being set - but protect against this (as well as the general scenario of the cache being full).

There could be a potential case where a L0 file present (post pending) without work backlog being set.  In this case we want to roll the level zero to memory, but don't accept the cache update if the L0 cache is already full.
This commit is contained in:
Martin Sumner 2019-01-14 12:27:51 +00:00
parent d5a9f2e8b7
commit c060c0e41d
4 changed files with 74 additions and 22 deletions

View file

@ -672,13 +672,27 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
%
% Check the approximate size of the cache. If it is over the maximum size,
% trigger a background L0 file write and update state of levelzero_pending.
case State#state.levelzero_pending or State#state.work_backlog of
true ->
CacheUpdateBlockedByPendingWork
= State#state.levelzero_pending or State#state.work_backlog,
CacheFull = leveled_pmem:cache_full(State#state.levelzero_cache),
case {CacheUpdateBlockedByPendingWork, CacheFull} of
{true, _} ->
leveled_log:log("P0018", [returned,
State#state.levelzero_pending,
State#state.work_backlog]),
{reply, returned, State};
false ->
{false, true} ->
leveled_log:log("P0042", [State#state.levelzero_size]),
% The cache is full (there are 127 items already in it), so
% can't accept any more. However, we need to try and roll
% memory otherwise cache may be permanently full.
gen_server:reply(From, returned),
{L0Pend, L0Constructor, none} =
maybe_roll_memory(State, false),
{noreply,
State#state{levelzero_pending=L0Pend,
levelzero_constructor=L0Constructor}};
{false, false} ->
leveled_log:log("P0018", [ok, false, false]),
PushedTree =
case is_tuple(LedgerTable) of
@ -689,7 +703,9 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
?CACHE_TYPE)
end,
% Reply must happen after the table has been converted
gen_server:reply(From, ok),
gen_server:reply(From, ok),
% Update LevelZero will add to the cache and maybe roll the
% cache from memory to L0 disk if the cache is too big
{noreply,
update_levelzero(State#state.levelzero_size,
{PushedTree, PushedIdx, MinSQN, MaxSQN},
@ -893,13 +909,16 @@ handle_call(close, _From, State) ->
% on the clerk.
ok = leveled_pclerk:clerk_close(State#state.clerk),
leveled_log:log("P0008", [close]),
L0_Present = leveled_pmanifest:key_lookup(State#state.manifest, 0, all),
L0_Left = State#state.levelzero_size > 0,
case {State#state.levelzero_pending, L0_Present, L0_Left} of
{false, false, true} ->
{L0Pid, _L0Bloom} = roll_memory(State, true),
ok = leveled_sst:sst_close(L0Pid);
case {State#state.levelzero_pending, L0_Left} of
{false, true} ->
{_L0Pend, L0Pid, _L0Bloom} = maybe_roll_memory(State, true),
case is_pid(L0Pid) of
true ->
ok = leveled_sst:sst_close(L0Pid);
false ->
ok
end;
StatusTuple ->
leveled_log:log("P0010", [StatusTuple])
end,
@ -1249,8 +1268,6 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
ledger_sqn=UpdMaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
L0Free =
not leveled_pmanifest:levelzero_present(State#state.manifest),
RandomFactor =
case State#state.levelzero_cointoss of
true ->
@ -1265,20 +1282,36 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
end,
NoPendingManifestChange = not State#state.work_ongoing,
JitterCheck = RandomFactor or CacheMuchTooBig,
case {CacheTooBig, L0Free, JitterCheck, NoPendingManifestChange} of
{true, true, true, true} ->
{L0Constructor, none} = roll_memory(UpdState, false),
leveled_log:log_timer("P0031", [true, true], SW),
UpdState#state{levelzero_pending=true,
case {CacheTooBig, JitterCheck, NoPendingManifestChange} of
{true, true, true} ->
{L0Pend, L0Constructor, none} =
maybe_roll_memory(UpdState, false),
leveled_log:log_timer("P0031", [true, true, L0Pend], SW),
UpdState#state{levelzero_pending=L0Pend,
levelzero_constructor=L0Constructor};
_ ->
leveled_log:log_timer("P0031",
[CacheTooBig, JitterCheck],
[CacheTooBig, JitterCheck, false],
SW),
UpdState
end
end.
-spec maybe_roll_memory(pcl_state(), boolean())
-> {boolean(), pid()|undefined, leveled_ebloom:bloom()|none}.
%% @doc
%% Check that no L0 file is present before rolling memory
maybe_roll_memory(State, SyncRoll) ->
BlockedByL0 = leveled_pmanifest:levelzero_present(State#state.manifest),
case BlockedByL0 of
true ->
{false, undefined, none};
false ->
{L0Constructor, Bloom} = roll_memory(State, SyncRoll),
{true, L0Constructor, Bloom}
end.
-spec roll_memory(pcl_state(), boolean())
-> {pid(), leveled_ebloom:bloom()|none}.
%% @doc
@ -1916,7 +1949,10 @@ add_missing_hash({K, {SQN, ST, MD}}) ->
clean_dir_test() ->
% Pointless gesture to test coverage
RootPath = "../test/ledger",
ok = clean_subdir(RootPath ++ "/test.bob").
ok = filelib:ensure_dir(RootPath),
?assertMatch(ok, file:write_file(RootPath ++ "/test.bob", "hello")),
ok = clean_subdir(RootPath ++ "/test.bob"),
ok = file:delete(RootPath ++ "/test.bob").
archive_files_test() ->