Inker Clerk - Further Testing
Expanded the unit tetsing of the Inker Clerk actor. Still WIP
This commit is contained in:
parent
d24b100aa6
commit
50b50ba486
2 changed files with 87 additions and 32 deletions
|
@ -87,6 +87,11 @@ handle_cast({compact, Penciller, Inker, _Timeout}, State) ->
|
||||||
FilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
FilterFun = fun leveled_penciller:pcl_checksequencenumber/3,
|
||||||
FilterServer = leveled_penciller:pcl_start(PclOpts),
|
FilterServer = leveled_penciller:pcl_start(PclOpts),
|
||||||
ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []),
|
ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []),
|
||||||
|
|
||||||
|
CDBopts = State#state.cdb_options,
|
||||||
|
FP = CDBopts#cdb_options.file_path,
|
||||||
|
ok = filelib:ensure_dir(FP),
|
||||||
|
|
||||||
Candidates = scan_all_files(Manifest,
|
Candidates = scan_all_files(Manifest,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
FilterServer),
|
FilterServer),
|
||||||
|
@ -94,7 +99,6 @@ handle_cast({compact, Penciller, Inker, _Timeout}, State) ->
|
||||||
case score_run(BestRun, MaxRunLength) of
|
case score_run(BestRun, MaxRunLength) of
|
||||||
Score when Score > 0 ->
|
Score when Score > 0 ->
|
||||||
print_compaction_run(BestRun, MaxRunLength),
|
print_compaction_run(BestRun, MaxRunLength),
|
||||||
CDBopts = State#state.cdb_options,
|
|
||||||
{ManifestSlice,
|
{ManifestSlice,
|
||||||
PromptDelete} = compact_files(BestRun,
|
PromptDelete} = compact_files(BestRun,
|
||||||
CDBopts,
|
CDBopts,
|
||||||
|
@ -274,10 +278,10 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer) ->
|
||||||
[],
|
[],
|
||||||
true).
|
true).
|
||||||
|
|
||||||
compact_files([], _CDBopts, _ActiveJournal0, _FilterFun, _FilterServer,
|
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer,
|
||||||
ManSlice0, PromptDelete0) ->
|
ManSlice0, PromptDelete0) ->
|
||||||
%% Need to close the active file
|
ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0),
|
||||||
{ManSlice0, PromptDelete0};
|
{ManSlice1, PromptDelete0};
|
||||||
compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer,
|
compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer,
|
||||||
ManSlice0, PromptDelete0) ->
|
ManSlice0, PromptDelete0) ->
|
||||||
{SrcJournal, PositionList} = Batch,
|
{SrcJournal, PositionList} = Batch,
|
||||||
|
@ -306,13 +310,20 @@ get_all_positions([], PositionBatches) ->
|
||||||
get_all_positions([HeadRef|RestOfBest], PositionBatches) ->
|
get_all_positions([HeadRef|RestOfBest], PositionBatches) ->
|
||||||
SrcJournal = HeadRef#candidate.journal,
|
SrcJournal = HeadRef#candidate.journal,
|
||||||
Positions = leveled_cdb:cdb_getpositions(SrcJournal, all),
|
Positions = leveled_cdb:cdb_getpositions(SrcJournal, all),
|
||||||
Batches = split_positions_into_batches(Positions, SrcJournal, []),
|
Batches = split_positions_into_batches(lists:sort(Positions),
|
||||||
|
SrcJournal,
|
||||||
|
[]),
|
||||||
get_all_positions(RestOfBest, PositionBatches ++ Batches).
|
get_all_positions(RestOfBest, PositionBatches ++ Batches).
|
||||||
|
|
||||||
split_positions_into_batches([], _Journal, Batches) ->
|
split_positions_into_batches([], _Journal, Batches) ->
|
||||||
Batches;
|
Batches;
|
||||||
split_positions_into_batches(Positions, Journal, Batches) ->
|
split_positions_into_batches(Positions, Journal, Batches) ->
|
||||||
{ThisBatch, Tail} = lists:split(?BATCH_SIZE, Positions),
|
{ThisBatch, Tail} = if
|
||||||
|
length(Positions) > ?BATCH_SIZE ->
|
||||||
|
lists:split(?BATCH_SIZE, Positions);
|
||||||
|
true ->
|
||||||
|
{Positions, []}
|
||||||
|
end,
|
||||||
split_positions_into_batches(Tail,
|
split_positions_into_batches(Tail,
|
||||||
Journal,
|
Journal,
|
||||||
Batches ++ [{Journal, ThisBatch}]).
|
Batches ++ [{Journal, ThisBatch}]).
|
||||||
|
@ -337,32 +348,36 @@ filter_output(KVCs, FilterFun, FilterServer) ->
|
||||||
KVCs).
|
KVCs).
|
||||||
|
|
||||||
|
|
||||||
write_values([KVC|Rest], CDBopts, ActiveJournal0, ManSlice0) ->
|
write_values([], _CDBopts, Journal0, ManSlice0) ->
|
||||||
|
{Journal0, ManSlice0};
|
||||||
|
write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
|
||||||
{{SQN, PK}, V, _CrcCheck} = KVC,
|
{{SQN, PK}, V, _CrcCheck} = KVC,
|
||||||
{ok, ActiveJournal1} = case ActiveJournal0 of
|
{ok, Journal1} = case Journal0 of
|
||||||
null ->
|
null ->
|
||||||
FP = CDBopts#cdb_options.file_path,
|
FP = CDBopts#cdb_options.file_path,
|
||||||
FN = leveled_inker:filepath(FP,
|
FN = leveled_inker:filepath(FP,
|
||||||
SQN,
|
SQN,
|
||||||
new_journal),
|
compact_journal),
|
||||||
leveled_cdb:cdb_open_writer(FN,
|
leveled_cdb:cdb_open_writer(FN,
|
||||||
CDBopts);
|
CDBopts);
|
||||||
_ ->
|
_ ->
|
||||||
{ok, ActiveJournal0}
|
{ok, Journal0}
|
||||||
end,
|
end,
|
||||||
R = leveled_cdb:cdb_put(ActiveJournal1, {SQN, PK}, V),
|
R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, V),
|
||||||
case R of
|
case R of
|
||||||
ok ->
|
ok ->
|
||||||
write_values(Rest, CDBopts, ActiveJournal1, ManSlice0);
|
write_values(Rest, CDBopts, Journal1, ManSlice0);
|
||||||
roll ->
|
roll ->
|
||||||
{ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal1),
|
ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1),
|
||||||
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
|
|
||||||
{StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR),
|
|
||||||
ManSlice1 = ManSlice0 ++ [{StartSQN, NewFN, PidR}],
|
|
||||||
write_values(Rest, CDBopts, null, ManSlice1)
|
write_values(Rest, CDBopts, null, ManSlice1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
generate_manifest_entry(ActiveJournal) ->
|
||||||
|
{ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal),
|
||||||
|
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
|
||||||
|
{StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR),
|
||||||
|
[{StartSQN, NewFN, PidR}].
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -451,8 +466,7 @@ find_bestrun_test() ->
|
||||||
#candidate{compaction_perc = 65.0}],
|
#candidate{compaction_perc = 65.0}],
|
||||||
assess_candidates(CList0, 6)).
|
assess_candidates(CList0, 6)).
|
||||||
|
|
||||||
check_single_file_test() ->
|
fetch_testcdb(RP) ->
|
||||||
RP = "../test/journal",
|
|
||||||
FN1 = leveled_inker:filepath(RP, 1, new_journal),
|
FN1 = leveled_inker:filepath(RP, 1, new_journal),
|
||||||
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}),
|
{ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}),
|
||||||
{K1, V1} = {{1, "Key1"}, term_to_binary("Value1")},
|
{K1, V1} = {{1, "Key1"}, term_to_binary("Value1")},
|
||||||
|
@ -472,7 +486,11 @@ check_single_file_test() ->
|
||||||
ok = leveled_cdb:cdb_put(CDB1, K7, V7),
|
ok = leveled_cdb:cdb_put(CDB1, K7, V7),
|
||||||
ok = leveled_cdb:cdb_put(CDB1, K8, V8),
|
ok = leveled_cdb:cdb_put(CDB1, K8, V8),
|
||||||
{ok, FN2} = leveled_cdb:cdb_complete(CDB1),
|
{ok, FN2} = leveled_cdb:cdb_complete(CDB1),
|
||||||
{ok, CDB2} = leveled_cdb:cdb_open_reader(FN2),
|
leveled_cdb:cdb_open_reader(FN2).
|
||||||
|
|
||||||
|
check_single_file_test() ->
|
||||||
|
RP = "../test/journal",
|
||||||
|
{ok, CDB} = fetch_testcdb(RP),
|
||||||
LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}],
|
LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}],
|
||||||
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
|
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
|
||||||
case lists:keyfind(ObjSQN, 1, Srv) of
|
case lists:keyfind(ObjSQN, 1, Srv) of
|
||||||
|
@ -481,14 +499,46 @@ check_single_file_test() ->
|
||||||
_ ->
|
_ ->
|
||||||
false
|
false
|
||||||
end end,
|
end end,
|
||||||
Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 4),
|
Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 4),
|
||||||
?assertMatch(37.5, Score1),
|
?assertMatch(37.5, Score1),
|
||||||
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
|
LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> true end,
|
||||||
Score2 = check_single_file(CDB2, LedgerFun2, LedgerSrv1, 8, 4),
|
Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 8, 4),
|
||||||
?assertMatch(100.0, Score2),
|
?assertMatch(100.0, Score2),
|
||||||
Score3 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 8, 3),
|
Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 8, 3),
|
||||||
?assertMatch(37.5, Score3),
|
?assertMatch(37.5, Score3),
|
||||||
ok = leveled_cdb:cdb_destroy(CDB2).
|
ok = leveled_cdb:cdb_destroy(CDB).
|
||||||
|
|
||||||
|
compact_single_file_test() ->
|
||||||
|
RP = "../test/journal",
|
||||||
|
{ok, CDB} = fetch_testcdb(RP),
|
||||||
|
Candidate = #candidate{journal = CDB,
|
||||||
|
low_sqn = 1,
|
||||||
|
filename = "test",
|
||||||
|
compaction_perc = 37.5},
|
||||||
|
LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}],
|
||||||
|
LedgerFun1 = fun(Srv, Key, ObjSQN) ->
|
||||||
|
case lists:keyfind(ObjSQN, 1, Srv) of
|
||||||
|
{ObjSQN, Key} ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end end,
|
||||||
|
CompactFP = leveled_inker:filepath(RP, journal_compact_dir),
|
||||||
|
ok = filelib:ensure_dir(CompactFP),
|
||||||
|
R1 = compact_files([Candidate],
|
||||||
|
#cdb_options{file_path=CompactFP},
|
||||||
|
LedgerFun1,
|
||||||
|
LedgerSrv1),
|
||||||
|
{ManSlice1, PromptDelete1} = R1,
|
||||||
|
?assertMatch(true, PromptDelete1),
|
||||||
|
[{LowSQN, FN, PidR}] = ManSlice1,
|
||||||
|
io:format("FN of ~s~n", [FN]),
|
||||||
|
?assertMatch(2, LowSQN),
|
||||||
|
?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, "Key1"})),
|
||||||
|
?assertMatch(missing, leveled_cdb:cdb_get(PidR, {7, "Key1"})),
|
||||||
|
?assertMatch(missing, leveled_cdb:cdb_get(PidR, {1, "Key1"})),
|
||||||
|
{_RK1, RV1} = leveled_cdb:cdb_get(PidR, {2, "Key2"}),
|
||||||
|
?assertMatch("Value2", binary_to_term(RV1)).
|
||||||
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -112,6 +112,7 @@
|
||||||
build_dummy_journal/0,
|
build_dummy_journal/0,
|
||||||
simple_manifest_reader/2,
|
simple_manifest_reader/2,
|
||||||
clean_testdir/1,
|
clean_testdir/1,
|
||||||
|
filepath/2,
|
||||||
filepath/3]).
|
filepath/3]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
@ -659,11 +660,15 @@ filepath(RootPath, journal_compact_dir) ->
|
||||||
|
|
||||||
filepath(RootPath, NewSQN, new_journal) ->
|
filepath(RootPath, NewSQN, new_journal) ->
|
||||||
filename:join(filepath(RootPath, journal_dir),
|
filename:join(filepath(RootPath, journal_dir),
|
||||||
|
"nursery_"
|
||||||
|
++ integer_to_list(NewSQN)
|
||||||
|
++ "." ++ ?PENDING_FILEX);
|
||||||
|
filepath(CompactFilePath, NewSQN, compact_journal) ->
|
||||||
|
filename:join(CompactFilePath,
|
||||||
"nursery_"
|
"nursery_"
|
||||||
++ integer_to_list(NewSQN)
|
++ integer_to_list(NewSQN)
|
||||||
++ "." ++ ?PENDING_FILEX).
|
++ "." ++ ?PENDING_FILEX).
|
||||||
|
|
||||||
|
|
||||||
simple_manifest_reader(SQN, RootPath) ->
|
simple_manifest_reader(SQN, RootPath) ->
|
||||||
ManifestPath = filepath(RootPath, manifest_dir),
|
ManifestPath = filepath(RootPath, manifest_dir),
|
||||||
io:format("Opening manifest file at ~s with SQN ~w~n",
|
io:format("Opening manifest file at ~s with SQN ~w~n",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue