diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index d3c3f1f..0960c68 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -152,7 +152,9 @@
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 80000).
 
--record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
+-record(ledger_cache, {mem :: ets:tab(),
+                        loader = leveled_skiplist:empty(false) :: tuple(),
+                        index = leveled_pmem:new_index(), % array
                         min_sqn = infinity :: integer()|infinity,
                         max_sqn = 0 :: integer()}).
 
@@ -243,22 +245,21 @@ init([Opts]) ->
             CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
             CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
                         + erlang:phash2(self()) rem CacheJitter,
+            NewETS = ets:new(mem, [ordered_set]),
             leveled_log:log("B0001", [Inker, Penciller]),
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
-                        ledger_cache=#ledger_cache{},
+                        ledger_cache=#ledger_cache{mem = NewETS},
                         is_snapshot=false}};
         Bookie ->
             {ok,
                 {Penciller, LedgerCache},
                 Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
-            CacheToLoad = {leveled_skiplist:empty(true), 0, 0},
-            ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad),
+            ok = load_snapshot(Penciller, LedgerCache),
             leveled_log:log("B0002", [Inker, Penciller]),
             {ok, #state{penciller=Penciller,
                         inker=Inker,
-                        ledger_cache=LedgerCache,
                         is_snapshot=true}}
     end.
@@ -457,18 +458,20 @@ code_change(_OldVsn, State, _Extra) ->
 %%%============================================================================
 
 load_snapshot(LedgerSnapshot, LedgerCache) ->
-    CacheToLoad = {LedgerCache#ledger_cache.skiplist,
+    CacheToLoad = {LedgerCache#ledger_cache.loader,
+                    LedgerCache#ledger_cache.index,
                     LedgerCache#ledger_cache.min_sqn,
                     LedgerCache#ledger_cache.max_sqn},
     ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
 
 empty_ledgercache() ->
-    #ledger_cache{}.
+    #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
 push_ledgercache(Penciller, Cache) ->
-    CacheToLoad = {Cache#ledger_cache.skiplist,
-                    Cache#ledger_cache.min_sqn,
-                    Cache#ledger_cache.max_sqn},
+    CacheToLoad = {Cache#ledger_cache.loader,
+                    Cache#ledger_cache.index,
+                    Cache#ledger_cache.min_sqn,
+                    Cache#ledger_cache.max_sqn},
     leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 
 %%%============================================================================
@@ -486,7 +489,7 @@ maybe_longrunning(SW, Aspect) ->
     end.
 
 cache_size(LedgerCache) ->
-    leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).
+    ets:info(LedgerCache#ledger_cache.mem, size).
 
 bucket_stats(State, Bucket, Tag) ->
     {ok,
@@ -703,18 +706,25 @@ snapshot_store(State, SnapType) ->
     PCLopts = #penciller_options{start_snapshot=true,
                                     source_penciller=State#state.penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
+    LedgerCache = readycache_forsnapshot(State#state.ledger_cache),
     case SnapType of
         store ->
             InkerOpts = #inker_options{start_snapshot=true,
                                         source_inker=State#state.inker},
             {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
-            {ok, {LedgerSnapshot, State#state.ledger_cache},
-                JournalSnapshot};
+            {ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot};
         ledger ->
-            {ok, {LedgerSnapshot, State#state.ledger_cache},
-                null}
+            {ok, {LedgerSnapshot, LedgerCache}, null}
     end.
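Note on the reworked #ledger_cache{}: the single skiplist is replaced by a mutable ETS ordered_set (mem) for reads and writes, with a skiplist (loader) used only for snapshot/load paths and a leveled_pmem hash index alongside. A minimal sketch of the shape the bookie now relies on - this is illustrative code outside the patch, with hypothetical keys:

    %% Sketch only: an ordered_set table gives O(log n) inserts and exact-key
    %% lookups, with the cache size read straight from table metadata.
    demo_cache() ->
        Tab = ets:new(mem, [ordered_set]),
        true = ets:insert(Tab, [{{o, <<"B">>, <<"K1">>, null}, head1},
                                {{o, <<"B">>, <<"K2">>, null}, head2}]),
        2 = ets:info(Tab, size),            % what cache_size/1 now returns
        [{_K, head1}] = ets:lookup(Tab, {o, <<"B">>, <<"K1">>, null}),
        ok.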
+readycache_forsnapshot(LedgerCache) ->
+    % Need to convert the Ledger Cache away from using the ETS table
+    SkipList = leveled_skiplist:from_orderedset(LedgerCache#ledger_cache.mem),
+    Idx = LedgerCache#ledger_cache.index,
+    MinSQN = LedgerCache#ledger_cache.min_sqn,
+    MaxSQN = LedgerCache#ledger_cache.max_sqn,
+    #ledger_cache{loader=SkipList, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
+
 set_options(Opts) ->
     MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
     JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
@@ -760,25 +770,25 @@ startup(InkerOpts, PencillerOpts) ->
 
 fetch_head(Key, Penciller, LedgerCache) ->
     SW = os:timestamp(),
-    Hash = leveled_codec:magic_hash(Key),
-    if
-        Hash /= no_lookup ->
-            L0R = leveled_skiplist:lookup(Key,
-                                            Hash,
-                                            LedgerCache#ledger_cache.skiplist),
-            case L0R of
-                {value, Head} ->
-                    maybe_longrunning(SW, local_head),
+    CacheResult =
+        case LedgerCache#ledger_cache.mem of
+            undefined ->
+                [];
+            Tab ->
+                ets:lookup(Tab, Key)
+        end,
+    case CacheResult of
+        [{Key, Head}] ->
+            Head;
+        [] ->
+            Hash = leveled_codec:magic_hash(Key),
+            case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
+                {Key, Head} ->
+                    maybe_longrunning(SW, pcl_head),
                     Head;
-                none ->
-                    case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
-                        {Key, Head} ->
-                            maybe_longrunning(SW, pcl_head),
-                            Head;
-                        not_present ->
-                            maybe_longrunning(SW, pcl_head),
-                            not_present
-                    end
+                not_present ->
+                    maybe_longrunning(SW, pcl_head),
+                    not_present
             end
     end.
@@ -923,49 +933,61 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
 
 preparefor_ledgercache(?INKT_KEYD,
                         LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
-    leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL);
+    KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
+                                                    Bucket,
+                                                    Key,
+                                                    SQN,
+                                                    TTL),
+    {no_lookup, SQN, KeyChanges};
 preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
-    {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey,
-                                                                    SQN,
-                                                                    Obj,
-                                                                    Size,
-                                                                    TTL),
-    [PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
-                                                        Bucket,
-                                                        Key,
-                                                        SQN,
-                                                        TTL).
+    {Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
+                                                                        SQN,
+                                                                        Obj,
+                                                                        Size,
+                                                                        TTL),
+    KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
+                                                                    Bucket,
+                                                                    Key,
+                                                                    SQN,
+                                                                    TTL),
+    {H, SQN, KeyChanges}.
 
-addto_ledgercache(Changes, Cache) ->
+addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
+    ets:insert(Cache#ledger_cache.mem, KeyChanges),
+    UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
+    Cache#ledger_cache{index = UpdIndex,
+                        min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
+                        max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
+
+addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
     FoldChangesFun =
-        fun({K, V}, Cache0) ->
-            {SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
-            SL0 = Cache0#ledger_cache.skiplist,
-            SL1 =
-                case Hash of
-                    no_lookup ->
-                        leveled_skiplist:enter_nolookup(K, V, SL0);
-                    _ ->
-                        leveled_skiplist:enter(K, Hash, V, SL0)
-                end,
-            Cache0#ledger_cache{skiplist=SL1,
-                                min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
-                                max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
-        end,
-    lists:foldl(FoldChangesFun, Cache, Changes).
+        fun({K, V}, SL0) ->
+            leveled_skiplist:enter_nolookup(K, V, SL0)
+        end,
+    UpdSL = lists:foldl(FoldChangesFun, Cache#ledger_cache.loader, KeyChanges),
+    UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
+    Cache#ledger_cache{index = UpdIndex,
+                        loader = UpdSL,
+                        min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
+                        max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
+
 maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
-    CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist),
+    Tab = Cache#ledger_cache.mem,
+    CacheSize = ets:info(Tab, size),
     TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
     if
         TimeToPush ->
-            CacheToLoad = {Cache#ledger_cache.skiplist,
+            CacheToLoad = {leveled_skiplist:from_orderedset(Tab),
+                            Cache#ledger_cache.index,
                             Cache#ledger_cache.min_sqn,
                             Cache#ledger_cache.max_sqn},
             case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
                 ok ->
-                    {ok, #ledger_cache{}};
+                    Cache0 = #ledger_cache{},
+                    true = ets:delete_all_objects(Tab),
+                    {ok, Cache0#ledger_cache{mem=Tab}};
                 returned ->
                     {returned, Cache}
             end;
@@ -1002,12 +1024,18 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
                 SQN when SQN < MaxSQN ->
                     Changes = preparefor_ledgercache(Type, PK, SQN,
                                                         Obj, VSize, IndexSpecs),
-                    {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
+                    {loop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Changes, OutputTree, loader)}};
                 MaxSQN ->
                     leveled_log:log("B0006", [SQN]),
                     Changes = preparefor_ledgercache(Type, PK, SQN,
                                                         Obj, VSize, IndexSpecs),
-                    {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
+                    {stop,
+                        {MinSQN,
+                            MaxSQN,
+                            addto_ledgercache(Changes, OutputTree, loader)}};
                 SQN when SQN > MaxSQN ->
                     leveled_log:log("B0007", [MaxSQN, SQN]),
                     {stop, Acc0}
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index ab9ee27..10ffcc7 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -314,11 +314,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
                     _ ->
                         {active, TS}
                 end,
+    Hash = magic_hash(PrimaryKey),
     Value = {SQN,
                 Status,
-                magic_hash(PrimaryKey),
+                Hash,
                 extract_metadata(Obj, Size, Tag)},
-    {Bucket, Key, {PrimaryKey, Value}}.
+    {Bucket, Key, {PrimaryKey, Value}, Hash}.
 
 integer_now() ->
 
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 853498b..a07065b 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -4,14 +4,17 @@
 %% persisted, ordered view of non-recent Keys and Metadata which have been
 %% added to the store.
 %% - The penciller maintains a manifest of all the files within the current
-%% Ledger. 
+%% Ledger.
 %% - The Penciller provides re-write (compaction) work up to be managed by
 %% the Penciller's Clerk
 %% - The Penciller can be cloned and maintains a register of clones who have
 %% requested snapshots of the Ledger
-%% - The accepts new dumps (in the form of a gb_tree) from the Bookie, and
-%% calls the Bookie once the process of pencilling this data in the Ledger is
-%% complete - and the Bookie is free to forget about the data
+%% - The Penciller accepts new dumps (in the form of a leveled_skiplist
+%% accompanied by an array of hash-listing binaries) from the Bookie, and
+%% responds either 'ok' to the Bookie if the information is accepted and the
+%% Bookie can refresh its memory, or 'returned' if the Bookie must continue
+%% without refreshing as the Penciller is not currently able to accept the
+%% update (potentially due to a backlog of compaction work)
 %% - The Penciller's persistence of the ledger may not be reliable, in that it
 %% may lose data but only in sequence from a particular sequence number. On
 %% startup the Penciller will inform the Bookie of the highest sequence number
@@ -21,14 +24,14 @@
 %% -------- LEDGER ---------
 %%
 %% The Ledger is divided into many levels
-%% - L0: New keys are received from the Bookie and merged into a single
-%% gb_tree, until that tree is the size of a SST file, and it is then persisted
+%% - L0: New keys are received from the Bookie and kept in the levelzero
+%% cache, until that cache is the size of a SST file, and it is then persisted
 %% as a SST file at this level. L0 SST files can be larger than the normal
 %% maximum size - so we don't have to consider problems of either having more
 %% than one L0 file (and handling what happens on a crash between writing the
 %% files when the second may have overlapping sequence numbers), or having a
 %% remainder with overlapping in sequence numbers in memory after the file is
-%% written. Once the persistence is completed, the L0 tree can be erased.
+%% written. Once the persistence is completed, the L0 cache can be erased.
 %% There can be only one SST file at Level 0, so the work to merge that file
 %% to the lower level must be the highest priority, as otherwise writes to the
 %% ledger will stall, when there is next a need to persist.
@@ -64,10 +67,10 @@
 %%
 %% The Penciller must support the PUSH of a dump of keys from the Bookie. The
 %% call to PUSH should be immediately acknowledged, and then work should be
-%% completed to merge the tree into the L0 tree.
+%% completed to merge the cache update into the L0 cache.
 %%
 %% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
-%% conversion of the current L0 tree into a SST file, but not completed this
+%% conversion of the current L0 cache into a SST file, but not completed this
 %% change. The Penciller in this case returns the push, and the Bookie should
 %% continue to grow the cache before trying again.
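Note on the dump format described above: the Bookie-side construction of the four-part push is a fold over the pending key changes. A hedged sketch (hypothetical helper name, mirroring the maybe_pause_push test helper later in this patch, using only functions the patch exports):

    %% Sketch: build the {SkipList, HashIndex, MinSQN, MaxSQN} tuple that
    %% pcl_pushmem/2 now expects, from a list of ledger {Key, Value} changes.
    build_push(KVL) ->
        FoldFun =
            fun({K, V}, {Idx, MinSQN, MaxSQN}) ->
                SQN = leveled_codec:strip_to_seqonly({K, V}),
                H = leveled_codec:magic_hash(K),
                {leveled_pmem:prepare_for_index(Idx, H),
                    min(SQN, MinSQN),
                    max(SQN, MaxSQN)}
            end,
        {Idx, MinSQN, MaxSQN} =
            lists:foldl(FoldFun, {leveled_pmem:new_index(), infinity, 0}, KVL),
        {leveled_skiplist:from_list(KVL), Idx, MinSQN, MaxSQN}.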
 %%
@@ -220,7 +223,7 @@
                 levelzero_size = 0 :: integer(),
                 levelzero_maxcachesize :: integer(),
                 levelzero_cointoss = false :: boolean(),
-                levelzero_index, % may be none or an ETS table reference
+                levelzero_index, % An array
                 is_snapshot = false :: boolean(),
                 snapshot_fully_loaded = false :: boolean(),
@@ -322,6 +325,7 @@ init([PCLopts]) ->
             SrcPenciller = PCLopts#penciller_options.source_penciller,
             {ok, State} = pcl_registersnapshot(SrcPenciller, self()),
             leveled_log:log("P0001", [self()]),
+            io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]),
             {ok, State#state{is_snapshot=true,
                                 source_penciller=SrcPenciller}};
             %% Need to do something about timeout
         {_RootPath, false} ->
@@ -329,14 +333,14 @@
     end.
 
-handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
+handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
                 From,
                 State=#state{is_snapshot=Snap}) when Snap == false ->
     % The push_mem process is as follows:
     %
-    % 1 - Receive a gb_tree containing the latest Key/Value pairs (note that
-    % we mean value from the perspective of the Ledger, not the full value
-    % stored in the Inker)
+    % 1 - Receive a cache.  The cache has four parts: a skiplist of keys and
+    % values, an array of 256 binaries listing the hashes present in the
+    % skiplist, a min SQN and a max SQN
     %
     % 2 - Check to see if there is a levelzero file pending.  If so, the
     % update must be returned.  If not the update can be accepted
@@ -346,10 +350,10 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
     %
     % 4 - Update the cache:
     % a) Append the cache to the list
-    % b) Add hashes for all the elements to the index
+    % b) Add each of the 256 hash-listing binaries to the master L0 index array
     %
     % Check the approximate size of the cache.  If it is over the maximum size,
-    % trigger a backgroun L0 file write and update state of levelzero_pending.
+    % trigger a background L0 file write and update state of levelzero_pending.
     case State#state.levelzero_pending or State#state.work_backlog of
         true ->
             leveled_log:log("P0018", [returned,
                                         State#state.levelzero_pending,
                                         State#state.work_backlog]),
             {reply, returned, State};
         false ->
             leveled_log:log("P0018", [ok, false, false]),
             gen_server:reply(From, ok),
-            {noreply, update_levelzero(State#state.levelzero_size,
-                                        {PushedTree, MinSQN, MaxSQN},
-                                        State#state.ledger_sqn,
-                                        State#state.levelzero_cache,
-                                        State)}
+            {noreply,
+                update_levelzero(State#state.levelzero_size,
+                                    {PushedTree, PushedIdx, MinSQN, MaxSQN},
+                                    State#state.ledger_sqn,
+                                    State#state.levelzero_cache,
+                                    State)}
     end;
 handle_call({fetch, Key, Hash}, _From, State) ->
     {R, HeadTimer} = timed_fetch_mem(Key,
@@ -410,17 +415,22 @@ handle_call(work_for_clerk, From, State) ->
 handle_call(get_startup_sqn, _From, State) ->
     {reply, State#state.persisted_sqn, State};
 handle_call({register_snapshot, Snapshot}, _From, State) ->
-    Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
+    Rs = [{Snapshot,
+            State#state.manifest_sqn}|State#state.registered_snapshots],
     {reply, {ok, State}, State#state{registered_snapshots = Rs}};
-handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
+handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
+                _From, State) ->
     L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
                                     {BookieIncrTree, MinSQN, MaxSQN},
                                     State#state.ledger_sqn,
                                     State#state.levelzero_cache),
     {LedgerSQN, L0Size, L0Cache} = L0D,
+    L0Index = leveled_pmem:add_to_index(BookieIdx,
+                                        State#state.levelzero_index,
+                                        length(L0Cache)),
     {reply, ok, State#state{levelzero_cache=L0Cache,
                             levelzero_size=L0Size,
-                            levelzero_index=none,
+                            levelzero_index=L0Index,
                             ledger_sqn=LedgerSQN,
                             snapshot_fully_loaded=true}};
 handle_call({fetch_levelzero, Slot}, _From, State) ->
@@ -466,9 +476,10 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
                                 filename=FN},
     UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
     % Prompt clerk to ask about work - do this for every L0 roll
-    leveled_pmem:clear_index(State#state.levelzero_index),
+    UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
     ok = leveled_pclerk:clerk_prompt(State#state.clerk),
     {noreply, State#state{levelzero_cache=[],
+                            levelzero_index=UpdIndex,
                             levelzero_pending=false,
                             levelzero_constructor=undefined,
                             levelzero_size=0,
@@ -642,20 +653,23 @@ start_from_file(PCLopts) ->
 
 
-update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
+update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
                                                 LedgerSQN, L0Cache, State) ->
     SW = os:timestamp(),
     Update = leveled_pmem:add_to_cache(L0Size,
                                         {PushedTree, MinSQN, MaxSQN},
                                         LedgerSQN,
                                         L0Cache),
-    leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
+    UpdL0Index = leveled_pmem:add_to_index(PushedIdx,
+                                            State#state.levelzero_index,
+                                            length(L0Cache) + 1),
     
     {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
     if
         UpdMaxSQN >= LedgerSQN ->
             UpdState = State#state{levelzero_cache=UpdL0Cache,
                                     levelzero_size=NewL0Size,
+                                    levelzero_index=UpdL0Index,
                                     ledger_sqn=UpdMaxSQN},
             CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
             CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
@@ -740,20 +754,14 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
     R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
     element(1, R).
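Note on the index lifecycle these handlers maintain: one add_to_index per accepted push, tagged with the slot the pushed tree takes in the L0 cache, and a swap to a fresh array (rather than a mutation) when L0 is persisted. A hedged sketch with hypothetical driver code, not patch code:

    %% Sketch only - illustrating the array-index lifecycle.
    index_lifecycle(PushedIdx1, PushedIdx2) ->
        Idx0 = leveled_pmem:new_index(),            % 256 empty binaries
        Idx1 = leveled_pmem:add_to_index(PushedIdx1, Idx0, 1), % push -> slot 1
        Idx2 = leveled_pmem:add_to_index(PushedIdx2, Idx1, 2), % push -> slot 2
        leveled_pmem:clear_index(Idx2).             % after the L0 SST write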
-fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
-    L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
+fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+    PosList = leveled_pmem:check_index(Hash, L0Index),
+    L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
     case L0Check of
         {false, not_found} ->
             fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
         {true, KV} ->
             {KV, 0}
-    end;
-fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
-    case leveled_pmem:check_index(Hash, L0Index) of
-        true ->
-            fetch_mem(Key, Hash, Manifest, L0Cache, none);
-        false ->
-            fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
     end.
 
 fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -1373,12 +1381,15 @@ confirm_delete_test() ->
 
 maybe_pause_push(PCL, KL) ->
     T0 = leveled_skiplist:empty(true),
-    T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) ->
-                        SL = leveled_skiplist:enter(K, V, AccSL),
+    I0 = leveled_pmem:new_index(),
+    T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) ->
+                        UpdSL = leveled_skiplist:enter(K, V, AccSL),
                         SQN = leveled_codec:strip_to_seqonly({K, V}),
-                        {SL, min(SQN, MinSQN), max(SQN, MaxSQN)}
+                        H = leveled_codec:magic_hash(K),
+                        UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H),
+                        {UpdSL, UpdIdx, min(SQN, MinSQN), max(SQN, MaxSQN)}
                     end,
-                    {T0, infinity, 0},
+                    {T0, I0, infinity, 0},
                     KL),
     case pcl_pushmem(PCL, T1) of
         returned ->
diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl
index 0c61acf..9480abe 100644
--- a/src/leveled_pmem.erl
+++ b/src/leveled_pmem.erl
@@ -19,34 +19,23 @@
 %% used to either point lookups at the right tree in the list, or inform the
 %% requestor it is not present avoiding any lookups.
 %%
-%% Tests show this takes one third of the time at push (when compared to
-%% merging to a single tree), and is an order of magnitude more efficient as
-%% the tree reaches peak size.  It is also an order of magnitude more
-%% efficient to use the hash index when compared to looking through all the
-%% trees.
-%%
-%% Total time for single_tree 217000 microseconds
-%% Total time for array_tree 209000 microseconds
-%% Total time for array_list 142000 microseconds
-%% Total time for array_filter 69000 microseconds
-%% List of 2000 checked without array - success count of 90 in 36000 microsecs
-%% List of 2000 checked with array - success count of 90 in 1000 microsecs
-%%
 %% The trade-off taken with the approach is that the size of the L0Cache is
-%% uncertain.  The Size count is incremented if the hash is not already
-%% present, so the size may be lower than the actual size due to hash
-%% collisions
+%% uncertain.  The Size count is incremented based on the inbound size, and so
+%% does not necessarily reflect the size once the lists are merged (as updates
+%% to existing objects will be double-counted)
 
 -module(leveled_pmem).
 
 -include("include/leveled.hrl").
 
 -export([
+        prepare_for_index/2,
         add_to_cache/4,
         to_list/2,
         check_levelzero/3,
+        check_levelzero/4,
         merge_trees/4,
-        add_to_index/2,
+        add_to_index/3,
         new_index/0,
         clear_index/1,
         check_index/2
 
@@ -59,6 +48,14 @@
 %%% API
 %%%============================================================================
 
+prepare_for_index(IndexArray, no_lookup) ->
+    IndexArray;
+prepare_for_index(IndexArray, Hash) ->
+    {Slot, H0} = split_hash(Hash),
+    Bin = array:get(Slot, IndexArray),
+    array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).
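Note on the rewritten fetch_mem/5: the old boolean pre-check collapses into a positional one, so only the skiplists whose hash-listings contain this hash are searched. A hedged sketch (hypothetical wrapper, using only functions from this patch) of the narrowed L0 read:

    %% Sketch: the index maps a hash to the (possibly empty) list of L0
    %% cache slots that may hold it; only those slots are then checked.
    l0_lookup(Key, L0Cache, L0Index) ->
        Hash = leveled_codec:magic_hash(Key),
        PosList = leveled_pmem:check_index(Hash, L0Index),
        % {true, KV} if found in a candidate slot, {false, not_found} if not
        leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache).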
+
+
 add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
     LM1Size = leveled_skiplist:size(LevelMinus1),
     case LM1Size of
@@ -73,32 +70,29 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
         end
     end.
 
-add_to_index(LevelMinus1, L0Index) ->
+add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
     IndexAddFun =
-        fun({_K, V}) ->
-            {_, _, Hash, _} = leveled_codec:striphead_to_details(V),
-            case Hash of
-                no_lookup ->
-                    ok;
-                _ ->
-                    ets:insert(L0Index, {Hash})
-            end
-        end,
-    lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)).
+        fun(Slot, Acc) ->
+            Bin0 = array:get(Slot, Acc),
+            BinLM1 = array:get(Slot, LM1Array),
+            array:set(Slot,
+                        <<Bin0/binary,
+                            0:1/integer, CacheSlot:7/integer,
+                            BinLM1/binary>>,
+                        Acc)
+        end,
+    lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)).
 
 new_index() ->
-    ets:new(l0index, [private, set]).
+    array:new([{size, 256}, {default, <<>>}]).
 
-clear_index(L0Index) ->
-    ets:delete_all_objects(L0Index).
+clear_index(_L0Index) ->
+    new_index().
 
 check_index(Hash, L0Index) ->
-    case ets:lookup(L0Index, Hash) of
-        [{Hash}] ->
-            true;
-        [] ->
-            false
-    end.
+    {Slot, H0} = split_hash(Hash),
+    Bin = array:get(Slot, L0Index),
+    find_pos(Bin, H0, [], 0).
 
 to_list(Slots, FetchFun) ->
     SW = os:timestamp(),
@@ -114,13 +108,15 @@ to_list(Slots, FetchFun) ->
     FullList.
 
-check_levelzero(Key, TreeList) ->
-    check_levelzero(Key, leveled_codec:magic_hash(Key), TreeList).
+check_levelzero(Key, PosList, TreeList) ->
+    check_levelzero(Key, leveled_codec:magic_hash(Key), PosList, TreeList).
 
-check_levelzero(_Key, _Hash, []) ->
+check_levelzero(_Key, _Hash, _PosList, []) ->
     {false, not_found};
-check_levelzero(Key, Hash, TreeList) ->
-    check_slotlist(Key, Hash, lists:seq(1, length(TreeList)), TreeList).
+check_levelzero(_Key, _Hash, [], _TreeList) ->
+    {false, not_found};
+check_levelzero(Key, Hash, PosList, TreeList) ->
+    check_slotlist(Key, Hash, PosList, TreeList).
 
 merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
 
@@ -136,6 +132,22 @@ merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
 %%% Internal Functions
 %%%============================================================================
 
+
+find_pos(<<>>, _Hash, PosList, _SlotID) ->
+    PosList;
+find_pos(<<1:1/integer, Hash:23/integer, T/binary>>, Hash, PosList, SlotID) ->
+    find_pos(T, Hash, PosList ++ [SlotID], SlotID);
+find_pos(<<1:1/integer, _Miss:23/integer, T/binary>>, Hash, PosList, SlotID) ->
+    find_pos(T, Hash, PosList, SlotID);
+find_pos(<<0:1/integer, NxtSlot:7/integer, T/binary>>, Hash, PosList, _SlotID) ->
+    find_pos(T, Hash, PosList, NxtSlot).
+
+
+split_hash(Hash) ->
+    Slot = Hash band 255,
+    H0 = (Hash bsr 8) band 8388607,
+    {Slot, H0}.
+
 check_slotlist(Key, Hash, CheckList, TreeList) ->
     SlotCheckFun =
         fun(SlotToCheck, {Found, KV}) ->
@@ -162,12 +174,21 @@ check_slotlist(Key, Hash, CheckList, TreeList) ->
 
 -ifdef(TEST).
 
+generate_randomkeys_aslist(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
+    lists:ukeysort(1,
+                    generate_randomkeys(Seqn,
+                                        Count,
+                                        [],
+                                        BucketRangeLow,
+                                        BucketRangeHigh)).
+
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
-    generate_randomkeys(Seqn,
-                        Count,
-                        leveled_skiplist:empty(true),
-                        BucketRangeLow,
-                        BucketRangeHigh).
+    KVL = generate_randomkeys(Seqn,
+                                Count,
+                                [],
+                                BucketRangeLow,
+                                BucketRangeHigh),
+    leveled_skiplist:from_list(KVL).
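To make the encoding concrete: the low 8 bits of the magic hash pick one of the 256 slot binaries (split_hash), a 0-bit-prefixed byte marks which cache slot the following entries belong to, and each 1-bit-prefixed 23-bit entry is a candidate hash. A hedged shell-style walk-through (illustrative key; assumes magic_hash yields a real hash, not no_lookup, for it):

    H = leveled_codec:magic_hash({o, <<"Bucket">>, <<"Key">>, null}).
    Arr = leveled_pmem:prepare_for_index(leveled_pmem:new_index(), H).
    %% Merge the per-push array into an empty master index as cache slot 1
    L0Idx = leveled_pmem:add_to_index(Arr, leveled_pmem:new_index(), 1).
    %% check_index should now direct the lookup at slot 1 only
    [1] = leveled_pmem:check_index(H, L0Idx).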
 generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
     Acc;
@@ -179,7 +200,7 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
             {Seqn, {active, infinity}, null}},
     generate_randomkeys(Seqn + 1,
                         Count - 1,
-                        leveled_skiplist:enter(K, V, Acc),
+                        [{K, V}|Acc],
                         BucketLow,
                         BRange).
 
@@ -230,8 +251,9 @@ compare_method_test() ->
                         [],
                         TestList),
 
+    PosList = lists:seq(1, length(TreeList)),
     S1 = lists:foldl(fun({Key, _V}, Acc) ->
-                        R0 = check_levelzero(Key, TreeList),
+                        R0 = check_levelzero(Key, PosList, TreeList),
                         [R0|Acc]
                         end,
                     [],
@@ -267,6 +289,41 @@ compare_method_test() ->
                 [timer:now_diff(os:timestamp(), SWb), Sz1]),
     ?assertMatch(Sz0, Sz1).
 
+with_index_test() ->
+    IndexPrepareFun =
+        fun({K, _V}, Acc) ->
+            H = leveled_codec:magic_hash(K),
+            prepare_for_index(Acc, H)
+        end,
+    LoadFun =
+        fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) ->
+            LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500),
+            LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
+            LM1SL = leveled_skiplist:from_list(LM1),
+            UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
+            R = add_to_cache(L0Size,
+                                {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
+                                LedgerSQN,
+                                L0TreeList),
+            {R, UpdL0Index, lists:ukeymerge(1, LM1, SrcList)}
+        end,
+
+    R0 = lists:foldl(LoadFun, {{0, 0, []}, new_index(), []}, lists:seq(1, 16)),
+
+    {{SQN, Size, TreeList}, L0Index, SrcKVL} = R0,
+    ?assertMatch(32000, SQN),
+    ?assertMatch(true, Size =< 32000),
+
+    CheckFun =
+        fun({K, V}, {L0Idx, L0Cache}) ->
+            H = leveled_codec:magic_hash(K),
+            PosList = check_index(H, L0Idx),
+            ?assertMatch({true, {K, V}},
+                            check_slotlist(K, H, PosList, L0Cache)),
+            {L0Idx, L0Cache}
+        end,
+
+    _R1 = lists:foldl(CheckFun, {L0Index, TreeList}, SrcKVL).
+
 -endif.
\ No newline at end of file
diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl
index a5c3414..b79d050 100644
--- a/src/leveled_skiplist.erl
+++ b/src/leveled_skiplist.erl
@@ -20,6 +20,8 @@
         from_list/2,
         from_sortedlist/1,
         from_sortedlist/2,
+        from_orderedset/1,
+        from_orderedset/2,
         to_list/1,
         enter/3,
         enter/4,
@@ -71,6 +73,12 @@ enter_nolookup(Key, Value, SkipList) ->
             element(2, SkipList),
             ?SKIP_WIDTH, ?LIST_HEIGHT)}.
 
+from_orderedset(Table) ->
+    from_orderedset(Table, false).
+
+from_orderedset(Table, Bloom) ->
+    from_sortedlist(ets:tab2list(Table), Bloom).
+
 from_list(UnsortedKVL) ->
     from_list(UnsortedKVL, false).
 
@@ -81,6 +89,8 @@ from_list(UnsortedKVL, BloomProtect) ->
 from_sortedlist(SortedKVL) ->
     from_sortedlist(SortedKVL, false).
 
+from_sortedlist([], BloomProtect) ->
+    empty(BloomProtect);
 from_sortedlist(SortedKVL, BloomProtect) ->
     Bloom0 =
         case BloomProtect of
@@ -103,11 +113,16 @@ lookup(Key, SkipList) ->
     end.
 
 lookup(Key, Hash, SkipList) ->
-    case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of
-        false ->
-            none;
-        true ->
-            list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT)
+    case element(1, SkipList) of
+        list_only ->
+            list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT);
+        _ ->
+            case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of
+                false ->
+                    none;
+                true ->
+                    list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT)
+            end
     end.
 
@@ -188,53 +203,27 @@ enter(Key, Value, Hash, SkipList, Width, Level) ->
                     {MarkerKey, UpdSubSkipList})
     end.
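A hedged usage sketch for the new constructor (hypothetical shell session, relying only on from_orderedset/1 and lookup/2 as exported above): ets:tab2list/1 on an ordered_set returns entries already in key order, so it can feed from_sortedlist directly, and the new empty-list clause makes an empty table safe.

    Tab = ets:new(t, [ordered_set]).
    true = ets:insert(Tab, [{k1, v1}, {k2, v2}]).
    SL = leveled_skiplist:from_orderedset(Tab).
    {value, v1} = leveled_skiplist:lookup(k1, SL).
    %% An empty table now yields an empty skiplist rather than an error
    Empty = leveled_skiplist:from_orderedset(ets:new(t2, [ordered_set])).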
 
-from_list(KVL, Width, 1) ->
-    Slots = length(KVL) div Width,
-    SkipList0 = lists:map(fun(X) ->
-                                N = X * Width,
-                                {K, _V} = lists:nth(N, KVL),
-                                {K, lists:sublist(KVL,
-                                                    N - Width + 1,
-                                                    Width)}
-                                end,
-                            lists:seq(1, length(KVL) div Width)),
-    case Slots * Width < length(KVL) of
-        true ->
-            {LastK, _V} = lists:last(KVL),
-            SkipList0 ++ [{LastK, lists:nthtail(Slots * Width, KVL)}];
-        false ->
-            SkipList0
-    end;
-from_list(KVL, Width, Level) ->
-    SkipWidth = width(Level, Width),
-    LoftSlots = length(KVL) div SkipWidth,
-    case LoftSlots of
-        0 ->
-            {K, _V} = lists:last(KVL),
-            [{K, from_list(KVL, Width, Level - 1)}];
-        _ ->
-            SkipList0 =
-                lists:map(fun(X) ->
-                                N = X * SkipWidth,
-                                {K, _V} = lists:nth(N, KVL),
-                                SL = lists:sublist(KVL,
-                                                    N - SkipWidth + 1,
-                                                    SkipWidth),
-                                {K, from_list(SL, Width, Level - 1)}
-                                end,
-                            lists:seq(1, LoftSlots)),
-            case LoftSlots * SkipWidth < length(KVL) of
-                true ->
-                    {LastK, _V} = lists:last(KVL),
-                    TailList = lists:nthtail(LoftSlots * SkipWidth, KVL),
-                    SkipList0 ++ [{LastK, from_list(TailList,
-                                                    Width,
-                                                    Level - 1)}];
-                false ->
-                    SkipList0
-            end
-    end.
+from_list(SkipList, _SkipWidth, 0) ->
+    SkipList;
+from_list(KVList, SkipWidth, ListHeight) ->
+    L0 = length(KVList),
+    SL0 =
+        case L0 > SkipWidth of
+            true ->
+                from_list(KVList, L0, [], SkipWidth);
+            false ->
+                {LastK, _LastSL} = lists:last(KVList),
+                [{LastK, KVList}]
+        end,
+    from_list(SL0, SkipWidth, ListHeight - 1).
+
+from_list([], 0, SkipList, _SkipWidth) ->
+    SkipList;
+from_list(KVList, L, SkipList, SkipWidth) ->
+    SubLL = min(SkipWidth, L),
+    {Head, Tail} = lists:split(SubLL, KVList),
+    {LastK, _LastV} = lists:last(Head),
+    from_list(Tail, L - SubLL, SkipList ++ [{LastK, Head}], SkipWidth).
 
 
 list_lookup(Key, SkipList, 1) ->
@@ -431,7 +420,15 @@ skiplist_nobloom_test() ->
 skiplist_tester(Bloom) ->
     N = 4000,
     KL = generate_randomkeys(1, N, 1, N div 5),
-    
+
+    OS = ets:new(test, [ordered_set, private]),
+    ets:insert(OS, KL),
+    SWaETS = os:timestamp(),
+    SkipList = from_orderedset(OS, Bloom),
+    io:format(user, "Generating skip list with ~w keys in ~w microseconds " ++
+                        "from ordered set~n",
+                [N, timer:now_diff(os:timestamp(), SWaETS)]),
+
     SWaGSL = os:timestamp(),
     SkipList = from_list(lists:reverse(KL), Bloom),
     io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++
@@ -533,13 +530,28 @@ skiplist_timingtest(KL, SkipList, N, Bloom) ->
                 [timer:now_diff(os:timestamp(), SWc)]),
 
     AltKL1 = generate_randomkeys(1, 2000, 1, 200),
-    SWd = os:timestamp(),
+    SWd0 = os:timestamp(),
     lists:foreach(fun({K, _V}) ->
                         lookup(K, SkipList)
                         end,
                     AltKL1),
     io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n",
-                [timer:now_diff(os:timestamp(), SWd)]),
+                [timer:now_diff(os:timestamp(), SWd0)]),
+    SWd1 = os:timestamp(),
+    lists:foreach(fun({K, _V}) ->
+                        leveled_codec:magic_hash(K)
+                        end,
+                    AltKL1),
+    io:format(user, "Generating 2000 magic hashes took ~w microseconds~n",
+                [timer:now_diff(os:timestamp(), SWd1)]),
+    SWd2 = os:timestamp(),
+    lists:foreach(fun({K, _V}) ->
+                        erlang:phash2(K)
+                        end,
+                    AltKL1),
+    io:format(user, "Generating 2000 not so magic hashes took ~w microseconds~n",
+                [timer:now_diff(os:timestamp(), SWd2)]),
+
     AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300),
     SWe = os:timestamp(),
     lists:foreach(fun({K, _V}) ->
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 759b5cb..62aa904 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -1230,11 +1230,11 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     LedgerKey =
         leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
                                     "Key" ++ KNumber,
                                     o),
-    {_B, _K, KV} = leveled_codec:generate_ledgerkv(LedgerKey,
-                                                    Seqn,
-                                                    crypto:rand_bytes(64),
-                                                    64,
-                                                    infinity),
+    {_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
+                                                        Seqn,
+                                                        crypto:rand_bytes(64),
+                                                        64,
+                                                        infinity),
     generate_randomkeys(Seqn + 1,
                         Count - 1,
                         [KV|Acc],