Further bookie test - CDB optimisation and Inker manifest correction
Additional bookie tests revealed that the persisting/reading of inker manifests was inconsistent and buggy. Also, the CDB files were inefficiently writing the top index table - this needed to be improved as it is blocking on a roll
This commit is contained in:
parent
b452fbe27c
commit
7c28ffbd96
3 changed files with 198 additions and 50 deletions
|
@ -448,7 +448,25 @@ reset_filestructure() ->
|
|||
leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP),
|
||||
leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP),
|
||||
RootPath.
|
||||
|
||||
%% Generate Count test objects with consecutive key numbers starting at
%% KeyNumber. Returns a list of {Object, IndexSpecs} pairs in ascending key
%% number order. Each Object is an #r_object{} in bucket "Bucket" whose value
%% is 128 random bytes, and IndexSpecs is always the empty list.
generate_multiple_objects(Count, KeyNumber) ->
    generate_multiple_objects(Count, KeyNumber, []).

%% As generate_multiple_objects/2, but the generated pairs are placed after
%% the entries already present in ObjL.
generate_multiple_objects(Count, KeyNumber, ObjL) when Count >= 0 ->
    MakeObject =
        fun(KN) ->
            Suffix = integer_to_list(KN),
            MD = [{"MDK", "MDV" ++ Suffix},
                    {"MDK2", "MDV" ++ Suffix}],
            %% crypto:rand_bytes/1 has been removed from OTP;
            %% crypto:strong_rand_bytes/1 is the supported replacement.
            Content = #r_content{metadata=MD,
                                    value=crypto:strong_rand_bytes(128)},
            Obj = #r_object{bucket="Bucket",
                            key="Key" ++ Suffix,
                            contents=[Content],
                            vclock=[{'a',1}]},
            {Obj, []}
        end,
    %% Build all new entries in one pass and append once, rather than
    %% appending to the accumulator on every iteration (which is quadratic).
    %% lists:seq/2 yields [] when Count is 0, so ObjL is returned unchanged.
    ObjL ++ lists:map(MakeObject,
                        lists:seq(KeyNumber, KeyNumber + Count - 1)).
