Half-way to ETS for Bookie memory

A half-way implementation that uses ETS as the Bookie's memory.
This commit is contained in:
martinsumner 2017-01-05 17:00:12 +00:00
parent bbdb35ae03
commit e6270d288f
3 changed files with 81 additions and 50 deletions

View file

@ -152,7 +152,8 @@
-define(JOURNAL_SIZE_JITTER, 20). -define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000). -define(LONG_RUNNING, 80000).
-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(), -record(ledger_cache, {mem :: ets:tab(),
loader = leveled_skiplist:empty(false) :: tuple(),
min_sqn = infinity :: integer()|infinity, min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}). max_sqn = 0 :: integer()}).
@ -243,22 +244,21 @@ init([Opts]) ->
CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER), CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE) CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
+ erlang:phash2(self()) rem CacheJitter, + erlang:phash2(self()) rem CacheJitter,
NewETS = ets:new(mem, [ordered_set]),
leveled_log:log("B0001", [Inker, Penciller]), leveled_log:log("B0001", [Inker, Penciller]),
{ok, #state{inker=Inker, {ok, #state{inker=Inker,
penciller=Penciller, penciller=Penciller,
cache_size=CacheSize, cache_size=CacheSize,
ledger_cache=#ledger_cache{}, ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}}; is_snapshot=false}};
Bookie -> Bookie ->
{ok, {ok,
{Penciller, LedgerCache}, {Penciller, LedgerCache},
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
CacheToLoad = {leveled_skiplist:empty(true), 0, 0}, ok = leveled_penciller:pcl_loadsnapshot(Penciller, LedgerCache),
ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad),
leveled_log:log("B0002", [Inker, Penciller]), leveled_log:log("B0002", [Inker, Penciller]),
{ok, #state{penciller=Penciller, {ok, #state{penciller=Penciller,
inker=Inker, inker=Inker,
ledger_cache=LedgerCache,
is_snapshot=true}} is_snapshot=true}}
end. end.
@ -457,16 +457,17 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================ %%%============================================================================
load_snapshot(LedgerSnapshot, LedgerCache) -> load_snapshot(LedgerSnapshot, LedgerCache) ->
CacheToLoad = {LedgerCache#ledger_cache.skiplist, Tab = LedgerCache#ledger_cache.mem,
CacheToLoad = {leveled_skiplist:from_orderedset(Tab),
LedgerCache#ledger_cache.min_sqn, LedgerCache#ledger_cache.min_sqn,
LedgerCache#ledger_cache.max_sqn}, LedgerCache#ledger_cache.max_sqn},
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad). ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
empty_ledgercache() -> empty_ledgercache() ->
#ledger_cache{}. #ledger_cache{mem = ets:new(empty, [ordered_set])}.
push_ledgercache(Penciller, Cache) -> push_ledgercache(Penciller, Cache) ->
CacheToLoad = {Cache#ledger_cache.skiplist, CacheToLoad = {Cache#ledger_cache.loader,
Cache#ledger_cache.min_sqn, Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn}, Cache#ledger_cache.max_sqn},
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad). leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
@ -486,7 +487,7 @@ maybe_longrunning(SW, Aspect) ->
end. end.
cache_size(LedgerCache) -> cache_size(LedgerCache) ->
leveled_skiplist:size(LedgerCache#ledger_cache.skiplist). ets:info(LedgerCache#ledger_cache.mem, size).
bucket_stats(State, Bucket, Tag) -> bucket_stats(State, Bucket, Tag) ->
{ok, {ok,
@ -760,25 +761,18 @@ startup(InkerOpts, PencillerOpts) ->
fetch_head(Key, Penciller, LedgerCache) -> fetch_head(Key, Penciller, LedgerCache) ->
SW = os:timestamp(), SW = os:timestamp(),
Hash = leveled_codec:magic_hash(Key), case ets:lookup(LedgerCache#ledger_cache.mem, Key) of
if [{Key, Head}] ->
Hash /= no_lookup -> Head;
L0R = leveled_skiplist:lookup(Key, [] ->
Hash, Hash = leveled_codec:magic_hash(Key),
LedgerCache#ledger_cache.skiplist), case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
case L0R of {Key, Head} ->
{value, Head} -> maybe_longrunning(SW, pcl_head),
maybe_longrunning(SW, local_head),
Head; Head;
none -> not_present ->
case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of maybe_longrunning(SW, pcl_head),
{Key, Head} -> not_present
maybe_longrunning(SW, pcl_head),
Head;
not_present ->
maybe_longrunning(SW, pcl_head),
not_present
end
end end
end. end.
@ -940,32 +934,40 @@ preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
addto_ledgercache(Changes, Cache) -> addto_ledgercache(Changes, Cache) ->
FoldChangesFun = FoldChangesFun =
fun({K, V}, Cache0) -> fun({K, V}, Cache0) ->
{SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}), {SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
SL0 = Cache0#ledger_cache.skiplist, true = ets:insert(Cache0#ledger_cache.mem, {K, V}),
SL1 = Cache0#ledger_cache{min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
case Hash of max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
no_lookup -> end,
leveled_skiplist:enter_nolookup(K, V, SL0); lists:foldl(FoldChangesFun, Cache, Changes).
_ ->
leveled_skiplist:enter(K, Hash, V, SL0) addto_ledgercache(Changes, Cache, loader) ->
end, FoldChangesFun =
Cache0#ledger_cache{skiplist=SL1, fun({K, V}, Cache0) ->
{SQN, _Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
SL0 = Cache0#ledger_cache.loader,
SL1 = leveled_skiplist:enter_nolookup(K, V, SL0),
Cache0#ledger_cache{loader = SL1,
min_sqn=min(SQN, Cache0#ledger_cache.min_sqn), min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)} max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
end, end,
lists:foldl(FoldChangesFun, Cache, Changes). lists:foldl(FoldChangesFun, Cache, Changes).
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist), Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
if if
TimeToPush -> TimeToPush ->
CacheToLoad = {Cache#ledger_cache.skiplist, CacheToLoad = {leveled_skiplist:from_orderedset(Tab),
Cache#ledger_cache.min_sqn, Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn}, Cache#ledger_cache.max_sqn},
case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
ok -> ok ->
{ok, #ledger_cache{}}; true = ets:delete_all_objects(Tab),
Cache0 = #ledger_cache{},
{ok, Cache0#ledger_cache{mem=Tab}};
returned -> returned ->
{returned, Cache} {returned, Cache}
end; end;
@ -1002,12 +1004,18 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
SQN when SQN < MaxSQN -> SQN when SQN < MaxSQN ->
Changes = preparefor_ledgercache(Type, PK, SQN, Changes = preparefor_ledgercache(Type, PK, SQN,
Obj, VSize, IndexSpecs), Obj, VSize, IndexSpecs),
{loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; {loop,
{MinSQN,
MaxSQN,
addto_ledgercache(Changes, OutputTree, loader)}};
MaxSQN -> MaxSQN ->
leveled_log:log("B0006", [SQN]), leveled_log:log("B0006", [SQN]),
Changes = preparefor_ledgercache(Type, PK, SQN, Changes = preparefor_ledgercache(Type, PK, SQN,
Obj, VSize, IndexSpecs), Obj, VSize, IndexSpecs),
{stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; {stop,
{MinSQN,
MaxSQN,
addto_ledgercache(Changes, OutputTree, loader)}};
SQN when SQN > MaxSQN -> SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]), leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0} {stop, Acc0}

View file

@ -322,6 +322,7 @@ init([PCLopts]) ->
SrcPenciller = PCLopts#penciller_options.source_penciller, SrcPenciller = PCLopts#penciller_options.source_penciller,
{ok, State} = pcl_registersnapshot(SrcPenciller, self()), {ok, State} = pcl_registersnapshot(SrcPenciller, self()),
leveled_log:log("P0001", [self()]), leveled_log:log("P0001", [self()]),
io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]),
{ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}};
%% Need to do something about timeout %% Need to do something about timeout
{_RootPath, false} -> {_RootPath, false} ->

View file

@ -89,6 +89,8 @@ from_list(UnsortedKVL, BloomProtect) ->
from_sortedlist(SortedKVL) -> from_sortedlist(SortedKVL) ->
from_sortedlist(SortedKVL, false). from_sortedlist(SortedKVL, false).
from_sortedlist([], BloomProtect) ->
empty(BloomProtect);
from_sortedlist(SortedKVL, BloomProtect) -> from_sortedlist(SortedKVL, BloomProtect) ->
Bloom0 = Bloom0 =
case BloomProtect of case BloomProtect of
@ -111,11 +113,16 @@ lookup(Key, SkipList) ->
end. end.
lookup(Key, Hash, SkipList) -> lookup(Key, Hash, SkipList) ->
case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of case element(1, SkipList) of
false -> list_only ->
none; list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT);
true -> _ ->
list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT) case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of
false ->
none;
true ->
list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT)
end
end. end.
@ -210,7 +217,7 @@ from_list(KVList, SkipWidth, ListHeight) ->
end, end,
from_list(SL0, SkipWidth, ListHeight - 1). from_list(SL0, SkipWidth, ListHeight - 1).
from_list([], 0, SkipList, SkipWidth) -> from_list([], 0, SkipList, _SkipWidth) ->
SkipList; SkipList;
from_list(KVList, L, SkipList, SkipWidth) -> from_list(KVList, L, SkipList, SkipWidth) ->
SubLL = min(SkipWidth, L), SubLL = min(SkipWidth, L),
@ -523,13 +530,28 @@ skiplist_timingtest(KL, SkipList, N, Bloom) ->
[timer:now_diff(os:timestamp(), SWc)]), [timer:now_diff(os:timestamp(), SWc)]),
AltKL1 = generate_randomkeys(1, 2000, 1, 200), AltKL1 = generate_randomkeys(1, 2000, 1, 200),
SWd = os:timestamp(), SWd0 = os:timestamp(),
lists:foreach(fun({K, _V}) -> lists:foreach(fun({K, _V}) ->
lookup(K, SkipList) lookup(K, SkipList)
end, end,
AltKL1), AltKL1),
io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n", io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd)]), [timer:now_diff(os:timestamp(), SWd0)]),
SWd1 = os:timestamp(),
lists:foreach(fun({K, _V}) ->
leveled_codec:magic_hash(K)
end,
AltKL1),
io:format(user, "Generating 2000 magic hashes took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd1)]),
SWd2 = os:timestamp(),
lists:foreach(fun({K, _V}) ->
erlang:phash2(K)
end,
AltKL1),
io:format(user, "Generating 2000 not so magic hashes took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd2)]),
AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300), AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300),
SWe = os:timestamp(), SWe = os:timestamp(),
lists:foreach(fun({K, _V}) -> lists:foreach(fun({K, _V}) ->