From 7c28ffbd96b0854f4bb6e23587a70e5a89a27519 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 19 Sep 2016 15:31:26 +0100 Subject: [PATCH] Further bookie test - CDB optimisation and Inker manifest correction Additional bookie test revealed that the persisting/reading of inker manifests was inconsistent and buggy. Also, the CDB files were inefficiently writing the top index table - needed to be improved as this is blocking on a roll --- src/leveled_bookie.erl | 75 ++++++++++++++++++++++ src/leveled_cdb.erl | 137 +++++++++++++++++++++++++++++------------ src/leveled_inker.erl | 36 +++++++---- 3 files changed, 198 insertions(+), 50 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index e95aa80..0658767 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -448,7 +448,25 @@ reset_filestructure() -> leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP), leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP), RootPath. + +generate_multiple_objects(Count, KeyNumber) -> + generate_multiple_objects(Count, KeyNumber, []). +generate_multiple_objects(0, _KeyNumber, ObjL) -> + ObjL; +generate_multiple_objects(Count, KeyNumber, ObjL) -> + Obj = {"Bucket", + "Key" ++ integer_to_list(KeyNumber), + crypto:rand_bytes(128), + [], + [{"MDK", "MDV" ++ integer_to_list(KeyNumber)}, + {"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]}, + {B1, K1, V1, Spec1, MD} = Obj, + Content = #r_content{metadata=MD, value=V1}, + Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, + generate_multiple_objects(Count - 1, KeyNumber + 1, ObjL ++ [{Obj1, Spec1}]). + + single_key_test() -> RootPath = reset_filestructure(), {ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}), @@ -469,4 +487,61 @@ single_key_test() -> ok = book_close(Bookie2), reset_filestructure(). 
+multi_key_test() -> + RootPath = reset_filestructure(), + {ok, Bookie1} = book_start(#bookie_options{root_path=RootPath}), + {B1, K1, V1, Spec1, MD1} = {"Bucket", + "Key1", + "Value1", + [], + {"MDK1", "MDV1"}}, + C1 = #r_content{metadata=MD1, value=V1}, + Obj1 = #r_object{bucket=B1, key=K1, contents=[C1], vclock=[{'a',1}]}, + {B2, K2, V2, Spec2, MD2} = {"Bucket", + "Key2", + "Value2", + [], + {"MDK2", "MDV2"}}, + C2 = #r_content{metadata=MD2, value=V2}, + Obj2 = #r_object{bucket=B2, key=K2, contents=[C2], vclock=[{'a',1}]}, + ok = book_riakput(Bookie1, Obj1, Spec1), + ObjL1 = generate_multiple_objects(100, 3), + SW1 = os:timestamp(), + lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL1), + io:format("PUT of 100 objects completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW1)]), + ok = book_riakput(Bookie1, Obj2, Spec2), + {ok, F1A} = book_riakget(Bookie1, B1, K1), + ?assertMatch(F1A, Obj1), + {ok, F2A} = book_riakget(Bookie1, B2, K2), + ?assertMatch(F2A, Obj2), + ObjL2 = generate_multiple_objects(100, 103), + SW2 = os:timestamp(), + lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie1, O, S) end, ObjL2), + io:format("PUT of 100 objects completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW2)]), + {ok, F1B} = book_riakget(Bookie1, B1, K1), + ?assertMatch(F1B, Obj1), + {ok, F2B} = book_riakget(Bookie1, B2, K2), + ?assertMatch(F2B, Obj2), + ok = book_close(Bookie1), + %% Now reopen the file, and confirm that a fetch is still possible + {ok, Bookie2} = book_start(#bookie_options{root_path=RootPath}), + {ok, F1C} = book_riakget(Bookie2, B1, K1), + ?assertMatch(F1C, Obj1), + {ok, F2C} = book_riakget(Bookie2, B2, K2), + ?assertMatch(F2C, Obj2), + ObjL3 = generate_multiple_objects(100, 203), + SW3 = os:timestamp(), + lists:foreach(fun({O, S}) -> ok = book_riakput(Bookie2, O, S) end, ObjL3), + io:format("PUT of 100 objects completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW3)]), + {ok, F1D} = 
book_riakget(Bookie2, B1, K1), + ?assertMatch(F1D, Obj1), + {ok, F2D} = book_riakget(Bookie2, B2, K2), + ?assertMatch(F2D, Obj2), + ok = book_close(Bookie2), + reset_filestructure(), + ?assertMatch(true, false). + -endif. \ No newline at end of file diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 24ffc71..ec807ea 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -177,7 +177,9 @@ handle_call({cdb_open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), {ok, Handle} = file:open(Filename, [binary, raw, read]), Index = load_index(Handle), + LastKey = find_lastkey(Handle, Index), {reply, ok, State#state{handle=Handle, + last_key=LastKey, filename=Filename, writer=false, hash_index=Index}}; @@ -464,8 +466,8 @@ get_index(Handle, Index, no_cache) -> % Get location of hashtable and number of entries in the hash read_next_2_integers(Handle); get_index(_Handle, Index, Cache) -> - {_Pointer, Count} = lists:keyfind(Index, 1, Cache), - Count. + {Index, {Pointer, Count}} = lists:keyfind(Index, 1, Cache), + {Pointer, Count}. %% Get a Key/Value pair from an active CDB file (with no hash table written) %% This requires a key dictionary to be passed in (mapping keys to positions) @@ -593,6 +595,31 @@ load_index(Handle) -> {X, {HashTablePos, Count}} end, Index). +%% Function to find the LastKey in the file +find_lastkey(Handle, IndexCache) -> + LastPosition = scan_index(Handle, IndexCache), + {ok, _} = file:position(Handle, LastPosition), + {KeyLength, _ValueLength} = read_next_2_integers(Handle), + read_next_term(Handle, KeyLength). + +scan_index(Handle, IndexCache) -> + lists:foldl(fun({_X, {Pos, Count}}, LastPosition) -> + scan_index(Handle, Pos, 0, Count, LastPosition) end, + 0, + IndexCache). 
+ +scan_index(_Handle, _Position, Count, Checks, LastPosition) + when Count == Checks -> + LastPosition; +scan_index(Handle, Position, Count, Checks, LastPosition) -> + {ok, _} = file:position(Handle, Position + ?DWORD_SIZE * Count), + {_Hash, HPosition} = read_next_2_integers(Handle), + scan_index(Handle, + Position, + Count + 1 , + Checks, + max(LastPosition, HPosition)). + %% Take an active file and write the hash details necessary to close that %% file and roll a new active file if requested. @@ -601,8 +628,13 @@ load_index(Handle) -> %% the hash tables close_file(Handle, HashTree, BasePos) -> {ok, BasePos} = file:position(Handle, BasePos), + SW = os:timestamp(), L2 = write_hash_tables(Handle, HashTree), + io:format("Hash Table write took ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), write_top_index_table(Handle, BasePos, L2), + io:format("Top Index Table write took ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), file:close(Handle). @@ -661,6 +693,19 @@ startup_scan_over_file(Handle, Position) -> {HashTree, empty}, empty). +%% Specific filter to be used at startup to build a hashtree for an incomplete +%% cdb file, and returns at the end the hashtree and the final Key seen in the +%% journal + +startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) -> + case crccheck_value(ValueAsBin) of + true -> + {loop, {put_hashtree(Key, Position, Hashtree), Key}}; + false -> + {stop, {Hashtree, LastKey}} + end. 
+ + %% Scan for key changes - scan over file returning applying FilterFun %% The FilterFun should accept as input: %% - Key, ValueBin, Position, Accumulator, Fun (to extract values from Binary) @@ -674,39 +719,34 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> ++ " in scan~n", [Position]), {Position, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> - NewPosition = Position + KeyLength + ValueLength - + ?DWORD_SIZE, - case {FilterFun(Key, + NewPosition = case Key of + LastKey -> + eof; + _ -> + Position + KeyLength + ValueLength + + ?DWORD_SIZE + end, + case FilterFun(Key, ValueAsBin, Position, Output, - fun extract_value/1), - Key} of - {{stop, UpdOutput}, _} -> + fun extract_value/1) of + {stop, UpdOutput} -> {NewPosition, UpdOutput}; - {{loop, UpdOutput}, LastKey} -> - {eof, UpdOutput}; - {{loop, UpdOutput}, _} -> - scan_over_file(Handle, - NewPosition, - FilterFun, - UpdOutput, - Key) + {loop, UpdOutput} -> + case NewPosition of + eof -> + {eof, UpdOutput}; + _ -> + scan_over_file(Handle, + NewPosition, + FilterFun, + UpdOutput, + LastKey) + end end end. -%% Specific filter to be used at startup to build a hashtree for an incomplete -%% cdb file, and returns at the end the hashtree and the final Key seen in the -%% journal - -startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}, _ExtractFun) -> - case crccheck_value(ValueAsBin) of - true -> - {loop, {put_hashtree(Key, Position, Hashtree), Key}}; - false -> - {stop, {Hashtree, LastKey}} - end. - %% Confirm that the last key has been defined and set to a non-default value check_last_key(LastKey) -> @@ -972,17 +1012,16 @@ find_open_slot1([_|RestOfSlots], [_|RestOfEntries]) -> write_top_index_table(Handle, BasePos, List) -> % fold function to find any missing index tuples, and add one a replacement % in this case with a count of 0. 
Also orders the list by index - FnMakeIndex = fun(I, Acc) -> + FnMakeIndex = fun(I) -> case lists:keysearch(I, 1, List) of {value, Tuple} -> - [Tuple|Acc]; + Tuple; false -> - [{I, BasePos, 0}|Acc] + {I, BasePos, 0} end end, % Fold function to write the index entries - FnWriteIndex = fun({Index, Pos, Count}, CurrPos) -> - {ok, _} = file:position(Handle, ?DWORD_SIZE * Index), + FnWriteIndex = fun({_Index, Pos, Count}, {AccBin, CurrPos}) -> case Count == 0 of true -> PosLE = endian_flip(CurrPos), @@ -992,14 +1031,16 @@ write_top_index_table(Handle, BasePos, List) -> NextPos = Pos + (Count * ?DWORD_SIZE) end, CountLE = endian_flip(Count), - Bin = <>, - file:write(Handle, Bin), - NextPos + {<>, NextPos} end, Seq = lists:seq(0, 255), - CompleteList = lists:keysort(1, lists:foldl(FnMakeIndex, [], Seq)), - lists:foldl(FnWriteIndex, BasePos, CompleteList), + CompleteList = lists:keysort(1, lists:map(FnMakeIndex, Seq)), + {IndexBin, _Pos} = lists:foldl(FnWriteIndex, + {<<>>, BasePos}, + CompleteList), + {ok, _} = file:position(Handle, 0), + ok = file:write(Handle, IndexBin), ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need). %% To make this compatible with original Bernstein format this endian flip @@ -1131,7 +1172,7 @@ full_1_test() -> full_2_test() -> List1 = lists:sort([{lists:flatten(io_lib:format("~s~p",[Prefix,Plug])), lists:flatten(io_lib:format("value~p",[Plug]))} - || Plug <- lists:seq(1,2000), + || Plug <- lists:seq(1,200), Prefix <- ["dsd","so39ds","oe9%#*(","020dkslsldclsldowlslf%$#", "tiep4||","qweq"]]), create("../test/full.cdb",List1), @@ -1394,4 +1435,22 @@ fold2_test() -> ?assertMatch(RD2, Result), ok = file:delete("../test/fold2_test.cdb"). 
+find_lastkey_test() -> + {ok, P1} = cdb_open_writer("../test/lastkey.pnd"), + ok = cdb_put(P1, "Key1", "Value1"), + ok = cdb_put(P1, "Key3", "Value3"), + ok = cdb_put(P1, "Key2", "Value2"), + R1 = cdb_lastkey(P1), + ?assertMatch(R1, "Key2"), + ok = cdb_close(P1), + {ok, P2} = cdb_open_writer("../test/lastkey.pnd"), + R2 = cdb_lastkey(P2), + ?assertMatch(R2, "Key2"), + {ok, F2} = cdb_complete(P2), + {ok, P3} = cdb_open_reader(F2), + R3 = cdb_lastkey(P3), + ?assertMatch(R3, "Key2"), + ok = cdb_close(P3), + ok = file:delete("../test/lastkey.cdb"). + -endif. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0c0529c..b816e5f 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -308,16 +308,25 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> end. roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> + SW = os:timestamp(), io:format("Rolling old journal ~w~n", [OldActiveJournal]), {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal), + io:format("Rolling old journal S1 completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), + io:format("Rolling old journal S2 completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], JournalRegex2, 'SQN'), NewManifest = add_to_manifest(Manifest, {JournalSQN, NewFilename, PidR}), + io:format("Rolling old journal S3 completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), NewManifestSQN = ManifestSQN + 1, ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), + io:format("Rolling old journal S4 completed in ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), {NewManifest, NewManifestSQN}. 
get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> @@ -358,8 +367,6 @@ build_manifest(ManifestFilenames, ManifestRdrFun, RootPath, CDBopts) -> - %% Setup root paths - JournalFP = filepath(RootPath, journal_dir), %% Find the manifest with a highest Manifest sequence number %% Open it and read it to get the current Confirmed Manifest ManifestRegex = "(?[0-9]+)\\." ++ ?MANIFEST_FILEX, @@ -374,9 +381,12 @@ build_manifest(ManifestFilenames, {0, [], [], 0}; _ -> PersistedManSQN = lists:max(ValidManSQNs), - {J1, M1, R1} = ManifestRdrFun(PersistedManSQN, - RootPath), - {J1, M1, R1, PersistedManSQN} + M1 = ManifestRdrFun(PersistedManSQN, RootPath), + J1 = lists:foldl(fun({JSQN, _FN}, Acc) -> + max(JSQN, Acc) end, + 0, + M1), + {J1, M1, [], PersistedManSQN} end, %% Find any more recent immutable files that have a higher sequence number @@ -415,8 +425,7 @@ build_manifest(ManifestFilenames, io:format("Manifest on startup is: ~n"), manifest_printer(Manifest1), Manifest2 = lists:map(fun({LowSQN, FN}) -> - FP = filename:join(JournalFP, FN), - {ok, Pid} = leveled_cdb:cdb_open_reader(FP), + {ok, Pid} = leveled_cdb:cdb_open_reader(FN), {LowSQN, FN, Pid} end, Manifest1), @@ -498,7 +507,7 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> ok; load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail]) - when MinSQN >= LowSQN -> + when LowSQN >= MinSQN -> io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), ok = load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, @@ -576,6 +585,8 @@ filepath(RootPath, NewSQN, new_journal) -> simple_manifest_reader(SQN, RootPath) -> ManifestPath = filepath(RootPath, manifest_dir), + io:format("Opening manifest file at ~s with SQN ~w~n", + [ManifestPath, SQN]), {ok, MBin} = file:read_file(filename:join(ManifestPath, integer_to_list(SQN) ++ ".man")), @@ -584,9 +595,12 @@ simple_manifest_reader(SQN, RootPath) -> 
simple_manifest_writer(Manifest, ManSQN, RootPath) -> ManPath = filepath(RootPath, manifest_dir), - NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX), - TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX), - MBin = term_to_binary(Manifest), + NewFN = filename:join(ManPath, + integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX), + TmpFN = filename:join(ManPath, + integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX), + MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end, + Manifest)), case filelib:is_file(NewFN) of true -> io:format("Error - trying to write manifest for"