From cdb01cd24fbb1ebf3c123f8517c40ba4ee1e072f Mon Sep 17 00:00:00 2001 From: martinsumner Date: Sat, 29 Oct 2016 00:52:49 +0100 Subject: [PATCH] Quality Review Looked through test coverage and dialyzer output and attempted to fill test gaps and strip out untestable code (to let it crash). --- include/leveled.hrl | 8 +- src/leveled_bookie.erl | 12 +- src/leveled_cdb.erl | 384 +++++++++++------------------- src/leveled_inker.erl | 13 +- src/leveled_penciller.erl | 61 ++++- src/leveled_sft.erl | 76 ++---- test/end_to_end/restart_SUITE.erl | 4 +- 7 files changed, 227 insertions(+), 331 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index 93e13e3..209a9b7 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -34,7 +34,7 @@ -record(level, {level :: integer(), is_basement = false :: boolean(), - timestamp :: integer()}). + timestamp :: erlang:timestamp()}). -record(manifest_entry, {start_key :: tuple(), @@ -53,7 +53,8 @@ cdb_options :: #cdb_options{}, start_snapshot = false :: boolean(), source_inker :: pid(), - reload_strategy = [] :: list()}). + reload_strategy = [] :: list(), + max_run_length}). -record(penciller_options, {root_path :: string(), @@ -66,7 +67,8 @@ cache_size :: integer(), max_journalsize :: integer(), snapshot_bookie :: pid(), - reload_strategy = [] :: list()}). + reload_strategy = [] :: list(), + max_run_length :: integer()}). -record(iclerk_options, {inker :: pid(), diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index af8b8ed..206e859 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -162,7 +162,6 @@ -record(state, {inker :: pid(), penciller :: pid(), cache_size :: integer(), - back_pressure :: boolean(), ledger_cache :: gb_trees:tree(), is_snapshot :: boolean()}). @@ -293,14 +292,10 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) -> IndexSpecs), Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), gen_server:reply(From, ok), - case maybepush_ledgercache(State#state.cache_size, + {ok, NewCache} = maybepush_ledgercache(State#state.cache_size, Cache0, - State#state.penciller) of - {ok, NewCache} -> - {noreply, State#state{ledger_cache=NewCache, back_pressure=false}}; - {pause, NewCache} -> - {noreply, State#state{ledger_cache=NewCache, back_pressure=true}} - end; + State#state.penciller), + {noreply, State#state{ledger_cache=NewCache}}; handle_call({get, Bucket, Key, Tag}, _From, State) -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), case fetch_head(LedgerKey, @@ -532,6 +527,7 @@ set_options(Opts) -> LedgerFP = Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP, {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, + max_run_length = Opts#bookie_options.max_run_length, cdb_options = #cdb_options{max_size=MaxJournalSize, binary_mode=true}}, #penciller_options{root_path = LedgerFP}}. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 63e48d3..9730a53 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -57,6 +57,7 @@ starting/3, writer/3, writer/2, + rolling/2, rolling/3, reader/3, reader/2, @@ -89,7 +90,6 @@ -define(DWORD_SIZE, 8). -define(WORD_SIZE, 4). --define(CRC_CHECK, true). -define(MAX_FILE_SIZE, 3221225472). -define(BINARY_MODE, false). -define(BASE_POSITION, 2048). @@ -106,7 +106,8 @@ max_size :: integer(), binary_mode = false :: boolean(), delete_point = 0 :: integer(), - inker :: pid()}). + inker :: pid(), + deferred_delete = false :: boolean()}). %%%============================================================================ @@ -119,21 +120,13 @@ cdb_open_writer(Filename) -> cdb_open_writer(Filename, Opts) -> {ok, Pid} = gen_fsm:start(?MODULE, [Opts], []), - case gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity) of - ok -> - {ok, Pid}; - Error -> - Error - end. + ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity), + {ok, Pid}. cdb_open_reader(Filename) -> {ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{}], []), - case gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity) of - ok -> - {ok, Pid}; - Error -> - Error - end. + ok = gen_fsm:sync_send_event(Pid, {open_reader, Filename}, infinity), + {ok, Pid}. cdb_get(Pid, Key) -> gen_fsm:sync_send_event(Pid, {get_kv, Key}, infinity). @@ -280,6 +273,8 @@ writer({put_kv, Key, Value}, _From, State) -> last_key=Key, hashtree=HashTree}} end; +writer({mput_kv, []}, _From, State) -> + {reply, ok, writer, State}; writer({mput_kv, KVList}, _From, State) -> Result = mput(State#state.handle, KVList, @@ -321,8 +316,6 @@ rolling({key_check, Key}, _From, State) -> get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), rolling, State}; -rolling(cdb_filename, _From, State) -> - {reply, State#state.filename, rolling, State}; rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> Handle = State#state.handle, {ok, BasePos} = file:position(Handle, State#state.last_position), @@ -333,13 +326,27 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> ok = rename_for_read(State#state.filename, NewName), io:format("Opening file for reading with filename ~s~n", [NewName]), {NewHandle, Index, LastKey} = open_for_readonly(NewName), - {reply, ok, reader, State#state{handle=NewHandle, - last_key=LastKey, - filename=NewName, - hash_index=Index}}; + case State#state.deferred_delete of + true -> + {reply, ok, delete_pending, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}}; + false -> + {reply, ok, reader, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + hash_index=Index}} + end; rolling(cdb_kill, _From, State) -> {stop, killed, ok, State}. + +rolling({delete_pending, ManSQN, Inker}, State) -> + {next_state, + rolling, + State#state{delete_point=ManSQN, inker=Inker, deferred_delete=true}}. + reader({get_kv, Key}, _From, State) -> {reply, get_withcache(State#state.handle, Key, State#state.hash_index), @@ -347,10 +354,10 @@ reader({get_kv, Key}, _From, State) -> State}; reader({key_check, Key}, _From, State) -> {reply, - get(State#state.handle, - Key, - loose_presence, - State#state.hash_index), + get_withcache(State#state.handle, + Key, + State#state.hash_index, + loose_presence), reader, State}; reader({get_positions, SampleSize}, _From, State) -> @@ -419,10 +426,10 @@ delete_pending({get_kv, Key}, _From, State) -> ?DELETE_TIMEOUT}; delete_pending({key_check, Key}, _From, State) -> {reply, - get(State#state.handle, - Key, - loose_presence, - State#state.hash_index), + get_withcache(State#state.handle, + Key, + State#state.hash_index, + loose_presence), delete_pending, State, ?DELETE_TIMEOUT}. @@ -499,10 +506,10 @@ handle_sync_event(cdb_close, _From, _StateName, State) -> {stop, normal, ok, State#state{handle=undefined}}. handle_event(_Msg, StateName, State) -> - {next_State, StateName, State}. + {next_state, StateName, State}. handle_info(_Msg, StateName, State) -> - {next_State, StateName, State}. + {next_state, StateName, State}. terminate(Reason, StateName, State) -> io:format("Closing of filename ~s for Reason ~w~n", @@ -548,10 +555,8 @@ create(FileName,KeyValueList) -> %% Given a file name, this function returns a list %% of {key,value} tuples from the CDB. %% -dump(FileName) -> - dump(FileName, ?CRC_CHECK). -dump(FileName, CRCCheck) -> +dump(FileName) -> {ok, Handle} = file:open(FileName, [binary, raw, read]), Fn = fun(Index, Acc) -> {ok, _} = file:position(Handle, ?DWORD_SIZE * Index), @@ -564,17 +569,17 @@ dump(FileName, CRCCheck) -> Fn1 = fun(_I,Acc) -> {KL,VL} = read_next_2_integers(Handle), Key = read_next_term(Handle, KL), - case read_next_term(Handle, VL, crc, CRCCheck) of + case read_next_term(Handle, VL, crc) of {false, _} -> - {ok, CurrLoc} = file:position(Handle, cur), - Return = {crc_wonky, get(Handle, Key)}; - {_, Value} -> - {ok, CurrLoc} = file:position(Handle, cur), - Return = - case get(Handle, Key) of - {Key,Value} -> {Key ,Value}; - X -> {wonky, X} - end + {ok, CurrLoc} = file:position(Handle, cur), + Return = {crc_wonky, get(Handle, Key)}; + {_, Value} -> + {ok, CurrLoc} = file:position(Handle, cur), + Return = + case get(Handle, Key) of + {Key,Value} -> {Key ,Value}; + X -> {wonky, X} + end end, {ok, _} = file:position(Handle, CurrLoc), [Return | Acc] @@ -633,8 +638,6 @@ put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) -> put_hashtree(Key, LastPosition, HashTree)} end. -mput(Handle, [], {LastPosition, HashTree0}, _BinaryMode, _MaxSize) -> - {Handle, LastPosition, HashTree0}; mput(Handle, KVList, {LastPosition, HashTree0}, BinaryMode, MaxSize) -> {KPList, Bin, LastKey} = multi_key_value_to_record(KVList, BinaryMode, @@ -663,19 +666,21 @@ put(FileName, Key, Value, {LastPosition, HashTree}) -> %% get(FileName,Key) -> {key,value} %% Given a filename and a key, returns a key and value tuple. %% + + get_withcache(Handle, Key, Cache) -> - get(Handle, Key, ?CRC_CHECK, Cache). + get(Handle, Key, Cache, true). + +get_withcache(Handle, Key, Cache, QuickCheck) -> + get(Handle, Key, Cache, QuickCheck). get(FileNameOrHandle, Key) -> - get(FileNameOrHandle, Key, ?CRC_CHECK). + get(FileNameOrHandle, Key, no_cache, true). -get(FileNameOrHandle, Key, CRCCheck) -> - get(FileNameOrHandle, Key, CRCCheck, no_cache). - -get(FileName, Key, CRCCheck, Cache) when is_list(FileName) -> +get(FileName, Key, Cache, QuickCheck) when is_list(FileName) -> {ok, Handle} = file:open(FileName,[binary, raw, read]), - get(Handle, Key, CRCCheck, Cache); -get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle) -> + get(Handle, Key, Cache, QuickCheck); +get(Handle, Key, Cache, QuickCheck) when is_tuple(Handle) -> Hash = hash(Key), Index = hash_to_index(Hash), {HashTable, Count} = get_index(Handle, Index, Cache), @@ -696,7 +701,9 @@ get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle) -> {L1, L2} = lists:split(Slot, LocList), search_hash_table(Handle, lists:append(L2, L1), - Hash, Key, CRCCheck) + Hash, + Key, + QuickCheck) end. get_index(Handle, Index, no_cache) -> @@ -711,20 +718,20 @@ get_index(_Handle, Index, Cache) -> %% This requires a key dictionary to be passed in (mapping keys to positions) %% Will return {Key, Value} or missing get_mem(Key, FNOrHandle, HashTree) -> - get_mem(Key, FNOrHandle, HashTree, ?CRC_CHECK). + get_mem(Key, FNOrHandle, HashTree, true). -get_mem(Key, Filename, HashTree, CRCCheck) when is_list(Filename) -> +get_mem(Key, Filename, HashTree, QuickCheck) when is_list(Filename) -> {ok, Handle} = file:open(Filename, [binary, raw, read]), - get_mem(Key, Handle, HashTree, CRCCheck); -get_mem(Key, Handle, HashTree, CRCCheck) -> + get_mem(Key, Handle, HashTree, QuickCheck); +get_mem(Key, Handle, HashTree, QuickCheck) -> ListToCheck = get_hashtree(Key, HashTree), - case {CRCCheck, ListToCheck} of + case {QuickCheck, ListToCheck} of {loose_presence, []} -> missing; {loose_presence, _L} -> probably; _ -> - extract_kvpair(Handle, ListToCheck, Key, CRCCheck) + extract_kvpair(Handle, ListToCheck, Key) end. %% Get the next key at a position in the file (or the first key if no position @@ -753,73 +760,6 @@ get_nextkey(Handle, {Position, FirstHashPosition}) -> nomorekeys end. - -%% Fold over all of the objects in the file, applying FoldFun to each object -%% where FoldFun(K, V, Acc0) -> Acc , or FoldFun(K, Acc0) -> Acc if KeyOnly is -%% set to true - -fold(FileName, FoldFun, Acc0) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, [binary, raw, read]), - fold(Handle, FoldFun, Acc0); -fold(Handle, FoldFun, Acc0) -> - {ok, _} = file:position(Handle, bof), - {FirstHashPosition, _} = read_next_2_integers(Handle), - fold(Handle, FoldFun, Acc0, {256 * ?DWORD_SIZE, FirstHashPosition}, false). - -fold(Handle, FoldFun, Acc0, {Position, FirstHashPosition}, KeyOnly) -> - {ok, Position} = file:position(Handle, Position), - case Position of - FirstHashPosition -> - Acc0; - _ -> - case read_next_2_integers(Handle) of - {KeyLength, ValueLength} -> - NextKey = read_next_term(Handle, KeyLength), - NextPosition = Position - + KeyLength + ValueLength + - ?DWORD_SIZE, - case KeyOnly of - true -> - fold(Handle, - FoldFun, - FoldFun(NextKey, Acc0), - {NextPosition, FirstHashPosition}, - KeyOnly); - false -> - case read_next_term(Handle, - ValueLength, - crc, - ?CRC_CHECK) of - {false, _} -> - io:format("Skipping value for Key ~w as CRC - check failed~n", [NextKey]), - fold(Handle, - FoldFun, - Acc0, - {NextPosition, FirstHashPosition}, - KeyOnly); - {_, Value} -> - fold(Handle, - FoldFun, - FoldFun(NextKey, Value, Acc0), - {NextPosition, FirstHashPosition}, - KeyOnly) - end - end; - eof -> - Acc0 - end - end. - - -fold_keys(FileName, FoldFun, Acc0) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, [binary, raw, read]), - fold_keys(Handle, FoldFun, Acc0); -fold_keys(Handle, FoldFun, Acc0) -> - {ok, _} = file:position(Handle, bof), - {FirstHashPosition, _} = read_next_2_integers(Handle), - fold(Handle, FoldFun, Acc0, {256 * ?DWORD_SIZE, FirstHashPosition}, true). - hashtable_calc(HashTree, StartPos) -> Seq = lists:seq(0, 255), SWC = os:timestamp(), @@ -956,22 +896,22 @@ put_hashtree(Key, Position, HashTree) -> end. %% Function to extract a Key-Value pair given a file handle and a position -%% Will confirm that the key matches and do a CRC check when requested -extract_kvpair(_, [], _, _) -> +%% Will confirm that the key matches and do a CRC check +extract_kvpair(_, [], _) -> missing; -extract_kvpair(Handle, [Position|Rest], Key, Check) -> +extract_kvpair(Handle, [Position|Rest], Key) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), case read_next_term(Handle, KeyLength) of Key -> % If same key as passed in, then found! - case read_next_term(Handle, ValueLength, crc, Check) of + case read_next_term(Handle, ValueLength, crc) of {false, _} -> crc_wonky; {_, Value} -> {Key,Value} end; _ -> - extract_kvpair(Handle, Rest, Key, Check) + extract_kvpair(Handle, Rest, Key) end. extract_key(Handle, Position) -> @@ -988,7 +928,7 @@ extract_key_value_check(Handle, Position) -> {ok, _} = file:position(Handle, Position), {KeyLength, ValueLength} = read_next_2_integers(Handle), K = read_next_term(Handle, KeyLength), - {Check, V} = read_next_term(Handle, ValueLength, crc, true), + {Check, V} = read_next_term(Handle, ValueLength, crc), {K, V, Check}. %% Scan through the file until there is a failure to crc check an input, and @@ -1040,7 +980,7 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> Output, fun extract_valueandsize/1) of {stop, UpdOutput} -> - {NewPosition, UpdOutput}; + {Position, UpdOutput}; {loop, UpdOutput} -> case NewPosition of eof -> @@ -1064,17 +1004,15 @@ check_last_key(LastKey) -> end. %% Read the Key/Value at this point, returning {ok, Key, Value} -%% catch expected exceptiosn associated with file corruption (or end) and +%% catch expected exceptions associated with file corruption (or end) and %% return eof saferead_keyvalue(Handle) -> case read_next_2_integers(Handle) of - {error, einval} -> - false; eof -> false; {KeyL, ValueL} -> case safe_read_next_term(Handle, KeyL) of - {error, einval} -> + {error, _} -> false; eof -> false; @@ -1082,8 +1020,6 @@ saferead_keyvalue(Handle) -> false; Key -> case file:read(Handle, ValueL) of - {error, einval} -> - false; eof -> false; {ok, Value} -> @@ -1141,7 +1077,7 @@ to_dict(FileName) -> dict:from_list(KeyValueList). read_next_term(Handle, Length) -> - case file:read(Handle, Length) of + case file:read(Handle, Length) of {ok, Bin} -> binary_to_term(Bin); ReadError -> @@ -1150,20 +1086,13 @@ read_next_term(Handle, Length) -> %% Read next string where the string has a CRC prepended - stripping the crc %% and checking if requested -read_next_term(Handle, Length, crc, Check) -> - case Check of - true -> - {ok, <>} = file:read(Handle, Length), - case calc_crc(Bin) of - CRC -> - {true, binary_to_term(Bin)}; - _ -> - {false, binary_to_term(Bin)} - end; - false -> - {ok, _} = file:position(Handle, {cur, 4}), - {ok, Bin} = file:read(Handle, Length - 4), - {unchecked, binary_to_term(Bin)} +read_next_term(Handle, Length, crc) -> + {ok, <>} = file:read(Handle, Length), + case calc_crc(Bin) of + CRC -> + {true, binary_to_term(Bin)}; + _ -> + {false, binary_to_term(Bin)} end. %% Extract value and size from binary containing CRC @@ -1202,18 +1131,18 @@ read_integerpairs(<>, Pairs) -> %% false - don't check the CRC before returning key & value %% loose_presence - confirm that the hash of the key is present -search_hash_table(_Handle, [], _Hash, _Key, _CRCCheck) -> +search_hash_table(_Handle, [], _Hash, _Key, _QuickCheck) -> missing; -search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) -> +search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, QuickCheck) -> {ok, _} = file:position(Handle, Entry), {StoredHash, DataLoc} = read_next_2_integers(Handle), case StoredHash of Hash -> - KV = case CRCCheck of + KV = case QuickCheck of loose_presence -> probably; _ -> - extract_kvpair(Handle, [DataLoc], Key, CRCCheck) + extract_kvpair(Handle, [DataLoc], Key) end, case KV of missing -> @@ -1221,7 +1150,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) -> RestOfEntries, Hash, Key, - CRCCheck); + QuickCheck); _ -> KV end; @@ -1229,7 +1158,7 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) -> % Hash is 0 so key must be missing as 0 found before Hash matched missing; _ -> - search_hash_table(Handle, RestOfEntries, Hash, Key, CRCCheck) + search_hash_table(Handle, RestOfEntries, Hash, Key, QuickCheck) end. % Write Key and Value tuples into the CDB. Each tuple consists of a @@ -1635,8 +1564,8 @@ search_hash_table_findinslot_test() -> io:format("Slot 2 has Hash ~w Position ~w~n", [ReadH4, ReadP4]), ?assertMatch(0, ReadH4), ?assertMatch({"key1", "value1"}, get(Handle, Key1)), - ?assertMatch(probably, get(Handle, Key1, loose_presence)), - ?assertMatch(missing, get(Handle, "Key99", loose_presence)), + ?assertMatch(probably, get(Handle, Key1, no_cache, loose_presence)), + ?assertMatch(missing, get(Handle, "Key99", no_cache, loose_presence)), {ok, _} = file:position(Handle, FirstHashPosition), FlipH3 = endian_flip(ReadH3), FlipP3 = endian_flip(ReadP3), @@ -1699,91 +1628,6 @@ emptyvalue_fromdict_test() -> ?assertMatch(KVP, D_Result), ok = file:delete("../test/from_dict_test_ev.cdb"). -fold_test() -> - K1 = {"Key1", 1}, - V1 = 2, - K2 = {"Key1", 2}, - V2 = 4, - K3 = {"Key1", 3}, - V3 = 8, - K4 = {"Key1", 4}, - V4 = 16, - K5 = {"Key1", 5}, - V5 = 32, - D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3}, {K4, V4}, {K5, V5}]), - ok = from_dict("../test/fold_test.cdb", D), - FromSN = 2, - FoldFun = fun(K, V, Acc) -> - {_Key, Seq} = K, - if Seq > FromSN -> - Acc + V; - true -> - Acc - end - end, - ?assertMatch(56, fold("../test/fold_test.cdb", FoldFun, 0)), - ok = file:delete("../test/fold_test.cdb"). - -fold_keys_test() -> - K1 = {"Key1", 1}, - V1 = 2, - K2 = {"Key2", 2}, - V2 = 4, - K3 = {"Key3", 3}, - V3 = 8, - K4 = {"Key4", 4}, - V4 = 16, - K5 = {"Key5", 5}, - V5 = 32, - D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3}, {K4, V4}, {K5, V5}]), - ok = from_dict("../test/fold_keys_test.cdb", D), - FromSN = 2, - FoldFun = fun(K, Acc) -> - {Key, Seq} = K, - if Seq > FromSN -> - lists:append(Acc, [Key]); - true -> - Acc - end - end, - Result = fold_keys("../test/fold_keys_test.cdb", FoldFun, []), - ?assertMatch(["Key3", "Key4", "Key5"], lists:sort(Result)), - ok = file:delete("../test/fold_keys_test.cdb"). - -fold2_test() -> - K1 = {"Key1", 1}, - V1 = 2, - K2 = {"Key1", 2}, - V2 = 4, - K3 = {"Key1", 3}, - V3 = 8, - K4 = {"Key1", 4}, - V4 = 16, - K5 = {"Key1", 5}, - V5 = 32, - K6 = {"Key2", 1}, - V6 = 64, - D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3}, - {K4, V4}, {K5, V5}, {K6, V6}]), - ok = from_dict("../test/fold2_test.cdb", D), - FoldFun = fun(K, V, Acc) -> - {Key, Seq} = K, - case dict:find(Key, Acc) of - error -> - dict:store(Key, {Seq, V}, Acc); - {ok, {LSN, _V}} when Seq > LSN -> - dict:store(Key, {Seq, V}, Acc); - _ -> - Acc - end - end, - RD = dict:new(), - RD1 = dict:store("Key1", {5, 32}, RD), - RD2 = dict:store("Key2", {1, 64}, RD1), - Result = fold("../test/fold2_test.cdb", FoldFun, dict:new()), - ?assertMatch(RD2, Result), - ok = file:delete("../test/fold2_test.cdb"). - find_lastkey_test() -> {ok, P1} = cdb_open_writer("../test/lastkey.pnd"), ok = cdb_put(P1, "Key1", "Value1"), @@ -1907,5 +1751,49 @@ mput_test() -> ok = cdb_close(P2), ok = file:delete(F2). +state_test() -> + {ok, P1} = cdb_open_writer("../test/state_test.pnd"), + KVList = generate_sequentialkeys(1000, []), + ok = cdb_mput(P1, KVList), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ok = cdb_roll(P1), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ok = cdb_deletepending(P1), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + timer:sleep(500), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ok = cdb_close(P1). + +corruptfile_test() -> + file:delete("../test/corrupt_test.pnd"), + {ok, P1} = cdb_open_writer("../test/corrupt_test.pnd"), + KVList = generate_sequentialkeys(100, []), + ok = cdb_mput(P1, []), % Not relevant to this test, but needs testing + lists:foreach(fun({K, V}) -> cdb_put(P1, K, V) end, KVList), + ?assertMatch(probably, cdb_keycheck(P1, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P1, "Key1")), + ?assertMatch({"Key100", "Value100"}, cdb_get(P1, "Key100")), + ok = cdb_close(P1), + lists:foreach(fun(Offset) -> corrupt_testfile_at_offset(Offset) end, + lists:seq(1, 40)), + ok = file:delete("../test/corrupt_test.pnd"). + +corrupt_testfile_at_offset(Offset) -> + {ok, F1} = file:open("../test/corrupt_test.pnd", ?WRITE_OPS), + {ok, EofPos} = file:position(F1, eof), + file:position(F1, EofPos - Offset), + ok = file:truncate(F1), + ok = file:close(F1), + {ok, P2} = cdb_open_writer("../test/corrupt_test.pnd"), + ?assertMatch(probably, cdb_keycheck(P2, "Key1")), + ?assertMatch({"Key1", "Value1"}, cdb_get(P2, "Key1")), + ?assertMatch(missing, cdb_get(P2, "Key100")), + ok = cdb_put(P2, "Key100", "Value100"), + ?assertMatch({"Key100", "Value100"}, cdb_get(P2, "Key100")), + ok = cdb_close(P2). -endif. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b929c91..1c8ed17 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -371,19 +371,16 @@ start_from_file(InkOpts) -> filelib:ensure_dir(CompactFP), ManifestFP = filepath(RootPath, manifest_dir), - {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of - true -> - file:list_dir(ManifestFP); - false -> - filelib:ensure_dir(ManifestFP), - {ok, []} - end, + ok = filelib:ensure_dir(ManifestFP), + {ok, ManifestFilenames} = file:list_dir(ManifestFP), IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, ReloadStrategy = InkOpts#inker_options.reload_strategy, + MRL = InkOpts#inker_options.max_run_length, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts, - reload_strategy = ReloadStrategy}, + reload_strategy = ReloadStrategy, + max_run_length = MRL}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), {Manifest, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 698b9b5..8522a30 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -930,16 +930,7 @@ print_manifest(Manifest) -> lists:foreach(fun(L) -> io:format("Manifest at Level ~w~n", [L]), Level = get_item(L, Manifest, []), - lists:foreach(fun(M) -> - R = is_record(M, manifest_entry), - case R of - true -> - print_manifest_entry(M); - false -> - {_, M1} = M, - print_manifest_entry(M1) - end end, - Level) + lists:foreach(fun print_manifest_entry/1, Level) end, lists:seq(0, ?MAX_LEVELS - 1)), ok. @@ -1557,7 +1548,10 @@ print_manifest_test() -> M2 = #manifest_entry{start_key={i, self(), {null, "Fld1"}, "K8"}, end_key={i, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, filename="Z1"}, - ?assertMatch(ok, print_manifest([{1, [M1, M2]}])). + M3 = #manifest_entry{start_key={?STD_TAG, self(), {null, "Fld1"}, "K8"}, + end_key={?RIAK_TAG, <<200:32/integer>>, {"Idx1", "Fld9"}, "K93"}, + filename="Z1"}, + print_manifest([{1, [M1, M2, M3]}]). simple_findnextkey_test() -> QueryArray = [ @@ -1689,4 +1683,49 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key5"}, 2}, {{o, "Bucket1", "Key6"}, 7}], AccB). +create_file_test() -> + Filename = "../test/new_file.sft", + ok = file:write_file(Filename, term_to_binary("hello")), + {KL1, KL2} = {lists:sort(leveled_sft:generate_randomkeys(10000)), []}, + {ok, SP, noreply} = leveled_sft:sft_new(Filename, + KL1, + KL2, + 0, + #sft_options{wait=false}), + lists:foreach(fun(X) -> + case checkready(SP) of + timeout -> + timer:sleep(X); + _ -> + ok + end end, + [50, 50, 50, 50, 50]), + {ok, SrcFN, StartKey, EndKey} = checkready(SP), + io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]), + ?assertMatch({o, _, _, _}, StartKey), + ?assertMatch({o, _, _, _}, EndKey), + ?assertMatch("../test/new_file.sft", SrcFN), + ok = leveled_sft:sft_clear(SP), + {ok, Bin} = file:read_file("../test/new_file.sft.discarded"), + ?assertMatch("hello", binary_to_term(Bin)). + +coverage_test() -> + RootPath = "../test/ledger", + clean_testdir(RootPath), + {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000}), + Key1 = {{o,"Bucket0001", "Key0001", null}, {1, {active, infinity}, null}}, + KL1 = leveled_sft:generate_randomkeys({1000, 2}), + ok = maybe_pause_push(PCL, [Key1]), + ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})), + ok = maybe_pause_push(PCL, KL1), + ok = pcl_close(PCL), + ManifestFP = filepath(RootPath, manifest), + file:write_file(ManifestFP ++ "/yeszero_123.man", term_to_binary("hello")), + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000}), + ?assertMatch(Key1, pcl_fetch(PCLr, {o,"Bucket0001", "Key0001", null})), + ok = pcl_close(PCLr), + clean_testdir(RootPath). + -endif. \ No newline at end of file diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 9cebef2..c9cfd7f 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -241,9 +241,7 @@ sft_open(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [], []), case gen_server:call(Pid, {sft_open, Filename}, infinity) of {ok, {SK, EK}} -> - {ok, Pid, {SK, EK}}; - Error -> - Error + {ok, Pid, {SK, EK}} end. sft_setfordelete(Pid, Penciller) -> @@ -467,7 +465,7 @@ create_file(FileName) when is_list(FileName) -> {error, Reason} -> io:format("Error opening filename ~s with reason ~w", [FileName, Reason]), - error + {error, Reason} end. @@ -586,17 +584,16 @@ fetch_keyvalue(Handle, FileMD, Key) -> %% Fetches a range of keys returning a list of {Key, SeqN} tuples fetch_range_keysonly(Handle, FileMD, StartKey, EndKey) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], - fun acc_list_keysonly/2). + fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_keysonly/2). fetch_range_keysonly(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], - fun acc_list_keysonly/2, ScanWidth). + fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_keysonly/2, + ScanWidth). %% Fetches a range of keys returning the full tuple, including value fetch_range_kv(Handle, FileMD, StartKey, EndKey, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, [], - fun acc_list_kv/2, ScanWidth). + fetch_range(Handle, FileMD, StartKey, EndKey, fun acc_list_kv/2, + ScanWidth). acc_list_keysonly(null, empty) -> []; @@ -630,24 +627,20 @@ acc_list_kv(R, RList) -> %% than keys and values, or other entirely different accumulators can be %% used - e.g. counters, hash-lists to build bloom filters etc -fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun) -> - fetch_range(Handle, FileMD, StartKey, EndKey, FunList, - AccFun, ?ITERATOR_SCANWIDTH). +fetch_range(Handle, FileMD, StartKey, EndKey, AccFun) -> + fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ?ITERATOR_SCANWIDTH). -fetch_range(Handle, FileMD, StartKey, EndKey, FunList, AccFun, ScanWidth) -> - fetch_range(Handle, FileMD, StartKey, EndKey, FunList, - AccFun, ScanWidth, empty). +fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth) -> + fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth, empty). -fetch_range(_Handle, _FileMD, StartKey, _EndKey, _FunList, - _AccFun, 0, Acc) -> +fetch_range(_Handle, _FileMD, StartKey, _EndKey, _AccFun, 0, Acc) -> {partial, Acc, StartKey}; -fetch_range(Handle, FileMD, StartKey, EndKey, FunList, - AccFun, ScanWidth, Acc) -> +fetch_range(Handle, FileMD, StartKey, EndKey, AccFun, ScanWidth, Acc) -> %% get_nearestkey gets the last key in the index <= StartKey, or the next %% key along if {next, StartKey} is passed case get_nearestkey(FileMD#state.slot_index, StartKey) of {NearestKey, _Filter, {LengthList, PointerB}} -> - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, AccFun, ScanWidth, LengthList, 0, @@ -657,7 +650,7 @@ fetch_range(Handle, FileMD, StartKey, EndKey, FunList, {complete, AccFun(null, Acc)} end. -fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, +fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, AccFun, ScanWidth, LengthList, BlockNumber, @@ -665,21 +658,21 @@ fetch_range(Handle, FileMD, _StartKey, NearestKey, EndKey, FunList, Acc) when length(LengthList) == BlockNumber -> %% Reached the end of the slot. Move the start key on one to scan a new slot - fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, FunList, + fetch_range(Handle, FileMD, {next, NearestKey}, EndKey, AccFun, ScanWidth - 1, Acc); -fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, +fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, AccFun, ScanWidth, LengthList, BlockNumber, Pointer, Acc) -> Block = fetch_block(Handle, LengthList, BlockNumber, Pointer), - Results = scan_block(Block, StartKey, EndKey, FunList, AccFun, Acc), + Results = scan_block(Block, StartKey, EndKey, AccFun, Acc), case Results of {partial, Acc1, StartKey} -> %% Move on to the next block - fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, + fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, AccFun, ScanWidth, LengthList, BlockNumber + 1, @@ -689,38 +682,20 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, {complete, Acc1} end. -scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) -> +scan_block([], StartKey, _EndKey, _AccFun, Acc) -> {partial, Acc, StartKey}; -scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> +scan_block([HeadKV|T], StartKey, EndKey, AccFun, Acc) -> K = leveled_codec:strip_to_keyonly(HeadKV), case {StartKey > K, leveled_codec:endkey_passed(EndKey, K)} of {true, _} when StartKey /= all -> - scan_block(T, StartKey, EndKey, FunList, AccFun, Acc); + scan_block(T, StartKey, EndKey, AccFun, Acc); {_, true} when EndKey /= all -> {complete, Acc}; _ -> - case applyfuns(FunList, HeadKV) of - true -> - %% Add result to the accumulator - scan_block(T, StartKey, EndKey, FunList, - AccFun, AccFun(HeadKV, Acc)); - false -> - scan_block(T, StartKey, EndKey, FunList, - AccFun, Acc) - end + scan_block(T, StartKey, EndKey, AccFun, AccFun(HeadKV, Acc)) end. -applyfuns([], _KV) -> - true; -applyfuns([HeadFun|OtherFuns], KV) -> - case HeadFun(KV) of - true -> - applyfuns(OtherFuns, KV); - false -> - false - end. - fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> not_present; fetch_keyvalue_fromblock([BlockNmb|T], Key, LengthList, Handle, StartOfSlot) -> @@ -745,10 +720,7 @@ get_nearestkey(KVList, all) -> [] -> not_found; [H|_Tail] -> - H; - _ -> - io:format("KVList issue ~w~n", [KVList]), - error + H end; get_nearestkey(KVList, Key) -> case Key of diff --git a/test/end_to_end/restart_SUITE.erl b/test/end_to_end/restart_SUITE.erl index 0017035..030cfd6 100644 --- a/test/end_to_end/restart_SUITE.erl +++ b/test/end_to_end/restart_SUITE.erl @@ -15,13 +15,15 @@ retain_strategy(_Config) -> cache_size=1000, max_journalsize=5000000, reload_strategy=[{?RIAK_TAG, retain}]}, + BookOptsAlt = BookOpts#bookie_options{max_run_length=6, + max_journalsize=500000}, {ok, Spcl3, LastV3} = rotating_object_check(BookOpts, "Bucket3", 800), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}]), {ok, Spcl4, LastV4} = rotating_object_check(BookOpts, "Bucket4", 1600), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, {"Bucket4", Spcl4, LastV4}]), {ok, Spcl5, LastV5} = rotating_object_check(BookOpts, "Bucket5", 3200), - ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3}, + ok = restart_from_blankledger(BookOptsAlt, [{"Bucket3", Spcl3, LastV3}, {"Bucket5", Spcl5, LastV5}]), {ok, Spcl6, LastV6} = rotating_object_check(BookOpts, "Bucket6", 6400), ok = restart_from_blankledger(BookOpts, [{"Bucket3", Spcl3, LastV3},