Switch to binary index in pmem

Remove the ets index in pmem and use a binary index instead.  This may
be slower, but avoids the bulk upload to ets, and means that matches
know of position (so only skiplists with a match need be tried).

Also stops the discrepancy between snapshots and non-snapshots - as
previously the snapshots were always slowed by not having access to the
ETS table.
This commit is contained in:
martinsumner 2017-01-05 21:58:33 +00:00
parent 1d3fb18df7
commit 5a88565c08
5 changed files with 183 additions and 100 deletions

View file

@ -220,7 +220,7 @@
levelzero_size = 0 :: integer(),
levelzero_maxcachesize :: integer(),
levelzero_cointoss = false :: boolean(),
levelzero_index, % may be none or an ETS table reference
levelzero_index, % An array
is_snapshot = false :: boolean(),
snapshot_fully_loaded = false :: boolean(),
@ -330,7 +330,7 @@ init([PCLopts]) ->
end.
handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
handle_call({push_mem, {PushedTree, PushedIdx, MinSQN, MaxSQN}},
From,
State=#state{is_snapshot=Snap}) when Snap == false ->
% The push_mem process is as follows:
@ -360,11 +360,12 @@ handle_call({push_mem, {PushedTree, MinSQN, MaxSQN}},
false ->
leveled_log:log("P0018", [ok, false, false]),
gen_server:reply(From, ok),
{noreply, update_levelzero(State#state.levelzero_size,
{PushedTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache,
State)}
{noreply,
update_levelzero(State#state.levelzero_size,
{PushedTree, PushedIdx, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache,
State)}
end;
handle_call({fetch, Key, Hash}, _From, State) ->
{R, HeadTimer} = timed_fetch_mem(Key,
@ -411,17 +412,22 @@ handle_call(work_for_clerk, From, State) ->
handle_call(get_startup_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State};
handle_call({register_snapshot, Snapshot}, _From, State) ->
Rs = [{Snapshot, State#state.manifest_sqn}|State#state.registered_snapshots],
Rs = [{Snapshot,
State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {ok, State}, State#state{registered_snapshots = Rs}};
handle_call({load_snapshot, {BookieIncrTree, MinSQN, MaxSQN}}, _From, State) ->
handle_call({load_snapshot, {BookieIncrTree, BookieIdx, MinSQN, MaxSQN}},
_From, State) ->
L0D = leveled_pmem:add_to_cache(State#state.levelzero_size,
{BookieIncrTree, MinSQN, MaxSQN},
State#state.ledger_sqn,
State#state.levelzero_cache),
{LedgerSQN, L0Size, L0Cache} = L0D,
L0Index = leveled_pmem:add_to_index(BookieIdx,
State#state.levelzero_index,
length(L0Cache)),
{reply, ok, State#state{levelzero_cache=L0Cache,
levelzero_size=L0Size,
levelzero_index=none,
levelzero_index=L0Index,
ledger_sqn=LedgerSQN,
snapshot_fully_loaded=true}};
handle_call({fetch_levelzero, Slot}, _From, State) ->
@ -467,9 +473,10 @@ handle_cast({levelzero_complete, FN, StartKey, EndKey}, State) ->
filename=FN},
UpdMan = lists:keystore(0, 1, State#state.manifest, {0, [ManEntry]}),
% Prompt clerk to ask about work - do this for every L0 roll
leveled_pmem:clear_index(State#state.levelzero_index),
UpdIndex = leveled_pmem:clear_index(State#state.levelzero_index),
ok = leveled_pclerk:clerk_prompt(State#state.clerk),
{noreply, State#state{levelzero_cache=[],
levelzero_index=UpdIndex,
levelzero_pending=false,
levelzero_constructor=undefined,
levelzero_size=0,
@ -643,20 +650,23 @@ start_from_file(PCLopts) ->
update_levelzero(L0Size, {PushedTree, MinSQN, MaxSQN},
update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
LedgerSQN, L0Cache, State) ->
SW = os:timestamp(),
Update = leveled_pmem:add_to_cache(L0Size,
{PushedTree, MinSQN, MaxSQN},
LedgerSQN,
L0Cache),
leveled_pmem:add_to_index(PushedTree, State#state.levelzero_index),
UpdL0Index = leveled_pmem:add_to_index(PushedIdx,
State#state.levelzero_index,
length(L0Cache) + 1),
{UpdMaxSQN, NewL0Size, UpdL0Cache} = Update,
if
UpdMaxSQN >= LedgerSQN ->
UpdState = State#state{levelzero_cache=UpdL0Cache,
levelzero_size=NewL0Size,
levelzero_index=UpdL0Index,
ledger_sqn=UpdMaxSQN},
CacheTooBig = NewL0Size > State#state.levelzero_maxcachesize,
CacheMuchTooBig = NewL0Size > ?SUPER_MAX_TABLE_SIZE,
@ -741,20 +751,14 @@ plain_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
R = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
element(1, R).
fetch_mem(Key, Hash, Manifest, L0Cache, none) ->
L0Check = leveled_pmem:check_levelzero(Key, Hash, L0Cache),
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
PosList = leveled_pmem:check_index(Hash, L0Index),
L0Check = leveled_pmem:check_levelzero(Key, Hash, PosList, L0Cache),
case L0Check of
{false, not_found} ->
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3);
{true, KV} ->
{KV, 0}
end;
fetch_mem(Key, Hash, Manifest, L0Cache, L0Index) ->
case leveled_pmem:check_index(Hash, L0Index) of
true ->
fetch_mem(Key, Hash, Manifest, L0Cache, none);
false ->
fetch(Key, Hash, Manifest, 0, fun timed_sst_get/3)
end.
fetch(_Key, _Hash, _Manifest, ?MAX_LEVELS + 1, _FetchFun) ->
@ -1374,12 +1378,15 @@ confirm_delete_test() ->
maybe_pause_push(PCL, KL) ->
T0 = leveled_skiplist:empty(true),
T1 = lists:foldl(fun({K, V}, {AccSL, MinSQN, MaxSQN}) ->
SL = leveled_skiplist:enter(K, V, AccSL),
I0 = leveled_pmem:new_index(),
T1 = lists:foldl(fun({K, V}, {AccSL, AccIdx, MinSQN, MaxSQN}) ->
UpdSL = leveled_skiplist:enter(K, V, AccSL),
SQN = leveled_codec:strip_to_seqonly({K, V}),
{SL, min(SQN, MinSQN), max(SQN, MaxSQN)}
H = leveled_codec:magic_hash(K),
UpdIdx = leveled_pmem:prepare_for_index(AccIdx, H),
{UpdSL, UpdIdx, min(SQN, MinSQN), max(SQN, MaxSQN)}
end,
{T0, infinity, 0},
{T0, I0, infinity, 0},
KL),
case pcl_pushmem(PCL, T1) of
returned ->