Compaction, retain and recovery
Change the penciller check so that it returns current/replaced/missing not just true/false. Reduce unnecessary penciller checks for non-standard keys that will always be retained - and remove redundant code. Expand tests of retain and recover to make sure that compaction on delete is well covered. Also move the SQN along during initial loads - to stop the aggressive loop to find the starting SQN for every file.
parent 02155558df
commit 156e7b064d
7 changed files with 223 additions and 90 deletions
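
At the core of the change is the penciller's key-check result widening from a boolean to a three-valued sqn_check() type. A minimal sketch of the intended semantics, mirroring compare_to_sqn/2 in the penciller diff below - the function and argument names here are invented for illustration:

    %% Sketch only - not the leveled code itself.
    %% LedgerSQN is the SQN held against the key in the ledger;
    %% JournalSQN is the SQN of the journal entry being checked.
    -type sqn_check() :: current | replaced | missing.

    -spec sqn_check(not_present | integer(), integer()) -> sqn_check().
    sqn_check(not_present, _JournalSQN) ->
        missing;    % assume a higher-SQN tombstone once existed
    sqn_check(LedgerSQN, JournalSQN) when LedgerSQN > JournalSQN ->
        replaced;   % a newer version exists, so this entry is compactable
    sqn_check(_LedgerSQN, _JournalSQN) ->
        current.    % the journal entry is still the live version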
src/leveled_bookie.erl

@@ -2278,7 +2278,7 @@ maybe_withjitter(_CacheSize, _MaxCacheSize) ->
 
 -spec get_loadfun(book_state()) -> fun().
 %% @doc
-%% The LoadFun will be sued by the Inker when walking across the Journal to
+%% The LoadFun will be used by the Inker when walking across the Journal to
 %% load the Penciller at startup
 get_loadfun(State) ->
     PrepareFun =
src/leveled_codec.erl

@@ -35,7 +35,7 @@
          from_inkerkv/2,
          from_journalkey/1,
          revert_to_keydeltas/2,
-         is_compaction_candidate/1,
+         is_full_journalentry/1,
          split_inkvalue/1,
          check_forinkertype/2,
          get_tagstrategy/2,
@@ -410,11 +410,12 @@ to_inkerkv(LedgerKey, SQN, Object, KeyChanges, PressMethod, Compress) ->
 %% If we wish to retain key deltas when an object in the Journal has been
 %% replaced - then this converts a Journal Key and Value into one which has no
 %% object body just the key deltas.
+%% Only called if retain strategy and has passed
+%% leveled_codec:is_full_journalentry/1 - so no need to consider other key
+%% types
 revert_to_keydeltas({SQN, ?INKT_STND, LedgerKey}, InkerV) ->
     {_V, KeyDeltas} = revert_value_from_journal(InkerV),
-    {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}};
-revert_to_keydeltas(JournalKey, InkerV) ->
-    {JournalKey, InkerV}.
+    {{SQN, ?INKT_KEYD, LedgerKey}, {null, KeyDeltas}}.
 
 %% Used when fetching objects, so only handles standard, hashable entries
 from_inkerkv(Object) ->
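
With the catch-all clause gone, revert_to_keydeltas/2 is only ever called for full standard journal entries under a retain strategy. A minimal sketch of how the clerk then uses it, simplified from the filter_output_fun change further down - the name retain_replaced_entry is invented for illustration:

    %% Sketch only - simplified from filter_output_fun/4 below.
    %% JK must already have passed leveled_codec:is_full_journalentry/1.
    retain_replaced_entry(JK, JV) ->
        {JK0, JV0} = leveled_codec:revert_to_keydeltas(JK, JV),
        %% The object body is dropped; only the key deltas are kept,
        %% so index changes can still be replayed at reload
        {JK0, JV0, null}.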
@@ -560,12 +561,12 @@ check_forinkertype(_LedgerKey, head_only) ->
 check_forinkertype(_LedgerKey, _Object) ->
     ?INKT_STND.
 
--spec is_compaction_candidate(journal_key()) -> boolean().
+-spec is_full_journalentry(journal_key()) -> boolean().
 %% @doc
 %% Only journal keys with standard objects should be scored for compaction
-is_compaction_candidate({_SQN, ?INKT_STND, _LK}) ->
+is_full_journalentry({_SQN, ?INKT_STND, _LK}) ->
     true;
-is_compaction_candidate(_OtherJKType) ->
+is_full_journalentry(_OtherJKType) ->
     false.
 
 
src/leveled_iclerk.erl

@@ -519,17 +519,17 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
         fun(KS, {ActSize, RplSize}) ->
             case KS of
                 {{SQN, Type, PK}, Size} ->
-                    MayScore =
-                        leveled_codec:is_compaction_candidate({SQN, Type, PK}),
-                    case MayScore of
+                    IsJournalEntry =
+                        leveled_codec:is_full_journalentry({SQN, Type, PK}),
+                    case IsJournalEntry of
                         false ->
                             {ActSize + Size - ?CRC_SIZE, RplSize};
                         true ->
                             Check = FilterFun(FilterServer, PK, SQN),
                             case {Check, SQN > MaxSQN} of
-                                {true, _} ->
+                                {current, _} ->
                                     {ActSize + Size - ?CRC_SIZE, RplSize};
-                                {false, true} ->
+                                {_, true} ->
                                     {ActSize + Size - ?CRC_SIZE, RplSize};
                                 _ ->
                                     {ActSize, RplSize + Size - ?CRC_SIZE}
@@ -757,6 +757,11 @@ split_positions_into_batches(Positions, Journal, Batches) ->
 %% a `skip` strategy.
 filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
     FoldFun =
+        filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy),
+    lists:reverse(lists:foldl(FoldFun, [], KVCs)).
+
+
+filter_output_fun(FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
     fun(KVC0, Acc) ->
         case KVC0 of
             {_InkKey, crc_wonky, false} ->
@@ -767,28 +772,35 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
                     leveled_codec:from_journalkey(JK),
                 CompactStrategy =
                     leveled_codec:get_tagstrategy(LK, ReloadStrategy),
-                KeyValid = FilterFun(FilterServer, LK, SQN),
-                IsInMemory = SQN > MaxSQN,
-                case {KeyValid or IsInMemory, CompactStrategy} of
-                    {true, _} ->
-                        % This entry may still be required regardless of
-                        % strategy
-                        [KVC0|Acc];
-                    {false, retain} ->
-                        % If we have a retain strategy, it can't be
-                        % discarded - but the value part is no longer
-                        % required as this version has been replaced
-                        {JK0, JV0} =
-                            leveled_codec:revert_to_keydeltas(JK, JV),
-                        [{JK0, JV0, null}|Acc];
-                    {false, _} ->
-                        % This is out of date and not retained - discard
-                        Acc
-                end
-        end
-    end,
-    lists:reverse(lists:foldl(FoldFun, [], KVCs)).
-
+                IsJournalEntry =
+                    leveled_codec:is_full_journalentry(JK),
+                case {CompactStrategy, IsJournalEntry} of
+                    {retain, false} ->
+                        [KVC0|Acc];
+                    _ ->
+                        KeyCurrent = FilterFun(FilterServer, LK, SQN),
+                        IsInMemory = SQN > MaxSQN,
+                        case {KeyCurrent, IsInMemory, CompactStrategy} of
+                            {KC, InMem, _} when KC == current; InMem ->
+                                % This entry may still be required
+                                % regardless of strategy
+                                [KVC0|Acc];
+                            {_, _, retain} ->
+                                % If we have a retain strategy, it can't be
+                                % discarded - but the value part is no
+                                % longer required as this version has been
+                                % replaced
+                                {JK0, JV0} =
+                                    leveled_codec:revert_to_keydeltas(JK, JV),
+                                [{JK0, JV0, null}|Acc];
+                            {_, _, _} ->
+                                % This is out of date and not retained so
+                                % discard
+                                Acc
+                        end
+                end
+        end
+    end.
 
 write_values([], _CDBopts, Journal0, ManSlice0, _PressMethod) ->
     {Journal0, ManSlice0};
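
The reworked filter_output_fun/4 above is where the unnecessary penciller checks are removed: under a retain strategy a non-standard key never consults the FilterFun at all (the {retain, false} clause). A restatement of the outcome logic as a standalone sketch - compaction_outcome/4 and the keep/keydeltas/discard labels are invented here, and the real code performs the FilterFun check lazily rather than taking it as an argument:

    %% Sketch of the decision table implemented above - not leveled code.
    compaction_outcome(Strategy, IsJournalEntry, KeyCurrent, IsInMemory) ->
        case {Strategy, IsJournalEntry} of
            {retain, false} ->
                %% Non-standard keys are always retained, so the
                %% penciller need not be consulted at all
                keep;
            _ when KeyCurrent == current; IsInMemory ->
                %% Still current (or ahead of the ledger) - keep in full
                keep;
            {retain, _} ->
                %% Replaced, but under retain the key deltas survive
                keydeltas;
            _ ->
                %% Replaced and not retained - discard
                discard
        end.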
@@ -985,13 +997,13 @@ check_single_file_test() ->
     LedgerFun1 = fun(Srv, Key, ObjSQN) ->
                     case lists:keyfind(ObjSQN, 1, Srv) of
                         {ObjSQN, Key} ->
-                            true;
+                            current;
                         _ ->
-                            false
+                            replaced
                     end end,
     Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4),
     ?assertMatch(37.5, Score1),
-    LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
+    LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end,
     Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4),
     ?assertMatch(100.0, Score2),
     Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3),
@@ -1016,9 +1028,9 @@ compact_single_file_setup() ->
     LedgerFun1 = fun(Srv, Key, ObjSQN) ->
                     case lists:keyfind(ObjSQN, 1, Srv) of
                         {ObjSQN, Key} ->
-                            true;
+                            current;
                         _ ->
-                            false
+                            replaced
                     end end,
     CompactFP = leveled_inker:filepath(RP, journal_compact_dir),
     ok = filelib:ensure_dir(CompactFP),
@@ -1119,7 +1131,7 @@ compact_empty_file_test() ->
     LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}},
                   {2, {o, "Bucket", "Key2", null}},
                   {3, {o, "Bucket", "Key3", null}}],
-    LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> false end,
+    LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end,
     Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4),
     ?assertMatch(100.0, Score1),
     ok = leveled_cdb:cdb_deletepending(CDB2),
@@ -1161,7 +1173,13 @@ compact_singlefile_totwosmallfiles_testto() ->
                         filename=leveled_cdb:cdb_filename(CDBr),
                         journal=CDBr,
                         compaction_perc=50.0}],
-    FakeFilterFun = fun(_FS, _LK, SQN) -> SQN rem 2 == 0 end,
+    FakeFilterFun =
+        fun(_FS, _LK, SQN) ->
+            case SQN rem 2 of
+                0 -> current;
+                _ -> replaced
+            end
+        end,
 
     ManifestSlice = compact_files(BestRun1,
                                   CDBoptsSmall,
@@ -1190,7 +1208,13 @@ size_score_test() ->
                    {{7, ?INKT_STND, "Key7"}, 184}],
     MaxSQN = 6,
     CurrentList = ["Key1", "Key4", "Key5", "Key6"],
-    FilterFun = fun(L, K, _SQN) -> lists:member(K, L) end,
+    FilterFun =
+        fun(L, K, _SQN) ->
+            case lists:member(K, L) of
+                true -> current;
+                false -> replaced
+            end
+        end,
     Score = size_comparison_score(KeySizeList, FilterFun, CurrentList, MaxSQN),
     ?assertMatch(true, Score > 69.0),
     ?assertMatch(true, Score < 70.0).
src/leveled_inker.erl

@@ -1142,18 +1142,18 @@ fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
     Acc;
 fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
                                     when LowSQN >= MinSQN ->
-    Acc0 = foldfile_between_sequence(MinSQN,
+    {NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
                                         MinSQN + ?LOADING_BATCH,
                                         FoldFuns,
                                         Acc,
                                         Pid,
                                         undefined,
                                         FN),
-    fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest);
+    fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
 fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
     % If this file has a LowSQN less than the minimum, we can skip it if the
     % next file also has a LowSQN below the minimum
-    Acc0 =
+    {NextMinSQN, Acc0} =
         case Rest of
             [] ->
                 foldfile_between_sequence(MinSQN,
@@ -1172,9 +1172,9 @@ fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
                                           undefined,
                                           FN);
             _ ->
-                Acc
+                {MinSQN, Acc}
         end,
-    fold_from_sequence(MinSQN, FoldFuns, Acc0, Rest).
+    fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
 
 foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
                             Acc, CDBpid, StartPos, FN) ->
@@ -1182,8 +1182,8 @@ foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
     InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},
 
     case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
-        {eof, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
-            FoldFun(BatchAcc, Acc);
+        {eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
+            {AccMinSQN, FoldFun(BatchAcc, Acc)};
         {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
             UpdAcc = FoldFun(BatchAcc, Acc),
             NextSQN = MaxSQN + 1,
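
This is the "move the SQN along" change from the commit message: each file fold now reports the sequence number it reached, so loading the next journal file starts from there instead of looping aggressively from the original minimum to find its starting SQN. A minimal sketch of the threading, with invented names (load_files/4 and fold_one_file/4 stand in for fold_from_sequence/4 and foldfile_between_sequence/7):

    %% Sketch only - invented names, assuming each file record carries
    %% the highest SQN it contains.
    load_files(MinSQN, _FoldFuns, Acc, []) ->
        {MinSQN, Acc};
    load_files(MinSQN, FoldFuns, Acc, [File|Rest]) ->
        %% The SQN reached in this file seeds the fold of the next one
        {NextMinSQN, Acc0} = fold_one_file(MinSQN, FoldFuns, Acc, File),
        load_files(NextMinSQN, FoldFuns, Acc0, Rest).

    fold_one_file(MinSQN, _FoldFuns, Acc, {_FN, MaxSQNInFile}) ->
        {max(MinSQN, MaxSQNInFile), Acc}.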
@@ -1452,7 +1452,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
                                     fun(X) -> {X, 55} end,
                                     fun(_F) -> ok end,
                                     fun(L, K, SQN) ->
-                                        lists:member({SQN, K}, L)
+                                        case lists:member({SQN, K}, L) of
+                                            true -> current;
+                                            false -> replaced
+                                        end
                                     end,
                                     5000),
     timer:sleep(1000),
@@ -1464,7 +1467,10 @@ compact_journal_testto(WRP, ExpectedFiles) ->
                                     fun(X) -> {X, 55} end,
                                     fun(_F) -> ok end,
                                     fun(L, K, SQN) ->
-                                        lists:member({SQN, K}, L)
+                                        case lists:member({SQN, K}, L) of
+                                            true -> current;
+                                            false -> replaced
+                                        end
                                     end,
                                     5000),
     timer:sleep(1000),
src/leveled_penciller.erl

@@ -305,8 +305,9 @@
         list(leveled_codec:ledger_kv()|leveled_sst:expandable_pointer())}.
 -type iterator() :: list(iterator_entry()).
 -type bad_ledgerkey() :: list().
+-type sqn_check() :: current|replaced|missing.
 
--export_type([levelzero_cacheentry/0]).
+-export_type([levelzero_cacheentry/0, sqn_check/0]).
 
 %%%============================================================================
 %%% API
|
|||
|
||||
-spec pcl_checksequencenumber(pid(),
|
||||
leveled_codec:ledger_key()|bad_ledgerkey(),
|
||||
integer()) -> boolean().
|
||||
integer()) -> sqn_check().
|
||||
%% @doc
|
||||
%% Check if the sequence number of the passed key is not replaced by a change
|
||||
%% after the passed sequence number. Will return true if the Key is present
|
||||
%% and either is equal to, or prior to the passed SQN.
|
||||
%%
|
||||
%% If the key is not present, it will be assumed that a higher sequence number
|
||||
%% tombstone once existed, and false will be returned.
|
||||
%% after the passed sequence number. Will return:
|
||||
%% - current
|
||||
%% - replaced
|
||||
%% - missing
|
||||
pcl_checksequencenumber(Pid, Key, SQN) ->
|
||||
Hash = leveled_codec:segment_hash(Key),
|
||||
if
|
||||
|
@ -1487,7 +1487,7 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
|
|||
end.
|
||||
|
||||
|
||||
-spec compare_to_sqn(tuple()|not_present, integer()) -> boolean().
|
||||
-spec compare_to_sqn(tuple()|not_present, integer()) -> sqn_check().
|
||||
%% @doc
|
||||
%% Check to see if the SQN in the penciller is after the SQN expected for an
|
||||
%% object (used to allow the journal to check compaction status from a cache
|
||||
|
@@ -1495,19 +1495,19 @@ log_slowfetch(T0, R, PID, Level, FetchTolerance) ->
 compare_to_sqn(Obj, SQN) ->
     case Obj of
         not_present ->
-            false;
+            missing;
         Obj ->
             SQNToCompare = leveled_codec:strip_to_seqonly(Obj),
             if
                 SQNToCompare > SQN ->
-                    false;
+                    replaced;
                 true ->
                     % Normally we would expect the SQN to be equal here, but
                     % this also allows for the Journal to have a more advanced
                     % value. We return true here as we wouldn't want to
                     % compact that more advanced value, but this may cause
                     % confusion in snapshots.
-                    true
+                    current
             end
     end.
 
@@ -2097,25 +2097,25 @@ simple_server_test() ->
     ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
     ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
     ?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004", null})),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap,
                                                 {o,
                                                 "Bucket0001",
                                                 "Key0001",
                                                 null},
                                                 1)),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap,
                                                 {o,
                                                 "Bucket0002",
                                                 "Key0002",
                                                 null},
                                                 1002)),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap,
                                                 {o,
                                                 "Bucket0003",
                                                 "Key0003",
                                                 null},
                                                 2003)),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap,
                                                 {o,
                                                 "Bucket0004",
                                                 "Key0004",
@@ -2132,7 +2132,7 @@ simple_server_test() ->
     KL1A = generate_randomkeys({2000, 4006}),
     ok = maybe_pause_push(PCLr, [Key1A]),
     ok = maybe_pause_push(PCLr, KL1A),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap,
                                                 {o,
                                                 "Bucket0001",
                                                 "Key0001",
@@ -2148,19 +2148,19 @@ simple_server_test() ->
                                undefined,
                                false),
 
-    ?assertMatch(false, pcl_checksequencenumber(PclSnap2,
+    ?assertMatch(replaced, pcl_checksequencenumber(PclSnap2,
                                                 {o,
                                                 "Bucket0001",
                                                 "Key0001",
                                                 null},
                                                 1)),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap2,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap2,
                                                 {o,
                                                 "Bucket0001",
                                                 "Key0001",
                                                 null},
                                                 4005)),
-    ?assertMatch(true, pcl_checksequencenumber(PclSnap2,
+    ?assertMatch(current, pcl_checksequencenumber(PclSnap2,
                                                 {o,
                                                 "Bucket0002",
                                                 "Key0002",
src/leveled_runner.erl

@@ -373,7 +373,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
                                 SQN),
                     % Need to check that we have not folded past the point
                     % at which the snapshot was taken
-                    (JournalSQN >= SQN) and CheckSQN
+                    (JournalSQN >= SQN) and (CheckSQN == current)
 
                 end,
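
A knock-on effect shown here: sqn_check() is an atom, not a boolean, so a result that previously fed straight into a boolean expression must now be compared against current explicitly - in Erlang, 'and' raises badarg if either operand is not a boolean. A shell illustration (values invented):

    1> (7 >= 3) and (replaced == current).
    false
    2> catch ((7 >= 3) and replaced).
    {'EXIT',{badarg,...}}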
test/end_to_end/recovery_SUITE.erl

@@ -238,12 +238,12 @@ retain_strategy(_Config) ->
     RootPath = testutil:reset_filestructure(),
     BookOpts = [{root_path, RootPath},
                 {cache_size, 1000},
-                {max_journalsize, 5000000},
+                {max_journalobjectcount, 5000},
                 {sync_strategy, testutil:sync_strategy()},
                 {reload_strategy, [{?RIAK_TAG, retain}]}],
     BookOptsAlt = [{root_path, RootPath},
                    {cache_size, 1000},
-                   {max_journalsize, 100000},
+                   {max_journalobjectcount, 2000},
                    {sync_strategy, testutil:sync_strategy()},
                    {reload_strategy, [{?RIAK_TAG, retain}]},
                    {max_run_length, 8}],
@@ -260,6 +260,58 @@ retain_strategy(_Config) ->
                                              {"Bucket4", Spcl4, LastV4},
                                              {"Bucket5", Spcl5, LastV5},
                                              {"Bucket6", Spcl6, LastV6}]),
 
+    {ok, Book1} = leveled_bookie:book_start(BookOpts),
+    compact_and_wait(Book1),
+    compact_and_wait(Book1),
+    ok = leveled_bookie:book_close(Book1),
+
+    ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},
+                                             {"Bucket4", Spcl4, LastV4},
+                                             {"Bucket5", Spcl5, LastV5},
+                                             {"Bucket6", Spcl6, LastV6}]),
+
+    {ok, Book2} = leveled_bookie:book_start(BookOptsAlt),
+
+    {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000),
+    Q2 = fun(RT) -> {index_query,
+                        "AltBucket6",
+                        {fun testutil:foldkeysfun/3, []},
+                        {"idx1_bin", "#", "|"},
+                        {RT, undefined}}
+            end,
+    {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
+    KeyList2A = lists:usort(KFolder2A()),
+    true = length(KeyList2A) == 3000,
+
+    DeleteFun =
+        fun({DK, [{add, DIdx, DTerm}]}) ->
+            ok = testutil:book_riakdelete(Book2,
+                                            "AltBucket6",
+                                            DK,
+                                            [{remove, DIdx, DTerm}])
+        end,
+    lists:foreach(DeleteFun, KSpcL2),
+
+    {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
+    KeyList2AD = lists:usort(KFolder2AD()),
+    true = length(KeyList2AD) == 0,
+
+    ok = leveled_bookie:book_close(Book2),
+
+    {ok, Book3} = leveled_bookie:book_start(BookOptsAlt),
+
+    io:format("Compact after deletions~n"),
+
+    compact_and_wait(Book3),
+    compact_and_wait(Book3),
+
+    {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
+    KeyList3AD = lists:usort(KFolder3AD()),
+    true = length(KeyList3AD) == 0,
+
+    ok = leveled_bookie:book_close(Book3),
+
     testutil:reset_filestructure().
 
@@ -267,7 +319,7 @@ recovr_strategy(_Config) ->
     RootPath = testutil:reset_filestructure(),
     BookOpts = [{root_path, RootPath},
                 {cache_size, 1000},
-                {max_journalsize, 5000000},
+                {max_journalobjectcount, 8000},
                 {sync_strategy, testutil:sync_strategy()},
                 {reload_strategy, [{?RIAK_TAG, recovr}]}],
 
@@ -309,7 +361,57 @@ recovr_strategy(_Config) ->
     true = length(KeyList) == 6400,
     true = length(KeyList) < length(KeyTermList),
     true = length(KeyTermList) < 25600,
 
     ok = leveled_bookie:book_close(Book1),
 
+    RevisedOpts = [{root_path, RootPath},
+                   {cache_size, 1000},
+                   {max_journalobjectcount, 2000},
+                   {sync_strategy, testutil:sync_strategy()},
+                   {reload_strategy, [{?RIAK_TAG, recovr}]}],
+
+    {ok, Book2} = leveled_bookie:book_start(RevisedOpts),
+
+    {KSpcL2, _V2} = testutil:put_indexed_objects(Book2, "AltBucket6", 3000),
+    {async, KFolder2} = leveled_bookie:book_returnfolder(Book2, Q(false)),
+    KeyList2 = lists:usort(KFolder2()),
+    true = length(KeyList2) == 6400,
+
+    Q2 = fun(RT) -> {index_query,
+                        "AltBucket6",
+                        {fun testutil:foldkeysfun/3, []},
+                        {"idx1_bin", "#", "|"},
+                        {RT, undefined}}
+            end,
+    {async, KFolder2A} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
+    KeyList2A = lists:usort(KFolder2A()),
+    true = length(KeyList2A) == 3000,
+
+    DeleteFun =
+        fun({DK, [{add, DIdx, DTerm}]}) ->
+            ok = testutil:book_riakdelete(Book2,
+                                            "AltBucket6",
+                                            DK,
+                                            [{remove, DIdx, DTerm}])
+        end,
+    lists:foreach(DeleteFun, KSpcL2),
+
+    {async, KFolder2AD} = leveled_bookie:book_returnfolder(Book2, Q2(false)),
+    KeyList2AD = lists:usort(KFolder2AD()),
+    true = length(KeyList2AD) == 0,
+
+    compact_and_wait(Book2),
+    compact_and_wait(Book2),
+
+    ok = leveled_bookie:book_close(Book2),
+
+    {ok, Book3} = leveled_bookie:book_start(RevisedOpts),
+    {async, KFolder3AD} = leveled_bookie:book_returnfolder(Book3, Q2(false)),
+    KeyList3AD = lists:usort(KFolder3AD()),
+    true = length(KeyList3AD) == 0,
+
+    ok = leveled_bookie:book_close(Book3),
+
     testutil:reset_filestructure().