diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 7c6c189..b1efd81 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -154,6 +154,7 @@
 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_skiplist:empty(false) :: tuple(),
+                        index = leveled_pmem:new_index(), % array
                         min_sqn = infinity :: integer()|infinity,
                         max_sqn = 0 :: integer()}).
@@ -458,6 +459,7 @@ code_change(_OldVsn, State, _Extra) ->
 load_snapshot(LedgerSnapshot, LedgerCache) ->
     CacheToLoad = {LedgerCache#ledger_cache.loader,
+                    LedgerCache#ledger_cache.index,
                     LedgerCache#ledger_cache.min_sqn,
                     LedgerCache#ledger_cache.max_sqn},
     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
@@ -467,8 +469,9 @@ empty_ledgercache() ->
 push_ledgercache(Penciller, Cache) ->
     CacheToLoad = {Cache#ledger_cache.loader,
-                    Cache#ledger_cache.min_sqn,
-                    Cache#ledger_cache.max_sqn},
+                    Cache#ledger_cache.index,
+                    Cache#ledger_cache.min_sqn,
+                    Cache#ledger_cache.max_sqn},
     leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 %%%============================================================================
@@ -929,41 +932,44 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
 preparefor_ledgercache(?INKT_KEYD, LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
-    leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL);
+    KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
+                                                    Bucket,
+                                                    Key,
+                                                    SQN,
+                                                    TTL),
+    {no_lookup, SQN, KeyChanges};
 preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
-    {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey,
-                                                                    SQN,
-                                                                    Obj,
-                                                                    Size,
-                                                                    TTL),
-    [PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
-                                                        Bucket,
-                                                        Key,
-                                                        SQN,
-                                                        TTL).
+    {Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
+                                                                        SQN,
+                                                                        Obj,
+                                                                        Size,
+                                                                        TTL),
+    KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
+                                                                    Bucket,
+                                                                    Key,
+                                                                    SQN,
+                                                                    TTL),
+    {H, SQN, KeyChanges}.
-addto_ledgercache(Changes, Cache) ->
+addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
+    ets:insert(Cache#ledger_cache.mem, KeyChanges),
+    UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
+    Cache#ledger_cache{index = UpdIndex,
+                        min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
+                        max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
+
+addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
     FoldChangesFun =
-        fun({K, V}, Cache0) ->
-            {SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
-            true = ets:insert(Cache0#ledger_cache.mem, {K, V}),
-            Cache0#ledger_cache{min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
-                                max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
-        end,
-    lists:foldl(FoldChangesFun, Cache, Changes).
-
-addto_ledgercache(Changes, Cache, loader) ->
-    FoldChangesFun =
-        fun({K, V}, Cache0) ->
-            {SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
-            SL0 = Cache0#ledger_cache.loader,
-            SL1 = leveled_skiplist:enter_nolookup(K, V, SL0),
-            Cache0#ledger_cache{loader = SL1,
-                                min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
-                                max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
-        end,
-    lists:foldl(FoldChangesFun, Cache, Changes).
+        fun({K, V}, SL0) ->
+            leveled_skiplist:enter_nolookup(K, V, SL0)
+        end,
+    UpdSL = lists:foldl(FoldChangesFun, Cache#ledger_cache.loader, KeyChanges),
+    UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
+    Cache#ledger_cache{index = UpdIndex,
+                        loader = UpdSL,
+                        min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
+                        max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
 maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
@@ -973,12 +979,13 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
     if
         TimeToPush ->
             CacheToLoad = {leveled_skiplist:from_orderedset(Tab),
+                            Cache#ledger_cache.index,
                             Cache#ledger_cache.min_sqn,
                             Cache#ledger_cache.max_sqn},
             case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
                 ok ->
-                    true = ets:delete_all_objects(Tab),
                     Cache0 = #ledger_cache{},
+                    true = ets:delete_all_objects(Tab),
                     {ok, Cache0#ledger_cache{mem=Tab}};
                 returned ->
                     {returned, Cache}
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index ab9ee27..10ffcc7 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -314,11 +314,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
                 _ ->
                     {active, TS}
             end,
+    Hash = magic_hash(PrimaryKey),
     Value = {SQN,
                 Status,
-                magic_hash(PrimaryKey),
+                Hash,
                 extract_metadata(Obj, Size, Tag)},
-    {Bucket, Key, {PrimaryKey, Value}}.
+    {Bucket, Key, {PrimaryKey, Value}, Hash}.
 integer_now() ->
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index d087744..506610a 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -220,7 +220,7 @@
                 levelzero_size = 0 :: integer(),
                 levelzero_maxcachesize :: integer(),
                 levelzero_cointoss = false :: boolean(),
-                levelzero_index, % may be none or an ETS table reference
+                levelzero_index, % An array
                 is_snapshot = false :: boolean(),
                 snapshot_fully_loaded = false :: boolean(),
@@ -330,7 +330,7 @@ init([PCLopts]) ->
     end.
-handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
+handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
                 From,
                 State=#state{is_snapshot=Snap}) when Snap == false ->
     % The push_mem process is as follows:
@@ -360,11 +360,12 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
         false ->
             leveled_log:log("P0018", [ok, false, false]),
             gen_server:reply(From, ok),
-            {noreply, update_levelzero(State#state.levelzero_size,
-                                        {PushedTree, MinSQN, MaxSQN},
-                                        State#state.ledger_sqn,
-                                        State#state.levelzero_cache,
-                                        State)}
+            {noreply,
+                update_levelzero(State#state.levelzero_size,
+                                    {PushedTree, PushedIdx, MinSQN, MaxSQN},
+                                    State#state.ledger_sqn,
+                                    State#state.levelzero_cache,
+                                    State)}
     end;
 handle_call({fetch, Key, Hash}, _From, State) ->
     {R, HeadTimer} = timed_fetch_mem(Key,
@@ -411,17 +412,22 @@ handle_call(work_for_clerk, From, State) ->
 handle_call(get_startup_sqn, _From, State) ->
     {reply, State#state.persisted_sqn, State};
 handle_call({register_snapshot, Snapshot}, _From, State) ->
-    Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
+    Rs = [{Snapshot,
+            State#state.manifest_sqn}|State#state.registered_snapshots],
     {reply, {ok, State}, State#state{registered_snapshots = Rs}};
-handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
+handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
+                _From, State) ->
     L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
                                         {BookieIncrTree, MinSQN, MaxSQN},
                                         State#state.ledger_sqn,
                                         State#state.levelzero_cache),
     {LedgerSQN, L0Size, L0Cache} = L0D,
+    L0Index = leveled_pmem:add_to_index(BookieIdx,
+                                        State#state.levelzero_index,
+                                        length(L0Cache)),
     {reply, ok, State#state{levelzero_cache=L0Cache,
                             levelzero_size=L0Size,
-                            levelzero_index=none,
+                            levelzero_index=L0Index,
                             ledger_sqn=LedgerSQN,
                             snapshot_fully_loaded=true}};
 handle_call({fetch_levelzero, Slot}, _From, State) ->
@@ -467,9 +473,10 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
                                 filename=FN},
     UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
     % Prompt clerk to ask about work - do this for every L0 roll
-    leveled_pmem:clear_index(State#state.levelzero_index),
+    UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
     ok = leveled_pclerk:clerk_prompt(State#state.clerk),
     {noreply, State#state{levelzero_cache=[],
+                            levelzero_index=UpdIndex,
                             levelzero_pending=false,
                             levelzero_constructor=undefined,
                             levelzero_size=0,
@@ -643,20 +650,23 @@ start_from_file(PCLopts) ->
-update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
+update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
                     LedgerSQN, L0Cache, State) ->
     SW = os:timestamp(),
     Update = leveled_pmem:add_to_cache(L0Size,
                                         {PushedTree, MinSQN, MaxSQN},
                                         LedgerSQN,
                                         L0Cache),
-    leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
+    UpdL0Index = leveled_pmem:add_to_index(PushedIdx,
+                                            State#state.levelzero_index,
+                                            length(L0Cache) + 1),
     {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
     if
         UpdMaxSQN >= LedgerSQN ->
            UpdState = State#state{levelzero_cache=UpdL0Cache,
                                     levelzero_size=NewL0Size,
+                                    levelzero_index=UpdL0Index,
                                     ledger_sqn=UpdMaxSQN},
             CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
             CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
@@ -741,20 +751,14 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
     element(1, R).
-fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
-    L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
+fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+    PosList = leveled_pmem:check_index(Hash, L0Index),
+    L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
     case L0Check of
         {false, not_found} ->
             fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
         {true, KV} ->
             {KV, 0}
-    end;
-fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
-    case leveled_pmem:check_index(Hash, L0Index) of
-        true ->
-            fetch_mem(Key, Hash, Manifest, L0Cache, none);
-        false ->
-            fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
     end.
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -1374,12 +1378,15 @@ confirm_delete_test() ->
 maybe_pause_push(PCL, KL) ->
     T0 = leveled_skiplist:empty(true),
-    T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) ->
-                            SL = leveled_skiplist:enter(K, V, AccSL),
+    I0 = leveled_pmem:new_index(),
+    T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) ->
+                            UpdSL = leveled_skiplist:enter(K, V, AccSL),
                             SQN = leveled_codec:strip_to_seqonly({K, V}),
-                            {SL, min(SQN, MinSQN), max(SQN, MaxSQN)}
+                            H = leveled_codec:magic_hash(K),
+                            UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H),
+                            {UpdSL, UpdIdx, min(SQN, MinSQN), max(SQN, MaxSQN)}
                             end,
-                        {T0, infinity, 0},
+                        {T0, I0, infinity, 0},
                         KL),
     case pcl_pushmem(PCL, T1) of
         returned ->
diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl
index 0c61acf..8dd9e2a 100644
--- a/src/leveled_pmem.erl
+++ b/src/leveled_pmem.erl
@@ -42,11 +42,13 @@
 -include("include/leveled.hrl").
 -export([
+        prepare_for_index/2,
         add_to_cache/4,
         to_list/2,
         check_levelzero/3,
+        check_levelzero/4,
         merge_trees/4,
-        add_to_index/2,
+        add_to_index/3,
         new_index/0,
         clear_index/1,
         check_index/2
@@ -59,6 +61,12 @@
 %%% API
 %%%============================================================================
+prepare_for_index(IndexArray, Hash) ->
+    {Slot, H0} = split_hash(Hash),
+    Bin = array:get(Slot, IndexArray),
+    array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).
+
+
 add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
     LM1Size = leveled_skiplist:size(LevelMinus1),
     case LM1Size of
@@ -73,32 +81,29 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
         end
     end.
-add_to_index(LevelMinus1, L0Index) ->
+add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
     IndexAddFun =
-        fun({_K, V}) ->
-            {_, _, Hash, _} = leveled_codec:striphead_to_details(V),
-            case Hash of
-                no_lookup ->
-                    ok;
-                _ ->
-                    ets:insert(L0Index, {Hash})
-            end
-        end,
-    lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)).
+        fun(Slot, Acc) ->
+            Bin0 = array:get(Slot, Acc),
+            BinLM1 = array:get(Slot, LM1Array),
+            array:set(Slot,
+                        <<Bin0/binary, 0:1/integer, CacheSlot:7/integer, BinLM1/binary>>,
+                        Acc)
+        end,
+    lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)).
 new_index() ->
-    ets:new(l0index, [private, set]).
+    array:new([{size, 256}, {default, <<>>}]).
-clear_index(L0Index) ->
-    ets:delete_all_objects(L0Index).
+clear_index(_L0Index) ->
+    new_index().
 check_index(Hash, L0Index) ->
-    case ets:lookup(L0Index, Hash) of
-        [{Hash}] ->
-            true;
-        [] ->
-            false
-    end.
+    {Slot, H0} = split_hash(Hash),
+    Bin = array:get(Slot, L0Index),
+    find_pos(Bin, H0, [], 0).
 to_list(Slots, FetchFun) ->
     SW = os:timestamp(),
@@ -114,13 +119,15 @@ to_list(Slots, FetchFun) ->
     FullList.
-check_levelzero(Key, TreeList) ->
-    check_levelzero(Key, leveled_codec:magic_hash(Key), TreeList).
+check_levelzero(Key, PosList, TreeList) ->
+    check_levelzero(Key, leveled_codec:magic_hash(Key), PosList, TreeList).
-check_levelzero(_Key, _Hash, []) ->
-    {false, not_found};
-check_levelzero(Key, Hash, TreeList) ->
-    check_slotlist(Key, Hash, lists:seq(1, length(TreeList)), TreeList).
+check_levelzero(_Key, _Hash, _PosList, []) ->
+    {false, not_found};
+check_levelzero(_Key, _Hash, [], _TreeList) ->
+    {false, not_found};
+check_levelzero(Key, Hash, PosList, TreeList) ->
+    check_slotlist(Key, Hash, PosList, TreeList).
 merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
@@ -136,6 +143,22 @@ merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
 %%% Internal Functions
 %%%============================================================================
+
+find_pos(<<>>, _Hash, PosList, _SlotID) ->
+    PosList;
+find_pos(<<1:1/integer, Hash:23/integer, T/binary>>, Hash, PosList, SlotID) ->
+    find_pos(T, Hash, PosList ++ [SlotID], SlotID);
+find_pos(<<1:1/integer, _Miss:23/integer, T/binary>>, Hash, PosList, SlotID) ->
+    find_pos(T, Hash, PosList, SlotID);
+find_pos(<<0:1/integer, NxtSlot:7/integer, T/binary>>, Hash, PosList, _SlotID) ->
+    find_pos(T, Hash, PosList, NxtSlot).
+
+
+split_hash(Hash) ->
+    Slot = Hash band 255,
+    H0 = (Hash bsr 8) band 8388607,
+    {Slot, H0}.
+
 check_slotlist(Key, Hash, CheckList, TreeList) ->
     SlotCheckFun =
                 fun(SlotToCheck, {Found, KV}) ->
@@ -162,12 +185,21 @@ check_slotlist(Key, Hash, CheckList, TreeList) ->
 -ifdef(TEST).
+generate_randomkeys_aslist(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
+    lists:ukeysort(1,
+                    generate_randomkeys(Seqn,
+                                        Count,
+                                        [],
+                                        BucketRangeLow,
+                                        BucketRangeHigh)).
+
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
-    generate_randomkeys(Seqn,
-                        Count,
-                        leveled_skiplist:empty(true),
-                        BucketRangeLow,
-                        BucketRangeHigh).
+    KVL = generate_randomkeys(Seqn,
+                                Count,
+                                [],
+                                BucketRangeLow,
+                                BucketRangeHigh),
+    leveled_skiplist:from_list(KVL).
 generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
@@ -179,7 +211,7 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
                 {Seqn, {active, infinity}, null}},
     generate_randomkeys(Seqn + 1,
                         Count - 1,
-                        leveled_skiplist:enter(K, V, Acc),
+                        [{K, V}|Acc],
                         BucketLow,
                         BRange).
@@ -230,8 +262,9 @@ compare_method_test() ->
                         [],
                         TestList),
+    PosList = lists:seq(1, length(TreeList)),
     S1 = lists:foldl(fun({Key, _V}, Acc) ->
-                            R0 = check_levelzero(Key, TreeList),
+                            R0 = check_levelzero(Key, PosList, TreeList),
                             [R0|Acc]
                             end,
                         [],
@@ -267,6 +300,41 @@ compare_method_test() ->
                     [timer:now_diff(os:timestamp(), SWb), Sz1]),
     ?assertMatch(Sz0, Sz1).
+with_index_test() ->
+    IndexPrepareFun =
+        fun({K, _V}, Acc) ->
+            H = leveled_codec:magic_hash(K),
+            prepare_for_index(Acc, H)
+        end,
+    LoadFun =
+        fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) ->
+            LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500),
+            LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
+            LM1SL = leveled_skiplist:from_list(LM1),
+            UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
+            R = add_to_cache(L0Size,
+                                {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
+                                LedgerSQN,
+                                L0TreeList),
+            {R, UpdL0Index, lists:ukeymerge(1, LM1, SrcList)}
+        end,
+
+    R0 = lists:foldl(LoadFun, {{0, 0, []}, new_index(), []}, lists:seq(1, 16)),
+
+    {{SQN, Size, TreeList}, L0Index, SrcKVL} = R0,
+    ?assertMatch(32000, SQN),
+    ?assertMatch(true, Size =< 32000),
+    CheckFun =
+        fun({K, V}, {L0Idx, L0Cache}) ->
+            H = leveled_codec:magic_hash(K),
+            PosList = check_index(H, L0Idx),
+            ?assertMatch({true, {K, V}},
+                            check_slotlist(K, H, PosList, L0Cache)),
+            {L0Idx, L0Cache}
+        end,
+
+    _R1 = lists:foldl(CheckFun, {L0Index, TreeList}, SrcKVL).
+
 -endif.
\ No newline at end of file
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 759b5cb..62aa904 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -1230,11 +1230,11 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
                                             "Key" ++ KNumber,
                                             o),
-    {_B, _K, KV} = leveled_codec:generate_ledgerkv(LedgerKey,
-                                                    Seqn,
-                                                    crypto:rand_bytes(64),
-                                                    64,
-                                                    infinity),
+    {_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
+                                                        Seqn,
+                                                        crypto:rand_bytes(64),
+                                                        64,
+                                                        infinity),
     generate_randomkeys(Seqn + 1,
                         Count - 1,
                         [KV|Acc],
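
Editor's sketch (not part of the patch): the change above replaces the ETS level-zero index with a 256-slot array of binaries. The low byte of leveled_codec:magic_hash/1 picks the slot; each key appends a 1-flagged 23-bit hash suffix to that slot's binary (prepare_for_index/2), each tree pushed into the level-zero cache is prefixed with a 0-flagged 7-bit cache-slot marker (add_to_index/3, hence the CacheSlot < 128 guard), and check_index/2 walks the slot binary with find_pos/4 to return candidate cache positions. The standalone module below re-derives that round trip so it can be compiled and tried outside leveled; the module name, demo/0 and the use of erlang:phash2/2 are illustrative assumptions, and the bit layout is read off split_hash/1 and find_pos/4 in the patch rather than quoted from it.

%% l0_index_sketch.erl - illustrative re-derivation of the slot/hash encoding.
-module(l0_index_sketch).
-export([demo/0]).

%% 256 slots, one binary of packed entries per slot (mirrors new_index/0).
new_index() ->
    array:new([{size, 256}, {default, <<>>}]).

%% Low byte of the hash picks the slot; the next 23 bits form the stored entry.
split_hash(Hash) ->
    {Hash band 255, (Hash bsr 8) band 8388607}.

%% A key entry is a 1 flag plus the 23-bit suffix (as prepare_for_index/2 above,
%% with the arguments flipped here so it folds directly with lists:foldl/3).
prepare(Hash, IndexArray) ->
    {Slot, H0} = split_hash(Hash),
    Bin = array:get(Slot, IndexArray),
    array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).

%% Prefix a pushed tree's entries with a 0 flag and its 7-bit cache slot
%% (mirrors add_to_index/3).
add_to_l0(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
    MergeSlot =
        fun(Slot, Acc) ->
            Bin0 = array:get(Slot, Acc),
            BinLM1 = array:get(Slot, LM1Array),
            array:set(Slot,
                      <<Bin0/binary, 0:1/integer, CacheSlot:7/integer,
                          BinLM1/binary>>,
                      Acc)
        end,
    lists:foldl(MergeSlot, L0Index, lists:seq(0, 255)).

%% Return the cache slots that may hold the hash (mirrors check_index/2).
check(Hash, L0Index) ->
    {Slot, H0} = split_hash(Hash),
    find_pos(array:get(Slot, L0Index), H0, [], 0).

find_pos(<<>>, _Hash, PosList, _SlotID) ->
    PosList;
find_pos(<<1:1/integer, Hash:23/integer, T/binary>>, Hash, PosList, SlotID) ->
    find_pos(T, Hash, PosList ++ [SlotID], SlotID);
find_pos(<<1:1/integer, _Miss:23/integer, T/binary>>, Hash, PosList, SlotID) ->
    find_pos(T, Hash, PosList, SlotID);
find_pos(<<0:1/integer, NxtSlot:7/integer, T/binary>>, Hash, PosList, _SlotID) ->
    find_pos(T, Hash, PosList, NxtSlot).

demo() ->
    Hashes = [erlang:phash2({key, N}, 16#100000000) || N <- lists:seq(1, 100)],
    LM1Array = lists:foldl(fun prepare/2, new_index(), Hashes),
    L0Index = add_to_l0(LM1Array, new_index(), 1),
    %% Every indexed hash must report cache slot 1 as a candidate; unrelated
    %% hashes may collide on the 23-bit suffix, but an added key is never missed.
    true = lists:all(fun(H) -> lists:member(1, check(H, L0Index)) end, Hashes),
    ok.

A hit from check/2 is only a hint: false positives from the truncated 23-bit suffix are expected and are resolved by check_levelzero/4 against the cached trees themselves, while the positional markers mean only the trees that plausibly contain the key need to be searched.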