diff --git a/include/leveled.hrl b/include/leveled.hrl index c04a44f..37694ed 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -29,15 +29,13 @@ root_path :: string(), cdb_options :: #cdb_options{}, start_snapshot = false :: boolean(), - source_inker :: pid(), - requestor :: pid()}). + source_inker :: pid()}). -record(penciller_options, {root_path :: string(), max_inmemory_tablesize :: integer(), start_snapshot = false :: boolean(), - source_penciller :: pid(), - requestor :: pid()}). + source_penciller :: pid()}). -record(bookie_options, {root_path :: string(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ba1a333..bafba89 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -213,6 +213,8 @@ init([Opts]) -> true -> Opts#bookie_options.cache_size end, + io:format("Bookie starting with Pcl ~w Ink ~w~n", + [Penciller, Inker]), {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, @@ -223,6 +225,8 @@ init([Opts]) -> {Penciller, LedgerCache}, Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), ok = leveled_penciller:pcl_loadsnapshot(Penciller, []), + io:format("Snapshot starting with Pcl ~w Ink ~w~n", + [Penciller, Inker]), {ok, #state{penciller=Penciller, inker=Inker, ledger_cache=LedgerCache, @@ -282,16 +286,14 @@ handle_call({head, Key}, _From, State) -> {reply, {ok, OMD}, State} end end; -handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) -> +handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) -> PCLopts = #penciller_options{start_snapshot=true, - source_penciller=State#state.penciller, - requestor=Requestor}, + source_penciller=State#state.penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), case SnapType of store -> InkerOpts = #inker_options{start_snapshot=true, - source_inker=State#state.inker, - requestor=Requestor}, + source_inker=State#state.inker}, {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts), {reply, {ok, @@ -497,12 +499,19 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> case SQN of SQN when SQN < MinSQN -> {loop, Acc0}; - SQN when SQN =< MaxSQN -> + SQN when SQN < MaxSQN -> %% TODO - get correct size in a more efficient manner %% Need to have compressed size Size = byte_size(term_to_binary(ValueInLedger, [compressed])), Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), {loop, {MinSQN, MaxSQN, Output ++ Changes}}; + MaxSQN -> + %% TODO - get correct size in a more efficient manner + %% Need to have compressed size + io:format("Reached end of load batch with SQN ~w~n", [SQN]), + Size = byte_size(term_to_binary(ValueInLedger, [compressed])), + Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), + {stop, {MinSQN, MaxSQN, Output ++ Changes}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", [MaxSQN, SQN]), diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 31b4c53..668b1c8 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -66,6 +66,7 @@ cdb_scan/4, cdb_close/1, cdb_complete/1, + cdb_roll/1, cdb_destroy/1, cdb_deletepending/1]). @@ -137,6 +138,9 @@ cdb_close(Pid) -> cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). +cdb_roll(Pid) -> + gen_server:call(Pid, cdb_roll, infinity). + cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -197,9 +201,7 @@ handle_call({open_writer, Filename}, _From, State) -> writer=true}}; handle_call({open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), - {ok, Handle} = file:open(Filename, [binary, raw, read]), - Index = load_index(Handle), - LastKey = find_lastkey(Handle, Index), + {Handle, Index, LastKey} = open_for_readonly(Filename), {reply, ok, State#state{handle=Handle, last_key=LastKey, filename=Filename, @@ -332,29 +334,33 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=undefined}}; +handle_call(cdb_complete, _From, State=#state{writer=Writer}) + when Writer == true -> + NewName = determine_new_filename(State#state.filename), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> - case State#state.writer of - true -> - ok = close_file(State#state.handle, - State#state.hashtree, - State#state.last_position), - %% Rename file - NewName = filename:rootname(State#state.filename, ".pnd") - ++ ".cdb", - io:format("Renaming file from ~s to ~s " ++ - "for which existence is ~w~n", - [State#state.filename, NewName, - filelib:is_file(NewName)]), - ok = file:rename(State#state.filename, NewName), - {stop, normal, {ok, NewName}, State}; - false -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; - undefined -> - ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State} - end. - + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State}; +handle_call(cdb_roll, From, State=#state{writer=Writer}) + when Writer == true -> + NewName = determine_new_filename(State#state.filename), + gen_server:reply(From, {ok, NewName}), + ok = close_file(State#state.handle, + State#state.hashtree, + State#state.last_position), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {Handle, Index, LastKey} = open_for_readonly(NewName), + {noreply, State#state{handle=Handle, + last_key=LastKey, + filename=NewName, + writer=false, + hash_index=Index}}. + handle_cast(destroy, State) -> ok = file:close(State#state.handle), @@ -659,6 +665,23 @@ fold_keys(Handle, FoldFun, Acc0) -> %% Internal functions %%%%%%%%%%%%%%%%%%%% +determine_new_filename(Filename) -> + filename:rootname(Filename, ".pnd") ++ ".cdb". + +rename_for_read(Filename, NewName) -> + %% Rename file + io:format("Renaming file from ~s to ~s " ++ + "for which existence is ~w~n", + [Filename, NewName, + filelib:is_file(NewName)]), + file:rename(Filename, NewName). + +open_for_readonly(Filename) -> + {ok, Handle} = file:open(Filename, [binary, raw, read]), + Index = load_index(Handle), + LastKey = find_lastkey(Handle, Index), + {Handle, Index, LastKey}. + load_index(Handle) -> Index = lists:seq(0, 255), lists:map(fun(X) -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 2161730..561ff64 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -139,7 +139,8 @@ cdb_options :: #cdb_options{}, clerk :: pid(), compaction_pending = false :: boolean(), - is_snapshot = false :: boolean()}). + is_snapshot = false :: boolean(), + source_inker :: pid()}). %%%============================================================================ @@ -161,6 +162,9 @@ ink_fetch(Pid, PrimaryKey, SQN) -> ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {register_snapshot, Requestor}, infinity). +ink_releasesnapshot(Pid, Snapshot) -> + gen_server:call(Pid, {release_snapshot, Snapshot}, infinity). + ink_close(Pid) -> gen_server:call(Pid, {close, false}, infinity). @@ -217,13 +221,13 @@ init([InkerOpts]) -> InkerOpts#inker_options.start_snapshot} of {undefined, true} -> SrcInker = InkerOpts#inker_options.source_inker, - Requestor = InkerOpts#inker_options.requestor, {Manifest, ActiveJournalDB, - ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor), + ActiveJournalSQN} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, active_journaldb_sqn=ActiveJournalSQN, + source_inker=SrcInker, is_snapshot=true}}; %% Need to do something about timeout {_RootPath, false} -> @@ -276,10 +280,17 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> handle_call({register_snapshot, Requestor}, _From , State) -> Rs = [{Requestor, State#state.manifest_sqn}|State#state.registered_snapshots], + io:format("Inker snapshot ~w registered at SQN ~w~n", + [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, State#state.active_journaldb, State#state.active_journaldb_sqn}, State#state{registered_snapshots=Rs}}; +handle_call({release_snapshot, Snapshot}, _From , State) -> + Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), + io:format("Snapshot ~w released~n", [Snapshot]), + io:format("Remaining snapshots are ~w~n", [Rs]), + {reply, ok, State#state{registered_snapshots=Rs}}; handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; handle_call({update_manifest, @@ -344,7 +355,7 @@ handle_info(_Info, State) -> terminate(Reason, State) -> case State#state.is_snapshot of true -> - ok; + ok = ink_releasesnapshot(State#state.source_inker, self()); false -> io:format("Inker closing journal for reason ~w~n", [Reason]), io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", @@ -444,16 +455,16 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> end end. -roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> +roll_active_file(ActiveJournal, Manifest, ManifestSQN, RootPath) -> SW = os:timestamp(), - io:format("Rolling old journal ~w~n", [OldActiveJournal]), - {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), + io:format("Rolling old journal ~w~n", [ActiveJournal]), + {ok, NewFilename} = leveled_cdb:cdb_roll(ActiveJournal), JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], JournalRegex2, 'SQN'), - NewManifest = add_to_manifest(Manifest, {JournalSQN, NewFilename, PidR}), + NewManifest = add_to_manifest(Manifest, + {JournalSQN, NewFilename, ActiveJournal}), NewManifestSQN = ManifestSQN + 1, ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), io:format("Rolling old journal completed in ~w microseconds~n", diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 3402aa8..26b70e8 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -491,42 +491,40 @@ handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap}) when Snap == false -> {ok, UpdState} = commit_manifest_change(WI, State), {reply, ok, UpdState}; -handle_call({check_sqn, Key, SQN}, _From, State) -> - Obj = if - State#state.is_snapshot == true -> - fetch_snap(Key, +handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + {reply, + fetch(Key, + State#state.manifest, + State#state.memtable), + State}; +handle_call({check_sqn, Key, SQN}, _From, State=#state{is_snapshot=Snap}) + when Snap == false -> + {reply, + compare_to_sqn(fetch(Key, State#state.manifest, - State#state.levelzero_snapshot); - true -> - fetch(Key, - State#state.manifest, - State#state.memtable) - end, - Reply = case Obj of - not_present -> - false; - Obj -> - SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), - if - SQNToCompare > SQN -> - false; - true -> - true - end - end, - {reply, Reply, State}; -handle_call({fetch, Key}, _From, State) -> - Reply = if - State#state.is_snapshot == true -> - fetch_snap(Key, - State#state.manifest, - State#state.levelzero_snapshot); - true -> - fetch(Key, - State#state.manifest, - State#state.memtable) - end, - {reply, Reply, State}; + State#state.memtable), + SQN), + State}; +handle_call({fetch, Key}, + _From, + State=#state{snapshot_fully_loaded=Ready}) + when Ready == true -> + {reply, + fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot), + State}; +handle_call({check_sqn, Key, SQN}, + _From, + State=#state{snapshot_fully_loaded=Ready}) + when Ready == true -> + {reply, + compare_to_sqn(fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot), + SQN), + State}; handle_call(work_for_clerk, From, State) -> {UpdState, Work} = return_work(State, From), {reply, {Work, UpdState#state.backlog}, UpdState}; @@ -568,6 +566,8 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. +terminate(_Reason, _State=#state{is_snapshot=Snap}) when Snap == true -> + ok; terminate(_Reason, State) -> %% When a Penciller shuts down it isn't safe to try an manage the safe %% finishing of any outstanding work. The last commmitted manifest will @@ -585,46 +585,41 @@ terminate(_Reason, State) -> %% The cast may not succeed as the clerk could be synchronously calling %% the penciller looking for a manifest commit %% - if - State#state.is_snapshot == true -> - ok; - true -> - leveled_pclerk:clerk_stop(State#state.clerk), - Dump = ets:tab2list(State#state.memtable), - case {State#state.levelzero_pending, - get_item(0, State#state.manifest, []), length(Dump)} of - {?L0PEND_RESET, [], L} when L > 0 -> - MSN = State#state.manifest_sqn + 1, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", - Dump, - [], - 0), - {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, - io:format("Dump of memory on close to filename ~s~n", - [FileName]), - leveled_sft:sft_close(L0Pid), - file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {?L0PEND_RESET, [], L} when L == 0 -> - io:format("No keys to dump from memory when closing~n"); - {{true, L0Pid, _TS}, _, _} -> - leveled_sft:sft_close(L0Pid), - io:format("No opportunity to persist memory before closing" - ++ " with ~w keys discarded~n", - [length(Dump)]); - _ -> - io:format("No opportunity to persist memory before closing" - ++ " with ~w keys discarded~n", - [length(Dump)]) - end, - ok = close_files(0, State#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> - leveled_sft:sft_close(Pid) end, - State#state.unreferenced_files), - ok - end. + leveled_pclerk:clerk_stop(State#state.clerk), + Dump = ets:tab2list(State#state.memtable), + case {State#state.levelzero_pending, + get_item(0, State#state.manifest, []), length(Dump)} of + {?L0PEND_RESET, [], L} when L > 0 -> + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", + Dump, + [], + 0), + {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, + io:format("Dump of memory on close to filename ~s~n", + [FileName]), + leveled_sft:sft_close(L0Pid), + file:rename(FileName ++ ".pnd", FileName ++ ".sft"); + {?L0PEND_RESET, [], L} when L == 0 -> + io:format("No keys to dump from memory when closing~n"); + {{true, L0Pid, _TS}, _, _} -> + leveled_sft:sft_close(L0Pid), + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]); + _ -> + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]) + end, + ok = close_files(0, State#state.manifest), + lists:foreach(fun({_FN, Pid, _SN}) -> + leveled_sft:sft_close(Pid) end, + State#state.unreferenced_files), + ok. code_change(_OldVsn, State, _Extra) -> @@ -843,6 +838,21 @@ fetch(Key, Manifest, Level, FetchFun) -> end. +compare_to_sqn(Obj, SQN) -> + case Obj of + not_present -> + false; + Obj -> + SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), + if + SQNToCompare > SQN -> + false; + true -> + true + end + end. + + %% Manifest lock - don't have two changes to the manifest happening %% concurrently @@ -1280,8 +1290,7 @@ simple_server_test() -> ?assertMatch(R13, Key3), ?assertMatch(R14, Key4), SnapOpts = #penciller_options{start_snapshot = true, - source_penciller = PCLr, - requestor = self()}, + source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), ok = pcl_loadsnapshot(PclSnap, []), ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001"})), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 3173d05..9496e2f 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -5,12 +5,12 @@ -export([simple_put_fetch_head/1, many_put_fetch_head/1, journal_compaction/1, - simple_snapshot/1]). + fetchput_snapshot/1]). all() -> [simple_put_fetch_head, many_put_fetch_head, journal_compaction, - simple_snapshot]. + fetchput_snapshot]. simple_put_fetch_head(_Config) -> @@ -51,19 +51,8 @@ many_put_fetch_head(_Config) -> check_bookie_forobject(Bookie2, TestObject), GenList = [2, 20002, 40002, 60002, 80002, 100002, 120002, 140002, 160002, 180002], - CLs = lists:map(fun(KN) -> - ObjListA = generate_multiple_smallobjects(20000, KN), - StartWatchA = os:timestamp(), - lists:foreach(fun({_RN, Obj, Spc}) -> - leveled_bookie:book_riakput(Bookie2, Obj, Spc) - end, - ObjListA), - Time = timer:now_diff(os:timestamp(), StartWatchA), - io:format("20,000 objects loaded in ~w seconds~n", - [Time/1000000]), - check_bookie_forobject(Bookie2, TestObject), - lists:sublist(ObjListA, 1000) end, - GenList), + CLs = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), CL1A = lists:nth(1, CLs), ChkListFixed = lists:nth(length(CLs), CLs), check_bookie_forlist(Bookie2, CL1A), @@ -85,35 +74,6 @@ many_put_fetch_head(_Config) -> ok = leveled_bookie:book_close(Bookie3), reset_filestructure(). - -check_bookie_forlist(Bookie, ChkList) -> - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList). - -check_bookie_forobject(Bookie, TestObject) -> - {ok, TestObject} = leveled_bookie:book_riakget(Bookie, - TestObject#r_object.bucket, - TestObject#r_object.key), - {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie, - TestObject#r_object.bucket, - TestObject#r_object.key), - ok = case {HeadObject#r_object.bucket, - HeadObject#r_object.key, - HeadObject#r_object.vclock} of - {B1, K1, VC1} when B1 == TestObject#r_object.bucket, - K1 == TestObject#r_object.key, - VC1 == TestObject#r_object.vclock -> - ok - end. - -check_bookie_formissingobject(Bookie, Bucket, Key) -> - not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key), - not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key). - journal_compaction(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, @@ -162,7 +122,7 @@ journal_compaction(_Config) -> reset_filestructure(). -simple_snapshot(_Config) -> +fetchput_snapshot(_Config) -> RootPath = reset_filestructure(), StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=3000000}, {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), @@ -172,27 +132,82 @@ simple_snapshot(_Config) -> lists:foreach(fun({_RN, Obj, Spc}) -> leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, ObjList1), - SnapOpts = #bookie_options{snapshot_bookie=Bookie1}, - {ok, SnapBookie} = leveled_bookie:book_start(SnapOpts), + SnapOpts1 = #bookie_options{snapshot_bookie=Bookie1}, + {ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1), ChkList1 = lists:sublist(lists:sort(ObjList1), 100), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(Bookie1, - Obj#r_object.bucket, - Obj#r_object.key), - R = {ok, Obj} end, - ChkList1), - lists:foreach(fun({_RN, Obj, _Spc}) -> - R = leveled_bookie:book_riakget(SnapBookie, - Obj#r_object.bucket, - Obj#r_object.key), - io:format("Finding key ~s~n", [Obj#r_object.key]), - R = {ok, Obj} end, - ChkList1), - ok = leveled_bookie:book_close(SnapBookie), + check_bookie_forlist(Bookie1, ChkList1), + check_bookie_forlist(SnapBookie1, ChkList1), + ok = leveled_bookie:book_close(SnapBookie1), + check_bookie_forlist(Bookie1, ChkList1), ok = leveled_bookie:book_close(Bookie1), + io:format("Closed initial bookies~n"), + + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + SnapOpts2 = #bookie_options{snapshot_bookie=Bookie2}, + {ok, SnapBookie2} = leveled_bookie:book_start(SnapOpts2), + io:format("Bookies restarted~n"), + + check_bookie_forlist(Bookie2, ChkList1), + io:format("Check active bookie still contains original data~n"), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Check snapshot still contains original data~n"), + + + ObjList2 = generate_multiple_objects(5000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList2), + io:format("Replacement objects put~n"), + + ChkList2 = lists:sublist(lists:sort(ObjList2), 100), + check_bookie_forlist(Bookie2, ChkList2), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Checked for replacement objects in active bookie" ++ + ", old objects in snapshot~n"), + + {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + ObjList3 = generate_multiple_objects(15000, 5002), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList3), + ChkList3 = lists:sublist(lists:sort(ObjList3), 100), + check_bookie_forlist(Bookie2, ChkList3), + check_bookie_formissinglist(SnapBookie2, ChkList3), + GenList = [20002, 40002, 60002, 80002, 100002, 120002], + CLs2 = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), + io:format("Loaded significant numbers of new objects~n"), + + check_bookie_forlist(Bookie2, lists:nth(length(CLs2), CLs2)), + io:format("Checked active bookie has new objects~n"), + + {ok, SnapBookie3} = leveled_bookie:book_start(SnapOpts2), + check_bookie_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)), + check_bookie_formissinglist(SnapBookie2, ChkList3), + check_bookie_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)), + check_bookie_forlist(SnapBookie3, ChkList2), + check_bookie_forlist(SnapBookie2, ChkList1), + io:format("Started new snapshot and check for new objects~n"), + + CLs3 = load_objects(20000, GenList, Bookie2, TestObject, + fun generate_multiple_smallobjects/2), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + check_bookie_forlist(Bookie2, lists:nth(1, CLs3)), + {ok, FNsB} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + ok = leveled_bookie:book_close(SnapBookie2), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + ok = leveled_bookie:book_close(SnapBookie3), + check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)), + check_bookie_forlist(Bookie2, lists:nth(1, CLs3)), + timer:sleep(90000), + {ok, FNsC} = file:list_dir(RootPath ++ "/ledger/ledger_files"), + true = length(FNsB) > length(FNsA), + true = length(FNsB) > length(FNsC), + ok = leveled_bookie:book_close(Bookie2), reset_filestructure(). + reset_filestructure() -> RootPath = "test", filelib:ensure_dir(RootPath ++ "/journal/"), @@ -202,6 +217,52 @@ reset_filestructure() -> RootPath. +check_bookie_forlist(Bookie, ChkList) -> + check_bookie_forlist(Bookie, ChkList, false). + +check_bookie_forlist(Bookie, ChkList, Log) -> + lists:foreach(fun({_RN, Obj, _Spc}) -> + if + Log == true -> + io:format("Fetching Key ~w~n", [Obj#r_object.key]); + true -> + ok + end, + R = leveled_bookie:book_riakget(Bookie, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList). + +check_bookie_formissinglist(Bookie, ChkList) -> + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie, + Obj#r_object.bucket, + Obj#r_object.key), + R = not_found end, + ChkList). + +check_bookie_forobject(Bookie, TestObject) -> + {ok, TestObject} = leveled_bookie:book_riakget(Bookie, + TestObject#r_object.bucket, + TestObject#r_object.key), + {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie, + TestObject#r_object.bucket, + TestObject#r_object.key), + ok = case {HeadObject#r_object.bucket, + HeadObject#r_object.key, + HeadObject#r_object.vclock} of + {B1, K1, VC1} when B1 == TestObject#r_object.bucket, + K1 == TestObject#r_object.key, + VC1 == TestObject#r_object.vclock -> + ok + end. + +check_bookie_formissingobject(Bookie, Bucket, Key) -> + not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key), + not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key). + + generate_testobject() -> {B1, K1, V1, Spec1, MD} = {"Bucket1", "Key1", @@ -239,4 +300,17 @@ generate_multiple_objects(Count, KeyNumber, ObjL, Value) -> Value). - \ No newline at end of file +load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) -> + lists:map(fun(KN) -> + ObjListA = Generator(ChunkSize, KN), + StartWatchA = os:timestamp(), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie, Obj, Spc) + end, + ObjListA), + Time = timer:now_diff(os:timestamp(), StartWatchA), + io:format("~w objects loaded in ~w seconds~n", + [ChunkSize, Time/1000000]), + check_bookie_forobject(Bookie, TestObject), + lists:sublist(ObjListA, 1000) end, + GenList).