diff --git a/src/leveled.app.src b/src/leveled.app.src index 11c190a..cfd5a60 100644 --- a/src/leveled.app.src +++ b/src/leveled.app.src @@ -1,7 +1,7 @@ {application, leveled, [ {description, "Key Value store based on LSM-Tree and designed for larger values"}, - {vsn, "0.9.20"}, + {vsn, "0.9.21"}, {registered, []}, {applications, [ kernel, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 63b22f1..74707dc 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2278,7 +2278,7 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) -> -spec get_loadfun(book_state()) -> fun(). %% @doc -%% The LoadFun will be sued by the Inker when walking across the Journal to +%% The LoadFun will be used by the Inker when walking across the Journal to %% load the Penciller at startup get_loadfun(State) -> PrepareFun = diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index b5cee4f..91386fc 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -35,7 +35,7 @@ from_inkerkv/2, from_journalkey/1, revert_to_keydeltas/2, - is_compaction_candidate/1, + is_full_journalentry/1, split_inkvalue/1, check_forinkertype/2, get_tagstrategy/2, @@ -410,11 +410,12 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) -> %% If we wish to retain key deltas when an object in the Journal has been %% replaced - then this converts a Journal Key and Value into one which has no %% object body just the key deltas. +%% Only called if retain strategy and has passed +%% leveled_codec:is_full_journalentry/1 - so no need to consider other key +%% types revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) -> {_V, KeyDeltas} = revert_value_from_journal(InkerV), - {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}; -revert_to_keydeltas(JournalKey, InkerV) -> - {JournalKey, InkerV}. + {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}. %% Used when fetching objects, so only handles standard, hashable entries from_inkerkv(Object) -> @@ -560,12 +561,12 @@ check_forinkertype(_LedgerKey, head_only) -> check_forinkertype(_LedgerKey, _Object) -> ?INKT_STND. --spec is_compaction_candidate(journal_key()) -> boolean(). +-spec is_full_journalentry(journal_key()) -> boolean(). %% @doc %% Only journal keys with standard objects should be scored for compaction -is_compaction_candidate({_SQN, ?INKT_STND, _LK}) -> +is_full_journalentry({_SQN, ?INKT_STND, _LK}) -> true; -is_compaction_candidate(_OtherJKType) -> +is_full_journalentry(_OtherJKType) -> false. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 1c969f6..e996c15 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -528,17 +528,17 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) -> fun(KS, {ActSize, RplSize}) -> case KS of {{SQN, Type, PK}, Size} -> - MayScore = - leveled_codec:is_compaction_candidate({SQN, Type, PK}), - case MayScore of + IsJournalEntry = + leveled_codec:is_full_journalentry({SQN, Type, PK}), + case IsJournalEntry of false -> {ActSize + Size - ?CRC_SIZE, RplSize}; true -> Check = FilterFun(FilterServer, PK, SQN), case {Check, SQN > MaxSQN} of - {true, _} -> + {current, _} -> {ActSize + Size - ?CRC_SIZE, RplSize}; - {false, true} -> + {_, true} -> {ActSize + Size - ?CRC_SIZE, RplSize}; _ -> {ActSize, RplSize + Size - ?CRC_SIZE} @@ -766,39 +766,51 @@ split_positions_into_batches(Positions, Journal, Batches) -> %% a `skip` strategy. filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> FoldFun = - fun(KVC0, Acc) -> - case KVC0 of - {_InkKey, crc_wonky, false} -> - % Bad entry, disregard, don't check - Acc; - {JK, JV, _Check} -> - {SQN, LK} = - leveled_codec:from_journalkey(JK), - CompactStrategy = - leveled_codec:get_tagstrategy(LK, ReloadStrategy), - KeyValid = FilterFun(FilterServer, LK, SQN), - IsInMemory = SQN > MaxSQN, - case {KeyValid or IsInMemory, CompactStrategy} of - {true, _} -> - % This entry may still be required regardless of - % strategy - [KVC0|Acc]; - {false, retain} -> - % If we have a retain strategy, it can't be - % discarded - but the value part is no longer - % required as this version has been replaced - {JK0, JV0} = - leveled_codec:revert_to_keydeltas(JK, JV), - [{JK0, JV0, null}|Acc]; - {false, _} -> - % This is out of date and not retained - discard - Acc - end - end - end, + filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy), lists:reverse(lists:foldl(FoldFun, [], KVCs)). +filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> + fun(KVC0, Acc) -> + case KVC0 of + {_InkKey, crc_wonky, false} -> + % Bad entry, disregard, don't check + Acc; + {JK, JV, _Check} -> + {SQN, LK} = + leveled_codec:from_journalkey(JK), + CompactStrategy = + leveled_codec:get_tagstrategy(LK, ReloadStrategy), + IsJournalEntry = + leveled_codec:is_full_journalentry(JK), + case {CompactStrategy, IsJournalEntry} of + {retain, false} -> + [KVC0|Acc]; + _ -> + KeyCurrent = FilterFun(FilterServer, LK, SQN), + IsInMemory = SQN > MaxSQN, + case {KeyCurrent, IsInMemory, CompactStrategy} of + {KC, InMem, _} when KC == current; InMem -> + % This entry may still be required + % regardless of strategy + [KVC0|Acc]; + {_, _, retain} -> + % If we have a retain strategy, it can't be + % discarded - but the value part is no + % longer required as this version has been + % replaced + {JK0, JV0} = + leveled_codec:revert_to_keydeltas(JK, JV), + [{JK0, JV0, null}|Acc]; + {_, _, _} -> + % This is out of date and not retained so + % discard + Acc + end + end + end + end. + write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) -> {Journal0, ManSlice0}; write_values(KVCList, CDBopts, Journal0, ManSlice0, PressMethod) -> @@ -994,13 +1006,13 @@ check_single_file_test() -> LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of {ObjSQN, Key} -> - true; + current; _ -> - false + replaced end end, Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4), ?assertMatch(37.5, Score1), - LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end, + LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4), ?assertMatch(100.0, Score2), Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3), @@ -1025,9 +1037,9 @@ compact_single_file_setup() -> LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of {ObjSQN, Key} -> - true; + current; _ -> - false + replaced end end, CompactFP = leveled_inker:filepath(RP, journal_compact_dir), ok = filelib:ensure_dir(CompactFP), @@ -1127,7 +1139,7 @@ compact_empty_file_test() -> LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, {2, {o, "Bucket", "Key2", null}}, {3, {o, "Bucket", "Key3", null}}], - LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end, + LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), ?assertMatch(0.0, Score1), ok = leveled_cdb:cdb_deletepending(CDB2), @@ -1169,7 +1181,13 @@ compact_singlefile_totwosmallfiles_testto() -> filename=leveled_cdb:cdb_filename(CDBr), journal=CDBr, compaction_perc=50.0}], - FakeFilterFun = fun(_FS, _LK, SQN) -> SQN rem 2 == 0 end, + FakeFilterFun = + fun(_FS, _LK, SQN) -> + case SQN rem 2 of + 0 -> current; + _ -> replaced + end + end, ManifestSlice = compact_files(BestRun1, CDBoptsSmall, @@ -1198,7 +1216,13 @@ size_score_test() -> {{7, ?INKT_STND, "Key7"}, 184}], MaxSQN = 6, CurrentList = ["Key1", "Key4", "Key5", "Key6"], - FilterFun = fun(L, K, _SQN) -> lists:member(K, L) end, + FilterFun = + fun(L, K, _SQN) -> + case lists:member(K, L) of + true -> current; + false -> replaced + end + end, Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN), ?assertMatch(true, Score > 69.0), ?assertMatch(true, Score < 70.0). diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 32ee376..aa9d14d 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -1142,18 +1142,18 @@ fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) -> Acc; fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) when LowSQN >= MinSQN -> - Acc0 = foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN), - fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest); + {NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FoldFuns, + Acc, + Pid, + undefined, + FN), + fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest); fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> % If this file has a LowSQN less than the minimum, we can skip it if the % next file also has a LowSQN below the minimum - Acc0 = + {NextMinSQN, Acc0} = case Rest of [] -> foldfile_between_sequence(MinSQN, @@ -1172,9 +1172,9 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> undefined, FN); _ -> - Acc + {MinSQN, Acc} end, - fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest). + fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest). foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, Acc, CDBpid, StartPos, FN) -> @@ -1182,8 +1182,8 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of - {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> - FoldFun(BatchAcc, Acc); + {eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} -> + {AccMinSQN, FoldFun(BatchAcc, Acc)}; {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> UpdAcc = FoldFun(BatchAcc, Acc), NextSQN = MaxSQN + 1, @@ -1452,7 +1452,10 @@ compact_journal_testto(WRP, ExpectedFiles) -> fun(X) -> {X, 55} end, fun(_F) -> ok end, fun(L, K, SQN) -> - lists:member({SQN, K}, L) + case lists:member({SQN, K}, L) of + true -> current; + false -> replaced + end end, 5000), timer:sleep(1000), @@ -1464,7 +1467,10 @@ compact_journal_testto(WRP, ExpectedFiles) -> fun(X) -> {X, 55} end, fun(_F) -> ok end, fun(L, K, SQN) -> - lists:member({SQN, K}, L) + case lists:member({SQN, K}, L) of + true -> current; + false -> replaced + end end, 5000), timer:sleep(1000), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 4d572bd..3373246 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -305,8 +305,9 @@ list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}. -type iterator() :: list(iterator_entry()). -type bad_ledgerkey() :: list(). +-type sqn_check() :: current|replaced|missing. --export_type([levelzero_cacheentry/0]). +-export_type([levelzero_cacheentry/0, sqn_check/0]). %%%============================================================================ %%% API @@ -480,14 +481,13 @@ pcl_fetchnextkey(Pid, StartKey, EndKey, AccFun, InitAcc) -> -spec pcl_checksequencenumber(pid(), leveled_codec:ledger_key()|bad_ledgerkey(), - integer()) -> boolean(). + integer()) -> sqn_check(). %% @doc %% Check if the sequence number of the passed key is not replaced by a change -%% after the passed sequence number. Will return true if the Key is present -%% and either is equal to, or prior to the passed SQN. -%% -%% If the key is not present, it will be assumed that a higher sequence number -%% tombstone once existed, and false will be returned. +%% after the passed sequence number. Will return: +%% - current +%% - replaced +%% - missing pcl_checksequencenumber(Pid, Key, SQN) -> Hash = leveled_codec:segment_hash(Key), if @@ -1487,7 +1487,7 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) -> end. --spec compare_to_sqn(tuple()|not_present, integer()) -> boolean(). +-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check(). %% @doc %% Check to see if the SQN in the penciller is after the SQN expected for an %% object (used to allow the journal to check compaction status from a cache @@ -1495,19 +1495,19 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) -> compare_to_sqn(Obj, SQN) -> case Obj of not_present -> - false; + missing; Obj -> SQNToCompare = leveled_codec:strip_to_seqonly(Obj), if SQNToCompare > SQN -> - false; + replaced; true -> % Normally we would expect the SQN to be equal here, but % this also allows for the Journal to have a more advanced % value. We return true here as we wouldn't want to % compact thta more advanced value, but this may cause % confusion in snapshots. - true + current end end. @@ -2097,25 +2097,25 @@ simple_server_test() -> ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})), ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})), ?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004", null})), - ?assertMatch(true, pcl_checksequencenumber(PclSnap, + ?assertMatch(current, pcl_checksequencenumber(PclSnap, {o, "Bucket0001", "Key0001", null}, 1)), - ?assertMatch(true, pcl_checksequencenumber(PclSnap, + ?assertMatch(current, pcl_checksequencenumber(PclSnap, {o, "Bucket0002", "Key0002", null}, 1002)), - ?assertMatch(true, pcl_checksequencenumber(PclSnap, + ?assertMatch(current, pcl_checksequencenumber(PclSnap, {o, "Bucket0003", "Key0003", null}, 2003)), - ?assertMatch(true, pcl_checksequencenumber(PclSnap, + ?assertMatch(current, pcl_checksequencenumber(PclSnap, {o, "Bucket0004", "Key0004", @@ -2132,7 +2132,7 @@ simple_server_test() -> KL1A = generate_randomkeys({2000, 4006}), ok = maybe_pause_push(PCLr, [Key1A]), ok = maybe_pause_push(PCLr, KL1A), - ?assertMatch(true, pcl_checksequencenumber(PclSnap, + ?assertMatch(current, pcl_checksequencenumber(PclSnap, {o, "Bucket0001", "Key0001", @@ -2148,19 +2148,19 @@ simple_server_test() -> undefined, false), - ?assertMatch(false, pcl_checksequencenumber(PclSnap2, + ?assertMatch(replaced, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001", "Key0001", null}, 1)), - ?assertMatch(true, pcl_checksequencenumber(PclSnap2, + ?assertMatch(current, pcl_checksequencenumber(PclSnap2, {o, "Bucket0001", "Key0001", null}, 4005)), - ?assertMatch(true, pcl_checksequencenumber(PclSnap2, + ?assertMatch(current, pcl_checksequencenumber(PclSnap2, {o, "Bucket0002", "Key0002", diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index 412ad60..a51dacc 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -373,7 +373,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> SQN), % Need to check that we have not folded past the point % at which the snapshot was taken - (JournalSQN >= SQN) and CheckSQN + (JournalSQN >= SQN) and (CheckSQN == current) end, diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl index a3890f8..640b688 100644 --- a/test/end_to_end/recovery_SUITE.erl +++ b/test/end_to_end/recovery_SUITE.erl @@ -238,12 +238,12 @@ retain_strategy(_Config) -> RootPath = testutil:reset_filestructure(), BookOpts = [{root_path, RootPath}, {cache_size, 1000}, - {max_journalsize, 5000000}, + {max_journalobjectcount, 5000}, {sync_strategy, testutil:sync_strategy()}, {reload_strategy, [{?RIAK_TAG, retain}]}], BookOptsAlt = [{root_path, RootPath}, {cache_size, 1000}, - {max_journalsize, 100000}, + {max_journalobjectcount, 2000}, {sync_strategy, testutil:sync_strategy()}, {reload_strategy, [{?RIAK_TAG, retain}]}, {max_run_length, 8}], @@ -260,6 +260,58 @@ retain_strategy(_Config) -> {"Bucket4", Spcl4, LastV4}, {"Bucket5", Spcl5, LastV5}, {"Bucket6", Spcl6, LastV6}]), + + {ok, Book1} = leveled_bookie:book_start(BookOpts), + compact_and_wait(Book1), + compact_and_wait(Book1), + ok = leveled_bookie:book_close(Book1), + + ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + {"Bucket4", Spcl4, LastV4}, + {"Bucket5", Spcl5, LastV5}, + {"Bucket6", Spcl6, LastV6}]), + + {ok, Book2} = leveled_bookie:book_start(BookOptsAlt), + + {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000), + Q2 = fun(RT) -> {index_query, + "AltBucket6", + {fun testutil:foldkeysfun/3, []}, + {"idx1_bin", "#", "|"}, + {RT, undefined}} + end, + {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + KeyList2A = lists:usort(KFolder2A()), + true = length(KeyList2A) == 3000, + + DeleteFun = + fun({DK, [{add, DIdx, DTerm}]}) -> + ok = testutil:book_riakdelete(Book2, + "AltBucket6", + DK, + [{remove, DIdx, DTerm}]) + end, + lists:foreach(DeleteFun, KSpcL2), + + {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + KeyList2AD = lists:usort(KFolder2AD()), + true = length(KeyList2AD) == 0, + + ok = leveled_bookie:book_close(Book2), + + {ok, Book3} = leveled_bookie:book_start(BookOptsAlt), + + io:format("Compact after deletions~n"), + + compact_and_wait(Book3), + compact_and_wait(Book3), + + {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)), + KeyList3AD = lists:usort(KFolder3AD()), + true = length(KeyList3AD) == 0, + + ok = leveled_bookie:book_close(Book3), + testutil:reset_filestructure(). @@ -267,7 +319,7 @@ recovr_strategy(_Config) -> RootPath = testutil:reset_filestructure(), BookOpts = [{root_path, RootPath}, {cache_size, 1000}, - {max_journalsize, 5000000}, + {max_journalobjectcount, 8000}, {sync_strategy, testutil:sync_strategy()}, {reload_strategy, [{?RIAK_TAG, recovr}]}], @@ -309,7 +361,57 @@ recovr_strategy(_Config) -> true = length(KeyList) == 6400, true = length(KeyList) < length(KeyTermList), true = length(KeyTermList) < 25600, + ok = leveled_bookie:book_close(Book1), + + RevisedOpts = [{root_path, RootPath}, + {cache_size, 1000}, + {max_journalobjectcount, 2000}, + {sync_strategy, testutil:sync_strategy()}, + {reload_strategy, [{?RIAK_TAG, recovr}]}], + + {ok, Book2} = leveled_bookie:book_start(RevisedOpts), + + {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000), + {async, KFolder2} = leveled_bookie:book_returnfolder(Book2, Q(false)), + KeyList2 = lists:usort(KFolder2()), + true = length(KeyList2) == 6400, + + Q2 = fun(RT) -> {index_query, + "AltBucket6", + {fun testutil:foldkeysfun/3, []}, + {"idx1_bin", "#", "|"}, + {RT, undefined}} + end, + {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + KeyList2A = lists:usort(KFolder2A()), + true = length(KeyList2A) == 3000, + + DeleteFun = + fun({DK, [{add, DIdx, DTerm}]}) -> + ok = testutil:book_riakdelete(Book2, + "AltBucket6", + DK, + [{remove, DIdx, DTerm}]) + end, + lists:foreach(DeleteFun, KSpcL2), + + {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)), + KeyList2AD = lists:usort(KFolder2AD()), + true = length(KeyList2AD) == 0, + + compact_and_wait(Book2), + compact_and_wait(Book2), + + ok = leveled_bookie:book_close(Book2), + + {ok, Book3} = leveled_bookie:book_start(RevisedOpts), + {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)), + KeyList3AD = lists:usort(KFolder3AD()), + true = length(KeyList3AD) == 0, + + ok = leveled_bookie:book_close(Book3), + testutil:reset_filestructure().