Add initial end-to-end common tests

These tests highlighted some logical issues when scanning over databases
on startup, so the fixes for those issues are included in this commit.
martinsumner 2016-10-05 09:54:53 +01:00
parent 507428bd0b
commit d903f184fd
6 changed files with 294 additions and 94 deletions
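
The new test files themselves are not shown in the hunks below. For orientation, here is a minimal sketch of the shape such an end-to-end common test might take; the suite name, the root_path field and the book_start/book_put/book_get helpers are assumptions for illustration, with only book_close/1, the put message shape and #bookie_options confirmed by this diff:

-module(basic_SUITE).
-include_lib("common_test/include/ct.hrl").
-export([all/0, simple_put_fetch/1]).

all() -> [simple_put_fetch].

simple_put_fetch(_Config) ->
    Opts = #bookie_options{root_path="test/endtoend"},
    {ok, Bookie1} = leveled_bookie:book_start(Opts),
    ok = leveled_bookie:book_put(Bookie1, "Key1", {value, "Value1"}, []),
    {ok, {value, "Value1"}} = leveled_bookie:book_get(Bookie1, "Key1"),
    ok = leveled_bookie:book_close(Bookie1),
    %% Restarting over the same files exercises the startup scan that
    %% surfaced the journal-loading issues fixed below
    {ok, Bookie2} = leveled_bookie:book_start(Opts),
    {ok, {value, "Value1"}} = leveled_bookie:book_get(Bookie2, "Key1"),
    ok = leveled_bookie:book_close(Bookie2).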

View file

@@ -155,6 +155,7 @@
-define(LEDGER_FP, "ledger").
-define(SHUTDOWN_WAITS, 60).
-define(SHUTDOWN_PAUSE, 10000).
-define(SNAPSHOT_TIMEOUT, 300000).
-record(state, {inker :: pid(),
penciller :: pid(),
@@ -162,7 +163,8 @@
indexspec_converter :: function(),
cache_size :: integer(),
back_pressure :: boolean(),
ledger_cache :: gb_trees:tree()}).
ledger_cache :: gb_trees:tree(),
is_snapshot :: boolean()}).
@@ -202,32 +204,46 @@ book_close(Pid) ->
%%%============================================================================
init([Opts]) ->
{InkerOpts, PencillerOpts} = set_options(Opts),
{Inker, Penciller} = startup(InkerOpts, PencillerOpts),
Extractor = if
Opts#bookie_options.metadata_extractor == undefined ->
fun extract_metadata/2;
true ->
Opts#bookie_options.metadata_extractor
end,
Converter = if
Opts#bookie_options.indexspec_converter == undefined ->
fun convert_indexspecs/3;
true ->
Opts#bookie_options.indexspec_converter
end,
CacheSize = if
Opts#bookie_options.cache_size == undefined ->
?CACHE_SIZE;
true ->
Opts#bookie_options.cache_size
end,
{ok, #state{inker=Inker,
penciller=Penciller,
metadata_extractor=Extractor,
indexspec_converter=Converter,
cache_size=CacheSize,
ledger_cache=gb_trees:empty()}}.
case Opts#bookie_options.snapshot_bookie of
undefined ->
% Start from file not snapshot
{InkerOpts, PencillerOpts} = set_options(Opts),
{Inker, Penciller} = startup(InkerOpts, PencillerOpts),
Extractor = if
Opts#bookie_options.metadata_extractor == undefined ->
fun extract_metadata/2;
true ->
Opts#bookie_options.metadata_extractor
end,
Converter = if
Opts#bookie_options.indexspec_converter == undefined ->
fun convert_indexspecs/3;
true ->
Opts#bookie_options.indexspec_converter
end,
CacheSize = if
Opts#bookie_options.cache_size == undefined ->
?CACHE_SIZE;
true ->
Opts#bookie_options.cache_size
end,
{ok, #state{inker=Inker,
penciller=Penciller,
metadata_extractor=Extractor,
indexspec_converter=Converter,
cache_size=CacheSize,
ledger_cache=gb_trees:empty(),
is_snapshot=false}};
Bookie ->
{ok,
{Penciller, LedgerCache},
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
ok = leveled_penciller:pcl_loadsnapshot(Penciller, []),
{ok, #state{penciller=Penciller,
inker=Inker,
ledger_cache=LedgerCache,
is_snapshot=true}}
end.
handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) ->
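
The snapshot_bookie option gives init/1 two paths: the original start, which builds state by scanning the journal and ledger files, and a snapshot start, which skips the file scan and instead borrows state from a running bookie via book_snapshotstore/3. A sketch of starting each kind (book_start/1 is an assumed wrapper around starting the gen_server):

%% Normal bookie, built from the files under its root path
{ok, Bookie} = leveled_bookie:book_start(Opts),
%% Snapshot bookie - no startup scan, state is taken from Bookie
{ok, SnapBookie} =
    leveled_bookie:book_start(#bookie_options{snapshot_bookie=Bookie}),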
@@ -289,11 +305,21 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) ->
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
case SnapType of
store ->
InkerOpts = #inker_options{},
InkerOpts = #inker_options{start_snapshot=true,
source_inker=State#state.inker,
requestor=Requestor},
{ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
{reply, {ok, LedgerSnapshot, JournalSnapshot}, State};
{reply,
{ok,
{LedgerSnapshot, State#state.ledger_cache},
JournalSnapshot},
State};
ledger ->
{reply, {ok, LedgerSnapshot, null}, State}
{reply,
{ok,
{LedgerSnapshot, State#state.ledger_cache},
null},
State}
end;
handle_call({compact_journal, Timeout}, _From, State) ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
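
Note the widened reply shape: the ledger snapshot now travels together with the bookie's in-memory ledger cache, so recent writes not yet pushed to the penciller remain visible to the snapshot. A caller unpacks it as init/1 above does:

%% Old shape: {ok, LedgerSnapshot, JournalSnapshot}
%% New shape:
{ok,
    {LedgerSnapshot, LedgerCache},
    JournalSnapshot} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),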

View file

@@ -159,7 +159,7 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
ink_registersnapshot(Pid, Requestor) ->
gen_server:call(Pid, {snapshot, Requestor}, infinity).
gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
ink_close(Pid) ->
gen_server:call(Pid, {close, false}, infinity).
@@ -218,11 +218,12 @@ init([InkerOpts]) ->
{undefined, true} ->
SrcInker = InkerOpts#inker_options.source_inker,
Requestor = InkerOpts#inker_options.requestor,
{ok,
{ActiveJournalDB,
Manifest}} = ink_registersnapshot(SrcInker, Requestor),
{Manifest,
ActiveJournalDB,
ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor),
{ok, #state{manifest=Manifest,
active_journaldb=ActiveJournalDB,
active_journaldb_sqn=ActiveJournalSQN,
is_snapshot=true}};
%% Need to do something about timeout
{_RootPath, false} ->
@@ -276,7 +277,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
Rs = [{Requestor,
State#state.manifest_sqn}|State#state.registered_snapshots],
{reply, {State#state.manifest,
State#state.active_journaldb},
State#state.active_journaldb,
State#state.active_journaldb_sqn},
State#state{registered_snapshots=Rs}};
handle_call(get_manifest, _From, State) ->
{reply, State#state.manifest, State};
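
Taken together, the inker hunks define the snapshot handshake: the snapshot inker's init/1 calls ink_registersnapshot/2 on its source, and the source records the requestor against the current manifest SQN before replying with the manifest, the active journal and (newly) the active journal's SQN, so the snapshot knows at which sequence number the live journal takes over from the manifest. In outline:

%% Snapshot side (init/1):
{Manifest,
    ActiveJournalDB,
    ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor),
%% Source side (handle_call) - remember the snapshot, presumably so
%% journal files it may still read are protected from removal:
Rs = [{Requestor, State#state.manifest_sqn} | State#state.registered_snapshots],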
@@ -656,33 +658,67 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
ok;
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|Rest])
when LowSQN >= MinSQN ->
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
{ok, LastMinSQN} = load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
Pid,
undefined),
load_from_sequence(LastMinSQN, FilterFun, Penciller, ManTail);
load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) ->
load_from_sequence(MinSQN, FilterFun, Penciller, ManTail).
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
InitAcc = {MinSQN, MaxSQN, []},
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
{ok, AccMinSQN};
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
load_between_sequence(MaxSQN + 1,
MaxSQN + 1 + ?LOADING_BATCH,
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
Pid,
undefined,
FN,
Rest);
load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
case Rest of
[] ->
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
CDBpid,
LastPosition)
Pid,
undefined,
FN,
Rest);
[{NextSQN, _NextFN, _NextPid}|_Rest] when NextSQN > MinSQN ->
load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
Pid,
undefined,
FN,
Rest);
_ ->
load_from_sequence(MinSQN, FilterFun, Penciller, Rest)
end.
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
CDBpid, StartPos, FN, Rest) ->
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
InitAcc = {MinSQN, MaxSQN, []},
Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
{ok, AccMinSQN};
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
NextSQN = MaxSQN + 1,
load_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
CDBpid,
LastPosition,
FN,
Rest)
end,
case Res of
{ok, LMSQN} ->
load_from_sequence(LMSQN, FilterFun, Penciller, Rest);
ok ->
ok
end.
push_to_penciller(Penciller, KeyList) ->
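
This rework carries the main startup-scan fix. Previously, any journal file whose low SQN fell below MinSQN was skipped outright, yet such a file can still contain the first sequence numbers that need replaying into the ledger. The new second clause therefore inspects the rest of the manifest: if there is no later file, or the next file only starts beyond MinSQN, the current file must be scanned after all. A worked illustration of that decision (values invented for the example):

%% Manifest as [{LowSQN, Filename, Pid}], ledger persisted up to SQN 120,
%% so replay starts from MinSQN = 121:
%%     [{1, "f1", P1}, {90, "f2", P2}, {150, "f3", P3}]
%% f1: next file starts at 90, and 90 =< 121 -> skip f1
%% f2: next file starts at 150 > 121 -> SQNs 121-149 can only be in f2,
%%     so scan it
%% f3: 150 >= 121 -> scan, as before
%% Each scan runs leveled_cdb:cdb_scan/4 over ?LOADING_BATCH-sized SQN
%% windows, pushing each batch of keys to the penciller and resuming from
%% the returned LastPosition until eof, at which point control returns to
%% load_from_sequence to move on to the next file.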

View file

@@ -117,19 +117,7 @@ merge(WI) ->
{SrcF, UpdMFest1} = select_filetomerge(SrcLevel,
WI#penciller_work.manifest),
SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []),
Splits = lists:splitwith(fun(Ref) ->
case {Ref#manifest_entry.start_key,
Ref#manifest_entry.end_key} of
{_, EK} when SrcF#manifest_entry.start_key > EK ->
false;
{SK, _} when SrcF#manifest_entry.end_key < SK ->
false;
_ ->
true
end end,
SinkFiles),
{Candidates, Others} = Splits,
{Candidates, Others} = check_for_merge_candidates(SrcF, SinkFiles),
%% TODO:
%% Need to work out if this is the top level
%% And then tell merge process to create files at the top level
@@ -185,7 +173,20 @@ mark_for_delete([Head|Tail], Penciller) ->
leveled_sft:sft_setfordelete(Head#manifest_entry.owner, Penciller),
mark_for_delete(Tail, Penciller).
check_for_merge_candidates(SrcF, SinkFiles) ->
lists:partition(fun(Ref) ->
case {Ref#manifest_entry.start_key,
Ref#manifest_entry.end_key} of
{_, EK} when SrcF#manifest_entry.start_key > EK ->
false;
{SK, _} when SrcF#manifest_entry.end_key < SK ->
false;
_ ->
true
end end,
SinkFiles).
%% An algorithm for discovering which files to merge ....
%% We can find the most optimal file:
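
Beyond extracting a named helper, this swaps lists:splitwith/2 for lists:partition/2, which is the substantive fix: splitwith stops at the first sink file failing the overlap test, so an overlapping file sitting behind a non-overlapping one was silently dropped from Candidates, whereas partition classifies every element. The difference on a toy predicate:

1> lists:splitwith(fun(X) -> X rem 2 == 0 end, [2, 1, 4]).
{[2],[1,4]}
2> lists:partition(fun(X) -> X rem 2 == 0 end, [2, 1, 4]).
{[2,4],[1]}

The select_merge_candidates_test added below pins down exactly this case: Sink1 does not overlap Src1 but precedes Sink2, which does; under splitwith the result would have been {[], [Sink1, Sink2]}.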
@@ -375,6 +376,17 @@ merge_file_test() ->
leveled_sft:sft_clear(ManEntry#manifest_entry.owner) end,
Result).
select_merge_candidates_test() ->
Sink1 = #manifest_entry{start_key = {o, "Bucket", "Key1"},
end_key = {o, "Bucket", "Key20000"}},
Sink2 = #manifest_entry{start_key = {o, "Bucket", "Key20001"},
end_key = {o, "Bucket1", "Key1"}},
Src1 = #manifest_entry{start_key = {o, "Bucket", "Key40001"},
end_key = {o, "Bucket", "Key60000"}},
{Candidates, Others} = check_for_merge_candidates(Src1, [Sink1, Sink2]),
?assertMatch([Sink2], Candidates),
?assertMatch([Sink1], Others).
select_merge_file_test() ->
L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}],

View file

@@ -862,10 +862,14 @@ fetch(Key, Manifest, Level, FetchFun) ->
not_present,
LevelManifest) of
not_present ->
io:format("Key ~w out of range at level ~w with manifest ~w~n",
[Key, Level, LevelManifest]),
fetch(Key, Manifest, Level + 1, FetchFun);
FileToCheck ->
case FetchFun(FileToCheck, Key) of
not_present ->
io:format("Key ~w not found checking file at level ~w~n",
[Key, Level]),
fetch(Key, Manifest, Level + 1, FetchFun);
ObjectFound ->
ObjectFound
@@ -1022,6 +1026,25 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).
print_manifest(Manifest) ->
lists:foreach(fun(L) ->
io:format("Manifest at Level ~w~n", [L]),
Level = get_item(L, Manifest, []),
lists:foreach(fun(M) ->
{_, SB, SK} = M#manifest_entry.start_key,
{_, EB, EK} = M#manifest_entry.end_key,
io:format("Manifest entry of " ++
"startkey ~s ~s " ++
"endkey ~s ~s " ++
"filename=~s~n",
[SB, SK, EB, EK,
M#manifest_entry.filename])
end,
Level)
end,
lists:seq(1, ?MAX_LEVELS - 1)).
assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
WorkQ;
assess_workqueue(WorkQ, LevelToAssess, Manifest)->
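
print_manifest/1 replaces the commented-out dump of the raw manifest term with a readable per-level listing after every manifest commit. Given the format strings above, the log output takes roughly this form (the filename is purely illustrative):

Manifest at Level 1
Manifest entry of startkey Bucket Key1 endkey Bucket Key20000 filename=ledger_file.sft
Manifest at Level 2
...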
@@ -1087,7 +1110,7 @@ commit_manifest_change(ReturnedWorkItem, State) ->
io:format("Merge has been commmitted at sequence number ~w~n",
[NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
%% io:format("Updated manifest is ~w~n", [NewManifest]),
print_manifest(NewManifest),
{ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN,
manifest=NewManifest,