diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 0960c68..53359c2 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -139,6 +139,7 @@ get_opt/3, load_snapshot/2, empty_ledgercache/0, + loadqueue_ledgercache/1, push_ledgercache/2]). -include_lib("eunit/include/eunit.hrl"). @@ -153,7 +154,8 @@ -define(LONG_RUNNING, 80000). -record(ledger_cache, {mem :: ets:tab(), - loader = leveled_skiplist:empty(false) :: tuple(), + loader = leveled_tree:empty() :: tuple(), + load_queue = [] :: list(), index = leveled_pmem:new_index(), % array min_sqn = infinity :: integer()|infinity, max_sqn = 0 :: integer()}). @@ -474,6 +476,11 @@ push_ledgercache(Penciller, Cache) -> Cache#ledger_cache.max_sqn}, leveled_penciller:pcl_pushmem(Penciller, CacheToLoad). +loadqueue_ledgercache(Cache) -> + SL = lists:ukeysort(1, Cache#ledger_cache.load_queue), + T = leveled_tree:from_orderedlist(SL), + Cache#ledger_cache{load_queue = [], loader = T}. + %%%============================================================================ %%% Internal functions %%%============================================================================ @@ -719,11 +726,11 @@ snapshot_store(State, SnapType) -> readycache_forsnapshot(LedgerCache) -> % Need to convert the Ledger Cache away from using the ETS table - SkipList = leveled_skiplist:from_orderedset(LedgerCache#ledger_cache.mem), + Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem), Idx = LedgerCache#ledger_cache.index, MinSQN = LedgerCache#ledger_cache.min_sqn, MaxSQN = LedgerCache#ledger_cache.max_sqn, - #ledger_cache{loader=SkipList, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}. + #ledger_cache{loader=Tree, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}. set_options(Opts) -> MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000), @@ -961,14 +968,10 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) -> max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> - FoldChangesFun = - fun({K, V}, SL0) -> - leveled_skiplist:enter_nolookup(K, V, SL0) - end, - UpdSL = lists:foldl(FoldChangesFun, Cache#ledger_cache.loader, KeyChanges), + UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue, UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H), Cache#ledger_cache{index = UpdIndex, - loader = UpdSL, + load_queue = UpdQ, min_sqn=min(SQN, Cache#ledger_cache.min_sqn), max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. @@ -979,7 +982,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> - CacheToLoad = {leveled_skiplist:from_orderedset(Tab), + CacheToLoad = {leveled_tree:from_orderedset(Tab), Cache#ledger_cache.index, Cache#ledger_cache.min_sqn, Cache#ledger_cache.max_sqn}, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f56ea20..6789302 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -669,10 +669,14 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, push_to_penciller(Penciller, LedgerCache) -> % The push to penciller must start as a tree to correctly de-duplicate % the list by order before becoming a de-duplicated list for loading + LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache), + push_to_penciller_loop(Penciller, LC0). + +push_to_penciller_loop(Penciller, LedgerCache) -> case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of returned -> timer:sleep(?LOADING_PAUSE), - push_to_penciller(Penciller, LedgerCache); + push_to_penciller_loop(Penciller, LedgerCache); ok -> ok end. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 9a6daf3..d18cb09 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -9,7 +9,7 @@ %% the Penciller's Clerk %% - The Penciller can be cloned and maintains a register of clones who have %% requested snapshots of the Ledger -%% - The accepts new dumps (in the form of a leveled_skiplist accomponied by +%% - The accepts new dumps (in the form of a leveled_tree accomponied by %% an array of hash-listing binaries) from the Bookie, and responds either 'ok' %% to the bookie if the information is accepted nad the Bookie can refresh its %% memory, or 'returned' if the bookie must continue without refreshing as the @@ -224,7 +224,7 @@ levelzero_pending = false :: boolean(), levelzero_constructor :: pid(), - levelzero_cache = [] :: list(), % a list of skiplists + levelzero_cache = [] :: list(), % a list of trees levelzero_size = 0 :: integer(), levelzero_maxcachesize :: integer(), levelzero_cointoss = false :: boolean(), @@ -345,9 +345,9 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}}, State=#state{is_snapshot=Snap}) when Snap == false -> % The push_mem process is as follows: % - % 1 - Receive a cache. The cache has four parts: a skiplist of keys and + % 1 - Receive a cache. The cache has four parts: a tree of keys and % values, an array of 256 binaries listing the hashes present in the - % skiplist, a min SQN and a max SQN + % tree, a min SQN and a max SQN % % 2 - Check to see if there is a levelzero file pending. If so, the % update must be returned. If not the update can be accepted @@ -404,7 +404,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys}, leveled_pmem:merge_trees(StartKey, EndKey, State#state.levelzero_cache, - leveled_skiplist:empty()); + leveled_tree:empty()); List -> List end, @@ -1072,10 +1072,10 @@ clean_subdir(DirPath) -> maybe_pause_push(PCL, KL) -> - T0 = leveled_skiplist:empty(true), + T0 = [], I0 = leveled_pmem:new_index(), T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) -> - UpdSL = leveled_skiplist:enter(K, V, AccSL), + UpdSL = [{K, V}|AccSL], SQN = leveled_codec:strip_to_seqonly({K, V}), H = leveled_codec:magic_hash(K), UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H), @@ -1083,7 +1083,10 @@ maybe_pause_push(PCL, KL) -> end, {T0, I0, infinity, 0}, KL), - case pcl_pushmem(PCL, T1) of + SL = element(1, T1), + Tree = leveled_tree:from_orderedlist(lists:ukeysort(1, SL)), + T2 = setelement(1, T1, Tree), + case pcl_pushmem(PCL, T2) of returned -> timer:sleep(50), maybe_pause_push(PCL, KL); @@ -1315,63 +1318,63 @@ sqnoverlap_otherway_findnextkey_test() -> foldwithimm_simple_test() -> QueryArray = [ - {2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, - {{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]}, - {3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]}, - {5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]} + {2, [{{o, "Bucket1", "Key1", null}, + {5, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key5", null}, + {1, {active, infinity}, 0, null}}]}, + {3, [{{o, "Bucket1", "Key3", null}, + {3, {active, infinity}, 0, null}}]}, + {5, [{{o, "Bucket1", "Key5", null}, + {2, {active, infinity}, 0, null}}]} ], - IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"}, - {7, {active, infinity}, 0, null}, - leveled_skiplist:empty()), - IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, 0, null}, - IMM0), - IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"}, - {9, {active, infinity}, 0, null}, - IMM1), - IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}), + KL1A = [{{o, "Bucket1", "Key6", null}, {7, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key1", null}, {8, {active, infinity}, 0, null}}, + {{o, "Bucket1", "Key8", null}, {9, {active, infinity}, 0, null}}], + IMM2 = leveled_tree:from_orderedlist(lists:ukeysort(1, KL1A)), + IMMiter = leveled_tree:match_range({o, "Bucket1", "Key1", null}, + {o, null, null, null}, + IMM2), AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}), Acc ++ [{K, SQN}] end, Acc = keyfolder(IMMiter, QueryArray, - {o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, + {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, {AccFun, []}), - ?assertMatch([{{o, "Bucket1", "Key1"}, 8}, - {{o, "Bucket1", "Key3"}, 3}, - {{o, "Bucket1", "Key5"}, 2}, - {{o, "Bucket1", "Key6"}, 7}], Acc), + ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, + {{o, "Bucket1", "Key3", null}, 3}, + {{o, "Bucket1", "Key5", null}, 2}, + {{o, "Bucket1", "Key6", null}, 7}], Acc), - IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"}, - {8, {active, infinity}, 0, null}, - leveled_skiplist:empty()), - IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}), + IMMiterA = [{{o, "Bucket1", "Key1", null}, + {8, {active, infinity}, 0, null}}], AccA = keyfolder(IMMiterA, - QueryArray, - {o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, - {AccFun, []}), - ?assertMatch([{{o, "Bucket1", "Key1"}, 8}, - {{o, "Bucket1", "Key3"}, 3}, - {{o, "Bucket1", "Key5"}, 2}], AccA), + QueryArray, + {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, + {AccFun, []}), + ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, + {{o, "Bucket1", "Key3", null}, 3}, + {{o, "Bucket1", "Key5", null}, 2}], AccA), - IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"}, - {10, {active, infinity}, 0, null}, - IMM2), - IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}), + KL1B = [{{o, "Bucket1", "Key4", null}, {10, {active, infinity}, 0, null}}|KL1A], + IMM3 = leveled_tree:from_orderedlist(lists:ukeysort(1, KL1B)), + IMMiterB = leveled_tree:match_range({o, "Bucket1", "Key1", null}, + {o, null, null, null}, + IMM3), AccB = keyfolder(IMMiterB, QueryArray, - {o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, + {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null}, {AccFun, []}), - ?assertMatch([{{o, "Bucket1", "Key1"}, 8}, - {{o, "Bucket1", "Key3"}, 3}, - {{o, "Bucket1", "Key4"}, 10}, - {{o, "Bucket1", "Key5"}, 2}, - {{o, "Bucket1", "Key6"}, 7}], AccB). + ?assertMatch([{{o, "Bucket1", "Key1", null}, 8}, + {{o, "Bucket1", "Key3", null}, 3}, + {{o, "Bucket1", "Key4", null}, 10}, + {{o, "Bucket1", "Key5", null}, 2}, + {{o, "Bucket1", "Key6", null}, 7}], AccB). create_file_test() -> Filename = "../test/new_file.sst", ok = file:write_file(Filename, term_to_binary("hello")), KVL = lists:usort(generate_randomkeys(10000)), - Tree = leveled_skiplist:from_list(KVL), + Tree = leveled_tree:from_orderedlist(KVL), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, {ok, SP, diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 9480abe..fff113d 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -57,7 +57,7 @@ prepare_for_index(IndexArray, Hash) -> add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> - LM1Size = leveled_skiplist:size(LevelMinus1), + LM1Size = leveled_tree:tsize(LevelMinus1), case LM1Size of 0 -> {LedgerSQN, L0Size, TreeList}; @@ -99,7 +99,7 @@ to_list(Slots, FetchFun) -> SlotList = lists:reverse(lists:seq(1, Slots)), FullList = lists:foldl(fun(Slot, Acc) -> Tree = FetchFun(Slot), - L = leveled_skiplist:to_list(Tree), + L = leveled_tree:to_list(Tree), lists:ukeymerge(1, Acc, L) end, [], @@ -119,14 +119,14 @@ check_levelzero(Key, Hash, PosList, TreeList) -> check_slotlist(Key, Hash, PosList, TreeList). -merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) -> - lists:foldl(fun(SkipList, Acc) -> - R = leveled_skiplist:to_range(SkipList, - StartKey, - EndKey), +merge_trees(StartKey, EndKey, TreeList, LevelMinus1) -> + lists:foldl(fun(Tree, Acc) -> + R = leveled_tree:match_range(StartKey, + EndKey, + Tree), lists:ukeymerge(1, Acc, R) end, [], - [LevelMinus1|lists:reverse(SkipListList)]). + [LevelMinus1|lists:reverse(TreeList)]). %%%============================================================================ %%% Internal Functions @@ -148,7 +148,7 @@ split_hash(Hash) -> H0 = (Hash bsr 8) band 8388607, {Slot, H0}. -check_slotlist(Key, Hash, CheckList, TreeList) -> +check_slotlist(Key, _Hash, CheckList, TreeList) -> SlotCheckFun = fun(SlotToCheck, {Found, KV}) -> case Found of @@ -156,7 +156,7 @@ check_slotlist(Key, Hash, CheckList, TreeList) -> {Found, KV}; false -> CheckTree = lists:nth(SlotToCheck, TreeList), - case leveled_skiplist:lookup(Key, Hash, CheckTree) of + case leveled_tree:match(Key, CheckTree) of none -> {Found, KV}; {value, Value} -> @@ -188,7 +188,7 @@ generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) -> [], BucketRangeLow, BucketRangeHigh), - leveled_skiplist:from_list(KVL). + leveled_tree:from_orderedlist(lists:ukeysort(1, KVL)). generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> Acc; @@ -223,7 +223,7 @@ compare_method_test() -> ?assertMatch(32000, SQN), ?assertMatch(true, Size =< 32000), - TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)), + TestList = leveled_tree:to_list(generate_randomkeys(1, 2000, 1, 800)), FindKeyFun = fun(Key) -> @@ -232,7 +232,7 @@ compare_method_test() -> true -> {true, KV}; false -> - L0 = leveled_skiplist:lookup(Key, Tree), + L0 = leveled_tree:match(Key, Tree), case L0 of none -> {false, not_found}; @@ -270,19 +270,20 @@ compare_method_test() -> P = leveled_codec:endkey_passed(EndKey, K), case {K, P} of {K, false} when K >= StartKey -> - leveled_skiplist:enter(K, V, Acc); + [{K, V}|Acc]; _ -> Acc end end, - leveled_skiplist:empty(), + [], DumpList), - Sz0 = leveled_skiplist:size(Q0), + Tree = leveled_tree:from_orderedlist(lists:ukeysort(1, Q0)), + Sz0 = leveled_tree:tsize(Tree), io:format("Crude method took ~w microseconds resulting in tree of " ++ "size ~w~n", [timer:now_diff(os:timestamp(), SWa), Sz0]), SWb = os:timestamp(), - Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_skiplist:empty()), + Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_tree:empty()), Sz1 = length(Q1), io:format("Merge method took ~w microseconds resulting in tree of " ++ "size ~w~n", @@ -299,7 +300,7 @@ with_index_test() -> fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) -> LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500), LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1), - LM1SL = leveled_skiplist:from_list(LM1), + LM1SL = leveled_tree:from_orderedlist(lists:ukeysort(1, LM1)), UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1), R = add_to_cache(L0Size, {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},