From b452fbe27c97f93f42c349d8efe00f0ab7917b94 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 15 Sep 2016 18:38:23 +0100 Subject: [PATCH] End-to-end test Changes to ensure working of first end-to-end test (with a single Key and Value) --- src/leveled_bookie.erl | 26 +++++++++++++------ src/leveled_cdb.erl | 53 ++++++++++++++++++++++++++------------- src/leveled_inker.erl | 17 ++++++++----- src/leveled_penciller.erl | 11 +++++++- src/leveled_sft.erl | 5 ++-- 5 files changed, 77 insertions(+), 35 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 25326f2..e95aa80 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -142,7 +142,8 @@ strip_to_keyonly/1, strip_to_keyseqonly/1, strip_to_seqonly/1, - strip_to_statusonly/1]). + strip_to_statusonly/1, + striphead_to_details/1]). -include_lib("eunit/include/eunit.hrl"). @@ -301,7 +302,11 @@ startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), - ok = leveled_inker:ink_loadpcl(Inker, LedgerSQN, fun load_fun/4, Penciller), + io:format("LedgerSQN=~w at startup~n", [LedgerSQN]), + ok = leveled_inker:ink_loadpcl(Inker, + LedgerSQN + 1, + fun load_fun/5, + Penciller), {Inker, Penciller}. @@ -413,10 +418,11 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> {ok, Cache} end. -load_fun(KeyInLedger, ValueInLedger, _Position, Acc0) -> +load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, Output} = Acc0, {SQN, PK} = KeyInLedger, - {Obj, IndexSpecs} = ValueInLedger, + io:format("Reloading changes with SQN=~w PK=~w~n", [SQN, PK]), + {Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)), case SQN of SQN when SQN < MinSQN -> {loop, Acc0}; @@ -445,7 +451,7 @@ reset_filestructure() -> single_key_test() -> RootPath = reset_filestructure(), - {ok, Bookie} = book_start(#bookie_options{root_path=RootPath}), + {ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}), {B1, K1, V1, Spec1, MD} = {"Bucket1", "Key1", "Value1", @@ -453,10 +459,14 @@ single_key_test() -> {"MDK1", "MDV1"}}, Content = #r_content{metadata=MD, value=V1}, Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, - ok = book_riakput(Bookie, Object, Spec1), - {ok, F1} = book_riakget(Bookie, B1, K1), + ok = book_riakput(Bookie1, Object, Spec1), + {ok, F1} = book_riakget(Bookie1, B1, K1), ?assertMatch(F1, Object), - ok = book_close(Bookie), + ok = book_close(Bookie1), + {ok, Bookie2} = book_start(#bookie_options{root_path=RootPath}), + {ok, F2} = book_riakget(Bookie2, B1, K1), + ?assertMatch(F2, Object), + ok = book_close(Bookie2), reset_filestructure(). -endif. \ No newline at end of file diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index cba127d..24ffc71 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -253,13 +253,17 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> StartPos -> {ok, StartPos} end, - ok = check_last_key(State#state.last_key), - {LastPosition, Acc2} = scan_over_file(State#state.handle, - StartPos0, - FilterFun, - Acc, - State#state.last_key), - {reply, {LastPosition, Acc2}, State}; + case check_last_key(State#state.last_key) of + ok -> + {LastPosition, Acc2} = scan_over_file(State#state.handle, + StartPos0, + FilterFun, + Acc, + State#state.last_key), + {reply, {LastPosition, Acc2}, State}; + empty -> + {reply, {eof, Acc}, State} + end; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=undefined}}; @@ -382,11 +386,11 @@ open_active_file(FileName) when is_list(FileName) -> case file:position(Handle, eof) of {ok, LastPosition} -> ok = file:close(Handle); - {ok, _} -> - LogDetails = [LastPosition, file:position(Handle, eof)], + {ok, EndPosition} -> + LogDetails = [LastPosition, EndPosition], io:format("File to be truncated at last position of ~w " "with end of file at ~w~n", LogDetails), - {ok, LastPosition} = file:position(Handle, LastPosition), + {ok, _LastPosition} = file:position(Handle, LastPosition), ok = file:truncate(Handle), ok = file:close(Handle) end, @@ -653,25 +657,33 @@ startup_scan_over_file(Handle, Position) -> HashTree = array:new(256, {default, gb_trees:empty()}), scan_over_file(Handle, Position, - fun startup_filter/4, + fun startup_filter/5, {HashTree, empty}, - undefined). + empty). %% Scan for key changes - scan over file returning applying FilterFun %% The FilterFun should accept as input: -%% - Key, Value, Position, Accumulator, outputting a new Accumulator -%% and a loop|stop instruction as a tuple i.e. {loop, Acc} or {stop, Acc} +%% - Key, ValueBin, Position, Accumulator, Fun (to extract values from Binary) +%% -> outputting a new Accumulator and a loop|stop instruction as a tuple +%% i.e. {loop, Acc} or {stop, Acc} scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> case saferead_keyvalue(Handle) of false -> + io:format("Failure to read Key/Value at Position ~w" + ++ " in scan~n", [Position]), {Position, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> NewPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE, - case {FilterFun(Key, ValueAsBin, Position, Output), Key} of + case {FilterFun(Key, + ValueAsBin, + Position, + Output, + fun extract_value/1), + Key} of {{stop, UpdOutput}, _} -> - {Position, UpdOutput}; + {NewPosition, UpdOutput}; {{loop, UpdOutput}, LastKey} -> {eof, UpdOutput}; {{loop, UpdOutput}, _} -> @@ -687,7 +699,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> %% cdb file, and returns at the end the hashtree and the final Key seen in the %% journal -startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) -> +startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) -> case crccheck_value(ValueAsBin) of true -> {loop, {put_hashtree(Key, Position, Hashtree), Key}}; @@ -700,7 +712,7 @@ startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) -> check_last_key(LastKey) -> case LastKey of undefined -> error; - empty -> error; + empty -> empty; _ -> ok end. @@ -807,6 +819,11 @@ read_next_term(Handle, Length, crc, Check) -> {unchecked, binary_to_term(Bin)} end. +%% Extract value from binary containing CRC +extract_value(ValueAsBin) -> + <<_CRC:32/integer, Bin/binary>> = ValueAsBin, + binary_to_term(Bin). + %% Used for reading lengths %% Note that the endian_flip is required to make the file format compatible diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index e5302c0..0c0529c 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -276,6 +276,10 @@ code_change(_OldVsn, State, _Extra) -> put_object(PrimaryKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, + %% TODO: The term goes through a double binary_to_term conversion + %% as the CDB will also do the same conversion + %% Perhaps have CDB started up in apure binary mode, when it doesn't + %5 receive terms? Bin1 = term_to_binary({Object, KeyChanges}, [compressed]), ObjSize = byte_size(Bin1), case leveled_cdb:cdb_put(State#state.active_journaldb, @@ -493,8 +497,9 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> ok; -load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, _FN, Pid}|ManTail]) +load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail]) when MinSQN >= LowSQN -> + io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), ok = load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun, @@ -508,11 +513,11 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) -> load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) -> InitAcc = {MinSQN, MaxSQN, []}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of - {eof, Acc} -> - ok = push_to_penciller(Penciller, Acc), + {eof, {_AccMinSQN, _AccMaxSQN, AccKL}} -> + ok = push_to_penciller(Penciller, AccKL), ok; - {LastPosition, Acc} -> - ok = push_to_penciller(Penciller, Acc), + {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} -> + ok = push_to_penciller(Penciller, AccKL), load_between_sequence(MaxSQN + 1, MaxSQN + 1 + ?LOADING_BATCH, FilterFun, @@ -522,7 +527,7 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) -> end. push_to_penciller(Penciller, KeyList) -> - R = leveled_penciler:pcl_pushmem(Penciller, KeyList), + R = leveled_penciller:pcl_pushmem(Penciller, KeyList), if R == pause -> timer:sleep(?LOADING_PAUSE); diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 09babae..777baba 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -161,6 +161,7 @@ terminate/2, code_change/3, pcl_start/1, + pcl_quickstart/1, pcl_pushmem/2, pcl_fetch/2, pcl_workforclerk/1, @@ -205,6 +206,9 @@ %%%============================================================================ %%% API %%%============================================================================ + +pcl_quickstart(RootPath) -> + pcl_start(#penciller_options{root_path=RootPath}). pcl_start(PCLopts) -> gen_server:start(?MODULE, [PCLopts], []). @@ -349,7 +353,10 @@ handle_call({push_mem, DumpList}, _From, State) -> ++ "having sequence numbers between ~w and ~w " ++ "but current sequence number is ~w~n", [MinSQN, MaxSQN, State#state.ledger_sqn]), - {reply, refused, State} + {reply, refused, State}; + empty -> + io:format("Empty request pushed to Penciller~n"), + {reply, ok, State} end, io:format("Push completed in ~w microseconds~n", [timer:now_diff(os:timestamp(),StartWatch)]), @@ -830,6 +837,8 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) -> +assess_sqn([]) -> + empty; assess_sqn(DumpList) -> assess_sqn(DumpList, infinity, 0). diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index d4b81fb..793edc5 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -1069,8 +1069,9 @@ key_dominates_expanded([], [H2|T2], Level) -> {{next_key, H2}, [], maybe_expand_pointer(T2)} end; key_dominates_expanded([H1|T1], [H2|T2], Level) -> - {K1, Sq1, St1} = leveled_bookie:strip_to_details(H1), - {K2, Sq2, St2} = leveled_bookie:strip_to_details(H2), + {{K1, V1}, {K2, V2}} = {H1, H2}, + {Sq1, St1, _MD1} = leveled_bookie:striphead_to_details(V1), + {Sq2, St2, _MD2} = leveled_bookie:striphead_to_details(V2), case K1 of K2 -> case Sq1 > Sq2 of