Re-introduce ETS Index

Add ETS Index back in to avoid having to check each skip list in turn.
Also this helps keep a lower skip list size.
This commit is contained in:
martinsumner 2016-12-11 05:23:24 +00:00
parent f848500eff
commit 8bcb49479d
3 changed files with 57 additions and 7 deletions

View file

@ -104,6 +104,8 @@
{info, "L0 completion confirmed and will transition to not pending"}}, {info, "L0 completion confirmed and will transition to not pending"}},
{"P0030", {"P0030",
{warn, "We're doomed - intention recorded to destroy all files"}}, {warn, "We're doomed - intention recorded to destroy all files"}},
{"P0031",
{info, "Completion of update to levelzero"}},
{"PC001", {"PC001",
{info, "Penciller's clerk ~w started with owner ~w"}}, {info, "Penciller's clerk ~w started with owner ~w"}},

View file

@ -219,6 +219,7 @@
levelzero_size = 0 :: integer(), levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(), levelzero_maxcachesize :: integer(),
levelzero_cointoss = false :: boolean(), levelzero_cointoss = false :: boolean(),
levelzero_index, % may be none or an ETS table reference
is_snapshot = false :: boolean(), is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(), snapshot_fully_loaded = false :: boolean(),
@ -369,14 +370,16 @@ handle_call({fetch, Key, Hash}, _From, State) ->
fetch_mem(Key, fetch_mem(Key,
Hash, Hash,
State#state.manifest, State#state.manifest,
State#state.levelzero_cache), State#state.levelzero_cache,
State#state.levelzero_index),
State}; State};
handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
{reply, {reply,
compare_to_sqn(fetch_mem(Key, compare_to_sqn(fetch_mem(Key,
Hash, Hash,
State#state.manifest, State#state.manifest,
State#state.levelzero_cache), State#state.levelzero_cache,
State#state.levelzero_index),
SQN), SQN),
State}; State};
handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
@ -417,6 +420,7 @@ handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
{LedgerSQN, L0Size, L0Cache} = L0D, {LedgerSQN, L0Size, L0Cache} = L0D,
{reply, ok, State#state{levelzero_cache=L0Cache, {reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size, levelzero_size=L0Size,
levelzero_index=none,
ledger_sqn=LedgerSQN, ledger_sqn=LedgerSQN,
snapshot_fully_loaded=true}}; snapshot_fully_loaded=true}};
handle_call({fetch_levelzero, Slot}, _From, State) -> handle_call({fetch_levelzero, Slot}, _From, State) ->
@ -468,6 +472,7 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey, Bloom}, State) ->
levelzero_pending=false, levelzero_pending=false,
levelzero_constructor=undefined, levelzero_constructor=undefined,
levelzero_size=0, levelzero_size=0,
levelzero_index=leveled_pmem:new_index(),
manifest=UpdMan, manifest=UpdMan,
persisted_sqn=State#state.ledger_sqn}}. persisted_sqn=State#state.ledger_sqn}}.
@ -560,7 +565,8 @@ start_from_file(PCLopts) ->
InitState = #state{clerk=MergeClerk, InitState = #state{clerk=MergeClerk,
root_path=RootPath, root_path=RootPath,
levelzero_maxcachesize=MaxTableSize, levelzero_maxcachesize=MaxTableSize,
levelzero_cointoss=CoinToss}, levelzero_cointoss=CoinToss,
levelzero_index=leveled_pmem:new_index()},
%% Open manifest %% Open manifest
ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
@ -636,10 +642,13 @@ start_from_file(PCLopts) ->
update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
LedgerSQN, L0Cache, State) -> LedgerSQN, L0Cache, State) ->
SW = os:timestamp(),
Update = leveled_pmem:add_to_cache(L0Size, Update = leveled_pmem:add_to_cache(L0Size,
{PushedTree, MinSQN, MaxSQN}, {PushedTree, MinSQN, MaxSQN},
LedgerSQN, LedgerSQN,
L0Cache), L0Cache),
leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
{UpdMaxSQN, NewL0Size, UpdL0Cache} = Update, {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
if if
UpdMaxSQN >= LedgerSQN -> UpdMaxSQN >= LedgerSQN ->
@ -661,15 +670,20 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
false -> false ->
true true
end, end,
case {CacheTooBig, Level0Free, RandomFactor or CacheMuchTooBig} of JitterCheck = RandomFactor or CacheMuchTooBig,
case {CacheTooBig, Level0Free, JitterCheck} of
{true, true, true} -> {true, true, true} ->
L0Constructor = roll_memory(UpdState, false), L0Constructor = roll_memory(UpdState, false),
leveled_log:log_timer("P0031", [], SW),
UpdState#state{levelzero_pending=true, UpdState#state{levelzero_pending=true,
levelzero_constructor=L0Constructor}; levelzero_constructor=L0Constructor};
_ -> _ ->
leveled_log:log_timer("P0031", [], SW),
UpdState UpdState
end; end;
NewL0Size == L0Size -> NewL0Size == L0Size ->
leveled_log:log_timer("P0031", [], SW),
State#state{levelzero_cache=L0Cache, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size, levelzero_size=L0Size,
ledger_sqn=LedgerSQN} ledger_sqn=LedgerSQN}
@ -718,13 +732,21 @@ levelzero_filename(State) ->
FileName. FileName.
fetch_mem(Key, Hash, Manifest, L0Cache) ->
fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
case L0Check of case L0Check of
{false, not_found} -> {false, not_found} ->
fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/2); fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/2);
{true, KV} -> {true, KV} ->
KV KV
end;
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
case leveled_pmem:check_index(Hash, L0Index) of
true ->
fetch_mem(Key, Hash, Manifest, L0Cache, none);
false ->
fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/2)
end. end.
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->

View file

@ -45,7 +45,10 @@
add_to_cache/4, add_to_cache/4,
to_list/2, to_list/2,
check_levelzero/3, check_levelzero/3,
merge_trees/4 merge_trees/4,
add_to_index/2,
new_index/0,
check_index/2
]). ]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -69,6 +72,29 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
end end
end. end.
add_to_index(LevelMinus1, L0Index) ->
IndexAddFun =
fun({_K, V}) ->
{_, _, Hash, _} = leveled_codec:striphead_to_details(V),
case Hash of
no_lookup ->
ok;
_ ->
ets:insert(L0Index, {Hash})
end
end,
lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)).
new_index() ->
ets:new(l0index, [private, set]).
check_index(Hash, L0Index) ->
case ets:lookup(L0Index, Hash) of
[{Hash}] ->
true;
[] ->
false
end.
to_list(Slots, FetchFun) -> to_list(Slots, FetchFun) ->
SW = os:timestamp(), SW = os:timestamp(),