Switch the LM1 cache to be a tree

Use a tree of lists not a skiplist
This commit is contained in:
martinsumner 2017-01-20 16:36:20 +00:00
parent 1745ba6863
commit 3d99036093
4 changed files with 88 additions and 77 deletions

View file

@ -139,6 +139,7 @@
get_opt/3, get_opt/3,
load_snapshot/2, load_snapshot/2,
empty_ledgercache/0, empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2]). push_ledgercache/2]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -153,7 +154,8 @@
-define(LONG_RUNNING, 80000). -define(LONG_RUNNING, 80000).
-record(ledger_cache, {mem :: ets:tab(), -record(ledger_cache, {mem :: ets:tab(),
loader = leveled_skiplist:empty(false) :: tuple(), loader = leveled_tree:empty() :: tuple(),
load_queue = [] :: list(),
index = leveled_pmem:new_index(), % array index = leveled_pmem:new_index(), % array
min_sqn = infinity :: integer()|infinity, min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}). max_sqn = 0 :: integer()}).
@ -474,6 +476,11 @@ push_ledgercache(Penciller, Cache) ->
Cache#ledger_cache.max_sqn}, Cache#ledger_cache.max_sqn},
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad). leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
loadqueue_ledgercache(Cache) ->
SL = lists:ukeysort(1, Cache#ledger_cache.load_queue),
T = leveled_tree:from_orderedlist(SL),
Cache#ledger_cache{load_queue = [], loader = T}.
%%%============================================================================ %%%============================================================================
%%% Internal functions %%% Internal functions
%%%============================================================================ %%%============================================================================
@ -719,11 +726,11 @@ snapshot_store(State, SnapType) ->
readycache_forsnapshot(LedgerCache) -> readycache_forsnapshot(LedgerCache) ->
% Need to convert the Ledger Cache away from using the ETS table % Need to convert the Ledger Cache away from using the ETS table
SkipList = leveled_skiplist:from_orderedset(LedgerCache#ledger_cache.mem), Tree = leveled_tree:from_orderedset(LedgerCache#ledger_cache.mem),
Idx = LedgerCache#ledger_cache.index, Idx = LedgerCache#ledger_cache.index,
MinSQN = LedgerCache#ledger_cache.min_sqn, MinSQN = LedgerCache#ledger_cache.min_sqn,
MaxSQN = LedgerCache#ledger_cache.max_sqn, MaxSQN = LedgerCache#ledger_cache.max_sqn,
#ledger_cache{loader=SkipList, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}. #ledger_cache{loader=Tree, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
set_options(Opts) -> set_options(Opts) ->
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000), MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
@ -961,14 +968,10 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) -> addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
FoldChangesFun = UpdQ = KeyChanges ++ Cache#ledger_cache.load_queue,
fun({K, V}, SL0) ->
leveled_skiplist:enter_nolookup(K, V, SL0)
end,
UpdSL = lists:foldl(FoldChangesFun, Cache#ledger_cache.loader, KeyChanges),
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H), UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
Cache#ledger_cache{index = UpdIndex, Cache#ledger_cache{index = UpdIndex,
loader = UpdSL, load_queue = UpdQ,
min_sqn=min(SQN, Cache#ledger_cache.min_sqn), min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}. max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
@ -979,7 +982,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
if if
TimeToPush -> TimeToPush ->
CacheToLoad = {leveled_skiplist:from_orderedset(Tab), CacheToLoad = {leveled_tree:from_orderedset(Tab),
Cache#ledger_cache.index, Cache#ledger_cache.index,
Cache#ledger_cache.min_sqn, Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn}, Cache#ledger_cache.max_sqn},

View file

@ -669,10 +669,14 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
push_to_penciller(Penciller, LedgerCache) -> push_to_penciller(Penciller, LedgerCache) ->
% The push to penciller must start as a tree to correctly de-duplicate % The push to penciller must start as a tree to correctly de-duplicate
% the list by order before becoming a de-duplicated list for loading % the list by order before becoming a de-duplicated list for loading
LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache),
push_to_penciller_loop(Penciller, LC0).
push_to_penciller_loop(Penciller, LedgerCache) ->
case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
returned -> returned ->
timer:sleep(?LOADING_PAUSE), timer:sleep(?LOADING_PAUSE),
push_to_penciller(Penciller, LedgerCache); push_to_penciller_loop(Penciller, LedgerCache);
ok -> ok ->
ok ok
end. end.

View file

@ -9,7 +9,7 @@
%% the Penciller's Clerk %% the Penciller's Clerk
%% - The Penciller can be cloned and maintains a register of clones who have %% - The Penciller can be cloned and maintains a register of clones who have
%% requested snapshots of the Ledger %% requested snapshots of the Ledger
%% - The accepts new dumps (in the form of a leveled_skiplist accomponied by %% - The accepts new dumps (in the form of a leveled_tree accomponied by
%% an array of hash-listing binaries) from the Bookie, and responds either 'ok' %% an array of hash-listing binaries) from the Bookie, and responds either 'ok'
%% to the bookie if the information is accepted nad the Bookie can refresh its %% to the bookie if the information is accepted nad the Bookie can refresh its
%% memory, or 'returned' if the bookie must continue without refreshing as the %% memory, or 'returned' if the bookie must continue without refreshing as the
@ -224,7 +224,7 @@
levelzero_pending = false :: boolean(), levelzero_pending = false :: boolean(),
levelzero_constructor :: pid(), levelzero_constructor :: pid(),
levelzero_cache = [] :: list(), % a list of skiplists levelzero_cache = [] :: list(), % a list of trees
levelzero_size = 0 :: integer(), levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(), levelzero_maxcachesize :: integer(),
levelzero_cointoss = false :: boolean(), levelzero_cointoss = false :: boolean(),
@ -345,9 +345,9 @@ handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
State=#state{is_snapshot=Snap}) when Snap == false -> State=#state{is_snapshot=Snap}) when Snap == false ->
% The push_mem process is as follows: % The push_mem process is as follows:
% %
% 1 - Receive a cache. The cache has four parts: a skiplist of keys and % 1 - Receive a cache. The cache has four parts: a tree of keys and
% values, an array of 256 binaries listing the hashes present in the % values, an array of 256 binaries listing the hashes present in the
% skiplist, a min SQN and a max SQN % tree, a min SQN and a max SQN
% %
% 2 - Check to see if there is a levelzero file pending. If so, the % 2 - Check to see if there is a levelzero file pending. If so, the
% update must be returned. If not the update can be accepted % update must be returned. If not the update can be accepted
@ -404,7 +404,7 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
leveled_pmem:merge_trees(StartKey, leveled_pmem:merge_trees(StartKey,
EndKey, EndKey,
State#state.levelzero_cache, State#state.levelzero_cache,
leveled_skiplist:empty()); leveled_tree:empty());
List -> List ->
List List
end, end,
@ -1072,10 +1072,10 @@ clean_subdir(DirPath) ->
maybe_pause_push(PCL, KL) -> maybe_pause_push(PCL, KL) ->
T0 = leveled_skiplist:empty(true), T0 = [],
I0 = leveled_pmem:new_index(), I0 = leveled_pmem:new_index(),
T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) -> T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) ->
UpdSL = leveled_skiplist:enter(K, V, AccSL), UpdSL = [{K, V}|AccSL],
SQN = leveled_codec:strip_to_seqonly({K, V}), SQN = leveled_codec:strip_to_seqonly({K, V}),
H = leveled_codec:magic_hash(K), H = leveled_codec:magic_hash(K),
UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H), UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H),
@ -1083,7 +1083,10 @@ maybe_pause_push(PCL, KL) ->
end, end,
{T0, I0, infinity, 0}, {T0, I0, infinity, 0},
KL), KL),
case pcl_pushmem(PCL, T1) of SL = element(1, T1),
Tree = leveled_tree:from_orderedlist(lists:ukeysort(1, SL)),
T2 = setelement(1, T1, Tree),
case pcl_pushmem(PCL, T2) of
returned -> returned ->
timer:sleep(50), timer:sleep(50),
maybe_pause_push(PCL, KL); maybe_pause_push(PCL, KL);
@ -1315,63 +1318,63 @@ sqnoverlap_otherway_findnextkey_test() ->
foldwithimm_simple_test() -> foldwithimm_simple_test() ->
QueryArray = [ QueryArray = [
{2, [{{o, "Bucket1", "Key1"}, {5, {active, infinity}, 0, null}}, {2, [{{o, "Bucket1", "Key1", null},
{{o, "Bucket1", "Key5"}, {1, {active, infinity}, 0, null}}]}, {5, {active, infinity}, 0, null}},
{3, [{{o, "Bucket1", "Key3"}, {3, {active, infinity}, 0, null}}]}, {{o, "Bucket1", "Key5", null},
{5, [{{o, "Bucket1", "Key5"}, {2, {active, infinity}, 0, null}}]} {1, {active, infinity}, 0, null}}]},
{3, [{{o, "Bucket1", "Key3", null},
{3, {active, infinity}, 0, null}}]},
{5, [{{o, "Bucket1", "Key5", null},
{2, {active, infinity}, 0, null}}]}
], ],
IMM0 = leveled_skiplist:enter({o, "Bucket1", "Key6"}, KL1A = [{{o, "Bucket1", "Key6", null}, {7, {active, infinity}, 0, null}},
{7, {active, infinity}, 0, null}, {{o, "Bucket1", "Key1", null}, {8, {active, infinity}, 0, null}},
leveled_skiplist:empty()), {{o, "Bucket1", "Key8", null}, {9, {active, infinity}, 0, null}}],
IMM1 = leveled_skiplist:enter({o, "Bucket1", "Key1"}, IMM2 = leveled_tree:from_orderedlist(lists:ukeysort(1, KL1A)),
{8, {active, infinity}, 0, null}, IMMiter = leveled_tree:match_range({o, "Bucket1", "Key1", null},
IMM0), {o, null, null, null},
IMM2 = leveled_skiplist:enter({o, "Bucket1", "Key8"}, IMM2),
{9, {active, infinity}, 0, null},
IMM1),
IMMiter = leveled_skiplist:to_range(IMM2, {o, "Bucket1", "Key1"}),
AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}), AccFun = fun(K, V, Acc) -> SQN = leveled_codec:strip_to_seqonly({K, V}),
Acc ++ [{K, SQN}] end, Acc ++ [{K, SQN}] end,
Acc = keyfolder(IMMiter, Acc = keyfolder(IMMiter,
QueryArray, QueryArray,
{o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, []}),
?assertMatch([{{o, "Bucket1", "Key1"}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3"}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key5"}, 2}, {{o, "Bucket1", "Key5", null}, 2},
{{o, "Bucket1", "Key6"}, 7}], Acc), {{o, "Bucket1", "Key6", null}, 7}], Acc),
IMM1A = leveled_skiplist:enter({o, "Bucket1", "Key1"}, IMMiterA = [{{o, "Bucket1", "Key1", null},
{8, {active, infinity}, 0, null}, {8, {active, infinity}, 0, null}}],
leveled_skiplist:empty()),
IMMiterA = leveled_skiplist:to_range(IMM1A, {o, "Bucket1", "Key1"}),
AccA = keyfolder(IMMiterA, AccA = keyfolder(IMMiterA,
QueryArray, QueryArray,
{o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, []}),
?assertMatch([{{o, "Bucket1", "Key1"}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3"}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key5"}, 2}], AccA), {{o, "Bucket1", "Key5", null}, 2}], AccA),
IMM3 = leveled_skiplist:enter({o, "Bucket1", "Key4"}, KL1B = [{{o, "Bucket1", "Key4", null}, {10, {active, infinity}, 0, null}}|KL1A],
{10, {active, infinity}, 0, null}, IMM3 = leveled_tree:from_orderedlist(lists:ukeysort(1, KL1B)),
IMM2), IMMiterB = leveled_tree:match_range({o, "Bucket1", "Key1", null},
IMMiterB = leveled_skiplist:to_range(IMM3, {o, "Bucket1", "Key1"}), {o, null, null, null},
IMM3),
AccB = keyfolder(IMMiterB, AccB = keyfolder(IMMiterB,
QueryArray, QueryArray,
{o, "Bucket1", "Key1"}, {o, "Bucket1", "Key6"}, {o, "Bucket1", "Key1", null}, {o, "Bucket1", "Key6", null},
{AccFun, []}), {AccFun, []}),
?assertMatch([{{o, "Bucket1", "Key1"}, 8}, ?assertMatch([{{o, "Bucket1", "Key1", null}, 8},
{{o, "Bucket1", "Key3"}, 3}, {{o, "Bucket1", "Key3", null}, 3},
{{o, "Bucket1", "Key4"}, 10}, {{o, "Bucket1", "Key4", null}, 10},
{{o, "Bucket1", "Key5"}, 2}, {{o, "Bucket1", "Key5", null}, 2},
{{o, "Bucket1", "Key6"}, 7}], AccB). {{o, "Bucket1", "Key6", null}, 7}], AccB).
create_file_test() -> create_file_test() ->
Filename = "../test/new_file.sst", Filename = "../test/new_file.sst",
ok = file:write_file(Filename, term_to_binary("hello")), ok = file:write_file(Filename, term_to_binary("hello")),
KVL = lists:usort(generate_randomkeys(10000)), KVL = lists:usort(generate_randomkeys(10000)),
Tree = leveled_skiplist:from_list(KVL), Tree = leveled_tree:from_orderedlist(KVL),
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
{ok, {ok,
SP, SP,

View file

@ -57,7 +57,7 @@ prepare_for_index(IndexArray, Hash) ->
add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
LM1Size = leveled_skiplist:size(LevelMinus1), LM1Size = leveled_tree:tsize(LevelMinus1),
case LM1Size of case LM1Size of
0 -> 0 ->
{LedgerSQN, L0Size, TreeList}; {LedgerSQN, L0Size, TreeList};
@ -99,7 +99,7 @@ to_list(Slots, FetchFun) ->
SlotList = lists:reverse(lists:seq(1, Slots)), SlotList = lists:reverse(lists:seq(1, Slots)),
FullList = lists:foldl(fun(Slot, Acc) -> FullList = lists:foldl(fun(Slot, Acc) ->
Tree = FetchFun(Slot), Tree = FetchFun(Slot),
L = leveled_skiplist:to_list(Tree), L = leveled_tree:to_list(Tree),
lists:ukeymerge(1, Acc, L) lists:ukeymerge(1, Acc, L)
end, end,
[], [],
@ -119,14 +119,14 @@ check_levelzero(Key, Hash, PosList, TreeList) ->
check_slotlist(Key, Hash, PosList, TreeList). check_slotlist(Key, Hash, PosList, TreeList).
merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) -> merge_trees(StartKey, EndKey, TreeList, LevelMinus1) ->
lists:foldl(fun(SkipList, Acc) -> lists:foldl(fun(Tree, Acc) ->
R = leveled_skiplist:to_range(SkipList, R = leveled_tree:match_range(StartKey,
StartKey, EndKey,
EndKey), Tree),
lists:ukeymerge(1, Acc, R) end, lists:ukeymerge(1, Acc, R) end,
[], [],
[LevelMinus1|lists:reverse(SkipListList)]). [LevelMinus1|lists:reverse(TreeList)]).
%%%============================================================================ %%%============================================================================
%%% Internal Functions %%% Internal Functions
@ -148,7 +148,7 @@ split_hash(Hash) ->
H0 = (Hash bsr 8) band 8388607, H0 = (Hash bsr 8) band 8388607,
{Slot, H0}. {Slot, H0}.
check_slotlist(Key, Hash, CheckList, TreeList) -> check_slotlist(Key, _Hash, CheckList, TreeList) ->
SlotCheckFun = SlotCheckFun =
fun(SlotToCheck, {Found, KV}) -> fun(SlotToCheck, {Found, KV}) ->
case Found of case Found of
@ -156,7 +156,7 @@ check_slotlist(Key, Hash, CheckList, TreeList) ->
{Found, KV}; {Found, KV};
false -> false ->
CheckTree = lists:nth(SlotToCheck, TreeList), CheckTree = lists:nth(SlotToCheck, TreeList),
case leveled_skiplist:lookup(Key, Hash, CheckTree) of case leveled_tree:match(Key, CheckTree) of
none -> none ->
{Found, KV}; {Found, KV};
{value, Value} -> {value, Value} ->
@ -188,7 +188,7 @@ generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
[], [],
BucketRangeLow, BucketRangeLow,
BucketRangeHigh), BucketRangeHigh),
leveled_skiplist:from_list(KVL). leveled_tree:from_orderedlist(lists:ukeysort(1, KVL)).
generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) -> generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
Acc; Acc;
@ -223,7 +223,7 @@ compare_method_test() ->
?assertMatch(32000, SQN), ?assertMatch(32000, SQN),
?assertMatch(true, Size =< 32000), ?assertMatch(true, Size =< 32000),
TestList = leveled_skiplist:to_list(generate_randomkeys(1, 2000, 1, 800)), TestList = leveled_tree:to_list(generate_randomkeys(1, 2000, 1, 800)),
FindKeyFun = FindKeyFun =
fun(Key) -> fun(Key) ->
@ -232,7 +232,7 @@ compare_method_test() ->
true -> true ->
{true, KV}; {true, KV};
false -> false ->
L0 = leveled_skiplist:lookup(Key, Tree), L0 = leveled_tree:match(Key, Tree),
case L0 of case L0 of
none -> none ->
{false, not_found}; {false, not_found};
@ -270,19 +270,20 @@ compare_method_test() ->
P = leveled_codec:endkey_passed(EndKey, K), P = leveled_codec:endkey_passed(EndKey, K),
case {K, P} of case {K, P} of
{K, false} when K >= StartKey -> {K, false} when K >= StartKey ->
leveled_skiplist:enter(K, V, Acc); [{K, V}|Acc];
_ -> _ ->
Acc Acc
end end
end, end,
leveled_skiplist:empty(), [],
DumpList), DumpList),
Sz0 = leveled_skiplist:size(Q0), Tree = leveled_tree:from_orderedlist(lists:ukeysort(1, Q0)),
Sz0 = leveled_tree:tsize(Tree),
io:format("Crude method took ~w microseconds resulting in tree of " ++ io:format("Crude method took ~w microseconds resulting in tree of " ++
"size ~w~n", "size ~w~n",
[timer:now_diff(os:timestamp(), SWa), Sz0]), [timer:now_diff(os:timestamp(), SWa), Sz0]),
SWb = os:timestamp(), SWb = os:timestamp(),
Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_skiplist:empty()), Q1 = merge_trees(StartKey, EndKey, TreeList, leveled_tree:empty()),
Sz1 = length(Q1), Sz1 = length(Q1),
io:format("Merge method took ~w microseconds resulting in tree of " ++ io:format("Merge method took ~w microseconds resulting in tree of " ++
"size ~w~n", "size ~w~n",
@ -299,7 +300,7 @@ with_index_test() ->
fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) -> fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) ->
LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500), LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500),
LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1), LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
LM1SL = leveled_skiplist:from_list(LM1), LM1SL = leveled_tree:from_orderedlist(lists:ukeysort(1, LM1)),
UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1), UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
R = add_to_cache(L0Size, R = add_to_cache(L0Size,
{LM1SL, LedgerSQN + 1, LedgerSQN + 2000}, {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},