diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index a50e9fa..62892ec 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -136,7 +136,10 @@ book_destroy/1]). -export([get_opt/2, - get_opt/3]). + get_opt/3, + load_snapshot/2, + empty_ledgercache/0, + push_ledgercache/2]). -include_lib("eunit/include/eunit.hrl"). @@ -148,15 +151,18 @@ -define(CACHE_SIZE_JITTER, 25). -define(JOURNAL_SIZE_JITTER, 20). +-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(), + min_sqn = infinity :: integer()|infinity, + max_sqn = 0 :: integer()}). + -record(state, {inker :: pid(), penciller :: pid(), cache_size :: integer(), - ledger_cache, % a skiplist + ledger_cache = #ledger_cache{}, is_snapshot :: boolean(), slow_offer = false :: boolean()}). - %%%============================================================================ %%% API %%%============================================================================ @@ -238,14 +244,14 @@ init([Opts]) -> {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, - ledger_cache=leveled_skiplist:empty(true), + ledger_cache=#ledger_cache{}, is_snapshot=false}}; Bookie -> {ok, {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), - ok = leveled_penciller:pcl_loadsnapshot(Penciller, - leveled_skiplist:empty(true)), + CacheToLoad = {leveled_skiplist:empty(true), 0, 0}, + ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad), leveled_log:log("B0002", [Inker, Penciller]), {ok, #state{penciller=Penciller, inker=Inker, @@ -276,9 +282,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) -> false -> gen_server:reply(From, ok) end, - case maybepush_ledgercache(State#state.cache_size, - Cache0, - State#state.penciller) of + case maybepush_ledgercache(State#state.cache_size, + Cache0, + State#state.penciller) of {ok, NewCache} -> {noreply, State#state{ledger_cache=NewCache, slow_offer=false}}; {returned, NewCache} -> @@ -292,7 +298,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head), + {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head), case Status of tomb -> {reply, not_found, State}; @@ -317,11 +323,10 @@ handle_call({head, Bucket, Key, Tag}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head), - case Status of - tomb -> + case leveled_codec:striphead_to_details(Head) of + {_SeqN, tomb, _MH, _MD} -> {reply, not_found, State}; - {active, TS} -> + {_SeqN, {active, TS}, _MH, MD} -> case TS >= leveled_codec:integer_now() of true -> OMD = leveled_codec:build_metadata_object(LedgerKey, MD), @@ -426,19 +431,39 @@ terminate(Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================ +%%% External functions +%%%============================================================================ + +load_snapshot(LedgerSnapshot, LedgerCache) -> + CacheToLoad = {LedgerCache#ledger_cache.skiplist, + LedgerCache#ledger_cache.min_sqn, + LedgerCache#ledger_cache.max_sqn}, + ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad). + +empty_ledgercache() -> + #ledger_cache{}. 
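The three new exports (`load_snapshot/2`, `empty_ledgercache/0`, `push_ledgercache/2`) let collaborating processes (the inker's journal reload and the penciller snapshot tests below) handle the bookie's cache without knowing the `#ledger_cache{}` record, which now tracks `min_sqn`/`max_sqn` alongside the skiplist. A minimal sketch of the intended calling pattern, mirroring `maybepush_ledgercache/3` later in this diff (`flush_cache` is a hypothetical helper name, not part of the patch):

```erlang
%% Sketch only: reset the cache when the penciller accepts the push,
%% keep it for a later retry when the penciller answers 'returned'.
flush_cache(Penciller, LedgerCache) ->
    case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
        ok -> leveled_bookie:empty_ledgercache();
        returned -> LedgerCache
    end.
```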
+ +push_ledgercache(Penciller, Cache) -> + CacheToLoad = {Cache#ledger_cache.skiplist, + Cache#ledger_cache.min_sqn, + Cache#ledger_cache.max_sqn}, + leveled_penciller:pcl_pushmem(Penciller, CacheToLoad). %%%============================================================================ %%% Internal functions %%%============================================================================ +cache_size(LedgerCache) -> + leveled_skiplist:size(LedgerCache#ledger_cache.skiplist). + bucket_stats(State, Bucket, Tag) -> {ok, {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), AccFun = accumulate_size(), @@ -459,9 +484,8 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) -> {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), BucketAcc = get_nextbucket(null, Tag, LedgerSnapshot, @@ -514,9 +538,8 @@ index_query(State, {B, null} end, Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(Bucket, StartObjKey, ?IDX_TAG, @@ -556,9 +579,8 @@ hashtree_query(State, Tag, JournalCheck) -> {LedgerSnapshot, LedgerCache}, JournalSnapshot} = snapshot_store(State, SnapType), Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), AccFun = accumulate_hashes(JournalCheck, JournalSnapshot), @@ -607,9 +629,8 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> {FoldObjectsFun, []} end, Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, @@ -628,9 +649,8 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) -> {LedgerSnapshot, LedgerCache}, _JournalSnapshot} = snapshot_store(State, ledger), Folder = fun() -> - leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - LedgerCache), + leveled_log:log("B0004", [cache_size(LedgerCache)]), + load_snapshot(LedgerSnapshot, LedgerCache), SK = leveled_codec:to_ledgerkey(Bucket, null, Tag), EK = leveled_codec:to_ledgerkey(Bucket, null, Tag), AccFun = accumulate_keys(FoldKeysFun), @@ -708,7 +728,7 @@ startup(InkerOpts, PencillerOpts) -> 
fetch_head(Key, Penciller, LedgerCache) -> - case leveled_skiplist:lookup(Key, LedgerCache) of + case leveled_skiplist:lookup(Key, LedgerCache#ledger_cache.skiplist) of {value, Head} -> Head; none -> @@ -874,18 +894,34 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) -> addto_ledgercache(Changes, Cache) -> - lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end, - Cache, - Changes). + FoldChangesFun = + fun({K, V}, Cache0) -> + {SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}), + SL0 = Cache0#ledger_cache.skiplist, + SL1 = + case Hash of + no_lookup -> + leveled_skiplist:enter_nolookup(K, V, SL0); + _ -> + leveled_skiplist:enter(K, Hash, V, SL0) + end, + Cache0#ledger_cache{skiplist=SL1, + min_sqn=min(SQN, Cache0#ledger_cache.min_sqn), + max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)} + end, + lists:foldl(FoldChangesFun, Cache, Changes). maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> - CacheSize = leveled_skiplist:size(Cache), + CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> - case leveled_penciller:pcl_pushmem(Penciller, Cache) of + CacheToLoad = {Cache#ledger_cache.skiplist, + Cache#ledger_cache.min_sqn, + Cache#ledger_cache.max_sqn}, + case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of ok -> - {ok, leveled_skiplist:empty(true)}; + {ok, #ledger_cache{}}; returned -> {returned, Cache} end; diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index f8216d6..63777b2 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -1272,27 +1272,13 @@ write_top_index_table(Handle, BasePos, List) -> %% To make this compatible with original Bernstein format this endian flip %% and also the use of the standard hash function required. -%% -%% Hash function contains mysterious constants, some explanation here as to -%% what they are - -%% http://stackoverflow.com/ ++ -%% questions/10696223/reason-for-5381-number-in-djb-hash-function endian_flip(Int) -> <> = <>, X. hash(Key) -> - BK = term_to_binary(Key), - H = 5381, - hash1(H, BK) band 16#FFFFFFFF. - -hash1(H, <<>>) -> - H; -hash1(H, <>) -> - H1 = H * 33, - H2 = H1 bxor B, - hash1(H2, Rest). + leveled_codec:magic_hash(Key). % Get the least significant 8 bits from the hash. hash_to_index(Hash) -> diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 19e9c9f..72b90b0 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -39,6 +39,7 @@ strip_to_statusonly/1, strip_to_keyseqstatusonly/1, strip_to_keyseqonly/1, + strip_to_seqnhashonly/1, striphead_to_details/1, is_active/3, endkey_passed/2, @@ -62,11 +63,38 @@ convert_indexspecs/5, generate_uuid/0, integer_now/0, - riak_extract_metadata/2]). + riak_extract_metadata/2, + magic_hash/1]). -define(V1_VERS, 1). -define(MAGIC, 53). % riak_kv -> riak_object +%% Use DJ Bernstein magic hash function. Note, this is more expensive than +%% phash2 but provides a much more balanced result. +%% +%% Hash function contains mysterious constants, some explanation here as to +%% what they are - +%% http://stackoverflow.com/ ++ +%% questions/10696223/reason-for-5381-number-in-djb-hash-function + +magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) -> + magic_hash({Bucket, Key}); +magic_hash({?STD_TAG, Bucket, Key, _SubKey}) -> + magic_hash({Bucket, Key}); +magic_hash({?IDX_TAG, _B, _Idx, _Key}) -> + no_lookup; +magic_hash(AnyKey) -> + BK = term_to_binary(AnyKey), + H = 5381, + hash1(H, BK) band 16#FFFFFFFF. 
+ +hash1(H, <<>>) -> + H; +hash1(H, <>) -> + H1 = H * 33, + H2 = H1 bxor B, + hash1(H2, Rest). + %% Credit to %% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl @@ -87,15 +115,18 @@ inker_reload_strategy(AltList) -> strip_to_keyonly({keyonly, K}) -> K; strip_to_keyonly({K, _V}) -> K. -strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}. +strip_to_keyseqstatusonly({K, {SeqN, St, _, _MD}}) -> {K, SeqN, St}. -strip_to_statusonly({_, {_, St, _}}) -> St. +strip_to_statusonly({_, {_, St, _, _}}) -> St. -strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. +strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN. -strip_to_keyseqonly({LK, {SeqN, _, _}}) -> {LK, SeqN}. +strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}. + +strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}. + +striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}. -striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. key_dominates(LeftKey, RightKey) -> case {LeftKey, RightKey} of @@ -103,10 +134,10 @@ key_dominates(LeftKey, RightKey) -> left_hand_first; {{LK, _LVAL}, {RK, _RVAL}} when RK < LK -> right_hand_first; - {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} + {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}} when LK == RK, LSN >= RSN -> left_hand_dominant; - {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}} + {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}} when LK == RK, LSN < RSN -> right_hand_dominant end. @@ -218,8 +249,6 @@ create_value_for_journal(Value) -> Value end. - - hash(Obj) -> erlang:phash2(term_to_binary(Obj)). @@ -273,7 +302,7 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) -> end, {to_ledgerkey(Bucket, Key, ?IDX_TAG, IdxField, IdxValue), - {SQN, Status, null}} + {SQN, Status, no_lookup, null}} end, IndexSpecs). @@ -285,9 +314,11 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) -> _ -> {active, TS} end, - {Bucket, - Key, - {PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}. + Value = {SQN, + Status, + magic_hash(PrimaryKey), + extract_metadata(Obj, Size, Tag)}, + {Bucket, Key, {PrimaryKey, Value}}. integer_now() -> @@ -304,7 +335,7 @@ extract_metadata(Obj, Size, ?STD_TAG) -> get_size(PK, Value) -> {Tag, _Bucket, _Key, _} = PK, - {_, _, MD} = Value, + {_, _, _, MD} = Value, case Tag of ?RIAK_TAG -> {_RMD, _VC, _Hash, Size} = MD, @@ -316,7 +347,7 @@ get_size(PK, Value) -> get_keyandhash(LK, Value) -> {Tag, Bucket, Key, _} = LK, - {_, _, MD} = Value, + {_, _, _, MD} = Value, case Tag of ?RIAK_TAG -> {_RMD, _VC, Hash, _Size} = MD, @@ -375,11 +406,14 @@ indexspecs_test() -> {remove, "t1_bin", "abdc456"}], Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity), ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"}, - {1, {active, infinity}, null}}, lists:nth(1, Changes)), + {1, {active, infinity}, no_lookup, null}}, + lists:nth(1, Changes)), ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"}, - {1, {active, infinity}, null}}, lists:nth(2, Changes)), + {1, {active, infinity}, no_lookup, null}}, + lists:nth(2, Changes)), ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"}, - {1, tomb, null}}, lists:nth(3, Changes)). + {1, tomb, no_lookup, null}}, + lists:nth(3, Changes)). 
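The updated `indexspecs_test` captures the central shape change in this commit: ledger values grow from `{SQN, Status, MD}` to `{SQN, Status, Hash, MD}`, with the atom `no_lookup` in the hash slot for index entries. A hedged illustration of the new accessors (`123456` stands in for a real magic hash):

```erlang
%% Illustrative values only.
KV = {{o, "Bucket", "Key1", null}, {7, {active, infinity}, 123456, null}},
{7, 123456} = leveled_codec:strip_to_seqnhashonly(KV),
{7, {active, infinity}, 123456, null} =
    leveled_codec:striphead_to_details(element(2, KV)).
```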
endkey_passed_test() -> TestKey = {i, null, null, null}, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2bfcd9c..9a37cae 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -633,13 +633,13 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos, FN, Rest) -> leveled_log:log("I0014", [FN, MinSQN]), - InitAcc = {MinSQN, MaxSQN, leveled_skiplist:empty(true)}, + InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()}, Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of - {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> - ok = push_to_penciller(Penciller, AccKL), + {eof, {AccMinSQN, _AccMaxSQN, AccLC}} -> + ok = push_to_penciller(Penciller, AccLC), {ok, AccMinSQN}; - {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} -> - ok = push_to_penciller(Penciller, AccKL), + {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} -> + ok = push_to_penciller(Penciller, AccLC), NextSQN = MaxSQN + 1, load_between_sequence(NextSQN, NextSQN + ?LOADING_BATCH, @@ -657,14 +657,13 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, ok end. -push_to_penciller(Penciller, KeyTree) -> +push_to_penciller(Penciller, LedgerCache) -> % The push to penciller must start as a tree to correctly de-duplicate % the list by order before becoming a de-duplicated list for loading - R = leveled_penciller:pcl_pushmem(Penciller, KeyTree), - case R of + case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of returned -> timer:sleep(?LOADING_PAUSE), - push_to_penciller(Penciller, KeyTree); + push_to_penciller(Penciller, LedgerCache); ok -> ok end. @@ -739,7 +738,7 @@ initiate_penciller_snapshot(Bookie) -> {ok, {LedgerSnap, LedgerCache}, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, LedgerCache), + leveled_bookie:load_snapshot(LedgerSnap, LedgerCache), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 6c7e4cb..fa26555 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -165,7 +165,7 @@ {"I0013", {info, "File ~s to be removed from manifest"}}, {"I0014", - {info, "On startup oading from filename ~s from SQN ~w"}}, + {info, "On startup loading from filename ~s from SQN ~w"}}, {"I0015", {info, "Opening manifest file at ~s with SQN ~w"}}, {"I0016", diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index bbd2dae..272071d 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -363,11 +363,11 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> BNumber = string:right(integer_to_list(BucketLow + random:uniform(BRange)), 4, $0), KNumber = string:right(integer_to_list(random:uniform(1000)), 4, $0), - RandKey = {{o, - "Bucket" ++ BNumber, - "Key" ++ KNumber}, - {Count + 1, - {active, infinity}, null}}, + K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber}, + RandKey = {K, {Count + 1, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange). 
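`generate_randomkeys` now stamps its test values with `leveled_codec:magic_hash/1`, whose contract the codec hunk above establishes: index keys map to `no_lookup`, object keys hash on `{Bucket, Key}` alone (so the sub-key never matters), and results are masked to 32 bits. A sketch, assuming the usual tag atoms from leveled.hrl (`?STD_TAG` = `o`, `?IDX_TAG` = `i`):

```erlang
no_lookup = leveled_codec:magic_hash({i, "B", {"idx_bin", "term"}, "K"}),
H = leveled_codec:magic_hash({o, "B", "K", null}),
H = leveled_codec:magic_hash({"B", "K"}),      % sub-key is ignored
true = (H band 16#FFFFFFFF) =:= H.             % masked to 32 bits
```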
choose_pid_toquery([ManEntry|_T], Key) when diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a111054..a1ab9b1 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -168,9 +168,11 @@ pcl_pushmem/2, pcl_fetchlevelzero/2, pcl_fetch/2, + pcl_fetch/3, pcl_fetchkeys/5, pcl_fetchnextkey/5, pcl_checksequencenumber/3, + pcl_checksequencenumber/4, pcl_workforclerk/1, pcl_promptmanifestchange/2, pcl_confirml0complete/4, @@ -213,8 +215,6 @@ levelzero_pending = false :: boolean(), levelzero_constructor :: pid(), levelzero_cache = [] :: list(), % a list of skiplists - levelzero_index, - % is an array - but cannot specif due to OTP compatability levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), levelzero_cointoss = false :: boolean(), @@ -236,9 +236,9 @@ pcl_start(PCLopts) -> gen_server:start(?MODULE, [PCLopts], []). -pcl_pushmem(Pid, DumpList) -> +pcl_pushmem(Pid, LedgerCache) -> %% Bookie to dump memory onto penciller - gen_server:call(Pid, {push_mem, DumpList}, infinity). + gen_server:call(Pid, {push_mem, LedgerCache}, infinity). pcl_fetchlevelzero(Pid, Slot) -> %% Timeout to cause crash of L0 file when it can't get the close signal @@ -249,7 +249,14 @@ pcl_fetchlevelzero(Pid, Slot) -> gen_server:call(Pid, {fetch_levelzero, Slot}, 60000). pcl_fetch(Pid, Key) -> - gen_server:call(Pid, {fetch, Key}, infinity). + Hash = leveled_codec:magic_hash(Key), + if + Hash /= no_lookup -> + gen_server:call(Pid, {fetch, Key, Hash}, infinity) + end. + +pcl_fetch(Pid, Key, Hash) -> + gen_server:call(Pid, {fetch, Key, Hash}, infinity). pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) -> gen_server:call(Pid, @@ -262,7 +269,14 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> infinity). pcl_checksequencenumber(Pid, Key, SQN) -> - gen_server:call(Pid, {check_sqn, Key, SQN}, infinity). + Hash = leveled_codec:magic_hash(Key), + if + Hash /= no_lookup -> + gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity) + end. + +pcl_checksequencenumber(Pid, Key, Hash, SQN) -> + gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity). pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). @@ -313,8 +327,9 @@ init([PCLopts]) -> end. 
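`pcl_fetch/2` and `pcl_checksequencenumber/3` now compute the magic hash on the caller's side, and their `if` clauses deliberately admit only hashable keys (a `no_lookup` index key crashes the caller rather than triggering a scan). The new arity-3/4 variants let a caller that already holds the hash avoid rehashing; a sketch of that saving might look like this (`fetch_and_check` is a hypothetical helper; `Penciller` is an assumed running penciller pid):

```erlang
%% Hash once, reuse for both the point fetch and the SQN check.
fetch_and_check(Penciller, Key, SQN) ->
    Hash = leveled_codec:magic_hash(Key),
    _KV = leveled_penciller:pcl_fetch(Penciller, Key, Hash),
    leveled_penciller:pcl_checksequencenumber(Penciller, Key, Hash, SQN).
```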
-handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) - when Snap == false -> +handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}}, + From, + State=#state{is_snapshot=Snap}) when Snap == false -> % The push_mem process is as follows: % % 1 - Receive a gb_tree containing the latest Key/Value pairs (note that @@ -342,25 +357,24 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap}) false -> leveled_log:log("P0018", [ok, false, false]), gen_server:reply(From, ok), - {noreply, update_levelzero(State#state.levelzero_index, - State#state.levelzero_size, - PushedTree, + {noreply, update_levelzero(State#state.levelzero_size, + {PushedTree, MinSQN, MaxSQN}, State#state.ledger_sqn, State#state.levelzero_cache, State)} end; -handle_call({fetch, Key}, _From, State) -> +handle_call({fetch, Key, Hash}, _From, State) -> {reply, fetch_mem(Key, + Hash, State#state.manifest, - State#state.levelzero_index, State#state.levelzero_cache), State}; -handle_call({check_sqn, Key, SQN}, _From, State) -> +handle_call({check_sqn, Key, Hash, SQN}, _From, State) -> {reply, compare_to_sqn(fetch_mem(Key, + Hash, State#state.manifest, - State#state.levelzero_index, State#state.levelzero_cache), SQN), State}; @@ -394,15 +408,13 @@ handle_call(get_startup_sqn, _From, State) -> handle_call({register_snapshot, Snapshot}, _From, State) -> Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots], {reply, {ok, State}, State#state{registered_snapshots = Rs}}; -handle_call({load_snapshot, BookieIncrTree}, _From, State) -> - L0D = leveled_pmem:add_to_index(snap, - State#state.levelzero_size, - BookieIncrTree, +handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) -> + L0D = leveled_pmem:add_to_cache(State#state.levelzero_size, + {BookieIncrTree, MinSQN, MaxSQN}, State#state.ledger_sqn, State#state.levelzero_cache), - {LedgerSQN, L0Size, L0Index, L0Cache} = L0D, + {LedgerSQN, L0Size, L0Cache} = L0D, {reply, ok, State#state{levelzero_cache=L0Cache, - levelzero_index=L0Index, levelzero_size=L0Size, ledger_sqn=LedgerSQN, snapshot_fully_loaded=true}}; @@ -453,7 +465,6 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) -> {noreply, State#state{levelzero_cache=[], levelzero_pending=false, levelzero_constructor=undefined, - levelzero_index=leveled_pmem:new_index(), levelzero_size=0, manifest=UpdMan, persisted_sqn=State#state.ledger_sqn}}. 
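Both `push_mem` and `load_snapshot` now route through `leveled_pmem:add_to_cache/4`, which replaces the ETS index bookkeeping: the incoming skiplist is appended to the cache list, with its size taken from the skiplist itself. A contract sketch with illustrative sequence numbers (note that `{SL, infinity, 0}` is exactly what an empty ledger cache flattens to):

```erlang
SL0 = leveled_skiplist:empty(true),
%% an empty push is a no-op:
{100, 0, []} = leveled_pmem:add_to_cache(0, {SL0, infinity, 0}, 100, []),
%% a non-empty push whose MinSQN is not behind the ledger SQN is appended:
SL1 = leveled_skiplist:enter({o, "B", "K", null},
                             {101, {active, infinity}, 0, null},
                             SL0),
{101, 1, [SL1]} = leveled_pmem:add_to_cache(0, {SL1, 101, 101}, 100, []).
```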
@@ -546,7 +557,6 @@ start_from_file(PCLopts) -> InitState = #state{clerk=MergeClerk, root_path=RootPath, - levelzero_index = leveled_pmem:new_index(), levelzero_maxcachesize=MaxTableSize, levelzero_cointoss=CoinToss}, @@ -622,19 +632,18 @@ start_from_file(PCLopts) -> -update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) -> - Update = leveled_pmem:add_to_index(L0Index, - L0Size, - PushedTree, +update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN}, + LedgerSQN, L0Cache, State) -> + Update = leveled_pmem:add_to_cache(L0Size, + {PushedTree, MinSQN, MaxSQN}, LedgerSQN, L0Cache), - {MaxSQN, NewL0Size, UpdL0Index, UpdL0Cache} = Update, + {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update, if - MaxSQN >= LedgerSQN -> + UpdMaxSQN >= LedgerSQN -> UpdState = State#state{levelzero_cache=UpdL0Cache, - levelzero_index=UpdL0Index, levelzero_size=NewL0Size, - ledger_sqn=MaxSQN}, + ledger_sqn=UpdMaxSQN}, CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize, Level0Free = length(get_item(0, State#state.manifest, [])) == 0, RandomFactor = @@ -659,7 +668,6 @@ update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) -> end; NewL0Size == L0Size -> State#state{levelzero_cache=L0Cache, - levelzero_index=L0Index, levelzero_size=L0Size, ledger_sqn=LedgerSQN} end. @@ -707,8 +715,8 @@ levelzero_filename(State) -> FileName. -fetch_mem(Key, Manifest, L0Index, L0Cache) -> - L0Check = leveled_pmem:check_levelzero(Key, L0Index, L0Cache), +fetch_mem(Key, Hash, Manifest, L0Cache) -> + L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache), case L0Check of {false, not_found} -> fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2); @@ -1284,8 +1292,12 @@ confirm_delete_test() -> maybe_pause_push(PCL, KL) -> T0 = leveled_skiplist:empty(true), - T1 = lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end, - T0, + T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) -> + SL = leveled_skiplist:enter(K, V, AccSL), + SQN = leveled_codec:strip_to_seqonly({K, V}), + {SL, min(SQN, MinSQN), max(SQN, MaxSQN)} + end, + {T0, infinity, 0}, KL), case pcl_pushmem(PCL, T1) of returned -> @@ -1295,23 +1307,32 @@ maybe_pause_push(PCL, KL) -> ok end. +%% old test data doesn't have the magic hash +add_missing_hash({K, {SQN, ST, MD}}) -> + {K, {SQN, ST, leveled_codec:magic_hash(K), MD}}. 
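`add_missing_hash/1` upgrades the pre-existing 3-tuple test fixtures to the new 4-tuple format, for example:

```erlang
%% The inserted third element is the key's own magic hash.
{K, {1, {active, infinity}, Hash, null}} =
    add_missing_hash({{o, "B", "K", null}, {1, {active, infinity}, null}}),
Hash = leveled_codec:magic_hash(K).     % asserts the two agree
```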
+ + simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), - Key1 = {{o,"Bucket0001", "Key0001", null}, - {1, {active, infinity}, null}}, + Key1_Pre = {{o,"Bucket0001", "Key0001", null}, + {1, {active, infinity}, null}}, + Key1 = add_missing_hash(Key1_Pre), KL1 = leveled_sft:generate_randomkeys({1000, 2}), - Key2 = {{o,"Bucket0002", "Key0002", null}, + Key2_Pre = {{o,"Bucket0002", "Key0002", null}, {1002, {active, infinity}, null}}, + Key2 = add_missing_hash(Key2_Pre), KL2 = leveled_sft:generate_randomkeys({900, 1003}), % Keep below the max table size by having 900 not 1000 - Key3 = {{o,"Bucket0003", "Key0003", null}, + Key3_Pre = {{o,"Bucket0003", "Key0003", null}, {2003, {active, infinity}, null}}, + Key3 = add_missing_hash(Key3_Pre), KL3 = leveled_sft:generate_randomkeys({1000, 2004}), - Key4 = {{o,"Bucket0004", "Key0004", null}, + Key4_Pre = {{o,"Bucket0004", "Key0004", null}, {3004, {active, infinity}, null}}, + Key4 = add_missing_hash(Key4_Pre), KL4 = leveled_sft:generate_randomkeys({1000, 3005}), ok = maybe_pause_push(PCL, [Key1]), ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), @@ -1351,7 +1372,8 @@ simple_server_test() -> SnapOpts = #penciller_options{start_snapshot = true, source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap, leveled_skiplist:empty()), + leveled_bookie:load_snapshot(PclSnap, + leveled_bookie:empty_ledgercache()), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})), ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})), @@ -1383,7 +1405,9 @@ simple_server_test() -> % Add some more keys and confirm that check sequence number still % sees the old version in the previous snapshot, but will see the new version % in a new snapshot - Key1A = {{o,"Bucket0001", "Key0001", null}, {4005, {active, infinity}, null}}, + Key1A_Pre = {{o,"Bucket0001", "Key0001", null}, + {4005, {active, infinity}, null}}, + Key1A = add_missing_hash(Key1A_Pre), KL1A = leveled_sft:generate_randomkeys({2000, 4006}), ok = maybe_pause_push(PCLr, [Key1A]), ok = maybe_pause_push(PCLr, KL1A), @@ -1400,7 +1424,7 @@ simple_server_test() -> term_to_binary("Hello")), {ok, PclSnap2} = pcl_start(SnapOpts), - ok = pcl_loadsnapshot(PclSnap2, leveled_skiplist:empty()), + leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()), ?assertMatch(false, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001", @@ -1506,23 +1530,26 @@ simple_findnextkey_test() -> sqnoverlap_findnextkey_test() -> QueryArray = [ - {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, - {{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}]}, - {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]}, - {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]} + {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}}]}, + {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]}, + {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]} ], {Array2, KV1} = find_nextkey(QueryArray, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, KV1), + ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, + KV1), {Array3, KV2} = find_nextkey(Array2, {o, 
"Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}, KV2), + ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}, + KV2), {Array4, KV3} = find_nextkey(Array3, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}, KV3), + ?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}}, + KV3), ER = find_nextkey(Array4, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), @@ -1530,23 +1557,26 @@ sqnoverlap_findnextkey_test() -> sqnoverlap_otherway_findnextkey_test() -> QueryArray = [ - {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, - {{o, "Bucket1", "Key5"}, {1, {active, infinity}, null}}]}, - {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]}, - {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]} + {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]}, + {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]}, + {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]} ], {Array2, KV1} = find_nextkey(QueryArray, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, KV1), + ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, + KV1), {Array3, KV2} = find_nextkey(Array2, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}, KV2), + ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}, + KV2), {Array4, KV3} = find_nextkey(Array3, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), - ?assertMatch({{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}, KV3), + ?assertMatch({{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}, + KV3), ER = find_nextkey(Array4, {o, "Bucket1", "Key0"}, {o, "Bucket1", "Key5"}), @@ -1554,19 +1584,19 @@ sqnoverlap_otherway_findnextkey_test() -> foldwithimm_simple_test() -> QueryArray = [ - {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, - {{o, "Bucket1", "Key5"}, {1, {active, infinity}, null}}]}, - {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]}, - {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]} + {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]}, + {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]}, + {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]} ], IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"}, - {7, {active, infinity}, null}, + {7, {active, infinity}, 0, null}, leveled_skiplist:empty()), IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, null}, + {8, {active, infinity}, 0, null}, IMM0), IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"}, - {9, {active, infinity}, null}, + {9, {active, infinity}, 0, null}, IMM1), IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}), AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}), @@ -1581,7 +1611,7 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key6"}, 7}], Acc), IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, null}, + {8, {active, infinity}, 0, null}, leveled_skiplist:empty()), IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}), AccA = keyfolder(IMMiterA, @@ -1593,7 +1623,7 @@ 
foldwithimm_simple_test() -> {{o, "Bucket1", "Key5"}, 2}], AccA), IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"}, - {10, {active, infinity}, null}, + {10, {active, infinity}, 0, null}, IMM2), IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}), AccB = keyfolder(IMMiterB, @@ -1688,14 +1718,15 @@ badmanifest_test() -> clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), - Key1 = {{o,"Bucket0001", "Key0001", null}, + Key1_pre = {{o,"Bucket0001", "Key0001", null}, {1001, {active, infinity}, null}}, + Key1 = add_missing_hash(Key1_pre), KL1 = leveled_sft:generate_randomkeys({1000, 1}), ok = maybe_pause_push(PCL, KL1 ++ [Key1]), %% Added together, as split apart there will be a race between the close %% call to the penciller and the second fetch of the cache entry - ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), + ?assertMatch(Key1, pcl_fetch(PCL, {o, "Bucket0001", "Key0001", null})), timer:sleep(100), % Avoids confusion if L0 file not written before close ok = pcl_close(PCL), diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 61ecd4e..5ba62aa 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -42,9 +42,8 @@ -include("include/leveled.hrl"). -export([ - add_to_index/5, + add_to_cache/4, to_list/2, - new_index/0, check_levelzero/3, merge_trees/4 ]). @@ -56,53 +55,20 @@ %%% API %%%============================================================================ -add_to_index(snap, L0Size, LevelMinus1, LedgerSQN, TreeList) -> - FoldFun = fun({K, V}, {AccMinSQN, AccMaxSQN, AccCount}) -> - SQN = leveled_codec:strip_to_seqonly({K, V}), - {min(SQN, AccMinSQN), - max(SQN, AccMaxSQN), - AccCount + 1} - end, - LM1List = leveled_skiplist:to_list(LevelMinus1), - StartingT = {infinity, 0, L0Size}, - {MinSQN, MaxSQN, NewL0Size} = lists:foldl(FoldFun, StartingT, LM1List), - if - MinSQN > LedgerSQN -> - {MaxSQN, - NewL0Size, - snap, - lists:append(TreeList, [LevelMinus1])} - end; -add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) -> - SW = os:timestamp(), - SlotInTreeList = length(TreeList) + 1, - FoldFun = fun({K, V}, {AccMinSQN, AccMaxSQN, AccCount}) -> - SQN = leveled_codec:strip_to_seqonly({K, V}), - Hash = erlang:phash2(K), - Count0 = case ets:lookup(L0Index, Hash) of - [] -> - ets:insert(L0Index, {Hash, [SlotInTreeList]}), - AccCount + 1; - [{Hash, L}] -> - ets:insert(L0Index, {Hash, [SlotInTreeList|L]}), - AccCount - end, - {min(SQN, AccMinSQN), - max(SQN, AccMaxSQN), - Count0} - end, - LM1List = leveled_skiplist:to_list(LevelMinus1), - StartingT = {infinity, 0, L0Size}, - {MinSQN, MaxSQN, NewL0Size} = lists:foldl(FoldFun, StartingT, LM1List), - leveled_log:log_timer("PM001", [NewL0Size], SW), - if - MinSQN > LedgerSQN -> - {MaxSQN, - NewL0Size, - L0Index, - lists:append(TreeList, [LevelMinus1])} +add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> + LM1Size = leveled_skiplist:size(LevelMinus1), + case LM1Size of + 0 -> + {LedgerSQN, L0Size, TreeList}; + _ -> + if + MinSQN >= LedgerSQN -> + {MaxSQN, + L0Size + LM1Size, + lists:append(TreeList, [LevelMinus1])} + end end. - + to_list(Slots, FetchFun) -> SW = os:timestamp(), @@ -118,21 +84,13 @@ to_list(Slots, FetchFun) -> FullList. -new_index() -> - ets:new(index, [set, private]). +check_levelzero(Key, TreeList) -> + check_levelzero(Key, leveled_codec:magic_hash(Key), TreeList). 
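With the ETS index gone, `check_levelzero` probes every cached skiplist, newest slot first (note the plain `lists:reverse(CheckList)` in `check_slotlist` below, with the `usort` no longer needed), relying on each skiplist's tinybloom to keep misses cheap. A self-contained sketch of a hit and a miss (values illustrative):

```erlang
K = {o, "B", "K1", null},
SL = leveled_skiplist:enter(K, {1, {active, infinity}, 0, null},
                            leveled_skiplist:empty(true)),
{true, {K, _V}} = leveled_pmem:check_levelzero(K, [SL]),
{false, not_found} = leveled_pmem:check_levelzero({o, "B", "K2", null}, [SL]).
```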
-check_levelzero(_Key, _L0Index, []) -> +check_levelzero(_Key, _Hash, []) -> {false, not_found}; -check_levelzero(Key, snap, TreeList) -> - check_slotlist(Key, lists:seq(1, length(TreeList)), TreeList); -check_levelzero(Key, L0Index, TreeList) -> - Hash = erlang:phash2(Key), - case ets:lookup(L0Index, Hash) of - [] -> - {false, not_found}; - [{Hash, SlotList}] -> - check_slotlist(Key, SlotList, TreeList) - end. +check_levelzero(Key, Hash, TreeList) -> + check_slotlist(Key, Hash, lists:seq(1, length(TreeList)), TreeList). merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) -> @@ -148,7 +106,7 @@ merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) -> %%% Internal Functions %%%============================================================================ -check_slotlist(Key, CheckList, TreeList) -> +check_slotlist(Key, Hash, CheckList, TreeList) -> SlotCheckFun = fun(SlotToCheck, {Found, KV}) -> case Found of @@ -156,7 +114,7 @@ check_slotlist(Key, CheckList, TreeList) -> {Found, KV}; false -> CheckTree = lists:nth(SlotToCheck, TreeList), - case leveled_skiplist:lookup(Key, CheckTree) of + case leveled_skiplist:lookup(Key, Hash, CheckTree) of none -> {Found, KV}; {value, Value} -> @@ -166,7 +124,7 @@ check_slotlist(Key, CheckList, TreeList) -> end, lists:foldl(SlotCheckFun, {false, not_found}, - lists:reverse(lists:usort(CheckList))). + lists:reverse(CheckList)). %%%============================================================================ %%% Test @@ -177,7 +135,7 @@ check_slotlist(Key, CheckList, TreeList) -> generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> generate_randomkeys(Seqn, Count, - leveled_skiplist:empty(), + leveled_skiplist:empty(true), BucketRangeLow, BucketRangeHigh). @@ -197,58 +155,59 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) -> compare_method_test() -> - R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0Index, L0TreeList}) -> + R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0TreeList}) -> LM1 = generate_randomkeys(LedgerSQN + 1, 2000, 1, 500), - add_to_index(L0Index, L0Size, LM1, LedgerSQN, - L0TreeList) + add_to_cache(L0Size, + {LM1, + LedgerSQN + 1, + LedgerSQN + 2000}, + LedgerSQN, + L0TreeList) end, - {0, 0, new_index(), []}, + {0, 0, []}, lists:seq(1, 16)), - {SQN, Size, Index, TreeList} = R, + {SQN, Size, TreeList} = R, ?assertMatch(32000, SQN), ?assertMatch(true, Size =< 32000), TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)), - S0 = lists:foldl(fun({Key, _V}, Acc) -> - R0 = lists:foldr(fun(Tree, {Found, KV}) -> - case Found of - true -> - {true, KV}; - false -> - L0 = leveled_skiplist:lookup(Key, Tree), - case L0 of - none -> - {false, not_found}; - {value, Value} -> - {true, {Key, Value}} - end + FindKeyFun = + fun(Key) -> + fun(Tree, {Found, KV}) -> + case Found of + true -> + {true, KV}; + false -> + L0 = leveled_skiplist:lookup(Key, Tree), + case L0 of + none -> + {false, not_found}; + {value, Value} -> + {true, {Key, Value}} end - end, - {false, not_found}, - TreeList), - [R0|Acc] - end, - [], - TestList), + end + end + end, - S1 = lists:foldl(fun({Key, _V}, Acc) -> - R0 = check_levelzero(Key, Index, TreeList), - [R0|Acc] - end, + S0 = lists:foldl(fun({Key, _V}, Acc) -> + R0 = lists:foldr(FindKeyFun(Key), + {false, not_found}, + TreeList), + [R0|Acc] end, [], TestList), - S2 = lists:foldl(fun({Key, _V}, Acc) -> - R0 = check_levelzero(Key, snap, TreeList), + + S1 = lists:foldl(fun({Key, _V}, Acc) -> + R0 = check_levelzero(Key, TreeList), [R0|Acc] end, [], TestList), ?assertMatch(S0, S1), - 
?assertMatch(S0, S2), StartKey = {o, "Bucket0100", null, null}, EndKey = {o, "Bucket0200", null, null}, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 9c67721..4c86dff 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -1400,12 +1400,15 @@ generate_randomkeys(Count) -> generate_randomkeys(0, _SQN, Acc) -> lists:reverse(Acc); generate_randomkeys(Count, SQN, Acc) -> - RandKey = {{o, - lists:concat(["Bucket", random:uniform(1024)]), - lists:concat(["Key", random:uniform(1024)]), - null}, + K = {o, + lists:concat(["Bucket", random:uniform(1024)]), + lists:concat(["Key", random:uniform(1024)]), + null}, + RandKey = {K, {SQN, - {active, infinity}, null}}, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]). generate_sequentialkeys(Count, Start) -> @@ -1415,75 +1418,86 @@ generate_sequentialkeys(Target, Incr, Acc) when Incr =:= Target -> Acc; generate_sequentialkeys(Target, Incr, Acc) -> KeyStr = string:right(integer_to_list(Incr), 8, $0), - NextKey = {{o, - "BucketSeq", - lists:concat(["Key", KeyStr]), - null}, + K = {o, "BucketSeq", lists:concat(["Key", KeyStr]), null}, + NextKey = {K, {5, - {active, infinity}, null}}, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, generate_sequentialkeys(Target, Incr + 1, [NextKey|Acc]). simple_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key3", null}, {2, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {3, {active, infinity}, null}}], + KeyList1 = [{{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, + {{o, "Bucket1", "Key3", null}, + {2, {active, infinity}, no_lookup, null}}], + KeyList2 = [{{o, "Bucket1", "Key2", null}, + {3, {active, infinity}, no_lookup, null}}], {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, KeyList2, #level{level=1}), ?assertMatch(partial, ListStatus), [H1|T1] = MergedKeyList, - ?assertMatch(H1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), + ?assertMatch({{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, H1), [H2|T2] = T1, - ?assertMatch(H2, {{o, "Bucket1", "Key2", null}, {3, {active, infinity}, null}}), - ?assertMatch(T2, [{{o, "Bucket1", "Key3", null}, {2, {active, infinity}, null}}]), + ?assertMatch({{o, "Bucket1", "Key2", null}, + {3, {active, infinity}, no_lookup, null}}, H2), + ?assertMatch([{{o, "Bucket1", "Key3", null}, + {2, {active, infinity}, no_lookup, null}}], T2), ?assertMatch(SN, {1,3}). dominate_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key2", null}, {2, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {3, {tomb, infinity}, null}}], + KeyList1 = [{{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, + {{o, "Bucket1", "Key2", null}, + {2, {active, infinity}, no_lookup, null}}], + KeyList2 = [{{o, "Bucket1", "Key2", null}, + {3, {tomb, infinity}, no_lookup, null}}], {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, KeyList2, #level{level=1}), ?assertMatch(partial, ListStatus), [K1, K2] = MergedKeyList, - ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), - ?assertMatch(K2, {{o, "Bucket1", "Key2", null}, {3, {tomb, infinity}, null}}), + ?assertMatch(K1, lists:nth(1, KeyList1)), + ?assertMatch(K2, lists:nth(1, KeyList2)), ?assertMatch(SN, {1,3}). 
sample_keylist() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key4", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key6", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key8", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9a", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9c", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9d", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key4", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key6", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key8", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key4", null}, {3, {active, infinity}, null}}, - {{o, "Bucket3", "Key6", null}, {2, {active, infinity}, null}}, - {{o, "Bucket3", "Key8", null}, {1, {active, infinity}, null}}], + KeyList1 = + [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key9", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key9", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}], + KeyList2 = + [{{o, "Bucket1", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key4", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key6", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key8", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9a", null}, 
{1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9c", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9d", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key4", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key6", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key8", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key4", null}, {3, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key6", null}, {2, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key8", null}, {1, {active, infinity}, 0, null}}], {KeyList1, KeyList2}. alternating_create_block_test() -> @@ -1495,12 +1509,12 @@ alternating_create_block_test() -> ?assertMatch(BlockSize, 32), ?assertMatch(ListStatus, complete), K1 = lists:nth(1, MergedKeyList), - ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), + ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}), K11 = lists:nth(11, MergedKeyList), - ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, null}}), + ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}), K32 = lists:nth(32, MergedKeyList), - ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, null}}), - HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, null}}, + ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}), + HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, 0, null}}, {_, ListStatus2, _, _, _, _} = create_block([HKey|KeyList1], KeyList2, #level{level=1}), @@ -1752,7 +1766,7 @@ initial_create_file_test() -> Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8", null}), io:format("Result is ~w~n", [Result1]), ?assertMatch(Result1, {{o, "Bucket1", "Key8", null}, - {1, {active, infinity}, null}}), + {1, {active, infinity}, 0, null}}), Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88", null}), io:format("Result is ~w~n", [Result2]), ?assertMatch(Result2, not_present), @@ -1768,17 +1782,17 @@ big_create_file_test() -> InitFileMD, KL1, KL2, #level{level=1}), - [{K1, {Sq1, St1, V1}}|_] = KL1, - [{K2, {Sq2, St2, V2}}|_] = KL2, + [{K1, {Sq1, St1, MH1, V1}}|_] = KL1, + [{K2, {Sq2, St2, MH2, V2}}|_] = KL2, Result1 = fetch_keyvalue(Handle, FileMD, K1), Result2 = fetch_keyvalue(Handle, FileMD, K2), - ?assertMatch(Result1, {K1, {Sq1, St1, V1}}), - ?assertMatch(Result2, {K2, {Sq2, St2, V2}}), + ?assertMatch(Result1, {K1, {Sq1, St1, MH1, V1}}), + ?assertMatch(Result2, {K2, {Sq2, St2, MH2, V2}}), SubList = lists:sublist(KL2, 1000), - lists:foreach(fun(K) -> - {Kn, {_, _, _}} = K, + lists:foreach(fun(KV) -> + {Kn, _} = KV, Rn = fetch_keyvalue(Handle, FileMD, Kn), - ?assertMatch({Kn, {_, _, _}}, Rn) + ?assertMatch({Kn, _}, Rn) end, SubList), Result3 = fetch_keyvalue(Handle, @@ -1834,13 +1848,13 @@ initial_iterator_test() -> ok = file:delete(Filename). 
key_dominates_test() -> - KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, []}}, - KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, []}}, - KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, []}}, - KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, []}}, - KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, []}}, - KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, []}}, - KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, []}}, + KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}}, + KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}}, + KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, 0, []}}, + KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, 0, []}}, + KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, 0, []}}, + KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, 0, []}}, + KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, 0, []}}, KL1 = [KV1, KV2], KL2 = [KV3, KV4], ?assertMatch({{next_key, KV1}, [KV2], KL2}, @@ -1970,21 +1984,21 @@ hashclash_test() -> "Bucket", "Key8400" ++ integer_to_list(X), null}, - Value = {X, {active, infinity}, null}, + Value = {X, {active, infinity}, 0, null}, Acc ++ [{Key, Value}] end, [], lists:seq(10,98)), - KeyListToUse = [{Key1, {1, {active, infinity}, null}}|KeyList] - ++ [{Key99, {99, {active, infinity}, null}}], + KeyListToUse = [{Key1, {1, {active, infinity}, 0, null}}|KeyList] + ++ [{Key99, {99, {active, infinity}, 0, null}}], {InitHandle, InitFileMD} = create_file(Filename), {Handle, _FileMD, _Rem} = complete_file(InitHandle, InitFileMD, KeyListToUse, [], #level{level=1}), ok = file:close(Handle), {ok, SFTr, _KeyExtremes} = sft_open(Filename), - ?assertMatch({Key1, {1, {active, infinity}, null}}, + ?assertMatch({Key1, {1, {active, infinity}, 0, null}}, sft_get(SFTr, Key1)), - ?assertMatch({Key99, {99, {active, infinity}, null}}, + ?assertMatch({Key99, {99, {active, infinity}, 0, null}}, sft_get(SFTr, Key99)), ?assertMatch(not_present, sft_get(SFTr, KeyNF)), diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl index 63a3842..17da98c 100644 --- a/src/leveled_skiplist.erl +++ b/src/leveled_skiplist.erl @@ -22,6 +22,8 @@ from_sortedlist/2, to_list/1, enter/3, + enter/4, + enter_nolookup/3, to_range/2, to_range/3, lookup/2, @@ -43,17 +45,31 @@ %%%============================================================================ enter(Key, Value, SkipList) -> - Hash = erlang:phash2(Key), - case is_list(SkipList) of - true -> - enter(Key, Value, Hash, SkipList, ?SKIP_WIDTH, ?LIST_HEIGHT); - false -> - SkipList0 = add_to_array(Hash, SkipList), - NewListPart = enter(Key, Value, Hash, - dict:fetch(?SKIP_WIDTH, SkipList0), - ?SKIP_WIDTH, ?LIST_HEIGHT), - dict:store(?SKIP_WIDTH, NewListPart, SkipList0) - end. + Hash = leveled_codec:magic_hash(Key), + enter(Key, Hash, Value, SkipList). + +enter(Key, Hash, Value, SkipList) -> + Bloom0 = + case element(1, SkipList) of + list_only -> + list_only; + Bloom -> + leveled_tinybloom:enter({hash, Hash}, Bloom) + end, + {Bloom0, + enter(Key, Value, Hash, + element(2, SkipList), + ?SKIP_WIDTH, ?LIST_HEIGHT)}. + +%% Can iterate over a key entered this way, but never lookup the key +%% used for index terms +%% The key may still be a marker key - and the much cheaper native hash +%% is used to dtermine this, avoiding the more expensive magic hash +enter_nolookup(Key, Value, SkipList) -> + {element(1, SkipList), + enter(Key, Value, erlang:phash2(Key), + element(2, SkipList), + ?SKIP_WIDTH, ?LIST_HEIGHT)}. 
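The comment above is the key property of `enter_nolookup/3`: the bloom in element 1 is left untouched, so the key is invisible to `lookup/2` yet still present for `to_list/1` and range folds, which is what `addto_ledgercache` in leveled_bookie relies on for `no_lookup` index entries. An illustration:

```erlang
SL0 = leveled_skiplist:empty(true),
K = {o, "Bucket", "Key1", null},
SL1 = leveled_skiplist:enter_nolookup(K, some_value, SL0),
none = leveled_skiplist:lookup(K, SL1),            % bloom never updated
[{K, some_value}] = leveled_skiplist:to_list(SL1). % but iteration sees it
```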
from_list(UnsortedKVL) -> from_list(UnsortedKVL, false). @@ -66,71 +82,45 @@ from_sortedlist(SortedKVL) -> from_sortedlist(SortedKVL, false). from_sortedlist(SortedKVL, BloomProtect) -> - case BloomProtect of - true -> - SL0 = lists:foldr(fun({K, _V}, SkipL) -> - H = erlang:phash2(K), - add_to_array(H, SkipL) end, - empty(true), - SortedKVL), - dict:store(?SKIP_WIDTH, - from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT), - SL0); - false -> - from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT) - end. + Bloom0 = + case BloomProtect of + true -> + lists:foldr(fun({K, _V}, Bloom) -> + leveled_tinybloom:enter(K, Bloom) end, + leveled_tinybloom:empty(?SKIP_WIDTH), + SortedKVL); + false -> + list_only + end, + {Bloom0, from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT)}. lookup(Key, SkipList) -> - case is_list(SkipList) of - true -> - list_lookup(Key, SkipList, ?LIST_HEIGHT); - false -> - lookup(Key, erlang:phash2(Key), SkipList) + case element(1, SkipList) of + list_only -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT); + _ -> + lookup(Key, leveled_codec:magic_hash(Key), SkipList) end. lookup(Key, Hash, SkipList) -> - {Slot, Bit} = hash_toslotbit(Hash), - RestLen = ?BITARRAY_SIZE - Bit - 1, - <<_Head:Bit/bitstring, - B:1/bitstring, - _Rest:RestLen/bitstring>> = dict:fetch(Slot, SkipList), - case B of - <<0:1>> -> + case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of + false -> none; - <<1:1>> -> - list_lookup(Key, dict:fetch(?SKIP_WIDTH, SkipList), ?LIST_HEIGHT) + true -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT) end. %% Rather than support iterator_from like gb_trees, will just an output a key %% sorted list for the desired range, which can the be iterated over as normal to_range(SkipList, Start) -> - case is_list(SkipList) of - true -> - to_range(SkipList, Start, ?INFINITY_KEY, ?LIST_HEIGHT); - false -> - to_range(dict:fetch(?SKIP_WIDTH, SkipList), - Start, ?INFINITY_KEY, - ?LIST_HEIGHT) - end. + to_range(element(2, SkipList), Start, ?INFINITY_KEY, ?LIST_HEIGHT). to_range(SkipList, Start, End) -> - case is_list(SkipList) of - true -> - to_range(SkipList, Start, End, ?LIST_HEIGHT); - false -> - to_range(dict:fetch(?SKIP_WIDTH, SkipList), - Start, End, - ?LIST_HEIGHT) - end. + to_range(element(2, SkipList), Start, End, ?LIST_HEIGHT). to_list(SkipList) -> - case is_list(SkipList) of - true -> - to_list(SkipList, ?LIST_HEIGHT); - false -> - to_list(dict:fetch(?SKIP_WIDTH, SkipList), ?LIST_HEIGHT) - end. + to_list(element(2, SkipList), ?LIST_HEIGHT). empty() -> empty(false). @@ -138,46 +128,20 @@ empty() -> empty(BloomProtect) -> case BloomProtect of true -> - FoldFun = - fun(X, Acc) -> dict:store(X, <<0:?BITARRAY_SIZE>>, Acc) end, - lists:foldl(FoldFun, - dict:store(?SKIP_WIDTH, - empty([], ?LIST_HEIGHT), - dict:new()), - lists:seq(0, ?SKIP_WIDTH - 1)); + {leveled_tinybloom:empty(?SKIP_WIDTH), + empty([], ?LIST_HEIGHT)}; false -> - empty([], ?LIST_HEIGHT) + {list_only, empty([], ?LIST_HEIGHT)} end. size(SkipList) -> - case is_list(SkipList) of - true -> - size(SkipList, ?LIST_HEIGHT); - false -> - size(dict:fetch(?SKIP_WIDTH, SkipList), ?LIST_HEIGHT) - end. - + size(element(2, SkipList), ?LIST_HEIGHT). %%%============================================================================ %%% SkipList Base Functions %%%============================================================================ -hash_toslotbit(Hash) -> - Slot = Hash band (?SKIP_WIDTH - 1), - Bit = (Hash bsr ?SKIP_WIDTH) band (?BITARRAY_SIZE - 1), - {Slot, Bit}. 
- - -add_to_array(Hash, SkipList) -> - {Slot, Bit} = hash_toslotbit(Hash), - RestLen = ?BITARRAY_SIZE - Bit - 1, - <> = dict:fetch(Slot, SkipList), - BitArray = <>, - dict:store(Slot, BitArray, SkipList). - enter(Key, Value, Hash, SkipList, Width, 1) -> {MarkerKey, SubList} = find_mark(Key, SkipList), case Hash rem Width of @@ -488,68 +452,30 @@ dotest_skiplist_small(N) -> skiplist_withbloom_test() -> io:format(user, "~n~nBloom protected skiplist test:~n~n", []), - N = 4000, - KL = generate_randomkeys(1, N, 1, N div 5), - - SWaGSL = os:timestamp(), - SkipList = from_list(lists:reverse(KL), true), - io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++ - "Top level key count of ~w~n", - [N, - timer:now_diff(os:timestamp(), SWaGSL), - length(dict:fetch(?SKIP_WIDTH, SkipList))]), - io:format(user, "Second tier key counts of ~w~n", - [lists:map(fun({_L, SL}) -> length(SL) end, - dict:fetch(?SKIP_WIDTH, SkipList))]), - KLSorted = lists:ukeysort(1, lists:reverse(KL)), + skiplist_tester(true). - SWaGSL2 = os:timestamp(), - SkipList = from_sortedlist(KLSorted, true), - io:format(user, "Generating skip list with ~w sorted keys in ~w " ++ - "microseconds~n", - [N, timer:now_diff(os:timestamp(), SWaGSL2)]), - - SWaDSL = os:timestamp(), - SkipList1 = - lists:foldl(fun({K, V}, SL) -> - enter(K, V, SL) - end, - empty(true), - KL), - io:format(user, "Dynamic load of skiplist with ~w keys took ~w " ++ - "microseconds~n" ++ - "Top level key count of ~w~n", - [N, - timer:now_diff(os:timestamp(), SWaDSL), - length(dict:fetch(?SKIP_WIDTH, SkipList1))]), - io:format(user, "Second tier key counts of ~w~n", - [lists:map(fun({_L, SL}) -> length(SL) end, - dict:fetch(?SKIP_WIDTH, SkipList1))]), - - io:format(user, "~nRunning timing tests for generated skiplist:~n", []), - skiplist_timingtest(KLSorted, SkipList, N), - - io:format(user, "~nRunning timing tests for dynamic skiplist:~n", []), - skiplist_timingtest(KLSorted, SkipList1, N). - skiplist_nobloom_test() -> io:format(user, "~n~nBloom free skiplist test:~n~n", []), + skiplist_tester(false). 
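The two tests can now share `skiplist_tester/1` because the variants differ only in element 1 of the structure: `list_only` when bloom protection is off, a tinybloom otherwise. A sketch of the shape per `empty/1` above (internal representation, so liable to change):

```erlang
{list_only, _Inner0} = leveled_skiplist:empty(false),
{Bloom, _Inner1} = leveled_skiplist:empty(true),
false = (Bloom =:= list_only).   % a tinybloom of zeroed bit arrays
```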
+
+skiplist_tester(Bloom) ->
     N = 4000,
     KL = generate_randomkeys(1, N, 1, N div 5),
 
     SWaGSL = os:timestamp(),
-    SkipList = from_list(lists:reverse(KL)),
+    SkipList = from_list(lists:reverse(KL), Bloom),
     io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++
                         "Top level key count of ~w~n",
                 [N,
                     timer:now_diff(os:timestamp(), SWaGSL),
-                    length(SkipList)]),
+                    length(element(2, SkipList))]),
     io:format(user, "Second tier key counts of ~w~n",
-                [lists:map(fun({_L, SL}) -> length(SL) end, SkipList)]),
+                [lists:map(fun({_L, SL}) -> length(SL) end,
+                            element(2, SkipList))]),
     KLSorted = lists:ukeysort(1, lists:reverse(KL)),
 
     SWaGSL2 = os:timestamp(),
-    SkipList = from_sortedlist(KLSorted),
+    SkipList = from_sortedlist(KLSorted, Bloom),
     io:format(user, "Generating skip list with ~w sorted keys in ~w " ++
                         "microseconds~n",
                 [N, timer:now_diff(os:timestamp(), SWaGSL2)]),
@@ -559,25 +485,26 @@ skiplist_nobloom_test() ->
         lists:foldl(fun({K, V}, SL) ->
                         enter(K, V, SL)
                     end,
-                    empty(),
+                    empty(Bloom),
                     KL),
     io:format(user, "Dynamic load of skiplist with ~w keys took ~w " ++
                         "microseconds~n" ++
                         "Top level key count of ~w~n",
                 [N,
                     timer:now_diff(os:timestamp(), SWaDSL),
-                    length(SkipList1)]),
+                    length(element(2, SkipList1))]),
     io:format(user, "Second tier key counts of ~w~n",
-                [lists:map(fun({_L, SL}) -> length(SL) end, SkipList1)]),
+                [lists:map(fun({_L, SL}) -> length(SL) end,
+                            element(2, SkipList1))]),
 
     io:format(user, "~nRunning timing tests for generated skiplist:~n", []),
-    skiplist_timingtest(KLSorted, SkipList, N),
+    skiplist_timingtest(KLSorted, SkipList, N, Bloom),
 
     io:format(user, "~nRunning timing tests for dynamic skiplist:~n", []),
-    skiplist_timingtest(KLSorted, SkipList1, N).
+    skiplist_timingtest(KLSorted, SkipList1, N, Bloom).
 
-skiplist_timingtest(KL, SkipList, N) ->
+skiplist_timingtest(KL, SkipList, N, Bloom) ->
     io:format(user, "Timing tests on skiplist of size ~w~n",
                 [leveled_skiplist:size(SkipList)]),
     CheckList1 = lists:sublist(KL, N div 4, 200),
@@ -666,7 +593,24 @@ skiplist_timingtest(KL, SkipList, N) ->
     FlatList = to_list(SkipList),
     io:format(user, "Flattening skiplist took ~w microseconds~n",
                 [timer:now_diff(os:timestamp(), SWg)]),
-    ?assertMatch(KL, FlatList).
+    ?assertMatch(KL, FlatList),
+
+    case Bloom of
+        true ->
+            HashList = lists:map(fun(_X) ->
+                                        random:uniform(4296967295) end,
+                                    lists:seq(1, 2000)),
+            SWh = os:timestamp(),
+            lists:foreach(fun(X) ->
+                                lookup(X, X, SkipList) end,
+                            HashList),
+            io:format(user,
+                        "Getting 2000 missing keys when hash was known " ++
+                            "took ~w microseconds~n",
+                        [timer:now_diff(os:timestamp(), SWh)]);
+        false ->
+            ok
+    end.
 
 define_kv(X) ->
     {{o, "Bucket", "Key" ++ string:right(integer_to_list(X), 6), null},
@@ -688,5 +632,21 @@ skiplist_roundsize_test() ->
                         ?assertMatch(L, R) end,
                     lists:seq(0, 24)).
 
+skiplist_nolookup_test() ->
+    N = 4000,
+    KL = generate_randomkeys(1, N, 1, N div 5),
+    SkipList = lists:foldl(fun({K, V}, Acc) ->
+                                enter_nolookup(K, V, Acc) end,
+                            empty(true),
+                            KL),
+    KLSorted = lists:ukeysort(1, lists:reverse(KL)),
+    lists:foreach(fun({K, _V}) ->
+                        ?assertMatch(none, lookup(K, SkipList)) end,
+                    KL),
+    ?assertMatch(KLSorted, to_list(SkipList)).
+
+empty_skiplist_size_test() ->
+    ?assertMatch(0, leveled_skiplist:size(empty(false))),
+    ?assertMatch(0, leveled_skiplist:size(empty(true))).
 
 -endif.
\ No newline at end of file
diff --git a/src/leveled_tinybloom.erl b/src/leveled_tinybloom.erl
new file mode 100644
index 0000000..166d616
--- /dev/null
+++ b/src/leveled_tinybloom.erl
@@ -0,0 +1,151 @@
+%% -------- TINY BLOOM ---------
+%%
+%% For sheltering relatively expensive lookups with a probabilistic check
+%%
+%% Uses multiple 512-byte (4096-bit) bit arrays.  Can sensibly hold up to
+%% 1000 keys per array, and even at 1000 keys should still offer only a
+%% ~20% false positive rate
+%%
+%% Restricted to no more than 256 arrays - so cannot handle more than ~250K
+%% keys in total
+%%
+%% Implemented this way to make it easy to control the false positive rate
+%% (just by setting the width).  Also only requires binary manipulations of
+%% a single hash
+
+-module(leveled_tinybloom).
+
+-include("include/leveled.hrl").
+
+-export([
+        enter/2,
+        check/2,
+        empty/1
+        ]).
+
+-include_lib("eunit/include/eunit.hrl").
+
+%%%============================================================================
+%%% Bloom API
+%%%============================================================================
+
+
+empty(Width) when Width =< 256 ->
+    FoldFun = fun(X, Acc) -> dict:store(X, <<0:4096>>, Acc) end,
+    lists:foldl(FoldFun, dict:new(), lists:seq(0, Width - 1)).
+
+enter({hash, Hash}, Bloom) ->
+    {H0, Bit1, Bit2} = split_hash(Hash),
+    Slot = H0 rem dict:size(Bloom),
+    BitArray0 = dict:fetch(Slot, Bloom),
+    BitArray1 = lists:foldl(fun add_to_array/2,
+                            BitArray0,
+                            lists:usort([Bit1, Bit2])),
+    dict:store(Slot, BitArray1, Bloom);
+enter(Key, Bloom) ->
+    Hash = leveled_codec:magic_hash(Key),
+    enter({hash, Hash}, Bloom).
+
+check({hash, Hash}, Bloom) ->
+    {H0, Bit1, Bit2} = split_hash(Hash),
+    Slot = H0 rem dict:size(Bloom),
+    BitArray = dict:fetch(Slot, Bloom),
+    case getbit(Bit1, BitArray) of
+        <<0:1>> ->
+            false;
+        <<1:1>> ->
+            case getbit(Bit2, BitArray) of
+                <<0:1>> ->
+                    false;
+                <<1:1>> ->
+                    true
+            end
+    end;
+check(Key, Bloom) ->
+    Hash = leveled_codec:magic_hash(Key),
+    check({hash, Hash}, Bloom).
+
+%%%============================================================================
+%%% Internal Functions
+%%%============================================================================
+
+split_hash(Hash) ->
+    H0 = Hash band 255,             % array (slot) selector
+    H1 = (Hash bsr 8) band 4095,    % first bit position (0..4095)
+    H2 = Hash bsr 20,               % second bit position, given a 32-bit hash
+    {H0, H1, H2}.
+
+add_to_array(Bit, BitArray) ->
+    RestLen = 4096 - Bit - 1,
+    <<Head:Bit/bitstring,
+        _B:1/bitstring,
+        Rest:RestLen/bitstring>> = BitArray,
+    <<Head/bitstring, 1:1, Rest/bitstring>>.
+
+getbit(Bit, BitArray) ->
+    RestLen = 4096 - Bit - 1,
+    <<_Head:Bit/bitstring,
+        B:1/bitstring,
+        _Rest:RestLen/bitstring>> = BitArray,
+    B.
+
+
+%%%============================================================================
+%%% Test
+%%%============================================================================
+
+-ifdef(TEST).
+
+simple_test() ->
+    N = 4000,
+    W = 4,
+    KLin = lists:map(fun(X) -> "Key_" ++
+                                integer_to_list(X) ++
+                                integer_to_list(random:uniform(100)) ++
+                                binary_to_list(crypto:rand_bytes(2))
+                                end,
+                        lists:seq(1, N)),
+    KLout = lists:map(fun(X) ->
+                            "NotKey_" ++
+                            integer_to_list(X) ++
+                            integer_to_list(random:uniform(100)) ++
+                            binary_to_list(crypto:rand_bytes(2))
+                            end,
+                        lists:seq(1, N)),
+    SW0_PH = os:timestamp(),
+    lists:foreach(fun(X) -> erlang:phash2(X) end, KLin),
+    io:format(user,
+                "~nNative hash function hashes ~w keys in ~w microseconds~n",
+                [N, timer:now_diff(os:timestamp(), SW0_PH)]),
+    SW0_MH = os:timestamp(),
+    lists:foreach(fun(X) -> leveled_codec:magic_hash(X) end, KLin),
+    io:format(user,
+                "~nMagic hash function hashes ~w keys in ~w microseconds~n",
+                [N, timer:now_diff(os:timestamp(), SW0_MH)]),
+
+    SW1 = os:timestamp(),
+    Bloom = lists:foldr(fun enter/2, empty(W), KLin),
+    io:format(user,
+                "~nAdding ~w keys to bloom took ~w microseconds~n",
+                [N, timer:now_diff(os:timestamp(), SW1)]),
+
+    SW2 = os:timestamp(),
+    lists:foreach(fun(X) -> ?assertMatch(true, check(X, Bloom)) end, KLin),
+    io:format(user,
+                "~nChecking ~w keys in bloom took ~w microseconds~n",
+                [N, timer:now_diff(os:timestamp(), SW2)]),
+
+    SW3 = os:timestamp(),
+    FP = lists:foldr(fun(X, Acc) -> case check(X, Bloom) of
+                                        true -> Acc + 1;
+                                        false -> Acc
+                                    end end,
+                        0,
+                        KLout),
+    io:format(user,
+                "~nChecking ~w keys out of bloom took ~w microseconds " ++
+                    "with ~w false positive rate~n",
+                [N, timer:now_diff(os:timestamp(), SW3), FP / N]),
+    ?assertMatch(true, FP < (N div 4)).
+
+
+-endif.
\ No newline at end of file
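
A back-of-envelope check of the "~20% false positive" estimate in the leveled_tinybloom header comment (a calculation from the code above, not part of the patch): each slot is an m = 4096 bit array, each key sets k = 2 bits, and an array is expected to hold up to n = 1000 keys. The standard bloom filter approximation then gives

    p = 1 - (1 - 1/m)^{kn} \approx 1 - e^{-kn/m} = 1 - e^{-2000/4096} \approx 0.39

    \mathrm{FP} \approx p^{k} = 0.39^{2} \approx 0.15

i.e. roughly a 15% false positive rate per fully-loaded array, consistent with the header's ~20% figure and with the FP < (N div 4) assertion in simple_test/0.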
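
A minimal usage sketch of the new bloom API (not part of the patch; the module name tinybloom_demo is hypothetical, and only the empty/1, enter/2 and check/2 functions exported above are assumed):

    -module(tinybloom_demo).
    -export([demo/0]).

    demo() ->
        %% Width 4 => four 4096-bit arrays; the low byte of the hash
        %% selects the array
        Bloom0 = leveled_tinybloom:empty(4),
        %% Keys may be entered directly (hashed internally) ...
        Bloom1 = leveled_tinybloom:enter(<<"K1">>, Bloom0),
        %% ... or under a pre-computed hash, as leveled_skiplist:lookup/3
        %% does on the read path
        Hash = leveled_codec:magic_hash(<<"K2">>),
        Bloom2 = leveled_tinybloom:enter({hash, Hash}, Bloom1),
        true = leveled_tinybloom:check(<<"K1">>, Bloom2),
        true = leveled_tinybloom:check({hash, Hash}, Bloom2),
        %% false is definitive; true may be a false positive
        _Maybe = leveled_tinybloom:check(<<"K3">>, Bloom2),
        ok.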