diff --git a/src/leveled_log.erl b/src/leveled_log.erl index a10e641..f2306ce 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -104,6 +104,8 @@ {info, "L0 completion confirmed and will transition to not pending"}}, {"P0030", {warn, "We're doomed - intention recorded to destroy all files"}}, + {"P0031", + {info, "Completion of update to levelzero"}}, {"PC001", {info, "Penciller's clerk ~w started with owner ~w"}}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 431c501..3547342 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -219,6 +219,7 @@ levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), levelzero_cointoss = false :: boolean(), + levelzero_index, % may be none or an ETS table reference is_snapshot = false :: boolean(), snapshot_fully_loaded = false :: boolean(), @@ -369,14 +370,16 @@ handle_call({fetch, Key, Hash}, _From, State) -> fetch_mem(Key, Hash, State#state.manifest, - State#state.levelzero_cache), + State#state.levelzero_cache, + State#state.levelzero_index), State}; handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> {reply, compare_to_sqn(fetch_mem(Key, Hash, State#state.manifest, - State#state.levelzero_cache), + State#state.levelzero_cache, + State#state.levelzero_index), SQN), State}; handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, @@ -417,6 +420,7 @@ handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) -> {LedgerSQN, L0Size, L0Cache} = L0D, {reply, ok, State#state{levelzero_cache=L0Cache, levelzero_size=L0Size, + levelzero_index=none, ledger_sqn=LedgerSQN, snapshot_fully_loaded=true}}; handle_call({fetch_levelzero, Slot}, _From, State) -> @@ -468,6 +472,7 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey, Bloom}, State) -> levelzero_pending=false, levelzero_constructor=undefined, levelzero_size=0, + levelzero_index=leveled_pmem:new_index(), manifest=UpdMan, persisted_sqn=State#state.ledger_sqn}}. @@ -560,7 +565,8 @@ start_from_file(PCLopts) -> InitState = #state{clerk=MergeClerk, root_path=RootPath, levelzero_maxcachesize=MaxTableSize, - levelzero_cointoss=CoinToss}, + levelzero_cointoss=CoinToss, + levelzero_index=leveled_pmem:new_index()}, %% Open manifest ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", @@ -636,10 +642,13 @@ start_from_file(PCLopts) -> update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, LedgerSQN, L0Cache, State) -> + SW = os:timestamp(), Update = leveled_pmem:add_to_cache(L0Size, {PushedTree, MinSQN, MaxSQN}, LedgerSQN, L0Cache), + leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index), + {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update, if UpdMaxSQN >= LedgerSQN -> @@ -661,15 +670,20 @@ update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, false -> true end, - case {CacheTooBig, Level0Free, RandomFactor or CacheMuchTooBig} of + JitterCheck = RandomFactor or CacheMuchTooBig, + case {CacheTooBig, Level0Free, JitterCheck} of {true, true, true} -> - L0Constructor = roll_memory(UpdState, false), + L0Constructor = roll_memory(UpdState, false), + leveled_log:log_timer("P0031", [], SW), UpdState#state{levelzero_pending=true, levelzero_constructor=L0Constructor}; _ -> + leveled_log:log_timer("P0031", [], SW), UpdState end; + NewL0Size == L0Size -> + leveled_log:log_timer("P0031", [], SW), State#state{levelzero_cache=L0Cache, levelzero_size=L0Size, ledger_sqn=LedgerSQN} @@ -718,13 +732,21 @@ levelzero_filename(State) -> FileName. -fetch_mem(Key, Hash, Manifest, L0Cache) -> + +fetch_mem(Key, Hash, Manifest, L0Cache, none) -> L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), case L0Check of {false, not_found} -> fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/2); {true, KV} -> KV + end; +fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) -> + case leveled_pmem:check_index(Hash, L0Index) of + true -> + fetch_mem(Key, Hash, Manifest, L0Cache, none); + false -> + fetch(Key, Hash, Manifest, 0, fun leveled_sft:sft_get/2) end. fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) -> diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 5ba62aa..9f81c01 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -45,7 +45,10 @@ add_to_cache/4, to_list/2, check_levelzero/3, - merge_trees/4 + merge_trees/4, + add_to_index/2, + new_index/0, + check_index/2 ]). -include_lib("eunit/include/eunit.hrl"). @@ -69,6 +72,29 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> end end. +add_to_index(LevelMinus1, L0Index) -> + IndexAddFun = + fun({_K, V}) -> + {_, _, Hash, _} = leveled_codec:striphead_to_details(V), + case Hash of + no_lookup -> + ok; + _ -> + ets:insert(L0Index, {Hash}) + end + end, + lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)). + +new_index() -> + ets:new(l0index, [private, set]). + +check_index(Hash, L0Index) -> + case ets:lookup(L0Index, Hash) of + [{Hash}] -> + true; + [] -> + false + end. to_list(Slots, FetchFun) -> SW = os:timestamp(),