From 2055f8ed3f6a17844f459ca9430be40fad340b2b Mon Sep 17 00:00:00 2001 From: martinsumner Date: Fri, 7 Oct 2016 10:04:48 +0100 Subject: [PATCH] Add more complex snapshot test This exposed another off-by-one error on startup. This commit also includes an unsafe change to reply early from a rolling CDB file (with lots of objects writing the hash table can take too long). This is bad, but will be resolved through a refactor of the manifest writing: essentially we deferred writing of the manifest update which was an unnecessary performance optimisation. If instead we wait on this, the process is made substantially simpler, and it is safer to perform the roll of the complete CDB journal asynchronously. If the manifest update takes too long, an append-only log may be used instead. --- include/leveled.hrl | 6 +- src/leveled_bookie.erl | 21 +++- src/leveled_cdb.erl | 73 ++++++++---- src/leveled_inker.erl | 29 +++-- src/leveled_penciller.erl | 163 +++++++++++++------------- test/end_to_end/basic_SUITE.erl | 198 ++++++++++++++++++++++---------- 6 files changed, 307 insertions(+), 183 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index c04a44f..37694ed 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -29,15 +29,13 @@ root_path :: string(), cdb_options :: #cdb_options{}, start_snapshot = false :: boolean(), - source_inker :: pid(), - requestor :: pid()}). + source_inker :: pid()}). -record(penciller_options, {root_path :: string(), max_inmemory_tablesize :: integer(), start_snapshot = false :: boolean(), - source_penciller :: pid(), - requestor :: pid()}). + source_penciller :: pid()}). 
-record(bookie_options, {root_path :: string(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ba1a333..bafba89 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -213,6 +213,8 @@ init([Opts]) -> true -> Opts#bookie_options.cache_size end, + io:format("Bookie starting with Pcl ~w Ink ~w~n", + [Penciller, Inker]), {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, @@ -223,6 +225,8 @@ init([Opts]) -> {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), ok = leveled_penciller:pcl_loadsnapshot(Penciller, []), + io:format("Snapshot starting with Pcl ~w Ink ~w~n", + [Penciller, Inker]), {ok, #state{penciller=Penciller, inker=Inker, ledger_cache=LedgerCache, @@ -282,16 +286,14 @@ handle_call({head, Key}, _From, State) -> {reply, {ok, OMD}, State} end end; -handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) -> +handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> PCLopts = #penciller_options{start_snapshot=true, - source_penciller=State#state.penciller, - requestor=Requestor}, + source_penciller=State#state.penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), case SnapType of store -> InkerOpts = #inker_options{start_snapshot=true, - source_inker=State#state.inker, - requestor=Requestor}, + source_inker=State#state.inker}, {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), {reply, {ok, @@ -497,12 +499,19 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> case SQN of SQN when SQN < MinSQN -> {loop, Acc0}; - SQN when SQN =< MaxSQN -> + SQN when SQN < MaxSQN -> %% TODO - get correct size in a more efficient manner %% Need to have compressed size Size = byte_size(term_to_binary(ValueInLedger, [compressed])), Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), {loop, {MinSQN, MaxSQN, Output ++ Changes}}; + MaxSQN -> + %% TODO - get correct size in a more efficient manner + %% Need to have 
compressed size + io:format("Reached end of load batch with SQN ~w~n", [SQN]), + Size = byte_size(term_to_binary(ValueInLedger, [compressed])), + Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), + {stop, {MinSQN, MaxSQN, Output ++ Changes}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", [MaxSQN, SQN]), diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 31b4c53..668b1c8 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -66,6 +66,7 @@ cdb_scan/4, cdb_close/1, cdb_complete/1, + cdb_roll/1, cdb_destroy/1, cdb_deletepending/1]). @@ -137,6 +138,9 @@ cdb_close(Pid) -> cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). +cdb_roll(Pid) -> + gen_server:call(Pid, cdb_roll, infinity). + cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -197,9 +201,7 @@ handle_call({open_writer, Filename}, _From, State) -> writer=true}}; handle_call({open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), - {ok, Handle} = file:open(Filename, [binary, raw, read]), - Index = load_index(Handle), - LastKey = find_lastkey(Handle, Index), + {Handle, Index, LastKey} = open_for_readonly(Filename), {reply, ok, State#state{handle=Handle, last_key=LastKey, filename=Filename, @@ -332,29 +334,33 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=undefined}}; +handle_call(cdb_complete, _From, State=#state{writer=Writer}) + when Writer == true -> + NewName = determine_new_filename(State#state.filename), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> - case State#state.writer of - true -> - ok = close_file(State#state.handle, - State#state.hashtree, - 
State#state.last_position), - %% Rename file - NewName = filename:rootname(State#state.filename, ".pnd") - ++ ".cdb", - io:format("Renaming file from ~s to ~s " ++ - "for which existence is ~w~n", - [State#state.filename, NewName, - filelib:is_file(NewName)]), - ok = file:rename(State#state.filename, NewName), - {stop, normal, {ok, NewName}, State}; - false -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; - undefined -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State} - end. - + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State}; +handle_call(cdb_roll, From, State=#state{writer=Writer}) + when Writer == true -> + NewName = determine_new_filename(State#state.filename), + gen_server:reply(From, {ok, NewName}), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {Handle, Index, LastKey} = open_for_readonly(NewName), + {noreply, State#state{handle=Handle, + last_key=LastKey, + filename=NewName, + writer=false, + hash_index=Index}}. + handle_cast(destroy, State) -> ok = file:close(State#state.handle), @@ -659,6 +665,23 @@ fold_keys(Handle, FoldFun, Acc0) -> %% Internal functions %%%%%%%%%%%%%%%%%%%% +determine_new_filename(Filename) -> + filename:rootname(Filename, ".pnd") ++ ".cdb". + +rename_for_read(Filename, NewName) -> + %% Rename file + io:format("Renaming file from ~s to ~s " ++ + "for which existence is ~w~n", + [Filename, NewName, + filelib:is_file(NewName)]), + file:rename(Filename, NewName). + +open_for_readonly(Filename) -> + {ok, Handle} = file:open(Filename, [binary, raw, read]), + Index = load_index(Handle), + LastKey = find_lastkey(Handle, Index), + {Handle, Index, LastKey}. 
+ load_index(Handle) -> Index = lists:seq(0, 255), lists:map(fun(X) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2161730..561ff64 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -139,7 +139,8 @@ cdb_options :: #cdb_options{}, clerk :: pid(), compaction_pending = false :: boolean(), - is_snapshot = false :: boolean()}). + is_snapshot = false :: boolean(), + source_inker :: pid()}). %%%============================================================================ @@ -161,6 +162,9 @@ ink_fetch(Pid, PrimaryKey, SQN) -> ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {register_snapshot, Requestor}, infinity). +ink_releasesnapshot(Pid, Snapshot) -> + gen_server:call(Pid, {release_snapshot, Snapshot}, infinity). + ink_close(Pid) -> gen_server:call(Pid, {close, false}, infinity). @@ -217,13 +221,13 @@ init([InkerOpts]) -> InkerOpts#inker_options.start_snapshot} of {undefined, true} -> SrcInker = InkerOpts#inker_options.source_inker, - Requestor = InkerOpts#inker_options.requestor, {Manifest, ActiveJournalDB, - ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor), + ActiveJournalSQN} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, active_journaldb_sqn=ActiveJournalSQN, + source_inker=SrcInker, is_snapshot=true}}; %% Need to do something about timeout {_RootPath, false} -> @@ -276,10 +280,17 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], + io:format("Inker snapshot ~w registered at SQN ~w~n", + [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, State#state.active_journaldb, State#state.active_journaldb_sqn}, State#state{registered_snapshots=Rs}}; +handle_call({release_snapshot, Snapshot}, _From , State) -> + Rs = lists:keydelete(Snapshot, 1, 
State#state.registered_snapshots), + io:format("Snapshot ~w released~n", [Snapshot]), + io:format("Remaining snapshots are ~w~n", [Rs]), + {reply, ok, State#state{registered_snapshots=Rs}}; handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; handle_call({update_manifest, @@ -344,7 +355,7 @@ handle_info(_Info, State) -> terminate(Reason, State) -> case State#state.is_snapshot of true -> - ok; + ok = ink_releasesnapshot(State#state.source_inker, self()); false -> io:format("Inker closing journal for reason ~w~n", [Reason]), io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", @@ -444,16 +455,16 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> end end. -roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> +roll_active_file(ActiveJournal, Manifest, ManifestSQN, RootPath) -> SW = os:timestamp(), - io:format("Rolling old journal ~w~n", [OldActiveJournal]), - {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), + io:format("Rolling old journal ~w~n", [ActiveJournal]), + {ok, NewFilename} = leveled_cdb:cdb_roll(ActiveJournal), JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\."
++ ?JOURNAL_FILEX, [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], JournalRegex2, 'SQN'), - NewManifest = add_to_manifest(Manifest, {JournalSQN, NewFilename, PidR}), + NewManifest = add_to_manifest(Manifest, + {JournalSQN, NewFilename, ActiveJournal}), NewManifestSQN = ManifestSQN + 1, ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), io:format("Rolling old journal completed in ~w microseconds~n", diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 3402aa8..26b70e8 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -491,42 +491,40 @@ handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap}) when Snap == false -> {ok, UpdState} = commit_manifest_change(WI, State), {reply, ok, UpdState}; -handle_call({check_sqn, Key, SQN}, _From, State) -> - Obj = if - State#state.is_snapshot == true -> - fetch_snap(Key, +handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + {reply, + fetch(Key, + State#state.manifest, + State#state.memtable), + State}; +handle_call({check_sqn, Key, SQN}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + {reply, + compare_to_sqn(fetch(Key, State#state.manifest, - State#state.levelzero_snapshot); - true -> - fetch(Key, - State#state.manifest, - State#state.memtable) - end, - Reply = case Obj of - not_present -> - false; - Obj -> - SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), - if - SQNToCompare > SQN -> - false; - true -> - true - end - end, - {reply, Reply, State}; -handle_call({fetch, Key}, _From, State) -> - Reply = if - State#state.is_snapshot == true -> - fetch_snap(Key, - State#state.manifest, - State#state.levelzero_snapshot); - true -> - fetch(Key, - State#state.manifest, - State#state.memtable) - end, - {reply, Reply, State}; + State#state.memtable), + SQN), + State}; +handle_call({fetch, Key}, + _From, + State=#state{snapshot_fully_loaded=Ready}) + when Ready == true -> + {reply, + 
fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot), + State}; +handle_call({check_sqn, Key, SQN}, + _From, + State=#state{snapshot_fully_loaded=Ready}) + when Ready == true -> + {reply, + compare_to_sqn(fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot), + SQN), + State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), {reply, {Work, UpdState#state.backlog}, UpdState}; @@ -568,6 +566,8 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. +terminate(_Reason, _State=#state{is_snapshot=Snap}) when Snap == true -> + ok; terminate(_Reason, State) -> %% When a Penciller shuts down it isn't safe to try an manage the safe %% finishing of any outstanding work. The last commmitted manifest will @@ -585,46 +585,41 @@ terminate(_Reason, State) -> %% The cast may not succeed as the clerk could be synchronously calling %% the penciller looking for a manifest commit %% - if - State#state.is_snapshot == true -> - ok; - true -> - leveled_pclerk:clerk_stop(State#state.clerk), - Dump = ets:tab2list(State#state.memtable), - case {State#state.levelzero_pending, - get_item(0, State#state.manifest, []), length(Dump)} of - {?L0PEND_RESET, [], L} when L > 0 -> - MSN = State#state.manifest_sqn + 1, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", - Dump, - [], - 0), - {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, - io:format("Dump of memory on close to filename ~s~n", - [FileName]), - leveled_sft:sft_close(L0Pid), - file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {?L0PEND_RESET, [], L} when L == 0 -> - io:format("No keys to dump from memory when closing~n"); - {{true, L0Pid, _TS}, _, _} -> - leveled_sft:sft_close(L0Pid), - io:format("No opportunity to persist memory before closing" - ++ " with ~w keys discarded~n", - [length(Dump)]); - _ -> - io:format("No opportunity to 
persist memory before closing" - ++ " with ~w keys discarded~n", - [length(Dump)]) - end, - ok = close_files(0, State#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> - leveled_sft:sft_close(Pid) end, - State#state.unreferenced_files), - ok - end. + leveled_pclerk:clerk_stop(State#state.clerk), + Dump = ets:tab2list(State#state.memtable), + case {State#state.levelzero_pending, + get_item(0, State#state.manifest, []), length(Dump)} of + {?L0PEND_RESET, [], L} when L > 0 -> + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", + Dump, + [], + 0), + {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, + io:format("Dump of memory on close to filename ~s~n", + [FileName]), + leveled_sft:sft_close(L0Pid), + file:rename(FileName ++ ".pnd", FileName ++ ".sft"); + {?L0PEND_RESET, [], L} when L == 0 -> + io:format("No keys to dump from memory when closing~n"); + {{true, L0Pid, _TS}, _, _} -> + leveled_sft:sft_close(L0Pid), + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]); + _ -> + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]) + end, + ok = close_files(0, State#state.manifest), + lists:foreach(fun({_FN, Pid, _SN}) -> + leveled_sft:sft_close(Pid) end, + State#state.unreferenced_files), + ok. code_change(_OldVsn, State, _Extra) -> @@ -843,6 +838,21 @@ fetch(Key, Manifest, Level, FetchFun) -> end. +compare_to_sqn(Obj, SQN) -> + case Obj of + not_present -> + false; + Obj -> + SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), + if + SQNToCompare > SQN -> + false; + true -> + true + end + end. 
+ + %% Manifest lock - don't have two changes to the manifest happening %% concurrently @@ -1280,8 +1290,7 @@ simple_server_test() -> ?assertMatch(R13, Key3), ?assertMatch(R14, Key4), SnapOpts = #penciller_options{start_snapshot = true, - source_penciller = PCLr, - requestor = self()}, + source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), ok = pcl_loadsnapshot(PclSnap, []), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001"})), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 3173d05..9496e2f 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -5,12 +5,12 @@ -export([simple_put_fetch_head/1, many_put_fetch_head/1, journal_compaction/1, - simple_snapshot/1]). + fetchput_snapshot/1]). all() -> [simple_put_fetch_head, many_put_fetch_head, journal_compaction, - simple_snapshot]. + fetchput_snapshot]. simple_put_fetch_head(_Config) -> @@ -51,19 +51,8 @@ many_put_fetch_head(_Config) -> check_bookie_forobject(Bookie2, TestObject), GenList = [2, 20002, 40002, 60002, 80002, 100002, 120002, 140002, 160002, 180002], - CLs = lists:map(fun(KN) -> - ObjListA = generate_multiple_smallobjects(20000, KN), - StartWatchA = os:timestamp(), - lists:foreach(fun({_RN, Obj, Spc}) -> - leveled_bookie:book_riakput(Bookie2, Obj, Spc) - end, - ObjListA), - Time = timer:now_diff(os:timestamp(), StartWatchA), - io:format("20,000 objects loaded in ~w seconds~n", - [Time/1000000]), - check_bookie_forobject(Bookie2, TestObject), - lists:sublist(ObjListA, 1000) end, - GenList), + CLs = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), CL1A = lists:nth(1, CLs), ChkListFixed = lists:nth(length(CLs), CLs), check_bookie_forlist(Bookie2, CL1A), @@ -85,35 +74,6 @@ many_put_fetch_head(_Config) -> ok = leveled_bookie:book_close(Bookie3), reset_filestructure(). 
- -check_bookie_forlist(Bookie, ChkList) -> - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList). - -check_bookie_forobject(Bookie, TestObject) -> - {ok, TestObject} = leveled_bookie:book_riakget(Bookie, - TestObject#r_object.bucket, - TestObject#r_object.key), - {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie, - TestObject#r_object.bucket, - TestObject#r_object.key), - ok = case {HeadObject#r_object.bucket, - HeadObject#r_object.key, - HeadObject#r_object.vclock} of - {B1, K1, VC1} when B1 == TestObject#r_object.bucket, - K1 == TestObject#r_object.key, - VC1 == TestObject#r_object.vclock -> - ok - end. - -check_bookie_formissingobject(Bookie, Bucket, Key) -> - not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key), - not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key). - journal_compaction(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, @@ -162,7 +122,7 @@ journal_compaction(_Config) -> reset_filestructure(). 
-simple_snapshot(_Config) -> +fetchput_snapshot(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=3000000}, {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), @@ -172,27 +132,82 @@ simple_snapshot(_Config) -> lists:foreach(fun({_RN, Obj, Spc}) -> leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, ObjList1), - SnapOpts = #bookie_options{snapshot_bookie=Bookie1}, - {ok, SnapBookie} = leveled_bookie:book_start(SnapOpts), + SnapOpts1 = #bookie_options{snapshot_bookie=Bookie1}, + {ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1), ChkList1 = lists:sublist(lists:sort(ObjList1), 100), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie1, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList1), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(SnapBookie, - Obj#r_object.bucket, - Obj#r_object.key), - io:format("Finding key ~s~n", [Obj#r_object.key]), - R = {ok, Obj} end, - ChkList1), - ok = leveled_bookie:book_close(SnapBookie), + check_bookie_forlist(Bookie1, ChkList1), + check_bookie_forlist(SnapBookie1, ChkList1), + ok = leveled_bookie:book_close(SnapBookie1), + check_bookie_forlist(Bookie1, ChkList1), ok = leveled_bookie:book_close(Bookie1), + io:format("Closed initial bookies~n"), + + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + SnapOpts2 = #bookie_options{snapshot_bookie=Bookie2}, + {ok, SnapBookie2} = leveled_bookie:book_start(SnapOpts2), + io:format("Bookies restarted~n"), + + check_bookie_forlist(Bookie2, ChkList1), + io:format("Check active bookie still contains original data~n"), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Check snapshot still contains original data~n"), + + + ObjList2 = generate_multiple_objects(5000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList2), + io:format("Replacement objects put~n"), + + ChkList2 = 
lists:sublist(lists:sort(ObjList2), 100), + check_bookie_forlist(Bookie2, ChkList2), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Checked for replacement objects in active bookie" ++ + ", old objects in snapshot~n"), + + {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + ObjList3 = generate_multiple_objects(15000, 5002), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList3), + ChkList3 = lists:sublist(lists:sort(ObjList3), 100), + check_bookie_forlist(Bookie2, ChkList3), + check_bookie_formissinglist(SnapBookie2, ChkList3), + GenList = [20002, 40002, 60002, 80002, 100002, 120002], + CLs2 = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), + io:format("Loaded significant numbers of new objects~n"), + + check_bookie_forlist(Bookie2, lists:nth(length(CLs2), CLs2)), + io:format("Checked active bookie has new objects~n"), + + {ok, SnapBookie3} = leveled_bookie:book_start(SnapOpts2), + check_bookie_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)), + check_bookie_formissinglist(SnapBookie2, ChkList3), + check_bookie_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)), + check_bookie_forlist(SnapBookie3, ChkList2), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Started new snapshot and check for new objects~n"), + + CLs3 = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + check_bookie_forlist(Bookie2, lists:nth(1, CLs3)), + {ok, FNsB} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + ok = leveled_bookie:book_close(SnapBookie2), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + ok = leveled_bookie:book_close(SnapBookie3), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + check_bookie_forlist(Bookie2, lists:nth(1, CLs3)), + timer:sleep(90000), + {ok, FNsC} = file:list_dir(RootPath ++ 
"/ledger/ledger_files"), + true = length(FNsB) > length(FNsA), + true = length(FNsB) > length(FNsC), + ok = leveled_bookie:book_close(Bookie2), reset_filestructure(). + reset_filestructure() -> RootPath = "test", filelib:ensure_dir(RootPath ++ "/journal/"), @@ -202,6 +217,52 @@ reset_filestructure() -> RootPath. +check_bookie_forlist(Bookie, ChkList) -> + check_bookie_forlist(Bookie, ChkList, false). + +check_bookie_forlist(Bookie, ChkList, Log) -> + lists:foreach(fun({_RN, Obj, _Spc}) -> + if + Log == true -> + io:format("Fetching Key ~w~n", [Obj#r_object.key]); + true -> + ok + end, + R = leveled_bookie:book_riakget(Bookie, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList). + +check_bookie_formissinglist(Bookie, ChkList) -> + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie, + Obj#r_object.bucket, + Obj#r_object.key), + R = not_found end, + ChkList). + +check_bookie_forobject(Bookie, TestObject) -> + {ok, TestObject} = leveled_bookie:book_riakget(Bookie, + TestObject#r_object.bucket, + TestObject#r_object.key), + {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie, + TestObject#r_object.bucket, + TestObject#r_object.key), + ok = case {HeadObject#r_object.bucket, + HeadObject#r_object.key, + HeadObject#r_object.vclock} of + {B1, K1, VC1} when B1 == TestObject#r_object.bucket, + K1 == TestObject#r_object.key, + VC1 == TestObject#r_object.vclock -> + ok + end. + +check_bookie_formissingobject(Bookie, Bucket, Key) -> + not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key), + not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key). + + generate_testobject() -> {B1, K1, V1, Spec1, MD} = {"Bucket1", "Key1", @@ -239,4 +300,17 @@ generate_multiple_objects(Count, KeyNumber, ObjL, Value) -> Value). 
- \ No newline at end of file +load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) -> + lists:map(fun(KN) -> + ObjListA = Generator(ChunkSize, KN), + StartWatchA = os:timestamp(), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie, Obj, Spc) + end, + ObjListA), + Time = timer:now_diff(os:timestamp(), StartWatchA), + io:format("~w objects loaded in ~w seconds~n", + [ChunkSize, Time/1000000]), + check_bookie_forobject(Bookie, TestObject), + lists:sublist(ObjListA, 1000) end, + GenList).