Add more complex snapshot test
This exposed another off-by-one error on startup. This commit also includes an unsafe change to reply early from a rolling CDB file (with lots of objects, writing the hash table can take too long). This is bad, but will be resolved through a refactor of the manifest writing: essentially the writing of the manifest update was deferred, which was an unnecessary performance optimisation. If instead we wait on the manifest write, the process becomes substantially simpler, and it is safer to perform the roll of the completed CDB journal asynchronously. If the manifest update takes too long, an append-only log may be used instead.
parent f58f4d0ea5
commit 2055f8ed3f

6 changed files with 307 additions and 183 deletions
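
For context on the "reply early" change described above: the new cdb_roll clause in the diff below replies to the caller before the slow work of writing out the hash table and renaming the file, then finishes that work inside the same call. A condensed sketch of the pattern (helper names as in the diff; error handling omitted):

    handle_call(cdb_roll, From, State=#state{writer=true}) ->
        %% Work out the post-roll filename and reply immediately, so the
        %% caller is not blocked while the hash table is written out
        NewName = determine_new_filename(State#state.filename),
        gen_server:reply(From, {ok, NewName}),
        %% The expensive part: persist the hash table and rename .pnd -> .cdb
        ok = close_file(State#state.handle,
                        State#state.hashtree,
                        State#state.last_position),
        ok = rename_for_read(State#state.filename, NewName),
        {noreply, State}.

This is presumably why the message calls the change unsafe: the caller holds the new filename before the roll has actually completed, so a crash mid-roll leaves the two out of step. The planned manifest-writing refactor is intended to remove the need for the early reply.
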
@@ -29,15 +29,13 @@
         root_path :: string(),
         cdb_options :: #cdb_options{},
         start_snapshot = false :: boolean(),
-        source_inker :: pid(),
-        requestor :: pid()}).
+        source_inker :: pid()}).
 
 -record(penciller_options,
         {root_path :: string(),
         max_inmemory_tablesize :: integer(),
         start_snapshot = false :: boolean(),
-        source_penciller :: pid(),
-        requestor :: pid()}).
+        source_penciller :: pid()}).
 
 -record(bookie_options,
         {root_path :: string(),
@@ -213,6 +213,8 @@ init([Opts]) ->
                     true ->
                         Opts#bookie_options.cache_size
                 end,
+    io:format("Bookie starting with Pcl ~w Ink ~w~n",
+                [Penciller, Inker]),
     {ok, #state{inker=Inker,
                 penciller=Penciller,
                 cache_size=CacheSize,
@@ -223,6 +225,8 @@ init([Opts]) ->
         {Penciller, LedgerCache},
             Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
     ok = leveled_penciller:pcl_loadsnapshot(Penciller, []),
+    io:format("Snapshot starting with Pcl ~w Ink ~w~n",
+                [Penciller, Inker]),
     {ok, #state{penciller=Penciller,
                 inker=Inker,
                 ledger_cache=LedgerCache,
@@ -282,16 +286,14 @@ handle_call({head, Key}, _From, State) ->
                     {reply, {ok, OMD}, State}
             end
     end;
-handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) ->
+handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
     PCLopts = #penciller_options{start_snapshot=true,
-                                    source_penciller=State#state.penciller,
-                                    requestor=Requestor},
+                                    source_penciller=State#state.penciller},
     {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
     case SnapType of
         store ->
             InkerOpts = #inker_options{start_snapshot=true,
-                                        source_inker=State#state.inker,
-                                        requestor=Requestor},
+                                        source_inker=State#state.inker},
             {ok, JournalSnapshot} = leveled_inker:ink_start(InkerOpts),
             {reply,
                 {ok,
@@ -497,12 +499,19 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
     case SQN of
         SQN when SQN < MinSQN ->
             {loop, Acc0};
-        SQN when SQN =< MaxSQN ->
+        SQN when SQN < MaxSQN ->
             %% TODO - get correct size in a more efficient manner
             %% Need to have compressed size
             Size = byte_size(term_to_binary(ValueInLedger, [compressed])),
             Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs),
             {loop, {MinSQN, MaxSQN, Output ++ Changes}};
+        MaxSQN ->
+            %% TODO - get correct size in a more efficient manner
+            %% Need to have compressed size
+            io:format("Reached end of load batch with SQN ~w~n", [SQN]),
+            Size = byte_size(term_to_binary(ValueInLedger, [compressed])),
+            Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs),
+            {stop, {MinSQN, MaxSQN, Output ++ Changes}};
         SQN when SQN > MaxSQN ->
             io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n",
                         [MaxSQN, SQN]),
@@ -66,6 +66,7 @@
         cdb_scan/4,
         cdb_close/1,
         cdb_complete/1,
+        cdb_roll/1,
         cdb_destroy/1,
         cdb_deletepending/1]).
 
@@ -137,6 +138,9 @@ cdb_close(Pid) ->
 cdb_complete(Pid) ->
     gen_server:call(Pid, cdb_complete, infinity).
 
+cdb_roll(Pid) ->
+    gen_server:call(Pid, cdb_roll, infinity).
+
 cdb_destroy(Pid) ->
     gen_server:cast(Pid, destroy).
 
@@ -197,9 +201,7 @@ handle_call({open_writer, Filename}, _From, State) ->
                             writer=true}};
 handle_call({open_reader, Filename}, _From, State) ->
     io:format("Opening file for reading with filename ~s~n", [Filename]),
-    {ok, Handle} = file:open(Filename, [binary, raw, read]),
-    Index = load_index(Handle),
-    LastKey = find_lastkey(Handle, Index),
+    {Handle, Index, LastKey} = open_for_readonly(Filename),
     {reply, ok, State#state{handle=Handle,
                             last_key=LastKey,
                             filename=Filename,
@@ -332,28 +334,32 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
 handle_call(cdb_close, _From, State) ->
     ok = file:close(State#state.handle),
     {stop, normal, ok, State#state{handle=undefined}};
-handle_call(cdb_complete, _From, State) ->
-    case State#state.writer of
-        true ->
-            ok = close_file(State#state.handle,
-                            State#state.hashtree,
-                            State#state.last_position),
-            %% Rename file
-            NewName = filename:rootname(State#state.filename, ".pnd")
-                        ++ ".cdb",
-            io:format("Renaming file from ~s to ~s " ++
-                        "for which existence is ~w~n",
-                        [State#state.filename, NewName,
-                            filelib:is_file(NewName)]),
-            ok = file:rename(State#state.filename, NewName),
-            {stop, normal, {ok, NewName}, State};
-        false ->
-            ok = file:close(State#state.handle),
-            {stop, normal, {ok, State#state.filename}, State};
-        undefined ->
-            ok = file:close(State#state.handle),
-            {stop, normal, {ok, State#state.filename}, State}
-    end.
+handle_call(cdb_complete, _From, State=#state{writer=Writer})
+                                                when Writer == true ->
+    NewName = determine_new_filename(State#state.filename),
+    ok = close_file(State#state.handle,
+                    State#state.hashtree,
+                    State#state.last_position),
+    ok = rename_for_read(State#state.filename, NewName),
+    {stop, normal, {ok, NewName}, State};
+handle_call(cdb_complete, _From, State) ->
+    ok = file:close(State#state.handle),
+    {stop, normal, {ok, State#state.filename}, State};
+handle_call(cdb_roll, From, State=#state{writer=Writer})
+                                                when Writer == true ->
+    NewName = determine_new_filename(State#state.filename),
+    gen_server:reply(From, {ok, NewName}),
+    ok = close_file(State#state.handle,
+                    State#state.hashtree,
+                    State#state.last_position),
+    ok = rename_for_read(State#state.filename, NewName),
+    io:format("Opening file for reading with filename ~s~n", [NewName]),
+    {Handle, Index, LastKey} = open_for_readonly(NewName),
+    {noreply, State#state{handle=Handle,
+                            last_key=LastKey,
+                            filename=NewName,
+                            writer=false,
+                            hash_index=Index}}.
 
 
 handle_cast(destroy, State) ->
@@ -659,6 +665,23 @@ fold_keys(Handle, FoldFun, Acc0) ->
 %% Internal functions
 %%%%%%%%%%%%%%%%%%%%
 
+determine_new_filename(Filename) ->
+    filename:rootname(Filename, ".pnd") ++ ".cdb".
+
+rename_for_read(Filename, NewName) ->
+    %% Rename file
+    io:format("Renaming file from ~s to ~s " ++
+                "for which existence is ~w~n",
+                [Filename, NewName,
+                    filelib:is_file(NewName)]),
+    file:rename(Filename, NewName).
+
+open_for_readonly(Filename) ->
+    {ok, Handle} = file:open(Filename, [binary, raw, read]),
+    Index = load_index(Handle),
+    LastKey = find_lastkey(Handle, Index),
+    {Handle, Index, LastKey}.
+
 load_index(Handle) ->
     Index = lists:seq(0, 255),
     lists:map(fun(X) ->
@@ -139,7 +139,8 @@
                 cdb_options :: #cdb_options{},
                 clerk :: pid(),
                 compaction_pending = false :: boolean(),
-                is_snapshot = false :: boolean()}).
+                is_snapshot = false :: boolean(),
+                source_inker :: pid()}).
 
 
 %%%============================================================================
@@ -161,6 +162,9 @@ ink_fetch(Pid, PrimaryKey, SQN) ->
 ink_registersnapshot(Pid, Requestor) ->
     gen_server:call(Pid, {register_snapshot, Requestor}, infinity).
 
+ink_releasesnapshot(Pid, Snapshot) ->
+    gen_server:call(Pid, {release_snapshot, Snapshot}, infinity).
+
 ink_close(Pid) ->
     gen_server:call(Pid, {close, false}, infinity).
 
@@ -217,13 +221,13 @@ init([InkerOpts]) ->
             InkerOpts#inker_options.start_snapshot} of
         {undefined, true} ->
             SrcInker = InkerOpts#inker_options.source_inker,
-            Requestor = InkerOpts#inker_options.requestor,
             {Manifest,
                 ActiveJournalDB,
-                ActiveJournalSQN} = ink_registersnapshot(SrcInker, Requestor),
+                ActiveJournalSQN} = ink_registersnapshot(SrcInker, self()),
             {ok, #state{manifest=Manifest,
                         active_journaldb=ActiveJournalDB,
                         active_journaldb_sqn=ActiveJournalSQN,
+                        source_inker=SrcInker,
                         is_snapshot=true}};
             %% Need to do something about timeout
         {_RootPath, false} ->
@@ -276,10 +280,17 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
 handle_call({register_snapshot, Requestor}, _From , State) ->
     Rs = [{Requestor,
             State#state.manifest_sqn}|State#state.registered_snapshots],
+    io:format("Inker snapshot ~w registered at SQN ~w~n",
+                [Requestor, State#state.manifest_sqn]),
     {reply, {State#state.manifest,
                 State#state.active_journaldb,
                 State#state.active_journaldb_sqn},
         State#state{registered_snapshots=Rs}};
+handle_call({release_snapshot, Snapshot}, _From , State) ->
+    Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
+    io:format("Snapshot ~w released~n", [Snapshot]),
+    io:format("Remaining snapshots are ~w~n", [Rs]),
+    {reply, ok, State#state{registered_snapshots=Rs}};
 handle_call(get_manifest, _From, State) ->
     {reply, State#state.manifest, State};
 handle_call({update_manifest,
@@ -344,7 +355,7 @@ handle_info(_Info, State) ->
 terminate(Reason, State) ->
     case State#state.is_snapshot of
         true ->
-            ok;
+            ok = ink_releasesnapshot(State#state.source_inker, self());
         false ->
             io:format("Inker closing journal for reason ~w~n", [Reason]),
             io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
@@ -444,16 +455,16 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
         end
     end.
 
-roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
+roll_active_file(ActiveJournal, Manifest, ManifestSQN, RootPath) ->
     SW = os:timestamp(),
-    io:format("Rolling old journal ~w~n", [OldActiveJournal]),
-    {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal),
-    {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
+    io:format("Rolling old journal ~w~n", [ActiveJournal]),
+    {ok, NewFilename} = leveled_cdb:cdb_roll(ActiveJournal),
     JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\." ++ ?JOURNAL_FILEX,
     [JournalSQN] = sequencenumbers_fromfilenames([NewFilename],
                                                     JournalRegex2,
                                                     'SQN'),
-    NewManifest = add_to_manifest(Manifest, {JournalSQN, NewFilename, PidR}),
+    NewManifest = add_to_manifest(Manifest,
+                                    {JournalSQN, NewFilename, ActiveJournal}),
     NewManifestSQN = ManifestSQN + 1,
     ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
     io:format("Rolling old journal completed in ~w microseconds~n",
@@ -491,42 +491,40 @@ handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap})
                                                     when Snap == false ->
     {ok, UpdState} = commit_manifest_change(WI, State),
     {reply, ok, UpdState};
-handle_call({check_sqn, Key, SQN}, _From, State) ->
-    Obj = if
-            State#state.is_snapshot == true ->
-                fetch_snap(Key,
-                            State#state.manifest,
-                            State#state.levelzero_snapshot);
-            true ->
-                fetch(Key,
-                        State#state.manifest,
-                        State#state.memtable)
-        end,
-    Reply = case Obj of
-                not_present ->
-                    false;
-                Obj ->
-                    SQNToCompare = leveled_bookie:strip_to_seqonly(Obj),
-                    if
-                        SQNToCompare > SQN ->
-                            false;
-                        true ->
-                            true
-                    end
-            end,
-    {reply, Reply, State};
-handle_call({fetch, Key}, _From, State) ->
-    Reply = if
-                State#state.is_snapshot == true ->
-                    fetch_snap(Key,
-                                State#state.manifest,
-                                State#state.levelzero_snapshot);
-                true ->
-                    fetch(Key,
-                            State#state.manifest,
-                            State#state.memtable)
-            end,
-    {reply, Reply, State};
+handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap})
+                                                    when Snap == false ->
+    {reply,
+        fetch(Key,
+                State#state.manifest,
+                State#state.memtable),
+        State};
+handle_call({check_sqn, Key, SQN}, _From, State=#state{is_snapshot=Snap})
+                                                    when Snap == false ->
+    {reply,
+        compare_to_sqn(fetch(Key,
+                                State#state.manifest,
+                                State#state.memtable),
+                        SQN),
+        State};
+handle_call({fetch, Key},
+                _From,
+                State=#state{snapshot_fully_loaded=Ready})
+                                                    when Ready == true ->
+    {reply,
+        fetch_snap(Key,
+                    State#state.manifest,
+                    State#state.levelzero_snapshot),
+        State};
+handle_call({check_sqn, Key, SQN},
+                _From,
+                State=#state{snapshot_fully_loaded=Ready})
+                                                    when Ready == true ->
+    {reply,
+        compare_to_sqn(fetch_snap(Key,
+                                    State#state.manifest,
+                                    State#state.levelzero_snapshot),
+                        SQN),
+        State};
 handle_call(work_for_clerk, From, State) ->
     {UpdState, Work} = return_work(State, From),
     {reply, {Work, UpdState#state.backlog}, UpdState};
@@ -568,6 +566,8 @@ handle_cast(_Msg, State) ->
 handle_info(_Info, State) ->
     {noreply, State}.
 
+terminate(_Reason, _State=#state{is_snapshot=Snap}) when Snap == true ->
+    ok;
 terminate(_Reason, State) ->
     %% When a Penciller shuts down it isn't safe to try an manage the safe
     %% finishing of any outstanding work. The last commmitted manifest will
@@ -585,10 +585,6 @@ terminate(_Reason, State) ->
     %% The cast may not succeed as the clerk could be synchronously calling
     %% the penciller looking for a manifest commit
     %%
-    if
-        State#state.is_snapshot == true ->
-            ok;
-        true ->
     leveled_pclerk:clerk_stop(State#state.clerk),
     Dump = ets:tab2list(State#state.memtable),
     case {State#state.levelzero_pending,
@@ -623,8 +619,7 @@ terminate(_Reason, State) ->
     lists:foreach(fun({_FN, Pid, _SN}) ->
                         leveled_sft:sft_close(Pid) end,
                     State#state.unreferenced_files),
-            ok
-    end.
+    ok.
 
 
 code_change(_OldVsn, State, _Extra) ->
@@ -843,6 +838,21 @@ fetch(Key, Manifest, Level, FetchFun) ->
     end.
 
 
+compare_to_sqn(Obj, SQN) ->
+    case Obj of
+        not_present ->
+            false;
+        Obj ->
+            SQNToCompare = leveled_bookie:strip_to_seqonly(Obj),
+            if
+                SQNToCompare > SQN ->
+                    false;
+                true ->
+                    true
+            end
+    end.
+
 %% Manifest lock - don't have two changes to the manifest happening
 %% concurrently
 
@@ -1280,8 +1290,7 @@ simple_server_test() ->
     ?assertMatch(R13, Key3),
     ?assertMatch(R14, Key4),
     SnapOpts = #penciller_options{start_snapshot = true,
-                                    source_penciller = PCLr,
-                                    requestor = self()},
+                                    source_penciller = PCLr},
     {ok, PclSnap} = pcl_start(SnapOpts),
     ok = pcl_loadsnapshot(PclSnap, []),
     ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001"})),
@@ -5,12 +5,12 @@
 -export([simple_put_fetch_head/1,
             many_put_fetch_head/1,
             journal_compaction/1,
-            simple_snapshot/1]).
+            fetchput_snapshot/1]).
 
 all() -> [simple_put_fetch_head,
             many_put_fetch_head,
             journal_compaction,
-            simple_snapshot].
+            fetchput_snapshot].
 
 
 simple_put_fetch_head(_Config) ->
@@ -51,19 +51,8 @@ many_put_fetch_head(_Config) ->
     check_bookie_forobject(Bookie2, TestObject),
     GenList = [2, 20002, 40002, 60002, 80002,
                 100002, 120002, 140002, 160002, 180002],
-    CLs = lists:map(fun(KN) ->
-                        ObjListA = generate_multiple_smallobjects(20000, KN),
-                        StartWatchA = os:timestamp(),
-                        lists:foreach(fun({_RN, Obj, Spc}) ->
-                                            leveled_bookie:book_riakput(Bookie2, Obj, Spc)
-                                            end,
-                                        ObjListA),
-                        Time = timer:now_diff(os:timestamp(), StartWatchA),
-                        io:format("20,000 objects loaded in ~w seconds~n",
-                                    [Time/1000000]),
-                        check_bookie_forobject(Bookie2, TestObject),
-                        lists:sublist(ObjListA, 1000) end,
-                        GenList),
+    CLs = load_objects(20000, GenList, Bookie2, TestObject,
+                        fun generate_multiple_smallobjects/2),
     CL1A = lists:nth(1, CLs),
     ChkListFixed = lists:nth(length(CLs), CLs),
     check_bookie_forlist(Bookie2, CL1A),
@@ -85,35 +74,6 @@ many_put_fetch_head(_Config) ->
     ok = leveled_bookie:book_close(Bookie3),
     reset_filestructure().
 
 
-check_bookie_forlist(Bookie, ChkList) ->
-    lists:foreach(fun({_RN, Obj, _Spc}) ->
-                        R = leveled_bookie:book_riakget(Bookie,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
-                        R = {ok, Obj} end,
-                    ChkList).
-
-check_bookie_forobject(Bookie, TestObject) ->
-    {ok, TestObject} = leveled_bookie:book_riakget(Bookie,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
-    {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie,
-                                                    TestObject#r_object.bucket,
-                                                    TestObject#r_object.key),
-    ok = case {HeadObject#r_object.bucket,
-                HeadObject#r_object.key,
-                HeadObject#r_object.vclock} of
-            {B1, K1, VC1} when B1 == TestObject#r_object.bucket,
-                                K1 == TestObject#r_object.key,
-                                VC1 == TestObject#r_object.vclock ->
-                ok
-        end.
-
-check_bookie_formissingobject(Bookie, Bucket, Key) ->
-    not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key),
-    not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key).
-
 journal_compaction(_Config) ->
     RootPath = reset_filestructure(),
     StartOpts1 = #bookie_options{root_path=RootPath,
@@ -162,7 +122,7 @@ journal_compaction(_Config) ->
     reset_filestructure().
 
 
-simple_snapshot(_Config) ->
+fetchput_snapshot(_Config) ->
     RootPath = reset_filestructure(),
     StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=3000000},
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
@@ -172,27 +132,82 @@ simple_snapshot(_Config) ->
     lists:foreach(fun({_RN, Obj, Spc}) ->
                         leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
                     ObjList1),
-    SnapOpts = #bookie_options{snapshot_bookie=Bookie1},
-    {ok, SnapBookie} = leveled_bookie:book_start(SnapOpts),
+    SnapOpts1 = #bookie_options{snapshot_bookie=Bookie1},
+    {ok, SnapBookie1} = leveled_bookie:book_start(SnapOpts1),
     ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
-    lists:foreach(fun({_RN, Obj, _Spc}) ->
-                        R = leveled_bookie:book_riakget(Bookie1,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
-                        R = {ok, Obj} end,
-                    ChkList1),
-    lists:foreach(fun({_RN, Obj, _Spc}) ->
-                        R = leveled_bookie:book_riakget(SnapBookie,
-                                                        Obj#r_object.bucket,
-                                                        Obj#r_object.key),
-                        io:format("Finding key ~s~n", [Obj#r_object.key]),
-                        R = {ok, Obj} end,
-                    ChkList1),
-    ok = leveled_bookie:book_close(SnapBookie),
+    check_bookie_forlist(Bookie1, ChkList1),
+    check_bookie_forlist(SnapBookie1, ChkList1),
+    ok = leveled_bookie:book_close(SnapBookie1),
+    check_bookie_forlist(Bookie1, ChkList1),
     ok = leveled_bookie:book_close(Bookie1),
+    io:format("Closed initial bookies~n"),
+
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
+    SnapOpts2 = #bookie_options{snapshot_bookie=Bookie2},
+    {ok, SnapBookie2} = leveled_bookie:book_start(SnapOpts2),
+    io:format("Bookies restarted~n"),
+
+    check_bookie_forlist(Bookie2, ChkList1),
+    io:format("Check active bookie still contains original data~n"),
+    check_bookie_forlist(SnapBookie2, ChkList1),
+    io:format("Check snapshot still contains original data~n"),
+
+
+    ObjList2 = generate_multiple_objects(5000, 2),
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
+                    ObjList2),
+    io:format("Replacement objects put~n"),
+
+    ChkList2 = lists:sublist(lists:sort(ObjList2), 100),
+    check_bookie_forlist(Bookie2, ChkList2),
+    check_bookie_forlist(SnapBookie2, ChkList1),
+    io:format("Checked for replacement objects in active bookie" ++
+                ", old objects in snapshot~n"),
+
+    {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    ObjList3 = generate_multiple_objects(15000, 5002),
+    lists:foreach(fun({_RN, Obj, Spc}) ->
+                        leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
+                    ObjList3),
+    ChkList3 = lists:sublist(lists:sort(ObjList3), 100),
+    check_bookie_forlist(Bookie2, ChkList3),
+    check_bookie_formissinglist(SnapBookie2, ChkList3),
+    GenList = [20002, 40002, 60002, 80002, 100002, 120002],
+    CLs2 = load_objects(20000, GenList, Bookie2, TestObject,
+                        fun generate_multiple_smallobjects/2),
+    io:format("Loaded significant numbers of new objects~n"),
+
+    check_bookie_forlist(Bookie2, lists:nth(length(CLs2), CLs2)),
+    io:format("Checked active bookie has new objects~n"),
+
+    {ok, SnapBookie3} = leveled_bookie:book_start(SnapOpts2),
+    check_bookie_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)),
+    check_bookie_formissinglist(SnapBookie2, ChkList3),
+    check_bookie_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)),
+    check_bookie_forlist(SnapBookie3, ChkList2),
+    check_bookie_forlist(SnapBookie2, ChkList1),
+    io:format("Started new snapshot and check for new objects~n"),
+
+    CLs3 = load_objects(20000, GenList, Bookie2, TestObject,
+                        fun generate_multiple_smallobjects/2),
+    check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)),
+    check_bookie_forlist(Bookie2, lists:nth(1, CLs3)),
+    {ok, FNsB} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    ok = leveled_bookie:book_close(SnapBookie2),
+    check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)),
+    ok = leveled_bookie:book_close(SnapBookie3),
+    check_bookie_forlist(Bookie2, lists:nth(length(CLs3), CLs3)),
+    check_bookie_forlist(Bookie2, lists:nth(1, CLs3)),
+    timer:sleep(90000),
+    {ok, FNsC} = file:list_dir(RootPath ++ "/ledger/ledger_files"),
+    true = length(FNsB) > length(FNsA),
+    true = length(FNsB) > length(FNsC),
+    ok = leveled_bookie:book_close(Bookie2),
     reset_filestructure().
 
 
 reset_filestructure() ->
     RootPath = "test",
     filelib:ensure_dir(RootPath ++ "/journal/"),
@@ -202,6 +217,52 @@ reset_filestructure() ->
     RootPath.
 
 
+check_bookie_forlist(Bookie, ChkList) ->
+    check_bookie_forlist(Bookie, ChkList, false).
+
+check_bookie_forlist(Bookie, ChkList, Log) ->
+    lists:foreach(fun({_RN, Obj, _Spc}) ->
+                        if
+                            Log == true ->
+                                io:format("Fetching Key ~w~n", [Obj#r_object.key]);
+                            true ->
+                                ok
+                        end,
+                        R = leveled_bookie:book_riakget(Bookie,
+                                                        Obj#r_object.bucket,
+                                                        Obj#r_object.key),
+                        R = {ok, Obj} end,
+                    ChkList).
+
+check_bookie_formissinglist(Bookie, ChkList) ->
+    lists:foreach(fun({_RN, Obj, _Spc}) ->
+                        R = leveled_bookie:book_riakget(Bookie,
+                                                        Obj#r_object.bucket,
+                                                        Obj#r_object.key),
+                        R = not_found end,
+                    ChkList).
+
+check_bookie_forobject(Bookie, TestObject) ->
+    {ok, TestObject} = leveled_bookie:book_riakget(Bookie,
+                                                    TestObject#r_object.bucket,
+                                                    TestObject#r_object.key),
+    {ok, HeadObject} = leveled_bookie:book_riakhead(Bookie,
+                                                    TestObject#r_object.bucket,
+                                                    TestObject#r_object.key),
+    ok = case {HeadObject#r_object.bucket,
+                HeadObject#r_object.key,
+                HeadObject#r_object.vclock} of
+            {B1, K1, VC1} when B1 == TestObject#r_object.bucket,
+                                K1 == TestObject#r_object.key,
+                                VC1 == TestObject#r_object.vclock ->
+                ok
+        end.
+
+check_bookie_formissingobject(Bookie, Bucket, Key) ->
+    not_found = leveled_bookie:book_riakget(Bookie, Bucket, Key),
+    not_found = leveled_bookie:book_riakhead(Bookie, Bucket, Key).
+
+
 generate_testobject() ->
     {B1, K1, V1, Spec1, MD} = {"Bucket1",
                                 "Key1",
@@ -239,4 +300,17 @@ generate_multiple_objects(Count, KeyNumber, ObjL, Value) ->
                             Value).
 
 
+load_objects(ChunkSize, GenList, Bookie, TestObject, Generator) ->
+    lists:map(fun(KN) ->
+                    ObjListA = Generator(ChunkSize, KN),
+                    StartWatchA = os:timestamp(),
+                    lists:foreach(fun({_RN, Obj, Spc}) ->
+                                        leveled_bookie:book_riakput(Bookie, Obj, Spc)
+                                        end,
+                                    ObjListA),
+                    Time = timer:now_diff(os:timestamp(), StartWatchA),
+                    io:format("~w objects loaded in ~w seconds~n",
+                                [ChunkSize, Time/1000000]),
+                    check_bookie_forobject(Bookie, TestObject),
+                    lists:sublist(ObjListA, 1000) end,
+                GenList).