diff --git a/include/leveled.hrl b/include/leveled.hrl index 37694ed..cd82d2a 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -22,7 +22,8 @@ -record(cdb_options, {max_size :: integer(), - file_path :: string()}). + file_path :: string(), + binary_mode = false :: boolean()}). -record(inker_options, {cdb_max_size :: integer(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index bafba89..a055cdf 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -368,7 +368,8 @@ set_options(Opts) -> end, {#inker_options{root_path = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP, - cdb_options = #cdb_options{max_size=MaxJournalSize}}, + cdb_options = #cdb_options{max_size=MaxJournalSize, + binary_mode=true}}, #penciller_options{root_path=Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP}}. @@ -495,22 +496,25 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, Output} = Acc0, {SQN, PK} = KeyInLedger, - {Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)), + % VBin may already be a term + % TODO: Should VSize include CRC? + % Using ExtractFun means we ignore simple way of getting size (from length) + {VBin, VSize} = ExtractFun(ValueInLedger), + {Obj, IndexSpecs} = case is_binary(VBin) of + true -> + binary_to_term(VBin); + false -> + VBin + end, case SQN of SQN when SQN < MinSQN -> {loop, Acc0}; SQN when SQN < MaxSQN -> - %% TODO - get correct size in a more efficient manner - %% Need to have compressed size - Size = byte_size(term_to_binary(ValueInLedger, [compressed])), - Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), + Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), {loop, {MinSQN, MaxSQN, Output ++ Changes}}; MaxSQN -> - %% TODO - get correct size in a more efficient manner - %% Need to have compressed size io:format("Reached end of load batch with SQN ~w~n", [SQN]), - Size = byte_size(term_to_binary(ValueInLedger, [compressed])), - Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), + Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), {stop, {MinSQN, MaxSQN, Output ++ Changes}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 04adfda..45f2a6a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -76,6 +76,7 @@ -define(WORD_SIZE, 4). -define(CRC_CHECK, true). -define(MAX_FILE_SIZE, 3221225472). +-define(BINARY_MODE, false). -define(BASE_POSITION, 2048). -define(WRITE_OPS, [binary, raw, read, write]). @@ -87,7 +88,8 @@ handle :: file:fd(), writer :: boolean(), max_size :: integer(), - pending_delete = false :: boolean()}). + pending_delete = false :: boolean(), + binary_mode = false :: boolean()}). %%%============================================================================ @@ -187,7 +189,7 @@ init([Opts]) -> M -> M end, - {ok, #state{max_size=MaxSize}}. + {ok, #state{max_size=MaxSize, binary_mode=Opts#cdb_options.binary_mode}}. handle_call({open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), @@ -251,6 +253,7 @@ handle_call({put_kv, Key, Value}, _From, State) -> Result = put(State#state.handle, Key, Value, {State#state.last_position, State#state.hashtree}, + State#state.binary_mode, State#state.max_size), case Result of roll -> @@ -478,23 +481,32 @@ open_active_file(FileName) when is_list(FileName) -> %% Append to an active file a new key/value pair returning an updated %% dictionary of Keys and positions. Returns an updated Position %% -put(FileName, Key, Value, {LastPosition, HashTree}, MaxSize) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, ?WRITE_OPS), - put(Handle, Key, Value, {LastPosition, HashTree}, MaxSize); -put(Handle, Key, Value, {LastPosition, HashTree}, MaxSize) -> - Bin = key_value_to_record({Key, Value}), - PotentialNewSize = LastPosition + byte_size(Bin), - if PotentialNewSize > MaxSize -> - roll; - true -> - ok = file:pwrite(Handle, LastPosition, Bin), - {Handle, PotentialNewSize, put_hashtree(Key, LastPosition, HashTree)} - end. +put(FileName, + Key, + Value, + {LastPosition, HashTree}, + BinaryMode, + MaxSize) when is_list(FileName) -> + {ok, Handle} = file:open(FileName, ?WRITE_OPS), + put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize); +put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) -> + Bin = key_value_to_record({Key, Value}, BinaryMode), + PotentialNewSize = LastPosition + byte_size(Bin), + if + PotentialNewSize > MaxSize -> + roll; + true -> + ok = file:pwrite(Handle, LastPosition, Bin), + {Handle, + PotentialNewSize, + put_hashtree(Key, LastPosition, HashTree)} + end. %% Should not be used for non-test PUTs by the inker - as the Max File Size %% should be taken from the startup options not the default put(FileName, Key, Value, {LastPosition, HashTree}) -> - put(FileName, Key, Value, {LastPosition, HashTree}, ?MAX_FILE_SIZE). + put(FileName, Key, Value, {LastPosition, HashTree}, + ?BINARY_MODE, ?MAX_FILE_SIZE). %% @@ -864,7 +876,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> ValueAsBin, Position, Output, - fun extract_value/1) of + fun extract_valueandsize/1) of {stop, UpdOutput} -> {NewPosition, UpdOutput}; {loop, UpdOutput} -> @@ -993,10 +1005,10 @@ read_next_term(Handle, Length, crc, Check) -> {unchecked, binary_to_term(Bin)} end. -%% Extract value from binary containing CRC -extract_value(ValueAsBin) -> +%% Extract value and size from binary containing CRC +extract_valueandsize(ValueAsBin) -> <<_CRC:32/integer, Bin/binary>> = ValueAsBin, - binary_to_term(Bin). + {binary_to_term(Bin), byte_size(Bin)}. %% Used for reading lengths @@ -1234,9 +1246,14 @@ hash_to_slot(Hash, L) -> %% Create a binary of the LengthKeyLengthValue, adding a CRC check %% at the front of the value -key_value_to_record({Key, Value}) -> - BK = term_to_binary(Key), - BV = term_to_binary(Value), +key_value_to_record({Key, Value}, BinaryMode) -> + BK = term_to_binary(Key), + BV = case BinaryMode of + true -> + Value; + false -> + term_to_binary(Value) + end, LK = byte_size(BK), LV = byte_size(BV), LK_FL = endian_flip(LK), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 9403ef4..96926c7 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -28,6 +28,7 @@ %% Sliding scale to allow preference of longer runs up to maximum -define(SINGLEFILE_COMPACTION_TARGET, 60.0). -define(MAXRUN_COMPACTION_TARGET, 80.0). +-define(CRC_SIZE, 4). -record(state, {inker :: pid(), max_run_length :: integer(), @@ -150,16 +151,17 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - FN = leveled_cdb:cdb_filename(CDB), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), + io:format("KeySizeList ~w~n", [KeySizeList]), R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> {{SQN, PK}, Size} = KS, Check = FilterFun(FilterServer, PK, SQN), case {Check, SQN > MaxSQN} of {true, _} -> - {ActSize + Size, RplSize}; + {ActSize + Size - ?CRC_SIZE, RplSize}; {false, true} -> - {ActSize + Size, RplSize}; + {ActSize + Size - ?CRC_SIZE, RplSize}; _ -> - {ActSize, RplSize + Size} + {ActSize, RplSize + Size - ?CRC_SIZE} end end, {0, 0}, KeySizeList), @@ -580,4 +582,23 @@ compact_single_file_test() -> ok = leveled_cdb:cdb_destroy(CDB). +compact_empty_file_test() -> + RP = "../test/journal", + FN1 = leveled_inker:filepath(RP, 1, new_journal), + CDBopts = #cdb_options{binary_mode=true}, + {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), + ok = leveled_cdb:cdb_put(CDB1, {1, "Key1"}, <<>>), + {ok, FN2} = leveled_cdb:cdb_complete(CDB1), + {ok, CDB2} = leveled_cdb:cdb_open_reader(FN2), + LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerFun1 = fun(Srv, Key, ObjSQN) -> + case lists:keyfind(ObjSQN, 1, Srv) of + {ObjSQN, Key} -> + true; + _ -> + false + end end, + Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4), + ?assertMatch(100.0, Score1). + -endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 471ea0b..7957d94 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -413,8 +413,10 @@ get_object(PrimaryKey, SQN, Manifest) -> JournalP = find_in_manifest(SQN, Manifest), Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}), case Obj of - {{SQN, PK}, Bin} -> + {{SQN, PK}, Bin} when is_binary(Bin) -> {{SQN, PK}, binary_to_term(Bin)}; + {{SQN, PK}, Term} -> + {{SQN, PK}, Term}; _ -> Obj end. @@ -807,6 +809,23 @@ simple_inker_test() -> ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), ink_close(Ink1), clean_testdir(RootPath). + +simple_inker_completeactivejournal_test() -> + RootPath = "../test/journal", + build_dummy_journal(), + CDBopts = #cdb_options{max_size=300000}, + {ok, PidW} = leveled_cdb:cdb_open_writer(filepath(RootPath, + 3, + new_journal)), + {ok, _FN} = leveled_cdb:cdb_complete(PidW), + {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts}), + Obj1 = ink_get(Ink1, "Key1", 1), + ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), + Obj2 = ink_get(Ink1, "Key4", 4), + ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), + ink_close(Ink1), + clean_testdir(RootPath). compact_journal_test() -> diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 47467a3..42001c1 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -14,17 +14,20 @@ handle_info/2, terminate/2, clerk_new/1, - clerk_prompt/2, - clerk_stop/1, + clerk_prompt/1, + clerk_returnmanifestchange/2, code_change/3, perform_merge/4]). -include_lib("eunit/include/eunit.hrl"). -define(INACTIVITY_TIMEOUT, 2000). +-define(QUICK_TIMEOUT, 500). -define(HAPPYTIME_MULTIPLIER, 5). --record(state, {owner :: pid()}). +-record(state, {owner :: pid(), + change_pending=false :: boolean(), + work_item :: #penciller_work{}}). %%%============================================================================ %%% API @@ -34,13 +37,12 @@ clerk_new(Owner) -> {ok, Pid} = gen_server:start(?MODULE, [], []), ok = gen_server:call(Pid, {register, Owner}, infinity), {ok, Pid}. - -clerk_prompt(Pid, penciller) -> - gen_server:cast(Pid, penciller_prompt), - ok. -clerk_stop(Pid) -> - gen_server:cast(Pid, stop). +clerk_returnmanifestchange(Pid, Closing) -> + gen_server:call(Pid, {return_manifest_change, Closing}). + +clerk_prompt(Pid) -> + gen_server:cast(Pid, prompt). %%%============================================================================ %%% gen_server callbacks @@ -50,21 +52,41 @@ init([]) -> {ok, #state{}}. handle_call({register, Owner}, _From, State) -> - {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}. + {reply, ok, State#state{owner=Owner}, ?INACTIVITY_TIMEOUT}; +handle_call({return_manifest_change, Closing}, From, State) -> + case {State#state.change_pending, Closing} of + {true, true} -> + WI = State#state.work_item, + ok = mark_for_delete(WI#penciller_work.unreferenced_files, + State#state.owner), + {stop, normal, {ok, WI}, State}; + {true, false} -> + WI = State#state.work_item, + gen_server:reply(From, {ok, WI}), + mark_for_delete(WI#penciller_work.unreferenced_files, + State#state.owner), + {noreply, + State#state{work_item=null, change_pending=false}, + ?INACTIVITY_TIMEOUT}; + {false, true} -> + {stop, normal, no_change_required, State} + end. -handle_cast(penciller_prompt, State) -> - Timeout = requestandhandle_work(State), - {noreply, State, Timeout}; -handle_cast(stop, State) -> - {stop, normal, State}. +handle_cast(prompt, State) -> + io:format("Clerk reducing timeout due to prompt~n"), + {noreply, State, ?QUICK_TIMEOUT}; +handle_cast(_Msg, State) -> + {noreply, State}. -handle_info(timeout, State) -> - case leveled_penciller:pcl_prompt(State#state.owner) of - ok -> - Timeout = requestandhandle_work(State), +handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false -> + case requestandhandle_work(State) of + {false, Timeout} -> {noreply, State, Timeout}; - pause -> - {noreply, State, ?INACTIVITY_TIMEOUT} + {true, WI} -> + % No timeout now as will wait for call to return manifest + % change + {noreply, + State#state{change_pending=true, work_item=WI}} end; handle_info(_Info, State) -> {noreply, State}. @@ -86,29 +108,16 @@ requestandhandle_work(State) -> io:format("Work prompted but none needed~n"), case Backlog of false -> - ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER; + {false, ?INACTIVITY_TIMEOUT * ?HAPPYTIME_MULTIPLIER}; _ -> - ?INACTIVITY_TIMEOUT + {false, ?INACTIVITY_TIMEOUT} end; {WI, _} -> {NewManifest, FilesToDelete} = merge(WI), UpdWI = WI#penciller_work{new_manifest=NewManifest, unreferenced_files=FilesToDelete}, - R = leveled_penciller:pcl_requestmanifestchange(State#state.owner, - UpdWI), - case R of - ok -> - %% Request for manifest change must be a synchronous call - %% Otherwise cannot mark files for deletion (may erase - %% without manifest change on close) - mark_for_delete(FilesToDelete, State#state.owner), - ?INACTIVITY_TIMEOUT; - _ -> - %% New files will forever remain in an undetermined state - %% The disconnected files should be logged at start-up for - %% Manual clear-up - ?INACTIVITY_TIMEOUT - end + ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner), + {true, UpdWI} end. @@ -252,22 +261,16 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> io:format("File to be created as part of MSN=~w Filename=~s~n", [MSN, FileName]), TS1 = os:timestamp(), - case leveled_sft:sft_new(FileName, KL1, KL2, Level + 1) of - {ok, _Pid, {error, Reason}} -> - io:format("Exiting due to error~w~n", [Reason]), - error; - {ok, Pid, Reply} -> - {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, - ExtMan = lists:append(OutList, - [#manifest_entry{start_key=SmallestKey, - end_key=HighestKey, - owner=Pid, - filename=FileName}]), - MTime = timer:now_diff(os:timestamp(), TS1), - io:format("File creation took ~w microseconds ~n", [MTime]), - do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, - FileCounter + 1, ExtMan) - end. + {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, Level + 1), + {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, + ExtMan = lists:append(OutList, + [#manifest_entry{start_key=SmallestKey, + end_key=HighestKey, + owner=Pid, + filename=FileName}]), + MTime = timer:now_diff(os:timestamp(), TS1), + io:format("File creation took ~w microseconds ~n", [MTime]), + do_merge(KL1Rem, KL2Rem, Level, {Filepath, MSN}, FileCounter + 1, ExtMan). get_item(Index, List, Default) -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 26b70e8..fb20819 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -236,9 +236,8 @@ pcl_fetch/2, pcl_checksequencenumber/3, pcl_workforclerk/1, - pcl_requestmanifestchange/2, + pcl_promptmanifestchange/1, pcl_confirmdelete/2, - pcl_prompt/1, pcl_close/1, pcl_registersnapshot/2, pcl_updatesnapshotcache/3, @@ -308,15 +307,12 @@ pcl_checksequencenumber(Pid, Key, SQN) -> pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). -pcl_requestmanifestchange(Pid, WorkItem) -> - gen_server:call(Pid, {manifest_change, WorkItem}, infinity). +pcl_promptmanifestchange(Pid) -> + gen_server:cast(Pid, manifest_change). pcl_confirmdelete(Pid, FileName) -> gen_server:call(Pid, {confirm_delete, FileName}, infinity). -pcl_prompt(Pid) -> - gen_server:call(Pid, prompt_compaction, infinity). - pcl_getstartupsequencenumber(Pid) -> gen_server:call(Pid, get_startup_sqn, infinity). @@ -454,43 +450,6 @@ handle_call({confirm_delete, FileName}, _From, State=#state{is_snapshot=Snap}) _ -> {reply, Reply, State} end; -handle_call(prompt_compaction, _From, State=#state{is_snapshot=Snap}) - when Snap == false -> - %% If there is a prompt immediately after a L0 async write event then - %% there exists the potential for the prompt to stall the database. - %% Should only accept prompts if there has been a safe wait from the - %% last L0 write event. - Proceed = case State#state.levelzero_pending of - {true, _Pid, TS} -> - TD = timer:now_diff(os:timestamp(),TS), - if - TD < ?PROMPT_WAIT_ONL0 * 1000000 -> false; - true -> true - end; - ?L0PEND_RESET -> - true - end, - if - Proceed -> - {_TableSize, State1} = checkready_pushtomem(State), - case roll_memory(State1, State1#state.memtable_maxsize) of - {ok, L0Pend, MSN, TableSize} -> - io:format("Prompted push completed~n"), - {reply, ok, State1#state{levelzero_pending=L0Pend, - table_size=TableSize, - manifest_sqn=MSN, - backlog=false}}; - {pause, Reason, Details} -> - io:format("Excess work due to - " ++ Reason, Details), - {reply, pause, State1#state{backlog=true}} - end; - true -> - {reply, ok, State#state{backlog=false}} - end; -handle_call({manifest_change, WI}, _From, State=#state{is_snapshot=Snap}) - when Snap == false -> - {ok, UpdState} = commit_manifest_change(WI, State), - {reply, ok, UpdState}; handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap}) when Snap == false -> {reply, @@ -498,14 +457,6 @@ handle_call({fetch, Key}, _From, State=#state{is_snapshot=Snap}) State#state.manifest, State#state.memtable), State}; -handle_call({check_sqn, Key, SQN}, _From, State=#state{is_snapshot=Snap}) - when Snap == false -> - {reply, - compare_to_sqn(fetch(Key, - State#state.manifest, - State#state.memtable), - SQN), - State}; handle_call({fetch, Key}, _From, State=#state{snapshot_fully_loaded=Ready}) @@ -560,6 +511,11 @@ handle_call(close, _From, State) -> handle_cast({update_snapshotcache, Tree, SQN}, State) -> MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), {noreply, State#state{memtable_copy=MemTableC}}; +handle_cast(manifest_change, State) -> + {ok, WI} = leveled_pclerk:clerk_returnmanifestchange(State#state.clerk, + false), + {ok, UpdState} = commit_manifest_change(WI, State), + {noreply, UpdState}; handle_cast(_Msg, State) -> {noreply, State}. @@ -585,13 +541,20 @@ terminate(_Reason, State) -> %% The cast may not succeed as the clerk could be synchronously calling %% the penciller looking for a manifest commit %% - leveled_pclerk:clerk_stop(State#state.clerk), - Dump = ets:tab2list(State#state.memtable), - case {State#state.levelzero_pending, - get_item(0, State#state.manifest, []), length(Dump)} of + MC = leveled_pclerk:clerk_returnmanifestchange(State#state.clerk, true), + UpdState = case MC of + {ok, WI} -> + {ok, NewState} = commit_manifest_change(WI, State), + NewState; + no_change_required -> + State + end, + Dump = ets:tab2list(UpdState#state.memtable), + case {UpdState#state.levelzero_pending, + get_item(0, UpdState#state.manifest, []), length(Dump)} of {?L0PEND_RESET, [], L} when L > 0 -> - MSN = State#state.manifest_sqn + 1, - FileName = State#state.root_path + MSN = UpdState#state.manifest_sqn + 1, + FileName = UpdState#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", @@ -615,10 +578,10 @@ terminate(_Reason, State) -> ++ " with ~w keys discarded~n", [length(Dump)]) end, - ok = close_files(0, State#state.manifest), + ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, - State#state.unreferenced_files), + UpdState#state.unreferenced_files), ok. @@ -732,6 +695,8 @@ checkready_pushtomem(State) -> end_key=EndKey, owner=Pid, filename=SrcFN}, + % Prompt clerk to ask about work - do this for every L0 roll + ok = leveled_pclerk:clerk_prompt(State#state.clerk), {0, State#state{manifest=lists:keystore(0, 1, @@ -742,9 +707,6 @@ checkready_pushtomem(State) -> ?L0PEND_RESET -> {State#state.table_size, State} end, - - %% Prompt clerk to ask about work - do this for every push_mem - ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller), {TableSize, UpdState}. quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> @@ -1216,6 +1178,15 @@ confirm_delete_test() -> ?assertMatch(R3, false). +maybe_pause_push(R) -> + if + R == pause -> + io:format("Pausing push~n"), + timer:sleep(1000); + true -> + ok + end. + simple_server_test() -> RootPath = "../test/ledger", clean_testdir(RootPath), @@ -1230,27 +1201,17 @@ simple_server_test() -> Key4 = {{o,"Bucket0004", "Key0004"}, {3002, {active, infinity}, null}}, KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})), ok = pcl_pushmem(PCL, [Key1]), - R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), - ?assertMatch(R1, Key1), + ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001"})), ok = pcl_pushmem(PCL, KL1), - R2 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), - ?assertMatch(R2, Key1), - S1 = pcl_pushmem(PCL, [Key2]), - if S1 == pause -> timer:sleep(2); true -> ok end, - R3 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), - R4 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}), - ?assertMatch(R3, Key1), - ?assertMatch(R4, Key2), - S2 = pcl_pushmem(PCL, KL2), - if S2 == pause -> timer:sleep(1000); true -> ok end, - S3 = pcl_pushmem(PCL, [Key3]), - if S3 == pause -> timer:sleep(1000); true -> ok end, - R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), - R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}), - R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}), - ?assertMatch(R5, Key1), - ?assertMatch(R6, Key2), - ?assertMatch(R7, Key3), + ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001"})), + maybe_pause_push(pcl_pushmem(PCL, [Key2])), + ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001"})), + ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002"})), + maybe_pause_push(pcl_pushmem(PCL, KL2)), + maybe_pause_push(pcl_pushmem(PCL, [Key3])), + ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001"})), + ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002"})), + ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003"})), ok = pcl_close(PCL), {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), @@ -1268,27 +1229,20 @@ simple_server_test() -> io:format("Unexpected sequence number on restart ~w~n", [TopSQN]), error end, - ?assertMatch(Check, ok), - R8 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), - R9 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), - R10 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), - ?assertMatch(R8, Key1), - ?assertMatch(R9, Key2), - ?assertMatch(R10, Key3), + ?assertMatch(ok, Check), + ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"})), + ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"})), + ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"})), S4 = pcl_pushmem(PCLr, KL3), if S4 == pause -> timer:sleep(1000); true -> ok end, S5 = pcl_pushmem(PCLr, [Key4]), if S5 == pause -> timer:sleep(1000); true -> ok end, S6 = pcl_pushmem(PCLr, KL4), if S6 == pause -> timer:sleep(1000); true -> ok end, - R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), - R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), - R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), - R14 = pcl_fetch(PCLr, {o,"Bucket0004", "Key0004"}), - ?assertMatch(R11, Key1), - ?assertMatch(R12, Key2), - ?assertMatch(R13, Key3), - ?assertMatch(R14, Key4), + ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"})), + ?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"})), + ?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"})), + ?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004"})), SnapOpts = #penciller_options{start_snapshot = true, source_penciller = PCLr}, {ok, PclSnap} = pcl_start(SnapOpts), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 9496e2f..3f1560a 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -221,6 +221,7 @@ check_bookie_forlist(Bookie, ChkList) -> check_bookie_forlist(Bookie, ChkList, false). check_bookie_forlist(Bookie, ChkList, Log) -> + SW = os:timestamp(), lists:foreach(fun({_RN, Obj, _Spc}) -> if Log == true -> @@ -232,15 +233,20 @@ check_bookie_forlist(Bookie, ChkList, Log) -> Obj#r_object.bucket, Obj#r_object.key), R = {ok, Obj} end, - ChkList). + ChkList), + io:format("Fetch check took ~w microseconds checking list of length ~w~n", + [timer:now_diff(os:timestamp(), SW), length(ChkList)]). check_bookie_formissinglist(Bookie, ChkList) -> + SW = os:timestamp(), lists:foreach(fun({_RN, Obj, _Spc}) -> R = leveled_bookie:book_riakget(Bookie, Obj#r_object.bucket, Obj#r_object.key), R = not_found end, - ChkList). + ChkList), + io:format("Miss check took ~w microseconds checking list of length ~w~n", + [timer:now_diff(os:timestamp(), SW), length(ChkList)]). check_bookie_forobject(Bookie, TestObject) -> {ok, TestObject} = leveled_bookie:book_riakget(Bookie,