From e6270d288f6f05f7b64266599aad1926e7e2930f Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 5 Jan 2017 17:00:12 +0000 Subject: [PATCH] Half-way to ets for Bookie mem A half-way implementation with use of ETS as the bookie's memory --- src/leveled_bookie.erl | 92 +++++++++++++++++++++------------------ src/leveled_penciller.erl | 1 + src/leveled_skiplist.erl | 38 ++++++++++++---- 3 files changed, 81 insertions(+), 50 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index d3c3f1f..6e699fe 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -152,7 +152,8 @@ -define(JOURNAL_SIZE_JITTER, 20). -define(LONG_RUNNING, 80000). --record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(), +-record(ledger_cache, {mem :: ets:tab(), + loader = leveled_skiplist:empty(false) :: tuple(), min_sqn = infinity :: integer()|infinity, max_sqn = 0 :: integer()}). @@ -243,22 +244,21 @@ init([Opts]) -> CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER), CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE) + erlang:phash2(self()) rem CacheJitter, + NewETS = ets:new(mem, [ordered_set]), leveled_log:log("B0001", [Inker, Penciller]), {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, - ledger_cache=#ledger_cache{}, + ledger_cache=#ledger_cache{mem = NewETS}, is_snapshot=false}}; Bookie -> {ok, {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), - CacheToLoad = {leveled_skiplist:empty(true), 0, 0}, - ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad), + ok = leveled_penciller:pcl_loadsnapshot(Penciller, LedgerCache), leveled_log:log("B0002", [Inker, Penciller]), {ok, #state{penciller=Penciller, inker=Inker, - ledger_cache=LedgerCache, is_snapshot=true}} end. @@ -457,16 +457,17 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ load_snapshot(LedgerSnapshot, LedgerCache) -> - CacheToLoad = {LedgerCache#ledger_cache.skiplist, + Tab = LedgerCache#ledger_cache.mem, + CacheToLoad = {leveled_skiplist:from_orderedset(Tab), LedgerCache#ledger_cache.min_sqn, LedgerCache#ledger_cache.max_sqn}, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad). empty_ledgercache() -> - #ledger_cache{}. + #ledger_cache{mem = ets:new(empty, [ordered_set])}. push_ledgercache(Penciller, Cache) -> - CacheToLoad = {Cache#ledger_cache.skiplist, + CacheToLoad = {Cache#ledger_cache.loader, Cache#ledger_cache.min_sqn, Cache#ledger_cache.max_sqn}, leveled_penciller:pcl_pushmem(Penciller, CacheToLoad). @@ -486,7 +487,7 @@ maybe_longrunning(SW, Aspect) -> end. cache_size(LedgerCache) -> - leveled_skiplist:size(LedgerCache#ledger_cache.skiplist). + ets:info(LedgerCache#ledger_cache.mem, size). bucket_stats(State, Bucket, Tag) -> {ok, @@ -760,25 +761,18 @@ startup(InkerOpts, PencillerOpts) -> fetch_head(Key, Penciller, LedgerCache) -> SW = os:timestamp(), - Hash = leveled_codec:magic_hash(Key), - if - Hash /= no_lookup -> - L0R = leveled_skiplist:lookup(Key, - Hash, - LedgerCache#ledger_cache.skiplist), - case L0R of - {value, Head} -> - maybe_longrunning(SW, local_head), + case ets:lookup(LedgerCache#ledger_cache.mem, Key) of + [{Key, Head}] -> + Head; + [] -> + Hash = leveled_codec:magic_hash(Key), + case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of + {Key, Head} -> + maybe_longrunning(SW, pcl_head), Head; - none -> - case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of - {Key, Head} -> - maybe_longrunning(SW, pcl_head), - Head; - not_present -> - maybe_longrunning(SW, pcl_head), - not_present - end + not_present -> + maybe_longrunning(SW, pcl_head), + not_present end end. @@ -940,32 +934,40 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) -> addto_ledgercache(Changes, Cache) -> FoldChangesFun = fun({K, V}, Cache0) -> - {SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}), - SL0 = Cache0#ledger_cache.skiplist, - SL1 = - case Hash of - no_lookup -> - leveled_skiplist:enter_nolookup(K, V, SL0); - _ -> - leveled_skiplist:enter(K, Hash, V, SL0) - end, - Cache0#ledger_cache{skiplist=SL1, + {SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}), + true = ets:insert(Cache0#ledger_cache.mem, {K, V}), + Cache0#ledger_cache{min_sqn=min(SQN, Cache0#ledger_cache.min_sqn), + max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)} + end, + lists:foldl(FoldChangesFun, Cache, Changes). + +addto_ledgercache(Changes, Cache, loader) -> + FoldChangesFun = + fun({K, V}, Cache0) -> + {SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}), + SL0 = Cache0#ledger_cache.loader, + SL1 = leveled_skiplist:enter_nolookup(K, V, SL0), + Cache0#ledger_cache{loader = SL1, min_sqn=min(SQN, Cache0#ledger_cache.min_sqn), max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)} end, lists:foldl(FoldChangesFun, Cache, Changes). + maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> - CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist), + Tab = Cache#ledger_cache.mem, + CacheSize = ets:info(Tab, size), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), if TimeToPush -> - CacheToLoad = {Cache#ledger_cache.skiplist, + CacheToLoad = {leveled_skiplist:from_orderedset(Tab), Cache#ledger_cache.min_sqn, Cache#ledger_cache.max_sqn}, case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of ok -> - {ok, #ledger_cache{}}; + true = ets:delete_all_objects(Tab), + Cache0 = #ledger_cache{}, + {ok, Cache0#ledger_cache{mem=Tab}}; returned -> {returned, Cache} end; @@ -1002,12 +1004,18 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> SQN when SQN < MaxSQN -> Changes = preparefor_ledgercache(Type, PK, SQN, Obj, VSize, IndexSpecs), - {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; + {loop, + {MinSQN, + MaxSQN, + addto_ledgercache(Changes, OutputTree, loader)}}; MaxSQN -> leveled_log:log("B0006", [SQN]), Changes = preparefor_ledgercache(Type, PK, SQN, Obj, VSize, IndexSpecs), - {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; + {stop, + {MinSQN, + MaxSQN, + addto_ledgercache(Changes, OutputTree, loader)}}; SQN when SQN > MaxSQN -> leveled_log:log("B0007", [MaxSQN, SQN]), {stop, Acc0} diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 853498b..d087744 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -322,6 +322,7 @@ init([PCLopts]) -> SrcPenciller = PCLopts#penciller_options.source_penciller, {ok, State} = pcl_registersnapshot(SrcPenciller, self()), leveled_log:log("P0001", [self()]), + io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]), {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; %% Need to do something about timeout {_RootPath, false} -> diff --git a/src/leveled_skiplist.erl b/src/leveled_skiplist.erl index 8870609..b79d050 100644 --- a/src/leveled_skiplist.erl +++ b/src/leveled_skiplist.erl @@ -89,6 +89,8 @@ from_list(UnsortedKVL, BloomProtect) -> from_sortedlist(SortedKVL) -> from_sortedlist(SortedKVL, false). +from_sortedlist([], BloomProtect) -> + empty(BloomProtect); from_sortedlist(SortedKVL, BloomProtect) -> Bloom0 = case BloomProtect of @@ -111,11 +113,16 @@ lookup(Key, SkipList) -> end. lookup(Key, Hash, SkipList) -> - case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of - false -> - none; - true -> - list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT) + case element(1, SkipList) of + list_only -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT); + _ -> + case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of + false -> + none; + true -> + list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT) + end end. @@ -210,7 +217,7 @@ from_list(KVList, SkipWidth, ListHeight) -> end, from_list(SL0, SkipWidth, ListHeight - 1). -from_list([], 0, SkipList, SkipWidth) -> +from_list([], 0, SkipList, _SkipWidth) -> SkipList; from_list(KVList, L, SkipList, SkipWidth) -> SubLL = min(SkipWidth, L), @@ -523,13 +530,28 @@ skiplist_timingtest(KL, SkipList, N, Bloom) -> [timer:now_diff(os:timestamp(), SWc)]), AltKL1 = generate_randomkeys(1, 2000, 1, 200), - SWd = os:timestamp(), + SWd0 = os:timestamp(), lists:foreach(fun({K, _V}) -> lookup(K, SkipList) end, AltKL1), io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n", - [timer:now_diff(os:timestamp(), SWd)]), + [timer:now_diff(os:timestamp(), SWd0)]), + SWd1 = os:timestamp(), + lists:foreach(fun({K, _V}) -> + leveled_codec:magic_hash(K) + end, + AltKL1), + io:format(user, "Generating 2000 magic hashes took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWd1)]), + SWd2 = os:timestamp(), + lists:foreach(fun({K, _V}) -> + erlang:phash2(K) + end, + AltKL1), + io:format(user, "Generating 2000 not so magic hashes took ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWd2)]), + AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300), SWe = os:timestamp(), lists:foreach(fun({K, _V}) ->