Further work on system tests

Another issue, exposed by laziness: using an incomplete ledger
when checking for presence during compaction.
martinsumner 2016-10-05 18:28:31 +01:00
parent d903f184fd
commit ad5aebe93e
5 changed files with 115 additions and 103 deletions
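
The substance of the change: journal compaction asks a ledger (penciller) snapshot whether each journal entry is still current, but a snapshot taken before the journal has fully loaded can only vouch for sequence numbers up to its own high-water mark (MaxSQN). Entries above that mark must be presumed active rather than replaced. A minimal sketch of the guard, assuming a FilterFun with the (Server, Key, SQN) contract of leveled_penciller:pcl_checksequencenumber/3:

%% Hypothetical helper, not part of the commit, illustrating the rule
%% that the diffs below thread through the compaction clerk:
entry_is_active(FilterFun, FilterServer, Key, SQN, MaxSQN) ->
    case FilterFun(FilterServer, Key, SQN) of
        true ->
            true;                    % snapshot confirms this version
        false when SQN > MaxSQN ->
            true;                    % snapshot too old to judge - keep
        false ->
            false                    % genuinely superseded
    end.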

View file

@@ -159,8 +159,6 @@
-record(state, {inker :: pid(),
penciller :: pid(),
metadata_extractor :: function(),
indexspec_converter :: function(),
cache_size :: integer(),
back_pressure :: boolean(),
ledger_cache :: gb_trees:tree(),
@@ -209,18 +207,6 @@ init([Opts]) ->
% Start from file not snapshot
{InkerOpts, PencillerOpts} = set_options(Opts),
{Inker, Penciller} = startup(InkerOpts, PencillerOpts),
Extractor = if
Opts#bookie_options.metadata_extractor == undefined ->
fun extract_metadata/2;
true ->
Opts#bookie_options.metadata_extractor
end,
Converter = if
Opts#bookie_options.indexspec_converter == undefined ->
fun convert_indexspecs/3;
true ->
Opts#bookie_options.indexspec_converter
end,
CacheSize = if
Opts#bookie_options.cache_size == undefined ->
?CACHE_SIZE;
@@ -229,8 +215,6 @@ init([Opts]) ->
end,
{ok, #state{inker=Inker,
penciller=Penciller,
metadata_extractor=Extractor,
indexspec_converter=Converter,
cache_size=CacheSize,
ledger_cache=gb_trees:empty(),
is_snapshot=false}};
@@ -311,19 +295,21 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) ->
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{reply,
{ok,
{LedgerSnapshot, State#state.ledger_cache},
{LedgerSnapshot,
State#state.ledger_cache},
JournalSnapshot},
State};
ledger ->
{reply,
{ok,
{LedgerSnapshot, State#state.ledger_cache},
{LedgerSnapshot,
State#state.ledger_cache},
null},
State}
end;
handle_call({compact_journal, Timeout}, _From, State) ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
State#state.penciller,
self(),
Timeout),
{reply, ok, State};
handle_call(close, _From, State) ->
@@ -510,7 +496,6 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
{Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)),
case SQN of
SQN when SQN < MinSQN ->
io:format("Skipping due to low SQN ~w~n", [SQN]),
{loop, Acc0};
SQN when SQN =< MaxSQN ->
%% TODO - get correct size in a more efficient manner
@@ -631,4 +616,16 @@ multi_key_test() ->
ok = book_close(Bookie2),
reset_filestructure().
indexspecs_test() ->
IndexSpecs = [{add, "t1_int", 456},
{add, "t1_bin", "adbc123"},
{remove, "t1_bin", "abdc456"}],
Changes = convert_indexspecs(IndexSpecs, 1, {o, "Bucket", "Key2"}),
?assertMatch({{i, "Bucket", "t1_int", 456, "Key2"},
{1, {active, infinity}, null}}, lists:nth(1, Changes)),
?assertMatch({{i, "Bucket", "t1_bin", "adbc123", "Key2"},
{1, {active, infinity}, null}}, lists:nth(2, Changes)),
?assertMatch({{i, "Bucket", "t1_bin", "abdc456", "Key2"},
{1, {tomb, infinity}, null}}, lists:nth(3, Changes)).
-endif.

View file

@@ -13,7 +13,6 @@
terminate/2,
clerk_new/1,
clerk_compact/6,
clerk_remove/2,
clerk_stop/1,
code_change/3]).
@@ -47,10 +46,6 @@
clerk_new(InkerClerkOpts) ->
gen_server:start(?MODULE, [InkerClerkOpts], []).
clerk_remove(Pid, Removals) ->
gen_server:cast(Pid, {remove, Removals}),
ok.
clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) ->
gen_server:cast(Pid,
{compact,
@@ -88,12 +83,12 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
% Don't want to process a queued call waiting on an old manifest
Manifest = leveled_inker:ink_getmanifest(Inker),
MaxRunLength = State#state.max_run_length,
FilterServer = InitiateFun(Checker),
{FilterServer, MaxSQN} = InitiateFun(Checker),
CDBopts = State#state.cdb_options,
FP = CDBopts#cdb_options.file_path,
ok = filelib:ensure_dir(FP),
Candidates = scan_all_files(Manifest, FilterFun, FilterServer),
Candidates = scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN),
BestRun = assess_candidates(Candidates, MaxRunLength),
case score_run(BestRun, MaxRunLength) of
Score when Score > 0 ->
@@ -102,7 +97,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
PromptDelete} = compact_files(BestRun,
CDBopts,
FilterFun,
FilterServer),
FilterServer,
MaxSQN),
FilesToDelete = lists:map(fun(C) ->
{C#candidate.low_sqn,
C#candidate.filename,
@@ -127,8 +123,6 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
ok = leveled_inker:ink_compactioncomplete(Inker),
{noreply, State}
end;
handle_cast({remove, _Removals}, State) ->
{noreply, State};
handle_cast(stop, State) ->
{stop, normal, State}.
@@ -147,38 +141,45 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================
check_single_file(CDB, FilterFun, FilterServer, SampleSize, BatchSize) ->
check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) ->
FN = leveled_cdb:cdb_filename(CDB),
PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize),
KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []),
R0 = lists:foldl(fun(KS, {ActSize, RplSize}) ->
{{SQN, PK}, Size} = KS,
Check = FilterFun(FilterServer, PK, SQN),
case Check of
true ->
case {Check, SQN > MaxSQN} of
{true, _} ->
{ActSize + Size, RplSize};
false ->
{false, true} ->
{ActSize + Size, RplSize};
_ ->
{ActSize, RplSize + Size}
end end,
{0, 0},
KeySizeList),
{ActiveSize, ReplacedSize} = R0,
100 * ActiveSize / (ActiveSize + ReplacedSize).
Score = 100 * ActiveSize / (ActiveSize + ReplacedSize),
io:format("Score for filename ~s is ~w~n", [FN, Score]),
Score.
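
A note on what the revised scoring means, read straight from the fold above:

%% Score = 100 * ActiveSize / (ActiveSize + ReplacedSize) - the
%% percentage of sampled bytes judged still active. The lower the
%% score, the better the file as a compaction candidate. Because
%% bytes with SQN > MaxSQN now count as active, a snapshot from an
%% incompletely loaded ledger can no longer make a file look
%% emptier than it really is.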
scan_all_files(Manifest, FilterFun, FilterServer) ->
scan_all_files(Manifest, FilterFun, FilterServer, []).
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN, []).
scan_all_files([], _FilterFun, _FilterServer, CandidateList) ->
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
CandidateList;
scan_all_files([Entry|Tail], FilterFun, FilterServer, CandidateList) ->
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
{LowSQN, FN, JournalP} = Entry,
CpctPerc = check_single_file(JournalP,
FilterFun,
FilterServer,
MaxSQN,
?SAMPLE_SIZE,
?BATCH_SIZE),
scan_all_files(Tail,
FilterFun,
FilterServer,
MaxSQN,
CandidateList ++
[#candidate{low_sqn = LowSQN,
filename = FN,
@@ -274,27 +275,29 @@ print_compaction_run(BestRun, MaxRunLength) ->
end,
BestRun).
compact_files([], _CDBopts, _FilterFun, _FilterServer) ->
compact_files([], _CDBopts, _FilterFun, _FilterServer, _MaxSQN) ->
{[], 0};
compact_files(BestRun, CDBopts, FilterFun, FilterServer) ->
compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN) ->
BatchesOfPositions = get_all_positions(BestRun, []),
compact_files(BatchesOfPositions,
CDBopts,
null,
FilterFun,
FilterServer,
MaxSQN,
[],
true).
compact_files([], _CDBopts, null, _FilterFun, _FilterServer,
compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN,
ManSlice0, PromptDelete0) ->
{ManSlice0, PromptDelete0};
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer,
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN,
ManSlice0, PromptDelete0) ->
ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0),
{ManSlice1, PromptDelete0};
compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer,
compact_files([Batch|T], CDBopts, ActiveJournal0,
FilterFun, FilterServer, MaxSQN,
ManSlice0, PromptDelete0) ->
{SrcJournal, PositionList} = Batch,
KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal,
@@ -302,7 +305,8 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer,
key_value_check),
R0 = filter_output(KVCs0,
FilterFun,
FilterServer),
FilterServer,
MaxSQN),
{KVCs1, PromptDelete1} = R0,
PromptDelete2 = case {PromptDelete0, PromptDelete1} of
{true, true} ->
@@ -314,7 +318,7 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer,
CDBopts,
ActiveJournal0,
ManSlice0),
compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer,
compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN,
ManSlice1, PromptDelete2).
get_all_positions([], PositionBatches) ->
@@ -341,16 +345,18 @@ split_positions_into_batches(Positions, Journal, Batches) ->
Batches ++ [{Journal, ThisBatch}]).
filter_output(KVCs, FilterFun, FilterServer) ->
filter_output(KVCs, FilterFun, FilterServer, MaxSQN) ->
lists:foldl(fun(KVC, {Acc, PromptDelete}) ->
{{SQN, PK}, _V, CrcCheck} = KVC,
KeyValid = FilterFun(FilterServer, PK, SQN),
case {KeyValid, CrcCheck} of
{true, true} ->
case {KeyValid, CrcCheck, SQN > MaxSQN} of
{true, true, _} ->
{Acc ++ [KVC], PromptDelete};
{false, _} ->
{false, true, true} ->
{Acc ++ [KVC], PromptDelete};
{false, true, false} ->
{Acc, PromptDelete};
{_, false} ->
{_, false, _} ->
io:format("Corrupted value found for " ++ "
Key ~w at SQN ~w~n", [PK, SQN]),
{Acc, false}
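
The four clauses of the revised filter_output/4 reduce to a small decision table; restating them as comments for reference:

%% KeyValid  CrcCheck  SQN > MaxSQN   outcome
%% true      true      _              keep - ledger says current
%% false     true      true           keep - ledger cannot vouch
%% false     true      false          drop - superseded version
%% _         false     _              log corruption, drop the value,
%%                                    and cancel the prompt-delete flag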
@@ -415,7 +421,9 @@ simple_score_test() ->
?assertMatch(6.0, score_run(Run1, 4)),
Run2 = [#candidate{compaction_perc = 75.0}],
?assertMatch(-15.0, score_run(Run2, 4)),
?assertMatch(0.0, score_run([], 4)).
?assertMatch(0.0, score_run([], 4)),
Run3 = [#candidate{compaction_perc = 100.0}],
?assertMatch(-40.0, score_run(Run3, 4)).
score_compare_test() ->
Run1 = [#candidate{compaction_perc = 75.0},
@@ -514,15 +522,18 @@ check_single_file_test() ->
_ ->
false
end end,
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 4),
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4),
?assertMatch(37.5, Score1),
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 8, 4),
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4),
?assertMatch(100.0, Score2),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 3),
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3),
?assertMatch(37.5, Score3),
Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4),
?assertMatch(75.0, Score4),
ok = leveled_cdb:cdb_destroy(CDB).
compact_single_file_test() ->
RP = "../test/journal",
{ok, CDB} = fetch_testcdb(RP),
@@ -543,7 +554,8 @@ compact_single_file_test() ->
R1 = compact_files([Candidate],
#cdb_options{file_path=CompactFP},
LedgerFun1,
LedgerSrv1),
LedgerSrv1,
9),
{ManSlice1, PromptDelete1} = R1,
?assertMatch(true, PromptDelete1),
[{LowSQN, FN, PidR}] = ManSlice1,

View file

@@ -170,12 +170,12 @@ ink_forceclose(Pid) ->
ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).
ink_compactjournal(Pid, Penciller, Timeout) ->
ink_compactjournal(Pid, Bookie, Timeout) ->
CheckerInitiateFun = fun initiate_penciller_snapshot/1,
CheckerFilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
gen_server:call(Pid,
{compact,
Penciller,
Bookie,
CheckerInitiateFun,
CheckerFilterFun,
Timeout},
@@ -818,13 +818,14 @@ manifest_printer(Manifest) ->
Manifest).
initiate_penciller_snapshot(Penciller) ->
PclOpts = #penciller_options{start_snapshot = true,
source_penciller = Penciller,
requestor = self()},
{ok, FilterServer} = leveled_penciller:pcl_start(PclOpts),
ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []),
FilterServer.
initiate_penciller_snapshot(Bookie) ->
{ok,
{LedgerSnap, LedgerCache},
_} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap,
gb_trees:to_list(LedgerCache)),
MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap),
{LedgerSnap, MaxSQN}.
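
Snapshotting via the bookie rather than directly from the penciller means the ledger cache is loaded into the snapshot before use, and the snapshot's startup sequence number comes back as the high-water mark. Any InitiateFun handed to the clerk must now honour the pair-returning contract, as compact_journal_test/0 below does with its {X, 55} stub; a hypothetical equivalent:

%% Test-style stub: pin the high-water mark instead of asking a
%% real penciller snapshot for it.
InitiateFun = fun(Checker) -> {Checker, 55} end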
%%%============================================================================
%%% Test
@@ -864,6 +865,7 @@ build_dummy_journal() ->
clean_testdir(RootPath) ->
clean_subdir(filepath(RootPath, journal_dir)),
clean_subdir(filepath(RootPath, journal_compact_dir)),
clean_subdir(filepath(RootPath, manifest_dir)).
clean_subdir(DirPath) ->
@@ -1033,7 +1035,7 @@ compact_journal_test() ->
?assertMatch(2, length(ActualManifest)),
ok = ink_compactjournal(Ink1,
Checker,
fun(X) -> X end,
fun(X) -> {X, 55} end,
fun(L, K, SQN) -> lists:member({SQN, K}, L) end,
5000),
timer:sleep(1000),

View file

@@ -862,14 +862,10 @@ fetch(Key, Manifest, Level, FetchFun) ->
not_present,
LevelManifest) of
not_present ->
io:format("Key ~w out of range at level ~w with manifest ~w~n",
[Key, Level, LevelManifest]),
fetch(Key, Manifest, Level + 1, FetchFun);
FileToCheck ->
case FetchFun(FileToCheck, Key) of
not_present ->
io:format("Key ~w not found checking file at level ~w~n",
[Key, Level]),
fetch(Key, Manifest, Level + 1, FetchFun);
ObjectFound ->
ObjectFound

View file

@@ -12,6 +12,7 @@ all() -> [simple_put_fetch_head,
journal_compaction,
simple_snapshot].
simple_put_fetch_head(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath},
@@ -19,6 +20,7 @@ simple_put_fetch_head(_Config) ->
{TestObject, TestSpec} = generate_testobject(),
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
check_bookie_forobject(Bookie1, TestObject),
check_bookie_formissingobject(Bookie1, "Bucket1", "Key2"),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = #bookie_options{root_path=RootPath,
max_journalsize=3000000},
@@ -31,6 +33,7 @@ simple_put_fetch_head(_Config) ->
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
check_bookie_forlist(Bookie2, ChkList1),
check_bookie_forobject(Bookie2, TestObject),
check_bookie_formissingobject(Bookie2, "Bucket1", "Key2"),
ok = leveled_bookie:book_close(Bookie2),
reset_filestructure().
@@ -88,7 +91,6 @@ check_bookie_forlist(Bookie, ChkList) ->
R = leveled_bookie:book_riakget(Bookie,
Obj#r_object.bucket,
Obj#r_object.key),
io:format("Checking key ~s~n", [Obj#r_object.key]),
R = {ok, Obj} end,
ChkList).
@@ -108,6 +110,10 @@ check_bookie_forobject(Bookie, TestObject) ->
ok
end.
check_bookie_formissingobject(Bookie, Bucket, Key) ->
not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key),
not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key).
journal_compaction(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath,
@@ -115,23 +121,30 @@ journal_compaction(_Config) ->
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = generate_testobject(),
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
TestObject#r_object.bucket,
TestObject#r_object.key),
check_bookie_forobject(Bookie1, TestObject),
ObjList1 = generate_multiple_objects(5000, 2),
lists:foreach(fun({_RN, Obj, Spc}) ->
leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
ObjList1),
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie1,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList1),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
TestObject#r_object.bucket,
TestObject#r_object.key),
ChkList1 = lists:sublist(lists:sort(ObjList1), 1000),
check_bookie_forlist(Bookie1, ChkList1),
check_bookie_forobject(Bookie1, TestObject),
{B2, K2, V2, Spec2, MD} = {"Bucket1",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
{TestObject2, TestSpec2} = generate_testobject(B2, K2, V2, Spec2, MD),
ok = leveled_bookie:book_riakput(Bookie1, TestObject2, TestSpec2),
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
check_bookie_forlist(Bookie1, ChkList1),
check_bookie_forobject(Bookie1, TestObject),
check_bookie_forobject(Bookie1, TestObject2),
timer:sleep(5000), % Allow for compaction to complete
io:format("Has journal completed?~n"),
check_bookie_forlist(Bookie1, ChkList1),
check_bookie_forobject(Bookie1, TestObject),
check_bookie_forobject(Bookie1, TestObject2),
%% Now replace all the objects
ObjList2 = generate_multiple_objects(5000, 2),
lists:foreach(fun({_RN, Obj, Spc}) ->
@@ -139,24 +152,12 @@ journal_compaction(_Config) ->
ObjList2),
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie1,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList3),
check_bookie_forlist(Bookie1, ChkList3),
ok = leveled_bookie:book_close(Bookie1),
% Restart
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
TestObject#r_object.bucket,
TestObject#r_object.key),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie2,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList3),
check_bookie_forobject(Bookie2, TestObject),
check_bookie_forlist(Bookie2, ChkList3),
ok = leveled_bookie:book_close(Bookie2),
reset_filestructure().
@@ -200,15 +201,19 @@ reset_filestructure() ->
leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
RootPath.
generate_testobject() ->
{B1, K1, V1, Spec1, MD} = {"Bucket1",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
Content = #r_content{metadata=MD, value=V1},
{#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
Spec1}.
generate_testobject(B1, K1, V1, Spec1, MD).
generate_testobject(B, K, V, Spec, MD) ->
Content = #r_content{metadata=MD, value=V},
{#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]},
Spec}.
generate_multiple_smallobjects(Count, KeyNumber) ->
generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(512)).