diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index f93a3bb..708bbb6 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -101,12 +101,13 @@
 -record(state, {inker :: pid() | undefined,
                 penciller :: pid() | undefined,
                 cache_size :: integer() | undefined,
-                recent_aae :: false | #recent_aae{} | undefined,
+                recent_aae :: recent_aae(),
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
 
                 head_only = false :: boolean(),
+                head_lookup = true :: boolean(),
 
                 put_countdown = 0 :: integer(),
                 get_countdown = 0 :: integer(),
@@ -144,6 +145,7 @@
 -type fold_timings() :: no_timing|#fold_timings{}.
 -type head_timings() :: no_timing|#head_timings{}.
 -type timing_types() :: head|get|put|fold.
+-type recent_aae() :: false|#recent_aae{}|undefined.
 
 %%%============================================================================
 %%% API
@@ -512,20 +514,30 @@ init([Opts]) ->
                                                     unit_minutes = UnitMinutes}
                     end,
 
-            HeadOnly = get_opt(head_only, Opts, false),
+            {HeadOnly, HeadLookup} =
+                case get_opt(head_only, Opts, false) of
+                    false ->
+                        {false, true};
+                    with_lookup ->
+                        {true, true};
+                    no_lookup ->
+                        {true, false}
+                end,
+
+            State0 = #state{cache_size=CacheSize,
+                            recent_aae=RecentAAE,
+                            is_snapshot=false,
+                            head_only=HeadOnly,
+                            head_lookup = HeadLookup},
 
             {Inker, Penciller} =
-                startup(InkerOpts, PencillerOpts, RecentAAE),
+                startup(InkerOpts, PencillerOpts, State0),
 
             NewETS = ets:new(mem, [ordered_set]),
             leveled_log:log("B0001", [Inker, Penciller]),
-            {ok, #state{inker=Inker,
-                        penciller=Penciller,
-                        cache_size=CacheSize,
-                        recent_aae=RecentAAE,
-                        ledger_cache=#ledger_cache{mem = NewETS},
-                        is_snapshot=false,
-                        head_only=HeadOnly}};
+            {ok, State0#state{inker=Inker,
+                              penciller=Penciller,
+                              ledger_cache=#ledger_cache{mem = NewETS}}};
         Bookie ->
             {ok, Penciller, Inker} =
                 book_snapshot(Bookie, store, undefined, true),
@@ -552,7 +564,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                                    Object,
                                    ObjSize,
                                    {IndexSpecs, TTL},
-                                   State#state.recent_aae),
+                                   State),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
 
     {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
@@ -590,7 +602,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
         preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
                                SQN, null, length(ObjectSpecs),
                                {ObjectSpecs, TTL},
-                               false),
+                               State),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     case State#state.slow_offer of
         true ->
@@ -654,7 +666,8 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         update_statetimings(get, Timings2, State#state.get_countdown),
     {reply, Reply, State#state{get_timings = Timings,
                                get_countdown = CountDown}};
-handle_call({head, Bucket, Key, Tag}, _From, State) ->
+handle_call({head, Bucket, Key, Tag}, _From, State)
+        when State#state.head_lookup == true ->
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@@ -1156,14 +1169,14 @@ set_options(Opts) ->
                             levelzero_cointoss = true,
                             compression_method = CompressionMethod}}.
 
-startup(InkerOpts, PencillerOpts, RecentAAE) ->
+startup(InkerOpts, PencillerOpts, State) ->
    {ok, Inker} = leveled_inker:ink_start(InkerOpts),
    {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
    LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
    leveled_log:log("B0005", [LedgerSQN]),
    ok = leveled_inker:ink_loadpcl(Inker,
                                   LedgerSQN + 1,
-                                  get_loadfun(RecentAAE),
+                                  get_loadfun(State),
                                   Penciller),
    {Inker, Penciller}.
 
@@ -1193,25 +1206,34 @@ fetch_head(Key, Penciller, LedgerCache) ->
     end.
 
-preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, _A) ->
+-spec preparefor_ledgercache(atom(), any(), integer(), any(),
+                             integer(), tuple(), book_state()) ->
+                                {integer()|no_lookup, integer(), list()}.
+%% @doc
+%% Prepare an object and its related key changes for addition to the Ledger
+%% via the Ledger Cache.
+preparefor_ledgercache(?INKT_MPUT,
+                       ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
+                       _State) ->
     ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
     {no_lookup, SQN, ObjChanges};
 preparefor_ledgercache(?INKT_KEYD,
                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
-                       _AAE) ->
+                       _State) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
     KeyChanges = leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
     {no_lookup, SQN, KeyChanges};
 preparefor_ledgercache(_InkTag,
                        LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
-                       AAE) ->
+                       State) ->
     {Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
         leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
     KeyChanges =
         [{LedgerKey, MetaValue}] ++
             leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
-            leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods),
+            leveled_codec:aae_indexspecs(State#state.recent_aae,
+                                         Bucket, Key, SQN, ObjH, LastMods),
     {KeyH, SQN, KeyChanges}.
 
@@ -1270,10 +1292,10 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
     end.
 
-get_loadfun(RecentAAE) ->
+get_loadfun(State) ->
     PrepareFun =
         fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
-            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
+            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
         end,
     LoadFun =
         fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->
@@ -1832,7 +1854,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
                                 {foldheads_bybucket,
                                     ?STD_TAG,
                                     "BucketB",
-                                    {"Key", "Key4zzzz"},
+                                    {"Key", "Key4|"},
                                     FoldHeadsFun,
                                     true, false, false}),
     KeyHashList2E = HTFolder2E(),
@@ -1841,13 +1863,17 @@
                                 {foldheads_bybucket,
                                     ?STD_TAG,
                                     "BucketB",
-                                    {"Key5", <<"all">>},
+                                    {"Key5", "Key|"},
                                     FoldHeadsFun,
                                     true, false, false}),
     KeyHashList2F = HTFolder2F(),
 
     ?assertMatch(true, length(KeyHashList2E) > 0),
     ?assertMatch(true, length(KeyHashList2F) > 0),
+    io:format("Length of 2B ~w 2E ~w 2F ~w~n",
+                [length(KeyHashList2B),
+                    length(KeyHashList2E),
+                    length(KeyHashList2F)]),
     ?assertMatch(true,
                     lists:usort(KeyHashList2B) ==
                         lists:usort(KeyHashList2E ++ KeyHashList2F)),
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
index ee149c4..33d3029 100644
--- a/test/end_to_end/tictac_SUITE.erl
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -1013,17 +1013,21 @@ recent_aae_expiry(_Config) ->
 
 
 basic_headonly(_Config) ->
+    ObjectCount = 200000,
+    basic_headonly_test(ObjectCount, with_lookup),
+    basic_headonly_test(ObjectCount, no_lookup).
+
+
+basic_headonly_test(ObjectCount, HeadOnly) ->
     % Load some AAE type objects into Leveled using the read_only mode. This
     % should allow for the items to be added in batches. Confirm that the
     % journal is garbage collected as expected, and that it is possible to
     % perform a fold_heads style query
-    ObjectCount = 200000,
-
     RootPathHO = testutil:reset_filestructure("testHO"),
     StartOpts1 = [{root_path, RootPathHO},
                     {max_pencillercachesize, 16000},
                     {sync_strategy, sync},
-                    {head_only, true},
+                    {head_only, HeadOnly},
                     {max_journalsize, 500000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {B1, K1, V1, S1, MD} = {"Bucket",
@@ -1052,8 +1056,8 @@ basic_headonly(_Config) ->
 
     SW0 = os:timestamp(),
     ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
-    io:format("Loaded an object count of ~w in ~w microseconds ~n",
-                [ObjectCount, timer:now_diff(os:timestamp(), SW0)]),
+    io:format("Loaded an object count of ~w in ~w microseconds with ~w~n",
+                [ObjectCount, timer:now_diff(os:timestamp(), SW0), HeadOnly]),
 
     FoldFun =
         fun(_B, _K, V, {HashAcc, CountAcc}) ->
@@ -1068,7 +1072,7 @@
 
     SW1 = os:timestamp(),
     {AccH1, AccC1} = Runner1(),
-    io:format("AccH and AccC of ~w ~w in ~w microseconds ~n",
+    io:format("AccH and AccC of ~w ~w in ~w microseconds~n",
                 [AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
 
     true = AccC1 == ObjectCount,
@@ -1094,10 +1098,24 @@
 
     {ok, FinalFNs} = file:list_dir(JFP),
 
-    % If we allow HEAD_TAG to be suubject to a lookup, then test this here
     [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
-    {ok, Hash0} =
-        leveled_bookie:book_head(Bookie1, SegmentID0, {Bucket0, Key0}, h),
+    case HeadOnly of
+        with_lookup ->
+            % If we allow HEAD_TAG to be subject to a lookup, then test this
+            % here
+            {ok, Hash0} =
+                leveled_bookie:book_head(Bookie1,
+                                         SegmentID0,
+                                         {Bucket0, Key0},
+                                         h);
+        no_lookup ->
+            {unsupported_message, head} =
+                leveled_bookie:book_head(Bookie1,
+                                         SegmentID0,
+                                         {Bucket0, Key0},
+                                         h)
+    end,
+
     ok = leveled_bookie:book_close(Bookie1),
     {ok, FinalJournals} = file:list_dir(JFP),
 
@@ -1113,8 +1131,22 @@
 
     {_AccH2, AccC2} = Runner2(),
     true = AccC2 == ObjectCount,
 
-    {ok, Hash0} =
-        leveled_bookie:book_head(Bookie2, SegmentID0, {Bucket0, Key0}, h),
+    case HeadOnly of
+        with_lookup ->
+            % If we allow HEAD_TAG to be subject to a lookup, then test this
+            % here
+            {ok, Hash0} =
+                leveled_bookie:book_head(Bookie2,
+                                         SegmentID0,
+                                         {Bucket0, Key0},
+                                         h);
+        no_lookup ->
+            {unsupported_message, head} =
+                leveled_bookie:book_head(Bookie2,
+                                         SegmentID0,
+                                         {Bucket0, Key0},
+                                         h)
+    end,
 
     ok = leveled_bookie:book_close(Bookie2).
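
Usage sketch. The suite above drives the two new head_only modes through
load_objectspecs/3 and book_head/4; the snippet below is a minimal sketch of
the same behaviour, assuming leveled_bookie:book_mput/2 is the batch API
wrapped by load_objectspecs/3. The module name, root paths and the single
{add, SegmentID, Bucket, Key, Hash} object spec are illustrative only,
following the shapes used in basic_headonly_test/2.

-module(headonly_sketch).
-export([demo/1]).

demo(RootPath) ->
    %% One object spec in the {add, SegmentID, Bucket, Key, Hash} shape
    %% used by the test above (values here are arbitrary).
    Spec = {add, 1, <<"B">>, <<"K">>, erlang:phash2(<<"V">>)},

    %% with_lookup: head-only store where individual HEAD requests still work.
    {ok, B1} = leveled_bookie:book_start([{root_path, RootPath ++ "/wl"},
                                            {head_only, with_lookup}]),
    ok = leveled_bookie:book_mput(B1, [Spec]),  % may also return pause
    {ok, _Hash} = leveled_bookie:book_head(B1, 1, {<<"B">>, <<"K">>}, h),
    ok = leveled_bookie:book_close(B1),

    %% no_lookup: the head_lookup guard on handle_call({head, ...}) rejects
    %% HEAD requests, leaving fold_heads queries as the access path.
    {ok, B2} = leveled_bookie:book_start([{root_path, RootPath ++ "/nl"},
                                            {head_only, no_lookup}]),
    ok = leveled_bookie:book_mput(B2, [Spec]),  % may also return pause
    {unsupported_message, head} =
        leveled_bookie:book_head(B2, 1, {<<"B">>, <<"K">>}, h),
    ok = leveled_bookie:book_close(B2).

Only the availability of book_head/4 differs between the two modes; batch
loading via object specs and fold_heads folds behave the same, as the suite
asserts for both with_lookup and no_lookup.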