End-to-end test
Changes to ensure working of first end-to-end test (with a single Key and Value)
This commit is contained in:
parent
5127119669
commit
b452fbe27c
5 changed files with 77 additions and 35 deletions
|
@ -142,7 +142,8 @@
|
||||||
strip_to_keyonly/1,
|
strip_to_keyonly/1,
|
||||||
strip_to_keyseqonly/1,
|
strip_to_keyseqonly/1,
|
||||||
strip_to_seqonly/1,
|
strip_to_seqonly/1,
|
||||||
strip_to_statusonly/1]).
|
strip_to_statusonly/1,
|
||||||
|
striphead_to_details/1]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -301,7 +302,11 @@ startup(InkerOpts, PencillerOpts) ->
|
||||||
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
|
||||||
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
|
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
|
||||||
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
|
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
|
||||||
ok = leveled_inker:ink_loadpcl(Inker, LedgerSQN, fun load_fun/4, Penciller),
|
io:format("LedgerSQN=~w at startup~n", [LedgerSQN]),
|
||||||
|
ok = leveled_inker:ink_loadpcl(Inker,
|
||||||
|
LedgerSQN + 1,
|
||||||
|
fun load_fun/5,
|
||||||
|
Penciller),
|
||||||
{Inker, Penciller}.
|
{Inker, Penciller}.
|
||||||
|
|
||||||
|
|
||||||
|
@ -413,10 +418,11 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
|
||||||
{ok, Cache}
|
{ok, Cache}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0) ->
|
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
|
||||||
{MinSQN, MaxSQN, Output} = Acc0,
|
{MinSQN, MaxSQN, Output} = Acc0,
|
||||||
{SQN, PK} = KeyInLedger,
|
{SQN, PK} = KeyInLedger,
|
||||||
{Obj, IndexSpecs} = ValueInLedger,
|
io:format("Reloading changes with SQN=~w PK=~w~n", [SQN, PK]),
|
||||||
|
{Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)),
|
||||||
case SQN of
|
case SQN of
|
||||||
SQN when SQN < MinSQN ->
|
SQN when SQN < MinSQN ->
|
||||||
{loop, Acc0};
|
{loop, Acc0};
|
||||||
|
@ -445,7 +451,7 @@ reset_filestructure() ->
|
||||||
|
|
||||||
single_key_test() ->
|
single_key_test() ->
|
||||||
RootPath = reset_filestructure(),
|
RootPath = reset_filestructure(),
|
||||||
{ok, Bookie} = book_start(#bookie_options{root_path=RootPath}),
|
{ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}),
|
||||||
{B1, K1, V1, Spec1, MD} = {"Bucket1",
|
{B1, K1, V1, Spec1, MD} = {"Bucket1",
|
||||||
"Key1",
|
"Key1",
|
||||||
"Value1",
|
"Value1",
|
||||||
|
@ -453,10 +459,14 @@ single_key_test() ->
|
||||||
{"MDK1", "MDV1"}},
|
{"MDK1", "MDV1"}},
|
||||||
Content = #r_content{metadata=MD, value=V1},
|
Content = #r_content{metadata=MD, value=V1},
|
||||||
Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
|
Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
|
||||||
ok = book_riakput(Bookie, Object, Spec1),
|
ok = book_riakput(Bookie1, Object, Spec1),
|
||||||
{ok, F1} = book_riakget(Bookie, B1, K1),
|
{ok, F1} = book_riakget(Bookie1, B1, K1),
|
||||||
?assertMatch(F1, Object),
|
?assertMatch(F1, Object),
|
||||||
ok = book_close(Bookie),
|
ok = book_close(Bookie1),
|
||||||
|
{ok, Bookie2} = book_start(#bookie_options{root_path=RootPath}),
|
||||||
|
{ok, F2} = book_riakget(Bookie2, B1, K1),
|
||||||
|
?assertMatch(F2, Object),
|
||||||
|
ok = book_close(Bookie2),
|
||||||
reset_filestructure().
|
reset_filestructure().
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -253,13 +253,17 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
|
||||||
StartPos ->
|
StartPos ->
|
||||||
{ok, StartPos}
|
{ok, StartPos}
|
||||||
end,
|
end,
|
||||||
ok = check_last_key(State#state.last_key),
|
case check_last_key(State#state.last_key) of
|
||||||
{LastPosition, Acc2} = scan_over_file(State#state.handle,
|
ok ->
|
||||||
StartPos0,
|
{LastPosition, Acc2} = scan_over_file(State#state.handle,
|
||||||
FilterFun,
|
StartPos0,
|
||||||
Acc,
|
FilterFun,
|
||||||
State#state.last_key),
|
Acc,
|
||||||
{reply, {LastPosition, Acc2}, State};
|
State#state.last_key),
|
||||||
|
{reply, {LastPosition, Acc2}, State};
|
||||||
|
empty ->
|
||||||
|
{reply, {eof, Acc}, State}
|
||||||
|
end;
|
||||||
handle_call(cdb_close, _From, State) ->
|
handle_call(cdb_close, _From, State) ->
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
{stop, normal, ok, State#state{handle=undefined}};
|
{stop, normal, ok, State#state{handle=undefined}};
|
||||||
|
@ -382,11 +386,11 @@ open_active_file(FileName) when is_list(FileName) ->
|
||||||
case file:position(Handle, eof) of
|
case file:position(Handle, eof) of
|
||||||
{ok, LastPosition} ->
|
{ok, LastPosition} ->
|
||||||
ok = file:close(Handle);
|
ok = file:close(Handle);
|
||||||
{ok, _} ->
|
{ok, EndPosition} ->
|
||||||
LogDetails = [LastPosition, file:position(Handle, eof)],
|
LogDetails = [LastPosition, EndPosition],
|
||||||
io:format("File to be truncated at last position of ~w "
|
io:format("File to be truncated at last position of ~w "
|
||||||
"with end of file at ~w~n", LogDetails),
|
"with end of file at ~w~n", LogDetails),
|
||||||
{ok, LastPosition} = file:position(Handle, LastPosition),
|
{ok, _LastPosition} = file:position(Handle, LastPosition),
|
||||||
ok = file:truncate(Handle),
|
ok = file:truncate(Handle),
|
||||||
ok = file:close(Handle)
|
ok = file:close(Handle)
|
||||||
end,
|
end,
|
||||||
|
@ -653,25 +657,33 @@ startup_scan_over_file(Handle, Position) ->
|
||||||
HashTree = array:new(256, {default, gb_trees:empty()}),
|
HashTree = array:new(256, {default, gb_trees:empty()}),
|
||||||
scan_over_file(Handle,
|
scan_over_file(Handle,
|
||||||
Position,
|
Position,
|
||||||
fun startup_filter/4,
|
fun startup_filter/5,
|
||||||
{HashTree, empty},
|
{HashTree, empty},
|
||||||
undefined).
|
empty).
|
||||||
|
|
||||||
%% Scan for key changes - scan over file returning applying FilterFun
|
%% Scan for key changes - scan over file returning applying FilterFun
|
||||||
%% The FilterFun should accept as input:
|
%% The FilterFun should accept as input:
|
||||||
%% - Key, Value, Position, Accumulator, outputting a new Accumulator
|
%% - Key, ValueBin, Position, Accumulator, Fun (to extract values from Binary)
|
||||||
%% and a loop|stop instruction as a tuple i.e. {loop, Acc} or {stop, Acc}
|
%% -> outputting a new Accumulator and a loop|stop instruction as a tuple
|
||||||
|
%% i.e. {loop, Acc} or {stop, Acc}
|
||||||
|
|
||||||
scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
||||||
case saferead_keyvalue(Handle) of
|
case saferead_keyvalue(Handle) of
|
||||||
false ->
|
false ->
|
||||||
|
io:format("Failure to read Key/Value at Position ~w"
|
||||||
|
++ " in scan~n", [Position]),
|
||||||
{Position, Output};
|
{Position, Output};
|
||||||
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
||||||
NewPosition = Position + KeyLength + ValueLength
|
NewPosition = Position + KeyLength + ValueLength
|
||||||
+ ?DWORD_SIZE,
|
+ ?DWORD_SIZE,
|
||||||
case {FilterFun(Key, ValueAsBin, Position, Output), Key} of
|
case {FilterFun(Key,
|
||||||
|
ValueAsBin,
|
||||||
|
Position,
|
||||||
|
Output,
|
||||||
|
fun extract_value/1),
|
||||||
|
Key} of
|
||||||
{{stop, UpdOutput}, _} ->
|
{{stop, UpdOutput}, _} ->
|
||||||
{Position, UpdOutput};
|
{NewPosition, UpdOutput};
|
||||||
{{loop, UpdOutput}, LastKey} ->
|
{{loop, UpdOutput}, LastKey} ->
|
||||||
{eof, UpdOutput};
|
{eof, UpdOutput};
|
||||||
{{loop, UpdOutput}, _} ->
|
{{loop, UpdOutput}, _} ->
|
||||||
|
@ -687,7 +699,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
||||||
%% cdb file, and returns at the end the hashtree and the final Key seen in the
|
%% cdb file, and returns at the end the hashtree and the final Key seen in the
|
||||||
%% journal
|
%% journal
|
||||||
|
|
||||||
startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) ->
|
startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) ->
|
||||||
case crccheck_value(ValueAsBin) of
|
case crccheck_value(ValueAsBin) of
|
||||||
true ->
|
true ->
|
||||||
{loop, {put_hashtree(Key, Position, Hashtree), Key}};
|
{loop, {put_hashtree(Key, Position, Hashtree), Key}};
|
||||||
|
@ -700,7 +712,7 @@ startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) ->
|
||||||
check_last_key(LastKey) ->
|
check_last_key(LastKey) ->
|
||||||
case LastKey of
|
case LastKey of
|
||||||
undefined -> error;
|
undefined -> error;
|
||||||
empty -> error;
|
empty -> empty;
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -807,6 +819,11 @@ read_next_term(Handle, Length, crc, Check) ->
|
||||||
{unchecked, binary_to_term(Bin)}
|
{unchecked, binary_to_term(Bin)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Extract value from binary containing CRC
|
||||||
|
extract_value(ValueAsBin) ->
|
||||||
|
<<_CRC:32/integer, Bin/binary>> = ValueAsBin,
|
||||||
|
binary_to_term(Bin).
|
||||||
|
|
||||||
|
|
||||||
%% Used for reading lengths
|
%% Used for reading lengths
|
||||||
%% Note that the endian_flip is required to make the file format compatible
|
%% Note that the endian_flip is required to make the file format compatible
|
||||||
|
|
|
@ -276,6 +276,10 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
||||||
NewSQN = State#state.journal_sqn + 1,
|
NewSQN = State#state.journal_sqn + 1,
|
||||||
|
%% TODO: The term goes through a double binary_to_term conversion
|
||||||
|
%% as the CDB will also do the same conversion
|
||||||
|
%% Perhaps have CDB started up in apure binary mode, when it doesn't
|
||||||
|
%5 receive terms?
|
||||||
Bin1 = term_to_binary({Object, KeyChanges}, [compressed]),
|
Bin1 = term_to_binary({Object, KeyChanges}, [compressed]),
|
||||||
ObjSize = byte_size(Bin1),
|
ObjSize = byte_size(Bin1),
|
||||||
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
||||||
|
@ -493,8 +497,9 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
|
||||||
|
|
||||||
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
|
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
|
||||||
ok;
|
ok;
|
||||||
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, _FN, Pid}|ManTail])
|
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
|
||||||
when MinSQN >= LowSQN ->
|
when MinSQN >= LowSQN ->
|
||||||
|
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
|
||||||
ok = load_between_sequence(MinSQN,
|
ok = load_between_sequence(MinSQN,
|
||||||
MinSQN + ?LOADING_BATCH,
|
MinSQN + ?LOADING_BATCH,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
|
@ -508,11 +513,11 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) ->
|
||||||
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
|
load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
|
||||||
InitAcc = {MinSQN, MaxSQN, []},
|
InitAcc = {MinSQN, MaxSQN, []},
|
||||||
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
|
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
|
||||||
{eof, Acc} ->
|
{eof, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
|
||||||
ok = push_to_penciller(Penciller, Acc),
|
ok = push_to_penciller(Penciller, AccKL),
|
||||||
ok;
|
ok;
|
||||||
{LastPosition, Acc} ->
|
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
|
||||||
ok = push_to_penciller(Penciller, Acc),
|
ok = push_to_penciller(Penciller, AccKL),
|
||||||
load_between_sequence(MaxSQN + 1,
|
load_between_sequence(MaxSQN + 1,
|
||||||
MaxSQN + 1 + ?LOADING_BATCH,
|
MaxSQN + 1 + ?LOADING_BATCH,
|
||||||
FilterFun,
|
FilterFun,
|
||||||
|
@ -522,7 +527,7 @@ load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
push_to_penciller(Penciller, KeyList) ->
|
push_to_penciller(Penciller, KeyList) ->
|
||||||
R = leveled_penciler:pcl_pushmem(Penciller, KeyList),
|
R = leveled_penciller:pcl_pushmem(Penciller, KeyList),
|
||||||
if
|
if
|
||||||
R == pause ->
|
R == pause ->
|
||||||
timer:sleep(?LOADING_PAUSE);
|
timer:sleep(?LOADING_PAUSE);
|
||||||
|
|
|
@ -161,6 +161,7 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3,
|
code_change/3,
|
||||||
pcl_start/1,
|
pcl_start/1,
|
||||||
|
pcl_quickstart/1,
|
||||||
pcl_pushmem/2,
|
pcl_pushmem/2,
|
||||||
pcl_fetch/2,
|
pcl_fetch/2,
|
||||||
pcl_workforclerk/1,
|
pcl_workforclerk/1,
|
||||||
|
@ -205,6 +206,9 @@
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
|
pcl_quickstart(RootPath) ->
|
||||||
|
pcl_start(#penciller_options{root_path=RootPath}).
|
||||||
|
|
||||||
pcl_start(PCLopts) ->
|
pcl_start(PCLopts) ->
|
||||||
gen_server:start(?MODULE, [PCLopts], []).
|
gen_server:start(?MODULE, [PCLopts], []).
|
||||||
|
@ -349,7 +353,10 @@ handle_call({push_mem, DumpList}, _From, State) ->
|
||||||
++ "having sequence numbers between ~w and ~w "
|
++ "having sequence numbers between ~w and ~w "
|
||||||
++ "but current sequence number is ~w~n",
|
++ "but current sequence number is ~w~n",
|
||||||
[MinSQN, MaxSQN, State#state.ledger_sqn]),
|
[MinSQN, MaxSQN, State#state.ledger_sqn]),
|
||||||
{reply, refused, State}
|
{reply, refused, State};
|
||||||
|
empty ->
|
||||||
|
io:format("Empty request pushed to Penciller~n"),
|
||||||
|
{reply, ok, State}
|
||||||
end,
|
end,
|
||||||
io:format("Push completed in ~w microseconds~n",
|
io:format("Push completed in ~w microseconds~n",
|
||||||
[timer:now_diff(os:timestamp(),StartWatch)]),
|
[timer:now_diff(os:timestamp(),StartWatch)]),
|
||||||
|
@ -830,6 +837,8 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredIterators) ->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
assess_sqn([]) ->
|
||||||
|
empty;
|
||||||
assess_sqn(DumpList) ->
|
assess_sqn(DumpList) ->
|
||||||
assess_sqn(DumpList, infinity, 0).
|
assess_sqn(DumpList, infinity, 0).
|
||||||
|
|
||||||
|
|
|
@ -1069,8 +1069,9 @@ key_dominates_expanded([], [H2|T2], Level) ->
|
||||||
{{next_key, H2}, [], maybe_expand_pointer(T2)}
|
{{next_key, H2}, [], maybe_expand_pointer(T2)}
|
||||||
end;
|
end;
|
||||||
key_dominates_expanded([H1|T1], [H2|T2], Level) ->
|
key_dominates_expanded([H1|T1], [H2|T2], Level) ->
|
||||||
{K1, Sq1, St1} = leveled_bookie:strip_to_details(H1),
|
{{K1, V1}, {K2, V2}} = {H1, H2},
|
||||||
{K2, Sq2, St2} = leveled_bookie:strip_to_details(H2),
|
{Sq1, St1, _MD1} = leveled_bookie:striphead_to_details(V1),
|
||||||
|
{Sq2, St2, _MD2} = leveled_bookie:striphead_to_details(V2),
|
||||||
case K1 of
|
case K1 of
|
||||||
K2 ->
|
K2 ->
|
||||||
case Sq1 > Sq2 of
|
case Sq1 > Sq2 of
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue