From d903f184fd75c9541d0f196f30b0c9c2ff851310 Mon Sep 17 00:00:00 2001
From: martinsumner
Date: Wed, 5 Oct 2016 09:54:53 +0100
Subject: [PATCH] Add initial end-to-end common tests

These tests highlighted some logical issues when scanning over databases on
startup, so fixes are wrapped in here.
---
 include/leveled.hrl             |   3 +-
 src/leveled_bookie.erl          |  86 +++++++++++++-------
 src/leveled_inker.erl           |  94 ++++++++++++++-------
 src/leveled_pclerk.erl          |  40 +++++----
 src/leveled_penciller.erl       |  25 +++++-
 test/end_to_end/basic_SUITE.erl | 140 +++++++++++++++++++++++++++-----
 6 files changed, 294 insertions(+), 94 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index 0debd70..c04a44f 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -44,7 +44,8 @@
                         cache_size :: integer(),
                         max_journalsize :: integer(),
                         metadata_extractor :: function(),
-                        indexspec_converter :: function()}).
+                        indexspec_converter :: function(),
+                        snapshot_bookie :: pid()}).
 
 -record(iclerk_options,
                         {inker :: pid(),
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index a3679d6..c11c8e0 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -155,6 +155,7 @@
 -define(LEDGER_FP, "ledger").
 -define(SHUTDOWN_WAITS, 60).
 -define(SHUTDOWN_PAUSE, 10000).
+-define(SNAPSHOT_TIMEOUT, 300000).
 
 -record(state, {inker :: pid(),
                 penciller :: pid(),
@@ -162,7 +163,8 @@
                 indexspec_converter :: function(),
                 cache_size :: integer(),
                 back_pressure :: boolean(),
-                ledger_cache :: gb_trees:tree()}).
+                ledger_cache :: gb_trees:tree(),
+                is_snapshot :: boolean()}).
 
 
 
@@ -202,32 +204,46 @@ book_close(Pid) ->
 %%%============================================================================
 
 init([Opts]) ->
-    {InkerOpts, PencillerOpts} = set_options(Opts),
-    {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
-    Extractor = if
-                    Opts#bookie_options.metadata_extractor == undefined ->
-                        fun extract_metadata/2;
-                    true ->
-                        Opts#bookie_options.metadata_extractor
-                end,
-    Converter = if
-                    Opts#bookie_options.indexspec_converter == undefined ->
-                        fun convert_indexspecs/3;
-                    true ->
-                        Opts#bookie_options.indexspec_converter
-                end,
-    CacheSize = if
-                    Opts#bookie_options.cache_size == undefined ->
-                        ?CACHE_SIZE;
-                    true ->
-                        Opts#bookie_options.cache_size
-                end,
-    {ok, #state{inker=Inker,
-                penciller=Penciller,
-                metadata_extractor=Extractor,
-                indexspec_converter=Converter,
-                cache_size=CacheSize,
-                ledger_cache=gb_trees:empty()}}.
+    case Opts#bookie_options.snapshot_bookie of
+        undefined ->
+            % Start from file not snapshot
+            {InkerOpts, PencillerOpts} = set_options(Opts),
+            {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
+            Extractor = if
+                            Opts#bookie_options.metadata_extractor == undefined ->
+                                fun extract_metadata/2;
+                            true ->
+                                Opts#bookie_options.metadata_extractor
+                        end,
+            Converter = if
+                            Opts#bookie_options.indexspec_converter == undefined ->
+                                fun convert_indexspecs/3;
+                            true ->
+                                Opts#bookie_options.indexspec_converter
+                        end,
+            CacheSize = if
+                            Opts#bookie_options.cache_size == undefined ->
+                                ?CACHE_SIZE;
+                            true ->
+                                Opts#bookie_options.cache_size
+                        end,
+            {ok, #state{inker=Inker,
+                        penciller=Penciller,
+                        metadata_extractor=Extractor,
+                        indexspec_converter=Converter,
+                        cache_size=CacheSize,
+                        ledger_cache=gb_trees:empty(),
+                        is_snapshot=false}};
+        Bookie ->
+            {ok,
+                {Penciller, LedgerCache},
+                Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
+            ok = leveled_penciller:pcl_loadsnapshot(Penciller, []),
+            {ok, #state{penciller=Penciller,
+                        inker=Inker,
+                        ledger_cache=LedgerCache,
+                        is_snapshot=true}}
+    end.
 
 
 handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) ->
@@ -289,11 +305,21 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) ->
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     case SnapType of
         store ->
-            InkerOpts = #inker_options{},
+            InkerOpts = #inker_options{start_snapshot=true,
+                                        source_inker=State#state.inker,
+                                        requestor=Requestor},
             {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
-            {reply, {ok, LedgerSnapshot, JournalSnapshot}, State};
+            {reply,
+                {ok,
+                    {LedgerSnapshot, State#state.ledger_cache},
+                    JournalSnapshot},
+                State};
         ledger ->
-            {reply, {ok, LedgerSnapshot, null}, State}
+            {reply,
+                {ok,
+                    {LedgerSnapshot, State#state.ledger_cache},
+                    null},
+                State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index 3fab0fa..32cf340 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -159,7 +159,7 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
     gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity).
 
 ink_registersnapshot(Pid, Requestor) ->
-    gen_server:call(Pid, {snapshot, Requestor}, infinity).
+    gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
 
 ink_close(Pid) ->
     gen_server:call(Pid, {close, false}, infinity).
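%% A minimal usage sketch of the snapshot_bookie option introduced above. This
%% is illustrative only: snapshot_read_example/3 is not part of the patch, and
%% it assumes only the book_start/1, book_riakget/3 and book_close/1 calls used
%% elsewhere in this change-set, plus an -include of leveled.hrl for the
%% #bookie_options{} record.
snapshot_read_example(Bookie, Bucket, Key) ->
    SnapOpts = #bookie_options{snapshot_bookie=Bookie},
    {ok, SnapBookie} = leveled_bookie:book_start(SnapOpts),
    Result = leveled_bookie:book_riakget(SnapBookie, Bucket, Key),
    ok = leveled_bookie:book_close(SnapBookie),
    Result.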
@@ -218,11 +218,12 @@ init([InkerOpts]) ->
         {undefined, true} ->
             SrcInker = InkerOpts#inker_options.source_inker,
             Requestor = InkerOpts#inker_options.requestor,
-            {ok,
-                {ActiveJournalDB,
-                    Manifest}} = ink_registersnapshot(SrcInker, Requestor),
+            {Manifest,
+                ActiveJournalDB,
+                ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor),
             {ok, #state{manifest=Manifest,
                         active_journaldb=ActiveJournalDB,
+                        active_journaldb_sqn=ActiveJournalSQN,
                         is_snapshot=true}};
             %% Need to do something about timeout
         {_RootPath, false} ->
@@ -276,7 +277,8 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
     Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots],
     {reply,
         {State#state.manifest,
-            State#state.active_journaldb},
+            State#state.active_journaldb,
+            State#state.active_journaldb_sqn},
         State#state{registered_snapshots=Rs}};
 handle_call(get_manifest, _From, State) ->
     {reply, State#state.manifest, State};
@@ -656,33 +658,67 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
 
 load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
     ok;
-load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
+load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|Rest])
                                         when LowSQN >= MinSQN ->
-    io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
-    {ok, LastMinSQN} = load_between_sequence(MinSQN,
-                                                MinSQN + ?LOADING_BATCH,
-                                                FilterFun,
-                                                Penciller,
-                                                Pid,
-                                                undefined),
-    load_from_sequence(LastMinSQN, FilterFun, Penciller, ManTail);
-load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) ->
-    load_from_sequence(MinSQN, FilterFun, Penciller, ManTail).
-
-load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
-    InitAcc = {MinSQN, MaxSQN, []},
-    case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
-        {eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
-            ok = push_to_penciller(Penciller, AccKL),
-            {ok, AccMinSQN};
-        {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
-            ok = push_to_penciller(Penciller, AccKL),
-            load_between_sequence(MaxSQN + 1,
-                                    MaxSQN + 1 + ?LOADING_BATCH,
+    load_between_sequence(MinSQN,
+                            MinSQN + ?LOADING_BATCH,
+                            FilterFun,
+                            Penciller,
+                            Pid,
+                            undefined,
+                            FN,
+                            Rest);
+load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
+    case Rest of
+        [] ->
+            load_between_sequence(MinSQN,
+                                    MinSQN + ?LOADING_BATCH,
                                     FilterFun,
                                     Penciller,
-                                    CDBpid,
-                                    LastPosition)
+                                    Pid,
+                                    undefined,
+                                    FN,
+                                    Rest);
+        [{NextSQN, _FN, Pid}|_Rest] when NextSQN > MinSQN ->
+            load_between_sequence(MinSQN,
+                                    MinSQN + ?LOADING_BATCH,
+                                    FilterFun,
+                                    Penciller,
+                                    Pid,
+                                    undefined,
+                                    FN,
+                                    Rest);
+        _ ->
+            load_from_sequence(MinSQN, FilterFun, Penciller, Rest)
+    end.
+
+
+
+load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller,
+                        CDBpid, StartPos, FN, Rest) ->
+    io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
+    InitAcc = {MinSQN, MaxSQN, []},
+    Res = case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
+                {eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
+                    ok = push_to_penciller(Penciller, AccKL),
+                    {ok, AccMinSQN};
+                {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
+                    ok = push_to_penciller(Penciller, AccKL),
+                    NextSQN = MaxSQN + 1,
+                    load_between_sequence(NextSQN,
+                                            NextSQN + ?LOADING_BATCH,
+                                            FilterFun,
+                                            Penciller,
+                                            CDBpid,
+                                            LastPosition,
+                                            FN,
+                                            Rest)
+            end,
+    case Res of
+        {ok, LMSQN} ->
+            load_from_sequence(LMSQN, FilterFun, Penciller, Rest);
+        ok ->
+            ok
     end.
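%% Simplified sketch (illustrative names only, not the committed code) of the
%% batched reload loop above: a journal CDB file is scanned in windows of
%% ?LOADING_BATCH sequence numbers, each accumulated key list is pushed to the
%% penciller, and the scan resumes from the position returned by cdb_scan.
scan_in_batches(CDBpid, FilterFun, Penciller, MinSQN, StartPos) ->
    MaxSQN = MinSQN + ?LOADING_BATCH,
    case leveled_cdb:cdb_scan(CDBpid, FilterFun, {MinSQN, MaxSQN, []}, StartPos) of
        {eof, {AccMinSQN, _AccMaxSQN, KeyList}} ->
            %% End of the file - push the final batch and report the low SQN
            ok = push_to_penciller(Penciller, KeyList),
            {ok, AccMinSQN};
        {LastPosition, {_AccMinSQN, _AccMaxSQN, KeyList}} ->
            %% More to scan - push this batch and continue from LastPosition
            ok = push_to_penciller(Penciller, KeyList),
            scan_in_batches(CDBpid, FilterFun, Penciller, MaxSQN + 1, LastPosition)
    end.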
 
 push_to_penciller(Penciller, KeyList) ->
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index c54ba01..47467a3 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -117,19 +117,7 @@ merge(WI) ->
     {SrcF, UpdMFest1} = select_filetomerge(SrcLevel,
                                             WI#penciller_work.manifest),
     SinkFiles = get_item(SrcLevel + 1, UpdMFest1, []),
-    Splits = lists:splitwith(fun(Ref) ->
-                                    case {Ref#manifest_entry.start_key,
-                                            Ref#manifest_entry.end_key} of
-                                        {_, EK} when SrcF#manifest_entry.start_key > EK ->
-                                            false;
-                                        {SK, _} when SrcF#manifest_entry.end_key < SK ->
-                                            false;
-                                        _ ->
-                                            true
-                                    end end,
-                                    SinkFiles),
-    {Candidates, Others} = Splits,
-
+    {Candidates, Others} = check_for_merge_candidates(SrcF, SinkFiles),
     %% TODO:
     %% Need to work out if this is the top level
     %% And then tell merge process to create files at the top level
@@ -185,7 +173,20 @@ mark_for_delete([Head|Tail], Penciller) ->
     leveled_sft:sft_setfordelete(Head#manifest_entry.owner,
                                     Penciller),
     mark_for_delete(Tail, Penciller).
-
+
+check_for_merge_candidates(SrcF, SinkFiles) ->
+    lists:partition(fun(Ref) ->
+                        case {Ref#manifest_entry.start_key,
+                                Ref#manifest_entry.end_key} of
+                            {_, EK} when SrcF#manifest_entry.start_key > EK ->
+                                false;
+                            {SK, _} when SrcF#manifest_entry.end_key < SK ->
+                                false;
+                            _ ->
+                                true
+                        end end,
+                    SinkFiles).
+
 
 %% An algorithm for discovering which files to merge ....
 %% We can find the most optimal file:
@@ -375,6 +376,17 @@ merge_file_test() ->
                         leveled_sft:sft_clear(ManEntry#manifest_entry.owner) end,
                     Result).
 
+select_merge_candidates_test() ->
+    Sink1 = #manifest_entry{start_key = {o, "Bucket", "Key1"},
+                                end_key = {o, "Bucket", "Key20000"}},
+    Sink2 = #manifest_entry{start_key = {o, "Bucket", "Key20001"},
+                                end_key = {o, "Bucket1", "Key1"}},
+    Src1 = #manifest_entry{start_key = {o, "Bucket", "Key40001"},
+                                end_key = {o, "Bucket", "Key60000"}},
+    {Candidates, Others} = check_for_merge_candidates(Src1, [Sink1, Sink2]),
+    ?assertMatch([Sink2], Candidates),
+    ?assertMatch([Sink1], Others).
+
 select_merge_file_test() ->
     L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}],
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index a1b1249..84745ad 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -862,10 +862,14 @@ fetch(Key, Manifest, Level, FetchFun) ->
                         not_present,
                         LevelManifest) of
         not_present ->
+            io:format("Key ~w out of range at level ~w with manifest ~w~n",
+                        [Key, Level, LevelManifest]),
             fetch(Key, Manifest, Level + 1, FetchFun);
         FileToCheck ->
            case FetchFun(FileToCheck, Key) of
                 not_present ->
+                    io:format("Key ~w not found checking file at level ~w~n",
+                                [Key, Level]),
                     fetch(Key, Manifest, Level + 1, FetchFun);
                 ObjectFound ->
                     ObjectFound
@@ -1022,6 +1026,25 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
     UpdManifest = lists:keystore(Level, 1, Manifest, {Level, LvlFL}),
     open_all_filesinmanifest({UpdManifest, max(TopSQN, LvlSQN)}, Level + 1).
 
+print_manifest(Manifest) ->
+    lists:foreach(fun(L) ->
+                        io:format("Manifest at Level ~w~n", [L]),
+                        Level = get_item(L, Manifest, []),
+                        lists:foreach(fun(M) ->
+                                            {_, SB, SK} = M#manifest_entry.start_key,
+                                            {_, EB, EK} = M#manifest_entry.end_key,
+                                            io:format("Manifest entry of " ++
+                                                        "startkey ~s ~s " ++
+                                                        "endkey ~s ~s " ++
+                                                        "filename=~s~n",
+                                                        [SB, SK, EB, EK,
+                                                            M#manifest_entry.filename])
+                                            end,
+                                        Level)
+                    end,
+                    lists:seq(1, ?MAX_LEVELS - 1)).
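%% The partition predicate inside check_for_merge_candidates/2 (added to
%% leveled_pclerk above) is a key-range overlap test. Written out on plain
%% terms for clarity - ranges_overlap/2 is an illustrative helper, not part of
%% the patch:
ranges_overlap({SrcStart, SrcEnd}, {SinkStart, SinkEnd}) ->
    %% A sink file is a merge candidate only if its key range overlaps the
    %% source file's key range.
    not ((SrcStart > SinkEnd) orelse (SrcEnd < SinkStart)).
%% For example, with the keys from select_merge_candidates_test above, Sink1
%% ({o, "Bucket", "Key1"} to {o, "Bucket", "Key20000"}) does not overlap Src1
%% ({o, "Bucket", "Key40001"} to {o, "Bucket", "Key60000"}), so it lands in
%% Others rather than Candidates.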
+
+
 assess_workqueue(WorkQ, ?MAX_LEVELS - 1, _Manifest) ->
     WorkQ;
 assess_workqueue(WorkQ, LevelToAssess, Manifest)->
@@ -1087,7 +1110,7 @@ commit_manifest_change(ReturnedWorkItem, State) ->
     io:format("Merge has been commmitted at sequence number ~w~n",
                 [NewMSN]),
     NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
-    %% io:format("Updated manifest is ~w~n", [NewManifest]),
+    print_manifest(NewManifest),
     {ok, State#state{ongoing_work=[],
                         manifest_sqn=NewMSN,
                         manifest=NewManifest,
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl
index 602514c..5e73ac9 100644
--- a/test/end_to_end/basic_SUITE.erl
+++ b/test/end_to_end/basic_SUITE.erl
@@ -2,44 +2,112 @@
 -include_lib("common_test/include/ct.hrl").
 -include("../include/leveled.hrl").
 -export([all/0]).
--export([simple_put_fetch/1,
-            journal_compaction/1]).
+-export([simple_put_fetch_head/1,
+            many_put_fetch_head/1,
+            journal_compaction/1,
+            simple_snapshot/1]).
 
-all() -> [journal_compaction, simple_put_fetch].
+all() -> [simple_put_fetch_head,
+            many_put_fetch_head,
+            journal_compaction,
+            simple_snapshot].
 
-simple_put_fetch(_Config) ->
+simple_put_fetch_head(_Config) ->
     RootPath = reset_filestructure(),
     StartOpts1 = #bookie_options{root_path=RootPath},
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {TestObject, TestSpec} = generate_testobject(),
     ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
-    {ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
+    check_bookie_forobject(Bookie1, TestObject),
     ok = leveled_bookie:book_close(Bookie1),
     StartOpts2 = #bookie_options{root_path=RootPath, max_journalsize=3000000},
     {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
-    {ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
+    check_bookie_forobject(Bookie2, TestObject),
     ObjList1 = generate_multiple_objects(5000, 2),
     lists:foreach(fun({_RN, Obj, Spc}) ->
                         leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
                     ObjList1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
-    lists:foreach(fun({_RN, Obj, _Spc}) ->
-                        R = leveled_bookie:book_riakget(Bookie2,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
-                        R = {ok, Obj} end,
-                    ChkList1),
-    {ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
+    check_bookie_forlist(Bookie2, ChkList1),
+    check_bookie_forobject(Bookie2, TestObject),
     ok = leveled_bookie:book_close(Bookie2),
     reset_filestructure().
 
+many_put_fetch_head(_Config) ->
+    RootPath = reset_filestructure(),
+    StartOpts1 = #bookie_options{root_path=RootPath},
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = generate_testobject(),
+    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    check_bookie_forobject(Bookie1, TestObject),
+    ok = leveled_bookie:book_close(Bookie1),
+    StartOpts2 = #bookie_options{root_path=RootPath,
+                                    max_journalsize=1000000000},
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
+    check_bookie_forobject(Bookie2, TestObject),
+    GenList = [2, 20002, 40002, 60002, 80002,
+                100002, 120002, 140002, 160002, 180002],
+    CLs = lists:map(fun(KN) ->
+                        ObjListA = generate_multiple_smallobjects(20000, KN),
+                        StartWatchA = os:timestamp(),
+                        lists:foreach(fun({_RN, Obj, Spc}) ->
+                                            leveled_bookie:book_riakput(Bookie2, Obj, Spc)
+                                            end,
+                                        ObjListA),
+                        Time = timer:now_diff(os:timestamp(), StartWatchA),
+                        io:format("20,000 objects loaded in ~w seconds~n",
+                                    [Time/1000000]),
+                        check_bookie_forobject(Bookie2, TestObject),
+                        lists:sublist(ObjListA, 1000) end,
+                    GenList),
+    CL1A = lists:nth(1, CLs),
+    ChkListFixed = lists:nth(length(CLs), CLs),
+    check_bookie_forlist(Bookie2, CL1A),
+    ObjList2A = generate_multiple_objects(5000, 2),
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
+                    ObjList2A),
+    ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000),
+    check_bookie_forlist(Bookie2, ChkList2A),
+    check_bookie_forlist(Bookie2, ChkListFixed),
+    check_bookie_forobject(Bookie2, TestObject),
+    check_bookie_forlist(Bookie2, ChkList2A),
+    check_bookie_forlist(Bookie2, ChkListFixed),
+    check_bookie_forobject(Bookie2, TestObject),
+    ok = leveled_bookie:book_close(Bookie2),
+    {ok, Bookie3} = leveled_bookie:book_start(StartOpts2),
+    check_bookie_forlist(Bookie3, ChkList2A),
+    check_bookie_forobject(Bookie3, TestObject),
+    ok = leveled_bookie:book_close(Bookie3),
+    reset_filestructure().
+
+
+check_bookie_forlist(Bookie, ChkList) ->
+    lists:foreach(fun({_RN, Obj, _Spc}) ->
+                        R = leveled_bookie:book_riakget(Bookie,
+                                                        Obj#r_object.bucket,
+                                                        Obj#r_object.key),
+                        io:format("Checking key ~s~n", [Obj#r_object.key]),
+                        R = {ok, Obj} end,
+                    ChkList).
+
+check_bookie_forobject(Bookie, TestObject) ->
+    {ok, TestObject} = leveled_bookie:book_riakget(Bookie,
+                                                    TestObject#r_object.bucket,
+                                                    TestObject#r_object.key),
+    {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie,
+                                                    TestObject#r_object.bucket,
+                                                    TestObject#r_object.key),
+    ok = case {HeadObject#r_object.bucket,
+                    HeadObject#r_object.key,
+                    HeadObject#r_object.vclock} of
+                {B1, K1, VC1} when B1 == TestObject#r_object.bucket,
+                                    K1 == TestObject#r_object.key,
+                                    VC1 == TestObject#r_object.vclock ->
+                    ok
+            end.
+
 journal_compaction(_Config) ->
     RootPath = reset_filestructure(),
     StartOpts1 = #bookie_options{root_path=RootPath,
@@ -93,6 +161,37 @@ journal_compaction(_Config) ->
     reset_filestructure().
+simple_snapshot(_Config) ->
+    RootPath = reset_filestructure(),
+    StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=3000000},
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {TestObject, TestSpec} = generate_testobject(),
+    ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
+    ObjList1 = generate_multiple_objects(5000, 2),
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
+                    ObjList1),
+    SnapOpts = #bookie_options{snapshot_bookie=Bookie1},
+    {ok, SnapBookie} = leveled_bookie:book_start(SnapOpts),
+    ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
+    lists:foreach(fun({_RN, Obj, _Spc}) ->
+                        R = leveled_bookie:book_riakget(Bookie1,
+                                                        Obj#r_object.bucket,
+                                                        Obj#r_object.key),
+                        R = {ok, Obj} end,
+                    ChkList1),
+    lists:foreach(fun({_RN, Obj, _Spc}) ->
+                        R = leveled_bookie:book_riakget(SnapBookie,
+                                                        Obj#r_object.bucket,
+                                                        Obj#r_object.key),
+                        io:format("Finding key ~s~n", [Obj#r_object.key]),
+                        R = {ok, Obj} end,
+                    ChkList1),
+    ok = leveled_bookie:book_close(SnapBookie),
+    ok = leveled_bookie:book_close(Bookie1),
+    reset_filestructure().
+
+
 
 reset_filestructure() ->
     RootPath = "test",
     filelib:ensure_dir(RootPath ++ "/journal/"),
@@ -111,6 +210,9 @@ generate_testobject() ->
     {#r_object{bucket=B1, key=K1, contents=[Content],
                     vclock=[{'a',1}]},
         Spec1}.
 
+generate_multiple_smallobjects(Count, KeyNumber) ->
+    generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(512)).
+
 generate_multiple_objects(Count, KeyNumber) ->
     generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(4096)).
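%% Hypothetical composition (load_and_check/3 is not in the suite) of the
%% generators and check helpers added above: generate a batch of objects,
%% load them through the bookie, then verify a sorted sublist against the
%% store using check_bookie_forlist/2.
load_and_check(Bookie, Count, KeyNumber) ->
    ObjList = generate_multiple_objects(Count, KeyNumber),
    lists:foreach(fun({_RN, Obj, Spc}) ->
                        leveled_bookie:book_riakput(Bookie, Obj, Spc) end,
                    ObjList),
    ChkList = lists:sublist(lists:sort(ObjList), 100),
    check_bookie_forlist(Bookie, ChkList).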