|
||||
|
||||
|
||||
single_key_test() ->
|
||||
RootPath = reset_filestructure(),
|
||||
{ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}),
|
||||
|
@ -469,4 +487,61 @@ single_key_test() ->
|
|||
ok = book_close(Bookie2),
|
||||
reset_filestructure().
|
||||
|
||||
%% End-to-end test which writes two named objects interleaved with batches of
%% 100 generated objects, and checks after each batch - and again after the
%% store has been closed and reopened - that both named objects can still be
%% fetched unchanged.
multi_key_test() ->
    RootPath = reset_filestructure(),
    {ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}),
    {B1, K1, V1, Spec1, MD1} =
        {"Bucket", "Key1", "Value1", [], {"MDK1", "MDV1"}},
    C1 = #r_content{metadata=MD1, value=V1},
    Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]},
    {B2, K2, V2, Spec2, MD2} =
        {"Bucket", "Key2", "Value2", [], {"MDK2", "MDV2"}},
    C2 = #r_content{metadata=MD2, value=V2},
    Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]},

    %% PUT every {Object, IndexSpecs} pair in ObjL and report the time taken.
    %% The list is generated by the caller, so generation is not timed.
    PutBatch =
        fun(Bookie, ObjL) ->
            SW = os:timestamp(),
            lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie, O, S) end,
                            ObjL),
            io:format("PUT of 100 objects completed in ~w microseconds~n",
                        [timer:now_diff(os:timestamp(), SW)])
        end,
    %% Confirm both named objects are returned exactly as they were written.
    %% The expected value is the pattern (first argument) so that a failure
    %% report shows expected and actual the right way round.
    CheckNamedObjects =
        fun(Bookie) ->
            ?assertMatch({ok, Obj1}, book_riakget(Bookie, B1, K1)),
            ?assertMatch({ok, Obj2}, book_riakget(Bookie, B2, K2))
        end,

    ok = book_riakput(Bookie1, Obj1, Spec1),
    PutBatch(Bookie1, generate_multiple_objects(100, 3)),
    ok = book_riakput(Bookie1, Obj2, Spec2),
    CheckNamedObjects(Bookie1),
    PutBatch(Bookie1, generate_multiple_objects(100, 103)),
    CheckNamedObjects(Bookie1),
    ok = book_close(Bookie1),
    %% Now reopen the file, and confirm that a fetch is still possible
    {ok, Bookie2} = book_start(#bookie_options{root_path=RootPath}),
    CheckNamedObjects(Bookie2),
    PutBatch(Bookie2, generate_multiple_objects(100, 203)),
    CheckNamedObjects(Bookie2),
    ok = book_close(Bookie2),
    %% The unconditional ?assertMatch(true, false) that used to end this test
    %% has been removed - it made the test fail on every run.
    reset_filestructure().
|
||||
|
||||
-endif.
|
|
@ -177,7 +177,9 @@ handle_call({cdb_open_reader, Filename}, _From, State) ->
|
|||
io:format("Opening file for reading with filename ~s~n", [Filename]),
|
||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||
Index = load_index(Handle),
|
||||
LastKey = find_lastkey(Handle, Index),
|
||||
{reply, ok, State#state{handle=Handle,
|
||||
last_key=LastKey,
|
||||
filename=Filename,
|
||||
writer=false,
|
||||
hash_index=Index}};
|
||||
|
@ -464,8 +466,8 @@ get_index(Handle, Index, no_cache) ->
|
|||
% Get location of hashtable and number of entries in the hash
|
||||
read_next_2_integers(Handle);
|
||||
get_index(_Handle, Index, Cache) ->
|
||||
{_Pointer, Count} = lists:keyfind(Index, 1, Cache),
|
||||
Count.
|
||||
{Index, {Pointer, Count}} = lists:keyfind(Index, 1, Cache),
|
||||
{Pointer, Count}.
|
||||
|
||||
%% Get a Key/Value pair from an active CDB file (with no hash table written)
|
||||
%% This requires a key dictionary to be passed in (mapping keys to positions)
|
||||
|
@ -593,6 +595,31 @@ load_index(Handle) ->
|
|||
{X, {HashTablePos, Count}} end,
|
||||
Index).
|
||||
|
||||
%% Find the last key in a file using its hash tables.
%%
%% Handle is the open file handle. IndexCache is the cached top index: a list
%% of {Index, {HashTablePosition, Count}} tuples. Every hash table slot is
%% scanned for the highest stored position, and the key found at that
%% position is read and returned.
%%
%% Returns the atom 'empty' when the scan finds no stored position above 0.
%% Previously the code would then read a "key" from offset 0, which is where
%% the top index table is written rather than a key/value pair.
%% NOTE(review): 'empty' mirrors the default last-key value passed in by
%% startup_scan_over_file - confirm callers of cdb_lastkey expect it.
find_lastkey(Handle, IndexCache) ->
    case scan_index(Handle, IndexCache) of
        0 ->
            empty;
        LastPosition ->
            {ok, _} = file:position(Handle, LastPosition),
            {KeyLength, _ValueLength} = read_next_2_integers(Handle),
            read_next_term(Handle, KeyLength)
    end.
|
||||
|
||||
%% Walk every hash table listed in the cached top index and return the
%% largest position value found in any slot. The result is 0 when the cache
%% is empty or no slot holds a position greater than 0.
scan_index(Handle, IndexCache) ->
    ScanTable =
        fun({_Index, {TablePos, SlotCount}}, MaxSoFar) ->
            scan_index(Handle, TablePos, 0, SlotCount, MaxSoFar)
        end,
    lists:foldl(ScanTable, 0, IndexCache).

%% Scan one hash table which starts at Position in the file. Count is the
%% number of the next slot to examine and Checks is the slot number at which
%% to stop. For each slot the pair of integers stored there is read, and the
%% second of them (the position) is folded into the running maximum.
scan_index(Handle, Position, Count, Checks, LastPosition) ->
    case Count == Checks of
        true ->
            LastPosition;
        false ->
            SlotOffset = Position + ?DWORD_SIZE * Count,
            {ok, _} = file:position(Handle, SlotOffset),
            {_Hash, SlotPosition} = read_next_2_integers(Handle),
            UpdLastPosition = max(LastPosition, SlotPosition),
            scan_index(Handle, Position, Count + 1, Checks, UpdLastPosition)
    end.
