diff --git a/include/leveled.hrl b/include/leveled.hrl
index 97eaebc..13b72cc 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -1,6 +1,7 @@
 % File paths
 -define(JOURNAL_FP, "journal").
 -define(LEDGER_FP, "ledger").
+-define(LOADING_BATCH, 1000).
 
 %% Tag to be used on standard Riak KV objects
 -define(RIAK_TAG, o_rkv).
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 2f9877b..ffb5575 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -373,16 +373,21 @@
     {monitor_loglist, list(leveled_monitor:log_type())}
     ].
 
+-type load_item() ::
+    {leveled_codec:journal_key_tag()|null,
+        leveled_codec:ledger_key()|?DUMMY,
+        non_neg_integer(), any(), integer(),
+        leveled_codec:journal_keychanges()}.
+
 -type initial_loadfun() ::
     fun((leveled_codec:journal_key(),
             any(),
             non_neg_integer(),
             {non_neg_integer(), non_neg_integer(), ledger_cache()},
             fun((any()) -> {binary(), non_neg_integer()})) ->
-        {loop|stop,
-            {non_neg_integer(), non_neg_integer(), ledger_cache()}}).
+        {loop|stop, list(load_item())}).
 
--export_type([initial_loadfun/0]).
+-export_type([initial_loadfun/0, ledger_cache/0]).
 
 %%%============================================================================
 %%% API
@@ -1562,13 +1567,46 @@
 empty_ledgercache() ->
     #ledger_cache{mem = ets:new(empty, [ordered_set])}.
 
--spec push_to_penciller(pid(), ledger_cache()) -> ok.
+-spec push_to_penciller(
+    pid(),
+    list(load_item()),
+    ledger_cache(),
+    leveled_codec:compaction_strategy())
+        -> ledger_cache().
 %% @doc
 %% The push to penciller must start as a tree to correctly de-duplicate
 %% the list by order before becoming a de-duplicated list for loading
-push_to_penciller(Penciller, LedgerCache) ->
-    push_to_penciller_loop(Penciller, loadqueue_ledgercache(LedgerCache)).
+push_to_penciller(Penciller, LoadItemList, LedgerCache, ReloadStrategy) ->
+    UpdLedgerCache =
+        lists:foldl(
+            fun({InkTag, PK, SQN, Obj, IndexSpecs, ValSize}, AccLC) ->
+                Chngs =
+                    case leveled_codec:get_tagstrategy(PK, ReloadStrategy) of
+                        recalc ->
+                            recalcfor_ledgercache(
+                                InkTag, PK, SQN, Obj, ValSize, IndexSpecs,
+                                AccLC, Penciller);
+                        _ ->
+                            preparefor_ledgercache(
+                                InkTag, PK, SQN, Obj, ValSize, IndexSpecs)
+                    end,
+                addto_ledgercache(Chngs, AccLC, loader)
+            end,
+            LedgerCache,
+            lists:reverse(LoadItemList)
+        ),
+    case length(UpdLedgerCache#ledger_cache.load_queue) of
+        N when N > ?LOADING_BATCH ->
+            leveled_log:log(b0006, [UpdLedgerCache#ledger_cache.max_sqn]),
+            ok =
+                push_to_penciller_loop(
+                    Penciller, loadqueue_ledgercache(UpdLedgerCache)),
+            empty_ledgercache();
+        _ ->
+            UpdLedgerCache
+    end.
 
+-spec push_to_penciller_loop(pid(), ledger_cache()) -> ok.
 push_to_penciller_loop(Penciller, LedgerCache) ->
     case push_ledgercache(Penciller, LedgerCache) of
        returned ->
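
The effect of the reworked push_to_penciller/4 is that journal load now batches ledger updates: load items are folded into the rolling ledger cache, and a push to the Penciller is only triggered once the cache's load queue holds more than ?LOADING_BATCH changes, with any remainder pushed once by the caller at the end of the load (see the startup/2 change below). A minimal sketch of that batching pattern, not part of the patch, with a plain list standing in for the ledger cache and hypothetical names (load_items/3, flush/2):

-module(batch_push_sketch).
-export([load_items/3]).

-define(LOADING_BATCH, 1000).

%% Fold a batch of load items into the rolling cache; flush to the
%% penciller only when the accumulated queue exceeds ?LOADING_BATCH,
%% returning either the grown cache or a fresh empty one.
load_items(Penciller, Items, Cache0) ->
    Cache1 = lists:foldl(fun(Item, Acc) -> [Item|Acc] end, Cache0, Items),
    case length(Cache1) > ?LOADING_BATCH of
        true ->
            ok = flush(Penciller, Cache1),
            [];
        false ->
            Cache1
    end.

%% Hypothetical stand-in for push_to_penciller_loop/2.
flush(Penciller, Cache) ->
    Penciller ! {push_mem, lists:reverse(Cache)},
    ok.
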
@@ -1686,21 +1724,21 @@ startup(InkerOpts, PencillerOpts) ->
     LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
     leveled_log:log(b0005, [LedgerSQN]),
     ReloadStrategy = InkerOpts#inker_options.reload_strategy,
-    LoadFun = get_loadfun(ReloadStrategy, Penciller),
+    LoadFun = get_loadfun(),
     BatchFun =
-        fun(BatchAcc, _Acc) ->
-            push_to_penciller(Penciller, BatchAcc)
+        fun(BatchAcc, Acc) ->
+            push_to_penciller(
+                Penciller, BatchAcc, Acc, ReloadStrategy)
         end,
     InitAccFun =
         fun(FN, CurrentMinSQN) ->
             leveled_log:log(i0014, [FN, CurrentMinSQN]),
-            empty_ledgercache()
+            []
         end,
-    ok = leveled_inker:ink_loadpcl(Inker,
-                                   LedgerSQN + 1,
-                                   LoadFun,
-                                   InitAccFun,
-                                   BatchFun),
+    FinalAcc =
+        leveled_inker:ink_loadpcl(
+            Inker, LedgerSQN + 1, LoadFun, InitAccFun, BatchFun),
+    ok = push_to_penciller_loop(Penciller, loadqueue_ledgercache(FinalAcc)),
     ok = leveled_inker:ink_checksqn(Inker, LedgerSQN),
     {Inker, Penciller}.
 
@@ -2414,44 +2452,36 @@
 maybe_withjitter(_CacheSize, _MaxCacheSize, _MaxCacheMult) ->
     false.
 
--spec get_loadfun(
-    leveled_codec:compaction_strategy(), pid()) -> initial_loadfun().
+-spec get_loadfun() -> initial_loadfun().
 %% @doc
 %% The LoadFun will be used by the Inker when walking across the Journal to
 %% load the Penciller at startup.
-get_loadfun(ReloadStrat, Penciller) ->
+get_loadfun() ->
     fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
-        {MinSQN, MaxSQN, LedgerCache} = Acc0,
+        {MinSQN, MaxSQN, LoadItems} = Acc0,
         {SQN, InkTag, PK} = KeyInJournal,
         case SQN of
             SQN when SQN < MinSQN ->
                 {loop, Acc0};
             SQN when SQN > MaxSQN ->
-                leveled_log:log(b0007, [MaxSQN, SQN]),
                 {stop, Acc0};
             _ ->
                 {VBin, ValSize} = ExtractFun(ValueInJournal),
                 % VBin may already be a term
                 {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
-                Chngs =
-                    case leveled_codec:get_tagstrategy(PK, ReloadStrat) of
-                        recalc ->
-                            recalcfor_ledgercache(InkTag, PK, SQN,
-                                                    Obj, ValSize, IdxSpecs,
-                                                    LedgerCache,
-                                                    Penciller);
-                        _ ->
-                            preparefor_ledgercache(InkTag, PK, SQN,
-                                                    Obj, ValSize, IdxSpecs)
-                    end,
                 case SQN of
                     MaxSQN ->
-                        leveled_log:log(b0006, [SQN]),
-                        LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
-                        {stop, {MinSQN, MaxSQN, LC0}};
+                        {stop,
+                            {MinSQN,
+                                MaxSQN,
+                                [{InkTag, PK, SQN, Obj, IdxSpecs, ValSize}
+                                    |LoadItems]}};
                     _ ->
-                        LC0 = addto_ledgercache(Chngs, LedgerCache, loader),
-                        {loop, {MinSQN, MaxSQN, LC0}}
+                        {loop,
+                            {MinSQN,
+                                MaxSQN,
+                                [{InkTag, PK, SQN, Obj, IdxSpecs, ValSize}
+                                    |LoadItems]}}
                end
        end
    end.
diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl
index 5596a37..c50c08a 100644
--- a/src/leveled_cdb.erl
+++ b/src/leveled_cdb.erl
@@ -375,8 +375,9 @@ cdb_deletepending(Pid) ->
 cdb_deletepending(Pid, ManSQN, Inker) ->
     gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).
 
--spec cdb_scan(pid(), filter_fun(), any(), integer()|undefined) ->
-    {integer()|eof, any()}.
+-spec cdb_scan(
+    pid(), filter_fun(), any(), integer()|undefined)
+        -> {integer()|eof, any()}.
 %% @doc
 %% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to
 %% continue from that point (calling function has to protect against) double
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index ae6b7d1..013bb5f 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -133,7 +133,6 @@
 -define(WASTE_FP, "waste").
 -define(JOURNAL_FILEX, "cdb").
 -define(PENDING_FILEX, "pnd").
--define(LOADING_BATCH, 1000).
 -define(TEST_KC, {[], infinity}).
 
 -record(state, {manifest = [] :: list(),
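
The reshaped cdb_scan/4 spec above does not change behaviour: the filter fun receives the key, value, position, accumulator and an extract fun, and returns {loop|stop, UpdAcc}. As a sketch of that contract (mirroring the usage in the new recovery test further down; cdb_close/1 assumed available for tidy-up), counting the keys in a single journal file looks like this:

count_journal_keys(FileName) ->
    % Scan the whole file (start position undefined) with the
    % accumulator seeded to 0, adding one per key visited
    {ok, CDB} = leveled_cdb:cdb_open_reader(FileName),
    FilterFun = fun(_K, _V, _Pos, Acc, _ExtractFun) -> {loop, Acc + 1} end,
    {_LastPosition, KeyCount} =
        leveled_cdb:cdb_scan(CDB, FilterFun, 0, undefined),
    ok = leveled_cdb:cdb_close(CDB),
    KeyCount.
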
@@ -337,11 +336,14 @@ ink_fold(Pid, MinSQN, FoldFuns, Acc) ->
                         {fold, MinSQN, FoldFuns, Acc, by_runner},
                         infinity).
 
--spec ink_loadpcl(pid(),
-                    integer(),
-                    leveled_bookie:initial_loadfun(),
-                    fun((string(), non_neg_integer()) -> any()),
-                    fun((any(), any()) -> ok)) -> ok.
+-spec ink_loadpcl(
+    pid(),
+    integer(),
+    leveled_bookie:initial_loadfun(),
+    fun((string(), non_neg_integer()) -> any()),
+    fun((any(), leveled_bookie:ledger_cache())
+            -> leveled_bookie:ledger_cache()))
+        -> leveled_bookie:ledger_cache().
 %%
 %% Function to prompt load of the Ledger at startup. The Penciller should
 %% have determined the lowest SQN not present in the Ledger, and the inker
@@ -355,7 +357,7 @@ ink_loadpcl(Pid, MinSQN, LoadFun, InitAccFun, BatchFun) ->
                         {fold,
                             MinSQN,
                             {LoadFun, InitAccFun, BatchFun},
-                            ok,
+                            leveled_bookie:empty_ledgercache(),
                             as_ink},
                         infinity).
 
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index d609f77..ffdc855 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -54,8 +54,6 @@
         {info, <<"LedgerSQN=~w at startup">>},
     b0006 =>
         {info, <<"Reached end of load batch with SQN ~w">>},
-    b0007 =>
-        {info, <<"Skipping as exceeded MaxSQN ~w with SQN ~w">>},
     b0008 =>
         {info, <<"Bucket list finds no more results">>},
     b0009 =>
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 6b38077..167852c 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -686,26 +686,31 @@ handle_call({push_mem, {LedgerTable, PushedIdx, MinSQN, MaxSQN}},
                 false ->
                     leveled_tree:from_orderedset(LedgerTable, ?CACHE_TYPE)
             end,
-    {UpdMaxSQN, NewL0Size, UpdL0Cache} =
-        leveled_pmem:add_to_cache(
-            L0Size,
-            {PushedTree, MinSQN, MaxSQN},
-            State#state.ledger_sqn,
-            State#state.levelzero_cache),
-    UpdL0Index =
-        leveled_pmem:add_to_index(
-            PushedIdx,
-            State#state.levelzero_index,
-            length(State#state.levelzero_cache) + 1),
-    leveled_log:log_randomtimer(
-        p0031, [NewL0Size, true, true, MinSQN, MaxSQN], SW, 0.1),
-    {reply,
-        ok,
-        State#state{
-            levelzero_cache = UpdL0Cache,
-            levelzero_size = NewL0Size,
-            levelzero_index = UpdL0Index,
-            ledger_sqn = UpdMaxSQN}}
+    case leveled_pmem:add_to_cache(
+            L0Size,
+            {PushedTree, MinSQN, MaxSQN},
+            State#state.ledger_sqn,
+            State#state.levelzero_cache,
+            true) of
+        empty_push ->
+            {reply, ok, State};
+        {UpdMaxSQN, NewL0Size, UpdL0Cache} ->
+            UpdL0Index =
+                leveled_pmem:add_to_index(
+                    PushedIdx,
+                    State#state.levelzero_index,
+                    length(State#state.levelzero_cache) + 1),
+            leveled_log:log_randomtimer(
+                p0031,
+                [NewL0Size, true, true, MinSQN, MaxSQN], SW, 0.1),
+            {reply,
+                ok,
+                State#state{
+                    levelzero_cache = UpdL0Cache,
+                    levelzero_size = NewL0Size,
+                    levelzero_index = UpdL0Index,
+                    ledger_sqn = UpdMaxSQN}}
+    end
     end;
 handle_call({fetch, Key, Hash, UseL0Index}, _From, State) ->
     L0Idx =
@@ -836,10 +841,12 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
             case Query of
                 no_lookup ->
                     {UpdMaxSQN, UpdSize, L0Cache} =
-                        leveled_pmem:add_to_cache(State#state.levelzero_size,
-                                                    {LM1Cache, MinSQN, MaxSQN},
-                                                    State#state.ledger_sqn,
-                                                    State#state.levelzero_cache),
+                        leveled_pmem:add_to_cache(
+                            State#state.levelzero_size,
+                            {LM1Cache, MinSQN, MaxSQN},
+                            State#state.ledger_sqn,
+                            State#state.levelzero_cache,
+                            false),
                     {#state{levelzero_cache = L0Cache,
                             ledger_sqn = UpdMaxSQN,
                             levelzero_size = UpdSize,
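
Both penciller call sites follow the same pattern with the extra boolean: a push that could roll a Level 0 file passes true and must now handle empty_push, while snapshot registration passes false and keeps the three-tuple return. A caller-side sketch of the new add_to_cache/5 contract, not part of the patch (state threading simplified to a tuple; maybe_extend_l0/6 is illustrative only):

%% With Writable set to true an empty tree is rejected as empty_push, so
%% the caller leaves its level-zero state untouched rather than queueing
%% an entry that could later be rolled into an empty L0 file.
maybe_extend_l0(L0Size, LM1, MinSQN, MaxSQN, LedgerSQN, TreeList) ->
    case leveled_pmem:add_to_cache(
            L0Size, {LM1, MinSQN, MaxSQN}, LedgerSQN, TreeList, true) of
        empty_push ->
            {LedgerSQN, L0Size, TreeList};
        {UpdMaxSQN, UpdL0Size, UpdTreeList} ->
            {UpdMaxSQN, UpdL0Size, UpdTreeList}
    end.
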
@@ -865,10 +872,12 @@ handle_call({register_snapshot, Snapshot, Query, BookiesMem, LongRunning},
                             EndKey}};
                 undefined ->
                     {UpdMaxSQN, UpdSize, L0Cache} =
-                        leveled_pmem:add_to_cache(State#state.levelzero_size,
-                                                    {LM1Cache, MinSQN, MaxSQN},
-                                                    State#state.ledger_sqn,
-                                                    State#state.levelzero_cache),
+                        leveled_pmem:add_to_cache(
+                            State#state.levelzero_size,
+                            {LM1Cache, MinSQN, MaxSQN},
+                            State#state.ledger_sqn,
+                            State#state.levelzero_cache,
+                            false),
                     LM1Idx =
                         case BookieIdx of
                             empty_index ->
diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl
index 025fa7b..44fde74 100644
--- a/src/leveled_pmem.erl
+++ b/src/leveled_pmem.erl
@@ -30,7 +30,7 @@
 -export([
         prepare_for_index/2,
-        add_to_cache/4,
+        add_to_cache/5,
         to_list/2,
         check_levelzero/3,
         check_levelzero/4,
@@ -109,22 +109,30 @@ check_index(Hash, L0Index) ->
             L0Index),
     lists:reverse(Positions).
 
--spec add_to_cache(integer(),
-                    {tuple(), integer(), integer()},
-                    integer(),
-                    list()) ->
-                        {integer(), integer(), list()}.
+-spec add_to_cache(
+    integer(),
+    {tuple(), integer(), integer()},
+    integer(),
+    list(),
+    boolean()) -> {integer(), integer(), list()}|empty_push.
 %% @doc
 %% The penciller's cache is a list of leveled_trees, this adds a new tree to
 %% that cache, providing an update to the approximate size of the cache and
 %% the Ledger's SQN.
+%% Updates to cache must set Writable to true if the update could generate a
+%% Level 0 file - as this must guard against empty entries (which may lead to
+%% an attempt to write an empty L0 file)
-add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) ->
-    LM1Size = leveled_tree:tsize(LevelMinus1),
-    if
-        MinSQN >= LedgerSQN ->
-            {MaxSQN,
-                L0Size + LM1Size,
-                [LevelMinus1|TreeList]}
+add_to_cache(L0Size, {LM1, MinSQN, MaxSQN}, LedgerSQN, TreeList, Writeable) ->
+    case {Writeable, leveled_tree:tsize(LM1)} of
+        {true, 0} ->
+            empty_push;
+        {_, LM1Size} ->
+            if
+                MinSQN >= LedgerSQN ->
+                    {MaxSQN,
+                        L0Size + LM1Size,
+                        [LM1|TreeList]}
+            end
     end.
 
 -spec to_list(
@@ -268,12 +276,12 @@ compare_method_test() ->
     R = lists:foldl(fun(_X, {LedgerSQN, L0Size, L0TreeList}) ->
                         LM1 = generate_randomkeys(LedgerSQN + 1,
                                                     2000, 1, 500),
-                        add_to_cache(L0Size,
-                                        {LM1,
-                                            LedgerSQN + 1,
-                                            LedgerSQN + 2000},
-                                        LedgerSQN,
-                                        L0TreeList)
+                        add_to_cache(
+                            L0Size,
+                            {LM1, LedgerSQN + 1, LedgerSQN + 2000},
+                            LedgerSQN,
+                            L0TreeList,
+                            true)
                     end,
                     {0, 0, []},
                     lists:seq(1, 16)),
@@ -365,10 +373,12 @@ with_index_test2() ->
             LM1Array = lists:foldl(IndexPrepareFun, new_index(), LM1),
             LM1SL = leveled_tree:from_orderedlist(lists:ukeysort(1, LM1), ?CACHE_TYPE),
             UpdL0Index = add_to_index(LM1Array, L0Idx, length(L0TreeList) + 1),
-            R = add_to_cache(L0Size,
-                                {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
-                                LedgerSQN,
-                                L0TreeList),
+            R = add_to_cache(
+                    L0Size,
+                    {LM1SL, LedgerSQN + 1, LedgerSQN + 2000},
+                    LedgerSQN,
+                    L0TreeList,
+                    true),
             {R, UpdL0Index, lists:ukeymerge(1, LM1, SrcList)}
         end,
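
The new replace_everything/1 test below repeatedly uses the same backup-then-reopen sequence to prove that the ledger can be rebuilt from a compacted journal alone. That recurring sequence, pulled out as a sketch which is not part of the patch (backup_and_reopen/3 is illustrative; options as used in the test):

%% Take a hot backup of a running bookie's journal, close the bookie,
%% then start a fresh bookie against the backup path - with no ledger
%% present, the ledger is rebuilt from the journal at startup.
backup_and_reopen(Bookie, BackupPath, CacheSize) ->
    {async, BackupFun} = leveled_bookie:book_hotbackup(Bookie),
    ok = BackupFun(BackupPath),
    ok = leveled_bookie:book_close(Bookie),
    leveled_bookie:book_start(
        [{root_path, BackupPath},
            {cache_size, CacheSize},
            {sync_strategy, testutil:sync_strategy()}]).
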
diff --git a/test/end_to_end/recovery_SUITE.erl b/test/end_to_end/recovery_SUITE.erl
index f528357..0f018c0 100644
--- a/test/end_to_end/recovery_SUITE.erl
+++ b/test/end_to_end/recovery_SUITE.erl
@@ -5,7 +5,6 @@
 -export([
             recovery_with_samekeyupdates/1,
             same_key_rotation_withindexes/1,
-            hot_backup_simple/1,
             hot_backup_changes/1,
             retain_strategy/1,
             recalc_strategy/1,
@@ -16,15 +15,14 @@
             aae_bustedjournal/1,
             journal_compaction_bustedjournal/1,
             close_duringcompaction/1,
-            allkeydelta_journal_multicompact/1,
             recompact_keydeltas/1,
-            simple_cachescoring/1
+            simple_cachescoring/1,
+            replace_everything/1
             ]).
 
 all() -> [
             recovery_with_samekeyupdates,
             same_key_rotation_withindexes,
-            hot_backup_simple,
             hot_backup_changes,
             retain_strategy,
             recalc_strategy,
@@ -34,13 +32,141 @@ all() -> [
             aae_bustedjournal,
             journal_compaction_bustedjournal,
             close_duringcompaction,
-            allkeydelta_journal_multicompact,
             recompact_keydeltas,
             stdtag_recalc,
-            simple_cachescoring
+            simple_cachescoring,
+            replace_everything
             ].
 
+replace_everything(_Config) ->
+    % See https://github.com/martinsumner/leveled/issues/389
+    % Also replaces the previous test which checked that the compaction
+    % process respects the journal object count passed at startup
+    RootPath = testutil:reset_filestructure(),
+    BackupPath = testutil:reset_filestructure("backupRE"),
+    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
+    SmallJournalCount = 7000,
+    StdJournalCount = 20000,
+    BookOpts =
+        fun(JournalObjectCount) ->
+            [{root_path, RootPath},
+                {cache_size, 2000},
+                {max_journalobjectcount, JournalObjectCount},
+                {sync_strategy, testutil:sync_strategy()},
+                {reload_strategy, [{?RIAK_TAG, recalc}]}]
+        end,
+    {ok, Book1} = leveled_bookie:book_start(BookOpts(StdJournalCount)),
+    BKT = "ReplaceAll",
+    BKT1 = "ReplaceAll1",
+    BKT2 = "ReplaceAll2",
+    BKT3 = "ReplaceAll3",
+    {KSpcL1, V1} =
+        testutil:put_indexed_objects(Book1, BKT, 50000),
+    ok = testutil:check_indexed_objects(Book1, BKT, KSpcL1, V1),
+    {KSpcL2, V2} =
+        testutil:put_altered_indexed_objects(Book1, BKT, KSpcL1),
+    ok = testutil:check_indexed_objects(Book1, BKT, KSpcL2, V2),
+    compact_and_wait(Book1, 1000),
+    {ok, FileList1} = file:list_dir(CompPath),
+    io:format("Number of files after compaction ~w~n", [length(FileList1)]),
+    compact_and_wait(Book1, 1000),
+    {ok, FileList2} = file:list_dir(CompPath),
+    io:format("Number of files after compaction ~w~n", [length(FileList2)]),
+    true = FileList1 == FileList2,
+    {async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
+    ok = BackupFun(BackupPath),
+
+    io:format("Restarting without key store~n"),
+    ok = leveled_bookie:book_close(Book1),
+
+    BookOptsBackup = [{root_path, BackupPath},
+                        {cache_size, 2000},
+                        {sync_strategy, testutil:sync_strategy()}],
+    SW1 = os:timestamp(),
+    {ok, Book2} = leveled_bookie:book_start(BookOptsBackup),
+
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW1) div 1000]),
+    ok = testutil:check_indexed_objects(Book2, BKT, KSpcL2, V2),
+    ok = leveled_bookie:book_close(Book2),
+
+    SW2 = os:timestamp(),
+    {ok, Book3} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW2) div 1000]),
+    ok = testutil:check_indexed_objects(Book3, BKT, KSpcL2, V2),
+    ok = leveled_bookie:book_destroy(Book3),
+
+    {ok, Book4} = leveled_bookie:book_start(BookOpts(StdJournalCount)),
+    {KSpcL3, V3} = testutil:put_indexed_objects(Book4, BKT1, 1000),
+    {KSpcL4, _V4} = testutil:put_indexed_objects(Book4, BKT2, 50000),
+    {KSpcL5, V5} =
+        testutil:put_altered_indexed_objects(Book4, BKT2, KSpcL4),
+    compact_and_wait(Book4),
+    {async, BackupFun4} = leveled_bookie:book_hotbackup(Book4),
+    ok = BackupFun4(BackupPath),
+    ok = leveled_bookie:book_close(Book4),
+
+    io:format("Restarting without key store~n"),
+    SW5 = os:timestamp(),
+    {ok, Book5} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW5) div 1000]),
+    ok = testutil:check_indexed_objects(Book5, BKT, KSpcL2, V2),
+    ok = testutil:check_indexed_objects(Book5, BKT1, KSpcL3, V3),
+    ok = testutil:check_indexed_objects(Book5, BKT2, KSpcL5, V5),
+    ok = leveled_bookie:book_destroy(Book5),
+
+    io:format("Testing with sparse distribution after update~n"),
+    io:format(
+        "Also use smaller Journal files and confirm value used "
+        "in compaction~n"),
+    {ok, Book6} = leveled_bookie:book_start(BookOpts(SmallJournalCount)),
+    {KSpcL6, _V6} = testutil:put_indexed_objects(Book6, BKT3, 60000),
+    {OSpcL6, RSpcL6} = lists:split(200, lists:ukeysort(1, KSpcL6)),
+    {KSpcL7, V7} =
+        testutil:put_altered_indexed_objects(Book6, BKT3, RSpcL6),
+    {ok, FileList3} = file:list_dir(CompPath),
+    compact_and_wait(Book6),
+    {ok, FileList4} = file:list_dir(CompPath),
+    {OSpcL6A, V7} =
+        testutil:put_altered_indexed_objects(Book6, BKT3, OSpcL6, true, V7),
+    {async, BackupFun6} = leveled_bookie:book_hotbackup(Book6),
+    ok = BackupFun6(BackupPath),
+    ok = leveled_bookie:book_close(Book6),
+
+    io:format("Checking object count in newly compacted journal files~n"),
+    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
+    true = length(NewlyCompactedFiles) >= 1,
+    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
+    CheckLengthFun =
+        fun(FN) ->
+            {ok, CF} =
+                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
+            {_LP, TK} =
+                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
+            io:format("File ~s has ~w keys~n", [FN, TK]),
+            true = TK =< SmallJournalCount
+        end,
+    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
+
+    io:format("Restarting without key store~n"),
+    SW7 = os:timestamp(),
+    {ok, Book7} = leveled_bookie:book_start(BookOptsBackup),
+    io:format(
+        "Opened backup with no ledger in ~w ms~n",
+        [timer:now_diff(os:timestamp(), SW7) div 1000]),
+    ok = testutil:check_indexed_objects(Book7, BKT3, KSpcL7 ++ OSpcL6A, V7),
+    ok = leveled_bookie:book_destroy(Book7),
+
+    testutil:reset_filestructure(BackupPath),
+    testutil:reset_filestructure().
+
+
 close_duringcompaction(_Config) ->
     % Prompt a compaction, and close immedately - confirm that the close
     % happens without error.
@@ -203,13 +329,14 @@ same_key_rotation_withindexes(_Config) ->
     CheckFun =
         fun(Bookie) ->
             {async, R} =
-                leveled_bookie:book_indexfold(Bookie,
-                                                {Bucket, <<>>},
-                                                {FoldKeysFun, []},
-                                                {list_to_binary("binary_bin"),
-                                                    <<0:32/integer>>,
-                                                    <<255:32/integer>>},
-                                                {true, undefined}),
+                leveled_bookie:book_indexfold(
+                    Bookie,
+                    {Bucket, <<>>},
+                    {FoldKeysFun, []},
+                    {list_to_binary("binary_bin"),
+                        <<0:32/integer>>,
+                        <<255:32/integer>>},
+                    {true, undefined}),
             QR = R(),
             BadAnswers =
                 lists:filter(fun({I, _K}) -> I =/= <> end, QR),
@@ -229,36 +356,6 @@
     testutil:reset_filestructure().
 
-
-hot_backup_simple(_Config) ->
-    % The journal may have a hot backup. This allows for an online Bookie
-    % to be sent a message to prepare a backup function, which an asynchronous
-    % worker can then call to generate a backup taken at the point in time
-    % the original message was processsed.
-    %
-    % The basic test is to:
-    % 1 - load a Bookie, take a backup, delete the original path, restore from
-    % that path
-    RootPath = testutil:reset_filestructure(),
-    BackupPath = testutil:reset_filestructure("backup0"),
-    BookOpts = [{root_path, RootPath},
-                    {cache_size, 1000},
-                    {max_journalsize, 10000000},
-                    {sync_strategy, testutil:sync_strategy()}],
-    {ok, Spcl1, LastV1} = rotating_object_check(BookOpts, "Bucket1", 3200),
-    {ok, Book1} = leveled_bookie:book_start(BookOpts),
-    {async, BackupFun} = leveled_bookie:book_hotbackup(Book1),
-    ok = BackupFun(BackupPath),
-    ok = leveled_bookie:book_close(Book1),
-    RootPath = testutil:reset_filestructure(),
-    BookOptsBackup = [{root_path, BackupPath},
-                        {cache_size, 2000},
-                        {max_journalsize, 20000000},
-                        {sync_strategy, testutil:sync_strategy()}],
-    {ok, BookBackup} = leveled_bookie:book_start(BookOptsBackup),
-    ok = testutil:check_indexed_objects(BookBackup, "Bucket1", Spcl1, LastV1),
-    ok = leveled_bookie:book_close(BookBackup),
-    BackupPath = testutil:reset_filestructure("backup0").
-
 hot_backup_changes(_Config) ->
     RootPath = testutil:reset_filestructure(),
     BackupPath = testutil:reset_filestructure("backup0"),
@@ -946,95 +1043,6 @@ busted_journal_test(MaxJournalSize, PressMethod, PressPoint, Bust) ->
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure(10000).
 
-
-allkeydelta_journal_multicompact(_Config) ->
-    RootPath = testutil:reset_filestructure(),
-    CompPath = filename:join(RootPath, "journal/journal_files/post_compact"),
-    B = <<"test_bucket">>,
-    StartOptsFun =
-        fun(JOC) ->
-            [{root_path, RootPath},
-                {max_journalobjectcount, JOC},
-                {max_run_length, 4},
-                {singlefile_compactionpercentage, 70.0},
-                {maxrunlength_compactionpercentage, 85.0},
-                {sync_strategy, testutil:sync_strategy()}]
-        end,
-    {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(14000)),
-    {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000),
-    {KSpcL2, V2} = testutil:put_altered_indexed_objects(Bookie1,
-                                                        B,
-                                                        KSpcL1,
-                                                        false),
-    compact_and_wait(Bookie1, 0),
-    compact_and_wait(Bookie1, 0),
-    {ok, FileList1} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList1)]),
-    compact_and_wait(Bookie1, 0),
-    {ok, FileList2} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList2)]),
-    true = FileList1 == FileList2,
-
-    ok = testutil:check_indexed_objects(Bookie1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-
-    ok = leveled_bookie:book_close(Bookie1),
-    leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
-    io:format("Restart without ledger~n"),
-    {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(13000)),
-
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-
-    {KSpcL3, _V3} = testutil:put_altered_indexed_objects(Bookie2,
-                                                            B,
-                                                            KSpcL2,
-                                                            false),
-    compact_and_wait(Bookie2, 0),
-    {ok, FileList3} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList3)]),
-
-    ok = leveled_bookie:book_close(Bookie2),
-
-    io:format("Restart with smaller journal object count~n"),
-    {ok, Bookie3} = leveled_bookie:book_start(StartOptsFun(7000)),
-
-    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Bookie3,
-                                                        B,
-                                                        KSpcL3,
-                                                        false),
-
-    compact_and_wait(Bookie3, 0),
-
-    ok = testutil:check_indexed_objects(Bookie3,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
-    {ok, FileList4} = file:list_dir(CompPath),
-    io:format("Number of files after compaction ~w~n", [length(FileList4)]),
-
-    ok = leveled_bookie:book_close(Bookie3),
-
-    NewlyCompactedFiles = lists:subtract(FileList4, FileList3),
-    true = length(NewlyCompactedFiles) >= 3,
-    CDBFilterFun = fun(_K, _V, _P, Acc, _EF) -> {loop, Acc + 1} end,
-    CheckLengthFun =
-        fun(FN) ->
-            {ok, CF} =
-                leveled_cdb:cdb_open_reader(filename:join(CompPath, FN)),
-            {_LP, TK} =
-                leveled_cdb:cdb_scan(CF, CDBFilterFun, 0, undefined),
-            io:format("File ~s has ~w keys~n", [FN, TK]),
-            true = TK =< 7000
-        end,
-    lists:foreach(CheckLengthFun, NewlyCompactedFiles),
-
-    testutil:reset_filestructure(10000).
-
 recompact_keydeltas(_Config) ->
     RootPath = testutil:reset_filestructure(),
     B = <<"test_bucket">>,
@@ -1049,73 +1057,52 @@
         end,
     {ok, Bookie1} = leveled_bookie:book_start(StartOptsFun(45000)),
     {KSpcL1, _V1} = testutil:put_indexed_objects(Bookie1, B, 24000),
-    {KSpcL2, _V2} = testutil:put_altered_indexed_objects(Bookie1,
-                                                         B,
-                                                         KSpcL1,
-                                                         false),
+    {KSpcL2, _V2} =
+        testutil:put_altered_indexed_objects(Bookie1, B, KSpcL1, false),
     ok = leveled_bookie:book_close(Bookie1),
     {ok, Bookie2} = leveled_bookie:book_start(StartOptsFun(45000)),
     compact_and_wait(Bookie2, 0),
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Bookie2,
-                                                        B,
-                                                        KSpcL2,
-                                                        false),
+    {KSpcL3, V3} =
+        testutil:put_altered_indexed_objects(Bookie2, B, KSpcL2, false),
     compact_and_wait(Bookie2, 0),
-    ok = testutil:check_indexed_objects(Bookie2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
+    ok =
+        testutil:check_indexed_objects(
+            Bookie2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
     ok = leveled_bookie:book_close(Bookie2),
     testutil:reset_filestructure(10000).
-
-
 rotating_object_check(BookOpts, B, NumberOfObjects) ->
     {ok, Book1} = leveled_bookie:book_start(BookOpts),
     {KSpcL1, V1} = testutil:put_indexed_objects(Book1, B, NumberOfObjects),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1,
-                                        V1),
-    {KSpcL2, V2} = testutil:put_altered_indexed_objects(Book1,
-                                                        B,
-                                                        KSpcL1,
-                                                        false),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2,
-                                        V2),
-    {KSpcL3, V3} = testutil:put_altered_indexed_objects(Book1,
-                                                        B,
-                                                        KSpcL2,
-                                                        false),
-    ok = testutil:check_indexed_objects(Book1,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
+    ok = testutil:check_indexed_objects(Book1, B, KSpcL1, V1),
+    {KSpcL2, V2} =
+        testutil:put_altered_indexed_objects(Book1, B, KSpcL1, false),
+    ok =
+        testutil:check_indexed_objects(
+            Book1, B, KSpcL1 ++ KSpcL2, V2),
+    {KSpcL3, V3} =
+        testutil:put_altered_indexed_objects(Book1, B, KSpcL2, false),
+    ok =
+        testutil:check_indexed_objects(
+            Book1, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
    ok = leveled_bookie:book_close(Book1),
    {ok, Book2} = leveled_bookie:book_start(BookOpts),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3,
-                                        V3),
-    {KSpcL4, V4} = testutil:put_altered_indexed_objects(Book2,
-                                                        B,
-                                                        KSpcL3,
-                                                        false),
+    ok =
+        testutil:check_indexed_objects(
+            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3, V3),
+    {KSpcL4, V4} =
+        testutil:put_altered_indexed_objects(Book2, B, KSpcL3, false),
     io:format("Bucket complete - checking index before compaction~n"),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
+    ok =
+        testutil:check_indexed_objects(
+            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4),
     compact_and_wait(Book2),
     io:format("Checking index following compaction~n"),
-    ok = testutil:check_indexed_objects(Book2,
-                                        B,
-                                        KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4,
-                                        V4),
+    ok =
+        testutil:check_indexed_objects(
+            Book2, B, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4),
     ok = leveled_bookie:book_close(Book2),
     {ok, KSpcL1 ++ KSpcL2 ++ KSpcL3 ++ KSpcL4, V4}.
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index bd197ad..84774fa 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -46,6 +46,7 @@
          put_indexed_objects/3,
          put_altered_indexed_objects/3,
          put_altered_indexed_objects/4,
+         put_altered_indexed_objects/5,
          check_indexed_objects/4,
          rotating_object_check/3,
          corrupt_journal/5,
@@ -719,60 +720,55 @@ foldkeysfun_returnbucket(Bucket, Key, Acc) ->
 check_indexed_objects(Book, B, KSpecL, V) ->
     % Check all objects match, return what should be the results of an all
     % index query
-    IdxR = lists:map(fun({K, Spc}) ->
-                        {ok, O} = book_riakget(Book, B, K),
-                        V = testutil:get_value(O),
-                        {add,
-                            "idx1_bin",
-                            IdxVal} = lists:keyfind(add, 1, Spc),
-                        {IdxVal, K} end,
-                    KSpecL),
+    IdxR =
+        lists:map(
+            fun({K, Spc}) ->
+                {ok, O} = book_riakget(Book, B, K),
+                V = testutil:get_value(O),
+                {add, "idx1_bin", IdxVal} = lists:keyfind(add, 1, Spc),
+                {IdxVal, K}
+            end,
+            KSpecL),
     % Check the all index query matches expectations
-    R = leveled_bookie:book_returnfolder(Book,
-                                            {index_query,
-                                                B,
-                                                {fun foldkeysfun/3, []},
-                                                {"idx1_bin",
-                                                    "0",
-                                                    "|"},
-                                                ?RETURN_TERMS}),
+    R =
+        leveled_bookie:book_returnfolder(
+            Book,
+            {index_query,
+                B,
+                {fun foldkeysfun/3, []},
+                {"idx1_bin", "0", "|"},
+                ?RETURN_TERMS}),
     SW = os:timestamp(),
     {async, Fldr} = R,
     QR0 = Fldr(),
-    io:format("Query match found of length ~w in ~w microseconds " ++
-                    "expected ~w ~n",
-                [length(QR0),
-                    timer:now_diff(os:timestamp(), SW),
-                    length(IdxR)]),
+    io:format(
+        "Query match found of length ~w in ~w microseconds "
+        "expected ~w ~n",
+        [length(QR0), timer:now_diff(os:timestamp(), SW), length(IdxR)]),
     QR = lists:sort(QR0),
     ER = lists:sort(IdxR),
-    ok = if
-            ER == QR ->
-                ok
-        end,
+    ok = if ER == QR -> ok end,
     ok.
 
 put_indexed_objects(Book, Bucket, Count) ->
-    V = testutil:get_compressiblevalue(),
-    IndexGen = testutil:get_randomindexes_generator(1),
+    V = get_compressiblevalue(),
+    IndexGen = get_randomindexes_generator(1),
     SW = os:timestamp(),
-    ObjL1 = testutil:generate_objects(Count,
-                                        uuid,
-                                        [],
-                                        V,
-                                        IndexGen,
-                                        Bucket),
-    KSpecL = lists:map(fun({_RN, Obj, Spc}) ->
-                            book_riakput(Book, Obj, Spc),
-                            {testutil:get_key(Obj), Spc}
-                        end,
-                        ObjL1),
-    io:format("Put of ~w objects with ~w index entries "
-                    ++
-                    "each completed in ~w microseconds~n",
-                [Count, 1, timer:now_diff(os:timestamp(), SW)]),
+    ObjL1 =
+        generate_objects(Count, uuid, [], V, IndexGen, Bucket),
+    KSpecL =
+        lists:map(
+            fun({_RN, Obj, Spc}) ->
+                book_riakput(Book, Obj, Spc),
+                {testutil:get_key(Obj), Spc}
+            end,
+            ObjL1),
+    io:format(
+        "Put of ~w objects with ~w index entries "
+        "each completed in ~w microseconds~n",
+        [Count, 1, timer:now_diff(os:timestamp(), SW)]),
     {KSpecL, V}.
 
 
@@ -780,8 +776,12 @@ put_altered_indexed_objects(Book, Bucket, KSpecL) ->
     put_altered_indexed_objects(Book, Bucket, KSpecL, true).
 
 put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i) ->
-    IndexGen = get_randomindexes_generator(1),
     V = get_compressiblevalue(),
+    put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i, V).
+
+put_altered_indexed_objects(Book, Bucket, KSpecL, RemoveOld2i, V) ->
+    IndexGen = get_randomindexes_generator(1),
+
     FindAdditionFun = fun(SpcItem) -> element(1, SpcItem) == add end,
     MapFun =
         fun({K, Spc}) ->