Merge branch 'master' into mas-i311-mergeselector

This commit is contained in:
Martin Sumner 2020-03-27 17:11:18 +00:00
commit 42eb5f56bc
8 changed files with 223 additions and 90 deletions

View file

@ -1,7 +1,7 @@
{application, leveled,
[
{description, "Key Value store based on LSM-Tree and designed for larger values"},
{vsn, "0.9.20"},
{vsn, "0.9.21"},
{registered, []},
{applications, [
kernel,

View file

@ -2278,7 +2278,7 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
-spec get_loadfun(book_state()) -> fun().
%% @doc
%% The LoadFun will be sued by the Inker when walking across the Journal to
%% The LoadFun will be used by the Inker when walking across the Journal to
%% load the Penciller at startup
get_loadfun(State) ->
PrepareFun =

View file

@ -35,7 +35,7 @@
from_inkerkv/2,
from_journalkey/1,
revert_to_keydeltas/2,
is_compaction_candidate/1,
is_full_journalentry/1,
split_inkvalue/1,
check_forinkertype/2,
get_tagstrategy/2,
@ -410,11 +410,12 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
%% If we wish to retain key deltas when an object in the Journal has been
%% replaced - then this converts a Journal Key and Value into one which has no
%% object body just the key deltas.
%% Only called if retain strategy and has passed
%% leveled_codec:is_full_journalentry/1 - so no need to consider other key
%% types
revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) ->
{_V, KeyDeltas} = revert_value_from_journal(InkerV),
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}};
revert_to_keydeltas(JournalKey, InkerV) ->
{JournalKey, InkerV}.
{{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}.
%% Used when fetching objects, so only handles standard, hashable entries
from_inkerkv(Object) ->
@ -560,12 +561,12 @@ check_forinkertype(_LedgerKey, head_only) ->
check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND.
-spec is_compaction_candidate(journal_key()) -> boolean().
-spec is_full_journalentry(journal_key()) -> boolean().
%% @doc
%% Only journal keys with standard objects are full journal entries, and only
%% full journal entries should be scored for compaction
is_compaction_candidate({_SQN, ?INKT_STND, _LK}) ->
is_full_journalentry({_SQN, ?INKT_STND, _LK}) ->
true;
is_compaction_candidate(_OtherJKType) ->
is_full_journalentry(_OtherJKType) ->
false.

View file

@ -528,17 +528,17 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
fun(KS, {ActSize, RplSize}) ->
case KS of
{{SQN, Type, PK}, Size} ->
MayScore =
leveled_codec:is_compaction_candidate({SQN, Type, PK}),
case MayScore of
IsJournalEntry =
leveled_codec:is_full_journalentry({SQN, Type, PK}),
case IsJournalEntry of
false ->
{ActSize + Size - ?CRC_SIZE, RplSize};
true ->
Check = FilterFun(FilterServer, PK, SQN),
case {Check, SQN > MaxSQN} of
{true, _} ->
{current, _} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
{false, true} ->
{_, true} ->
{ActSize + Size - ?CRC_SIZE, RplSize};
_ ->
{ActSize, RplSize + Size - ?CRC_SIZE}
@ -766,39 +766,51 @@ split_positions_into_batches(Positions, Journal, Batches) ->
%% a `skip` strategy.
filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
FoldFun =
fun(KVC0, Acc) ->
case KVC0 of
{_InkKey, crc_wonky, false} ->
% Bad entry, disregard, don't check
Acc;
{JK, JV, _Check} ->
{SQN, LK} =
leveled_codec:from_journalkey(JK),
CompactStrategy =
leveled_codec:get_tagstrategy(LK, ReloadStrategy),
KeyValid = FilterFun(FilterServer, LK, SQN),
IsInMemory = SQN > MaxSQN,
case {KeyValid or IsInMemory, CompactStrategy} of
{true, _} ->
% This entry may still be required regardless of
% strategy
[KVC0|Acc];
{false, retain} ->
% If we have a retain strategy, it can't be
% discarded - but the value part is no longer
% required as this version has been replaced
{JK0, JV0} =
leveled_codec:revert_to_keydeltas(JK, JV),
[{JK0, JV0, null}|Acc];
{false, _} ->
% This is out of date and not retained - discard
Acc
end
end
end,
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
lists:reverse(lists:foldl(FoldFun, [], KVCs)).
%% @doc
%% Build the fun folded over the {JournalKey, JournalValue, Check} entries
%% read during compaction.  The fun prepends to the accumulator each entry
%% that should be written to the compacted journal:
%% - entries flagged crc_wonky are dropped without consulting the filter;
%% - under a retain strategy, anything that is not a full journal entry is
%%   kept untouched (and the filter is not consulted);
%% - otherwise FilterFun(FilterServer, LedgerKey, SQN) is consulted, and the
%%   outcome is decided by filter_checked_entry/5.
filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
    fun(Entry, Kept) ->
        case Entry of
            {_InkKey, crc_wonky, false} ->
                % Bad entry, disregard, don't check
                Kept;
            {JournalKey, _JournalValue, _Check} ->
                {SQN, LedgerKey} =
                    leveled_codec:from_journalkey(JournalKey),
                Strategy =
                    leveled_codec:get_tagstrategy(LedgerKey, ReloadStrategy),
                FullEntry =
                    leveled_codec:is_full_journalentry(JournalKey),
                case {Strategy, FullEntry} of
                    {retain, false} ->
                        [Entry|Kept];
                    _ ->
                        Status = FilterFun(FilterServer, LedgerKey, SQN),
                        filter_checked_entry(Status,
                                                SQN > MaxSQN,
                                                Strategy,
                                                Entry,
                                                Kept)
                end
        end
    end.

%% Decide the fate of an entry once the filter has been consulted.
%% Arguments: the filter result, whether the SQN is beyond MaxSQN, the
%% compaction strategy for the tag, the entry itself and the accumulator.
filter_checked_entry(current, _InMemory, _Strategy, Entry, Kept) ->
    % This entry may still be required regardless of strategy
    [Entry|Kept];
filter_checked_entry(_Status, true, _Strategy, Entry, Kept) ->
    % SQN is beyond MaxSQN, so the entry may still be required
    [Entry|Kept];
filter_checked_entry(_Status, _InMemory, retain, {JK, JV, _Check}, Kept) ->
    % With a retain strategy the entry can't be discarded - but the value
    % part is no longer required as this version has been replaced
    {JK0, JV0} = leveled_codec:revert_to_keydeltas(JK, JV),
    [{JK0, JV0, null}|Kept];
filter_checked_entry(_Status, _InMemory, _Strategy, _Entry, Kept) ->
    % This is out of date and not retained so discard
    Kept.
write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
{Journal0, ManSlice0};
write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) ->
@ -994,13 +1006,13 @@ check_single_file_test() ->
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
case lists:keyfind(ObjSQN, 1, Srv) of
{ObjSQN, Key} ->
true;
current;
_ ->
false
replaced
end end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3),
@ -1025,9 +1037,9 @@ compact_single_file_setup() ->
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
case lists:keyfind(ObjSQN, 1, Srv) of
{ObjSQN, Key} ->
true;
current;
_ ->
false
replaced
end end,
CompactFP = leveled_inker:filepath(RP, journal_compact_dir),
ok = filelib:ensure_dir(CompactFP),
@ -1127,7 +1139,7 @@ compact_empty_file_test() ->
LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
{2, {o, "Bucket", "Key2", null}},
{3, {o, "Bucket", "Key3", null}}],
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end,
LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
?assertMatch(0.0, Score1),
ok = leveled_cdb:cdb_deletepending(CDB2),
@ -1169,7 +1181,13 @@ compact_singlefile_totwosmallfiles_testto() ->
filename=leveled_cdb:cdb_filename(CDBr),
journal=CDBr,
compaction_perc=50.0}],
FakeFilterFun = fun(_FS, _LK, SQN) -> SQN rem 2 == 0 end,
FakeFilterFun =
fun(_FS, _LK, SQN) ->
case SQN rem 2 of
0 -> current;
_ -> replaced
end
end,
ManifestSlice = compact_files(BestRun1,
CDBoptsSmall,
@ -1198,7 +1216,13 @@ size_score_test() ->
{{7, ?INKT_STND, "Key7"}, 184}],
MaxSQN = 6,
CurrentList = ["Key1", "Key4", "Key5", "Key6"],
FilterFun = fun(L, K, _SQN) -> lists:member(K, L) end,
FilterFun =
fun(L, K, _SQN) ->
case lists:member(K, L) of
true -> current;
false -> replaced
end
end,
Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN),
?assertMatch(true, Score > 69.0),
?assertMatch(true, Score < 70.0).

View file

@ -1142,18 +1142,18 @@ fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
Acc0 = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest);
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, we can skip it if the
% next file also has a LowSQN below the minimum
Acc0 =
{NextMinSQN, Acc0} =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
@ -1172,9 +1172,9 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
undefined,
FN);
_ ->
Acc
{MinSQN, Acc}
end,
fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest).
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
@ -1182,8 +1182,8 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
FoldFun(BatchAcc, Acc);
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
@ -1452,7 +1452,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
case lists:member({SQN, K}, L) of
true -> current;
false -> replaced
end
end,
5000),
timer:sleep(1000),
@ -1464,7 +1467,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
fun(X) -> {X, 55} end,
fun(_F) -> ok end,
fun(L, K, SQN) ->
lists:member({SQN, K}, L)
case lists:member({SQN, K}, L) of
true -> current;
false -> replaced
end
end,
5000),
timer:sleep(1000),

View file

@ -305,8 +305,9 @@
list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}.
-type iterator() :: list(iterator_entry()).
-type bad_ledgerkey() :: list().
-type sqn_check() :: current|replaced|missing.
-export_type([levelzero_cacheentry/0]).
-export_type([levelzero_cacheentry/0, sqn_check/0]).
%%%============================================================================
%%% API
@ -480,14 +481,13 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) ->
-spec pcl_checksequencenumber(pid(),
leveled_codec:ledger_key()|bad_ledgerkey(),
integer()) -> boolean().
integer()) -> sqn_check().
%% @doc
%% Check if the sequence number of the passed key is not replaced by a change
%% after the passed sequence number. Will return true if the Key is present
%% and either is equal to, or prior to the passed SQN.
%%
%% If the key is not present, it will be assumed that a higher sequence number
%% tombstone once existed, and false will be returned.
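%% after the passed sequence number. Will return:
%% - current (the key is present with a SQN equal to, or prior to, the passed
%% SQN)
%% - replaced (the key is present with a SQN after the passed SQN)
%% - missing (the key is not present)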
pcl_checksequencenumber(Pid, Key, SQN) ->
Hash = leveled_codec:segment_hash(Key),
if
@ -1487,7 +1487,7 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
end.
-spec compare_to_sqn(tuple()|not_present, integer()) -> boolean().
-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
%% @doc
%% Check to see if the SQN in the penciller is after the SQN expected for an
%% object (used to allow the journal to check compaction status from a cache
@ -1495,19 +1495,19 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
compare_to_sqn(Obj, SQN) ->
case Obj of
not_present ->
false;
missing;
Obj ->
SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
if
SQNToCompare > SQN ->
false;
replaced;
true ->
% Normally we would expect the SQN to be equal here, but
% this also allows for the Journal to have a more advanced
% value. We return current here as we wouldn't want to
% compact that more advanced value, but this may cause
% confusion in snapshots.
true
current
end
end.
@ -2097,25 +2097,25 @@ simple_server_test() ->
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004", null})),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0001",
"Key0001",
null},
1)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0002",
"Key0002",
null},
1002)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0003",
"Key0003",
null},
2003)),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0004",
"Key0004",
@ -2132,7 +2132,7 @@ simple_server_test() ->
KL1A = generate_randomkeys({2000, 4006}),
ok = maybe_pause_push(PCLr, [Key1A]),
ok = maybe_pause_push(PCLr, KL1A),
?assertMatch(true, pcl_checksequencenumber(PclSnap,
?assertMatch(current, pcl_checksequencenumber(PclSnap,
{o,
"Bucket0001",
"Key0001",
@ -2148,19 +2148,19 @@ simple_server_test() ->
undefined,
false),
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
?assertMatch(replaced, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",
"Key0001",
null},
1)),
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
?assertMatch(current, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0001",
"Key0001",
null},
4005)),
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
?assertMatch(current, pcl_checksequencenumber(PclSnap2,
{o,
"Bucket0002",
"Key0002",

View file

@ -373,7 +373,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
SQN),
% Need to check that we have not folded past the point
% at which the snapshot was taken
(JournalSQN >= SQN) and CheckSQN
(JournalSQN >= SQN) and (CheckSQN == current)
end,

View file

@ -238,12 +238,12 @@ retain_strategy(_Config) ->
RootPath = testutil:reset_filestructure(),
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 5000000},
{max_journalobjectcount, 5000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]}],
BookOptsAlt = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 100000},
{max_journalobjectcount, 2000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, retain}]},
{max_run_length, 8}],
@ -260,6 +260,58 @@ retain_strategy(_Config) ->
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{ok, Book1} = leveled_bookie:book_start(BookOpts),
compact_and_wait(Book1),
compact_and_wait(Book1),
ok = leveled_bookie:book_close(Book1),
ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
{"Bucket4", Spcl4, LastV4},
{"Bucket5", Spcl5, LastV5},
{"Bucket6", Spcl6, LastV6}]),
{ok, Book2} = leveled_bookie:book_start(BookOptsAlt),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000),
Q2 = fun(RT) -> {index_query,
"AltBucket6",
{fun testutil:foldkeysfun/3, []},
{"idx1_bin", "#", "|"},
{RT, undefined}}
end,
{async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2A = lists:usort(KFolder2A()),
true = length(KeyList2A) == 3000,
DeleteFun =
fun({DK, [{add, DIdx, DTerm}]}) ->
ok = testutil:book_riakdelete(Book2,
"AltBucket6",
DK,
[{remove, DIdx, DTerm}])
end,
lists:foreach(DeleteFun, KSpcL2),
{async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2AD = lists:usort(KFolder2AD()),
true = length(KeyList2AD) == 0,
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
io:format("Compact after deletions~n"),
compact_and_wait(Book3),
compact_and_wait(Book3),
{async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList3AD = lists:usort(KFolder3AD()),
true = length(KeyList3AD) == 0,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure().
@ -267,7 +319,7 @@ recovr_strategy(_Config) ->
RootPath = testutil:reset_filestructure(),
BookOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalsize, 5000000},
{max_journalobjectcount, 8000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, recovr}]}],
@ -309,7 +361,57 @@ recovr_strategy(_Config) ->
true = length(KeyList) == 6400,
true = length(KeyList) < length(KeyTermList),
true = length(KeyTermList) < 25600,
ok = leveled_bookie:book_close(Book1),
RevisedOpts = [{root_path, RootPath},
{cache_size, 1000},
{max_journalobjectcount, 2000},
{sync_strategy, testutil:sync_strategy()},
{reload_strategy, [{?RIAK_TAG, recovr}]}],
{ok, Book2} = leveled_bookie:book_start(RevisedOpts),
{KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000),
{async, KFolder2} = leveled_bookie:book_returnfolder(Book2, Q(false)),
KeyList2 = lists:usort(KFolder2()),
true = length(KeyList2) == 6400,
Q2 = fun(RT) -> {index_query,
"AltBucket6",
{fun testutil:foldkeysfun/3, []},
{"idx1_bin", "#", "|"},
{RT, undefined}}
end,
{async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2A = lists:usort(KFolder2A()),
true = length(KeyList2A) == 3000,
DeleteFun =
fun({DK, [{add, DIdx, DTerm}]}) ->
ok = testutil:book_riakdelete(Book2,
"AltBucket6",
DK,
[{remove, DIdx, DTerm}])
end,
lists:foreach(DeleteFun, KSpcL2),
{async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
KeyList2AD = lists:usort(KFolder2AD()),
true = length(KeyList2AD) == 0,
compact_and_wait(Book2),
compact_and_wait(Book2),
ok = leveled_bookie:book_close(Book2),
{ok, Book3} = leveled_bookie:book_start(RevisedOpts),
{async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
KeyList3AD = lists:usort(KFolder3AD()),
true = length(KeyList3AD) == 0,
ok = leveled_bookie:book_close(Book3),
testutil:reset_filestructure().