Merge pull request #15 from martinsumner/mas-altmem

Mas altmem
martinsumner 2017-01-06 10:30:03 +00:00 committed by GitHub
commit b2bb4ce73e
6 changed files with 325 additions and 216 deletions


@@ -152,7 +152,9 @@
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 80000).
-record(ledger_cache, {skiplist = leveled_skiplist:empty(true) :: tuple(),
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_skiplist:empty(false) :: tuple(),
index = leveled_pmem:new_index(), % array
min_sqn = infinity :: integer()|infinity,
max_sqn = 0 :: integer()}).
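% A reading of the new fields, hedged from their use elsewhere in this
% commit: mem is an ordered_set ETS table holding recent key changes;
% loader is a skiplist populated only when loading from the journal or
% preparing a snapshot; index is a leveled_pmem hash-index array used to
% avoid unnecessary lookups (see leveled_pmem:prepare_for_index/2).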
@@ -243,22 +245,21 @@ init([Opts]) ->
CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
+ erlang:phash2(self()) rem CacheJitter,
NewETS = ets:new(mem, [ordered_set]),
leveled_log:log("B0001", [Inker, Penciller]),
{ok, #state{inker=Inker,
penciller=Penciller,
cache_size=CacheSize,
ledger_cache=#ledger_cache{},
ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}};
Bookie ->
{ok,
{Penciller, LedgerCache},
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
CacheToLoad = {leveled_skiplist:empty(true), 0, 0},
ok = leveled_penciller:pcl_loadsnapshot(Penciller, CacheToLoad),
ok = load_snapshot(Penciller, LedgerCache),
leveled_log:log("B0002", [Inker, Penciller]),
{ok, #state{penciller=Penciller,
inker=Inker,
ledger_cache=LedgerCache,
is_snapshot=true}}
end.
@@ -457,18 +458,20 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================
load_snapshot(LedgerSnapshot, LedgerCache) ->
CacheToLoad = {LedgerCache#ledger_cache.skiplist,
CacheToLoad = {LedgerCache#ledger_cache.loader,
LedgerCache#ledger_cache.index,
LedgerCache#ledger_cache.min_sqn,
LedgerCache#ledger_cache.max_sqn},
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, CacheToLoad).
empty_ledgercache() ->
#ledger_cache{}.
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
push_ledgercache(Penciller, Cache) ->
CacheToLoad = {Cache#ledger_cache.skiplist,
Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn},
CacheToLoad = {Cache#ledger_cache.loader,
Cache#ledger_cache.index,
Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn},
leveled_penciller:pcl_pushmem(Penciller, CacheToLoad).
%%%============================================================================
@@ -486,7 +489,7 @@ maybe_longrunning(SW, Aspect) ->
end.
cache_size(LedgerCache) ->
leveled_skiplist:size(LedgerCache#ledger_cache.skiplist).
ets:info(LedgerCache#ledger_cache.mem, size).
bucket_stats(State, Bucket, Tag) ->
{ok,
@@ -703,18 +706,25 @@ snapshot_store(State, SnapType) ->
PCLopts = #penciller_options{start_snapshot=true,
source_penciller=State#state.penciller},
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
LedgerCache = readycache_forsnapshot(State#state.ledger_cache),
case SnapType of
store ->
InkerOpts = #inker_options{start_snapshot=true,
source_inker=State#state.inker},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{ok, {LedgerSnapshot, State#state.ledger_cache},
JournalSnapshot};
{ok, {LedgerSnapshot, LedgerCache}, JournalSnapshot};
ledger ->
{ok, {LedgerSnapshot, State#state.ledger_cache},
null}
{ok, {LedgerSnapshot, LedgerCache}, null}
end.
readycache_forsnapshot(LedgerCache) ->
% Need to convert the Ledger Cache away from using the ETS table
SkipList = leveled_skiplist:from_orderedset(LedgerCache#ledger_cache.mem),
Idx = LedgerCache#ledger_cache.index,
MinSQN = LedgerCache#ledger_cache.min_sqn,
MaxSQN = LedgerCache#ledger_cache.max_sqn,
#ledger_cache{loader=SkipList, index=Idx, min_sqn=MinSQN, max_sqn=MaxSQN}.
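% (Presumably needed because the ETS table is owned and mutated by the
% Bookie process, so a snapshot takes an immutable skiplist copy instead.)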
set_options(Opts) ->
MaxJournalSize0 = get_opt(max_journalsize, Opts, 10000000000),
JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
@@ -760,25 +770,25 @@ startup(InkerOpts, PencillerOpts) ->
fetch_head(Key, Penciller, LedgerCache) ->
SW = os:timestamp(),
Hash = leveled_codec:magic_hash(Key),
if
Hash /= no_lookup ->
L0R = leveled_skiplist:lookup(Key,
Hash,
LedgerCache#ledger_cache.skiplist),
case L0R of
{value, Head} ->
maybe_longrunning(SW, local_head),
CacheResult =
case LedgerCache#ledger_cache.mem of
undefined ->
[];
Tab ->
ets:lookup(Tab, Key)
end,
case CacheResult of
[{Key, Head}] ->
Head;
[] ->
Hash = leveled_codec:magic_hash(Key),
case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
{Key, Head} ->
maybe_longrunning(SW, pcl_head),
Head;
none ->
case leveled_penciller:pcl_fetch(Penciller, Key, Hash) of
{Key, Head} ->
maybe_longrunning(SW, pcl_head),
Head;
not_present ->
maybe_longrunning(SW, pcl_head),
not_present
end
not_present ->
maybe_longrunning(SW, pcl_head),
not_present
end
end.
@@ -923,49 +933,61 @@ accumulate_index(TermRe, AddFun, FoldKeysFun) ->
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IndexSpecs, TTL}) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL);
KeyChanges = leveled_codec:convert_indexspecs(IndexSpecs,
Bucket,
Key,
SQN,
TTL),
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, {IndexSpecs, TTL}) ->
{Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey,
SQN,
Obj,
Size,
TTL),
[PrimaryChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
Bucket,
Key,
SQN,
TTL).
{Bucket, Key, ObjKeyChange, H} = leveled_codec:generate_ledgerkv(LedgerKey,
SQN,
Obj,
Size,
TTL),
KeyChanges = [ObjKeyChange] ++ leveled_codec:convert_indexspecs(IndexSpecs,
Bucket,
Key,
SQN,
TTL),
{H, SQN, KeyChanges}.
addto_ledgercache(Changes, Cache) ->
addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
ets:insert(Cache#ledger_cache.mem, KeyChanges),
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
Cache#ledger_cache{index = UpdIndex,
min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
FoldChangesFun =
fun({K, V}, Cache0) ->
{SQN, Hash} = leveled_codec:strip_to_seqnhashonly({K, V}),
SL0 = Cache0#ledger_cache.skiplist,
SL1 =
case Hash of
no_lookup ->
leveled_skiplist:enter_nolookup(K, V, SL0);
_ ->
leveled_skiplist:enter(K, Hash, V, SL0)
end,
Cache0#ledger_cache{skiplist=SL1,
min_sqn=min(SQN, Cache0#ledger_cache.min_sqn),
max_sqn=max(SQN, Cache0#ledger_cache.max_sqn)}
end,
lists:foldl(FoldChangesFun, Cache, Changes).
fun({K, V}, SL0) ->
leveled_skiplist:enter_nolookup(K, V, SL0)
end,
UpdSL = lists:foldl(FoldChangesFun, Cache#ledger_cache.loader, KeyChanges),
UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
Cache#ledger_cache{index = UpdIndex,
loader = UpdSL,
min_sqn=min(SQN, Cache#ledger_cache.min_sqn),
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
CacheSize = leveled_skiplist:size(Cache#ledger_cache.skiplist),
Tab = Cache#ledger_cache.mem,
CacheSize = ets:info(Tab, size),
TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize),
if
TimeToPush ->
CacheToLoad = {Cache#ledger_cache.skiplist,
CacheToLoad = {leveled_skiplist:from_orderedset(Tab),
Cache#ledger_cache.index,
Cache#ledger_cache.min_sqn,
Cache#ledger_cache.max_sqn},
case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
ok ->
{ok, #ledger_cache{}};
Cache0 = #ledger_cache{},
true = ets:delete_all_objects(Tab),
{ok, Cache0#ledger_cache{mem=Tab}};
returned ->
{returned, Cache}
end;
@@ -1002,12 +1024,18 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
SQN when SQN < MaxSQN ->
Changes = preparefor_ledgercache(Type, PK, SQN,
Obj, VSize, IndexSpecs),
{loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
{loop,
{MinSQN,
MaxSQN,
addto_ledgercache(Changes, OutputTree, loader)}};
MaxSQN ->
leveled_log:log("B0006", [SQN]),
Changes = preparefor_ledgercache(Type, PK, SQN,
Obj, VSize, IndexSpecs),
{stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
{stop,
{MinSQN,
MaxSQN,
addto_ledgercache(Changes, OutputTree, loader)}};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0}


@@ -314,11 +314,12 @@ generate_ledgerkv(PrimaryKey, SQN, Obj, Size, TS) ->
_ ->
{active, TS}
end,
Hash = magic_hash(PrimaryKey),
Value = {SQN,
Status,
magic_hash(PrimaryKey),
Hash,
extract_metadata(Obj, Size, Tag)},
{Bucket, Key, {PrimaryKey, Value}}.
{Bucket, Key, {PrimaryKey, Value}, Hash}.
integer_now() ->


@@ -4,14 +4,17 @@
%% persisted, ordered view of non-recent Keys and Metadata which have been
%% added to the store.
%% - The penciller maintains a manifest of all the files within the current
%% Ledger.
%% - The Penciller provides re-write (compaction) work up to be managed by
%% the Penciller's Clerk
%% - The Penciller can be cloned and maintains a register of clones who have
%% requested snapshots of the Ledger
%% - It accepts new dumps (in the form of a gb_tree) from the Bookie, and
%% calls the Bookie once the process of pencilling this data in the Ledger is
%% complete - and the Bookie is free to forget about the data
%% - It accepts new dumps (in the form of a leveled_skiplist accompanied by
%% an array of hash-listing binaries) from the Bookie, and responds either 'ok'
%% to the Bookie if the information is accepted and the Bookie can refresh its
%% memory, or 'returned' if the Bookie must continue without refreshing as the
%% Penciller is not currently able to accept the update (potentially due to a
%% backlog of compaction work)
%% - The Penciller's persistence of the ledger may not be reliable, in that it
%% may lose data but only in sequence from a particular sequence number. On
%% startup the Penciller will inform the Bookie of the highest sequence number
@@ -21,14 +24,14 @@
%% -------- LEDGER ---------
%%
%% The Ledger is divided into many levels
%% - L0: New keys are received from the Bookie and merged into a single
%% gb_tree, until that tree is the size of a SST file, and it is then persisted
%% - L0: New keys are received from the Bookie and kept in the levelzero
%% cache, until that cache is the size of a SST file, and it is then persisted
%% as a SST file at this level. L0 SST files can be larger than the normal
%% maximum size - so we don't have to consider problems of either having more
%% than one L0 file (and handling what happens on a crash between writing the
%% files when the second may have overlapping sequence numbers), or having a
%% remainder with overlapping in sequence numbers in memory after the file is
%% written. Once the persistence is completed, the L0 tree can be erased.
%% written. Once the persistence is completed, the L0 cache can be erased.
%% There can be only one SST file at Level 0, so the work to merge that file
%% to the lower level must be the highest priority, as otherwise writes to the
%% ledger will stall, when there is next a need to persist.
@@ -64,10 +67,10 @@
%%
%% The Penciller must support the PUSH of a dump of keys from the Bookie. The
%% call to PUSH should be immediately acknowledged, and then work should be
%% completed to merge the tree into the L0 tree.
%% completed to merge the cache update into the L0 cache.
%%
%% The Penciller MUST NOT accept a new PUSH if the Clerk has commenced the
%% conversion of the current L0 tree into a SST file, but not completed this
%% conversion of the current L0 cache into a SST file, but not completed this
%% change. The Penciller in this case returns the push, and the Bookie should
%% continue to grow the cache before trying again.
%%
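%% As an illustrative sketch (not the Bookie's actual code; reset_cache/0
%% and keep_cache/0 are placeholder names), a pushing process is expected
%% to handle the two PUSH outcomes along these lines:
%%
%%    case leveled_penciller:pcl_pushmem(Penciller, CacheToLoad) of
%%        ok ->
%%            reset_cache();    % the Penciller has taken the cache
%%        returned ->
%%            keep_cache()      % keep growing the cache and retry later
%%    end
%%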
@@ -220,7 +223,7 @@
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(),
levelzero_cointoss = false :: boolean(),
levelzero_index, % may be none or an ETS table reference
levelzero_index, % An array
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
@@ -322,6 +325,7 @@ init([PCLopts]) ->
SrcPenciller = PCLopts#penciller_options.source_penciller,
{ok, State} = pcl_registersnapshot(SrcPenciller, self()),
leveled_log:log("P0001", [self()]),
io:format("Snapshot ledger sqn at ~w~n", [State#state.ledger_sqn]),
{ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}};
%% Need to do something about timeout
{_RootPath, false} ->
@@ -329,14 +333,14 @@ init([PCLopts]) ->
end.
handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
From,
State=#state{is_snapshot=Snap}) when Snap == false ->
% The push_mem process is as follows:
%
% 1 - Receive a gb_tree containing the latest Key/Value pairs (note that
% we mean value from the perspective of the Ledger, not the full value
% stored in the Inker)
% 1 - Receive a cache. The cache has four parts: a skiplist of keys and
% values, an array of 256 binaries listing the hashes present in the
% skiplist, a min SQN and a max SQN
%
% 2 - Check to see if there is a levelzero file pending. If so, the
% update must be returned. If not the update can be accepted
@@ -346,10 +350,10 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
%
% 4 - Update the cache:
% a) Append the cache to the list
% b) Add hashes for all the elements to the index
% b) Add each of the 256 hash-listing binaries to the master L0 index array
%
% Check the approximate size of the cache. If it is over the maximum size,
% trigger a background L0 file write and update state of levelzero_pending.
case State#state.levelzero_pending or State#state.work_backlog of
true ->
leveled_log:log("P0018", [returned,
@@ -359,11 +363,12 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
false ->
leveled_log:log("P0018", [ok, false, false]),
gen_server:reply(From, ok),
{noreply, update_levelzero(State#state.levelzero_size,
{PushedTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache,
State)}
{noreply,
update_levelzero(State#state.levelzero_size,
{PushedTree, PushedIdx, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache,
State)}
end;
handle_call({fetch, Key, Hash}, _From, State) ->
{R, HeadTimer} = timed_fetch_mem(Key,
@@ -410,17 +415,22 @@ handle_call(work_for_clerk, From, State) ->
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
Rs = [{Snapshot,
State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {ok, State}, State#state{registered_snapshots = Rs}};
handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
{BookieIncrTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
{LedgerSQN, L0Size, L0Cache} = L0D,
L0Index = leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_index,
length(L0Cache)),
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
levelzero_index=none,
levelzero_index=L0Index,
ledger_sqn=LedgerSQN,
snapshot_fully_loaded=true}};
handle_call({fetch_levelzero, Slot}, _From, State) ->
@@ -466,9 +476,10 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
filename=FN},
UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
% Prompt clerk to ask about work - do this for every L0 roll
leveled_pmem:clear_index(State#state.levelzero_index),
UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
{noreply, State#state{levelzero_cache=[],
levelzero_index=UpdIndex,
levelzero_pending=false,
levelzero_constructor=undefined,
levelzero_size=0,
@@ -642,20 +653,23 @@ start_from_file(PCLopts) ->
update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
LedgerSQN, L0Cache, State) ->
SW = os:timestamp(),
Update = leveled_pmem:add_to_cache(L0Size,
{PushedTree, MinSQN, MaxSQN},
LedgerSQN,
L0Cache),
leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
UpdL0Index = leveled_pmem:add_to_index(PushedIdx,
State#state.levelzero_index,
length(L0Cache) + 1),
{UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
if
UpdMaxSQN >= LedgerSQN ->
UpdState = State#state{levelzero_cache=UpdL0Cache,
levelzero_size=NewL0Size,
levelzero_index=UpdL0Index,
ledger_sqn=UpdMaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
@@ -740,20 +754,14 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
PosList = leveled_pmem:check_index(Hash, L0Index),
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
case L0Check of
{false, not_found} ->
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
{true, KV} ->
{KV, 0}
end;
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
case leveled_pmem:check_index(Hash, L0Index) of
true ->
fetch_mem(Key, Hash, Manifest, L0Cache, none);
false ->
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
end.
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@@ -1373,12 +1381,15 @@ confirm_delete_test() ->
maybe_pause_push(PCL, KL) ->
T0 = leveled_skiplist:empty(true),
T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) ->
SL = leveled_skiplist:enter(K, V, AccSL),
I0 = leveled_pmem:new_index(),
T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) ->
UpdSL = leveled_skiplist:enter(K, V, AccSL),
SQN = leveled_codec:strip_to_seqonly({K, V}),
{SL, min(SQN, MinSQN), max(SQN, MaxSQN)}
H = leveled_codec:magic_hash(K),
UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H),
{UpdSL, UpdIdx, min(SQN, MinSQN), max(SQN, MaxSQN)}
end,
{T0, infinity, 0},
{T0, I0, infinity, 0},
KL),
case pcl_pushmem(PCL, T1) of
returned ->


@@ -19,34 +19,23 @@
%% used to either point lookups at the right tree in the list, or inform the
%% requestor it is not present, avoiding any lookups.
%%
%% Tests show this takes one third of the time at push (when compared to
%% merging to a single tree), and is an order of magnitude more efficient as
%% the tree reaches peak size. It is also an order of magnitude more
%% efficient to use the hash index when compared to looking through all the
%% trees.
%%
%% Total time for single_tree 217000 microseconds
%% Total time for array_tree 209000 microseconds
%% Total time for array_list 142000 microseconds
%% Total time for array_filter 69000 microseconds
%% List of 2000 checked without array - success count of 90 in 36000 microsecs
%% List of 2000 checked with array - success count of 90 in 1000 microsecs
%%
%% The trade-off taken with the approach is that the size of the L0Cache is
%% uncertain. The Size count is incremented if the hash is not already
%% present, so the size may be lower than the actual size due to hash
%% collisions
%% uncertain. The Size count is incremented based on the inbound size and so
%% does not necessarily reflect the size once the lists are merged (which may
%% be smaller, as rotated objects replace their earlier versions)
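%%
%% A minimal usage sketch of the index API exported below (assuming the
%% hash comes from leveled_codec:magic_hash/1, and using cache slot 1
%% purely for illustration):
%%
%%    H = leveled_codec:magic_hash(Key),
%%    LM1Idx = prepare_for_index(new_index(), H),
%%    L0Idx = add_to_index(LM1Idx, new_index(), 1),
%%    PosList = check_index(H, L0Idx),
%%    %% PosList holds the L0 cache slots which may contain Key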
-module(leveled_pmem).
-include("include/leveled.hrl").
-export([
prepare_for_index/2,
add_to_cache/4,
to_list/2,
check_levelzero/3,
check_levelzero/4,
merge_trees/4,
add_to_index/2,
add_to_index/3,
new_index/0,
clear_index/1,
check_index/2
@@ -59,6 +48,14 @@
%%% API
%%%============================================================================
prepare_for_index(IndexArray, no_lookup) ->
IndexArray;
prepare_for_index(IndexArray, Hash) ->
{Slot, H0} = split_hash(Hash),
Bin = array:get(Slot, IndexArray),
array:set(Slot, <<Bin/binary, 1:1/integer, H0:23/integer>>, IndexArray).
add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
LM1Size = leveled_skiplist:size(LevelMinus1),
case LM1Size of
@@ -73,32 +70,29 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
end
end.
add_to_index(LevelMinus1, L0Index) ->
add_to_index(LM1Array, L0Index, CacheSlot) when CacheSlot < 128 ->
IndexAddFun =
fun({_K, V}) ->
{_, _, Hash, _} = leveled_codec:striphead_to_details(V),
case Hash of
no_lookup ->
ok;
_ ->
ets:insert(L0Index, {Hash})
end
end,
lists:foreach(IndexAddFun, leveled_skiplist:to_list(LevelMinus1)).
fun(Slot, Acc) ->
Bin0 = array:get(Slot, Acc),
BinLM1 = array:get(Slot, LM1Array),
array:set(Slot,
<<Bin0/binary,
0:1/integer, CacheSlot:7/integer,
BinLM1/binary>>,
Acc)
end,
lists:foldl(IndexAddFun, L0Index, lists:seq(0, 255)).
new_index() ->
ets:new(l0index, [private, set]).
array:new([{size, 256}, {default, <<>>}]).
clear_index(L0Index) ->
ets:delete_all_objects(L0Index).
clear_index(_L0Index) ->
new_index().
check_index(Hash, L0Index) ->
case ets:lookup(L0Index, Hash) of
[{Hash}] ->
true;
[] ->
false
end.
{Slot, H0} = split_hash(Hash),
Bin = array:get(Slot, L0Index),
find_pos(Bin, H0, [], 0).
to_list(Slots, FetchFun) ->
SW = os:timestamp(),
@@ -114,13 +108,15 @@ to_list(Slots, FetchFun) ->
FullList.
check_levelzero(Key, TreeList) ->
check_levelzero(Key, leveled_codec:magic_hash(Key), TreeList).
check_levelzero(Key, PosList, TreeList) ->
check_levelzero(Key, leveled_codec:magic_hash(Key), PosList, TreeList).
check_levelzero(_Key, _Hash, []) ->
check_levelzero(_Key, _Hash, _PosList, []) ->
{false, not_found};
check_levelzero(Key, Hash, TreeList) ->
check_slotlist(Key, Hash, lists:seq(1, length(TreeList)), TreeList).
check_levelzero(_Key, _Hash, [], _TreeList) ->
{false, not_found};
check_levelzero(Key, Hash, PosList, TreeList) ->
check_slotlist(Key, Hash, PosList, TreeList).
merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
@@ -136,6 +132,22 @@ merge_trees(StartKey, EndKey, SkipListList, LevelMinus1) ->
%%% Internal Functions
%%%============================================================================
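%% Each slot of the L0 index array holds a binary of tagged entries:
%% <<0:1, CacheSlot:7>> marks that the entries which follow belong to the
%% given L0 cache slot, and <<1:1, H0:23>> records the 23-bit hash suffix
%% of a key in that slot (see add_to_index/3). find_pos/4 walks the
%% binary, accumulating every cache slot holding a matching suffix.
%% Worked example (hash value chosen for illustration): for
%% Hash = 16#ABCDEF, split_hash/1 returns Slot = 16#EF = 239 and
%% H0 = 16#ABCD = 43981.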
find_pos(<<>>, _Hash, PosList, _SlotID) ->
PosList;
find_pos(<<1:1/integer, Hash:23/integer, T/binary>>, Hash, PosList, SlotID) ->
find_pos(T, Hash, PosList ++ [SlotID], SlotID);
find_pos(<<1:1/integer, _Miss:23/integer, T/binary>>, Hash, PosList, SlotID) ->
find_pos(T, Hash, PosList, SlotID);
find_pos(<<0:1/integer, NxtSlot:7/integer, T/binary>>, Hash, PosList, _SlotID) ->
find_pos(T, Hash, PosList, NxtSlot).
split_hash(Hash) ->
Slot = Hash band 255,
H0 = (Hash bsr 8) band 8388607,
{Slot, H0}.
check_slotlist(Key, Hash, CheckList, TreeList) ->
SlotCheckFun =
fun(SlotToCheck, {Found, KV}) ->
@@ -162,12 +174,21 @@ check_slotlist(Key, Hash, CheckList, TreeList) ->
-ifdef(TEST).
generate_randomkeys_aslist(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
lists:ukeysort(1,
generate_randomkeys(Seqn,
Count,
[],
BucketRangeLow,
BucketRangeHigh)).
generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
generate_randomkeys(Seqn,
Count,
leveled_skiplist:empty(true),
BucketRangeLow,
BucketRangeHigh).
KVL = generate_randomkeys(Seqn,
Count,
[],
BucketRangeLow,
BucketRangeHigh),
leveled_skiplist:from_list(KVL).
generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
Acc;
@@ -179,7 +200,7 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
{Seqn, {active, infinity}, null}},
generate_randomkeys(Seqn + 1,
Count - 1,
leveled_skiplist:enter(K, V, Acc),
[{K, V}|Acc],
BucketLow,
BRange).
@@ -230,8 +251,9 @@ compare_method_test() ->
[],
TestList),
PosList = lists:seq(1, length(TreeList)),
S1 = lists:foldl(fun({Key, _V}, Acc) ->
R0 = check_levelzero(Key, TreeList),
R0 = check_levelzero(Key, PosList, TreeList),
[R0|Acc]
end,
[],
@@ -267,6 +289,41 @@ compare_method_test() ->
[timer:now_diff(os:timestamp(), SWb), Sz1]),
?assertMatch(Sz0, Sz1).
with_index_test() ->
IndexPrepareFun =
fun({K, _V}, Acc) ->
H = leveled_codec:magic_hash(K),
prepare_for_index(Acc, H)
end,
LoadFun =
fun(_X, {{LedgerSQN, L0Size, L0TreeList}, L0Idx, SrcList}) ->
LM1 = generate_randomkeys_aslist(LedgerSQN + 1, 2000, 1, 500),
LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
LM1SL = leveled_skiplist:from_list(LM1),
UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
R = add_to_cache(L0Size,
{LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
LedgerSQN,
L0TreeList),
{R, UpdL0Index, lists:ukeymerge(1, LM1, SrcList)}
end,
R0 = lists:foldl(LoadFun, {{0, 0, []}, new_index(), []}, lists:seq(1, 16)),
{{SQN, Size, TreeList}, L0Index, SrcKVL} = R0,
?assertMatch(32000, SQN),
?assertMatch(true, Size =< 32000),
CheckFun =
fun({K, V}, {L0Idx, L0Cache}) ->
H = leveled_codec:magic_hash(K),
PosList = check_index(H, L0Idx),
?assertMatch({true, {K, V}},
check_slotlist(K, H, PosList, L0Cache)),
{L0Idx, L0Cache}
end,
_R1 = lists:foldl(CheckFun, {L0Index, TreeList}, SrcKVL).
-endif.


@@ -20,6 +20,8 @@
from_list/2,
from_sortedlist/1,
from_sortedlist/2,
from_orderedset/1,
from_orderedset/2,
to_list/1,
enter/3,
enter/4,
@@ -71,6 +73,12 @@ enter_nolookup(Key, Value, SkipList) ->
element(2, SkipList),
?SKIP_WIDTH, ?LIST_HEIGHT)}.
from_orderedset(Table) ->
from_orderedset(Table, false).
from_orderedset(Table, Bloom) ->
from_sortedlist(ets:tab2list(Table), Bloom).
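% Note that ets:tab2list/1 on an ordered_set table returns objects in key
% order, so its output can be passed straight to from_sortedlist/2.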
from_list(UnsortedKVL) ->
from_list(UnsortedKVL, false).
@@ -81,6 +89,8 @@ from_list(UnsortedKVL, BloomProtect) ->
from_sortedlist(SortedKVL) ->
from_sortedlist(SortedKVL, false).
from_sortedlist([], BloomProtect) ->
empty(BloomProtect);
from_sortedlist(SortedKVL, BloomProtect) ->
Bloom0 =
case BloomProtect of
@@ -103,11 +113,16 @@ lookup(Key, SkipList) ->
end.
lookup(Key, Hash, SkipList) ->
case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of
false ->
none;
true ->
list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT)
case element(1, SkipList) of
list_only ->
list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT);
_ ->
case leveled_tinybloom:check({hash, Hash}, element(1, SkipList)) of
false ->
none;
true ->
list_lookup(Key, element(2, SkipList), ?LIST_HEIGHT)
end
end.
@@ -188,53 +203,27 @@ enter(Key, Value, Hash, SkipList, Width, Level) ->
{MarkerKey, UpdSubSkipList})
end.
from_list(KVL, Width, 1) ->
Slots = length(KVL) div Width,
SkipList0 = lists:map(fun(X) ->
N = X * Width,
{K, _V} = lists:nth(N, KVL),
{K, lists:sublist(KVL,
N - Width + 1,
Width)}
end,
lists:seq(1, length(KVL) div Width)),
case Slots * Width < length(KVL) of
true ->
{LastK, _V} = lists:last(KVL),
SkipList0 ++ [{LastK, lists:nthtail(Slots * Width, KVL)}];
false ->
SkipList0
end;
from_list(KVL, Width, Level) ->
SkipWidth = width(Level, Width),
LoftSlots = length(KVL) div SkipWidth,
case LoftSlots of
0 ->
{K, _V} = lists:last(KVL),
[{K, from_list(KVL, Width, Level - 1)}];
_ ->
SkipList0 =
lists:map(fun(X) ->
N = X * SkipWidth,
{K, _V} = lists:nth(N, KVL),
SL = lists:sublist(KVL,
N - SkipWidth + 1,
SkipWidth),
{K, from_list(SL, Width, Level - 1)}
end,
lists:seq(1, LoftSlots)),
case LoftSlots * SkipWidth < length(KVL) of
true ->
{LastK, _V} = lists:last(KVL),
TailList = lists:nthtail(LoftSlots * SkipWidth, KVL),
SkipList0 ++ [{LastK, from_list(TailList,
Width,
Level - 1)}];
false ->
SkipList0
end
end.
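%% The rewritten builder works bottom-up: from_list/4 splits a sorted KV
%% list into runs of at most SkipWidth entries, each keyed by the run's
%% last key, and from_list/3 repeats this over ListHeight levels until
%% the nested skiplist is complete.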
from_list(SkipList, _SkipWidth, 0) ->
SkipList;
from_list(KVList, SkipWidth, ListHeight) ->
L0 = length(KVList),
SL0 =
case L0 > SkipWidth of
true ->
from_list(KVList, L0, [], SkipWidth);
false ->
{LastK, _LastSL} = lists:last(KVList),
[{LastK, KVList}]
end,
from_list(SL0, SkipWidth, ListHeight - 1).
from_list([], 0, SkipList, _SkipWidth) ->
SkipList;
from_list(KVList, L, SkipList, SkipWidth) ->
SubLL = min(SkipWidth, L),
{Head, Tail} = lists:split(SubLL, KVList),
{LastK, _LastV} = lists:last(Head),
from_list(Tail, L - SubLL, SkipList ++ [{LastK, Head}], SkipWidth).
list_lookup(Key, SkipList, 1) ->
@@ -431,7 +420,15 @@ skiplist_nobloom_test() ->
skiplist_tester(Bloom) ->
N = 4000,
KL = generate_randomkeys(1, N, 1, N div 5),
OS = ets:new(test, [ordered_set, private]),
ets:insert(OS, KL),
SWaETS = os:timestamp(),
SkipList = from_orderedset(OS, Bloom),
io:format(user, "Generating skip list with ~w keys in ~w microseconds " ++
"from ordered set~n",
[N, timer:now_diff(os:timestamp(), SWaETS)]),
SWaGSL = os:timestamp(),
SkipList = from_list(lists:reverse(KL), Bloom),
io:format(user, "Generating skip list with ~w keys in ~w microseconds~n" ++
@@ -533,13 +530,28 @@ skiplist_timingtest(KL, SkipList, N, Bloom) ->
[timer:now_diff(os:timestamp(), SWc)]),
AltKL1 = generate_randomkeys(1, 2000, 1, 200),
SWd = os:timestamp(),
SWd0 = os:timestamp(),
lists:foreach(fun({K, _V}) ->
lookup(K, SkipList)
end,
AltKL1),
io:format(user, "Getting 2000 mainly missing keys took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd)]),
[timer:now_diff(os:timestamp(), SWd0)]),
SWd1 = os:timestamp(),
lists:foreach(fun({K, _V}) ->
leveled_codec:magic_hash(K)
end,
AltKL1),
io:format(user, "Generating 2000 magic hashes took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd1)]),
SWd2 = os:timestamp(),
lists:foreach(fun({K, _V}) ->
erlang:phash2(K)
end,
AltKL1),
io:format(user, "Generating 2000 not so magic hashes took ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWd2)]),
AltKL2 = generate_randomkeys(1, 1000, N div 5 + 1, N div 5 + 300),
SWe = os:timestamp(),
lists:foreach(fun({K, _V}) ->


@@ -1230,11 +1230,11 @@ generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
LedgerKey = leveled_codec:to_ledgerkey("Bucket" ++ BNumber,
"Key" ++ KNumber,
o),
{_B, _K, KV} = leveled_codec:generate_ledgerkv(LedgerKey,
Seqn,
crypto:rand_bytes(64),
64,
infinity),
{_B, _K, KV, _H} = leveled_codec:generate_ledgerkv(LedgerKey,
Seqn,
crypto:rand_bytes(64),
64,
infinity),
generate_randomkeys(Seqn + 1,
Count - 1,
[KV|Acc],