diff --git a/include/leveled.hrl b/include/leveled.hrl index f1ca86a..93e13e3 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -1,8 +1,20 @@ +%% Tag to be used on standard Riak KV objects -define(RIAK_TAG, o_rkv). +%% Tag to be used on K/V objects for non-Riak purposes -define(STD_TAG, o). +%% Tag used for secondary index keys -define(IDX_TAG, i). +%% Inker key type used for 'normal' objects +-define(INKT_STND, stnd). +%% Inker key type used for objects which contain no value, only key changes +%% This is used currently for objects formed under a 'retain' strategy on Inker +%% compaction, but could be used for special set-type objects +-define(INKT_KEYD, keyd). +%% Inker key type used for tombstones +-define(INKT_TOMB, tomb). + -record(sft_options, {wait = true :: boolean(), expire_tombstones = false :: boolean()}). @@ -40,7 +52,8 @@ root_path :: string(), cdb_options :: #cdb_options{}, start_snapshot = false :: boolean(), - source_inker :: pid()}). + source_inker :: pid(), + reload_strategy = [] :: list()}). -record(penciller_options, {root_path :: string(), @@ -52,12 +65,14 @@ {root_path :: string(), cache_size :: integer(), max_journalsize :: integer(), - snapshot_bookie :: pid()}). + snapshot_bookie :: pid(), + reload_strategy = [] :: list()}). -record(iclerk_options, {inker :: pid(), max_run_length :: integer(), - cdb_options :: #cdb_options{}}). + cdb_options :: #cdb_options{}, + reload_strategy = [] :: list()}). 
-record(r_content, { metadata, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index e116b8f..9c8e507 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -280,7 +280,8 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag}, From, State) -> LedgerKey, Object, IndexSpecs), - Changes = preparefor_ledgercache(LedgerKey, + Changes = preparefor_ledgercache(no_type_assigned, + LedgerKey, SQN, Object, ObjSize, @@ -505,7 +506,8 @@ shutdown_wait([TopPause|Rest], Inker) -> ok -> true; pause -> - io:format("Inker shutdown stil waiting process to complete~n"), + io:format("Inker shutdown still waiting for process to complete" ++ + " with further wait of ~w~n", [lists:sum(Rest)]), ok = timer:sleep(TopPause), shutdown_wait(Rest, Inker) end. @@ -518,12 +520,17 @@ set_options(Opts) -> MS -> MS end, - {#inker_options{root_path = Opts#bookie_options.root_path ++ - "/" ++ ?JOURNAL_FP, + + AltStrategy = Opts#bookie_options.reload_strategy, + ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), + + JournalFP = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP, + LedgerFP = Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP, + {#inker_options{root_path = JournalFP, + reload_strategy = ReloadStrategy, cdb_options = #cdb_options{max_size=MaxJournalSize, binary_mode=true}}, - #penciller_options{root_path=Opts#bookie_options.root_path ++ - "/" ++ ?LEDGER_FP}}. + #penciller_options{root_path = LedgerFP}}. startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), @@ -613,14 +620,18 @@ accumulate_index(TermRe, AddFun) -> end.
-preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> - {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK, +preparefor_ledgercache(?INKT_KEYD, LedgerKey, SQN, _Obj, _Size, IndexSpecs) -> + {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey), + leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN); +preparefor_ledgercache(_Type, LedgerKey, SQN, Obj, Size, IndexSpecs) -> + {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size), ConvSpecs = leveled_codec:convert_indexspecs(IndexSpecs, Bucket, Key, SQN), [PrimaryChange] ++ ConvSpecs. + addto_ledgercache(Changes, Cache) -> lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, Cache, @@ -663,24 +674,21 @@ maybe_withjitter(CacheSize, MaxCacheSize) -> load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, OutputTree} = Acc0, - {SQN, _Type, PK} = KeyInLedger, + {SQN, Type, PK} = KeyInLedger, % VBin may already be a term {VBin, VSize} = ExtractFun(ValueInLedger), - {Obj, IndexSpecs} = case is_binary(VBin) of - true -> - binary_to_term(VBin); - false -> - VBin - end, + {Obj, IndexSpecs} = leveled_codec:split_inkvalue(VBin), case SQN of SQN when SQN < MinSQN -> {loop, Acc0}; SQN when SQN < MaxSQN -> - Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), + Changes = preparefor_ledgercache(Type, PK, SQN, + Obj, VSize, IndexSpecs), {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; MaxSQN -> io:format("Reached end of load batch with SQN ~w~n", [SQN]), - Changes = preparefor_ledgercache(PK, SQN, Obj, VSize, IndexSpecs), + Changes = preparefor_ledgercache(Type, PK, SQN, + Obj, VSize, IndexSpecs), {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; SQN when SQN > MaxSQN -> io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index f3eedd9..d967315 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -293,7 +293,14 
@@ handle_call({put_kv, Key, Value, HashOpt}, _From, State) -> handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; handle_call(cdb_firstkey, _From, State) -> - {reply, extract_key(State#state.handle, ?BASE_POSITION), State}; + {ok, EOFPos} = file:position(State#state.handle, eof), + FirstKey = case EOFPos of + ?BASE_POSITION -> + empty; + _ -> + extract_key(State#state.handle, ?BASE_POSITION) + end, + {reply, FirstKey, State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; handle_call({get_positions, SampleSize}, _From, State) -> @@ -746,12 +753,31 @@ load_index(Handle) -> %% Function to find the LastKey in the file find_lastkey(Handle, IndexCache) -> - LastPosition = scan_index(Handle, - IndexCache, - {fun scan_index_findlast/4, 0}), - {ok, _} = file:position(Handle, LastPosition), - {KeyLength, _ValueLength} = read_next_2_integers(Handle), - read_next_term(Handle, KeyLength). + {LastPosition, TotalKeys} = scan_index(Handle, + IndexCache, + {fun scan_index_findlast/4, + {0, 0}}), + {ok, EOFPos} = file:position(Handle, eof), + io:format("TotalKeys ~w in file~n", [TotalKeys]), + case TotalKeys of + 0 -> + scan_keys_forlast(Handle, EOFPos, ?BASE_POSITION, empty); + _ -> + {ok, _} = file:position(Handle, LastPosition), + {KeyLength, _ValueLength} = read_next_2_integers(Handle), + read_next_term(Handle, KeyLength) + end. + +scan_keys_forlast(_Handle, EOFPos, NextPos, LastKey) when EOFPos == NextPos -> + LastKey; +scan_keys_forlast(Handle, EOFPos, NextPos, _LastKey) -> + {ok, _} = file:position(Handle, NextPos), + {KeyLength, ValueLength} = read_next_2_integers(Handle), + scan_keys_forlast(Handle, + EOFPos, + NextPos + KeyLength + ValueLength + ?DWORD_SIZE, + read_next_term(Handle, KeyLength)). + scan_index(Handle, IndexCache, {ScanFun, InitAcc}) -> lists:foldl(fun({_X, {Pos, Count}}, Acc) -> @@ -776,11 +802,12 @@ scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) -> end. 
-scan_index_findlast(Handle, Position, Count, LastPosition) -> +scan_index_findlast(Handle, Position, Count, {LastPosition, TotalKeys}) -> {ok, _} = file:position(Handle, Position), - lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end, - LastPosition, - read_next_n_integerpairs(Handle, Count)). + MaxPos = lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end, + LastPosition, + read_next_n_integerpairs(Handle, Count)), + {MaxPos, TotalKeys + Count}. scan_index_returnpositions(Handle, Position, Count, PosList0) -> {ok, _} = file:position(Handle, Position), @@ -1705,7 +1732,7 @@ get_keys_byposition_simple_test() -> ok = file:delete(F2). generate_sequentialkeys(0, KVList) -> - KVList; + lists:reverse(KVList); generate_sequentialkeys(Count, KVList) -> KV = {"Key" ++ integer_to_list(Count), "Value" ++ integer_to_list(Count)}, generate_sequentialkeys(Count - 1, KVList ++ [KV]). @@ -1741,4 +1768,34 @@ get_keys_byposition_manykeys_test() -> ok = file:delete(F2). +manykeys_but_nohash_test() -> + KeyCount = 1024, + {ok, P1} = cdb_open_writer("../test/nohash_keysinfile.pnd"), + KVList = generate_sequentialkeys(KeyCount, []), + lists:foreach(fun({K, V}) -> cdb_put(P1, K, V, no_hash) end, KVList), + SW1 = os:timestamp(), + {ok, F2} = cdb_complete(P1), + SW2 = os:timestamp(), + io:format("CDB completed in ~w microseconds~n", + [timer:now_diff(SW2, SW1)]), + {ok, P2} = cdb_open_reader(F2), + io:format("FirstKey is ~s~n", [cdb_firstkey(P2)]), + io:format("LastKey is ~s~n", [cdb_lastkey(P2)]), + ?assertMatch("Key1", cdb_firstkey(P2)), + ?assertMatch("Key1024", cdb_lastkey(P2)), + ?assertMatch([], cdb_getpositions(P2, 100)), + ok = cdb_close(P2), + ok = file:delete(F2). 
+ +nokeys_test() -> + {ok, P1} = cdb_open_writer("../test/nohash_emptyfile.pnd"), + {ok, F2} = cdb_complete(P1), + {ok, P2} = cdb_open_reader(F2), + io:format("FirstKey is ~s~n", [cdb_firstkey(P2)]), + io:format("LastKey is ~s~n", [cdb_lastkey(P2)]), + ?assertMatch(empty, cdb_firstkey(P2)), + ?assertMatch(empty, cdb_lastkey(P2)), + ok = cdb_close(P2), + ok = file:delete(F2). + -endif. diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index a5474dc..c9b04c8 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -32,7 +32,9 @@ -include_lib("eunit/include/eunit.hrl"). --export([strip_to_keyonly/1, +-export([ + inker_reload_strategy/1, + strip_to_keyonly/1, strip_to_seqonly/1, strip_to_statusonly/1, strip_to_keyseqstatusonly/1, @@ -45,6 +47,13 @@ to_ledgerkey/3, to_ledgerkey/5, from_ledgerkey/1, + to_inkerkv/4, + from_inkerkv/1, + from_journalkey/1, + compact_inkerkvc/2, + split_inkvalue/1, + check_forinkertype/2, + create_value_for_journal/1, build_metadata_object/2, generate_ledgerkv/4, generate_ledgerkv/5, @@ -61,6 +70,13 @@ generate_uuid() -> io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). +inker_reload_strategy(AltList) -> + ReloadStrategy0 = [{?RIAK_TAG, retain}, {?STD_TAG, retain}], + lists:foldl(fun({X, Y}, SList) -> + lists:keyreplace(X, 1, SList, {X, Y}) + end, + ReloadStrategy0, + AltList). strip_to_keyonly({keyonly, K}) -> K; strip_to_keyonly({K, _V}) -> K. @@ -121,6 +137,75 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG -> to_ledgerkey(Bucket, Key, Tag) -> {Tag, Bucket, Key, null}. +%% Return the Key, Value and Hash Option for this object.
The hash option +%% indicates whether the key would ever be looked up directly, and so if it +%% requires an entry in the hash table +to_inkerkv(LedgerKey, SQN, to_fetch, null) -> + {{SQN, ?INKT_STND, LedgerKey}, null, true}; +to_inkerkv(LedgerKey, SQN, Object, KeyChanges) -> + {InkerType, HashOpt} = check_forinkertype(LedgerKey, Object), + Value = create_value_for_journal({Object, KeyChanges}), + {{SQN, InkerType, LedgerKey}, Value, HashOpt}. + +%% Used when fetching objects, so only handles standard, hashable entries +from_inkerkv(Object) -> + case Object of + {{SQN, ?INKT_STND, PK}, Bin} when is_binary(Bin) -> + {{SQN, PK}, binary_to_term(Bin)}; + {{SQN, ?INKT_STND, PK}, Term} -> + {{SQN, PK}, Term}; + _ -> + Object + end. + +from_journalkey({SQN, _Type, LedgerKey}) -> + {SQN, LedgerKey}. + +compact_inkerkvc({{_SQN, ?INKT_TOMB, _LK}, _V, _CrcCheck}, _Strategy) -> + skip; +compact_inkerkvc({{_SQN, ?INKT_KEYD, LK}, _V, _CrcCheck}, Strategy) -> + {Tag, _, _, _} = LK, + {Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy), + case TagStrat of + retain -> + skip; + TagStrat -> + {TagStrat, null} + end; +compact_inkerkvc({{SQN, ?INKT_STND, LK}, V, CrcCheck}, Strategy) -> + {Tag, _, _, _} = LK, + {Tag, TagStrat} = lists:keyfind(Tag, 1, Strategy), + case TagStrat of + retain -> + {_V, KeyDeltas} = split_inkvalue(V), + {TagStrat, {{SQN, ?INKT_KEYD, LK}, {null, KeyDeltas}, CrcCheck}}; + TagStrat -> + {TagStrat, null} + end. + +split_inkvalue(VBin) -> + case is_binary(VBin) of + true -> + binary_to_term(VBin); + false -> + VBin + end. + +check_forinkertype(_LedgerKey, delete) -> + {?INKT_TOMB, no_hash}; +check_forinkertype(_LedgerKey, _Object) -> + {?INKT_STND, hash}. + +create_value_for_journal(Value) -> + case Value of + {Object, KeyChanges} -> + term_to_binary({Object, KeyChanges}, [compressed]); + Value when is_binary(Value) -> + Value + end. + + + hash(Obj) -> erlang:phash2(term_to_binary(Obj)). 
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index e8eeb9c..5165c94 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -87,10 +87,12 @@ -define(SINGLEFILE_COMPACTION_TARGET, 60.0). -define(MAXRUN_COMPACTION_TARGET, 80.0). -define(CRC_SIZE, 4). +-define(DEFAULT_RELOAD_STRATEGY, leveled_codec:inker_reload_strategy([])). -record(state, {inker :: pid(), max_run_length :: integer(), - cdb_options}). + cdb_options, + reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list()}). -record(candidate, {low_sqn :: integer(), filename :: string(), @@ -126,15 +128,18 @@ clerk_stop(Pid) -> %%%============================================================================ init([IClerkOpts]) -> + ReloadStrategy = IClerkOpts#iclerk_options.reload_strategy, case IClerkOpts#iclerk_options.max_run_length of undefined -> {ok, #state{max_run_length = ?MAX_COMPACTION_RUN, inker = IClerkOpts#iclerk_options.inker, - cdb_options = IClerkOpts#iclerk_options.cdb_options}}; + cdb_options = IClerkOpts#iclerk_options.cdb_options, + reload_strategy = ReloadStrategy}}; MRL -> {ok, #state{max_run_length = MRL, inker = IClerkOpts#iclerk_options.inker, - cdb_options = IClerkOpts#iclerk_options.cdb_options}} + cdb_options = IClerkOpts#iclerk_options.cdb_options, + reload_strategy = ReloadStrategy}} end. handle_call(_Msg, _From, State) -> @@ -166,7 +171,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, CDBopts, FilterFun, FilterServer, - MaxSQN), + MaxSQN, + State#state.reload_strategy), FilesToDelete = lists:map(fun(C) -> {C#candidate.low_sqn, C#candidate.filename, @@ -296,7 +302,6 @@ assess_candidates(AllCandidates, MaxRunLength) -> end. 
assess_candidates([], _MaxRunLength, _CurrentRun0, BestAssessment) -> - io:format("Best run of ~w~n", [BestAssessment]), BestAssessment; assess_candidates([HeadC|Tail], MaxRunLength, CurrentRun0, BestAssessment) -> CurrentRun1 = choose_best_assessment(CurrentRun0 ++ [HeadC], @@ -349,13 +354,14 @@ print_compaction_run(BestRun, MaxRunLength) -> [length(BestRun), score_run(BestRun, MaxRunLength)]), lists:foreach(fun(File) -> io:format("Filename ~s is part of compaction run~n", - [File#candidate.filename]) + [File#candidate.filename]) + end, BestRun). -compact_files([], _CDBopts, _FilterFun, _FilterServer, _MaxSQN) -> +compact_files([], _CDBopts, _FilterFun, _FilterServer, _MaxSQN, _RStrategy) -> {[], 0}; -compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN) -> +compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN, RStrategy) -> BatchesOfPositions = get_all_positions(BestRun, []), compact_files(BatchesOfPositions, CDBopts, @@ -363,20 +369,21 @@ compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN) -> FilterFun, FilterServer, MaxSQN, + RStrategy, [], true). 
compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, - ManSlice0, PromptDelete0) -> + _RStrategy, ManSlice0, PromptDelete0) -> {ManSlice0, PromptDelete0}; compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, - ManSlice0, PromptDelete0) -> + _RStrategy, ManSlice0, PromptDelete0) -> ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0), {ManSlice1, PromptDelete0}; compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, MaxSQN, - ManSlice0, PromptDelete0) -> + RStrategy, ManSlice0, PromptDelete0) -> {SrcJournal, PositionList} = Batch, KVCs0 = leveled_cdb:cdb_directfetch(SrcJournal, PositionList, @@ -384,7 +391,8 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, R0 = filter_output(KVCs0, FilterFun, FilterServer, - MaxSQN), + MaxSQN, + RStrategy), {KVCs1, PromptDelete1} = R0, PromptDelete2 = case {PromptDelete0, PromptDelete1} of {true, true} -> @@ -397,7 +405,7 @@ compact_files([Batch|T], CDBopts, ActiveJournal0, ActiveJournal0, ManSlice0), compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN, - ManSlice1, PromptDelete2). + RStrategy, ManSlice1, PromptDelete2). get_all_positions([], PositionBatches) -> PositionBatches; @@ -423,25 +431,34 @@ split_positions_into_batches(Positions, Journal, Batches) -> Batches ++ [{Journal, ThisBatch}]). -filter_output(KVCs, FilterFun, FilterServer, MaxSQN) -> - lists:foldl(fun(KVC, {Acc, PromptDelete}) -> - {{SQN, _Type, PK}, _V, CrcCheck} = KVC, - KeyValid = FilterFun(FilterServer, PK, SQN), - case {KeyValid, CrcCheck, SQN > MaxSQN} of - {true, true, _} -> - {Acc ++ [KVC], PromptDelete}; - {false, true, true} -> - {Acc ++ [KVC], PromptDelete}; - {false, true, false} -> - {Acc, PromptDelete}; - {_, false, _} -> - io:format("Corrupted value found for " ++ " - Key ~w at SQN ~w~n", [PK, SQN]), - {Acc, false} - end - end, - {[], true}, - KVCs). 
+filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) -> + lists:foldl(fun(KVC0, {Acc, PromptDelete}) -> + R = leveled_codec:compact_inkerkvc(KVC0, ReloadStrategy), + case R of + skip -> + {Acc, PromptDelete}; + {TStrat, KVC1} -> + {K, _V, CrcCheck} = KVC0, + {SQN, LedgerKey} = leveled_codec:from_journalkey(K), + KeyValid = FilterFun(FilterServer, LedgerKey, SQN), + case {KeyValid, CrcCheck, SQN > MaxSQN, TStrat} of + {true, true, _, _} -> + {Acc ++ [KVC0], PromptDelete}; + {false, true, true, _} -> + {Acc ++ [KVC0], PromptDelete}; + {false, true, false, retain} -> + {Acc ++ [KVC1], PromptDelete}; + {false, true, false, _} -> + {Acc, PromptDelete}; + {_, false, _, _} -> + io:format("Corrupted value found for " + ++ "Journal Key ~w~n", [K]), + {Acc, false} + end + end + end, + {[], true}, + KVCs). write_values([], _CDBopts, Journal0, ManSlice0) -> @@ -462,7 +479,7 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> _ -> {ok, Journal0} end, - ValueToStore = leveled_inker:create_value_for_cdb(V), + ValueToStore = leveled_codec:create_value_for_journal(V), R = leveled_cdb:cdb_put(Journal1, {SQN, Type, PK}, ValueToStore), case R of ok -> @@ -568,17 +585,23 @@ find_bestrun_test() -> #candidate{compaction_perc = 65.0}], assess_candidates(CList0, 6)). +test_ledgerkey(Key) -> + {o, "Bucket", Key, null}. + +test_inkerkv(SQN, Key, V, IdxSpecs) -> + {{SQN, ?INKT_STND, test_ledgerkey(Key)}, term_to_binary({V, IdxSpecs})}. 
+ fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), - {K1, V1} = {{1, stnd, "Key1"}, term_to_binary("Value1")}, - {K2, V2} = {{2, stnd, "Key2"}, term_to_binary("Value2")}, - {K3, V3} = {{3, stnd, "Key3"}, term_to_binary("Value3")}, - {K4, V4} = {{4, stnd, "Key1"}, term_to_binary("Value4")}, - {K5, V5} = {{5, stnd, "Key1"}, term_to_binary("Value5")}, - {K6, V6} = {{6, stnd, "Key1"}, term_to_binary("Value6")}, - {K7, V7} = {{7, stnd, "Key1"}, term_to_binary("Value7")}, - {K8, V8} = {{8, stnd, "Key1"}, term_to_binary("Value8")}, + {K1, V1} = test_inkerkv(1, "Key1", "Value1", []), + {K2, V2} = test_inkerkv(2, "Key2", "Value2", []), + {K3, V3} = test_inkerkv(3, "Key3", "Value3", []), + {K4, V4} = test_inkerkv(4, "Key1", "Value4", []), + {K5, V5} = test_inkerkv(5, "Key1", "Value5", []), + {K6, V6} = test_inkerkv(6, "Key1", "Value6", []), + {K7, V7} = test_inkerkv(7, "Key1", "Value7", []), + {K8, V8} = test_inkerkv(8, "Key1", "Value8", []), ok = leveled_cdb:cdb_put(CDB1, K1, V1), ok = leveled_cdb:cdb_put(CDB1, K2, V2), ok = leveled_cdb:cdb_put(CDB1, K3, V3), @@ -593,7 +616,9 @@ fetch_testcdb(RP) -> check_single_file_test() -> RP = "../test/journal", {ok, CDB} = fetch_testcdb(RP), - LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, + {2, {o, "Bucket", "Key2", null}}, + {3, {o, "Bucket", "Key3", null}}], LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of {ObjSQN, Key} -> @@ -613,14 +638,16 @@ check_single_file_test() -> ok = leveled_cdb:cdb_destroy(CDB). 
-compact_single_file_test() -> +compact_single_file_setup() -> RP = "../test/journal", {ok, CDB} = fetch_testcdb(RP), Candidate = #candidate{journal = CDB, low_sqn = 1, filename = "test", compaction_perc = 37.5}, - LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, + {2, {o, "Bucket", "Key2", null}}, + {3, {o, "Bucket", "Key3", null}}], LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of {ObjSQN, Key} -> @@ -630,33 +657,94 @@ compact_single_file_test() -> end end, CompactFP = leveled_inker:filepath(RP, journal_compact_dir), ok = filelib:ensure_dir(CompactFP), + {Candidate, LedgerSrv1, LedgerFun1, CompactFP, CDB}. + +compact_single_file_recovr_test() -> + {Candidate, + LedgerSrv1, + LedgerFun1, + CompactFP, + CDB} = compact_single_file_setup(), R1 = compact_files([Candidate], #cdb_options{file_path=CompactFP}, LedgerFun1, LedgerSrv1, - 9), + 9, + [{?STD_TAG, recovr}]), {ManSlice1, PromptDelete1} = R1, ?assertMatch(true, PromptDelete1), [{LowSQN, FN, PidR}] = ManSlice1, io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), - ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, stnd, "Key1"})), - ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {7, stnd, "Key1"})), - ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {1, stnd, "Key1"})), - {_RK1, RV1} = leveled_cdb:cdb_get(PidR, {2, stnd, "Key2"}), - ?assertMatch("Value2", binary_to_term(RV1)), + ?assertMatch(probably, + leveled_cdb:cdb_keycheck(PidR, + {8, + stnd, + test_ledgerkey("Key1")})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, + {7, + stnd, + test_ledgerkey("Key1")})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, + {1, + stnd, + test_ledgerkey("Key1")})), + {_RK1, RV1} = leveled_cdb:cdb_get(PidR, + {2, + stnd, + test_ledgerkey("Key2")}), + ?assertMatch({"Value2", []}, binary_to_term(RV1)), ok = leveled_cdb:cdb_destroy(CDB). 
+compact_single_file_retain_test() -> + {Candidate, + LedgerSrv1, + LedgerFun1, + CompactFP, + CDB} = compact_single_file_setup(), + R1 = compact_files([Candidate], + #cdb_options{file_path=CompactFP}, + LedgerFun1, + LedgerSrv1, + 9, + [{?STD_TAG, retain}]), + {ManSlice1, PromptDelete1} = R1, + ?assertMatch(true, PromptDelete1), + [{LowSQN, FN, PidR}] = ManSlice1, + io:format("FN of ~s~n", [FN]), + ?assertMatch(1, LowSQN), + ?assertMatch(probably, + leveled_cdb:cdb_keycheck(PidR, + {8, + stnd, + test_ledgerkey("Key1")})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, + {7, + stnd, + test_ledgerkey("Key1")})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, + {1, + stnd, + test_ledgerkey("Key1")})), + {_RK1, RV1} = leveled_cdb:cdb_get(PidR, + {2, + stnd, + test_ledgerkey("Key2")}), + ?assertMatch({"Value2", []}, binary_to_term(RV1)), + ok = leveled_cdb:cdb_destroy(CDB). + compact_empty_file_test() -> RP = "../test/journal", FN1 = leveled_inker:filepath(RP, 1, new_journal), CDBopts = #cdb_options{binary_mode=true}, {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), - ok = leveled_cdb:cdb_put(CDB1, {1, stnd, "Key1"}, <<>>), + ok = leveled_cdb:cdb_put(CDB1, {1, stnd, test_ledgerkey("Key1")}, <<>>), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), {ok, CDB2} = leveled_cdb:cdb_open_reader(FN2), - LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], + LedgerSrv1 = [{8, {o, "Bucket", "Key1", null}}, + {2, {o, "Bucket", "Key2", null}}, + {3, {o, "Bucket", "Key3", null}}], LedgerFun1 = fun(Srv, Key, ObjSQN) -> case lists:keyfind(ObjSQN, 1, Srv) of {ObjSQN, Key} -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b7f26aa..8b83f49 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -106,7 +106,6 @@ ink_print_manifest/1, ink_close/1, ink_forceclose/1, - create_value_for_cdb/1, build_dummy_journal/0, simple_manifest_reader/2, clean_testdir/1, @@ -342,9 +341,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions 
%%%============================================================================ -start_from_file(InkerOpts) -> - RootPath = InkerOpts#inker_options.root_path, - CDBopts = InkerOpts#inker_options.cdb_options, +start_from_file(InkOpts) -> + RootPath = InkOpts#inker_options.root_path, + CDBopts = InkOpts#inker_options.cdb_options, JournalFP = filepath(RootPath, journal_dir), filelib:ensure_dir(JournalFP), CompactFP = filepath(RootPath, journal_compact_dir), @@ -360,8 +359,10 @@ start_from_file(InkerOpts) -> end, IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, + ReloadStrategy = InkOpts#inker_options.reload_strategy, IClerkOpts = #iclerk_options{inker = self(), - cdb_options=IClerkCDBOpts}, + cdb_options=IClerkCDBOpts, + reload_strategy = ReloadStrategy}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), {Manifest, @@ -379,24 +380,18 @@ start_from_file(InkerOpts) -> clerk = Clerk}}. -put_object(PrimaryKey, Object, KeyChanges, State) -> +put_object(LedgerKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, - {InkerType, HashOpt} = case Object of - delete -> - {tomb, no_hash}; - %delta -> - % {keyd, no_hash - _ -> - {stnd, hash} - end, - Bin1 = create_value_for_cdb({Object, KeyChanges}), - ObjSize = byte_size(Bin1), + {JournalKey, JournalBin, HashOpt} = leveled_codec:to_inkerkv(LedgerKey, + NewSQN, + Object, + KeyChanges), case leveled_cdb:cdb_put(State#state.active_journaldb, - {NewSQN, InkerType, PrimaryKey}, - Bin1, + JournalKey, + JournalBin, HashOpt) of ok -> - {ok, State#state{journal_sqn=NewSQN}, ObjSize}; + {ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)}; roll -> SW = os:timestamp(), CDBopts = State#state.cdb_options, @@ -409,8 +404,8 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> State#state.manifest_sqn + 1, State#state.root_path), ok = leveled_cdb:cdb_put(NewJournalP, - {NewSQN, InkerType, PrimaryKey}, - Bin1, + JournalKey, + JournalBin, HashOpt), io:format("Put to new active journal " ++ "with manifest 
write took ~w microseconds~n", @@ -420,30 +415,18 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> manifest=NewManifest, manifest_sqn = State#state.manifest_sqn + 1, active_journaldb=NewJournalP}, - ObjSize} + byte_size(JournalBin)} end. -create_value_for_cdb(Value) -> - case Value of - {Object, KeyChanges} -> - term_to_binary({Object, KeyChanges}, [compressed]); - Value when is_binary(Value) -> - Value - end. - - -get_object(PrimaryKey, SQN, Manifest) -> +get_object(LedgerKey, SQN, Manifest) -> JournalP = find_in_manifest(SQN, Manifest), - Obj = leveled_cdb:cdb_get(JournalP, {SQN, stnd, PrimaryKey}), - case Obj of - {{SQN, stnd, PK}, Bin} when is_binary(Bin) -> - {{SQN, PK}, binary_to_term(Bin)}; - {{SQN, stnd, PK}, Term} -> - {{SQN, PK}, Term}; - _ -> - Obj - end. + {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, + SQN, + to_fetch, + null), + Obj = leveled_cdb:cdb_get(JournalP, InkerKey), + leveled_codec:from_inkerkv(Obj). build_manifest(ManifestFilenames, @@ -771,6 +754,10 @@ initiate_penciller_snapshot(Bookie) -> -ifdef(TEST). build_dummy_journal() -> + F = fun(X) -> X end, + build_dummy_journal(F). 
+ +build_dummy_journal(KeyConvertF) -> RootPath = "../test/journal", clean_testdir(RootPath), JournalFP = filepath(RootPath, journal_dir), @@ -780,8 +767,8 @@ build_dummy_journal() -> ok = filelib:ensure_dir(ManifestFP), F1 = filename:join(JournalFP, "nursery_1.pnd"), {ok, J1} = leveled_cdb:cdb_open_writer(F1), - {K1, V1} = {"Key1", "TestValue1"}, - {K2, V2} = {"Key2", "TestValue2"}, + {K1, V1} = {KeyConvertF("Key1"), "TestValue1"}, + {K2, V2} = {KeyConvertF("Key2"), "TestValue2"}, ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})), ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})), ok = leveled_cdb:cdb_roll(J1), @@ -789,8 +776,8 @@ build_dummy_journal() -> ok = leveled_cdb:cdb_close(J1), F2 = filename:join(JournalFP, "nursery_3.pnd"), {ok, J2} = leveled_cdb:cdb_open_writer(F2), - {K1, V3} = {"Key1", "TestValue3"}, - {K4, V4} = {"Key4", "TestValue4"}, + {K1, V3} = {KeyConvertF("Key1"), "TestValue3"}, + {K4, V4} = {KeyConvertF("Key4"), "TestValue4"}, ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})), ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})), ok = leveled_cdb:cdb_close(J2), @@ -854,29 +841,37 @@ simple_inker_completeactivejournal_test() -> ink_close(Ink1), clean_testdir(RootPath). - +test_ledgerkey(Key) -> + {o, "Bucket", Key, null}. 
+ compact_journal_test() -> RootPath = "../test/journal", - build_dummy_journal(), + build_dummy_journal(fun test_ledgerkey/1), CDBopts = #cdb_options{max_size=300000}, + RStrategy = [{?STD_TAG, recovr}], {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=CDBopts}), - {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), + cdb_options=CDBopts, + reload_strategy=RStrategy}), + {ok, NewSQN1, _ObjSize} = ink_put(Ink1, + test_ledgerkey("KeyAA"), + "TestValueAA", []), ?assertMatch(NewSQN1, 5), ok = ink_print_manifest(Ink1), - R0 = ink_get(Ink1, "KeyAA", 5), - ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), + R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), + ?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}), FunnyLoop = lists:seq(1, 48), Checker = lists:map(fun(X) -> PK = "KeyZ" ++ integer_to_list(X), {ok, SQN, _} = ink_put(Ink1, - PK, + test_ledgerkey(PK), crypto:rand_bytes(10000), []), - {SQN, PK} + {SQN, test_ledgerkey(PK)} end, FunnyLoop), - {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), + {ok, NewSQN2, _ObjSize} = ink_put(Ink1, + test_ledgerkey("KeyBB"), + "TestValueBB", []), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), ok = ink_print_manifest(Ink1),