Mas i389 rebuildledger (#390)

* Protect penciller from empty ledger cache updates

Empty updates may occur when loading the ledger from the journal, after the ledger has been cleared. A condensed sketch of the new guard follows.
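A sketch of the shape of the guard, condensed from the leveled_pmem and leveled_penciller hunks below (not the verbatim change):

    %% add_to_cache/5 gains a Writeable flag; a push that could generate
    %% a Level 0 file must not add an empty tree to the cache, as that
    %% could later prompt an attempt to write an empty L0 file.
    add_to_cache(L0Size, {LM1, MinSQN, MaxSQN}, LedgerSQN, TreeList, Writeable) ->
        case {Writeable, leveled_tree:tsize(LM1)} of
            {true, 0} ->
                empty_push;  % the penciller replies ok, state unchanged
            {_, LM1Size} when MinSQN >= LedgerSQN ->
                {MaxSQN, L0Size + LM1Size, [LM1|TreeList]}
        end.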

* Score caching and randomisation

The test allkeydelta_journal_multicompact could occasionally fail when a compaction didn't happen on one loop, but then did on the next. This is suspected to be a result of score caching, randomisation of the keys grabbed for scoring, plus jitter on size boundaries.

Modified test for predictability.

Plus formatting changes

* Avoid small batches

Small batches could previously be pushed to the penciller when there were large gaps in SQNs; batching during journal load is now based on the count of queued load items, as sketched below.
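In outline (condensed from push_to_penciller/4 in the leveled_bookie hunk below; the maybe_flush name is illustrative, not from the commit):

    %% Batches are now cut on the number of queued load items, rather
    %% than on the SQN range, so sparse SQNs cannot force a small push.
    maybe_flush(Penciller, UpdLedgerCache) ->
        case length(UpdLedgerCache#ledger_cache.load_queue) of
            N when N > ?LOADING_BATCH ->    % ?LOADING_BATCH = 1000
                ok =
                    push_to_penciller_loop(
                        Penciller, loadqueue_ledgercache(UpdLedgerCache)),
                empty_ledgercache();
            _ ->
                UpdLedgerCache
        end.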

* Rationalise tests

Two tests overlapped with the new, much broader replace_everything/1 test. Any remaining checks of interest were ported over, and the two tests were dropped.
Martin Sumner 2023-01-18 11:44:02 +00:00
parent a033e280e6
commit a01c74f268
9 changed files with 358 additions and 320 deletions

include/leveled.hrl

@@ -1,6 +1,7 @@
 % File paths
 -define(JOURNAL_FP, "journal").
 -define(LEDGER_FP, "ledger").
+-define(LOADING_BATCH, 1000).
 
 %% Tag to be used on standard Riak KV objects
 -define(RIAK_TAG, o_rkv).

src/leveled_bookie.erl

@@ -373,16 +373,21 @@
     {monitor_loglist, list(leveled_monitor:log_type())}
     ].
 
+-type load_item() ::
+    {leveled_codec:journal_key_tag()|null,
+        leveled_codec:ledger_key()|?DUMMY,
+        non_neg_integer(), any(), integer(),
+        leveled_codec:journal_keychanges()}.
+
 -type initial_loadfun() ::
     fun((leveled_codec:journal_key(),
             any(),
             non_neg_integer(),
             {non_neg_integer(), non_neg_integer(), ledger_cache()},
             fun((any()) -> {binary(), non_neg_integer()})) ->
-        {loop|stop,
-            {non_neg_integer(), non_neg_integer(), ledger_cache()}}).
+        {loop|stop, list(load_item())}).
 
--export_type([initial_loadfun/0]).
+-export_type([initial_loadfun/0, ledger_cache/0]).
 
 %%%============================================================================
 %%% API
@@ -1562,13 +1567,46 @@ empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
--spec push_to_penciller(pid(), ledger_cache()) -> ok.
+-spec push_to_penciller(
+    pid(),
+    list(load_item()),
+    ledger_cache(),
+    leveled_codec:compaction_strategy())
+        -> ledger_cache().
 %% @doc
 %% The push to penciller must start as a tree to correctly de-duplicate
 %% the list by order before becoming a de-duplicated list for loading
-push_to_penciller(Penciller, LedgerCache) ->
-    push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
+push_to_penciller(Penciller, LoadItemList, LedgerCache, ReloadStrategy) ->
+    UpdLedgerCache =
+        lists:foldl(
+            fun({InkTag, PK, SQN, Obj, IndexSpecs, ValSize}, AccLC) ->
+                Chngs =
+                    case leveled_codec:get_tagstrategy(PK, ReloadStrategy) of
+                        recalc ->
+                            recalcfor_ledgercache(
+                                InkTag, PK, SQN, Obj, ValSize, IndexSpecs,
+                                AccLC, Penciller);
+                        _ ->
+                            preparefor_ledgercache(
+                                InkTag, PK, SQN, Obj, ValSize, IndexSpecs)
+                    end,
+                addto_ledgercache(Chngs, AccLC, loader)
+            end,
+            LedgerCache,
+            lists:reverse(LoadItemList)
+        ),
+    case length(UpdLedgerCache#ledger_cache.load_queue) of
+        N when N > ?LOADING_BATCH ->
+            leveled_log:log(b0006, [UpdLedgerCache#ledger_cache.max_sqn]),
+            ok =
+                push_to_penciller_loop(
+                    Penciller, loadqueue_ledgercache(UpdLedgerCache)),
+            empty_ledgercache();
+        _ ->
+            UpdLedgerCache
+    end.
 
+-spec push_to_penciller_loop(pid(), ledger_cache()) -> ok.
 push_to_penciller_loop(Penciller, LedgerCache) ->
     case push_ledgercache(Penciller, LedgerCache) of
         returned ->
@@ -1686,21 +1724,21 @@ startup(InkerOpts, PencillerOpts) ->
     LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
     leveled_log:log(b0005, [LedgerSQN]),
     ReloadStrategy = InkerOpts#inker_options.reload_strategy,
-    LoadFun = get_loadfun(ReloadStrategy, Penciller),
+    LoadFun = get_loadfun(),
     BatchFun =
-        fun(BatchAcc, _Acc) ->
-            push_to_penciller(Penciller, BatchAcc)
+        fun(BatchAcc, Acc) ->
+            push_to_penciller(
+                Penciller, BatchAcc, Acc, ReloadStrategy)
         end,
     InitAccFun =
         fun(FN, CurrentMinSQN) ->
             leveled_log:log(i0014, [FN, CurrentMinSQN]),
-            empty_ledgercache()
+            []
         end,
-    ok = leveled_inker:ink_loadpcl(Inker,
-                                    LedgerSQN + 1,
-                                    LoadFun,
-                                    InitAccFun,
-                                    BatchFun),
+    FinalAcc =
+        leveled_inker:ink_loadpcl(
+            Inker, LedgerSQN + 1, LoadFun, InitAccFun, BatchFun),
+    ok = push_to_penciller_loop(Penciller, loadqueue_ledgercache(FinalAcc)),
     ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
     {Inker, Penciller}.
@@ -2414,44 +2452,36 @@ maybe_withjitter(_CacheSize, _MaxCacheSize, _MaxCacheMult) ->
     false.
 
--spec get_loadfun(
-    leveled_codec:compaction_strategy(), pid()) -> initial_loadfun().
+-spec get_loadfun() -> initial_loadfun().
 %% @doc
 %% The LoadFun will be used by the Inker when walking across the Journal to
 %% load the Penciller at startup.
-get_loadfun(ReloadStrat, Penciller) ->
+get_loadfun() ->
     fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
-        {MinSQN, MaxSQN, LedgerCache} = Acc0,
+        {MinSQN, MaxSQN, LoadItems} = Acc0,
         {SQN, InkTag, PK} = KeyInJournal,
         case SQN of
             SQN when SQN < MinSQN ->
                 {loop, Acc0};
             SQN when SQN > MaxSQN ->
-                leveled_log:log(b0007, [MaxSQN, SQN]),
                 {stop, Acc0};
             _ ->
                 {VBin, ValSize} = ExtractFun(ValueInJournal),
                 % VBin may already be a term
                 {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
-                Chngs =
-                    case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
-                        recalc ->
-                            recalcfor_ledgercache(InkTag, PK, SQN,
-                                                    Obj, ValSize, IdxSpecs,
-                                                    LedgerCache,
-                                                    Penciller);
-                        _ ->
-                            preparefor_ledgercache(InkTag, PK, SQN,
-                                                    Obj, ValSize, IdxSpecs)
-                    end,
                 case SQN of
                     MaxSQN ->
-                        leveled_log:log(b0006, [SQN]),
-                        LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
-                        {stop, {MinSQN, MaxSQN, LC0}};
+                        {stop,
                            {MinSQN,
                                MaxSQN,
+                                [{InkTag, PK, SQN, Obj, IdxSpecs, ValSize}
+                                    |LoadItems]}};
                     _ ->
-                        LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
-                        {loop, {MinSQN, MaxSQN, LC0}}
+                        {loop,
+                            {MinSQN,
+                                MaxSQN,
+                                [{InkTag, PK, SQN, Obj, IdxSpecs, ValSize}
+                                    |LoadItems]}}
                 end
         end
     end.

src/leveled_cdb.erl

@@ -375,8 +375,9 @@ cdb_deletepending(Pid) ->
 cdb_deletepending(Pid, ManSQN, Inker) ->
     gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).
 
--spec cdb_scan(pid(), filter_fun(), any(), integer()|undefined) ->
-                    {integer()|eof, any()}.
+-spec cdb_scan(
+    pid(), filter_fun(), any(), integer()|undefined)
+        -> {integer()|eof, any()}.
 %% @doc
 %% cdb_scan returns {LastPosition, Acc}.  Use LastPosition as StartPosiiton to
 %% continue from that point (calling function has to protect against) double

src/leveled_inker.erl

@@ -133,7 +133,6 @@
 -define(WASTE_FP, "waste").
 -define(JOURNAL_FILEX, "cdb").
 -define(PENDING_FILEX, "pnd").
--define(LOADING_BATCH, 1000).
 -define(TEST_KC, {[], infinity}).
 
 -record(state, {manifest = [] :: list(),
@@ -337,11 +336,14 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
                         {fold, MinSQN, FoldFuns, Acc, by_runner},
                         infinity).
 
--spec ink_loadpcl(pid(),
+-spec ink_loadpcl(
+    pid(),
     integer(),
     leveled_bookie:initial_loadfun(),
     fun((string(), non_neg_integer()) -> any()),
-    fun((any(), any()) -> ok)) -> ok.
+    fun((any(), leveled_bookie:ledger_cache())
+            -> leveled_bookie:ledger_cache()))
+                -> leveled_bookie:ledger_cache().
 %%
 %% Function to prompt load of the Ledger at startup.  The Penciller should
 %% have determined the lowest SQN not present in the Ledger, and the inker
@@ -355,7 +357,7 @@ ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) ->
         {fold,
             MinSQN,
             {LoadFun, InitAccFun, BatchFun},
-            ok,
+            leveled_bookie:empty_ledgercache(),
             as_ink},
         infinity).

src/leveled_log.erl

@@ -54,8 +54,6 @@
         {info, <<"LedgerSQN=~w at startup">>},
     b0006 =>
         {info, <<"Reached end of load batch with SQN ~w">>},
-    b0007 =>
-        {info, <<"Skipping as exceeded MaxSQN ~w with SQN ~w">>},
     b0008 =>
         {info, <<"Bucket list finds no more results">>},
     b0009 =>

src/leveled_penciller.erl

@@ -686,19 +686,23 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
                 false ->
                     leveled_tree:from_orderedset(LedgerTable, ?CACHE_TYPE)
             end,
-        {UpdMaxSQN, NewL0Size, UpdL0Cache} =
-            leveled_pmem:add_to_cache(
+        case leveled_pmem:add_to_cache(
                 L0Size,
                 {PushedTree, MinSQN, MaxSQN},
                 State#state.ledger_sqn,
-                State#state.levelzero_cache),
+                State#state.levelzero_cache,
+                true) of
+            empty_push ->
+                {reply, ok, State};
+            {UpdMaxSQN, NewL0Size, UpdL0Cache} ->
                 UpdL0Index =
                     leveled_pmem:add_to_index(
                         PushedIdx,
                         State#state.levelzero_index,
                         length(State#state.levelzero_cache) + 1),
                 leveled_log:log_randomtimer(
-                    p0031, [NewL0Size, true, true, MinSQN, MaxSQN], SW, 0.1),
+                    p0031,
+                    [NewL0Size, true, true, MinSQN, MaxSQN], SW, 0.1),
                 {reply,
                     ok,
                     State#state{
@@ -706,6 +710,7 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
                         levelzero_size = NewL0Size,
                         levelzero_index = UpdL0Index,
                         ledger_sqn = UpdMaxSQN}}
+        end
     end;
 handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
     L0Idx =
@@ -836,10 +841,12 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
         case Query of
             no_lookup ->
                 {UpdMaxSQN, UpdSize, L0Cache} =
-                    leveled_pmem:add_to_cache(State#state.levelzero_size,
+                    leveled_pmem:add_to_cache(
+                        State#state.levelzero_size,
                         {LM1Cache, MinSQN, MaxSQN},
                         State#state.ledger_sqn,
-                        State#state.levelzero_cache),
+                        State#state.levelzero_cache,
+                        false),
                 {#state{levelzero_cache = L0Cache,
                         ledger_sqn = UpdMaxSQN,
                         levelzero_size = UpdSize,
@@ -865,10 +872,12 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
                             EndKey}};
                 undefined ->
                     {UpdMaxSQN, UpdSize, L0Cache} =
-                        leveled_pmem:add_to_cache(State#state.levelzero_size,
+                        leveled_pmem:add_to_cache(
+                            State#state.levelzero_size,
                             {LM1Cache, MinSQN, MaxSQN},
                             State#state.ledger_sqn,
-                            State#state.levelzero_cache),
+                            State#state.levelzero_cache,
+                            false),
                     LM1Idx =
                         case BookieIdx of
                             empty_index ->

src/leveled_pmem.erl

@@ -30,7 +30,7 @@
 -export([
         prepare_for_index/2,
-        add_to_cache/4,
+        add_to_cache/5,
         to_list/2,
         check_levelzero/3,
         check_levelzero/4,
@@ -109,22 +109,30 @@ check_index(Hash, L0Index) ->
             L0Index),
     lists:reverse(Positions).
 
--spec add_to_cache(integer(),
+-spec add_to_cache(
+    integer(),
     {tuple(), integer(), integer()},
     integer(),
-    list()) ->
-        {integer(), integer(), list()}.
+    list(),
+    boolean()) -> {integer(), integer(), list()}|empty_push.
 %% @doc
 %% The penciller's cache is a list of leveled_trees, this adds a new tree to
 %% that cache, providing an update to the approximate size of the cache and
 %% the Ledger's SQN.
-add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
-    LM1Size = leveled_tree:tsize(LevelMinus1),
+%% Updates to cache must set Writable to true if the update could generate a
+%% Level 0 file - as this must guard against empty entries (which may lead to
+%% an attempt to write an empty L0 file)
+add_to_cache(L0Size, {LM1, MinSQN, MaxSQN}, LedgerSQN, TreeList, Writeable) ->
+    case {Writeable, leveled_tree:tsize(LM1)} of
+        {true, 0} ->
+            empty_push;
+        {_, LM1Size} ->
             if
                 MinSQN >= LedgerSQN ->
                     {MaxSQN,
                         L0Size + LM1Size,
-                        [LevelMinus1|TreeList]}
+                        [LM1|TreeList]}
+            end
     end.
 
 -spec to_list(
@@ -268,12 +276,12 @@ compare_method_test() ->
     R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0TreeList}) ->
                         LM1 = generate_randomkeys(LedgerSQN + 1,
                                                     2000, 1, 500),
-                        add_to_cache(L0Size,
-                                        {LM1,
-                                        LedgerSQN + 1,
-                                        LedgerSQN + 2000},
+                        add_to_cache(
+                            L0Size,
+                            {LM1, LedgerSQN + 1, LedgerSQN + 2000},
                             LedgerSQN,
-                            L0TreeList)
+                            L0TreeList,
+                            true)
                     end,
                     {0, 0, []},
                     lists:seq(1, 16)),
@@ -365,10 +373,12 @@ with_index_test2() ->
             LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
             LM1SL = leveled_tree:from_orderedlist(lists:ukeysort(1, LM1), ?CACHE_TYPE),
             UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
-            R = add_to_cache(L0Size,
+            R = add_to_cache(
+                    L0Size,
                     {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
                     LedgerSQN,
-                    L0TreeList),
+                    L0TreeList,
+                    true),
             {R, UpdL0Index, lists:ukeymerge(1, LM1, SrcList)}
         end,

test/end_to_end/recovery_SUITE.erl

@@ -5,7 +5,6 @@
 -export([
         recovery_with_samekeyupdates/1,
         same_key_rotation_withindexes/1,
-        hot_backup_simple/1,
         hot_backup_changes/1,
         retain_strategy/1,
         recalc_strategy/1,
@@ -16,15 +15,14 @@
         aae_bustedjournal/1,
         journal_compaction_bustedjournal/1,
         close_duringcompaction/1,
-        allkeydelta_journal_multicompact/1,
         recompact_keydeltas/1,
-        simple_cachescoring/1
+        simple_cachescoring/1,
+        replace_everything/1
     ]).
 
 all() -> [
         recovery_with_samekeyupdates,
         same_key_rotation_withindexes,
-        hot_backup_simple,
         hot_backup_changes,
         retain_strategy,
         recalc_strategy,
@@ -34,13 +32,141 @@ all() -> [
         aae_bustedjournal,
         journal_compaction_bustedjournal,
         close_duringcompaction,
-        allkeydelta_journal_multicompact,
         recompact_keydeltas,
         stdtag_recalc,
-        simple_cachescoring
+        simple_cachescoring,
+        replace_everything
     ].
 
+replace_everything(_Config) ->
+    % See https://github.com/martinsumner/leveled/issues/389
+    % Also replaces previous test which was checking the comapction process
+    % respects the journal object count passed at startup
+    RootPath = testutil:reset_filestructure(),
+    BackupPath = testutil:reset_filestructure("backupRE"),
+    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
+    SmallJournalCount = 7000,
+    StdJournalCount = 20000,
+    BookOpts =
+        fun(JournalObjectCount) ->
+            [{root_path, RootPath},
+                {cache_size, 2000},
+                {max_journalobjectcount, JournalObjectCount},
+                {sync_strategy, testutil:sync_strategy()},
+                {reload_strategy, [{?RIAK_TAG, recalc}]}]
+        end,
+    {ok, Book1} = leveled_bookie:book_start(BookOpts(StdJournalCount)),
+    BKT = "ReplaceAll",
+    BKT1 = "ReplaceAll1",
+    BKT2 = "ReplaceAll2",
+    BKT3 = "ReplaceAll3",
+    {KSpcL1, V1} =
+        testutil:put_indexed_objects(Book1, BKT, 50000),
+    ok = testutil:check_indexed_objects(Book1, BKT, KSpcL1, V1),
+    {KSpcL2, V2} =
+        testutil:put_altered_indexed_objects(Book1, BKT, KSpcL1),
+    ok = testutil:check_indexed_objects(Book1, BKT, KSpcL2, V2),
+    compact_and_wait(Book1, 1000),
+    {ok, FileList1} = file:list_dir(CompPath),
+    io:format("Number of files after compaction ~w~n", [length(FileList1)]),
+    compact_and_wait(Book1, 1000),
+    {ok, FileList2} = file:list_dir(CompPath),
+    io:format("Number of files after compaction ~w~n", [length(FileList2)]),
+    true = FileList1 == FileList2,
+    {async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
+    ok = BackupFun(BackupPath),
+    io:format("Restarting without key store~n"),
+    ok = leveled_bookie:book_close(Book1),
+    BookOptsBackup = [{root_path, BackupPath},
+                        {cache_size, 2000},
+                        {sync_strategy, testutil:sync_strategy()}],
+    SW1 = os:timestamp(),
+    {ok, Book2} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW1) div 1000]),
+    ok = testutil:check_indexed_objects(Book2, BKT, KSpcL2, V2),
+    ok = leveled_bookie:book_close(Book2),
+    SW2 = os:timestamp(),
+    {ok, Book3} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW2) div 1000]),
+    ok = testutil:check_indexed_objects(Book3, BKT, KSpcL2, V2),
+    ok = leveled_bookie:book_destroy(Book3),
+    {ok, Book4} = leveled_bookie:book_start(BookOpts(StdJournalCount)),
+    {KSpcL3, V3} = testutil:put_indexed_objects(Book4, BKT1, 1000),
+    {KSpcL4, _V4} = testutil:put_indexed_objects(Book4, BKT2, 50000),
+    {KSpcL5, V5} =
+        testutil:put_altered_indexed_objects(Book4, BKT2, KSpcL4),
+    compact_and_wait(Book4),
+    {async, BackupFun4} = leveled_bookie:book_hotbackup(Book4),
+    ok = BackupFun4(BackupPath),
+    ok = leveled_bookie:book_close(Book4),
+    io:format("Restarting without key store~n"),
+    SW5 = os:timestamp(),
+    {ok, Book5} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW5) div 1000]),
+    ok = testutil:check_indexed_objects(Book5, BKT, KSpcL2, V2),
+    ok = testutil:check_indexed_objects(Book5, BKT1, KSpcL3, V3),
+    ok = testutil:check_indexed_objects(Book5, BKT2, KSpcL5, V5),
+    ok = leveled_bookie:book_destroy(Book5),
+    io:format("Testing with sparse distribution after update~n"),
+    io:format(
+        "Also use smaller Journal files and confirm value used "
+        "in compaction~n"),
+    {ok, Book6} = leveled_bookie:book_start(BookOpts(SmallJournalCount)),
+    {KSpcL6, _V6} = testutil:put_indexed_objects(Book6, BKT3, 60000),
+    {OSpcL6, RSpcL6} = lists:split(200, lists:ukeysort(1, KSpcL6)),
+    {KSpcL7, V7} =
+        testutil:put_altered_indexed_objects(Book6, BKT3, RSpcL6),
+    {ok, FileList3} = file:list_dir(CompPath),
+    compact_and_wait(Book6),
+    {ok, FileList4} = file:list_dir(CompPath),
+    {OSpcL6A, V7} =
+        testutil:put_altered_indexed_objects(Book6, BKT3, OSpcL6, true, V7),
+    {async, BackupFun6} = leveled_bookie:book_hotbackup(Book6),
+    ok = BackupFun6(BackupPath),
+    ok = leveled_bookie:book_close(Book6),
+    io:format("Checking object count in newly compacted journal files~n"),
+    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
+    true = length(NewlyCompactedFiles) >= 1,
+    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
+    CheckLengthFun =
+        fun(FN) ->
+            {ok, CF} =
+                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
+            {_LP, TK} =
+                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
+            io:format("File ~s has ~w keys~n", [FN, TK]),
+            true = TK =< SmallJournalCount
+        end,
+    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
+    io:format("Restarting without key store~n"),
+    SW7 = os:timestamp(),
+    {ok, Book7} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW7) div 1000]),
+    ok = testutil:check_indexed_objects(Book7, BKT3, KSpcL7 ++ OSpcL6A, V7),
+    ok = leveled_bookie:book_destroy(Book7),
+    testutil:reset_filestructure(BackupPath),
+    testutil:reset_filestructure().
+
 close_duringcompaction(_Config) ->
     % Prompt a compaction, and close immedately - confirm that the close
     % happens without error.
@@ -203,7 +329,8 @@ same_key_rotation_withindexes(_Config) ->
     CheckFun =
         fun(Bookie) ->
             {async, R} =
-                leveled_bookie:book_indexfold(Bookie,
+                leveled_bookie:book_indexfold(
+                    Bookie,
                     {Bucket, <<>>},
                     {FoldKeysFun, []},
                     {list_to_binary("binary_bin"),
@@ -229,36 +356,6 @@ same_key_rotation_withindexes(_Config) ->
     testutil:reset_filestructure().
 
-hot_backup_simple(_Config) ->
-    % The journal may have a hot backup.  This allows for an online Bookie
-    % to be sent a message to prepare a backup function, which an asynchronous
-    % worker can then call to generate a backup taken at the point in time
-    % the original message was processsed.
-    %
-    % The basic test is to:
-    % 1 - load a Bookie, take a backup, delete the original path, restore from
-    % that path
-    RootPath = testutil:reset_filestructure(),
-    BackupPath = testutil:reset_filestructure("backup0"),
-    BookOpts = [{root_path, RootPath},
-                    {cache_size, 1000},
-                    {max_journalsize, 10000000},
-                    {sync_strategy, testutil:sync_strategy()}],
-    {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 3200),
-    {ok, Book1} = leveled_bookie:book_start(BookOpts),
-    {async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
-    ok = BackupFun(BackupPath),
-    ok = leveled_bookie:book_close(Book1),
-    RootPath = testutil:reset_filestructure(),
-    BookOptsBackup = [{root_path, BackupPath},
-                        {cache_size, 2000},
-                        {max_journalsize, 20000000},
-                        {sync_strategy, testutil:sync_strategy()}],
-    {ok, BookBackup} = leveled_bookie:book_start(BookOptsBackup),
-    ok = testutil:check_indexed_objects(BookBackup, "Bucket1", Spcl1, LastV1),
-    ok = leveled_bookie:book_close(BookBackup),
-    BackupPath = testutil:reset_filestructure("backup0").
-
 hot_backup_changes(_Config) ->
     RootPath = testutil:reset_filestructure(),
     BackupPath = testutil:reset_filestructure("backup0"),
@@ -946,95 +1043,6 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure(10000).
 
-allkeydelta_journal_multicompact(_Config) ->
-    RootPath = testutil:reset_filestructure(),
-    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
-    B = <<"test_bucket">>,
-    StartOptsFun =
-        fun(JOC) ->
-            [{root_path, RootPath},
-                {max_journalobjectcount, JOC},
-                {max_run_length, 4},
-                {singlefile_compactionpercentage, 70.0},
-                {maxrunlength_compactionpercentage, 85.0},
-                {sync_strategy, testutil:sync_strategy()}]
-        end,
-    {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(14000)),
-    {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000),
-    {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
-                                                        B,
-                                                        KSpcL1,
-                                                        false),
-    compact_and_wait(Bookie1, 0),
-    compact_and_wait(Bookie1, 0),
-    {ok, FileList1} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList1)]),
-    compact_and_wait(Bookie1, 0),
-    {ok, FileList2} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList2)]),
-    true = FileList1 == FileList2,
-    ok = testutil:check_indexed_objects(Bookie1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-    ok = leveled_bookie:book_close(Bookie1),
-    leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
-    io:format("Restart without ledger~n"),
-    {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(13000)),
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-    {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2,
-                                                            B,
-                                                            KSpcL2,
-                                                            false),
-    compact_and_wait(Bookie2, 0),
-    {ok, FileList3} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList3)]),
-    ok = leveled_bookie:book_close(Bookie2),
-    io:format("Restart with smaller journal object count~n"),
-    {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(7000)),
-    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3,
-                                                        B,
-                                                        KSpcL3,
-                                                        false),
-    compact_and_wait(Bookie3, 0),
-    ok = testutil:check_indexed_objects(Bookie3,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
-    {ok, FileList4} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList4)]),
-    ok = leveled_bookie:book_close(Bookie3),
-    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
-    true = length(NewlyCompactedFiles) >= 3,
-    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
-    CheckLengthFun =
-        fun(FN) ->
-            {ok, CF} =
-                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
-            {_LP, TK} =
-                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
-            io:format("File ~s has ~w keys~n", [FN, TK]),
-            true = TK =< 7000
-        end,
-    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
-    testutil:reset_filestructure(10000).
-
 recompact_keydeltas(_Config) ->
     RootPath = testutil:reset_filestructure(),
     B = <<"test_bucket">>,
@@ -1049,73 +1057,52 @@ recompact_keydeltas(_Config) ->
         end,
     {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(45000)),
     {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000),
-    {KSpcL2, _V2} = testutil:put_altered_indexed_objects(Bookie1,
-                                                            B,
-                                                            KSpcL1,
-                                                            false),
+    {KSpcL2, _V2} =
+        testutil:put_altered_indexed_objects(Bookie1, B, KSpcL1, false),
     ok = leveled_bookie:book_close(Bookie1),
     {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(45000)),
     compact_and_wait(Bookie2, 0),
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
-                                                        B,
-                                                        KSpcL2,
-                                                        false),
+    {KSpcL3, V3} =
+        testutil:put_altered_indexed_objects(Bookie2, B, KSpcL2, false),
     compact_and_wait(Bookie2, 0),
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
+    ok =
+        testutil:check_indexed_objects(
+            Bookie2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure(10000).
 
 rotating_object_check(BookOpts, B, NumberOfObjects) ->
     {ok, Book1} = leveled_bookie:book_start(BookOpts),
     {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1,
-                                        V1),
-    {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1,
-                                                        B,
-                                                        KSpcL1,
-                                                        false),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1,
-                                                        B,
-                                                        KSpcL2,
-                                                        false),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
+    ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1),
+    {KSpcL2, V2} =
+        testutil:put_altered_indexed_objects(Book1, B, KSpcL1, false),
+    ok =
+        testutil:check_indexed_objects(
+            Book1, B, KSpcL1 ++ KSpcL2, V2),
+    {KSpcL3, V3} =
+        testutil:put_altered_indexed_objects(Book1, B, KSpcL2, false),
+    ok =
+        testutil:check_indexed_objects(
+            Book1, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
     ok = leveled_bookie:book_close(Book1),
     {ok, Book2} = leveled_bookie:book_start(BookOpts),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
-    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2,
-                                                        B,
-                                                        KSpcL3,
-                                                        false),
+    ok =
+        testutil:check_indexed_objects(
+            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
+    {KSpcL4, V4} =
+        testutil:put_altered_indexed_objects(Book2, B, KSpcL3, false),
     io:format("Bucket complete - checking index before compaction~n"),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
+    ok =
+        testutil:check_indexed_objects(
            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4),
     compact_and_wait(Book2),
     io:format("Checking index following compaction~n"),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
+    ok =
+        testutil:check_indexed_objects(
            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4),
     ok = leveled_bookie:book_close(Book2),
     {ok, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4}.

test/end_to_end/testutil.erl

@@ -46,6 +46,7 @@
         put_indexed_objects/3,
         put_altered_indexed_objects/3,
         put_altered_indexed_objects/4,
+        put_altered_indexed_objects/5,
         check_indexed_objects/4,
         rotating_object_check/3,
         corrupt_journal/5,
@@ -719,58 +720,53 @@ foldkeysfun_returnbucket(Bucket, Key, Acc) ->
 check_indexed_objects(Book, B, KSpecL, V) ->
     % Check all objects match, return what should be the results of an all
     % index query
-    IdxR = lists:map(fun({K, Spc}) ->
+    IdxR =
+        lists:map(
+            fun({K, Spc}) ->
                 {ok, O} = book_riakget(Book, B, K),
                 V = testutil:get_value(O),
-                {add,
-                    "idx1_bin",
-                    IdxVal} = lists:keyfind(add, 1, Spc),
-                {IdxVal, K} end,
+                {add, "idx1_bin", IdxVal} = lists:keyfind(add, 1, Spc),
+                {IdxVal, K}
+            end,
             KSpecL),
     % Check the all index query matches expectations
-    R = leveled_bookie:book_returnfolder(Book,
+    R =
+        leveled_bookie:book_returnfolder(
+            Book,
             {index_query,
                 B,
                 {fun foldkeysfun/3, []},
-                {"idx1_bin",
-                    "0",
-                    "|"},
+                {"idx1_bin", "0", "|"},
                 ?RETURN_TERMS}),
     SW = os:timestamp(),
     {async, Fldr} = R,
     QR0 = Fldr(),
-    io:format("Query match found of length ~w in ~w microseconds " ++
+    io:format(
+        "Query match found of length ~w in ~w microseconds "
             "expected ~w ~n",
-            [length(QR0),
-                timer:now_diff(os:timestamp(), SW),
-                length(IdxR)]),
+        [length(QR0), timer:now_diff(os:timestamp(), SW), length(IdxR)]),
     QR = lists:sort(QR0),
     ER = lists:sort(IdxR),
-    ok = if
-            ER == QR ->
-                ok
-        end,
+    ok = if ER == QR -> ok end,
     ok.
 
 put_indexed_objects(Book, Bucket, Count) ->
-    V = testutil:get_compressiblevalue(),
-    IndexGen = testutil:get_randomindexes_generator(1),
+    V = get_compressiblevalue(),
+    IndexGen = get_randomindexes_generator(1),
     SW = os:timestamp(),
-    ObjL1 = testutil:generate_objects(Count,
-                                        uuid,
-                                        [],
-                                        V,
-                                        IndexGen,
-                                        Bucket),
-    KSpecL = lists:map(fun({_RN, Obj, Spc}) ->
+    ObjL1 =
+        generate_objects(Count, uuid, [], V, IndexGen, Bucket),
+    KSpecL =
+        lists:map(
+            fun({_RN, Obj, Spc}) ->
                 book_riakput(Book, Obj, Spc),
                 {testutil:get_key(Obj), Spc}
             end,
            ObjL1),
-    io:format("Put of ~w objects with ~w index entries "
-                    ++ "each completed in ~w microseconds~n",
+    io:format(
+        "Put of ~w objects with ~w index entries "
            "each completed in ~w microseconds~n",
         [Count, 1, timer:now_diff(os:timestamp(), SW)]),
     {KSpecL, V}.
@@ -780,8 +776,12 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) ->
     put_altered_indexed_objects(Book, Bucket, KSpecL, true).
 
 put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
-    IndexGen = get_randomindexes_generator(1),
     V = get_compressiblevalue(),
+    put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i, V).
+
+put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i, V) ->
+    IndexGen = get_randomindexes_generator(1),
     FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end,
     MapFun =
         fun({K, Spc}) ->