Merge branch 'master' into develop-3.0

Martin Sumner 2020-04-09 12:23:42 +01:00
commit 4caefcf4aa
20 changed files with 1482 additions and 406 deletions

View file: leveled.app.src

@ -1,7 +1,7 @@
{application, leveled,
[
{description, "Key Value store based on LSM-Tree and designed for larger values"},
{vsn, "0.9.18"},
{vsn, "0.9.21"},
{registered, []},
{applications, [
kernel,

View file: leveled_bookie.erl

@ -93,8 +93,6 @@
]).
-export([empty_ledgercache/0,
loadqueue_ledgercache/1,
push_ledgercache/2,
snapshot_store/6,
fetch_value/2,
journal_notfound/4]).
@ -105,6 +103,7 @@
-include_lib("eunit/include/eunit.hrl").
-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
@ -166,7 +165,7 @@
-record(state, {inker :: pid() | undefined,
penciller :: pid() | undefined,
cache_size :: integer() | undefined,
ledger_cache = #ledger_cache{},
ledger_cache = #ledger_cache{} :: ledger_cache(),
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
@ -310,12 +309,16 @@
% resilience outside of the store), or retain (retain a history of
% key changes, even when the object value has been compacted).
%
% There is a third, theoretical and untested strategy, which is
% recalc - which would require when reloading the Ledger from the
% Journal, to recalculate the index changes based on the current
% state of the Ledger and the object metadata.
% There is a third strategy, which is recalc, where on reloading
% the Ledger from the Journal, the key changes are recalculated by
% comparing the extracted metadata from the Journal object, with the
% extracted metadata from the current Ledger object it is set to
% replace (should one be present). Implementing the recalc
% strategy requires an override function for
% `leveled_head:diff_indexspecs/3`.
% A function for the ?RIAK_TAG is provided and tested.
%
% reload_strategy ptions are a list - to map from a tag to the
% reload_strategy options are a list - to map from a tag to the
% strategy (recovr|retain|recalc). Default strategies are:
% [{?RIAK_TAG, retain}, {?STD_TAG, retain}]
{max_pencillercachesize, pos_integer()|undefined} |
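A minimal configuration sketch for the option described above (the root_path value is illustrative, and it assumes the proplist form of leveled_bookie:book_start/1 with the tag macros from leveled's header in scope):

    {ok, Bookie} =
        leveled_bookie:book_start([{root_path, "/tmp/leveled"},
                                   % Override the Riak tag to recalc; the
                                   % standard tag keeps its default of retain
                                   {reload_strategy, [{?RIAK_TAG, recalc}]}]).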
@ -378,7 +381,16 @@
% true
].
-type initial_loadfun() ::
fun((leveled_codec:journal_key(),
any(),
non_neg_integer(),
{non_neg_integer(), non_neg_integer(), ledger_cache()},
fun((any()) -> {binary(), non_neg_integer()})) ->
{loop|stop,
{non_neg_integer(), non_neg_integer(), ledger_cache()}}).
-export_type([initial_loadfun/0]).
%%%============================================================================
%%% API
@ -1097,7 +1109,7 @@ book_destroy(Pid) ->
%% to store the backup.
%%
%% Backup files are hard-linked. Does not work in head_only mode, or if
%% index changes are used with a `skip` compaction/reload strategy
%% index changes are used with a `recovr` compaction/reload strategy
book_hotbackup(Pid) ->
gen_server:call(Pid, hot_backup, infinity).
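A usage sketch for the hot backup (the target path is illustrative, and the {async, BackupFun} return shape is an assumption based on leveled's other asynchronous APIs):

    {async, BackupFun} = leveled_bookie:book_hotbackup(Bookie),
    % Calling the returned fun with a destination directory performs the
    % hard-link backup of the journal files
    ok = BackupFun("/mnt/backup/leveled").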
@ -1250,8 +1262,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
SQN,
Object,
ObjSize,
{IndexSpecs, TTL},
State),
{IndexSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
{_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@ -1288,8 +1299,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL},
State),
{ObjectSpecs, TTL}),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of
true ->
@ -1537,6 +1547,23 @@ code_change(_OldVsn, State, _Extra) ->
empty_ledgercache() ->
#ledger_cache{mem = ets:new(empty, [ordered_set])}.
-spec push_to_penciller(pid(), ledger_cache()) -> ok.
%% @doc
%% The push to penciller must start as a tree to correctly de-duplicate
%% the list by order before becoming a de-duplicated list for loading
push_to_penciller(Penciller, LedgerCache) ->
push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
push_to_penciller_loop(Penciller, LedgerCache) ->
case push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
-spec push_ledgercache(pid(), ledger_cache()) -> ok|returned.
%% @doc
%% Push the ledgercache to the Penciller - which should respond ok or
@ -1642,10 +1669,22 @@ startup(InkerOpts, PencillerOpts, State) ->
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
leveled_log:log("B0005", [LedgerSQN]),
ReloadStrategy = InkerOpts#inker_options.reload_strategy,
LoadFun = get_loadfun(ReloadStrategy, Penciller, State),
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
empty_ledgercache()
end,
ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1,
get_loadfun(State),
Penciller),
LoadFun,
InitAccFun,
BatchFun),
ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
{Inker, Penciller}.
@ -2161,30 +2200,26 @@ check_notfound(CheckFrequency, CheckFun) ->
-spec preparefor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
integer(), any(), integer(),
leveled_codec:journal_keychanges(),
book_state())
-> {integer()|no_lookup,
integer(),
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% Prepare an object and its related key changes for addition to the Ledger
%% via the Ledger Cache.
preparefor_ledgercache(?INKT_MPUT,
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
_State) ->
?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL}) ->
{Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
KeyChanges =
leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
{no_lookup, SQN, KeyChanges};
preparefor_ledgercache(_InkTag,
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
_State) ->
LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL}) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
KeyChanges =
@ -2193,8 +2228,58 @@ preparefor_ledgercache(_InkTag,
{KeyH, SQN, KeyChanges}.
-spec addto_ledgercache({integer()|no_lookup,
integer(),
-spec recalcfor_ledgercache(leveled_codec:journal_key_tag()|null,
leveled_codec:ledger_key()|?DUMMY,
non_neg_integer(), any(), integer(),
leveled_codec:journal_keychanges(),
ledger_cache(),
pid())
-> {leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())}.
%% @doc
%% When loading from the journal to the ledger, may hit a key which has the
%% `recalc` strategy. Such a key needs to recalculate the key changes by
%% comparison with the current state of the ledger, assuming it is a full
%% journal entry (i.e. KeyDeltas which may be a result of previously running
%% with a retain strategy should be ignored).
recalcfor_ledgercache(InkTag,
_LedgerKey, SQN, _Obj, _Size, {_IdxSpecs, _TTL},
_LedgerCache,
_Penciller)
when InkTag == ?INKT_MPUT; InkTag == ?INKT_KEYD ->
{no_lookup, SQN, []};
recalcfor_ledgercache(_InkTag,
LK, SQN, Obj, Size, {_IgnoreJournalIdxSpecs, TTL},
LedgerCache,
Penciller) ->
{Bucket, Key, MetaValue, {KeyH, _ObjH}, _LastMods} =
leveled_codec:generate_ledgerkv(LK, SQN, Obj, Size, TTL),
OldObject =
case check_in_ledgercache(LK, KeyH, LedgerCache, loader) of
false ->
leveled_penciller:pcl_fetch(Penciller, LK, KeyH, true);
{value, KV} ->
KV
end,
OldMetadata =
case OldObject of
not_present ->
not_present;
{LK, LV} ->
leveled_codec:get_metadata(LV)
end,
UpdMetadata = leveled_codec:get_metadata(MetaValue),
IdxSpecs =
leveled_head:diff_indexspecs(element(1, LK), UpdMetadata, OldMetadata),
{KeyH,
SQN,
[{LK, MetaValue}]
++ leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL)}.
-spec addto_ledgercache({leveled_codec:segment_hash(),
non_neg_integer(),
list(leveled_codec:ledger_kv())},
ledger_cache())
-> ledger_cache().
@ -2230,6 +2315,32 @@ addto_ledgercache({H, SQN, KeyChanges}, Cache, loader) ->
max_sqn=max(SQN, Cache#ledger_cache.max_sqn)}.
-spec check_in_ledgercache(leveled_codec:ledger_key(),
leveled_codec:segment_hash(),
ledger_cache(),
loader) ->
false | {value, leveled_codec:ledger_kv()}.
%% @doc
%% Check the ledger cache for a Key, when the ledger cache is in loader mode
%% and so is populating a queue not an ETS table
check_in_ledgercache(PK, Hash, Cache, loader) ->
case leveled_pmem:check_index(Hash, Cache#ledger_cache.index) of
[] ->
false;
_ ->
search(fun({K,_V}) -> K == PK end,
lists:reverse(Cache#ledger_cache.load_queue))
end.
-spec search(fun((any()) -> boolean()), list()) -> {value, any()}|false.
search(Pred, [Hd|Tail]) ->
case Pred(Hd) of
true -> {value, Hd};
false -> search(Pred, Tail)
end;
search(Pred, []) when is_function(Pred, 1) ->
false.
-spec maybepush_ledgercache(integer(), ledger_cache(), pid())
-> {ok|returned, ledger_cache()}.
%% @doc
@ -2276,44 +2387,47 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
false.
-spec get_loadfun(book_state()) -> fun().
-spec get_loadfun(leveled_codec:compaction_strategy(), pid(), book_state())
-> initial_loadfun().
%% @doc
%% The LoadFun will be sued by the Inker when walking across the Journal to
%% load the Penciller at startup
get_loadfun(State) ->
PrepareFun =
fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
end,
LoadFun =
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, OutputTree} = Acc0,
{SQN, InkTag, PK} = KeyInJournal,
% VBin may already be a term
{VBin, VSize} = ExtractFun(ValueInJournal),
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc0};
SQN when SQN < MaxSQN ->
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{loop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
MaxSQN ->
leveled_log:log("B0006", [SQN]),
Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
{stop,
{MinSQN,
MaxSQN,
addto_ledgercache(Chngs, OutputTree, loader)}};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0}
end
end,
LoadFun.
%% The LoadFun will be used by the Inker when walking across the Journal to
%% load the Penciller at startup.
get_loadfun(ReloadStrat, Penciller, _State) ->
fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, LedgerCache} = Acc0,
{SQN, InkTag, PK} = KeyInJournal,
case SQN of
SQN when SQN < MinSQN ->
{loop, Acc0};
SQN when SQN > MaxSQN ->
leveled_log:log("B0007", [MaxSQN, SQN]),
{stop, Acc0};
_ ->
{VBin, ValSize} = ExtractFun(ValueInJournal),
% VBin may already be a term
{Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
Chngs =
case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
recalc ->
recalcfor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs,
LedgerCache,
Penciller);
_ ->
preparefor_ledgercache(InkTag, PK, SQN,
Obj, ValSize, IdxSpecs)
end,
case SQN of
MaxSQN ->
leveled_log:log("B0006", [SQN]),
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{stop, {MinSQN, MaxSQN, LC0}};
_ ->
LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
{loop, {MinSQN, MaxSQN, LC0}}
end
end
end.
delete_path(DirPath) ->
@ -3166,6 +3280,10 @@ sqnorder_mutatefold_test() ->
ok = book_destroy(Bookie1).
search_test() ->
?assertMatch({value, 5}, search(fun(X) -> X == 5 end, lists:seq(1, 10))),
?assertMatch(false, search(fun(X) -> X == 55 end, lists:seq(1, 10))).
check_notfound_test() ->
ProbablyFun = fun() -> probably end,
MissingFun = fun() -> missing end,

View file: leveled_cdb.erl

@ -130,6 +130,7 @@
-define(DELETE_TIMEOUT, 10000).
-define(TIMING_SAMPLECOUNTDOWN, 5000).
-define(TIMING_SAMPLESIZE, 100).
-define(GETPOS_FACTOR, 8).
-define(MAX_OBJECT_SIZE, 1000000000).
% 1GB but really should be much smaller than this
@ -270,24 +271,28 @@ cdb_getpositions(Pid, SampleSize) ->
all ->
FoldFun =
fun(Index, Acc) ->
cdb_getpositions_fromidx(Pid, all, Index, Acc)
PosList = cdb_getpositions_fromidx(Pid, all, Index, []),
lists:merge(Acc, lists:sort(PosList))
end,
IdxList = lists:seq(0, 255),
lists:foldl(FoldFun, [], IdxList);
S0 ->
FC = ?GETPOS_FACTOR * S0,
FoldFun =
fun({_R, Index}, Acc) ->
case length(Acc) of
S0 ->
FC ->
Acc;
L when L < S0 ->
cdb_getpositions_fromidx(Pid, S0, Index, Acc)
L when L < FC ->
cdb_getpositions_fromidx(Pid, FC, Index, Acc)
end
end,
RandFun = fun(X) -> {leveled_rand:uniform(), X} end,
SeededL = lists:map(RandFun, lists:seq(0, 255)),
SortedL = lists:keysort(1, SeededL),
lists:foldl(FoldFun, [], SortedL)
PosList0 = lists:foldl(FoldFun, [], SortedL),
P1 = leveled_rand:uniform(max(1, length(PosList0) - S0)),
lists:sublist(lists:sort(PosList0), P1, S0)
end.
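To make the revised sampling concrete: with SampleSize = 16 and ?GETPOS_FACTOR = 8, the fold gathers up to 128 positions from the hashtable buckets (visited in random order), sorts them, and returns a contiguous run of 16 positions starting at a random offset. The sample is therefore randomly placed but cheap to read sequentially, and the average byte gap between the sampled positions is what the updated IC004 log line reports as mean_byte_jump.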
cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
@ -1226,10 +1231,9 @@ scan_index_returnpositions(Handle, Position, Count, PosList0) ->
[HPosition|PosList]
end
end,
PosList = lists:foldl(AddPosFun,
PosList0,
read_next_n_integerpairs(Handle, Count)),
lists:reverse(PosList).
lists:foldl(AddPosFun,
PosList0,
read_next_n_integerpairs(Handle, Count)).
%% Take an active file and write the hash details necessary to close that

View file: leveled_codec.erl

@ -35,7 +35,7 @@
from_inkerkv/2,
from_journalkey/1,
revert_to_keydeltas/2,
is_compaction_candidate/1,
is_full_journalentry/1,
split_inkvalue/1,
check_forinkertype/2,
get_tagstrategy/2,
@ -49,7 +49,8 @@
segment_hash/1,
to_lookup/1,
next_key/1,
return_proxy/4]).
return_proxy/4,
get_metadata/1]).
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
-define(NRT_IDX, "$aae.").
@ -88,7 +89,7 @@
-type ledger_kv() ::
{ledger_key(), ledger_value()}.
-type compaction_method() ::
retain|skip|recalc.
retain|recovr|recalc.
-type compaction_strategy() ::
list({tag(), compaction_method()}).
-type journal_key_tag() ::
@ -243,6 +244,10 @@ strip_to_indexdetails({_, V}) when tuple_size(V) > 4 ->
striphead_to_v1details(V) ->
{element(1, V), element(2, V), element(3, V), element(4, V)}.
-spec get_metadata(ledger_value()) -> metadata().
get_metadata(LV) ->
element(4, LV).
-spec key_dominates(ledger_kv(), ledger_kv()) ->
left_hand_first|right_hand_first|left_hand_dominant|right_hand_dominant.
%% @doc
@ -358,24 +363,24 @@ endkey_passed(EndKey, CheckingKey) ->
-spec inker_reload_strategy(compaction_strategy()) -> compaction_strategy().
%% @doc
%% Take the default startegy for compaction, and override the approach for any
%% Take the default strategy for compaction, and override the approach for any
%% tags passed in
inker_reload_strategy(AltList) ->
ReloadStrategy0 =
DefaultList =
lists:map(fun leveled_head:default_reload_strategy/1,
leveled_head:defined_objecttags()),
lists:foldl(fun({X, Y}, SList) ->
lists:keyreplace(X, 1, SList, {X, Y})
end,
ReloadStrategy0,
AltList).
lists:ukeymerge(1,
lists:ukeysort(1, AltList),
lists:ukeysort(1, DefaultList)).
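A small worked sketch of the override semantics (lists:ukeymerge/3 keeps the caller's entry when a tag appears in both lists, so only the supplied tags change):

    % Only the Riak tag is overridden; ?STD_TAG keeps its default of retain
    Strategy = leveled_codec:inker_reload_strategy([{?RIAK_TAG, recalc}]),
    recalc = leveled_codec:get_tagstrategy(?RIAK_TAG, Strategy),
    retain = leveled_codec:get_tagstrategy(?STD_TAG, Strategy).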
-spec get_tagstrategy(ledger_key(), compaction_strategy())
-> skip|retain|recalc.
-spec get_tagstrategy(ledger_key()|tag()|dummy, compaction_strategy())
-> compaction_method().
%% @doc
%% Work out the compaction strategy for the key
get_tagstrategy({Tag, _, _, _}, Strategy) ->
get_tagstrategy(Tag, Strategy);
get_tagstrategy(Tag, Strategy) ->
case lists:keyfind(Tag, 1, Strategy) of
{Tag, TagStrat} ->
TagStrat;
@ -410,11 +415,12 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
%% If we wish to retain key deltas when an object in the Journal has been
%% replaced - then this converts a Journal Key and Value into one which has no
%% object body just the key deltas.
%% Only called if retain strategy and has passed
%% leveled_codec:is_full_journalentry/1 - so no need to consider other key
%% types
revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) ->
{_V, KeyDeltas} = revert_value_from_journal(InkerV),
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}};
revert_to_keydeltas(JournalKey, InkerV) ->
{JournalKey, InkerV}.
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}.
%% Used when fetching objects, so only handles standard, hashable entries
from_inkerkv(Object) ->
@ -560,12 +566,12 @@ check_forinkertype(_LedgerKey, head_only) ->
check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND.
-spec is_compaction_candidate(journal_key()) -> boolean().
-spec is_full_journalentry(journal_key()) -> boolean().
%% @doc
%% Only journal keys with standard objects should be scored for compaction
is_compaction_candidate({_SQN, ?INKT_STND, _LK}) ->
is_full_journalentry({_SQN, ?INKT_STND, _LK}) ->
true;
is_compaction_candidate(_OtherJKType) ->
is_full_journalentry(_OtherJKType) ->
false.

View file: leveled_head.erl

@ -22,7 +22,8 @@
-export([key_to_canonicalbinary/1,
build_head/2,
extract_metadata/3
extract_metadata/3,
diff_indexspecs/3
]).
-export([get_size/2,
@ -71,12 +72,15 @@
-type object_metadata() :: riak_metadata()|std_metadata()|head_metadata().
-type appdefinable_function() ::
key_to_canonicalbinary | build_head | extract_metadata.
key_to_canonicalbinary | build_head | extract_metadata | diff_indexspecs.
% Functions for which default behaviour can be over-written for the
% application's own tags
-type appdefinable_function_tuple() ::
{appdefinable_function(), fun()}.
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
-type head() ::
binary()|tuple().
% TODO:
@ -174,6 +178,41 @@ default_extract_metadata(_Tag, SizeAsStoredInJournal, Obj) ->
{{standard_hash(Obj), SizeAsStoredInJournal, undefined}, []}.
-spec diff_indexspecs(object_tag(),
object_metadata(),
object_metadata()|not_present)
-> leveled_codec:index_specs().
%% @doc
%% Take an object metadata part from within the journal, and an object metadata
%% part from the ledger (which should have a lower SQN), and generate index
%% specs by determining the difference between the index specs on the object
%% to be loaded and that on the object already stored.
%%
%% This is only relevant where the journal compaction strategy of `recalc` is
%% used; the KeyChanges will be used when `retain` is the compaction strategy
diff_indexspecs(?RIAK_TAG, UpdatedMetadata, OldMetadata) ->
UpdIndexes =
get_indexes_from_siblingmetabin(element(1, UpdatedMetadata), []),
OldIndexes =
case OldMetadata of
not_present ->
[];
_ ->
get_indexes_from_siblingmetabin(element(1, OldMetadata), [])
end,
diff_index_data(OldIndexes, UpdIndexes);
diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata) ->
default_diff_indexspecs(?STD_TAG, UpdatedMetadata, CurrentMetadata);
diff_indexspecs(Tag, UpdatedMetadata, CurrentMetadata) ->
OverrideFun =
get_appdefined_function(diff_indexspecs,
fun default_diff_indexspecs/3,
3),
OverrideFun(Tag, UpdatedMetadata, CurrentMetadata).
default_diff_indexspecs(_Tag, _UpdatedMetadata, _CurrentMetadata) ->
[].
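For orientation, a sketch of the output shape this feeds (values are illustrative, and mirror the diff_index_test further down; diff_index_data is the internal helper defined below):

    % One index entry added and one removed between the old (ledger) and
    % updated (journal) metadata yields an add spec and a remove spec
    diff_index_data([{<<"idx1_bin">>, <<"old">>}],
                    [{<<"idx1_bin">>, <<"new">>}]).
    % -> [{add, <<"idx1_bin">>, <<"new">>}, {remove, <<"idx1_bin">>, <<"old">>}]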
%%%============================================================================
%%% Standard External Functions
%%%============================================================================
@ -190,7 +229,7 @@ defined_objecttags() ->
leveled_codec:compaction_method()}.
%% @doc
%% State the compaction_method to be used when reloading the Ledger from the
%% journal for each object tag. Note, no compaction startegy required for
%% journal for each object tag. Note, no compaction strategy required for
%% head_only tag
default_reload_strategy(Tag) ->
{Tag, retain}.
@ -317,3 +356,155 @@ get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
MetaLen:32/integer,
MetaBin:MetaLen/binary>>,
[LastMod|LastMods]).
get_indexes_from_siblingmetabin(<<0:32/integer,
MetaLen:32/integer,
MetaBin:MetaLen/binary,
RestBin/binary>>,
Indexes) ->
UpdIndexes = lists:umerge(get_indexes_frommetabin(MetaBin), Indexes),
get_indexes_from_siblingmetabin(RestBin, UpdIndexes);
get_indexes_from_siblingmetabin(<<SibCount:32/integer, RestBin/binary>>,
Indexes) when SibCount > 0 ->
get_indexes_from_siblingmetabin(RestBin, Indexes);
get_indexes_from_siblingmetabin(_, Indexes) ->
Indexes.
%% @doc
%% Parse the metabinary for an individual sibling and return a list of index
%% entries.
get_indexes_frommetabin(<<_LMD1:32/integer, _LMD2:32/integer, _LMD3:32/integer,
VTagLen:8/integer, _VTag:VTagLen/binary,
Deleted:1/binary-unit:8,
MetaRestBin/binary>>) when Deleted /= <<1>> ->
lists:usort(indexes_of_metabinary(MetaRestBin));
get_indexes_frommetabin(_) ->
[].
indexes_of_metabinary(<<>>) ->
[];
indexes_of_metabinary(<<KeyLen:32/integer, KeyBin:KeyLen/binary,
ValueLen:32/integer, ValueBin:ValueLen/binary,
Rest/binary>>) ->
Key = decode_maybe_binary(KeyBin),
case Key of
<<"index">> ->
Value = decode_maybe_binary(ValueBin),
Value;
_ ->
indexes_of_metabinary(Rest)
end.
decode_maybe_binary(<<1, Bin/binary>>) ->
Bin;
decode_maybe_binary(<<0, Bin/binary>>) ->
binary_to_term(Bin);
decode_maybe_binary(<<_Other:8, Bin/binary>>) ->
Bin.
-spec diff_index_data([{binary(), index_value()}],
[{binary(), index_value()}]) ->
[{index_op(), binary(), index_value()}].
diff_index_data(OldIndexes, AllIndexes) ->
OldIndexSet = ordsets:from_list(OldIndexes),
AllIndexSet = ordsets:from_list(AllIndexes),
diff_specs_core(AllIndexSet, OldIndexSet).
diff_specs_core(AllIndexSet, OldIndexSet) ->
NewIndexSet = ordsets:subtract(AllIndexSet, OldIndexSet),
RemoveIndexSet =
ordsets:subtract(OldIndexSet, AllIndexSet),
NewIndexSpecs =
assemble_index_specs(ordsets:subtract(NewIndexSet, OldIndexSet),
add),
RemoveIndexSpecs =
assemble_index_specs(RemoveIndexSet,
remove),
NewIndexSpecs ++ RemoveIndexSpecs.
%% @doc Assemble a list of index specs in the
%% form of triplets of the form
%% {IndexOperation, IndexField, IndexValue}.
-spec assemble_index_specs([{binary(), binary()}], index_op()) ->
[{index_op(), binary(), binary()}].
assemble_index_specs(Indexes, IndexOp) ->
[{IndexOp, Index, Value} || {Index, Value} <- Indexes].
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
index_extract_test() ->
SibMetaBin =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
Indexes = get_indexes_from_siblingmetabin(SibMetaBin, []),
ExpIndexes = [{"idx1_bin","21521107231730Sophia"},
{"idx1_bin","21820510130146Avery"}],
?assertMatch(ExpIndexes, Indexes),
SibMetaBinNoIdx =
<<0,0,0,1,0,0,0,0,0,0,0,128,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50>>,
?assertMatch([], get_indexes_from_siblingmetabin(SibMetaBinNoIdx, [])),
SibMetaBinOverhang =
<<0,0,0,1,0,0,0,0,0,0,0,221,0,0,6,48,0,4,130,247,0,1,250,134,
1,101,0,0,0,0,4,1,77,68,75,0,0,0,44,0,131,107,0,39,77,68,
86,101,49,55,52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,
45,97,53,102,50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,
0,6,1,105,110,100,101,120,0,0,0,79,0,131,108,0,0,0,2,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,20,50,49,53,50,
49,49,48,55,50,51,49,55,51,48,83,111,112,104,105,97,104,2,
107,0,8,105,100,120,49,95,98,105,110,107,0,19,50,49,56,50,
48,53,49,48,49,51,48,49,52,54,65,118,101,114,121,106,0,0,0,
5,1,77,68,75,50,0,0,0,44,0,131,107,0,39,77,68,86,101,49,55,
52,55,48,50,55,45,54,50,99,49,45,52,48,57,55,45,97,53,102,
50,45,53,54,98,51,98,97,57,57,99,55,56,50,0,0,0,0,0,0,0,4,
0,0,0,0>>,
?assertMatch(ExpIndexes,
get_indexes_from_siblingmetabin(SibMetaBinOverhang, [])).
diff_index_test() ->
UpdIndexes =
[{<<"idx1_bin">>,<<"20840930001702Zoe">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
OldIndexes =
[{<<"idx1_bin">>,<<"20231126131808Madison">>},
{<<"idx1_bin">>,<<"20931011172606Emily">>}],
IdxSpecs = diff_index_data(OldIndexes, UpdIndexes),
?assertMatch([{add, <<"idx1_bin">>, <<"20840930001702Zoe">>},
{remove, <<"idx1_bin">>,<<"20231126131808Madison">>}], IdxSpecs).
decode_test() ->
Bin = <<"999">>,
BinTerm = term_to_binary("999"),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<1:8/integer, Bin/binary>>))),
?assertMatch("999", decode_maybe_binary(<<0:8/integer, BinTerm/binary>>)),
?assertMatch("999", binary_to_list(
decode_maybe_binary(<<2:8/integer, Bin/binary>>))).
-endif.

View file: leveled_iclerk.erl

@ -145,6 +145,15 @@
% released from a compaction run of a single file to make it a run
% worthwhile of compaction (released space is 100.0 - target e.g. 70.0
% means that 30.0% should be released)
-type key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key()}, non_neg_integer()}.
-type corrupted_test_key_size() ::
{{non_neg_integer(),
leveled_codec:journal_key_tag(),
leveled_codec:ledger_key(),
null}, non_neg_integer()}.
%%%============================================================================
%%% API
@ -288,6 +297,8 @@ handle_call(stop, _From, State) ->
handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
State) ->
leveled_log:log("IC014", [State#state.reload_strategy,
State#state.max_run_length]),
% Empty the waste folder
clear_waste(State),
SW = os:timestamp(),
@ -298,7 +309,11 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Manifest0},
% Don't want to process a queued call waiting on an old manifest
[_Active|Manifest] = Manifest0,
{FilterServer, MaxSQN} = InitiateFun(Checker),
ok = clerk_scorefilelist(self(), Manifest),
NotRollingFun =
fun({_LowSQN, _FN, Pid, _LK}) ->
not leveled_cdb:cdb_isrolling(Pid)
end,
ok = clerk_scorefilelist(self(), lists:filter(NotRollingFun, Manifest)),
ScoringState =
#scoring_state{filter_fun = FilterFun,
filter_server = FilterServer,
@ -315,7 +330,8 @@ handle_cast({score_filelist, [Entry|Tail]}, State) ->
ScoringState#scoring_state.filter_server,
ScoringState#scoring_state.max_sqn,
?SAMPLE_SIZE,
?BATCH_SIZE),
?BATCH_SIZE,
State#state.reload_strategy),
Candidate =
#candidate{low_sqn = LowSQN,
filename = FN,
@ -493,7 +509,10 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%% Internal functions
%%%============================================================================
-spec check_single_file(pid(), fun(), any(), non_neg_integer(),
non_neg_integer(), non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
%% @doc
%% Get a score for a single CDB file in the journal. This will pull out a bunch
%% of keys and sizes at random in an efficient way (by scanning the hashtable
@ -505,31 +524,65 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) ->
%%
%% The score is based on a random sample - so will not be consistent between
%% calls.
check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) ->
check_single_file(CDB, FilterFun, FilterServer, MaxSQN,
SampleSize, BatchSize,
ReloadStrategy) ->
FN = leveled_cdb:cdb_filename(CDB),
SW = os:timestamp(),
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
Score =
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN),
leveled_log:log("IC004", [FN, Score]),
size_comparison_score(KeySizeList,
FilterFun,
FilterServer,
MaxSQN,
ReloadStrategy),
safely_log_filescore(PositionList, FN, Score, SW),
Score.
size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
safely_log_filescore([], FN, Score, SW) ->
leveled_log:log_timer("IC004", [Score, empty, FN], SW);
safely_log_filescore(PositionList, FN, Score, SW) ->
AvgJump =
(lists:last(PositionList) - lists:nth(1, PositionList))
div length(PositionList),
leveled_log:log_timer("IC004", [Score, AvgJump, FN], SW).
-spec size_comparison_score(list(key_size() | corrupted_test_key_size()),
fun(),
any(),
non_neg_integer(),
leveled_codec:compaction_strategy()) ->
float().
size_comparison_score(KeySizeList,
FilterFun, FilterServer, MaxSQN,
RS) ->
FoldFunForSizeCompare =
fun(KS, {ActSize, RplSize}) ->
case KS of
{{SQN, Type, PK}, Size} ->
MayScore =
leveled_codec:is_compaction_candidate({SQN, Type, PK}),
case MayScore of
IsJournalEntry =
leveled_codec:is_full_journalentry({SQN, Type, PK}),
case IsJournalEntry of
false ->
{ActSize + Size - ?CRC_SIZE, RplSize};
TS = leveled_codec:get_tagstrategy(PK, RS),
% If the strategy is to retain key deltas, then
% scoring must reflect that. Key deltas are
% possible even if the strategy does not allow them, as
% there is support for changing strategy from
% retain to recalc
case TS of
retain ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
end;
true ->
Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of
{true, _} ->
{current, _} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
{false, true} ->
{_, true} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
@ -542,13 +595,13 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
% expected format of the key
{ActSize, RplSize}
end
end,
end,
R0 = lists:foldl(FoldFunForSizeCompare, {0, 0}, KeySizeList),
{ActiveSize, ReplacedSize} = R0,
case ActiveSize + ReplacedSize of
0 ->
100.0;
0.0;
_ ->
100 * ActiveSize / (ActiveSize + ReplacedSize)
end.
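As a worked example of the revised scoring: if the sampled keys that are still current account for 300 bytes (net of the CRC deduction) and the replaced keys for 500 bytes, the file scores 100 * 300 / 800 = 37.5, i.e. most of the sampled space could be reclaimed by compaction. A file yielding an empty sample now scores 0.0 rather than 100.0.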
@ -558,12 +611,13 @@ fetch_inbatches([], _BatchSize, CDB, CheckedList) ->
ok = leveled_cdb:cdb_clerkcomplete(CDB),
CheckedList;
fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
{Batch, Tail} = if
length(PositionList) >= BatchSize ->
lists:split(BatchSize, PositionList);
true ->
{PositionList, []}
end,
{Batch, Tail} =
if
length(PositionList) >= BatchSize ->
lists:split(BatchSize, PositionList);
true ->
{PositionList, []}
end,
KL_List = leveled_cdb:cdb_directfetch(CDB, Batch, key_size),
fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List).
@ -751,45 +805,58 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% recalculating the KeyChanges by looking at the object when we reload. So
%% old objects can be discarded.
%%
%% If the strategy is skip, we don't care about KeyDeltas. Note though, that
%% If the strategy is recovr, we don't care about KeyDeltas. Note though, that
%% if the ledger is deleted it may not be possible to safely rebuild a KeyStore
%% if it contains index entries. The hot_backup approach is also not safe with
%% a `skip` strategy.
%% a `recovr` strategy. The recovr strategy assumes faults in the ledger will
%% be resolved via application-level anti-entropy
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
FoldFun =
fun(KVC0, Acc) ->
case KVC0 of
{_InkKey, crc_wonky, false} ->
% Bad entry, disregard, don't check
Acc;
{JK, JV, _Check} ->
{SQN, LK} =
leveled_codec:from_journalkey(JK),
CompactStrategy =
leveled_codec:get_tagstrategy(LK, ReloadStrategy),
KeyValid = FilterFun(FilterServer, LK, SQN),
IsInMemory = SQN > MaxSQN,
case {KeyValid or IsInMemory, CompactStrategy} of
{true, _} ->
% This entry may still be required regardless of
% strategy
[KVC0|Acc];
{false, retain} ->
% If we have a retain strategy, it can't be
% discarded - but the value part is no longer
% required as this version has been replaced
{JK0, JV0} =
leveled_codec:revert_to_keydeltas(JK, JV),
[{JK0, JV0, null}|Acc];
{false, _} ->
% This is out of date and not retained - discard
Acc
end
end
end,
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
lists:reverse(lists:foldl(FoldFun, [], KVCs)).
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
fun(KVC0, Acc) ->
case KVC0 of
{_InkKey, crc_wonky, false} ->
% Bad entry, disregard, don't check
Acc;
{JK, JV, _Check} ->
{SQN, LK} =
leveled_codec:from_journalkey(JK),
CompactStrategy =
leveled_codec:get_tagstrategy(LK, ReloadStrategy),
IsJournalEntry =
leveled_codec:is_full_journalentry(JK),
case {CompactStrategy, IsJournalEntry} of
{retain, false} ->
[KVC0|Acc];
_ ->
KeyCurrent = FilterFun(FilterServer, LK, SQN),
IsInMemory = SQN > MaxSQN,
case {KeyCurrent, IsInMemory, CompactStrategy} of
{KC, InMem, _} when KC == current; InMem ->
% This entry may still be required
% regardless of strategy
[KVC0|Acc];
{_, _, retain} ->
% If we have a retain strategy, it can't be
% discarded - but the value part is no
% longer required as this version has been
% replaced
{JK0, JV0} =
leveled_codec:revert_to_keydeltas(JK, JV),
[{JK0, JV0, null}|Acc];
{_, _, _} ->
% This is out of date and not retained so
% discard
Acc
end
end
end
end.
write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
@ -977,6 +1044,7 @@ fetch_testcdb(RP) ->
check_single_file_test() ->
RP = "test/test_area/",
RS = leveled_codec:inker_reload_strategy([]),
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
{ok, CDB} = fetch_testcdb(RP),
LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
@ -985,18 +1053,18 @@ check_single_file_test() ->
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
case lists:keyfind(ObjSQN, 1, Srv) of
{ObjSQN, Key} ->
true;
current;
_ ->
false
replaced
end end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4),
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS),
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB).
@ -1016,9 +1084,9 @@ compact_single_file_setup() ->
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
case lists:keyfind(ObjSQN, 1, Srv) of
{ObjSQN, Key} ->
true;
current;
_ ->
false
replaced
end end,
CompactFP = leveled_inker:filepath(RP, journal_compact_dir),
ok = filelib:ensure_dir(CompactFP),
@ -1111,17 +1179,17 @@ compact_empty_file_test() ->
RP = "test/test_area/",
ok = filelib:ensure_dir(leveled_inker:filepath(RP, journal_dir)),
FN1 = leveled_inker:filepath(RP, 1, new_journal),
RS = leveled_codec:inker_reload_strategy([]),
CDBopts = #cdb_options{binary_mode=true},
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts),
ok = leveled_cdb:cdb_put(CDB1, {1, stnd, test_ledgerkey("Key1")}, <<>>),
{ok, FN2} = leveled_cdb:cdb_complete(CDB1),
{ok, CDB2} = leveled_cdb:cdb_open_reader(FN2),
LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
{2, {o, "Bucket", "Key2", null}},
{3, {o, "Bucket", "Key3", null}}],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
?assertMatch(100.0, Score1),
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS),
?assertMatch(0.0, Score1),
ok = leveled_cdb:cdb_deletepending(CDB2),
ok = leveled_cdb:cdb_destroy(CDB2).
@ -1161,7 +1229,13 @@ compact_singlefile_totwosmallfiles_testto() ->
filename=leveled_cdb:cdb_filename(CDBr),
journal=CDBr,
compaction_perc=50.0}],
FakeFilterFun = fun(_FS, _LK, SQN) -> SQN rem 2 == 0 end,
FakeFilterFun =
fun(_FS, _LK, SQN) ->
case SQN rem 2 of
0 -> current;
_ -> replaced
end
end,
ManifestSlice = compact_files(BestRun1,
CDBoptsSmall,
@ -1181,17 +1255,36 @@ compact_singlefile_totwosmallfiles_testto() ->
size_score_test() ->
KeySizeList =
[{{1, ?INKT_STND, "Key1"}, 104},
{{2, ?INKT_STND, "Key2"}, 124},
{{3, ?INKT_STND, "Key3"}, 144},
{{4, ?INKT_STND, "Key4"}, 154},
{{5, ?INKT_STND, "Key5", "Subk1"}, 164},
{{6, ?INKT_STND, "Key6"}, 174},
{{7, ?INKT_STND, "Key7"}, 184}],
[{{1, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key1">>, null}}, 104},
{{2, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key2">>, null}}, 124},
{{3, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key3">>, null}}, 144},
{{4, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key4">>, null}}, 154},
{{5,
?INKT_STND,
{?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>}, null},
164},
{{6, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key6">>, null}}, 174},
{{7, ?INKT_STND, {?STD_TAG, <<"B">>, <<"Key7">>, null}}, 184}],
MaxSQN = 6,
CurrentList = ["Key1", "Key4", "Key5", "Key6"],
FilterFun = fun(L, K, _SQN) -> lists:member(K, L) end,
Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN),
CurrentList =
[{?STD_TAG, <<"B">>, <<"Key1">>, null},
{?STD_TAG, <<"B">>, <<"Key4">>, null},
{?STD_TAG, <<"B">>, <<"Key5">>, <<"Subk1">>},
{?STD_TAG, <<"B">>, <<"Key6">>, null}],
FilterFun =
fun(L, K, _SQN) ->
case lists:member(K, L) of
true -> current;
false -> replaced
end
end,
Score =
size_comparison_score(KeySizeList,
FilterFun,
CurrentList,
MaxSQN,
leveled_codec:inker_reload_strategy([])),
io:format("Score ~w", [Score]),
?assertMatch(true, Score > 69.0),
?assertMatch(true, Score < 70.0).

View file: leveled_inker.erl

@ -101,7 +101,7 @@
ink_fetch/3,
ink_keycheck/3,
ink_fold/4,
ink_loadpcl/4,
ink_loadpcl/5,
ink_registersnapshot/2,
ink_confirmdelete/2,
ink_compactjournal/3,
@ -133,7 +133,6 @@
-define(WASTE_FP, "waste").
-define(JOURNAL_FILEX, "cdb").
-define(PENDING_FILEX, "pnd").
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(TEST_KC, {[], infinity}).
@ -321,7 +320,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
{fold, MinSQN, FoldFuns, Acc, by_runner},
infinity).
-spec ink_loadpcl(pid(), integer(), fun(), pid()) -> ok.
-spec ink_loadpcl(pid(),
integer(),
leveled_bookie:initial_loadfun(),
fun((string(), non_neg_integer()) -> any()),
fun((any(), any()) -> ok)) -> ok.
%%
%% Function to prompt load of the Ledger at startup. The Penciller should
%% have determined the lowest SQN not present in the Ledger, and the inker
@ -330,20 +333,11 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
%%
%% The load fun should be a five arity function like:
%% load_fun(KeyInJournal, ValueInJournal, _Position, Acc0, ExtractFun)
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
BatchFun =
fun(BatchAcc, _Acc) ->
push_to_penciller(Penciller, BatchAcc)
end,
InitAccFun =
fun(FN, CurrentMinSQN) ->
leveled_log:log("I0014", [FN, CurrentMinSQN]),
leveled_bookie:empty_ledgercache()
end,
ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) ->
gen_server:call(Pid,
{fold,
MinSQN,
{FilterFun, InitAccFun, BatchFun},
{LoadFun, InitAccFun, BatchFun},
ok,
as_ink},
infinity).
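A minimal sketch of a conforming load fun, purely to illustrate the five-arity shape and the {loop|stop, Acc} protocol (leveled_bookie:get_loadfun/3, shown earlier in this diff, is the real implementation):

    SkeletonLoadFun =
        fun({SQN, _InkTag, _PK}, _Value, _Pos, {MinSQN, MaxSQN, LC}, _ExtractFun) ->
            if
                SQN < MinSQN -> {loop, {MinSQN, MaxSQN, LC}}; % below the batch window
                SQN > MaxSQN -> {stop, {MinSQN, MaxSQN, LC}}; % past the batch window
                true -> {loop, {MinSQN, MaxSQN, LC}}          % in range: a real fun
                                                              % would update LC here
            end
        end.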
@ -1142,18 +1136,18 @@ fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
Acc0 = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest);
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, we can skip it if the
% next file also has a LowSQN below the minimum
Acc0 =
{NextMinSQN, Acc0} =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
@ -1172,9 +1166,9 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
undefined,
FN);
_ ->
Acc
{MinSQN, Acc}
end,
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest).
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
@ -1182,8 +1176,8 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
FoldFun(BatchAcc, Acc);
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
@ -1195,22 +1189,6 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
LastPosition,
FN)
end.
push_to_penciller(Penciller, LedgerCache) ->
% The push to penciller must start as a tree to correctly de-duplicate
% the list by order before becoming a de-duplicated list for loading
LC0 = leveled_bookie:loadqueue_ledgercache(LedgerCache),
push_to_penciller_loop(Penciller, LC0).
push_to_penciller_loop(Penciller, LedgerCache) ->
case leveled_bookie:push_ledgercache(Penciller, LedgerCache) of
returned ->
timer:sleep(?LOADING_PAUSE),
push_to_penciller_loop(Penciller, LedgerCache);
ok ->
ok
end.
sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
@ -1452,7 +1430,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
case lists:member({SQN, K}, L) of
true -> current;
false -> replaced
end
end,
5000),
timer:sleep(1000),
@ -1464,7 +1445,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
case lists:member({SQN, K}, L) of
true -> current;
false -> replaced
end
end,
5000),
timer:sleep(1000),

View file: leveled_log.erl

@ -230,6 +230,8 @@
{"PC023",
{info, "At level=~w file_count=~w avg_mem=~w "
++ "file with most memory fn=~s p=~w mem=~w"}},
{"PC024",
{info, "Grooming compaction picked file with tomb_count=~w"}},
{"PM002",
{info, "Completed dump of L0 cache to list of l0cache_size=~w"}},
@ -336,7 +338,7 @@
{info, "Scoring of compaction runs complete with highest score=~w "
++ "with run of run_length=~w"}},
{"IC004",
{info, "Score for filename ~s is ~w"}},
{info, "Score=~w with mean_byte_jump=~w for filename ~s"}},
{"IC005",
{info, "Compaction to be performed on ~w files with score of ~w"}},
{"IC006",
@ -356,6 +358,8 @@
{"IC013",
{warn, "File with name ~s to be ignored in manifest as scanning for "
++ "first key returned empty - maybe corrupted"}},
{"IC014",
{info, "Compaction to be run with strategy ~w and max_run_length ~w"}},
{"CDB01",
{info, "Opening file for writing with filename ~s"}},

View file: leveled_pclerk.erl

@ -49,6 +49,7 @@
-define(MAX_TIMEOUT, 2000).
-define(MIN_TIMEOUT, 200).
-define(GROOMING_PERC, 50).
-record(state, {owner :: pid() | undefined,
root_path :: string() | undefined,
@ -56,6 +57,8 @@
sst_options :: #sst_options{}
}).
-type manifest_entry() :: #manifest_entry{}.
%%%============================================================================
%%% API
%%%============================================================================
@ -183,7 +186,15 @@ merge(SrcLevel, Manifest, RootPath, OptsSST) ->
leveled_log:log("PC023",
[SrcLevel + 1, FCnt, AvgMem, MaxFN, MaxP, MaxMem])
end,
Src = leveled_pmanifest:mergefile_selector(Manifest, SrcLevel),
SelectMethod =
case leveled_rand:uniform(100) of
R when R =< ?GROOMING_PERC ->
{grooming, fun grooming_scorer/1};
_ ->
random
end,
Src =
leveled_pmanifest:mergefile_selector(Manifest, SrcLevel, SelectMethod),
NewSQN = leveled_pmanifest:get_manifest_sqn(Manifest) + 1,
SinkList = leveled_pmanifest:merge_lookup(Manifest,
SrcLevel + 1,
@ -260,9 +271,9 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
length(Additions)),
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
TS1 = os:timestamp(),
case leveled_sst:sst_new(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
case leveled_sst:sst_newmerge(RP, FileName,
KL1, KL2, SinkB, SinkLevel, MaxSQN,
OptsSST) of
empty ->
leveled_log:log("PC013", [FileName]),
do_merge([], [],
@ -285,6 +296,23 @@ do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
Additions ++ [Entry])
end.
-spec grooming_scorer(list(manifest_entry())) -> manifest_entry().
grooming_scorer([ME | MEs]) ->
InitTombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
{HighestTC, BestME} = grooming_scorer(InitTombCount, ME, MEs),
leveled_log:log("PC024", [HighestTC]),
BestME.
grooming_scorer(HighestTC, BestME, []) ->
{HighestTC, BestME};
grooming_scorer(HighestTC, BestME, [ME | MEs]) ->
TombCount = leveled_sst:sst_gettombcount(ME#manifest_entry.owner),
case TombCount > HighestTC of
true ->
grooming_scorer(TombCount, ME, MEs);
false ->
grooming_scorer(HighestTC, BestME, MEs)
end.
return_deletions(ManifestSQN, PendingDeletionD) ->
% The returning of deletions had been separated out as a failure to fetch
@ -325,6 +353,82 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
generate_randomkeys(Count - 1, [RandKey|Acc], BucketLow, BRange).
grooming_score_test() ->
ok = filelib:ensure_dir("test/test_area/ledger_files/"),
KL1_L3 = lists:sort(generate_randomkeys(2000, 0, 100)),
KL2_L3 = lists:sort(generate_randomkeys(2000, 101, 250)),
KL3_L3 = lists:sort(generate_randomkeys(2000, 251, 300)),
KL4_L3 = lists:sort(generate_randomkeys(2000, 301, 400)),
[{HeadK, HeadV}|RestKL2] = KL2_L3,
{ok, PidL3_1, _, _} =
leveled_sst:sst_newmerge("test/test_area/ledger_files/",
"1_L3.sst",
KL1_L3,
[{HeadK, setelement(2, HeadV, tomb)}
|RestKL2],
false,
3,
999999,
#sst_options{},
true,
true),
{ok, PidL3_1B, _, _} =
leveled_sst:sst_newmerge("test/test_area/ledger_files/",
"1B_L3.sst",
KL1_L3,
[{HeadK, setelement(2, HeadV, tomb)}
|RestKL2],
true,
3,
999999,
#sst_options{},
true,
true),
{ok, PidL3_2, _, _} =
leveled_sst:sst_newmerge("test/test_area/ledger_files/",
"2_L3.sst",
KL3_L3,
KL4_L3,
false,
3,
999999,
#sst_options{},
true,
true),
{ok, PidL3_2NC, _, _} =
leveled_sst:sst_newmerge("test/test_area/ledger_files/",
"2NC_L3.sst",
KL3_L3,
KL4_L3,
false,
3,
999999,
#sst_options{},
true,
false),
ME1 = #manifest_entry{owner=PidL3_1},
ME1B = #manifest_entry{owner=PidL3_1B},
ME2 = #manifest_entry{owner=PidL3_2},
ME2NC = #manifest_entry{owner=PidL3_2NC},
?assertMatch(ME1, grooming_scorer([ME1, ME2])),
?assertMatch(ME1, grooming_scorer([ME2, ME1])),
% prefer the file with the tombstone
?assertMatch(ME2NC, grooming_scorer([ME1, ME2NC])),
?assertMatch(ME2NC, grooming_scorer([ME2NC, ME1])),
% not_counted > 1 - we will merge files in unexpected (i.e. legacy)
% format first
?assertMatch(ME1B, grooming_scorer([ME1B, ME2])),
?assertMatch(ME2, grooming_scorer([ME2, ME1B])),
% If the file with the tombstone is in the basement, it will have
% no tombstone so the first file will be chosen
lists:foreach(fun(P) -> leveled_sst:sst_clear(P) end,
[PidL3_1, PidL3_1B, PidL3_2, PidL3_2NC]).
merge_file_test() ->
ok = filelib:ensure_dir("test/test_area/ledger_files/"),
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
@ -401,7 +505,10 @@ merge_file_test() ->
"test/test_area/ledger_files/",
3, #sst_options{}),
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)).
?assertMatch(3, leveled_pmanifest:get_manifest_sqn(Man6)),
lists:foreach(fun(P) -> leveled_sst:sst_clear(P) end,
[PidL1_1, PidL2_1, PidL2_2, PidL2_3, PidL2_4]).
coverage_cheat_test() ->
{ok, _State1} =

View file: leveled_penciller.erl

@ -305,8 +305,9 @@
list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}.
-type iterator() :: list(iterator_entry()).
-type bad_ledgerkey() :: list().
-type sqn_check() :: current|replaced|missing.
-export_type([levelzero_cacheentry/0]).
-export_type([levelzero_cacheentry/0, sqn_check/0]).
%%%============================================================================
%%% API
@ -480,14 +481,13 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
-spec pcl_checksequencenumber(pid(),
leveled_codec:ledger_key()|bad_ledgerkey(),
integer()) -> boolean().
integer()) -> sqn_check().
%% @doc
%% Check if the sequence number of the passed key is not replaced by a change
%% after the passed sequence number. Will return true if the Key is present
%% and either is equal to, or prior to the passed SQN.
%%
%% If the key is not present, it will be assumed that a higher sequence number
%% tombstone once existed, and false will be returned.
%% after the passed sequence number. Will return:
%% - current
%% - replaced
%% - missing
pcl_checksequencenumber(Pid, Key, SQN) ->
Hash = leveled_codec:segment_hash(Key),
if
@ -1487,7 +1487,7 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
end.
-spec compare_to_sqn(tuple()|not_present, integer()) -> boolean().
-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
%% @doc
%% Check to see if the SQN in the penciller is after the SQN expected for an
%% object (used to allow the journal to check compaction status from a cache
@ -1495,19 +1495,19 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
compare_to_sqn(Obj, SQN) ->
case Obj of
not_present ->
false;
missing;
Obj ->
SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
if
SQNToCompare > SQN ->
false;
replaced;
true ->
% Normally we would expect the SQN to be equal here, but
% this also allows for the Journal to have a more advanced
% value. We return true here as we wouldn't want to
% compact that more advanced value, but this may cause
% confusion in snapshots.
true
current
end
end.
@ -2097,25 +2097,25 @@ simple_server_test() ->
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004", null})),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0001",
"Key0001",
null},
1)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0002",
"Key0002",
null},
1002)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0003",
"Key0003",
null},
2003)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0004",
"Key0004",
@ -2132,7 +2132,7 @@ simple_server_test() ->
KL1A = generate_randomkeys({2000, 4006}),
ok = maybe_pause_push(PCLr, [Key1A]),
ok = maybe_pause_push(PCLr, KL1A),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0001",
"Key0001",
@ -2148,19 +2148,19 @@ simple_server_test() ->
undefined,
false),
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
?assertMatch(replaced, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",
"Key0001",
null},
1)),
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
?assertMatch(current, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",
"Key0001",
null},
4005)),
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
?assertMatch(current, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0002",
"Key0002",

View file: leveled_pmanifest.erl

@ -34,7 +34,7 @@
remove_manifest_entry/4,
replace_manifest_entry/5,
switch_manifest_entry/4,
mergefile_selector/2,
mergefile_selector/3,
add_snapshot/3,
release_snapshot/2,
merge_snapshot/2,
@ -60,6 +60,7 @@
-define(TREE_WIDTH, 8).
-define(PHANTOM_PID, r2d_fail).
-define(MANIFESTS_TO_RETAIN, 5).
-define(GROOM_SAMPLE, 16).
-record(manifest, {levels,
% an array of lists or trees representing the manifest
@ -82,6 +83,8 @@
-type manifest() :: #manifest{}.
-type manifest_entry() :: #manifest_entry{}.
-type manifest_owner() :: pid()|list().
-type selector_strategy() ::
random|{grooming, fun((list(manifest_entry())) -> manifest_entry())}.
-export_type([manifest/0, manifest_entry/0, manifest_owner/0]).
@ -429,7 +432,8 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
range_lookup_int(Manifest, LevelIdx, StartKey, EndKey, MakePointerFun).
-spec mergefile_selector(manifest(), integer()) -> manifest_entry().
-spec mergefile_selector(manifest(), integer(), selector_strategy())
-> manifest_entry().
%% @doc
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
@ -441,14 +445,29 @@ merge_lookup(Manifest, LevelIdx, StartKey, EndKey) ->
%% genuinely better - eventually every file has to be compacted.
%%
%% Hence, the initial implementation is to select files to merge at random
mergefile_selector(Manifest, LevelIdx) when LevelIdx =< 1 ->
mergefile_selector(Manifest, LevelIdx, _Strategy) when LevelIdx =< 1 ->
Level = array:get(LevelIdx, Manifest#manifest.levels),
lists:nth(leveled_rand:uniform(length(Level)), Level);
mergefile_selector(Manifest, LevelIdx) ->
mergefile_selector(Manifest, LevelIdx, random) ->
Level = leveled_tree:to_list(array:get(LevelIdx,
Manifest#manifest.levels)),
{_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
ME.
ME;
mergefile_selector(Manifest, LevelIdx, {grooming, ScoringFun}) ->
Level = leveled_tree:to_list(array:get(LevelIdx,
Manifest#manifest.levels)),
SelectorFun =
fun(_I, Acc) ->
{_SK, ME} = lists:nth(leveled_rand:uniform(length(Level)), Level),
[ME|Acc]
end,
Sample =
lists:usort(lists:foldl(SelectorFun, [], lists:seq(1, ?GROOM_SAMPLE))),
% Note that Entries may be less than GROOM_SAMPLE, if the same one is picked
% multiple times. Level cannot be empty, as otherwise a merge would not
% have been chosen at this level
ScoringFun(Sample).
-spec merge_snapshot(manifest(), manifest()) -> manifest().
%% @doc
@ -607,6 +626,7 @@ check_bloom(Manifest, FP, Hash) ->
%%% Internal Functions
%%%============================================================================
-spec get_manifest_entry({tuple(), manifest_entry()}|manifest_entry())
-> manifest_entry().
%% @doc
@ -1057,10 +1077,10 @@ changeup_setup(Man6) ->
random_select_test() ->
ManTuple = initial_setup(),
LastManifest = element(7, ManTuple),
L1File = mergefile_selector(LastManifest, 1),
L1File = mergefile_selector(LastManifest, 1, random),
% This blows up if the function is not prepared for the different format
% https://github.com/martinsumner/leveled/issues/43
_L2File = mergefile_selector(LastManifest, 2),
_L2File = mergefile_selector(LastManifest, 2, random),
Level1 = array:get(1, LastManifest#manifest.levels),
?assertMatch(true, lists:member(L1File, Level1)).

View file: leveled_runner.erl

@ -373,7 +373,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
SQN),
% Need to check that we have not folded past the point
% at which the snapshot was taken
(JournalSQN >= SQN) and CheckSQN
(JournalSQN >= SQN) and (CheckSQN == current)
end,

View file: leveled_sst.erl

@ -92,6 +92,7 @@
-define(FLIPPER32, 4294967295).
-define(COMPRESS_AT_LEVEL, 1).
-define(INDEX_MODDATE, true).
-define(TOMB_COUNT, true).
-define(USE_SET_FOR_SPEED, 64).
-define(STARTUP_TIMEOUT, 10000).
@ -111,7 +112,7 @@
delete_pending/3]).
-export([sst_new/6,
sst_new/8,
sst_newmerge/8,
sst_newlevelzero/7,
sst_open/4,
sst_get/2,
@ -123,8 +124,15 @@
sst_checkready/1,
sst_switchlevels/2,
sst_deleteconfirmed/1,
sst_gettombcount/1,
sst_close/1]).
-ifdef(TEST).
-export([sst_newmerge/10]).
-endif.
-export([tune_seglist/1, extract_hash/1, member_check/2]).
-export([in_range/3]).
@ -158,7 +166,7 @@
range_endpoint(),
range_endpoint()}.
-type expandable_pointer()
:: slot_pointer()|sst_closed_pointer().
:: slot_pointer()|sst_pointer()|sst_closed_pointer().
-type expanded_pointer()
:: leveled_codec:ledger_kv()|expandable_pointer().
-type binaryslot_element()
@ -168,9 +176,13 @@
{sets, sets:set(non_neg_integer())}|
{list, list(non_neg_integer())}.
-type sst_options()
:: #sst_options{}.
:: #sst_options{}.
-type binary_slot()
:: {binary(), binary(), list(integer()), leveled_codec:ledger_key()}.
-type sst_summary()
:: #summary{}.
%% yield_blockquery is used to detemrine if the work necessary to process a
%% yield_blockquery is used to determine if the work necessary to process a
%% range query beyond the fetching the slot should be managed from within
%% this process, or should be handled by the calling process.
%% Handling within the calling process may lead to extra binary heap garbage
@ -193,7 +205,9 @@
fetch_cache = array:new([{size, ?CACHE_SIZE}]),
new_slots :: list()|undefined,
deferred_startup_tuple :: tuple()|undefined,
level :: non_neg_integer()|undefined}).
level :: non_neg_integer()|undefined,
tomb_count = not_counted
:: non_neg_integer()|not_counted}).
-record(sst_timings,
{sample_count = 0 :: integer(),
@ -265,7 +279,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
{ok, Pid} = gen_fsm:start_link(?MODULE, [], []),
PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
{[], [], SlotList, FK} =
{[], [], SlotList, FK, _CountOfTombs} =
merge_lists(KVList, OptsSST0, IndexModDate),
case gen_fsm:sync_send_event(Pid,
{sst_new,
@ -276,17 +290,18 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
MaxSQN,
OptsSST0,
IndexModDate,
not_counted,
self()},
infinity) of
{ok, {SK, EK}, Bloom} ->
{ok, Pid, {SK, EK}, Bloom}
end.
-spec sst_new(string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(), integer(),
integer(), sst_options())
-spec sst_newmerge(string(), string(),
list(leveled_codec:ledger_kv()|sst_pointer()),
list(leveled_codec:ledger_kv()|sst_pointer()),
boolean(), integer(),
integer(), sst_options())
-> empty|{ok, pid(),
{{list(leveled_codec:ledger_kv()),
list(leveled_codec:ledger_kv())},
@ -302,23 +317,23 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
%% The remainder of the lists is returned along with the StartKey and EndKey
%% so that it can be used in the next file in the merge.  It might
%% be that the merge_lists returns nothing (for example when a basement file is
%% all tombstones) - and the atome empty is returned in this case so that the
%% all tombstones) - and the atom empty is returned in this case so that the
%% file is not added to the manifest.
sst_new(RootPath, Filename,
sst_newmerge(RootPath, Filename,
KVL1, KVL2, IsBasement, Level,
MaxSQN, OptsSST) ->
sst_new(RootPath, Filename,
sst_newmerge(RootPath, Filename,
KVL1, KVL2, IsBasement, Level,
MaxSQN, OptsSST, ?INDEX_MODDATE).
MaxSQN, OptsSST, ?INDEX_MODDATE, ?TOMB_COUNT).
sst_new(RootPath, Filename,
sst_newmerge(RootPath, Filename,
KVL1, KVL2, IsBasement, Level,
MaxSQN, OptsSST, IndexModDate) ->
MaxSQN, OptsSST, IndexModDate, TombCount) ->
PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method),
OptsSST0 = OptsSST#sst_options{press_method = PressMethod0},
{Rem1, Rem2, SlotList, FK} =
merge_lists(KVL1, KVL2, {IsBasement, Level},
OptsSST0, IndexModDate),
{Rem1, Rem2, SlotList, FK, CountOfTombs} =
merge_lists(KVL1, KVL2, {IsBasement, Level}, OptsSST0,
IndexModDate, TombCount),
case SlotList of
[] ->
empty;
@ -333,6 +348,7 @@ sst_new(RootPath, Filename,
MaxSQN,
OptsSST0,
IndexModDate,
CountOfTombs,
self()},
infinity) of
{ok, {SK, EK}, Bloom} ->
@ -428,6 +444,15 @@ sst_expandpointer(Pointer, MorePointers, ScanWidth, SegmentList, LowLastMod) ->
sst_setfordelete(Pid, Penciller) ->
gen_fsm:sync_send_event(Pid, {set_for_delete, Penciller}, infinity).
-spec sst_gettombcount(pid()) -> non_neg_integer()|not_counted.
%% @doc
%% Get the count of tombstones in this SST file.  Returns not_counted if the
%% file was created with a version which did not support tombstone counting,
%% or if the file is at Level 0 (where tombstones are not counted, as such
%% files will inevitably be chosen for merge)
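%% A minimal usage sketch (hypothetical helper, not part of this module),
%% treating an unavailable count as a score of zero:
%%   tomb_score(Pid) ->
%%       case sst_gettombcount(Pid) of
%%           not_counted -> 0;
%%           TombCount when is_integer(TombCount) -> TombCount
%%       end.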
sst_gettombcount(Pid) ->
gen_fsm:sync_send_event(Pid, get_tomb_count, infinity).
-spec sst_clear(pid()) -> ok.
%% @doc
%% Instruct this file to be closed and deleted
@ -497,17 +522,18 @@ starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
starting({sst_new,
RootPath, Filename, Level,
{SlotList, FirstKey}, MaxSQN,
OptsSST, IdxModDate, StartingPID}, _From, State) ->
OptsSST, IdxModDate, CountOfTombs, StartingPID}, _From, State) ->
SW = os:timestamp(),
leveled_log:save(OptsSST#sst_options.log_options),
PressMethod = OptsSST#sst_options.press_method,
{Length, SlotIndex, BlockIndex, SlotsBin, Bloom} =
build_all_slots(SlotList),
SummaryBin =
build_table_summary(SlotIndex, Level, FirstKey, Length, MaxSQN, Bloom),
build_table_summary(SlotIndex, Level, FirstKey, Length,
MaxSQN, Bloom, CountOfTombs),
ActualFilename =
write_file(RootPath, Filename, SummaryBin, SlotsBin,
PressMethod, IdxModDate),
PressMethod, IdxModDate, CountOfTombs),
YBQ = Level =< 2,
{UpdState, Bloom} =
read_file(ActualFilename,
@ -530,7 +556,8 @@ starting({sst_newlevelzero, RootPath, Filename,
Penciller, MaxSQN,
OptsSST, IdxModDate}, _From, State) ->
DeferredStartupTuple =
{RootPath, Filename, Penciller, MaxSQN, OptsSST, IdxModDate},
{RootPath, Filename, Penciller, MaxSQN, OptsSST,
IdxModDate},
{reply, ok, starting,
State#state{deferred_startup_tuple = DeferredStartupTuple, level = 0}};
starting(close, _From, State) ->
@ -551,7 +578,7 @@ starting(complete_l0startup, State) ->
Time0 = timer:now_diff(os:timestamp(), SW0),
SW1 = os:timestamp(),
{[], [], SlotList, FirstKey} =
{[], [], SlotList, FirstKey, _CountOfTombs} =
merge_lists(KVList, OptsSST, IdxModDate),
Time1 = timer:now_diff(os:timestamp(), SW1),
@ -562,13 +589,14 @@ starting(complete_l0startup, State) ->
SW3 = os:timestamp(),
SummaryBin =
build_table_summary(SlotIndex, 0, FirstKey, SlotCount, MaxSQN, Bloom),
build_table_summary(SlotIndex, 0, FirstKey, SlotCount,
MaxSQN, Bloom, not_counted),
Time3 = timer:now_diff(os:timestamp(), SW3),
SW4 = os:timestamp(),
ActualFilename =
write_file(RootPath, Filename, SummaryBin, SlotsBin,
PressMethod, IdxModDate),
PressMethod, IdxModDate, not_counted),
{UpdState, Bloom} =
read_file(ActualFilename,
State#state{root_path=RootPath,
@ -716,6 +744,8 @@ reader(background_complete, _From, State) ->
Summary#summary.last_key},
reader,
State};
reader(get_tomb_count, _From, State) ->
{reply, State#state.tomb_count, reader, State};
reader(close, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State}.
@ -1196,11 +1226,11 @@ compress_level(_Level, PressMethod) ->
PressMethod.
write_file(RootPath, Filename, SummaryBin, SlotsBin,
PressMethod, IdxModDate) ->
PressMethod, IdxModDate, CountOfTombs) ->
SummaryLength = byte_size(SummaryBin),
SlotsLength = byte_size(SlotsBin),
{PendingName, FinalName} = generate_filenames(Filename),
FileVersion = gen_fileversion(PressMethod, IdxModDate),
FileVersion = gen_fileversion(PressMethod, IdxModDate, CountOfTombs),
case filelib:is_file(filename:join(RootPath, FinalName)) of
true ->
AltName = filename:join(RootPath, filename:basename(FinalName))
@ -1210,6 +1240,7 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin,
false ->
ok
end,
ok = leveled_util:safe_rename(filename:join(RootPath, PendingName),
filename:join(RootPath, FinalName),
<<FileVersion:8/integer,
@ -1225,7 +1256,8 @@ read_file(Filename, State, LoadPageCache) ->
open_reader(filename:join(State#state.root_path, Filename),
LoadPageCache),
UpdState0 = imp_fileversion(FileVersion, State),
{Summary, Bloom, SlotList} = read_table_summary(SummaryBin),
{Summary, Bloom, SlotList, TombCount} =
read_table_summary(SummaryBin, UpdState0#state.tomb_count),
BlockIndexCache = array:new([{size, Summary#summary.size},
{default, none}]),
UpdState1 = UpdState0#state{blockindex_cache = BlockIndexCache},
@ -1235,11 +1267,12 @@ read_file(Filename, State, LoadPageCache) ->
Summary#summary.size,
Summary#summary.max_sqn]),
{UpdState1#state{summary = UpdSummary,
handle = Handle,
filename = Filename},
handle = Handle,
filename = Filename,
tomb_count = TombCount},
Bloom}.
gen_fileversion(PressMethod, IdxModDate) ->
gen_fileversion(PressMethod, IdxModDate, CountOfTombs) ->
% Native or none can be treated the same once written, as reader
% does not need to know as compression info will be in header of the
% block
@ -1256,7 +1289,14 @@ gen_fileversion(PressMethod, IdxModDate) ->
false ->
0
end,
Bit1+ Bit2.
Bit3 =
case CountOfTombs of
not_counted ->
0;
_ ->
4
end,
Bit1 + Bit2 + Bit3.
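%% Worked example (assuming the bit assignments above): a file written with
%% indexed modification dates (Bit2 = 2) and a tombstone count (Bit3 = 4) adds
%% 6 to whatever the compression flag Bit1 contributes; on read, a non-zero
%% result from `VersionInt band 4` tells imp_fileversion/2 that a count is
%% stored in the summary.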
imp_fileversion(VersionInt, State) ->
UpdState0 =
@ -1273,7 +1313,12 @@ imp_fileversion(VersionInt, State) ->
2 ->
UpdState0#state{index_moddate = true}
end,
UpdState1.
case VersionInt band 4 of
0 ->
UpdState1;
4 ->
UpdState1#state{tomb_count = 0}
end.
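%% Note that tomb_count is set to 0 above only as a placeholder to flag that a
%% count is present in the file; the actual count is substituted when the
%% table summary is read (see read_table_summary/2).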
open_reader(Filename, LoadPageCache) ->
{ok, Handle} = file:open(Filename, [binary, raw, read]),
@ -1290,25 +1335,50 @@ open_reader(Filename, LoadPageCache) ->
{ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
{Handle, FileVersion, SummaryBin}.
build_table_summary(SlotIndex, _Level, FirstKey, SlotCount, MaxSQN, Bloom) ->
build_table_summary(SlotIndex, _Level, FirstKey,
SlotCount, MaxSQN, Bloom, CountOfTombs) ->
[{LastKey, _LastV}|_Rest] = SlotIndex,
Summary = #summary{first_key = FirstKey,
last_key = LastKey,
size = SlotCount,
max_sqn = MaxSQN},
SummBin =
SummBin0 =
term_to_binary({Summary, Bloom, lists:reverse(SlotIndex)},
?BINARY_SETTINGS),
SummBin =
case CountOfTombs of
not_counted ->
SummBin0;
I ->
<<I:32/integer, SummBin0/binary>>
end,
SummCRC = hmac(SummBin),
<<SummCRC:32/integer, SummBin/binary>>.
read_table_summary(BinWithCheck) ->
-spec read_table_summary(binary(), not_counted|non_neg_integer()) ->
{sst_summary(),
leveled_ebloom:bloom(),
list(tuple()),
not_counted|non_neg_integer()}.
%% @doc
%% Read the table summary - format varies depending on file version (presence
%% of tomb count)
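%% As constructed in build_table_summary/7, the binary is
%% <<CRC:32, TombCount:32, Summary/binary>> when tombstones were counted, and
%% <<CRC:32, Summary/binary>> otherwise - hence the TombCount argument is
%% needed here to determine which layout to expect.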
read_table_summary(BinWithCheck, TombCount) ->
<<SummCRC:32/integer, SummBin/binary>> = BinWithCheck,
CRCCheck = hmac(SummBin),
if
CRCCheck == SummCRC ->
% If the check fails, it might be possible to rebuild from all the slots
binary_to_term(SummBin)
case TombCount of
not_counted ->
erlang:append_element(binary_to_term(SummBin),
not_counted);
_ ->
<<I:32/integer, SummBin0/binary>> = SummBin,
erlang:append_element(binary_to_term(SummBin0), I)
end
end.
@ -1511,7 +1581,7 @@ accumulate_positions({K, V}, {PosBinAcc, NoHashCount, HashAcc, LMDAcc}) ->
NHC:7/integer,
PosBinAcc/binary>>,
0,
HashAcc,
[H1|HashAcc],
LMDAcc0}
end;
false ->
@ -1535,10 +1605,7 @@ take_max_lastmoddate(LMD, LMDAcc) ->
press_method(),
boolean(),
build_timings()) ->
{{binary(),
binary(),
list(integer()),
leveled_codec:ledger_key()},
{binary_slot(),
build_timings()}.
%% @doc
%% Generate the serialised slot to be used when storing this sublist of keys
@ -2258,7 +2325,7 @@ revert_position(Pos) ->
%% large numbers of index keys are present - as well as improving compression
%% ratios in the Ledger.
%%
%% The outcome of merge_lists/3 and merge_lists/5 should be an list of slots.
%% The outcome of merge_lists/3 and merge_lists/6 should be a list of slots.
%% Each slot should be ordered by Key and be of the form {Flag, KVList}, where
%% Flag can either be lookup or no-lookup. The list of slots should also be
%% ordered by Key (i.e. the first key in the slot)
@ -2275,17 +2342,19 @@ revert_position(Pos) ->
%% any lower sequence numbers should be compacted out of existence
-spec merge_lists(list(), sst_options(), boolean())
-> {list(), list(), list(tuple()), tuple()|null}.
-> {list(), list(), list(binary_slot()),
tuple()|null, non_neg_integer()|not_counted}.
%% @doc
%%
%% Merge from asingle list (i.e. at Level 0)
%% Merge from a single list (i.e. at Level 0)
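%% Note that tombstones are not counted in this case - the Level 0 file is
%% built with not_counted, as L0 files will always be chosen for merge
%% regardless of any grooming score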
merge_lists(KVList1, SSTOpts, IdxModDate) ->
SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
SlotCount = length(KVList1) div ?LOOK_SLOTSIZE,
{[],
[],
split_lists(KVList1, [],
SlotCount, SSTOpts#sst_options.press_method, IdxModDate),
element(1, lists:nth(1, KVList1))}.
element(1, lists:nth(1, KVList1)),
not_counted}.
split_lists([], SlotLists, 0, _PressMethod, _IdxModDate) ->
@ -2301,35 +2370,57 @@ split_lists(KVList1, SlotLists, N, PressMethod, IdxModDate) ->
split_lists(KVListRem, [SlotD|SlotLists], N - 1, PressMethod, IdxModDate).
-spec merge_lists(list(), list(), tuple(), sst_options(), boolean()) ->
{list(), list(), list(tuple()), tuple()|null}.
-spec merge_lists(list(), list(), tuple(), sst_options(),
boolean(), boolean()) ->
{list(), list(), list(binary_slot()), tuple()|null,
non_neg_integer()}.
%% @doc
%% Merge lists when merging across more thna one file. KVLists that are
%% Merge lists when merging across more than one file. KVLists that are
%% provided may include pointers to fetch more Keys/Values from the source
%% file
merge_lists(KVList1, KVList2, LevelInfo, SSTOpts, IndexModDate) ->
merge_lists(KVList1, KVList2, LevelInfo, SSTOpts,
IndexModDate, SaveTombCount) ->
InitTombCount =
case SaveTombCount of true -> 0; false -> not_counted end,
merge_lists(KVList1, KVList2,
LevelInfo,
[], null, 0,
SSTOpts#sst_options.max_sstslots,
SSTOpts#sst_options.press_method,
IndexModDate,
InitTombCount,
#build_timings{}).
-spec merge_lists(
list(expanded_pointer()),
list(expanded_pointer()),
{boolean(), non_neg_integer()},
list(binary_slot()),
leveled_codec:ledger_key()|null,
non_neg_integer(),
non_neg_integer(),
press_method(),
boolean(),
non_neg_integer()|not_counted,
build_timings()) ->
{list(expanded_pointer()), list(expanded_pointer()),
list(binary_slot()), leveled_codec:ledger_key()|null,
non_neg_integer()|not_counted}.
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, MaxSlots, MaxSlots,
_PressMethod, _IdxModDate, T0) ->
_PressMethod, _IdxModDate, CountOfTombs, T0) ->
% This SST file is full, move to complete file, and return the
% remainder
log_buildtimings(T0, LI),
{KVL1, KVL2, lists:reverse(SlotList), FirstKey};
{KVL1, KVL2, lists:reverse(SlotList), FirstKey, CountOfTombs};
merge_lists([], [], LI, SlotList, FirstKey, _SlotCount, _MaxSlots,
_PressMethod, _IdxModDate, T0) ->
_PressMethod, _IdxModDate, CountOfTombs, T0) ->
% the source files are empty, complete the file
log_buildtimings(T0, LI),
{[], [], lists:reverse(SlotList), FirstKey};
{[], [], lists:reverse(SlotList), FirstKey, CountOfTombs};
merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
PressMethod, IdxModDate, T0) ->
PressMethod, IdxModDate, CountOfTombs, T0) ->
% Form a slot by merging the two lists until the next 128 K/V pairs have
% been determined
SW = os:timestamp(),
@ -2348,6 +2439,7 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
MaxSlots,
PressMethod,
IdxModDate,
CountOfTombs,
T1);
{Lookup, KVL} ->
% Convert the list of KVs for the slot into a binary, and related
@ -2363,9 +2455,42 @@ merge_lists(KVL1, KVL2, LI, SlotList, FirstKey, SlotCount, MaxSlots,
MaxSlots,
PressMethod,
IdxModDate,
count_tombs(KVL, CountOfTombs),
T2)
end.
-spec count_tombs(list(leveled_codec:ledger_kv()),
non_neg_integer()|not_counted) ->
non_neg_integer()|not_counted.
%% @doc
%% Count the tombstones in a list of KVs
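%% For example, folding over a slot's KV list in which two values carry a tomb
%% status and the rest are {active, infinity} adds 2 to InitCount; when the
%% accumulator is not_counted the list is not scanned at all.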
count_tombs(_KVL, not_counted) ->
not_counted;
count_tombs(KVL, InitCount) ->
FoldFun =
fun(KV, Count) ->
case leveled_codec:strip_to_statusonly(KV) of
tomb ->
Count + 1;
_ ->
Count
end
end,
lists:foldl(FoldFun, InitCount, KVL).
-spec form_slot(list(expanded_pointer()),
list(expanded_pointer()),
{boolean(), non_neg_integer()},
lookup|no_lookup,
non_neg_integer(),
list(leveled_codec:ledger_kv()),
leveled_codec:ledger_key()|null) ->
{list(expanded_pointer()), list(expanded_pointer()),
{lookup|no_lookup, list(leveled_codec:ledger_kv())},
leveled_codec:ledger_key()}.
%% @doc
%% Merge together Key Value lists to provide an ordered slot of KVs
form_slot([], [], _LI, Type, _Size, Slot, FK) ->
{[], [], {Type, lists:reverse(Slot)}, FK};
form_slot(KVList1, KVList2, _LI, lookup, ?LOOK_SLOTSIZE, Slot, FK) ->
@ -2644,8 +2769,8 @@ testsst_new(RootPath, Filename,
OptsSST =
#sst_options{press_method=PressMethod,
log_options=leveled_log:get_opts()},
sst_new(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
OptsSST, false).
sst_newmerge(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN,
OptsSST, false, false).
generate_randomkeys(Seqn, Count, BucketRangeLow, BucketRangeHigh) ->
generate_randomkeys(Seqn,
@ -2695,11 +2820,61 @@ generate_indexkey(Term, Count) ->
infinity).
tombcount_test() ->
N = 1600,
KL1 = generate_randomkeys(N div 2 + 1, N, 1, 4),
KL2 = generate_indexkeys(N div 2),
FlipToTombFun =
fun({K, V}) ->
case leveled_rand:uniform(10) of
X when X > 5 ->
{K, setelement(2, V, tomb)};
_ ->
{K, V}
end
end,
KVL1 = lists:map(FlipToTombFun, KL1),
KVL2 = lists:map(FlipToTombFun, KL2),
CountTombFun =
fun({_K, V}, Acc) ->
case element(2, V) of
tomb ->
Acc + 1;
_ ->
Acc
end
end,
ExpectedCount = lists:foldl(CountTombFun, 0, KVL1 ++ KVL2),
{RP, Filename} = {?TEST_AREA, "tombcount_test"},
OptsSST =
#sst_options{press_method=native,
log_options=leveled_log:get_opts()},
{ok, SST1, _KD, _BB} = sst_newmerge(RP, Filename,
KVL1, KVL2, false, 2,
N, OptsSST, false, false),
?assertMatch(not_counted, sst_gettombcount(SST1)),
ok = sst_close(SST1),
ok = file:delete(filename:join(RP, Filename ++ ".sst")),
{ok, SST2, _KD, _BB} = sst_newmerge(RP, Filename,
KVL1, KVL2, false, 2,
N, OptsSST, false, true),
?assertMatch(ExpectedCount, sst_gettombcount(SST2)),
ok = sst_close(SST2),
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
form_slot_test() ->
% If a skip key happens, we mustn't switch to lookup by accident, as the
% slot could then be over the expected size
SkippingKV = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
Slot = [{{o, "B1", "K5", null}, {5, active, 99234567, {}}}],
SkippingKV =
{{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
Slot =
[{{o, "B1", "K5", null},
{5, {active, infinity}, {99234568, 99234567}, {}}}],
R1 = form_slot([SkippingKV], [],
{true, 99999999},
no_lookup,
@ -2709,19 +2884,26 @@ form_slot_test() ->
?assertMatch({[], [], {no_lookup, Slot}, {o, "B1", "K5", null}}, R1).
merge_tombstonelist_test() ->
% Merge lists with nothing but tombstones
SkippingKV1 = {{o, "B1", "K9995", null}, {9995, tomb, 1234567, {}}},
SkippingKV2 = {{o, "B1", "K9996", null}, {9996, tomb, 1234567, {}}},
SkippingKV3 = {{o, "B1", "K9997", null}, {9997, tomb, 1234567, {}}},
SkippingKV4 = {{o, "B1", "K9998", null}, {9998, tomb, 1234567, {}}},
SkippingKV5 = {{o, "B1", "K9999", null}, {9999, tomb, 1234567, {}}},
% Merge lists with nothing but tombstones, and file at basement level
SkippingKV1 =
{{o, "B1", "K9995", null}, {9995, tomb, {1234568, 1234567}, {}}},
SkippingKV2 =
{{o, "B1", "K9996", null}, {9996, tomb, {1234568, 1234567}, {}}},
SkippingKV3 =
{{o, "B1", "K9997", null}, {9997, tomb, {1234568, 1234567}, {}}},
SkippingKV4 =
{{o, "B1", "K9998", null}, {9998, tomb, {1234568, 1234567}, {}}},
SkippingKV5 =
{{o, "B1", "K9999", null}, {9999, tomb, {1234568, 1234567}, {}}},
R = merge_lists([SkippingKV1, SkippingKV3, SkippingKV5],
[SkippingKV2, SkippingKV4],
{true, 9999999},
#sst_options{press_method = native,
max_sstslots = 256},
?INDEX_MODDATE),
?assertMatch({[], [], [], null}, R).
?INDEX_MODDATE,
true),
?assertMatch({[], [], [], null, 0}, R).
indexed_list_test() ->
io:format(user, "~nIndexed list timing test:~n", []),
@ -3319,6 +3501,7 @@ simple_persisted_tester(SSTNewFun) ->
Acc
end
end,
true = [] == MapFun({FirstKey, "V"}, []), % coverage cheat within MapFun
KVList3 = lists:foldl(MapFun, [], KVList2),
SW2 = os:timestamp(),
lists:foreach(fun({K, H, _V}) ->