diff --git a/include/leveled.hrl b/include/leveled.hrl
index 0e62cf3..25216f6 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -64,7 +64,8 @@
         {root_path :: string(),
             max_inmemory_tablesize :: integer(),
             start_snapshot = false :: boolean(),
-            source_penciller :: pid()}).
+            source_penciller :: pid(),
+            levelzero_cointoss = false :: boolean()}).
 
 -record(iclerk_options,
         {inker :: pid(),
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 30e56e4..412eec0 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -136,7 +136,10 @@
         book_destroy/1]).
 
 -export([get_opt/2,
-            get_opt/3]).
+            get_opt/3,
+            load_snapshot/2,
+            empty_ledgercache/0,
+            push_ledgercache/2]).
 
 -include_lib("eunit/include/eunit.hrl").
 
@@ -145,18 +148,21 @@
 -define(LEDGER_FP, "ledger").
 -define(SNAPSHOT_TIMEOUT, 300000).
 -define(CHECKJOURNAL_PROB, 0.2).
--define(CACHE_SIZE_JITTER, 20).
--define(JOURNAL_SIZE_JITTER, 10).
+-define(CACHE_SIZE_JITTER, 25).
+-define(JOURNAL_SIZE_JITTER, 20).
+
+-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
+                        min_sqn = infinity :: integer()|infinity,
+                        max_sqn = 0 :: integer()}).
 
 -record(state, {inker :: pid(),
                 penciller :: pid(),
                 cache_size :: integer(),
-                ledger_cache :: list(), % a skiplist
+                ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean(),
                 slow_offer = false :: boolean()}).
 
-
 %%%============================================================================
 %%% API
 %%%============================================================================
@@ -233,19 +239,19 @@ init([Opts]) ->
             {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
             CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
             CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
-                        + erlang:phash2(self()) band CacheJitter,
+                        + erlang:phash2(self()) rem CacheJitter,
             leveled_log:log("B0001", [Inker, Penciller]),
             {ok, #state{inker=Inker,
                         penciller=Penciller,
                         cache_size=CacheSize,
-                        ledger_cache=leveled_skiplist:empty(),
+                        ledger_cache=#ledger_cache{},
                         is_snapshot=false}};
         Bookie ->
             {ok, {Penciller, LedgerCache}, Inker} =
                 book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
-            ok = leveled_penciller:pcl_loadsnapshot(Penciller,
-                                                    leveled_skiplist:empty()),
+            CacheToLoad = {leveled_skiplist:empty(true), 0, 0},
+            ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad),
             leveled_log:log("B0002", [Inker, Penciller]),
             {ok, #state{penciller=Penciller,
                         inker=Inker,
@@ -276,9 +282,9 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
         false ->
             gen_server:reply(From, ok)
     end,
-    case maybepush_ledgercache(State#state.cache_size, 
-                                    Cache0,
-                                    State#state.penciller) of
+    case maybepush_ledgercache(State#state.cache_size,
+                                Cache0,
+                                State#state.penciller) of
         {ok, NewCache} ->
             {noreply, State#state{ledger_cache=NewCache, slow_offer=false}};
         {returned, NewCache} ->
@@ -292,7 +298,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
         not_present ->
             {reply, not_found, State};
         Head ->
-            {Seqn, Status, _MD} = leveled_codec:striphead_to_details(Head),
+            {Seqn, Status, _MH, _MD} = leveled_codec:striphead_to_details(Head),
             case Status of
                 tomb ->
                     {reply, not_found, State};
@@ -317,11 +323,10 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
         not_present ->
             {reply, not_found, State};
         Head ->
-            {_Seqn, Status, MD} = leveled_codec:striphead_to_details(Head),
-            case Status of
-                tomb ->
+            case leveled_codec:striphead_to_details(Head) of
+                {_SeqN, tomb, _MH, _MD} ->
                     {reply, not_found, State};
-                {active, TS} ->
+                {_SeqN, {active, TS}, _MH, MD} ->
                     case TS >= leveled_codec:integer_now() of
                        true ->
                            OMD = leveled_codec:build_metadata_object(LedgerKey, MD),
@@ -426,19 +431,39 @@ terminate(Reason, State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
+%%%============================================================================
+%%% External functions
+%%%============================================================================
+
+load_snapshot(LedgerSnapshot, LedgerCache) ->
+    CacheToLoad = {LedgerCache#ledger_cache.skiplist,
+                    LedgerCache#ledger_cache.min_sqn,
+                    LedgerCache#ledger_cache.max_sqn},
+    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
+
+empty_ledgercache() ->
+    #ledger_cache{}.
+
+push_ledgercache(Penciller, Cache) ->
+    CacheToLoad = {Cache#ledger_cache.skiplist,
+                    Cache#ledger_cache.min_sqn,
+                    Cache#ledger_cache.max_sqn},
+    leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
 
 %%%============================================================================
 %%% Internal functions
 %%%============================================================================
 
+cache_size(LedgerCache) ->
+    leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).
+
 bucket_stats(State, Bucket, Tag) ->
     {ok,
         {LedgerSnapshot, LedgerCache},
         _JournalSnapshot} = snapshot_store(State, ledger),
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                 EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                 AccFun = accumulate_size(),
@@ -459,9 +484,8 @@ binary_bucketlist(State, Tag, {FoldBucketsFun, InitAcc}) ->
         {LedgerSnapshot, LedgerCache},
         _JournalSnapshot} = snapshot_store(State, ledger),
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 BucketAcc = get_nextbucket(null,
                                             Tag,
                                             LedgerSnapshot,
@@ -514,9 +538,8 @@ index_query(State,
                 {B, null}
         end,
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 StartKey = leveled_codec:to_ledgerkey(Bucket,
                                                         StartObjKey,
                                                         ?IDX_TAG,
@@ -556,9 +579,8 @@ hashtree_query(State, Tag, JournalCheck) ->
         {LedgerSnapshot, LedgerCache},
         JournalSnapshot} = snapshot_store(State, SnapType),
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
                 EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
                 AccFun = accumulate_hashes(JournalCheck, JournalSnapshot),
@@ -607,9 +629,8 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
                 {FoldObjectsFun, []}
         end,
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag),
                 Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
@@ -628,9 +649,8 @@ bucketkey_query(State, Tag, Bucket, {FoldKeysFun, InitAcc}) ->
         {LedgerSnapshot, LedgerCache},
         _JournalSnapshot} = snapshot_store(State, ledger),
     Folder = fun() ->
-                leveled_log:log("B0004", [leveled_skiplist:size(LedgerCache)]),
-                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
-                                                        LedgerCache),
+                leveled_log:log("B0004", [cache_size(LedgerCache)]),
+                load_snapshot(LedgerSnapshot, LedgerCache),
                 SK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                 EK = leveled_codec:to_ledgerkey(Bucket, null, Tag),
                 AccFun = accumulate_keys(FoldKeysFun),
@@ -668,7 +688,7 @@ set_options(Opts) ->
     MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
     JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
     MaxJournalSize = MaxJournalSize0 -
-                        erlang:phash2(self()) band JournalSizeJitter,
+                        erlang:phash2(self()) rem JournalSizeJitter,
     
     SyncStrat = get_opt(sync_strategy, Opts, sync),
     WRP = get_opt(waste_retention_period, Opts),
@@ -692,7 +712,8 @@ set_options(Opts) ->
                             binary_mode=true,
                             sync_strategy=SyncStrat}},
         #penciller_options{root_path = LedgerFP,
-                            max_inmemory_tablesize = PCLL0CacheSize}}.
+                            max_inmemory_tablesize = PCLL0CacheSize,
+                            levelzero_cointoss = true}}.
 
 startup(InkerOpts, PencillerOpts) ->
     {ok, Inker} = leveled_inker:ink_start(InkerOpts),
@@ -707,15 +728,22 @@ startup(InkerOpts, PencillerOpts) ->
 
 
 fetch_head(Key, Penciller, LedgerCache) ->
-    case leveled_skiplist:lookup(Key, LedgerCache) of
-        {value, Head} ->
-            Head;
-        none ->
-            case leveled_penciller:pcl_fetch(Penciller, Key) of
-                {Key, Head} ->
+    Hash = leveled_codec:magic_hash(Key),
+    if
+        Hash /= no_lookup ->
+            L0R = leveled_skiplist:lookup(Key,
+                                            Hash,
+                                            LedgerCache#ledger_cache.skiplist),
+            case L0R of
+                {value, Head} ->
                     Head;
-                not_present ->
-                    not_present
+                none ->
+                    case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
+                        {Key, Head} ->
+                            Head;
+                        not_present ->
+                            not_present
+                    end
             end
     end.
 
@@ -873,18 +901,34 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
 
 
 addto_ledgercache(Changes, Cache) ->
-    lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end,
-                    Cache,
-                    Changes).
+    FoldChangesFun =
+        fun({K, V}, Cache0) ->
+            {SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
+            SL0 = Cache0#ledger_cache.skiplist,
+            SL1 =
+                case Hash of
+                    no_lookup ->
+                        leveled_skiplist:enter_nolookup(K, V, SL0);
+                    _ ->
+                        leveled_skiplist:enter(K, Hash, V, SL0)
+                end,
+            Cache0#ledger_cache{skiplist=SL1,
+                                min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
+                                max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
+        end,
+    lists:foldl(FoldChangesFun, Cache, Changes).
 
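%% Illustrative sketch, not part of the patch: the ledger cache now travels
%% as a #ledger_cache{} record tracking {SkipList, MinSQN, MaxSQN}, so a
%% minimal load-then-push sequence against a penciller might look like the
%% following. The Changes list is a hypothetical placeholder.
example_push(Penciller, Changes) ->
    Cache0 = leveled_bookie:empty_ledgercache(),
    Cache1 = addto_ledgercache(Changes, Cache0),  % folds min/max SQN as it goes
    case leveled_bookie:push_ledgercache(Penciller, Cache1) of
        ok -> ok;
        returned -> returned   % penciller busy; caller should retry later
    end.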
 maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
-    CacheSize = leveled_skiplist:size(Cache),
+    CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist),
     TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
     if
         TimeToPush ->
-            case leveled_penciller:pcl_pushmem(Penciller, Cache) of
+            CacheToLoad = {Cache#ledger_cache.skiplist,
+                            Cache#ledger_cache.min_sqn,
+                            Cache#ledger_cache.max_sqn},
+            case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
                 ok ->
-                    {ok, leveled_skiplist:empty()};
+                    {ok, #ledger_cache{}};
                 returned ->
                     {returned, Cache}
             end;
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 1354571..63777b2 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -860,26 +860,14 @@ close_file(Handle, HashTree, BasePos) ->
 
 get_hashtree(Key, HashTree) ->
     Hash = hash(Key),
     Index = hash_to_index(Hash),
-    Tree = array:get(Index, HashTree),
-    case gb_trees:lookup(Hash, Tree) of
-        {value, List} ->
-            List;
-        _ ->
-            []
-    end.
+    lookup_positions(HashTree, Index, Hash).
 
-%% Add to hash tree - this is an array of 256 gb_trees that contains the Hash
+%% Add to hash tree - this is an array of 256 skiplists that contains the Hash
 %% and position of objects which have been added to an open CDB file
 put_hashtree(Key, Position, HashTree) ->
     Hash = hash(Key),
     Index = hash_to_index(Hash),
-    Tree = array:get(Index, HashTree),
-    case gb_trees:lookup(Hash, Tree) of
-        none ->
-            array:set(Index, gb_trees:insert(Hash, [Position], Tree), HashTree);
-        {value, L} ->
-            array:set(Index, gb_trees:update(Hash, [Position|L], Tree), HashTree)
-    end.
+    add_position_tohashtree(HashTree, Index, Hash, Position).
 
 %% Function to extract a Key-Value pair given a file handle and a position
 %% Will confirm that the key matches and do a CRC check
@@ -920,7 +908,7 @@ extract_key_value_check(Handle, Position) ->
 %% Scan through the file until there is a failure to crc check an input, and
 %% at that point return the position and the key dictionary scanned so far
 startup_scan_over_file(Handle, Position) ->
-    HashTree = array:new(256, {default, gb_trees:empty()}),
+    HashTree = new_hashtree(),
     scan_over_file(Handle,
                     Position,
                     fun startup_filter/5,
@@ -1148,7 +1136,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, QuickCheck) ->
 % key/value binary in the file.
 write_key_value_pairs(Handle, KeyValueList) ->
     {ok, Position} = file:position(Handle, cur),
-    HashTree = array:new(256, {default, gb_trees:empty()}),
+    HashTree = new_hashtree(),
     write_key_value_pairs(Handle, KeyValueList, {Position, HashTree}).
 
 write_key_value_pairs(_, [], Acc) ->
@@ -1180,12 +1168,11 @@ perform_write_hash_tables(Handle, HashTreeBin, StartPos) ->
 write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) ->
     {IndexList, HashTreeBin};
 write_hash_tables([Index|Rest], HashTree, CurrPos, IndexList, HashTreeBin) ->
-    Tree = array:get(Index, HashTree),
-    case gb_trees:keys(Tree) of
-        [] ->
+    case is_empty(HashTree, Index) of
+        true ->
             write_hash_tables(Rest, HashTree, CurrPos, IndexList, HashTreeBin);
-        _ ->
-            HashList = gb_trees:to_list(Tree),
+        false ->
+            HashList = to_list(HashTree, Index),
             BinList = build_binaryhashlist(HashList, []),
             IndexLength = length(BinList) * 2,
             SlotList = lists:duplicate(IndexLength, <<0:32, 0:32>>),
@@ -1285,27 +1272,13 @@ write_top_index_table(Handle, BasePos, List) ->
 %% To make this compatible with original Bernstein format this endian flip
 %% and also the use of the standard hash function required.
-%%
-%% Hash function contains mysterious constants, some explanation here as to
-%% what they are -
-%% http://stackoverflow.com/ ++
-%% questions/10696223/reason-for-5381-number-in-djb-hash-function
 
 endian_flip(Int) ->
     <<X:32/unsigned-little-integer>> = <<Int:32>>,
     X.
 
 hash(Key) ->
-    BK = term_to_binary(Key),
-    H = 5381,
-    hash1(H, BK) band 16#FFFFFFFF.
-
-hash1(H, <<>>) ->
-    H;
-hash1(H, <<B:8/integer, Rest/bitstring>>) ->
-    H1 = H * 33,
-    H2 = H1 bxor B,
-    hash1(H2, Rest).
+    leveled_codec:magic_hash(Key).
 
 % Get the least significant 8 bits from the hash.
 hash_to_index(Hash) ->
@@ -1341,6 +1314,47 @@ multi_key_value_to_record(KVList, BinaryMode, LastPosition) ->
                 {[], <<>>, empty},
                 KVList).
 
+%%%============================================================================
+%%% HashTree Implementation
+%%%============================================================================
+
+lookup_positions(HashTree, Index, Hash) ->
+    Tree = array:get(Index, HashTree),
+    case leveled_skiplist:lookup(Hash, Tree) of
+        {value, List} ->
+            List;
+        _ ->
+            []
+    end.
+
+add_position_tohashtree(HashTree, Index, Hash, Position) ->
+    Tree = array:get(Index, HashTree),
+    case leveled_skiplist:lookup(Hash, Tree) of
+        none ->
+            array:set(Index,
+                        leveled_skiplist:enter(Hash, [Position], Tree),
+                        HashTree);
+        {value, L} ->
+            array:set(Index,
+                        leveled_skiplist:enter(Hash, [Position|L], Tree),
+                        HashTree)
+    end.
+
+new_hashtree() ->
+    array:new(256, {default, leveled_skiplist:empty()}).
+
+is_empty(HashTree, Index) ->
+    Tree = array:get(Index, HashTree),
+    case leveled_skiplist:size(Tree) of
+        0 ->
+            true;
+        _ ->
+            false
+    end.
+
+to_list(HashTree, Index) ->
+    Tree = array:get(Index, HashTree),
+    leveled_skiplist:to_list(Tree).
 
 %%%%%%%%%%%%%%%%
 % T E S T
@@ -1402,16 +1416,16 @@ write_key_value_pairs_1_test() ->
     Index1 = hash_to_index(Hash1),
     Hash2 = hash("key2"),
     Index2 = hash_to_index(Hash2),
-    R0 = array:new(256, {default, gb_trees:empty()}),
+    R0 = array:new(256, {default, leveled_skiplist:empty()}),
     R1 = array:set(Index1,
-                    gb_trees:insert(Hash1,
-                                    [0],
-                                    array:get(Index1, R0)),
+                    leveled_skiplist:enter(Hash1,
+                                            [0],
+                                            array:get(Index1, R0)),
                     R0),
     R2 = array:set(Index2,
-                    gb_trees:insert(Hash2,
-                                    [30],
-                                    array:get(Index2, R1)),
+                    leveled_skiplist:enter(Hash2,
+                                            [30],
+                                            array:get(Index2, R1)),
                     R1),
     io:format("HashTree is ~w~n", [HashTree]),
     io:format("Expected HashTree is ~w~n", [R2]),
@@ -1421,16 +1435,16 @@ write_key_value_pairs_1_test() ->
 
 write_hash_tables_1_test() ->
     {ok, Handle} = file:open("../test/testx.cdb", [write]),
-    R0 = array:new(256, {default, gb_trees:empty()}),
+    R0 = array:new(256, {default, leveled_skiplist:empty()}),
     R1 = array:set(64,
-                    gb_trees:insert(6383014720,
-                                    [18],
-                                    array:get(64, R0)),
+                    leveled_skiplist:enter(6383014720,
+                                            [18],
+                                            array:get(64, R0)),
                     R0),
     R2 = array:set(67,
-                    gb_trees:insert(6383014723,
-                                    [0],
-                                    array:get(67, R1)),
+                    leveled_skiplist:enter(6383014723,
+                                            [0],
+                                            array:get(67, R1)),
                     R1),
     Result = write_hash_tables(Handle, R2),
     io:format("write hash tables result of ~w ~n", [Result]),
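%% Illustrative sketch, not part of the patch: round trip through the new
%% skiplist-backed hashtree helpers in leveled_cdb above. The key and the
%% position value (64) are hypothetical.
hashtree_roundtrip() ->
    HashTree0 = new_hashtree(),
    Hash = hash("key1"),
    Index = hash_to_index(Hash),
    HashTree1 = add_position_tohashtree(HashTree0, Index, Hash, 64),
    [64] = lookup_positions(HashTree1, Index, Hash).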
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 8903198..f08e2e9 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -39,6 +39,7 @@
         strip_to_statusonly/1,
         strip_to_keyseqstatusonly/1,
         strip_to_keyseqonly/1,
+        strip_to_seqnhashonly/1,
         striphead_to_details/1,
         is_active/3,
         endkey_passed/2,
@@ -62,11 +63,37 @@
         convert_indexspecs/5,
         generate_uuid/0,
         integer_now/0,
-        riak_extract_metadata/2]).
+        riak_extract_metadata/2,
+        magic_hash/1]).
 
 -define(V1_VERS, 1).
 -define(MAGIC, 53). % riak_kv -> riak_object
 
+%% Use DJ Bernstein magic hash function. Note, this is more expensive than
+%% phash2 but provides a much more balanced result.
+%%
+%% Hash function contains mysterious constants, some explanation here as to
+%% what they are -
+%% http://stackoverflow.com/questions/10696223/reason-for-5381-number-in-djb-hash-function
+
+magic_hash({?RIAK_TAG, Bucket, Key, _SubKey}) ->
+    magic_hash({Bucket, Key});
+magic_hash({?STD_TAG, Bucket, Key, _SubKey}) ->
+    magic_hash({Bucket, Key});
+magic_hash({?IDX_TAG, _B, _Idx, _Key}) ->
+    no_lookup;
+magic_hash(AnyKey) ->
+    BK = term_to_binary(AnyKey),
+    H = 5381,
+    hash1(H, BK) band 16#FFFFFFFF.
+
+hash1(H, <<>>) ->
+    H;
+hash1(H, <<B:8/integer, Rest/bitstring>>) ->
+    H1 = H * 33,
+    H2 = H1 bxor B,
+    hash1(H2, Rest).
+
 %% Credit to
 %% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl
 
@@ -87,15 +114,18 @@ inker_reload_strategy(AltList) ->
 strip_to_keyonly({keyonly, K}) -> K;
 strip_to_keyonly({K, _V}) -> K.
 
-strip_to_keyseqstatusonly({K, {SeqN, St, _MD}}) -> {K, SeqN, St}.
+strip_to_keyseqstatusonly({K, {SeqN, St, _, _MD}}) -> {K, SeqN, St}.
 
-strip_to_statusonly({_, {_, St, _}}) -> St.
+strip_to_statusonly({_, {_, St, _, _}}) -> St.
 
-strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN.
+strip_to_seqonly({_, {SeqN, _, _, _}}) -> SeqN.
 
-strip_to_keyseqonly({LK, {SeqN, _, _}}) -> {LK, SeqN}.
+strip_to_keyseqonly({LK, {SeqN, _, _, _}}) -> {LK, SeqN}.
+
+strip_to_seqnhashonly({_, {SeqN, _, MH, _}}) -> {SeqN, MH}.
+
+striphead_to_details({SeqN, St, MH, MD}) -> {SeqN, St, MH, MD}.
 
-striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}.
 
 key_dominates(LeftKey, RightKey) ->
     case {LeftKey, RightKey} of
@@ -103,10 +133,10 @@ key_dominates(LeftKey, RightKey) ->
             left_hand_first;
         {{LK, _LVAL}, {RK, _RVAL}} when RK < LK ->
             right_hand_first;
-        {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
+        {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}}
                                         when LK == RK, LSN >= RSN ->
             left_hand_dominant;
-        {{LK, {LSN, _LST, _LMD}}, {RK, {RSN, _RST, _RMD}}}
+        {{LK, {LSN, _LST, _LMH, _LMD}}, {RK, {RSN, _RST, _RMH, _RMD}}}
                                         when LK == RK, LSN < RSN ->
             right_hand_dominant
     end.
@@ -195,7 +225,9 @@ compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) ->
             {TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}};
         TagStrat ->
             {TagStrat, null}
-    end.
+    end;
+compact_inkerkvc(_KVC, _Strategy) ->
+    skip.
 
 split_inkvalue(VBin) ->
     case is_binary(VBin) of
@@ -218,8 +250,6 @@ create_value_for_journal(Value) ->
             Value
     end.
 
-
-
 hash(Obj) ->
     erlang:phash2(term_to_binary(Obj)).
 
@@ -273,7 +303,7 @@ convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
                         end,
                     {to_ledgerkey(Bucket, Key, ?IDX_TAG,
                                     IdxField, IdxValue),
-                        {SQN, Status, null}}
+                        {SQN, Status, no_lookup, null}}
                     end,
                 IndexSpecs).
 
@@ -285,9 +315,11 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
                 _ ->
                     {active, TS}
             end,
-    {Bucket,
-        Key,
-        {PrimaryKey, {SQN, Status, extract_metadata(Obj, Size, Tag)}}}.
+    Value = {SQN,
+                Status,
+                magic_hash(PrimaryKey),
+                extract_metadata(Obj, Size, Tag)},
+    {Bucket, Key, {PrimaryKey, Value}}.
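%% Illustrative sketch, not part of the patch: magic_hash/1 returns no_lookup
%% for index entries (so they are never added to lookup structures) and a
%% 32-bit DJB hash for object keys. Bucket/key values are hypothetical.
magic_hash_example() ->
    no_lookup = magic_hash({?IDX_TAG, "B", {"idx1_bin", "v"}, "K"}),
    Hash = magic_hash({?STD_TAG, "B", "K", null}),
    true = is_integer(Hash) andalso Hash =< 16#FFFFFFFF.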
 
 integer_now() ->
@@ -304,7 +336,7 @@ extract_metadata(Obj, Size, ?STD_TAG) ->
 
 get_size(PK, Value) ->
     {Tag, _Bucket, _Key, _} = PK,
-    {_, _, MD} = Value,
+    {_, _, _, MD} = Value,
     case Tag of
         ?RIAK_TAG ->
             {_RMD, _VC, _Hash, Size} = MD,
@@ -316,7 +348,7 @@ get_size(PK, Value) ->
 
 get_keyandhash(LK, Value) ->
     {Tag, Bucket, Key, _} = LK,
-    {_, _, MD} = Value,
+    {_, _, _, MD} = Value,
     case Tag of
         ?RIAK_TAG ->
             {_RMD, _VC, Hash, _Size} = MD,
@@ -331,8 +363,8 @@ build_metadata_object(PrimaryKey, MD) ->
     {Tag, _Bucket, _Key, null} = PrimaryKey,
     case Tag of
         ?RIAK_TAG ->
-            {SibMetaBinList, Vclock, _Hash, _Size} = MD,
-            riak_metadata_to_binary(Vclock, SibMetaBinList);
+            {SibCount, Vclock, _Hash, _Size} = MD,
+            riak_metadata_to_binary(Vclock, SibCount);
         ?STD_TAG ->
             MD
     end.
 
@@ -341,55 +373,24 @@
 riak_extract_metadata(delete, Size) ->
     {delete, null, null, Size};
 riak_extract_metadata(ObjBin, Size) ->
-    {Vclock, SibMetaBinList} = riak_metadata_from_binary(ObjBin),
-    {SibMetaBinList, Vclock, erlang:phash2(ObjBin), Size}.
+    {Vclock, SibCount} = riak_metadata_from_binary(ObjBin),
+    {SibCount, Vclock, erlang:phash2(ObjBin), Size}.
 
 %% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
 %%      VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
 
-riak_metadata_to_binary(Vclock, SibMetaBinList) ->
+riak_metadata_to_binary(Vclock, SibCount) ->
     VclockBin = term_to_binary(Vclock),
     VclockLen = byte_size(VclockBin),
-    SibCount = length(SibMetaBinList),
-    SibsBin = slimbin_contents(SibMetaBinList),
     <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
-        VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>>.
+        VclockBin:VclockLen/binary, SibCount:32/integer>>.
 
-% Fixes the value length for each sibling to be zero, and so includes no value
-slimbin_content(MetaBin) ->
-    MetaLen = byte_size(MetaBin),
-    <<0:32/integer, MetaLen:32/integer, MetaBin:MetaLen/binary>>.
-
-slimbin_contents(SibMetaBinList) ->
-    F = fun(MetaBin, Acc) ->
-            <<Acc/binary, (slimbin_content(MetaBin))/binary>>
-        end,
-    lists:foldl(F, <<>>, SibMetaBinList).
-
 riak_metadata_from_binary(V1Binary) ->
     <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
             Rest/binary>> = V1Binary,
-    <<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
-    SibMetaBinList =
-        case SibCount of
-            0 ->
-                [];
-            SC when is_integer(SC) ->
-                get_metadata_from_siblings(SibsBin, SibCount, [])
-        end,
-    {binary_to_term(VclockBin), SibMetaBinList}.
-
-get_metadata_from_siblings(<<>>, 0, SibMetaBinList) ->
-    SibMetaBinList;
-get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
-                            SibCount,
-                            SibMetaBinList) ->
-    <<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
-    <<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
-    get_metadata_from_siblings(Rest2,
-                                SibCount - 1,
-                                [MetaBin|SibMetaBinList]).
-
+    <<VclockBin:VclockLen/binary, SibCount:32/integer, _Rest/binary>> = Rest,
+    {binary_to_term(VclockBin), SibCount}.
+
@@ -406,11 +407,14 @@ indexspecs_test() ->
                     {remove, "t1_bin", "abdc456"}],
     Changes = convert_indexspecs(IndexSpecs, "Bucket", "Key2", 1, infinity),
     ?assertMatch({{i, "Bucket", {"t1_int", 456}, "Key2"},
-                    {1, {active, infinity}, null}}, lists:nth(1, Changes)),
+                    {1, {active, infinity}, no_lookup, null}},
+                    lists:nth(1, Changes)),
     ?assertMatch({{i, "Bucket", {"t1_bin", "adbc123"}, "Key2"},
-                    {1, {active, infinity}, null}}, lists:nth(2, Changes)),
+                    {1, {active, infinity}, no_lookup, null}},
+                    lists:nth(2, Changes)),
     ?assertMatch({{i, "Bucket", {"t1_bin", "abdc456"}, "Key2"},
-                    {1, tomb, null}}, lists:nth(3, Changes)).
+                    {1, tomb, no_lookup, null}},
+                    lists:nth(3, Changes)).
 
 endkey_passed_test() ->
     TestKey = {i, null, null, null},
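%% Illustrative sketch, not part of the patch: ledger values are now 4-tuples
%% of {SQN, Status, Hash, Metadata}, and strip_to_seqnhashonly/1 projects out
%% the pair the penciller's index needs. The key and values are hypothetical.
strip_example() ->
    K = {o, "Bucket1", "Key1", null},
    V = {15, {active, infinity}, 5381, null},
    {15, 5381} = strip_to_seqnhashonly({K, V}).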
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index a060774..c612367 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -238,19 +238,26 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) ->
     FN = leveled_cdb:cdb_filename(CDB),
     PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
     KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
-    R0 = lists:foldl(fun(KS, {ActSize, RplSize}) ->
-                            {{SQN, _Type, PK}, Size} = KS,
-                            Check = FilterFun(FilterServer, PK, SQN),
-                            case {Check, SQN > MaxSQN} of
-                                {true, _} ->
-                                    {ActSize + Size - ?CRC_SIZE, RplSize};
-                                {false, true} ->
-                                    {ActSize + Size - ?CRC_SIZE, RplSize};
-                                _ ->
-                                    {ActSize, RplSize + Size - ?CRC_SIZE}
-                            end end,
-                        {0, 0},
-                        KeySizeList),
+
+    FoldFunForSizeCompare =
+        fun(KS, {ActSize, RplSize}) ->
+            case KS of
+                {{SQN, _Type, PK}, Size} ->
+                    Check = FilterFun(FilterServer, PK, SQN),
+                    case {Check, SQN > MaxSQN} of
+                        {true, _} ->
+                            {ActSize + Size - ?CRC_SIZE, RplSize};
+                        {false, true} ->
+                            {ActSize + Size - ?CRC_SIZE, RplSize};
+                        _ ->
+                            {ActSize, RplSize + Size - ?CRC_SIZE}
+                    end;
+                _ ->
+                    {ActSize, RplSize}
+            end
+        end,
+
+    R0 = lists:foldl(FoldFunForSizeCompare, {0, 0}, KeySizeList),
     {ActiveSize, ReplacedSize} = R0,
     Score = case ActiveSize + ReplacedSize of
                 0 ->
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index cb00883..9a37cae 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -633,13 +633,13 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
 load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
                                 CDBpid, StartPos, FN, Rest) ->
     leveled_log:log("I0014", [FN, MinSQN]),
-    InitAcc = {MinSQN, MaxSQN, leveled_skiplist:empty()},
+    InitAcc = {MinSQN, MaxSQN, leveled_bookie:empty_ledgercache()},
     Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
-                {eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
-                    ok = push_to_penciller(Penciller, AccKL),
+                {eof, {AccMinSQN, _AccMaxSQN, AccLC}} ->
+                    ok = push_to_penciller(Penciller, AccLC),
                     {ok, AccMinSQN};
-                {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
-                    ok = push_to_penciller(Penciller, AccKL),
+                {LastPosition, {_AccMinSQN, _AccMaxSQN, AccLC}} ->
+                    ok = push_to_penciller(Penciller, AccLC),
                     NextSQN = MaxSQN + 1,
                     load_between_sequence(NextSQN,
                                             NextSQN + ?LOADING_BATCH,
@@ -657,14 +657,13 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
             ok
     end.
 
-push_to_penciller(Penciller, KeyTree) ->
+push_to_penciller(Penciller, LedgerCache) ->
     % The push to penciller must start as a tree to correctly de-duplicate
     % the list by order before becoming a de-duplicated list for loading
-    R = leveled_penciller:pcl_pushmem(Penciller, KeyTree),
-    case R of
+    case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
         returned ->
             timer:sleep(?LOADING_PAUSE),
-            push_to_penciller(Penciller, KeyTree);
+            push_to_penciller(Penciller, LedgerCache);
         ok ->
             ok
     end.
@@ -739,7 +738,7 @@ initiate_penciller_snapshot(Bookie) ->
     {ok, {LedgerSnap, LedgerCache}, _} =
         leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
-    ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, LedgerCache),
+    leveled_bookie:load_snapshot(LedgerSnap, LedgerCache),
     MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
     {LedgerSnap, MaxSQN}.
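%% Illustrative sketch, not part of the patch: the same returned/retry pattern
%% used by push_to_penciller above, generalised - keep offering work and back
%% off while the receiver signals 'returned'. PauseMs is a hypothetical value.
offer_until_accepted(OfferFun, PauseMs) ->
    case OfferFun() of
        returned ->
            timer:sleep(PauseMs),
            offer_until_accepted(OfferFun, PauseMs);
        ok ->
            ok
    end.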
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index 6c7e4cb..f2306ce 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -104,6 +104,8 @@
         {info, "L0 completion confirmed and will transition to not pending"}},
     {"P0030",
         {warn, "We're doomed - intention recorded to destroy all files"}},
+    {"P0031",
+        {info, "Completion of update to levelzero"}},
     
     {"PC001",
         {info, "Penciller's clerk ~w started with owner ~w"}},
@@ -165,7 +167,7 @@
     {"I0013",
         {info, "File ~s to be removed from manifest"}},
     {"I0014",
-        {info, "On startup oading from filename ~s from SQN ~w"}},
+        {info, "On startup loading from filename ~s from SQN ~w"}},
     {"I0015",
         {info, "Opening manifest file at ~s with SQN ~w"}},
     {"I0016",
@@ -198,8 +200,6 @@
     {"IC011",
         {info, "Not clearing filename ~s as modified delta is only ~w seconds"}},
     
-    {"PM001",
-        {info, "Indexed new cache entry with total L0 cache size now ~w"}},
     {"PM002",
         {info, "Completed dump of L0 cache to list of size ~w"}},
 
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index bbd2dae..b5f8e3f 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -363,11 +363,11 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
     BNumber = string:right(integer_to_list(BucketLow + random:uniform(BRange)),
                                             4, $0),
     KNumber = string:right(integer_to_list(random:uniform(1000)), 4, $0),
-    RandKey = {{o,
-                "Bucket" ++ BNumber,
-                "Key" ++ KNumber},
-                {Count + 1,
-                {active, infinity}, null}},
+    K = {o, "Bucket" ++ BNumber, "Key" ++ KNumber},
+    RandKey = {K, {Count + 1,
+                    {active, infinity},
+                    leveled_codec:magic_hash(K),
+                    null}},
     generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
 
 choose_pid_toquery([ManEntry|_T], Key) when
@@ -392,19 +392,19 @@ find_randomkeys(FList, Count, Source) ->
 
 
 merge_file_test() ->
-    KL1_L1 = lists:sort(generate_randomkeys(16000, 0, 1000)),
+    KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
     {ok, PidL1_1, _} = leveled_sft:sft_new("../test/KL1_L1.sft",
                                             KL1_L1, [], 1),
-    KL1_L2 = lists:sort(generate_randomkeys(16000, 0, 250)),
+    KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
     {ok, PidL2_1, _} = leveled_sft:sft_new("../test/KL1_L2.sft",
                                             KL1_L2, [], 2),
-    KL2_L2 = lists:sort(generate_randomkeys(16000, 250, 250)),
+    KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
     {ok, PidL2_2, _} = leveled_sft:sft_new("../test/KL2_L2.sft",
                                             KL2_L2, [], 2),
-    KL3_L2 = lists:sort(generate_randomkeys(16000, 500, 250)),
+    KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
     {ok, PidL2_3, _} = leveled_sft:sft_new("../test/KL3_L2.sft",
                                             KL3_L2, [], 2),
-    KL4_L2 = lists:sort(generate_randomkeys(16000, 750, 250)),
+    KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
     {ok, PidL2_4, _} = leveled_sft:sft_new("../test/KL4_L2.sft",
                                             KL4_L2, [], 2),
     Result = perform_merge({PidL1_1, "../test/KL1_L1.sft"},
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 94bac54..d5b70d1 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -168,9 +168,11 @@
         pcl_pushmem/2,
         pcl_fetchlevelzero/2,
         pcl_fetch/2,
+        pcl_fetch/3,
         pcl_fetchkeys/5,
         pcl_fetchnextkey/5,
         pcl_checksequencenumber/3,
+        pcl_checksequencenumber/4,
         pcl_workforclerk/1,
         pcl_promptmanifestchange/2,
         pcl_confirml0complete/4,
@@ -195,10 +197,11 @@
 -define(CURRENT_FILEX, "crr").
 -define(PENDING_FILEX, "pnd").
 -define(MEMTABLE, mem).
--define(MAX_TABLESIZE, 32000).
+-define(MAX_TABLESIZE, 28000). % This is less than max - but COIN_SIDECOUNT
+-define(SUPER_MAX_TABLE_SIZE, 40000).
 -define(PROMPT_WAIT_ONL0, 5).
 -define(WORKQUEUE_BACKLOG_TOLERANCE, 4).
-
+-define(COIN_SIDECOUNT, 5).
 
 -record(state, {manifest = [] :: list(),
                 manifest_sqn = 0 :: integer(),
@@ -213,15 +216,15 @@
                 levelzero_pending = false :: boolean(),
                 levelzero_constructor :: pid(),
                 levelzero_cache = [] :: list(), % a list of skiplists
-                levelzero_index,
-                % is an array - but cannot specif due to OTP compatability
                 levelzero_size = 0 :: integer(),
                 levelzero_maxcachesize :: integer(),
+                levelzero_cointoss = false :: boolean(),
+                levelzero_index, % may be none or an ETS table reference
                 
                 is_snapshot = false :: boolean(),
                 snapshot_fully_loaded = false :: boolean(),
                 source_penciller :: pid(),
-                levelzero_astree :: list(), % skiplist
+                levelzero_astree :: list(),
                 
                 ongoing_work = [] :: list(),
                 work_backlog = false :: boolean()}).
 
@@ -235,9 +238,9 @@
 pcl_start(PCLopts) ->
     gen_server:start(?MODULE, [PCLopts], []).
 
-pcl_pushmem(Pid, DumpList) ->
+pcl_pushmem(Pid, LedgerCache) ->
     %% Bookie to dump memory onto penciller
-    gen_server:call(Pid, {push_mem, DumpList}, infinity).
+    gen_server:call(Pid, {push_mem, LedgerCache}, infinity).
 
 pcl_fetchlevelzero(Pid, Slot) ->
     %% Timeout to cause crash of L0 file when it can't get the close signal
@@ -248,7 +251,14 @@ pcl_fetchlevelzero(Pid, Slot) ->
     gen_server:call(Pid, {fetch_levelzero, Slot}, 60000).
 
 pcl_fetch(Pid, Key) ->
-    gen_server:call(Pid, {fetch, Key}, infinity).
+    Hash = leveled_codec:magic_hash(Key),
+    if
+        Hash /= no_lookup ->
+            gen_server:call(Pid, {fetch, Key, Hash}, infinity)
+    end.
+
+pcl_fetch(Pid, Key, Hash) ->
+    gen_server:call(Pid, {fetch, Key, Hash}, infinity).
 
 pcl_fetchkeys(Pid, StartKey, EndKey, AccFun, InitAcc) ->
     gen_server:call(Pid,
@@ -261,7 +271,14 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
                     infinity).
 
 pcl_checksequencenumber(Pid, Key, SQN) ->
-    gen_server:call(Pid, {check_sqn, Key, SQN}, infinity).
+    Hash = leveled_codec:magic_hash(Key),
+    if
+        Hash /= no_lookup ->
+            gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity)
+    end.
+
+pcl_checksequencenumber(Pid, Key, Hash, SQN) ->
+    gen_server:call(Pid, {check_sqn, Key, Hash, SQN}, infinity).
 
 pcl_workforclerk(Pid) ->
     gen_server:call(Pid, work_for_clerk, infinity).
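%% Illustrative sketch, not part of the patch: the new /3 and /4 arities let
%% a caller compute the magic hash once and reuse it across calls, e.g. when
%% checking several sequence numbers for the same key:
check_many(Penciller, Key, SQNs) ->
    Hash = leveled_codec:magic_hash(Key),
    [pcl_checksequencenumber(Penciller, Key, Hash, SQN) || SQN <- SQNs].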
@@ -312,8 +329,9 @@ init([PCLopts]) ->
     end.
 
-handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
-                                                        when Snap == false ->
+handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
+                From,
+                State=#state{is_snapshot=Snap}) when Snap == false ->
     % The push_mem process is as follows:
     %
     % 1 - Receive a gb_tree containing the latest Key/Value pairs (note that
@@ -341,26 +359,27 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
         false ->
             leveled_log:log("P0018", [ok, false, false]),
             gen_server:reply(From, ok),
-            {noreply, update_levelzero(State#state.levelzero_index,
-                                        State#state.levelzero_size,
-                                        PushedTree,
+            {noreply, update_levelzero(State#state.levelzero_size,
+                                        {PushedTree, MinSQN, MaxSQN},
                                         State#state.ledger_sqn,
                                         State#state.levelzero_cache,
                                         State)}
     end;
-handle_call({fetch, Key}, _From, State) ->
+handle_call({fetch, Key, Hash}, _From, State) ->
     {reply,
         fetch_mem(Key,
+                    Hash,
                     State#state.manifest,
-                    State#state.levelzero_index,
-                    State#state.levelzero_cache),
+                    State#state.levelzero_cache,
+                    State#state.levelzero_index),
         State};
-handle_call({check_sqn, Key, SQN}, _From, State) ->
+handle_call({check_sqn, Key, Hash, SQN}, _From, State) ->
     {reply,
         compare_to_sqn(fetch_mem(Key,
+                                    Hash,
                                     State#state.manifest,
-                                    State#state.levelzero_index,
-                                    State#state.levelzero_cache),
+                                    State#state.levelzero_cache,
+                                    State#state.levelzero_index),
                         SQN),
         State};
 handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
@@ -393,16 +412,15 @@ handle_call(get_startup_sqn, _From, State) ->
 handle_call({register_snapshot, Snapshot}, _From, State) ->
     Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
     {reply, {ok, State}, State#state{registered_snapshots = Rs}};
-handle_call({load_snapshot, BookieIncrTree}, _From, State) ->
-    L0D = leveled_pmem:add_to_index(State#state.levelzero_index,
-                                    State#state.levelzero_size,
-                                    BookieIncrTree,
+handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
+    L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
+                                    {BookieIncrTree, MinSQN, MaxSQN},
                                     State#state.ledger_sqn,
                                     State#state.levelzero_cache),
-    {LedgerSQN, L0Size, L0Index, L0Cache} = L0D,
+    {LedgerSQN, L0Size, L0Cache} = L0D,
     {reply, ok, State#state{levelzero_cache=L0Cache,
-                            levelzero_index=L0Index,
                             levelzero_size=L0Size,
+                            levelzero_index=none,
                             ledger_sqn=LedgerSQN,
                             snapshot_fully_loaded=true}};
 handle_call({fetch_levelzero, Slot}, _From, State) ->
@@ -448,11 +466,11 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
                                 filename=FN},
     UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
     % Prompt clerk to ask about work - do this for every L0 roll
+    leveled_pmem:clear_index(State#state.levelzero_index),
     ok = leveled_pclerk:clerk_prompt(State#state.clerk),
     {noreply, State#state{levelzero_cache=[],
                             levelzero_pending=false,
                             levelzero_constructor=undefined,
-                            levelzero_index=leveled_pmem:new_index(),
                             levelzero_size=0,
                             manifest=UpdMan,
                             persisted_sqn=State#state.ledger_sqn}}.
@@ -537,10 +555,17 @@ start_from_file(PCLopts) ->
         end,
     
     {ok, MergeClerk} = leveled_pclerk:clerk_new(self()),
+
+    CoinToss = PCLopts#penciller_options.levelzero_cointoss,
+    % Used to randomly defer the writing of L0 file.  Intended to help with
+    % vnode synchronisation issues (e.g. stop them all merging to level zero
+    % concurrently)
+
     InitState = #state{clerk=MergeClerk,
                         root_path=RootPath,
-                        levelzero_index = leveled_pmem:new_index(),
-                        levelzero_maxcachesize=MaxTableSize},
+                        levelzero_maxcachesize=MaxTableSize,
+                        levelzero_cointoss=CoinToss,
+                        levelzero_index=leveled_pmem:new_index()},
     
     %% Open manifest
     ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
@@ -614,32 +639,51 @@ start_from_file(PCLopts) ->
 
-update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) ->
-    Update = leveled_pmem:add_to_index(L0Index,
-                                        L0Size,
-                                        PushedTree,
+update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
+                                    LedgerSQN, L0Cache, State) ->
+    SW = os:timestamp(),
+    Update = leveled_pmem:add_to_cache(L0Size,
+                                        {PushedTree, MinSQN, MaxSQN},
                                         LedgerSQN,
                                         L0Cache),
-    {MaxSQN, NewL0Size, UpdL0Index, UpdL0Cache} = Update,
+    leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
+
+    {UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
     if
-        MaxSQN >= LedgerSQN ->
+        UpdMaxSQN >= LedgerSQN ->
             UpdState = State#state{levelzero_cache=UpdL0Cache,
-                                    levelzero_index=UpdL0Index,
                                     levelzero_size=NewL0Size,
-                                    ledger_sqn=MaxSQN},
+                                    ledger_sqn=UpdMaxSQN},
             CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
+            CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
             Level0Free = length(get_item(0, State#state.manifest, [])) == 0,
-            case {CacheTooBig, Level0Free} of
-                {true, true}  ->
-                    L0Constructor = roll_memory(UpdState, false),
+            RandomFactor =
+                case State#state.levelzero_cointoss of
+                    true ->
+                        case random:uniform(?COIN_SIDECOUNT) of
+                            1 ->
+                                true;
+                            _ ->
+                                false
+                        end;
+                    false ->
+                        true
+                end,
+            JitterCheck = RandomFactor or CacheMuchTooBig,
+            case {CacheTooBig, Level0Free, JitterCheck} of
+                {true, true, true}  ->
+                    L0Constructor = roll_memory(UpdState, false),
+                    leveled_log:log_timer("P0031", [], SW),
                     UpdState#state{levelzero_pending=true,
                                     levelzero_constructor=L0Constructor};
                 _ ->
+                    leveled_log:log_timer("P0031", [], SW),
                     UpdState
             end;
+        NewL0Size == L0Size ->
+            leveled_log:log_timer("P0031", [], SW),
             State#state{levelzero_cache=L0Cache,
-                        levelzero_index=L0Index,
                         levelzero_size=L0Size,
                         ledger_sqn=LedgerSQN}
     end.
@@ -687,13 +731,21 @@ levelzero_filename(State) ->
     FileName.
 
-fetch_mem(Key, Manifest, L0Index, L0Cache) ->
-    L0Check = leveled_pmem:check_levelzero(Key, L0Index, L0Cache),
+
+fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
+    L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
     case L0Check of
         {false, not_found} ->
            fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2);
        {true, KV} ->
            KV
+    end;
+fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
+    case leveled_pmem:check_index(Hash, L0Index) of
+        true ->
+            fetch_mem(Key, Hash, Manifest, L0Cache, none);
+        false ->
+            fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2)
    end.
 
 fetch(_Key, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
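%% Illustrative sketch, not part of the patch: the JitterCheck logic from
%% update_levelzero above, pulled out as a function. With ?COIN_SIDECOUNT = 5
%% an eligible L0 roll proceeds with probability 1/5 per push, unless the
%% cache has grown beyond ?SUPER_MAX_TABLE_SIZE, when it is forced through.
jitter_check(CoinToss, NewL0Size) ->
    RandomFactor =
        case CoinToss of
            true -> random:uniform(?COIN_SIDECOUNT) == 1;
            false -> true
        end,
    RandomFactor or (NewL0Size > ?SUPER_MAX_TABLE_SIZE).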
@@ -706,8 +758,8 @@ fetch(Key, Manifest, Level, FetchFun) ->
                         Key >= File#manifest_entry.start_key,
                             File#manifest_entry.end_key >= Key ->
                         File#manifest_entry.owner;
-                    PidFound ->
-                        PidFound
+                    FoundDetails ->
+                        FoundDetails
                 end
                 end,
             not_present,
             LevelManifest) of
@@ -1263,9 +1315,13 @@ confirm_delete_test() ->
 
 maybe_pause_push(PCL, KL) ->
-    T0 = leveled_skiplist:empty(),
-    T1 = lists:foldl(fun({K, V}, Acc) -> leveled_skiplist:enter(K, V, Acc) end,
-                        T0,
+    T0 = leveled_skiplist:empty(true),
+    T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) ->
+                            SL = leveled_skiplist:enter(K, V, AccSL),
+                            SQN = leveled_codec:strip_to_seqonly({K, V}),
+                            {SL, min(SQN, MinSQN), max(SQN, MaxSQN)}
+                        end,
+                        {T0, infinity, 0},
                         KL),
     case pcl_pushmem(PCL, T1) of
         returned ->
@@ -1275,23 +1331,32 @@ maybe_pause_push(PCL, KL) ->
             ok
     end.
 
+%% old test data doesn't have the magic hash
+add_missing_hash({K, {SQN, ST, MD}}) ->
+    {K, {SQN, ST, leveled_codec:magic_hash(K), MD}}.
+
+
 simple_server_test() ->
     RootPath = "../test/ledger",
     clean_testdir(RootPath),
     {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
-    Key1 = {{o,"Bucket0001", "Key0001", null},
-            {1, {active, infinity}, null}},
+    Key1_Pre = {{o,"Bucket0001", "Key0001", null},
+                {1, {active, infinity}, null}},
+    Key1 = add_missing_hash(Key1_Pre),
     KL1 = leveled_sft:generate_randomkeys({1000, 2}),
-    Key2 = {{o,"Bucket0002", "Key0002", null},
+    Key2_Pre = {{o,"Bucket0002", "Key0002", null},
                 {1002, {active, infinity}, null}},
+    Key2 = add_missing_hash(Key2_Pre),
     KL2 = leveled_sft:generate_randomkeys({900, 1003}),
     % Keep below the max table size by having 900 not 1000
-    Key3 = {{o,"Bucket0003", "Key0003", null},
+    Key3_Pre = {{o,"Bucket0003", "Key0003", null},
                 {2003, {active, infinity}, null}},
+    Key3 = add_missing_hash(Key3_Pre),
     KL3 = leveled_sft:generate_randomkeys({1000, 2004}),
-    Key4 = {{o,"Bucket0004", "Key0004", null},
+    Key4_Pre = {{o,"Bucket0004", "Key0004", null},
                 {3004, {active, infinity}, null}},
+    Key4 = add_missing_hash(Key4_Pre),
     KL4 = leveled_sft:generate_randomkeys({1000, 3005}),
     ok = maybe_pause_push(PCL, [Key1]),
     ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
@@ -1331,7 +1396,8 @@ simple_server_test() ->
     SnapOpts = #penciller_options{start_snapshot = true,
                                     source_penciller = PCLr},
     {ok, PclSnap} = pcl_start(SnapOpts),
-    ok = pcl_loadsnapshot(PclSnap, leveled_skiplist:empty()),
+    leveled_bookie:load_snapshot(PclSnap,
+                                    leveled_bookie:empty_ledgercache()),
     ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
     ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
     ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
@@ -1363,7 +1429,9 @@ simple_server_test() ->
     % Add some more keys and confirm that check sequence number still
     % sees the old version in the previous snapshot, but will see the new version
     % in a new snapshot
-    Key1A = {{o,"Bucket0001", "Key0001", null}, {4005, {active, infinity}, null}},
+    Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
+                    {4005, {active, infinity}, null}},
+    Key1A = add_missing_hash(Key1A_Pre),
     KL1A = leveled_sft:generate_randomkeys({2000, 4006}),
     ok = maybe_pause_push(PCLr, [Key1A]),
     ok = maybe_pause_push(PCLr, KL1A),
@@ -1380,7 +1448,7 @@ simple_server_test() ->
                         term_to_binary("Hello")),
     
     {ok, PclSnap2} = pcl_start(SnapOpts),
-    ok = pcl_loadsnapshot(PclSnap2, leveled_skiplist:empty()),
+    leveled_bookie:load_snapshot(PclSnap2,
+                                    leveled_bookie:empty_ledgercache()),
     ?assertMatch(false, pcl_checksequencenumber(PclSnap2,
                                                 {o,
                                                     "Bucket0001",
@@ -1486,23 +1554,26 @@ simple_findnextkey_test() ->
 
 sqnoverlap_findnextkey_test() ->
     QueryArray = [
-        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}},
-                {{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}]},
-        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]},
-        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]}
+        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
+                {{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}}]},
+        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]},
+        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]}
    ],
    {Array2, KV1} = find_nextkey(QueryArray,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, KV1),
+    ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
+                    KV1),
    {Array3, KV2} = find_nextkey(Array2,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}, KV2),
+    ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}},
+                    KV2),
    {Array4, KV3} = find_nextkey(Array3,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, null}}, KV3),
+    ?assertMatch({{o, "Bucket1", "Key5"}, {4, {active, infinity}, 0, null}},
+                    KV3),
    ER = find_nextkey(Array4,
                        {o, "Bucket1", "Key0"},
                        {o, "Bucket1", "Key5"}),
@@ -1510,23 +1581,26 @@
 sqnoverlap_otherway_findnextkey_test() ->
     QueryArray = [
-        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}},
-                {{o, "Bucket1", "Key5"}, {1, {active, infinity}, null}}]},
-        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]},
-        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]}
+        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
+                {{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]},
+        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]},
+        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]}
    ],
    {Array2, KV1} = find_nextkey(QueryArray,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}}, KV1),
+    ?assertMatch({{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
+                    KV1),
    {Array3, KV2} = find_nextkey(Array2,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}, KV2),
+    ?assertMatch({{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}},
+                    KV2),
    {Array4, KV3} = find_nextkey(Array3,
                                     {o, "Bucket1", "Key0"},
                                     {o, "Bucket1", "Key5"}),
-    ?assertMatch({{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}, KV3),
+    ?assertMatch({{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}},
+                    KV3),
    ER = find_nextkey(Array4,
                        {o, "Bucket1", "Key0"},
                        {o, "Bucket1", "Key5"}),
@@ -1534,19 +1608,19 @@
 foldwithimm_simple_test() ->
     QueryArray = [
-        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, null}},
-                {{o, "Bucket1", "Key5"}, {1, {active, infinity}, null}}]},
-        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, null}}]},
-        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, null}}]}
+        {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}},
+                {{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]},
+        {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]},
+        {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]}
    ],
    IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"},
-                                    {7, {active, infinity}, null},
+                                    {7, {active, infinity}, 0, null},
                                    leveled_skiplist:empty()),
    IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"},
-                                    {8, {active, infinity}, null},
+                                    {8, {active, infinity}, 0, null},
                                    IMM0),
    IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"},
-                                    {9, {active, infinity}, null},
+                                    {9, {active, infinity}, 0, null},
                                    IMM1),
    IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}),
    AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}),
@@ -1561,7 +1635,7 @@ foldwithimm_simple_test() ->
                    {{o, "Bucket1", "Key6"}, 7}], Acc),
    
    IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"},
-                                    {8, {active, infinity}, null},
+                                    {8, {active, infinity}, 0, null},
                                    leveled_skiplist:empty()),
    IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}),
    AccA = keyfolder(IMMiterA,
@@ -1573,7 +1647,7 @@ foldwithimm_simple_test() ->
                    {{o, "Bucket1", "Key5"}, 2}], AccA),
    
    IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"},
-                                    {10, {active, infinity}, null},
+                                    {10, {active, infinity}, 0, null},
                                    IMM2),
    IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}),
    AccB = keyfolder(IMMiterB,
@@ -1668,14 +1742,15 @@ badmanifest_test() ->
     clean_testdir(RootPath),
     {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
                                                 max_inmemory_tablesize=1000}),
-    Key1 = {{o,"Bucket0001", "Key0001", null},
+    Key1_pre = {{o,"Bucket0001", "Key0001", null},
                 {1001, {active, infinity}, null}},
+    Key1 = add_missing_hash(Key1_pre),
     KL1 = leveled_sft:generate_randomkeys({1000, 1}),
     
     ok = maybe_pause_push(PCL, KL1 ++ [Key1]),
     %% Added together, as split apart there will be a race between the close
     %% call to the penciller and the second fetch of the cache entry
-    ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
+    ?assertMatch(Key1, pcl_fetch(PCL, {o, "Bucket0001", "Key0001", null})),
     
     timer:sleep(100), % Avoids confusion if L0 file not written before close
     ok = pcl_close(PCL),
diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl
index 39dd0c6..0c61acf 100644
--- a/src/leveled_pmem.erl
+++ b/src/leveled_pmem.erl
@@ -42,54 +42,63 @@
 -include("include/leveled.hrl").
 
 -export([
-        add_to_index/5,
+        add_to_cache/4,
         to_list/2,
-        new_index/0,
         check_levelzero/3,
-        merge_trees/4
+        merge_trees/4,
+        add_to_index/2,
+        new_index/0,
+        clear_index/1,
+        check_index/2
         ]).
 
 -include_lib("eunit/include/eunit.hrl").
 
--define(SLOT_WIDTH, {4096, 12}).
-
 %%%============================================================================
 %%% API
 %%%============================================================================
 
-add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) ->
-    SW = os:timestamp(),
-    SlotInTreeList = length(TreeList) + 1,
-    FoldFun = fun({K, V}, {AccMinSQN, AccMaxSQN, AccCount, HashIndex}) ->
-                    SQN = leveled_codec:strip_to_seqonly({K, V}),
-                    {Hash, Slot} = hash_to_slot(K),
-                    L = array:get(Slot, HashIndex),
-                    Count0 = case lists:keymember(Hash, 1, L) of
-                                    true ->
-                                        AccCount;
-                                    false ->
-                                        AccCount + 1
-                                end,
-                    {min(SQN, AccMinSQN),
-                        max(SQN, AccMaxSQN),
-                        Count0,
-                        array:set(Slot, [{Hash, SlotInTreeList}|L], HashIndex)}
-                    end,
-    LM1List = leveled_skiplist:to_list(LevelMinus1),
-    StartingT = {infinity, 0, L0Size, L0Index},
-    {MinSQN, MaxSQN, NewL0Size, UpdL0Index} = lists:foldl(FoldFun,
-                                                            StartingT,
-                                                            LM1List),
-    leveled_log:log_timer("PM001", [NewL0Size], SW),
-    if
-        MinSQN > LedgerSQN ->
-            {MaxSQN,
-                NewL0Size,
-                UpdL0Index,
-                lists:append(TreeList, [LevelMinus1])}
+add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
+    LM1Size = leveled_skiplist:size(LevelMinus1),
+    case LM1Size of
+        0 ->
+            {LedgerSQN, L0Size, TreeList};
+        _ ->
+            if
+                MinSQN >= LedgerSQN ->
+                    {MaxSQN,
+                        L0Size + LM1Size,
+                        lists:append(TreeList, [LevelMinus1])}
+            end
+    end.
+
+add_to_index(LevelMinus1, L0Index) ->
+    IndexAddFun =
+        fun({_K, V}) ->
+            {_, _, Hash, _} = leveled_codec:striphead_to_details(V),
+            case Hash of
+                no_lookup ->
+                    ok;
+                _ ->
+                    ets:insert(L0Index, {Hash})
+            end
+        end,
+    lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)).
+
+new_index() ->
+    ets:new(l0index, [private, set]).
+
+clear_index(L0Index) ->
+    ets:delete_all_objects(L0Index).
+
+check_index(Hash, L0Index) ->
+    case ets:lookup(L0Index, Hash) of
+        [{Hash}] ->
+            true;
+        [] ->
+            false
     end.
 
-
 to_list(Slots, FetchFun) ->
     SW = os:timestamp(),
@@ -105,39 +114,13 @@ to_list(Slots, FetchFun) ->
     FullList.
 
-new_index() ->
-    array:new(element(1, ?SLOT_WIDTH), [{default, []}, fixed]).
+check_levelzero(Key, TreeList) ->
+    check_levelzero(Key, leveled_codec:magic_hash(Key), TreeList).
 
-
-check_levelzero(Key, L0Index, TreeList) ->
-    {Hash, Slot} = hash_to_slot(Key),
-    CheckList = array:get(Slot, L0Index),
-    SlotList = lists:foldl(fun({H0, S0}, SL) ->
-                                case H0 of
-                                    Hash ->
-                                        [S0|SL];
-                                    _ ->
-                                        SL
-                                end
-                            end,
-                            [],
-                            CheckList),
-    lists:foldl(fun(SlotToCheck, {Found, KV}) ->
-                        case Found of
-                            true ->
-                                {Found, KV};
-                            false ->
-                                CheckTree = lists:nth(SlotToCheck, TreeList),
-                                case leveled_skiplist:lookup(Key, CheckTree) of
-                                    none ->
-                                        {Found, KV};
-                                    {value, Value} ->
-                                        {true, {Key, Value}}
-                                end
-                        end
-                        end,
-                    {false, not_found},
-                    lists:reverse(lists:usort(SlotList))).
+check_levelzero(_Key, _Hash, []) ->
+    {false, not_found};
+check_levelzero(Key, Hash, TreeList) ->
+    check_slotlist(Key, Hash, lists:seq(1, length(TreeList)), TreeList).
 
 merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
@@ -153,11 +136,25 @@ merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
 %%% Internal Functions
 %%%============================================================================
 
-
-hash_to_slot(Key) ->
-    H = erlang:phash2(Key),
-    {H bsr element(2, ?SLOT_WIDTH), H band (element(1, ?SLOT_WIDTH) - 1)}.
-
+check_slotlist(Key, Hash, CheckList, TreeList) ->
+    SlotCheckFun =
+        fun(SlotToCheck, {Found, KV}) ->
+            case Found of
+                true ->
+                    {Found, KV};
+                false ->
+                    CheckTree = lists:nth(SlotToCheck, TreeList),
+                    case leveled_skiplist:lookup(Key, Hash, CheckTree) of
+                        none ->
+                            {Found, KV};
+                        {value, Value} ->
+                            {true, {Key, Value}}
+                    end
+            end
+        end,
+    lists:foldl(SlotCheckFun,
+                    {false, not_found},
+                    lists:reverse(CheckList)).
 
 %%%============================================================================
 %%% Test
@@ -168,7 +165,7 @@
 
 generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
     generate_randomkeys(Seqn,
                         Count,
-                        leveled_skiplist:empty(),
+                        leveled_skiplist:empty(true),
                         BucketRangeLow,
                         BucketRangeHigh).
 
@@ -188,45 +185,53 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
 
 compare_method_test() ->
-    R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0Index, L0TreeList}) ->
+    R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0TreeList}) ->
                         LM1 = generate_randomkeys(LedgerSQN + 1,
                                                     2000, 1, 500),
-                        add_to_index(L0Index, L0Size, LM1, LedgerSQN,
-                                        L0TreeList)
+                        add_to_cache(L0Size,
+                                        {LM1,
+                                            LedgerSQN + 1,
+                                            LedgerSQN + 2000},
+                                        LedgerSQN,
+                                        L0TreeList)
                         end,
-                        {0, 0, new_index(), []},
+                        {0, 0, []},
                         lists:seq(1, 16)),
-    {SQN, Size, Index, TreeList} = R,
+    {SQN, Size, TreeList} = R,
     ?assertMatch(32000, SQN),
     ?assertMatch(true, Size =< 32000),
     TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)),
-    S0 = lists:foldl(fun({Key, _V}, Acc) ->
-                    R0 = lists:foldr(fun(Tree, {Found, KV}) ->
-                                        case Found of
-                                            true ->
-                                                {true, KV};
-                                            false ->
-                                                L0 = leveled_skiplist:lookup(Key, Tree),
-                                                case L0 of
-                                                    none ->
-                                                        {false, not_found};
-                                                    {value, Value} ->
-                                                        {true, {Key, Value}}
-                                                end
+
+    FindKeyFun =
+        fun(Key) ->
+            fun(Tree, {Found, KV}) ->
+                case Found of
+                    true ->
+                        {true, KV};
+                    false ->
+                        L0 = leveled_skiplist:lookup(Key, Tree),
+                        case L0 of
+                            none ->
+                                {false, not_found};
+                            {value, Value} ->
+                                {true, {Key, Value}}
                         end
-                        end,
-                        {false, not_found},
-                        TreeList),
-                    [R0|Acc]
-                    end,
-                    [],
-                    TestList),
+                end
+            end
+        end,
+
+    S0 = lists:foldl(fun({Key, _V}, Acc) ->
+                            R0 = lists:foldr(FindKeyFun(Key),
+                                                {false, not_found},
+                                                TreeList),
+                            [R0|Acc] end,
+                        [],
+                        TestList),
    
    S1 = lists:foldl(fun({Key, _V}, Acc) ->
-                            R0 = check_levelzero(Key, Index, TreeList),
+                            R0 = check_levelzero(Key, TreeList),
                            [R0|Acc]
                            end,
                        [],
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index 70b0b0f..5b4f24e 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -189,9 +189,12 @@
 -define(HEADER_LEN, 56).
 -define(ITERATOR_SCANWIDTH, 1).
 -define(MERGE_SCANWIDTH, 32).
+-define(BLOOM_WIDTH, 48).
 -define(DELETE_TIMEOUT, 10000).
 -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE).
 -define(DISCARD_EXT, ".discarded").
+-define(WRITE_OPS, [binary, raw, read, write, delayed_write]).
+-define(READ_OPS, [binary, raw, read]).
 
 -record(state, {version = ?CURRENT_VERSION :: tuple(),
                 slot_index :: list(),
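%% leveled_tinybloom is referenced in this patch but its module is not shown;
%% from the call sites its assumed API is empty(Width) and enter({hash, Hash},
%% Bloom). A hedged sketch of how the per-file bloom is accumulated over a
%% list of ledger key-values:
build_bloom(KVList) ->
    lists:foldl(fun({_K, V}, Bloom) ->
                    {_SQN, _St, MH, _MD} = leveled_codec:striphead_to_details(V),
                    leveled_tinybloom:enter({hash, MH}, Bloom)
                end,
                leveled_tinybloom:empty(?BLOOM_WIDTH),
                KVList).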
@@ -469,7 +472,7 @@ generate_filenames(RootFilename) ->
 create_file(FileName) when is_list(FileName) ->
     leveled_log:log("SFT01", [FileName]),
     ok = filelib:ensure_dir(FileName),
-    {ok, Handle} = file:open(FileName, [binary, raw, read, write]),
+    {ok, Handle} = file:open(FileName, ?WRITE_OPS),
     Header = create_header(initial),
     {ok, _} = file:position(Handle, bof),
     ok = file:write(Handle, Header),
@@ -508,7 +511,7 @@ open_file(FileMD) ->
             Slen:32/integer>> = HeaderLengths,
     {ok, SummaryBin} = file:pread(Handle,
                                     ?HEADER_LEN + Blen + Ilen + Flen, Slen),
-    {{LowSQN, HighSQN}, {LowKey, HighKey}} = binary_to_term(SummaryBin),
+    {{LowSQN, HighSQN}, {LowKey, HighKey}, _Bloom} = binary_to_term(SummaryBin),
     {ok, SlotIndexBin} = file:pread(Handle, ?HEADER_LEN + Blen, Ilen),
     SlotIndex = binary_to_term(SlotIndexBin),
     {Handle, FileMD#state{slot_index=SlotIndex,
@@ -529,10 +532,11 @@ complete_file(Handle, FileMD, KL1, KL2, LevelR) ->
     complete_file(Handle, FileMD, KL1, KL2, LevelR, false).
 
 complete_file(Handle, FileMD, KL1, KL2, LevelR, Rename) ->
+    EmptyBloom = leveled_tinybloom:empty(?BLOOM_WIDTH),
     {ok, KeyRemainders} = write_keys(Handle,
                                         maybe_expand_pointer(KL1),
                                         maybe_expand_pointer(KL2),
-                                        [], <<>>,
+                                        [], <<>>, EmptyBloom,
                                         LevelR,
                                         fun sftwrite_function/2),
     {ReadHandle, UpdFileMD} = case Rename of
@@ -767,12 +771,12 @@ get_nextkeyaftermatch([_KTuple|T], KeyToFind, PrevV) ->
 
 write_keys(Handle,
             KL1, KL2,
-            SlotIndex, SerialisedSlots,
+            SlotIndex, SerialisedSlots, InitialBloom,
             LevelR, WriteFun) ->
     write_keys(Handle,
                 KL1, KL2,
                 {0, 0},
-                SlotIndex, SerialisedSlots,
+                SlotIndex, SerialisedSlots, InitialBloom,
                 {infinity, 0}, null, {last, null},
                 LevelR, WriteFun).
 
@@ -780,7 +784,7 @@ write_keys(Handle,
 write_keys(Handle,
             KL1, KL2,
             {SlotCount, SlotTotal},
-            SlotIndex, SerialisedSlots,
+            SlotIndex, SerialisedSlots, Bloom,
             {LSN, HSN}, LowKey, LastKey,
             LevelR, WriteFun)
                     when SlotCount =:= ?SLOT_GROUPWRITE_COUNT ->
@@ -789,26 +793,27 @@ write_keys(Handle,
         reached ->
             {complete_keywrite(UpdHandle,
                                 SlotIndex,
-                                {LSN, HSN}, {LowKey, LastKey},
+                                {{LSN, HSN}, {LowKey, LastKey}, Bloom},
                                 WriteFun),
                 {KL1, KL2}};
         continue ->
             write_keys(UpdHandle,
                         KL1, KL2,
                         {0, SlotTotal},
-                        SlotIndex, <<>>,
+                        SlotIndex, <<>>, Bloom,
                         {LSN, HSN}, LowKey, LastKey,
                         LevelR, WriteFun)
     end;
 write_keys(Handle,
             KL1, KL2,
             {SlotCount, SlotTotal},
-            SlotIndex, SerialisedSlots,
+            SlotIndex, SerialisedSlots, Bloom,
             {LSN, HSN}, LowKey, LastKey,
             LevelR, WriteFun) ->
-    SlotOutput = create_slot(KL1, KL2, LevelR),
+    SlotOutput = create_slot(KL1, KL2, LevelR, Bloom),
     {{LowKey_Slot, SegFilter, SerialisedSlot, LengthList},
         {{LSN_Slot, HSN_Slot}, LastKey_Slot, Status},
+        UpdBloom,
         KL1rem, KL2rem} = SlotOutput,
     UpdSlotIndex = lists:append(SlotIndex,
                                 [{LowKey_Slot, SegFilter, LengthList}]),
@@ -827,34 +832,34 @@ write_keys(Handle,
             UpdHandle = WriteFun(slots , {Handle, UpdSlots}),
             {complete_keywrite(UpdHandle,
                                 UpdSlotIndex,
-                                SNExtremes, {FirstKey, FinalKey},
+                                {SNExtremes, {FirstKey, FinalKey}, UpdBloom},
                                 WriteFun),
                 {KL1rem, KL2rem}};
         full ->
             write_keys(Handle,
                         KL1rem, KL2rem,
                         {SlotCount + 1, SlotTotal + 1},
-                        UpdSlotIndex, UpdSlots,
+                        UpdSlotIndex, UpdSlots, UpdBloom,
                         SNExtremes, FirstKey, FinalKey,
                         LevelR, WriteFun);
         complete ->
             UpdHandle = WriteFun(slots , {Handle, UpdSlots}),
             {complete_keywrite(UpdHandle,
                                 UpdSlotIndex,
-                                SNExtremes, {FirstKey, FinalKey},
+                                {SNExtremes, {FirstKey, FinalKey}, UpdBloom},
                                 WriteFun),
                 {KL1rem, KL2rem}}
     end.
-complete_keywrite(Handle, SlotIndex, - SNExtremes, {FirstKey, FinalKey}, +complete_keywrite(Handle, + SlotIndex, + {SNExtremes, {FirstKey, FinalKey}, Bloom}, WriteFun) -> ConvSlotIndex = convert_slotindex(SlotIndex), WriteFun(finalise, {Handle, ConvSlotIndex, - SNExtremes, - {FirstKey, FinalKey}}). + {SNExtremes, {FirstKey, FinalKey}, Bloom}}). %% Take a slot index, and remove the SegFilters replacing with pointers @@ -882,16 +887,15 @@ sftwrite_function(slots, {Handle, SerialisedSlots}) -> Handle; sftwrite_function(finalise, {Handle, - {SlotFilters, PointerIndex}, - SNExtremes, - KeyExtremes}) -> + {SlotFilters, PointerIndex}, + {SNExtremes, KeyExtremes, Bloom}}) -> {ok, Position} = file:position(Handle, cur), BlocksLength = Position - ?HEADER_LEN, Index = term_to_binary(PointerIndex), IndexLength = byte_size(Index), FilterLength = byte_size(SlotFilters), - Summary = term_to_binary({SNExtremes, KeyExtremes}), + Summary = term_to_binary({SNExtremes, KeyExtremes, Bloom}), SummaryLength = byte_size(Summary), %% Write Index, Filter and Summary ok = file:write(Handle, <<Index/binary, SlotFilters/binary, Summary/binary>>), %% Write Lengths into header ok = file:pwrite(Handle, 12, <<BlocksLength:32/integer, IndexLength:32/integer, FilterLength:32/integer, SummaryLength:32/integer>>), %% Also this should return a partial block if the KeyLists have been exhausted %% but the block is full -create_block(KeyList1, KeyList2, LevelR) -> - create_block(KeyList1, KeyList2, [], {infinity, 0}, [], LevelR). +create_block(KeyList1, KeyList2, LevelR, Bloom) -> + create_block(KeyList1, KeyList2, [], {infinity, 0}, [], LevelR, Bloom). create_block(KeyList1, KeyList2, - BlockKeyList, {LSN, HSN}, SegmentList, _LevelR) + BlockKeyList, {LSN, HSN}, SegmentList, _LevelR, Bloom) when length(BlockKeyList)==?BLOCK_SIZE -> case {KeyList1, KeyList2} of {[], []} -> - {BlockKeyList, complete, {LSN, HSN}, SegmentList, [], []}; + {lists:reverse(BlockKeyList), + complete, + {LSN, HSN}, + SegmentList, + Bloom, + [], []}; _ -> - {BlockKeyList, full, {LSN, HSN}, SegmentList, KeyList1, KeyList2} + {lists:reverse(BlockKeyList), + full, + {LSN, HSN}, + SegmentList, + Bloom, + KeyList1, KeyList2} end; -create_block([], [], - BlockKeyList, {LSN, HSN}, SegmentList, _LevelR) -> - {BlockKeyList, partial, {LSN, HSN}, SegmentList, [], []}; +create_block([], [], BlockKeyList, {LSN, HSN}, SegmentList, _LevelR, Bloom) -> + {lists:reverse(BlockKeyList), + partial, + {LSN, HSN}, + SegmentList, + Bloom, + [], []}; create_block(KeyList1, KeyList2, - BlockKeyList, {LSN, HSN}, SegmentList, LevelR) -> + BlockKeyList, {LSN, HSN}, SegmentList, LevelR, Bloom) -> case key_dominates(KeyList1, KeyList2, {LevelR#level.is_basement, LevelR#level.timestamp}) of {{next_key, TopKey}, Rem1, Rem2} -> - {UpdLSN, UpdHSN} = update_sequencenumbers(TopKey, LSN, HSN), - NewBlockKeyList = lists:append(BlockKeyList, - [TopKey]), - NewSegmentList = lists:append(SegmentList, - [hash_for_segmentid(TopKey)]), + {_K, V} = TopKey, + {SQN, _St, MH, _MD} = leveled_codec:striphead_to_details(V), + {UpdLSN, UpdHSN} = update_sequencenumbers(SQN, LSN, HSN), + UpdBloom = leveled_tinybloom:enter({hash, MH}, Bloom), + NewBlockKeyList = [TopKey|BlockKeyList], + NewSegmentList = [hash_for_segmentid(TopKey)|SegmentList], create_block(Rem1, Rem2, NewBlockKeyList, {UpdLSN, UpdHSN}, - NewSegmentList, LevelR); + NewSegmentList, LevelR, UpdBloom); {skipped_key, Rem1, Rem2} -> create_block(Rem1, Rem2, BlockKeyList, {LSN, HSN}, - SegmentList, LevelR) + SegmentList, LevelR, Bloom) end.
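The change in `create_block` from `lists:append/2` to cons-and-reverse is the standard Erlang accumulation idiom; a standalone illustration (function names are mine, not from the source):

```erlang
%% Appending to the tail copies the accumulator on every step: O(n) per
%% key, O(n^2) for the whole block.
append_acc(Keys) ->
    lists:foldl(fun(K, Acc) -> lists:append(Acc, [K]) end, [], Keys).

%% Prepending is O(1) per key; a single reverse at the end restores the
%% original order, as the complete/full/partial clauses above now do.
cons_acc(Keys) ->
    lists:reverse(lists:foldl(fun(K, Acc) -> [K|Acc] end, [], Keys)).

%% append_acc(L) =:= cons_acc(L) for any list L.
```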
@@ -994,45 +1013,55 @@ create_block(KeyList1, KeyList2, %% - Remainder of any KeyLists used to make the slot -create_slot(KeyList1, KeyList2, Level) -> - create_slot(KeyList1, KeyList2, Level, ?BLOCK_COUNT, [], <<>>, [], - {null, infinity, 0, null, full}). +create_slot(KeyList1, KeyList2, Level, Bloom) -> + create_slot(KeyList1, KeyList2, Level, ?BLOCK_COUNT, Bloom, + [], <<>>, [], + {null, infinity, 0, null, full}). %% Keep adding blocks to the slot until either the block count is reached or %% there is a partial block -create_slot(KL1, KL2, _, 0, SegLists, SerialisedSlot, LengthList, - {LowKey, LSN, HSN, LastKey, Status}) -> +create_slot(KL1, KL2, _, 0, Bloom, + SegLists, SerialisedSlot, LengthList, + {LowKey, LSN, HSN, LastKey, Status}) -> {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList}, {{LSN, HSN}, LastKey, Status}, + Bloom, KL1, KL2}; -create_slot(KL1, KL2, _, _, SegLists, SerialisedSlot, LengthList, - {LowKey, LSN, HSN, LastKey, partial}) -> +create_slot(KL1, KL2, _, _, Bloom, + SegLists, SerialisedSlot, LengthList, + {LowKey, LSN, HSN, LastKey, partial}) -> {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList}, {{LSN, HSN}, LastKey, partial}, + Bloom, KL1, KL2}; -create_slot(KL1, KL2, _, _, SegLists, SerialisedSlot, LengthList, - {LowKey, LSN, HSN, LastKey, complete}) -> +create_slot(KL1, KL2, _, _, Bloom, + SegLists, SerialisedSlot, LengthList, + {LowKey, LSN, HSN, LastKey, complete}) -> {{LowKey, generate_segment_filter(SegLists), SerialisedSlot, LengthList}, {{LSN, HSN}, LastKey, partial}, + Bloom, KL1, KL2}; -create_slot(KL1, KL2, LevelR, BlockCount, SegLists, SerialisedSlot, LengthList, - {LowKey, LSN, HSN, LastKey, _Status}) -> +create_slot(KL1, KL2, LevelR, BlockCount, Bloom, + SegLists, SerialisedSlot, LengthList, + {LowKey, LSN, HSN, LastKey, _Status}) -> {BlockKeyList, Status, {LSNb, HSNb}, - SegmentList, KL1b, KL2b} = create_block(KL1, KL2, LevelR), + SegmentList, + UpdBloom, + KL1b, KL2b} = create_block(KL1, KL2, LevelR, Bloom), TrackingMetadata = case {LowKey, BlockKeyList} of {null, []} -> {null, LSN, HSN, LastKey, Status}; {null, _} -> [NewLowKeyV|_] = BlockKeyList, - NewLastKey = lists:last([{keyonly, LastKey}|BlockKeyList]), + NewLastKey = last_key(BlockKeyList, {keyonly, LastKey}), {leveled_codec:strip_to_keyonly(NewLowKeyV), min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(NewLastKey), Status}; {_, _} -> - NewLastKey = lists:last([{keyonly, LastKey}|BlockKeyList]), + NewLastKey = last_key(BlockKeyList, {keyonly, LastKey}), {LowKey, min(LSN, LSNb), max(HSN, HSNb), leveled_codec:strip_to_keyonly(NewLastKey), @@ -1041,9 +1070,15 @@ create_slot(KL1, KL2, LevelR, BlockCount, SegLists, SerialisedSlot, LengthList, SerialisedBlock = serialise_block(BlockKeyList), BlockLength = byte_size(SerialisedBlock), SerialisedSlot2 = <<SerialisedSlot/binary, SerialisedBlock/binary>>, - create_slot(KL1b, KL2b, LevelR, BlockCount - 1, SegLists ++ [SegmentList], - SerialisedSlot2, LengthList ++ [BlockLength], - TrackingMetadata). + SegList2 = SegLists ++ [SegmentList], + create_slot(KL1b, KL2b, LevelR, BlockCount - 1, UpdBloom, + SegList2, SerialisedSlot2, LengthList ++ [BlockLength], + TrackingMetadata). + +last_key([], LastKey) -> + LastKey; +last_key(BlockKeyList, _LastKey) -> + lists:last(BlockKeyList). serialise_block(BlockKeyList) -> term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]).
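The new `last_key/2` replaces `lists:last([{keyonly, LastKey}|BlockKeyList])`: the padded cons only existed so that an empty block would fall back to the previous last key. Its behaviour, written as in-module assertions (the atoms are placeholders):

```erlang
default = last_key([], default),        % empty block: keep the previous key
c = last_key([a, b, c], default).       % otherwise: the block's final key
```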
@@ -1131,8 +1166,6 @@ pointer_append_queryresults(Results, QueryPid) -> %% Update the sequence numbers -update_sequencenumbers(Item, LSN, HSN) when is_tuple(Item) -> - update_sequencenumbers(leveled_codec:strip_to_seqonly(Item), LSN, HSN); update_sequencenumbers(SN, infinity, 0) -> {SN, SN}; update_sequencenumbers(SN, LSN, HSN) when SN < LSN -> @@ -1398,12 +1431,15 @@ generate_randomkeys(Count) -> generate_randomkeys(0, _SQN, Acc) -> lists:reverse(Acc); generate_randomkeys(Count, SQN, Acc) -> - RandKey = {{o, - lists:concat(["Bucket", random:uniform(1024)]), - lists:concat(["Key", random:uniform(1024)]), - null}, + K = {o, + lists:concat(["Bucket", random:uniform(1024)]), + lists:concat(["Key", random:uniform(1024)]), + null}, + RandKey = {K, {SQN, - {active, infinity}, null}}, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]). generate_sequentialkeys(Count, Start) -> @@ -1413,96 +1449,114 @@ generate_sequentialkeys(Target, Incr, Acc) when Incr =:= Target -> Acc; generate_sequentialkeys(Target, Incr, Acc) -> KeyStr = string:right(integer_to_list(Incr), 8, $0), - NextKey = {{o, - "BucketSeq", - lists:concat(["Key", KeyStr]), - null}, + K = {o, "BucketSeq", lists:concat(["Key", KeyStr]), null}, + NextKey = {K, {5, - {active, infinity}, null}}, + {active, infinity}, + leveled_codec:magic_hash(K), + null}}, generate_sequentialkeys(Target, Incr + 1, [NextKey|Acc]). simple_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key3", null}, {2, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {3, {active, infinity}, null}}], - {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, - KeyList2, - #level{level=1}), + KeyList1 = [{{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, + {{o, "Bucket1", "Key3", null}, + {2, {active, infinity}, no_lookup, null}}], + KeyList2 = [{{o, "Bucket1", "Key2", null}, + {3, {active, infinity}, no_lookup, null}}], + BlockOutput = create_block(KeyList1, + KeyList2, + #level{level=1}, + leveled_tinybloom:empty(4)), + {MergedKeyList, ListStatus, SN, _, _, _, _} = BlockOutput, ?assertMatch(partial, ListStatus), [H1|T1] = MergedKeyList, - ?assertMatch(H1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), + ?assertMatch({{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, H1), [H2|T2] = T1, - ?assertMatch(H2, {{o, "Bucket1", "Key2", null}, {3, {active, infinity}, null}}), - ?assertMatch(T2, [{{o, "Bucket1", "Key3", null}, {2, {active, infinity}, null}}]), + ?assertMatch({{o, "Bucket1", "Key2", null}, + {3, {active, infinity}, no_lookup, null}}, H2), + ?assertMatch([{{o, "Bucket1", "Key3", null}, + {2, {active, infinity}, no_lookup, null}}], T2), ?assertMatch(SN, {1,3}). 
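The test changes above track the central format change in this diff: the ledger value tuple gains a hash element, moving from `{SQN, Status, MetaData}` to `{SQN, Status, MagicHash, MetaData}` (with `no_lookup` or `0` standing in where no usable hash exists). A minimal sketch of building and stripping one such value:

```erlang
K = {o, "Bucket1", "Key1", null},
V = {1, {active, infinity}, leveled_codec:magic_hash(K), null},
%% striphead_to_details/1 now yields the four-element form:
{1, {active, infinity}, _MagicHash, null} =
    leveled_codec:striphead_to_details(V).
```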
dominate_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key2", null}, {2, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {3, {tomb, infinity}, null}}], - {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, - KeyList2, - #level{level=1}), + KeyList1 = [{{o, "Bucket1", "Key1", null}, + {1, {active, infinity}, no_lookup, null}}, + {{o, "Bucket1", "Key2", null}, + {2, {active, infinity}, no_lookup, null}}], + KeyList2 = [{{o, "Bucket1", "Key2", null}, + {3, {tomb, infinity}, no_lookup, null}}], + BlockOutput = create_block(KeyList1, + KeyList2, + #level{level=1}, + leveled_tinybloom:empty(4)), + {MergedKeyList, ListStatus, SN, _, _, _, _} = BlockOutput, ?assertMatch(partial, ListStatus), [K1, K2] = MergedKeyList, - ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), - ?assertMatch(K2, {{o, "Bucket1", "Key2", null}, {3, {tomb, infinity}, null}}), + ?assertMatch(K1, lists:nth(1, KeyList1)), + ?assertMatch(K2, lists:nth(1, KeyList2)), ?assertMatch(SN, {1,3}). sample_keylist() -> - KeyList1 = [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key1", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key3", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key5", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key7", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key9", null}, {1, {active, infinity}, null}}, - {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, null}}], - KeyList2 = [{{o, "Bucket1", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key4", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key6", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key8", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9a", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9c", null}, {1, {active, infinity}, null}}, - {{o, "Bucket1", "Key9d", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key4", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key6", null}, {1, {active, infinity}, null}}, - {{o, "Bucket2", "Key8", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key2", null}, {1, {active, infinity}, null}}, - {{o, "Bucket3", "Key4", null}, {3, {active, infinity}, null}}, - {{o, "Bucket3", "Key6", null}, {2, {active, infinity}, null}}, - {{o, "Bucket3", "Key8", null}, {1, {active, infinity}, null}}], + KeyList1 = + [{{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9", 
null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key9", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key1", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key3", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key5", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key7", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key9", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}], + KeyList2 = + [{{o, "Bucket1", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key4", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key6", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key8", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9a", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9c", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key9d", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key4", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key6", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket2", "Key8", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key2", null}, {1, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key4", null}, {3, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key6", null}, {2, {active, infinity}, 0, null}}, + {{o, "Bucket3", "Key8", null}, {1, {active, infinity}, 0, null}}], {KeyList1, KeyList2}. alternating_create_block_test() -> {KeyList1, KeyList2} = sample_keylist(), - {MergedKeyList, ListStatus, _, _, _, _} = create_block(KeyList1, - KeyList2, - #level{level=1}), + BlockOutput = create_block(KeyList1, + KeyList2, + #level{level=1}, + leveled_tinybloom:empty(4)), + {MergedKeyList, ListStatus, _SN, _, _, _, _} = BlockOutput, BlockSize = length(MergedKeyList), ?assertMatch(BlockSize, 32), ?assertMatch(ListStatus, complete), K1 = lists:nth(1, MergedKeyList), - ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, null}}), + ?assertMatch(K1, {{o, "Bucket1", "Key1", null}, {1, {active, infinity}, 0, null}}), K11 = lists:nth(11, MergedKeyList), - ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, null}}), + ?assertMatch(K11, {{o, "Bucket1", "Key9b", null}, {1, {active, infinity}, 0, null}}), K32 = lists:nth(32, MergedKeyList), - ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, null}}), - HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, null}}, - {_, ListStatus2, _, _, _, _} = create_block([HKey|KeyList1], - KeyList2, - #level{level=1}), - ?assertMatch(ListStatus2, full). + ?assertMatch(K32, {{o, "Bucket4", "Key1", null}, {1, {active, infinity}, 0, null}}), + HKey = {{o, "Bucket1", "Key0", null}, {1, {active, infinity}, 0, null}}, + {_, LStatus2, _, _, _, _, _} = create_block([HKey|KeyList1], + KeyList2, + #level{level=1}, + leveled_tinybloom:empty(4)), + ?assertMatch(full, LStatus2). 
merge_seglists_test() -> @@ -1639,9 +1693,13 @@ merge_seglists_test() -> createslot_stage1_test() -> {KeyList1, KeyList2} = sample_keylist(), - Out = create_slot(KeyList1, KeyList2, #level{level=1}), + Out = create_slot(KeyList1, + KeyList2, + #level{level=1}, + leveled_tinybloom:empty(4)), {{LowKey, SegFilter, _SerialisedSlot, _LengthList}, {{LSN, HSN}, LastKey, Status}, + _UpdBloom, KL1, KL2} = Out, ?assertMatch(LowKey, {o, "Bucket1", "Key1", null}), ?assertMatch(LastKey, {o, "Bucket4", "Key1", null}), @@ -1662,9 +1720,11 @@ createslot_stage1_test() -> createslot_stage2_test() -> Out = create_slot(lists:sort(generate_randomkeys(100)), lists:sort(generate_randomkeys(100)), - #level{level=1}), + #level{level=1}, + leveled_tinybloom:empty(4)), {{_LowKey, _SegFilter, SerialisedSlot, LengthList}, {{_LSN, _HSN}, _LastKey, Status}, + _UpdBloom, _KL1, _KL2} = Out, ?assertMatch(Status, full), Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList), @@ -1675,9 +1735,11 @@ createslot_stage2_test() -> createslot_stage3_test() -> Out = create_slot(lists:sort(generate_sequentialkeys(100, 1)), lists:sort(generate_sequentialkeys(100, 101)), - #level{level=1}), + #level{level=1}, + leveled_tinybloom:empty(4)), {{LowKey, SegFilter, SerialisedSlot, LengthList}, {{_LSN, _HSN}, LastKey, Status}, + _UpdBloom, KL1, KL2} = Out, ?assertMatch(Status, full), Sum1 = lists:foldl(fun(X, Sum) -> Sum + X end, 0, LengthList), @@ -1713,17 +1775,19 @@ createslot_stage3_test() -> testwrite_function(slots, {Handle, SerialisedSlots}) -> lists:append(Handle, [SerialisedSlots]); -testwrite_function(finalise, {Handle, C_SlotIndex, SNExtremes, KeyExtremes}) -> - {Handle, C_SlotIndex, SNExtremes, KeyExtremes}. +testwrite_function(finalise, + {Handle, C_SlotIndex, {SNExtremes, KeyExtremes, Bloom}}) -> + {Handle, C_SlotIndex, SNExtremes, KeyExtremes, Bloom}. 
writekeys_stage1_test() -> {KL1, KL2} = sample_keylist(), {FunOut, {_KL1Rem, _KL2Rem}} = write_keys([], KL1, KL2, [], <<>>, + leveled_tinybloom:empty(4), #level{level=1}, fun testwrite_function/2), - {Handle, {_, PointerIndex}, SNExtremes, KeyExtremes} = FunOut, + {Handle, {_, PointerIndex}, SNExtremes, KeyExtremes, _Bloom} = FunOut, ?assertMatch(SNExtremes, {1,3}), ?assertMatch(KeyExtremes, {{o, "Bucket1", "Key1", null}, {o, "Bucket4", "Key1", null}}), @@ -1750,7 +1814,7 @@ initial_create_file_test() -> Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8", null}), io:format("Result is ~w~n", [Result1]), ?assertMatch(Result1, {{o, "Bucket1", "Key8", null}, - {1, {active, infinity}, null}}), + {1, {active, infinity}, 0, null}}), Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88", null}), io:format("Result is ~w~n", [Result2]), ?assertMatch(Result2, not_present), @@ -1766,17 +1830,17 @@ big_create_file_test() -> InitFileMD, KL1, KL2, #level{level=1}), - [{K1, {Sq1, St1, V1}}|_] = KL1, - [{K2, {Sq2, St2, V2}}|_] = KL2, + [{K1, {Sq1, St1, MH1, V1}}|_] = KL1, + [{K2, {Sq2, St2, MH2, V2}}|_] = KL2, Result1 = fetch_keyvalue(Handle, FileMD, K1), Result2 = fetch_keyvalue(Handle, FileMD, K2), - ?assertMatch(Result1, {K1, {Sq1, St1, V1}}), - ?assertMatch(Result2, {K2, {Sq2, St2, V2}}), + ?assertMatch(Result1, {K1, {Sq1, St1, MH1, V1}}), + ?assertMatch(Result2, {K2, {Sq2, St2, MH2, V2}}), SubList = lists:sublist(KL2, 1000), - lists:foreach(fun(K) -> - {Kn, {_, _, _}} = K, + lists:foreach(fun(KV) -> + {Kn, _} = KV, Rn = fetch_keyvalue(Handle, FileMD, Kn), - ?assertMatch({Kn, {_, _, _}}, Rn) + ?assertMatch({Kn, _}, Rn) end, SubList), Result3 = fetch_keyvalue(Handle, @@ -1832,13 +1896,13 @@ initial_iterator_test() -> ok = file:delete(Filename). 
key_dominates_test() -> - KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, []}}, - KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, []}}, - KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, []}}, - KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, []}}, - KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, []}}, - KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, []}}, - KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, []}}, + KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}}, + KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}}, + KV3 = {{o, "Bucket", "Key2", null}, {3, {active, infinity}, 0, []}}, + KV4 = {{o, "Bucket", "Key4", null}, {7, {active, infinity}, 0, []}}, + KV5 = {{o, "Bucket", "Key1", null}, {4, {active, infinity}, 0, []}}, + KV6 = {{o, "Bucket", "Key1", null}, {99, {tomb, 999}, 0, []}}, + KV7 = {{o, "Bucket", "Key1", null}, {99, tomb, 0, []}}, KL1 = [KV1, KV2], KL2 = [KV3, KV4], ?assertMatch({{next_key, KV1}, [KV2], KL2}, @@ -1968,21 +2032,21 @@ hashclash_test() -> "Bucket", "Key8400" ++ integer_to_list(X), null}, - Value = {X, {active, infinity}, null}, + Value = {X, {active, infinity}, 0, null}, Acc ++ [{Key, Value}] end, [], lists:seq(10,98)), - KeyListToUse = [{Key1, {1, {active, infinity}, null}}|KeyList] - ++ [{Key99, {99, {active, infinity}, null}}], + KeyListToUse = [{Key1, {1, {active, infinity}, 0, null}}|KeyList] + ++ [{Key99, {99, {active, infinity}, 0, null}}], {InitHandle, InitFileMD} = create_file(Filename), {Handle, _FileMD, _Rem} = complete_file(InitHandle, InitFileMD, KeyListToUse, [], #level{level=1}), ok = file:close(Handle), {ok, SFTr, _KeyExtremes} = sft_open(Filename), - ?assertMatch({Key1, {1, {active, infinity}, null}}, + ?assertMatch({Key1, {1, {active, infinity}, 0, null}}, sft_get(SFTr, Key1)), - ?assertMatch({Key99, {99, {active, infinity}, null}}, + ?assertMatch({Key99, {99, {active, infinity}, 0, null}}, sft_get(SFTr, Key99)), ?assertMatch(not_present, sft_get(SFTr, KeyNF)), diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl index b9d9af4..7fcc81a 100644 --- a/src/leveled_skiplist.erl +++ b/src/leveled_skiplist.erl @@ -17,13 +17,19 @@ -export([ from_list/1, + from_list/2, from_sortedlist/1, + from_sortedlist/2, to_list/1, enter/3, + enter/4, + enter_nolookup/3, to_range/2, to_range/3, lookup/2, + lookup/3, empty/0, + empty/1, size/1 ]). @@ -32,50 +38,111 @@ -define(SKIP_WIDTH, 16). -define(LIST_HEIGHT, 2). -define(INFINITY_KEY, {null, null, null, null, null}). - +-define(BITARRAY_SIZE, 2048). %%%============================================================================ %%% SkipList API %%%============================================================================ enter(Key, Value, SkipList) -> - enter(Key, Value, SkipList, ?SKIP_WIDTH, ?LIST_HEIGHT). + Hash = leveled_codec:magic_hash(Key), + enter(Key, Hash, Value, SkipList). + +enter(Key, Hash, Value, SkipList) -> + Bloom0 = + case element(1, SkipList) of + list_only -> + list_only; + Bloom -> + leveled_tinybloom:enter({hash, Hash}, Bloom) + end, + {Bloom0, + enter(Key, Value, erlang:phash2(Key), + element(2, SkipList), + ?SKIP_WIDTH, ?LIST_HEIGHT)}. 
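A usage sketch of the new arities (key and value are illustrative): `enter/3` computes the magic hash itself, while `enter/4` lets a caller that already holds the hash avoid computing it twice; `lookup/3`, defined just below, similarly reuses a known hash.

```erlang
SL0 = leveled_skiplist:empty(true),
K = {o, "Bucket", "Key1", null},
H = leveled_codec:magic_hash(K),
SL1 = leveled_skiplist:enter(K, H, some_value, SL0),
%% lookup/3 consults the tinybloom with the supplied hash before walking
%% the skiplist; lookup/2 recomputes the hash first.
{value, some_value} = leveled_skiplist:lookup(K, H, SL1),
{value, some_value} = leveled_skiplist:lookup(K, SL1).
```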
+ +%% Can iterate over a key entered this way, but never look up the key +%% (used for index terms) +%% The key may still be a marker key - and the much cheaper native hash +%% is used to determine this, avoiding the more expensive magic hash +enter_nolookup(Key, Value, SkipList) -> + {element(1, SkipList), + enter(Key, Value, erlang:phash2(Key), + element(2, SkipList), + ?SKIP_WIDTH, ?LIST_HEIGHT)}. from_list(UnsortedKVL) -> + from_list(UnsortedKVL, false). + +from_list(UnsortedKVL, BloomProtect) -> KVL = lists:ukeysort(1, UnsortedKVL), - from_list(KVL, ?SKIP_WIDTH, ?LIST_HEIGHT). + from_sortedlist(KVL, BloomProtect). from_sortedlist(SortedKVL) -> - from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT). + from_sortedlist(SortedKVL, false). + +from_sortedlist(SortedKVL, BloomProtect) -> + Bloom0 = + case BloomProtect of + true -> + lists:foldr(fun({K, _V}, Bloom) -> + leveled_tinybloom:enter(K, Bloom) end, + leveled_tinybloom:empty(?SKIP_WIDTH), + SortedKVL); + false -> + list_only + end, + {Bloom0, from_list(SortedKVL, ?SKIP_WIDTH, ?LIST_HEIGHT)}. lookup(Key, SkipList) -> - lookup(Key, SkipList, ?LIST_HEIGHT). + case element(1, SkipList) of + list_only -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT); + _ -> + lookup(Key, leveled_codec:magic_hash(Key), SkipList) + end. + +lookup(Key, Hash, SkipList) -> + case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of + false -> + none; + true -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT) + end. %% Rather than support iterator_from like gb_trees, will just output a key %% sorted list for the desired range, which can then be iterated over as normal to_range(SkipList, Start) -> - to_range(SkipList, Start, ?INFINITY_KEY, ?LIST_HEIGHT). + to_range(element(2, SkipList), Start, ?INFINITY_KEY, ?LIST_HEIGHT). to_range(SkipList, Start, End) -> - to_range(SkipList, Start, End, ?LIST_HEIGHT). + to_range(element(2, SkipList), Start, End, ?LIST_HEIGHT). to_list(SkipList) -> - to_list(SkipList, ?LIST_HEIGHT). + to_list(element(2, SkipList), ?LIST_HEIGHT). empty() -> - empty([], ?LIST_HEIGHT). + empty(false). + +empty(BloomProtect) -> + case BloomProtect of + true -> + {leveled_tinybloom:empty(?SKIP_WIDTH), + empty([], ?LIST_HEIGHT)}; + false -> + {list_only, empty([], ?LIST_HEIGHT)} + end. size(SkipList) -> - size(SkipList, ?LIST_HEIGHT). + size(element(2, SkipList), ?LIST_HEIGHT). %%%============================================================================ %%% SkipList Base Functions %%%============================================================================ -enter(Key, Value, SkipList, Width, 1) -> - Hash = erlang:phash2(Key), +enter(Key, Value, Hash, SkipList, Width, 1) -> {MarkerKey, SubList} = find_mark(Key, SkipList), case Hash rem Width of 0 -> @@ -101,11 +168,10 @@ enter(Key, Value, SkipList, Width, 1) -> end, lists:keyreplace(MarkerKey, 1, SkipList, {MarkerKey, UpdSubList}) end; -enter(Key, Value, SkipList, Width, Level) -> - Hash = erlang:phash2(Key), +enter(Key, Value, Hash, SkipList, Width, Level) -> HashMatch = width(Level, Width), {MarkerKey, SubSkipList} = find_mark(Key, SkipList), - UpdSubSkipList = enter(Key, Value, SubSkipList, Width, Level - 1), + UpdSubSkipList = enter(Key, Value, Hash, SubSkipList, Width, Level - 1), case Hash rem HashMatch of 0 -> % @@ -171,7 +237,7 @@ from_list(KVL, Width, Level) -> end.
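The no-lookup path deserves a concrete illustration (mirroring the `skiplist_nolookup_test` added further down; an ordinary object key is used here for simplicity, though in practice this path is for index terms): a key entered with `enter_nolookup/3` never reaches the bloom, so a bloom-guarded lookup reports `none` even though the key remains visible to iteration.

```erlang
SL0 = leveled_skiplist:empty(true),
K = {o, "Bucket", "Key1", null},
SL1 = leveled_skiplist:enter_nolookup(K, null, SL0),
none = leveled_skiplist:lookup(K, SL1),      % sheltered by the empty bloom
[{K, null}] = leveled_skiplist:to_list(SL1). % but still iterable
```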
-lookup(Key, SkipList, 1) -> +list_lookup(Key, SkipList, 1) -> SubList = get_sublist(Key, SkipList), case lists:keyfind(Key, 1, SubList) of false -> @@ -179,13 +245,13 @@ lookup(Key, SkipList, 1) -> {Key, V} -> {value, V} end; -lookup(Key, SkipList, Level) -> +list_lookup(Key, SkipList, Level) -> SubList = get_sublist(Key, SkipList), case SubList of null -> none; _ -> - lookup(Key, SubList, Level - 1) + list_lookup(Key, SubList, Level - 1) end. @@ -384,21 +450,32 @@ dotest_skiplist_small(N) -> end, lists:ukeysort(1, lists:reverse(KL))). -skiplist_test() -> - N = 8000, +skiplist_withbloom_test() -> + io:format(user, "~n~nBloom protected skiplist test:~n~n", []), + skiplist_tester(true). + +skiplist_nobloom_test() -> + io:format(user, "~n~nBloom free skiplist test:~n~n", []), + skiplist_tester(false). + +skiplist_tester(Bloom) -> + N = 4000, KL = generate_randomkeys(1, N, 1, N div 5), SWaGSL = os:timestamp(), - SkipList = from_list(lists:reverse(KL)), + SkipList = from_list(lists:reverse(KL), Bloom), io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++ "Top level key count of ~w~n", - [N, timer:now_diff(os:timestamp(), SWaGSL), length(SkipList)]), + [N, + timer:now_diff(os:timestamp(), SWaGSL), + length(element(2, SkipList))]), io:format(user, "Second tier key counts of ~w~n", - [lists:map(fun({_L, SL}) -> length(SL) end, SkipList)]), + [lists:map(fun({_L, SL}) -> length(SL) end, + element(2, SkipList))]), KLSorted = lists:ukeysort(1, lists:reverse(KL)), SWaGSL2 = os:timestamp(), - SkipList = from_sortedlist(KLSorted), + SkipList = from_sortedlist(KLSorted, Bloom), io:format(user, "Generating skip list with ~w sorted keys in ~w " ++ "microseconds~n", [N, timer:now_diff(os:timestamp(), SWaGSL2)]), @@ -408,23 +485,26 @@ skiplist_test() -> lists:foldl(fun({K, V}, SL) -> enter(K, V, SL) end, - empty(), + empty(Bloom), KL), io:format(user, "Dynamic load of skiplist with ~w keys took ~w " ++ "microseconds~n" ++ "Top level key count of ~w~n", - [N, timer:now_diff(os:timestamp(), SWaDSL), length(SkipList1)]), + [N, + timer:now_diff(os:timestamp(), SWaDSL), + length(element(2, SkipList1))]), io:format(user, "Second tier key counts of ~w~n", - [lists:map(fun({_L, SL}) -> length(SL) end, SkipList1)]), + [lists:map(fun({_L, SL}) -> length(SL) end, + element(2, SkipList1))]), io:format(user, "~nRunning timing tests for generated skiplist:~n", []), - skiplist_timingtest(KLSorted, SkipList, N), + skiplist_timingtest(KLSorted, SkipList, N, Bloom), io:format(user, "~nRunning timing tests for dynamic skiplist:~n", []), - skiplist_timingtest(KLSorted, SkipList1, N). + skiplist_timingtest(KLSorted, SkipList1, N, Bloom). 
+ - -skiplist_timingtest(KL, SkipList, N) -> +skiplist_timingtest(KL, SkipList, N, Bloom) -> io:format(user, "Timing tests on skiplist of size ~w~n", [leveled_skiplist:size(SkipList)]), CheckList1 = lists:sublist(KL, N div 4, 200), @@ -482,13 +562,13 @@ skiplist_timingtest(KL, SkipList, N) -> io:format(user, "Finding 10 ranges took ~w microseconds~n", [timer:now_diff(os:timestamp(), SWc)]), - AltKL1 = generate_randomkeys(1, 1000, 1, 200), + AltKL1 = generate_randomkeys(1, 2000, 1, 200), SWd = os:timestamp(), lists:foreach(fun({K, _V}) -> lookup(K, SkipList) end, AltKL1), - io:format(user, "Getting 1000 mainly missing keys took ~w microseconds~n", + io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n", [timer:now_diff(os:timestamp(), SWd)]), AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300), SWe = os:timestamp(), @@ -513,7 +593,24 @@ skiplist_timingtest(KL, SkipList, N) -> FlatList = to_list(SkipList), io:format(user, "Flattening skiplist took ~w microseconds~n", [timer:now_diff(os:timestamp(), SWg)]), - ?assertMatch(KL, FlatList). + ?assertMatch(KL, FlatList), + + case Bloom of + true -> + HashList = lists:map(fun(_X) -> + random:uniform(4294967295) end, + lists:seq(1, 2000)), + SWh = os:timestamp(), + lists:foreach(fun(X) -> + lookup(X, X, SkipList) end, + HashList), + io:format(user, + "Getting 2000 missing keys when hash was known " ++ + "took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWh)]); + false -> + ok + end. define_kv(X) -> {{o, "Bucket", "Key" ++ string:right(integer_to_list(X), 6), null}, @@ -535,5 +632,21 @@ skiplist_roundsize_test() -> ?assertMatch(L, R) end, lists:seq(0, 24)). +skiplist_nolookup_test() -> + N = 4000, + KL = generate_randomkeys(1, N, 1, N div 5), + SkipList = lists:foldl(fun({K, V}, Acc) -> + enter_nolookup(K, V, Acc) end, + empty(true), + KL), + KLSorted = lists:ukeysort(1, lists:reverse(KL)), + lists:foreach(fun({K, _V}) -> + ?assertMatch(none, lookup(K, SkipList)) end, + KL), + ?assertMatch(KLSorted, to_list(SkipList)). + +empty_skiplist_size_test() -> + ?assertMatch(0, leveled_skiplist:size(empty(false))), + ?assertMatch(0, leveled_skiplist:size(empty(true))). -endif. \ No newline at end of file diff --git a/src/leveled_tinybloom.erl b/src/leveled_tinybloom.erl new file mode 100644 index 0000000..f9212ad --- /dev/null +++ b/src/leveled_tinybloom.erl @@ -0,0 +1,153 @@ +%% -------- TINY BLOOM --------- +%% +%% For sheltering relatively expensive lookups with a probabilistic check +%% +%% Uses multiple 512-byte (4096-bit) bit arrays. Can sensibly hold up to +%% 1000 keys per array. Even at 1000 keys each array should still offer +%% only around a 20% false positive rate +%% +%% Restricted to no more than 256 arrays - so can't handle more than 250K keys +%% in total +%% +%% Implemented this way to make it easy to control the false positive rate +%% (just by setting the width). Also only requires binary manipulations of +%% a single hash + -module(leveled_tinybloom). + +-include("include/leveled.hrl"). + +-export([ + enter/2, + check/2, + empty/1 + ]). + +-include_lib("eunit/include/eunit.hrl"). + +%%%============================================================================ +%%% Bloom API +%%%============================================================================ + + +empty(Width) when Width =< 256 -> + FoldFun = fun(X, Acc) -> dict:store(X, <<0:4096>>, Acc) end, + lists:foldl(FoldFun, dict:new(), lists:seq(0, Width - 1)).
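A usage sketch of the three exported functions (width and keys illustrative): `empty/1` builds a dict of `Width` zeroed 4096-bit arrays, `enter/2` sets up to two bits in the array selected by the key's hash, and `check/2` can return a false positive but never a false negative.

```erlang
Bloom0 = leveled_tinybloom:empty(4),   % 4 slots of 4096 bits each
Bloom1 = leveled_tinybloom:enter({o, "Bucket", "Key1", null}, Bloom0),
%% No false negatives: an entered key always checks true.
true = leveled_tinybloom:check({o, "Bucket", "Key1", null}, Bloom1),
%% A miss will normally check false, though true is always possible by
%% design (a false positive), so no strict match on the result here.
_MaybeFalse = leveled_tinybloom:check({o, "Bucket", "KeyX", null}, Bloom1).
```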
+ +enter({hash, no_lookup}, Bloom) -> + Bloom; +enter({hash, Hash}, Bloom) -> + {H0, Bit1, Bit2} = split_hash(Hash), + Slot = H0 rem dict:size(Bloom), + BitArray0 = dict:fetch(Slot, Bloom), + BitArray1 = lists:foldl(fun add_to_array/2, + BitArray0, + lists:usort([Bit1, Bit2])), + dict:store(Slot, BitArray1, Bloom); +enter(Key, Bloom) -> + Hash = leveled_codec:magic_hash(Key), + enter({hash, Hash}, Bloom). + +check({hash, Hash}, Bloom) -> + {H0, Bit1, Bit2} = split_hash(Hash), + Slot = H0 rem dict:size(Bloom), + BitArray = dict:fetch(Slot, Bloom), + case getbit(Bit1, BitArray) of + <<0:1>> -> + false; + <<1:1>> -> + case getbit(Bit2, BitArray) of + <<0:1>> -> + false; + <<1:1>> -> + true + end + end; +check(Key, Bloom) -> + Hash = leveled_codec:magic_hash(Key), + check({hash, Hash}, Bloom). + +%%%============================================================================ +%%% Internal Functions +%%%============================================================================ + +split_hash(Hash) -> + H0 = Hash band 255, + H1 = (Hash bsr 8) band 4095, + H2 = Hash bsr 20, + {H0, H1, H2}. + +add_to_array(Bit, BitArray) -> + RestLen = 4096 - Bit - 1, + <<Head:Bit/bitstring, + _B:1/bitstring, + Rest:RestLen/bitstring>> = BitArray, + <<Head/bitstring, 1:1, Rest/bitstring>>. + +getbit(Bit, BitArray) -> + RestLen = 4096 - Bit - 1, + <<_Head:Bit/bitstring, + B:1/bitstring, + _Rest:RestLen/bitstring>> = BitArray, + B. + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +simple_test() -> + N = 4000, + W = 4, + KLin = lists:map(fun(X) -> "Key_" ++ + integer_to_list(X) ++ + integer_to_list(random:uniform(100)) ++ + binary_to_list(crypto:rand_bytes(2)) + end, + lists:seq(1, N)), + KLout = lists:map(fun(X) -> + "NotKey_" ++ + integer_to_list(X) ++ + integer_to_list(random:uniform(100)) ++ + binary_to_list(crypto:rand_bytes(2)) + end, + lists:seq(1, N)), + SW0_PH = os:timestamp(), + lists:foreach(fun(X) -> erlang:phash2(X) end, KLin), + io:format(user, + "~nNative hash function hashes ~w keys in ~w microseconds~n", + [N, timer:now_diff(os:timestamp(), SW0_PH)]), + SW0_MH = os:timestamp(), + lists:foreach(fun(X) -> leveled_codec:magic_hash(X) end, KLin), + io:format(user, + "~nMagic hash function hashes ~w keys in ~w microseconds~n", + [N, timer:now_diff(os:timestamp(), SW0_MH)]), + + SW1 = os:timestamp(), + Bloom = lists:foldr(fun enter/2, empty(W), KLin), + io:format(user, + "~nAdding ~w keys to bloom took ~w microseconds~n", + [N, timer:now_diff(os:timestamp(), SW1)]), + + SW2 = os:timestamp(), + lists:foreach(fun(X) -> ?assertMatch(true, check(X, Bloom)) end, KLin), + io:format(user, + "~nChecking ~w keys in bloom took ~w microseconds~n", + [N, timer:now_diff(os:timestamp(), SW2)]), + + SW3 = os:timestamp(), + FP = lists:foldr(fun(X, Acc) -> case check(X, Bloom) of + true -> Acc + 1; + false -> Acc + end end, + 0, + KLout), + io:format(user, + "~nChecking ~w keys out of bloom took ~w microseconds " ++ + "with ~w false positive rate~n", + [N, timer:now_diff(os:timestamp(), SW3), FP / N]), + ?assertMatch(true, FP < (N div 4)). + + -endif. \ No newline at end of file
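Two quick checks on the internals above; the arithmetic is mine, not from the source. `split_hash/1` carves a single hash into an array selector plus two bit positions, and with two bits per key in a 4096-bit array the expected false positive rate after n keys is roughly (1 - e^(-2n/4096))^2, about 15% at 1,000 keys and passing 20% near 1,250, consistent with the module header's claim.

```erlang
%% split_hash/1 is module-internal, so treat this as an in-module sketch.
%% For Hash = 16#ABCDE123:
{35, 3553, 2748} = split_hash(16#ABCDE123),
%%   35   = 16#ABCDE123 band 255            (selects the bit array)
%%   3553 = (16#ABCDE123 bsr 8) band 4095   (first bit to set/check)
%%   2748 = 16#ABCDE123 bsr 20              (second bit to set/check)

%% Estimated false positive rate for one array holding N keys:
Fpr = fun(N) -> math:pow(1 - math:exp(-2 * N / 4096), 2) end.
%% Fpr(1000) ~= 0.15, Fpr(1250) ~= 0.21.
```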