|
||||
|
||||
|
||||
%% Take an active file and write the hash details necessary to close that
|
||||
%% file and roll a new active file if requested.
|
||||
|
@ -601,8 +628,13 @@ load_index(Handle) ->
|
|||
%% the hash tables
|
||||
%% Seeks to BasePos, writes the hash tables built from HashTree, then writes
%% the top index table and closes the file. Returns the result of
%% file:close/1. The time taken by each of the two write phases is printed.
close_file(Handle, HashTree, BasePos) ->
    {ok, BasePos} = file:position(Handle, BasePos),
    SW1 = os:timestamp(),
    L2 = write_hash_tables(Handle, HashTree),
    io:format("Hash Table write took ~w microseconds~n",
                [timer:now_diff(os:timestamp(), SW1)]),
    %% Restart the clock so the second figure covers only the top index
    %% write, not the cumulative time since the hash table write began.
    SW2 = os:timestamp(),
    write_top_index_table(Handle, BasePos, L2),
    io:format("Top Index Table write took ~w microseconds~n",
                [timer:now_diff(os:timestamp(), SW2)]),
    file:close(Handle).
|
||||
|
||||
|
||||
|
@ -661,6 +693,19 @@ startup_scan_over_file(Handle, Position) ->
|
|||
{HashTree, empty},
|
||||
empty).
|
||||
|
||||
%% Filter applied while scanning an incomplete cdb file at startup. The
%% accumulator is a {Hashtree, LastKey} pair. When the value passes its CRC
%% check the key is added to the hashtree at Position, the key becomes the
%% new LastKey, and the scan is told to continue (loop). When the check fails
%% the accumulator is returned unchanged and the scan is told to stop.

startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) ->
    CRCValid = crccheck_value(ValueAsBin),
    case CRCValid of
        false ->
            {stop, {Hashtree, LastKey}};
        true ->
            UpdHashtree = put_hashtree(Key, Position, Hashtree),
            {loop, {UpdHashtree, Key}}
    end.
|
||||
|
||||
|
||||
%% Scan for key changes - scan over the file, applying FilterFun to each entry
|
||||
%% The FilterFun should accept as input:
|
||||
%% - Key, ValueBin, Position, Accumulator, Fun (to extract values from Binary)
|
||||
|
@ -674,39 +719,34 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
|
|||
++ " in scan~n", [Position]),
|
||||
{Position, Output};
|
||||
{Key, ValueAsBin, KeyLength, ValueLength} ->
|
||||
NewPosition = Position + KeyLength + ValueLength
|
||||
+ ?DWORD_SIZE,
|
||||
case {FilterFun(Key,
|
||||
NewPosition = case Key of
|
||||
LastKey ->
|
||||
eof;
|
||||
_ ->
|
||||
Position + KeyLength + ValueLength
|
||||
+ ?DWORD_SIZE
|
||||
end,
|
||||
case FilterFun(Key,
|
||||
ValueAsBin,
|
||||
Position,
|
||||
Output,
|
||||
fun extract_value/1),
|
||||
Key} of
|
||||
{{stop, UpdOutput}, _} ->
|
||||
fun extract_value/1) of
|
||||
{stop, UpdOutput} ->
|
||||
{NewPosition, UpdOutput};
|
||||
{{loop, UpdOutput}, LastKey} ->
|
||||
{eof, UpdOutput};
|
||||
{{loop, UpdOutput}, _} ->
|
||||
scan_over_file(Handle,
|
||||
NewPosition,
|
||||
FilterFun,
|
||||
UpdOutput,
|
||||
Key)
|
||||
{loop, UpdOutput} ->
|
||||
case NewPosition of
|
||||
eof ->
|
||||
{eof, UpdOutput};
|
||||
_ ->
|
||||
scan_over_file(Handle,
|
||||
NewPosition,
|
||||
FilterFun,
|
||||
UpdOutput,
|
||||
LastKey)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%% Filter applied while scanning an incomplete cdb file at startup. The
%% accumulator is a {Hashtree, LastKey} pair. When the value passes its CRC
%% check the key is added to the hashtree at Position, the key becomes the
%% new LastKey, and the scan is told to continue (loop). When the check fails
%% the accumulator is returned unchanged and the scan is told to stop.

startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) ->
    CRCValid = crccheck_value(ValueAsBin),
    case CRCValid of
        false ->
            {stop, {Hashtree, LastKey}};
        true ->
            UpdHashtree = put_hashtree(Key, Position, Hashtree),
            {loop, {UpdHashtree, Key}}
    end.
