diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 5305eaf..e116b8f 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -663,7 +663,7 @@ maybe_withjitter(CacheSize, MaxCacheSize) -> load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, OutputTree} = Acc0, - {SQN, PK} = KeyInLedger, + {SQN, _Type, PK} = KeyInLedger, % VBin may already be a term {VBin, VSize} = ExtractFun(ValueInLedger), {Obj, IndexSpecs} = case is_binary(VBin) of diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 94d3a2f..f3eedd9 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -57,6 +57,7 @@ cdb_open_reader/1, cdb_get/2, cdb_put/3, + cdb_put/4, cdb_getpositions/2, cdb_directfetch/3, cdb_lastkey/1, @@ -126,7 +127,10 @@ cdb_get(Pid, Key) -> gen_server:call(Pid, {get_kv, Key}, infinity). cdb_put(Pid, Key, Value) -> - gen_server:call(Pid, {put_kv, Key, Value}, infinity). + cdb_put(Pid, Key, Value, hash). + +cdb_put(Pid, Key, Value, HashOpt) -> + gen_server:call(Pid, {put_kv, Key, Value, HashOpt}, infinity). %% SampleSize can be an integer or the atom all cdb_getpositions(Pid, SampleSize) -> @@ -258,7 +262,7 @@ handle_call({key_check, Key}, _From, State) -> State#state.hash_index), State} end; -handle_call({put_kv, Key, Value}, _From, State) -> +handle_call({put_kv, Key, Value, HashOpt}, _From, State) -> case {State#state.writer, State#state.pending_roll} of {true, false} -> Result = put(State#state.handle, @@ -266,15 +270,20 @@ handle_call({put_kv, Key, Value}, _From, State) -> {State#state.last_position, State#state.hashtree}, State#state.binary_mode, State#state.max_size), - case Result of - roll -> + case {Result, HashOpt} of + {roll, _} -> %% Key and value could not be written {reply, roll, State}; - {UpdHandle, NewPosition, HashTree} -> + {{UpdHandle, NewPosition, HashTree}, hash} -> {reply, ok, State#state{handle=UpdHandle, last_position=NewPosition, last_key=Key, - hashtree=HashTree}} + hashtree=HashTree}}; + {{UpdHandle, NewPosition, _HashTree}, no_hash} -> + %% Don't update the hashtree + {reply, ok, State#state{handle=UpdHandle, + last_position=NewPosition, + last_key=Key}} end; _ -> {reply, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 8fb0579..e8eeb9c 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -1,4 +1,61 @@ - +%% -------- Inker's Clerk --------- +%% +%% The Inker's clerk runs compaction jobs on behalf of the Inker, informing the +%% Inker of any manifest changes when complete. 
+%% +%% -------- Value Compaction --------- +%% +%% Compaction requires the Inker to have three different types of keys +%% * stnd - A standard key of the form {SQN, stnd, LedgerKey} which maps to a +%% value of {Object, KeyDeltas} +%% * tomb - A tombstone for a LedgerKey {SQN, tomb, LedgerKey} +%% * keyd - An object containing key deltas only of the form +%% {SQN, keyd, LedgerKey} which maps to a value of {KeyDeltas} +%% +%% Each LedgerKey has a Tag, and for each Tag there should be a compaction +%% strategy, which will be set to one of the following: +%% * retain - KeyDeltas must be retained permanently, only values can be +%% compacted (if replaced or not_present in the ledger) +%% * recalc - The full object can be removed through compaction (if replaced or +%% not_present in the ledger), as each object with that tag can have the Key +%% Deltas recreated by passing {LedgerKey, SQN, Object, KeyDeltas, +%% PencillerSnapshot} into an assigned recalc function +%% * recovr - At compaction time this is equivalent to recalc, except that +%% KeyDeltas are lost when reloading the Ledger from the Journal, and it is +%% assumed that those deltas will be resolved through external anti-entropy +%% (e.g. read repair or AAE) - or alternatively the risk of loss of persisted +%% data from the ledger is accepted for this data type +%% +%% During the compaction process for the Journal, the file chosen for +%% compaction is scanned in SQN order, and a FilterFun is passed (which will +%% normally perform a check against a snapshot of the persisted part of the +%% Ledger). If the given key is of type stnd, and this object is no longer the +%% active object under the LedgerKey, then the object can be compacted out of +%% the journal. This will lead to either its removal (if the strategy for the +%% Tag is recovr or recalc), or its replacement with a KeyDelta object. +%% +%% Tombstones cannot be reaped through this compaction process. +%% +%% Currently, KeyDeltas are also reaped if the LedgerKey has been updated and +%% the Tag has a recovr strategy. This may be the case when KeyDeltas are used +%% as a way of directly representing a change, and where anti-entropy can +%% recover from a loss. +%% +%% -------- Tombstone Reaping --------- +%% +%% Value compaction does not remove tombstones from the database, and so a +%% separate compaction job is required for this. +%% +%% Tombstones can only be reaped for Tags set to recovr or recalc. +%% +%% The tombstone reaping process should select a file to compact, and then +%% take that file and discover the LedgerKeys of all reapable tombstones. +%% The ledger should then be scanned from SQN 0 looking for unreaped objects +%% before the tombstone. If no such objects exist for that tombstone, it can +%% now be reaped as part of the compaction job. +%% +%% Other tombstones cannot be reaped, as otherwise on loading a ledger an old +%% version of the object may re-emerge. -module(leveled_iclerk).
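The per-key decision described in this header reduces to a small piece of logic. Below is a minimal sketch, assuming a hypothetical compact_action/4 helper (not part of this diff) and a FilterFun with the (FilterServer, PK, SQN) shape used by leveled_iclerk:

    %% Hypothetical sketch only: Strategy is the compaction strategy assigned
    %% to the LedgerKey's Tag, and FilterFun checks a key against a snapshot
    %% of the persisted ledger (FilterServer).
    compact_action({SQN, stnd, PK}, Strategy, FilterFun, FilterServer) ->
        case FilterFun(FilterServer, PK, SQN) of
            true ->
                keep;                            % still the active object
            false ->
                case Strategy of
                    retain -> replace_with_keyd; % compact value, keep deltas
                    recalc -> remove;            % deltas can be recalculated
                    recovr -> remove             % deltas recovered externally
                end
        end;
    compact_action({SQN, keyd, PK}, recovr, FilterFun, FilterServer) ->
        %% KeyDeltas may also be reaped under recovr once the key is replaced
        case FilterFun(FilterServer, PK, SQN) of
            true -> keep;
            false -> remove
        end;
    compact_action(_JournalKey, _Strategy, _FilterFun, _FilterServer) ->
        keep.                                    % tombstones and retained keyd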
@@ -161,7 +218,7 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> - {{SQN, PK}, Size} = KS, + {{SQN, _Type, PK}, Size} = KS, Check = FilterFun(FilterServer, PK, SQN), case {Check, SQN > MaxSQN} of {true, _} -> @@ -368,7 +425,7 @@ split_positions_into_batches(Positions, Journal, Batches) -> filter_output(KVCs, FilterFun, FilterServer, MaxSQN) -> lists:foldl(fun(KVC, {Acc, PromptDelete}) -> - {{SQN, PK}, _V, CrcCheck} = KVC, + {{SQN, _Type, PK}, _V, CrcCheck} = KVC, KeyValid = FilterFun(FilterServer, PK, SQN), case {KeyValid, CrcCheck, SQN > MaxSQN} of {true, true, _} -> @@ -390,7 +447,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN) -> write_values([], _CDBopts, Journal0, ManSlice0) -> {Journal0, ManSlice0}; write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> - {{SQN, PK}, V, _CrcCheck} = KVC, + {{SQN, Type, PK}, V, _CrcCheck} = KVC, {ok, Journal1} = case Journal0 of null -> FP = CDBopts#cdb_options.file_path, @@ -406,7 +463,7 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> {ok, Journal0} end, ValueToStore = leveled_inker:create_value_for_cdb(V), - R = leveled_cdb:cdb_put(Journal1, {SQN, PK}, ValueToStore), + R = leveled_cdb:cdb_put(Journal1, {SQN, Type, PK}, ValueToStore), case R of ok -> write_values(Rest, CDBopts, Journal1, ManSlice0); @@ -419,7 +476,7 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> generate_manifest_entry(ActiveJournal) -> {ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal), {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), - {StartSQN, _PK} = leveled_cdb:cdb_firstkey(PidR), + {StartSQN, _Type, _PK} = leveled_cdb:cdb_firstkey(PidR), [{StartSQN, NewFN, PidR}]. 
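For reference, the FilterFun threaded through check_single_file/6 and filter_output/4 above takes (FilterServer, PK, SQN) and returns whether that journal entry is still the current version of the key. A minimal stand-in, assuming the plain {SQN, Key} list shape of the test LedgerSrv1 below rather than a real penciller snapshot:

    %% Illustrative FilterFun only: FilterServer is assumed to be a list of
    %% {SQN, Key} pairs standing in for a snapshot of the persisted ledger;
    %% an entry is current only if the ledger still points at this SQN.
    test_filterfun(FilterServer, PK, SQN) ->
        lists:member({SQN, PK}, FilterServer).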
@@ -514,14 +571,14 @@ find_bestrun_test() -> fetch_testcdb(RP) -> FN1 = leveled_inker:filepath(RP, 1, new_journal), {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, #cdb_options{}), - {K1, V1} = {{1, "Key1"}, term_to_binary("Value1")}, - {K2, V2} = {{2, "Key2"}, term_to_binary("Value2")}, - {K3, V3} = {{3, "Key3"}, term_to_binary("Value3")}, - {K4, V4} = {{4, "Key1"}, term_to_binary("Value4")}, - {K5, V5} = {{5, "Key1"}, term_to_binary("Value5")}, - {K6, V6} = {{6, "Key1"}, term_to_binary("Value6")}, - {K7, V7} = {{7, "Key1"}, term_to_binary("Value7")}, - {K8, V8} = {{8, "Key1"}, term_to_binary("Value8")}, + {K1, V1} = {{1, stnd, "Key1"}, term_to_binary("Value1")}, + {K2, V2} = {{2, stnd, "Key2"}, term_to_binary("Value2")}, + {K3, V3} = {{3, stnd, "Key3"}, term_to_binary("Value3")}, + {K4, V4} = {{4, stnd, "Key1"}, term_to_binary("Value4")}, + {K5, V5} = {{5, stnd, "Key1"}, term_to_binary("Value5")}, + {K6, V6} = {{6, stnd, "Key1"}, term_to_binary("Value6")}, + {K7, V7} = {{7, stnd, "Key1"}, term_to_binary("Value7")}, + {K8, V8} = {{8, stnd, "Key1"}, term_to_binary("Value8")}, ok = leveled_cdb:cdb_put(CDB1, K1, V1), ok = leveled_cdb:cdb_put(CDB1, K2, V2), ok = leveled_cdb:cdb_put(CDB1, K3, V3), @@ -583,10 +640,10 @@ compact_single_file_test() -> [{LowSQN, FN, PidR}] = ManSlice1, io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), - ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, "Key1"})), - ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {7, "Key1"})), - ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {1, "Key1"})), - {_RK1, RV1} = leveled_cdb:cdb_get(PidR, {2, "Key2"}), + ?assertMatch(probably, leveled_cdb:cdb_keycheck(PidR, {8, stnd, "Key1"})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {7, stnd, "Key1"})), + ?assertMatch(missing, leveled_cdb:cdb_get(PidR, {1, stnd, "Key1"})), + {_RK1, RV1} = leveled_cdb:cdb_get(PidR, {2, stnd, "Key2"}), ?assertMatch("Value2", binary_to_term(RV1)), ok = leveled_cdb:cdb_destroy(CDB). 
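The tests above exercise the new three-part journal key through the default cdb_put/3; its companion cdb_put/4, added in leveled_cdb earlier in this diff, takes an explicit hash option. A sketch of the two call shapes (the function name, file name and values here are illustrative only):

    %% Sketch: cdb_put/3 defaults HashOpt to hash, so existing callers are
    %% unchanged; no_hash appends the entry without updating the hashtree.
    hashopt_example(FN) ->
        {ok, CDB} = leveled_cdb:cdb_open_writer(FN, #cdb_options{}),
        %% indexed in the hashtree - remains directly fetchable
        ok = leveled_cdb:cdb_put(CDB,
                                    {1, stnd, "Key1"},
                                    term_to_binary({"Value1", []})),
        %% written to disk but never enters the hashtree, so only visible
        %% to scans (e.g. compaction or reload), not to direct fetches
        ok = leveled_cdb:cdb_put(CDB,
                                    {2, tomb, "Key1"},
                                    term_to_binary({null, []}),
                                    no_hash),
        ok = leveled_cdb:cdb_close(CDB).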
@@ -596,7 +653,7 @@ compact_empty_file_test() -> FN1 = leveled_inker:filepath(RP, 1, new_journal), CDBopts = #cdb_options{binary_mode=true}, {ok, CDB1} = leveled_cdb:cdb_open_writer(FN1, CDBopts), - ok = leveled_cdb:cdb_put(CDB1, {1, "Key1"}, <<>>), + ok = leveled_cdb:cdb_put(CDB1, {1, stnd, "Key1"}, <<>>), {ok, FN2} = leveled_cdb:cdb_complete(CDB1), {ok, CDB2} = leveled_cdb:cdb_open_reader(FN2), LedgerSrv1 = [{8, "Key1"}, {2, "Key2"}, {3, "Key3"}], diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index e46ddfa..b7f26aa 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -50,10 +50,19 @@ %% -------- Objects --------- %% %% From the perspective of the Inker, objects to store are made up of: -%% - A Primary Key (as an Erlang term) -%% - A sequence number (assigned by the Inker) -%% - An object (an Erlang term) -%% - A set of Key Deltas associated with the change +%% - An Inker Key formed from +%% - A sequence number (assigned by the Inker) +%% - An Inker key type (stnd, tomb or keyd) +%% - A Ledger Key (as an Erlang term) +%% - A value formed from +%% - An object (an Erlang term) which should be null for tomb types, and +%% may be null for keyd types +%% - A set of Key Deltas associated with the change (which may be an +%% empty list) +%% +%% Note that only the Inker key type of stnd is directly fetchable; other +%% key types are to be found only in scans and so can be added without being +%% entered into the hashtree %% %% -------- Compaction --------- %% @@ -372,15 +381,20 @@ start_from_file(InkerOpts) -> put_object(PrimaryKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, - %% TODO: The term goes through a double binary_to_term conversion - %% as the CDB will also do the same conversion - %% Perhaps have CDB started up in apure binary mode, when it doesn't - %5 receive terms?
+ {InkerType, HashOpt} = case Object of + delete -> + {tomb, no_hash}; + %delta -> + % {keyd, no_hash}; + _ -> + {stnd, hash} + end, Bin1 = create_value_for_cdb({Object, KeyChanges}), ObjSize = byte_size(Bin1), case leveled_cdb:cdb_put(State#state.active_journaldb, - {NewSQN, PrimaryKey}, - Bin1) of + {NewSQN, InkerType, PrimaryKey}, + Bin1, + HashOpt) of ok -> {ok, State#state{journal_sqn=NewSQN}, ObjSize}; roll -> @@ -394,7 +408,10 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> ok = simple_manifest_writer(NewManifest, State#state.manifest_sqn + 1, State#state.root_path), - ok = leveled_cdb:cdb_put(NewJournalP, {NewSQN, PrimaryKey}, Bin1), + ok = leveled_cdb:cdb_put(NewJournalP, + {NewSQN, InkerType, PrimaryKey}, + Bin1, + HashOpt), io:format("Put to new active journal " ++ "with manifest write took ~w microseconds~n", [timer:now_diff(os:timestamp(),SW)]), @@ -418,11 +435,11 @@ create_value_for_cdb(Value) -> get_object(PrimaryKey, SQN, Manifest) -> JournalP = find_in_manifest(SQN, Manifest), - Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}), + Obj = leveled_cdb:cdb_get(JournalP, {SQN, stnd, PrimaryKey}), case Obj of - {{SQN, PK}, Bin} when is_binary(Bin) -> + {{SQN, stnd, PK}, Bin} when is_binary(Bin) -> {{SQN, PK}, binary_to_term(Bin)}; - {{SQN, PK}, Term} -> + {{SQN, stnd, PK}, Term} -> {{SQN, PK}, Term}; _ -> Obj @@ -456,7 +473,7 @@ build_manifest(ManifestFilenames, JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of empty -> ActiveLowSQN; - {JSQN, _LastKey} -> + {JSQN, _Type, _LastKey} -> JSQN end, @@ -499,7 +516,7 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> io:format("Head manifest entry ~s is complete~n", [HeadFN]), {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), - {LastSQN, _LastPK} = leveled_cdb:cdb_lastkey(HeadR), + {LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR), add_to_manifest(add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadR}), start_new_activejournal(LastSQN + 1, @@ -765,8 +782,8 @@ build_dummy_journal() -> {ok, J1} = leveled_cdb:cdb_open_writer(F1), {K1, V1} = {"Key1", "TestValue1"}, {K2, V2} = {"Key2", "TestValue2"}, - ok = leveled_cdb:cdb_put(J1, {1, K1}, term_to_binary({V1, []})), - ok = leveled_cdb:cdb_put(J1, {2, K2}, term_to_binary({V2, []})), + ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})), + ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})), ok = leveled_cdb:cdb_roll(J1), _LK = leveled_cdb:cdb_lastkey(J1), ok = leveled_cdb:cdb_close(J1), @@ -774,8 +791,8 @@ {ok, J2} = leveled_cdb:cdb_open_writer(F2), {K1, V3} = {"Key1", "TestValue3"}, {K4, V4} = {"Key4", "TestValue4"}, - ok = leveled_cdb:cdb_put(J2, {3, K1}, term_to_binary({V3, []})), - ok = leveled_cdb:cdb_put(J2, {4, K4}, term_to_binary({V4, []})), + ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})), + ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})), ok = leveled_cdb:cdb_close(J2), Manifest = [{1, "../test/journal/journal_files/nursery_1"}, {3, "../test/journal/journal_files/nursery_3"}],
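Taken together, put_object and get_object now give tombstones the property promised in the leveled_inker header: a delete is journalled under {SQN, tomb, PrimaryKey} with no_hash, while get_object only ever builds a {SQN, stnd, PrimaryKey} lookup key, so a tombstone can never be returned by a direct fetch and is seen only by scans (journal reload, compaction). The inlined case statement amounts to this hypothetical helper:

    %% Hypothetical restatement of the Object -> {InkerType, HashOpt} mapping
    %% inlined in put_object above.
    inker_type_for(delete) -> {tomb, no_hash};  % tombstone: scan-only entry
    inker_type_for(_Object) -> {stnd, hash}.    % standard: directly fetchable

The commented-out delta clause in the diff suggests keyd entries would take the same no_hash route if and when they are written directly.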