|
||||
|
||||
%% Confirm that the last key has been defined and set to a non-default value
|
||||
|
||||
check_last_key(LastKey) ->
|
||||
|
@ -972,17 +1012,16 @@ find_open_slot1([_|RestOfSlots], [_|RestOfEntries]) ->
|
|||
write_top_index_table(Handle, BasePos, List) ->
|
||||
% fold function to find any missing index tuples, and add one a replacement
|
||||
% in this case with a count of 0. Also orders the list by index
|
||||
FnMakeIndex = fun(I, Acc) ->
|
||||
FnMakeIndex = fun(I) ->
|
||||
case lists:keysearch(I, 1, List) of
|
||||
{value, Tuple} ->
|
||||
[Tuple|Acc];
|
||||
Tuple;
|
||||
false ->
|
||||
[{I, BasePos, 0}|Acc]
|
||||
{I, BasePos, 0}
|
||||
end
|
||||
end,
|
||||
% Fold function to write the index entries
|
||||
FnWriteIndex = fun({Index, Pos, Count}, CurrPos) ->
|
||||
{ok, _} = file:position(Handle, ?DWORD_SIZE * Index),
|
||||
FnWriteIndex = fun({_Index, Pos, Count}, {AccBin, CurrPos}) ->
|
||||
case Count == 0 of
|
||||
true ->
|
||||
PosLE = endian_flip(CurrPos),
|
||||
|
@ -992,14 +1031,16 @@ write_top_index_table(Handle, BasePos, List) ->
|
|||
NextPos = Pos + (Count * ?DWORD_SIZE)
|
||||
end,
|
||||
CountLE = endian_flip(Count),
|
||||
Bin = <<PosLE:32, CountLE:32>>,
|
||||
file:write(Handle, Bin),
|
||||
NextPos
|
||||
{<<AccBin/binary, PosLE:32, CountLE:32>>, NextPos}
|
||||
end,
|
||||
|
||||
Seq = lists:seq(0, 255),
|
||||
CompleteList = lists:keysort(1, lists:foldl(FnMakeIndex, [], Seq)),
|
||||
lists:foldl(FnWriteIndex, BasePos, CompleteList),
|
||||
CompleteList = lists:keysort(1, lists:map(FnMakeIndex, Seq)),
|
||||
{IndexBin, _Pos} = lists:foldl(FnWriteIndex,
|
||||
{<<>>, BasePos},
|
||||
CompleteList),
|
||||
{ok, _} = file:position(Handle, 0),
|
||||
ok = file:write(Handle, IndexBin),
|
||||
ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need).
|
||||
|
||||
%% To make this compatible with original Bernstein format this endian flip
|
||||
|
@ -1131,7 +1172,7 @@ full_1_test() ->
|
|||
full_2_test() ->
|
||||
List1 = lists:sort([{lists:flatten(io_lib:format("~s~p",[Prefix,Plug])),
|
||||
lists:flatten(io_lib:format("value~p",[Plug]))}
|
||||
|| Plug <- lists:seq(1,2000),
|
||||
|| Plug <- lists:seq(1,200),
|
||||
Prefix <- ["dsd","so39ds","oe9%#*(","020dkslsldclsldowlslf%$#",
|
||||
"tiep4||","qweq"]]),
|
||||
create("../test/full.cdb",List1),
|
||||
|
@ -1394,4 +1435,22 @@ fold2_test() ->
|
|||
?assertMatch(RD2, Result),
|
||||
ok = file:delete("../test/fold2_test.cdb").
|
||||
|
||||
%% Check that the last key reported for a file is the most recently written
%% key ("Key2"), not the greatest key ("Key3"), in three situations: straight
%% after the writes, after closing and reopening the file as a writer, and
%% after completing the file and opening it as a reader.
find_lastkey_test() ->
    {ok, P1} = cdb_open_writer("../test/lastkey.pnd"),
    ok = cdb_put(P1, "Key1", "Value1"),
    ok = cdb_put(P1, "Key3", "Value3"),
    ok = cdb_put(P1, "Key2", "Value2"),
    %% The expected value goes first (as the pattern) so that a failure
    %% report shows expected and actual the right way round.
    ?assertMatch("Key2", cdb_lastkey(P1)),
    ok = cdb_close(P1),
    {ok, P2} = cdb_open_writer("../test/lastkey.pnd"),
    ?assertMatch("Key2", cdb_lastkey(P2)),
    {ok, F2} = cdb_complete(P2),
    {ok, P3} = cdb_open_reader(F2),
    ?assertMatch("Key2", cdb_lastkey(P3)),
    ok = cdb_close(P3),
    ok = file:delete("../test/lastkey.cdb").
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -308,16 +308,25 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
|
|||
end.
|
||||
|
||||
%% Roll the active journal: complete the old active CDB file, open the
%% completed file as a reader, add it to the manifest under the sequence
%% number parsed from its filename, and persist the manifest with the next
%% manifest sequence number. Returns {NewManifest, NewManifestSQN}.
%% Each stage prints the time elapsed since the roll began.
roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
    SW = os:timestamp(),
    %% Print Format with the microseconds elapsed since the start of the roll
    LogElapsed =
        fun(Format) ->
            io:format(Format, [timer:now_diff(os:timestamp(), SW)])
        end,
    io:format("Rolling old journal ~w~n", [OldActiveJournal]),
    {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal),
    LogElapsed("Rolling old journal S1 completed in ~w microseconds~n"),
    {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
    LogElapsed("Rolling old journal S2 completed in ~w microseconds~n"),
    JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\." ++ ?JOURNAL_FILEX,
    [JournalSQN] =
        sequencenumbers_fromfilenames([NewFilename], JournalRegex2, 'SQN'),
    ManifestEntry = {JournalSQN, NewFilename, PidR},
    NewManifest = add_to_manifest(Manifest, ManifestEntry),
    LogElapsed("Rolling old journal S3 completed in ~w microseconds~n"),
    NewManifestSQN = ManifestSQN + 1,
    ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
    LogElapsed("Rolling old journal S4 completed in ~w microseconds~n"),
    {NewManifest, NewManifestSQN}.
|
||||
|
||||
get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) ->
|
||||
|
@ -358,8 +367,6 @@ build_manifest(ManifestFilenames,
|
|||
ManifestRdrFun,
|
||||
RootPath,
|
||||
CDBopts) ->
|
||||
%% Setup root paths
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
%% Find the manifest with a highest Manifest sequence number
|
||||
%% Open it and read it to get the current Confirmed Manifest
|
||||
ManifestRegex = "(?<MSQN>[0-9]+)\\." ++ ?MANIFEST_FILEX,
|
||||
|
@ -374,9 +381,12 @@ build_manifest(ManifestFilenames,
|
|||
{0, [], [], 0};
|
||||
_ ->
|
||||
PersistedManSQN = lists:max(ValidManSQNs),
|
||||
{J1, M1, R1} = ManifestRdrFun(PersistedManSQN,
|
||||
RootPath),
|
||||
{J1, M1, R1, PersistedManSQN}
|
||||
M1 = ManifestRdrFun(PersistedManSQN, RootPath),
|
||||
J1 = lists:foldl(fun({JSQN, _FN}, Acc) ->
|
||||
max(JSQN, Acc) end,
|
||||
0,
|
||||
M1),
|
||||
{J1, M1, [], PersistedManSQN}
|
||||
end,
|
||||
|
||||
%% Find any more recent immutable files that have a higher sequence number
|
||||
|
@ -415,8 +425,7 @@ build_manifest(ManifestFilenames,
|
|||
io:format("Manifest on startup is: ~n"),
|
||||
manifest_printer(Manifest1),
|
||||
Manifest2 = lists:map(fun({LowSQN, FN}) ->
|
||||
FP = filename:join(JournalFP, FN),
|
||||
{ok, Pid} = leveled_cdb:cdb_open_reader(FP),
|
||||
{ok, Pid} = leveled_cdb:cdb_open_reader(FN),
|
||||
{LowSQN, FN, Pid} end,
|
||||
Manifest1),
|
||||
|
||||
|
@ -498,7 +507,7 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
|
|||
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
|
||||
ok;
|
||||
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
|
||||
when MinSQN >= LowSQN ->
|
||||
when LowSQN >= MinSQN ->
|
||||
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
|
||||
ok = load_between_sequence(MinSQN,
|
||||
MinSQN + ?LOADING_BATCH,
|
||||
|
@ -576,6 +585,8 @@ filepath(RootPath, NewSQN, new_journal) ->
|
|||
|
||||
simple_manifest_reader(SQN, RootPath) ->
|
||||
ManifestPath = filepath(RootPath, manifest_dir),
|
||||
io:format("Opening manifest file at ~s with SQN ~w~n",
|
||||
[ManifestPath, SQN]),
|
||||
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
||||
integer_to_list(SQN)
|
||||
++ ".man")),
|
||||
|
@ -584,9 +595,12 @@ simple_manifest_reader(SQN, RootPath) ->
|
|||
|
||||
simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||
ManPath = filepath(RootPath, manifest_dir),
|
||||
NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX),
|
||||
TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX),
|
||||
MBin = term_to_binary(Manifest),
|
||||
NewFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
|
||||
TmpFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
|
||||
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
|
||||
Manifest)),
|
||||
case filelib:is_file(NewFN) of
|
||||
true ->
|
||||
io:format("Error - trying to write manifest for"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue