From 2b6281b2b56caf1d2cebc6916e1f7379894dc5d2 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 15 Feb 2018 16:14:46 +0000
Subject: [PATCH] Initial head_only features

Initial commit to add head_only mode to leveled.  This allows leveled to
receive batches of object changes, but where those objects exist only in
the Penciller's Ledger (once they have been persisted within the Ledger).

The aim is to reduce significantly the cost of compaction.  Also, the
objects are not directly accessible (they can only be accessed through
folds).  Again this makes life easier during merging in the LSM trees (as
no bloom filters have to be created).
---
 include/leveled.hrl              |  5 ++
 src/leveled_bookie.erl           | 96 +++++++++++++++++++++++++++++---
 src/leveled_codec.erl            | 37 +++++++++++-
 src/leveled_iclerk.erl           | 15 ++++-
 src/leveled_imanifest.erl        | 32 ++++++++++-
 src/leveled_inker.erl            | 29 +++++++++-
 src/leveled_penciller.erl        | 15 ++++-
 src/leveled_runner.erl           | 19 ++++---
 test/end_to_end/tictac_SUITE.erl | 81 ++++++++++++++++++++++++++-
 9 files changed, 304 insertions(+), 25 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index 5681974..fc6a7f0 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -5,10 +5,15 @@
 -define(STD_TAG, o).
 %% Tag used for secondary index keys
 -define(IDX_TAG, i).
+%% Tag used for head-only objects
+-define(HEAD_TAG, h).
 
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
 
+%% Inker key type used for 'batch' objects
+-define(INKT_MPUT, mput).
+
 %% Inker key type used for objects which contain no value, only key changes
 %% This is used currently for objects formed under a 'retain' strategy on Inker
 %% compaction
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 3720ba3..a475dbd 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -51,6 +51,8 @@
           book_put/5,
           book_put/6,
           book_tempput/7,
+          book_mput/2,
+          book_mput/3,
           book_delete/4,
           book_get/3,
           book_get/4,
@@ -60,6 +62,7 @@
           book_snapshot/4,
           book_compactjournal/2,
           book_islastcompactionpending/1,
+          book_trimjournal/1,
           book_close/1,
           book_destroy/1]).
 
@@ -85,6 +88,7 @@
 -define(COMPRESSION_POINT, on_receipt).
 -define(TIMING_SAMPLESIZE, 100).
 -define(TIMING_SAMPLECOUNTDOWN, 10000).
+-define(DUMMY, dummy). % Dummy key used for mput operations
 
 -record(ledger_cache, {mem :: ets:tab(),
                         loader = leveled_tree:empty(?CACHE_TYPE)
@@ -101,6 +105,9 @@
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
+
+                head_only = false :: boolean(),
+
                 put_countdown = 0 :: integer(),
                 get_countdown = 0 :: integer(),
                 fold_countdown = 0 :: integer(),
@@ -310,6 +317,28 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
                     infinity).
 
 
+-spec book_mput(pid(), list(tuple())) -> ok|pause.
+%% @doc
+%%
+%% When the store is being run in head_only mode, batches of object specs may
+%% be inserted into the store using book_mput/2.  ObjectSpecs should be
+%% of the form {ObjectOp, Bucket, Key, SubKey, Value}.  The Value will be
+%% stored within the HEAD of the object (in the Ledger), so the full object
+%% is retrievable using a HEAD request.  The ObjectOp is either add or remove.
+book_mput(Pid, ObjectSpecs) ->
+    book_mput(Pid, ObjectSpecs, infinity).
+
+-spec book_mput(pid(), list(tuple()), infinity|integer()) -> ok|pause.
+%% @doc
+%%
+%% When the store is being run in head_only mode, batches of object specs may
+%% be inserted into the store using book_mput/3.  ObjectSpecs should be
+%% of the form {ObjectOp, Bucket, Key, SubKey, Value}.  The Value will be
+%% stored within the HEAD of the object (in the Ledger), so the full object
+%% is retrievable using a HEAD request.
+book_mput(Pid, ObjectSpecs, TTL) ->
+    gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
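
As a usage sketch only (the path, buckets, keys and values below are invented
for illustration; a production caller should treat a pause reply as a signal
to throttle):

    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/head_only"},
                                              {head_only, true}]),
    ObjectSpecs = [{add, <<"B1">>, <<"K1">>, null, <<"V1">>},
                   {add, <<"B1">>, <<"K2">>, null, <<"V2">>}],
    case leveled_bookie:book_mput(Bookie, ObjectSpecs) of
        ok -> ok;
        pause -> timer:sleep(10)    % back off before the next batch
    end,
    ok = leveled_bookie:book_close(Bookie).
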
 
 -spec book_delete(pid(), any(), any(), list()) -> ok|pause.
 %% @doc
@@ -419,6 +448,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
 
 -spec book_compactjournal(pid(), integer()) -> ok.
 -spec book_islastcompactionpending(pid()) -> boolean().
+-spec book_trimjournal(pid()) -> ok.
 
 %% @doc Call for compaction of the Journal
 %%
@@ -433,6 +463,13 @@ book_compactjournal(Pid, Timeout) ->
 book_islastcompactionpending(Pid) ->
     gen_server:call(Pid, confirm_compact, infinity).
 
+%% @doc Trim the journal when in head_only mode
+%%
+%% In head_only mode the journal can be trimmed of entries which are before
+%% the persisted SQN.  This is much quicker than compacting the journal.
+
+book_trimjournal(Pid) ->
+    gen_server:call(Pid, trim, infinity).
 
 -spec book_close(pid()) -> ok.
 -spec book_destroy(pid()) -> ok.
@@ -474,6 +511,8 @@ init([Opts]) ->
                                 limit_minutes = LimitMinutes,
                                 unit_minutes = UnitMinutes}
         end,
+
+    HeadOnly = get_opt(head_only, Opts, false),
 
     {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
 
@@ -485,7 +524,8 @@ init([Opts]) ->
                         cache_size=CacheSize,
                         recent_aae=RecentAAE,
                         ledger_cache=#ledger_cache{mem = NewETS},
-                        is_snapshot=false}};
+                        is_snapshot=false,
+                        head_only=HeadOnly}};
         Bookie ->
             {ok, Penciller, Inker} =
                 book_snapshot(Bookie, store, undefined, true),
@@ -496,7 +536,8 @@ init([Opts]) ->
     end.
 
 
-handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
+handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
+        when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SW0 = os:timestamp(),
     {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
@@ -541,7 +582,34 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                             put_countdown = CountDown,
                             slow_offer = true}}
     end;
+handle_call({mput, ObjectSpecs, TTL}, From, State)
+        when State#state.head_only == true ->
+    {ok, SQN} =
+        leveled_inker:ink_mput(State#state.inker, dummy, {ObjectSpecs, TTL}),
+    Changes =
+        preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
+                                SQN, null, length(ObjectSpecs),
+                                {ObjectSpecs, TTL},
+                                false),
+    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
+    case State#state.slow_offer of
+        true ->
+            gen_server:reply(From, pause);
+        false ->
+            gen_server:reply(From, ok)
+    end,
+    case maybepush_ledgercache(State#state.cache_size,
+                                Cache0,
+                                State#state.penciller) of
+        {ok, NewCache} ->
+            {noreply, State#state{ledger_cache = NewCache,
+                                    slow_offer = false}};
+        {returned, NewCache} ->
+            {noreply, State#state{ledger_cache = NewCache,
+                                    slow_offer = true}}
+    end;
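
The reply protocol here intentionally mirrors book_put: a pause reply means
the Penciller returned the previous cache push, so the caller should throttle
before offering the next batch.  A minimal loader loop in that style (the
load_batches name is hypothetical):

    load_batches(_Bookie, []) ->
        ok;
    load_batches(Bookie, [Batch|Rest]) ->
        case leveled_bookie:book_mput(Bookie, Batch) of
            ok ->
                load_batches(Bookie, Rest);
            pause ->
                timer:sleep(10),    % brief back-off after a pause reply
                load_batches(Bookie, Rest)
        end.

The load_objectspecs/3 helper in the test suite at the end of this patch
follows the same pattern.
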
-handle_call({get, Bucket, Key, Tag}, _From, State) ->
+handle_call({get, Bucket, Key, Tag}, _From, State)
+        when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
     HeadResult =
@@ -586,7 +654,11 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
         update_statetimings(get, Timings2, State#state.get_countdown),
     {reply, Reply, State#state{get_timings = Timings,
                                 get_countdown = CountDown}};
-handle_call({head, Bucket, Key, Tag}, _From, State) ->
+handle_call({head, Bucket, Key, Tag}, _From, State)
+        when State#state.head_only == false ->
+    % Head requests are not possible when the status is head_only, as head_only
+    % objects are only retrievable via folds, not direct object access (there
+    % is no hash generated for the objects to accelerate lookup)
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@@ -634,17 +706,24 @@ handle_call({return_runner, QueryType}, _From, State) ->
         update_statetimings(fold, Timings1, State#state.fold_countdown),
     {reply, Runner, State#state{fold_timings = Timings,
                                 fold_countdown = CountDown}};
-handle_call({compact_journal, Timeout}, _From, State) ->
+handle_call({compact_journal, Timeout}, _From, State)
+        when State#state.head_only == false ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
                                             self(),
                                             Timeout),
     {reply, ok, State};
-handle_call(confirm_compact, _From, State) ->
+handle_call(confirm_compact, _From, State)
+        when State#state.head_only == false ->
     {reply, leveled_inker:ink_compactionpending(State#state.inker), State};
+handle_call(trim, _From, State) when State#state.head_only == true ->
+    PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
+    {reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
-    {stop, destroy, ok, State}.
+    {stop, destroy, ok, State};
+handle_call(Msg, _From, State) ->
+    {reply, {unsupported_message, element(1, Msg)}, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.
@@ -1118,6 +1197,9 @@ fetch_head(Key, Penciller, LedgerCache) ->
     end.
 
 
+preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, _A) ->
+    ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
+    {no_lookup, SQN, ObjChanges};
 preparefor_ledgercache(?INKT_KEYD,
                         LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
                         _AAE) ->
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index df4d491..4c10c65 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -62,6 +62,7 @@
          get_size/2,
          get_keyandobjhash/2,
          idx_indexspecs/5,
+         obj_objectspecs/3,
          aae_indexspecs/6,
          generate_uuid/0,
          integer_now/0,
@@ -222,7 +223,7 @@ from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
     {Bucket, Key, IdxVal};
 from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
     {Bucket, Key, IdxVal};
-from_ledgerkey({_Tag, Bucket, Key, null}) ->
+from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
     {Bucket, Key}.
 
 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
@@ -231,6 +232,9 @@
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
+to_ledgerkey(Bucket, Key, Tag, SubKey) ->
+    {Tag, Bucket, Key, SubKey}.
+
 %% Return the Key, Value and Hash Option for this object.  The hash option
 %% indicates whether the key would ever be looked up directly, and so if it
 %% requires an entry in the hash table
@@ -404,6 +408,8 @@ split_inkvalue(VBin) when is_binary(VBin) ->
 
 check_forinkertype(_LedgerKey, delete) ->
     ?INKT_TOMB;
+check_forinkertype(_LedgerKey, head_only) ->
+    ?INKT_MPUT;
 check_forinkertype(_LedgerKey, _Object) ->
     ?INKT_STND.
 
@@ -424,6 +430,14 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
 endkey_passed(EndKey, CheckingKey) ->
     EndKey < CheckingKey.
 
+
+obj_objectspecs(ObjectSpecs, SQN, TTL) ->
+    lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
+                    gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
+                end,
+                ObjectSpecs).
+
+
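
For orientation, a single object spec maps via gen_headspec/7 (below) to a
Ledger key/value pair of the following shape, shown here for an add at a
hypothetical SQN of 100 with TTL infinity:

    %% {add, <<"B">>, <<"K">>, <<"SK">>, <<"V">>} becomes:
    {{h, <<"B">>, <<"K">>, <<"SK">>},
        {100, {active, infinity}, no_lookup, <<"V">>}}
    %% a remove spec produces the same key with status tomb instead

The no_lookup hint is what keeps these entries out of the hash-assisted
lookup path, which is why no bloom filters need to be built for them.
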
 idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
     lists:map(
                 fun({IdxOp, IdxFld, IdxTrm}) ->
@@ -458,6 +472,19 @@ gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
             {SQN, Status, no_lookup, null}}
     end.
 
+gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
+    Status =
+        case IdxOp of
+            add ->
+                {active, TTL};
+            remove ->
+                %% TODO: timestamps for delayed reaping
+                tomb
+        end,
+    {to_ledgerkey(Bucket, Key, ?HEAD_TAG, SubKey),
+        {SQN, Status, no_lookup, Value}}.
+
+
 -spec aae_indexspecs(false|recent_aae(),
                         any(), any(),
                         integer(), integer(),
@@ -611,7 +638,9 @@ get_size(PK, Value) ->
             Size;
         ?STD_TAG ->
             {_Hash, Size} = MD,
-            Size
+            Size;
+        ?HEAD_TAG ->
+            0
     end.
 
 -spec get_keyandobjhash(tuple(), tuple()) -> tuple().
@@ -641,12 +670,14 @@ get_objhash(Tag, ObjMetaData) ->
 
 
 build_metadata_object(PrimaryKey, MD) ->
-    {Tag, _Bucket, _Key, null} = PrimaryKey,
+    {Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
     case Tag of
         ?RIAK_TAG ->
             {SibData, Vclock, _Hash, _Size} = MD,
             riak_metadata_to_binary(Vclock, SibData);
         ?STD_TAG ->
+            MD;
+        ?HEAD_TAG ->
             MD
     end.
 
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index b884b29..5d65e8e 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -82,6 +82,7 @@
          clerk_new/1,
          clerk_compact/7,
          clerk_hashtablecalc/3,
+         clerk_trim/3,
          clerk_stop/1,
          code_change/3]).
 
@@ -144,6 +145,12 @@ clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
                                 Inker,
                                 TimeO}).
 
+-spec clerk_trim(pid(), pid(), integer()) -> ok.
+%% @doc
+%% Trim the Inker back to the persisted SQN
+clerk_trim(Pid, Inker, PersistedSQN) ->
+    gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
+
 -spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
 %% @doc
 %% Spawn a dedicated clerk for the process of calculating the binary view
@@ -235,6 +242,12 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
             ok = CloseFun(FilterServer),
             {noreply, State}
     end;
+handle_cast({trim, Inker, PersistedSQN}, State) ->
+    [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
+    FilesToDelete =
+        leveled_imanifest:find_persistedentries(PersistedSQN, Manifest),
+    ok = update_inker(Inker, [], FilesToDelete),
+    {noreply, State};
 handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
     {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
     ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
@@ -527,7 +540,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
                                                 Inker)
                     end,
                     FilesToDelete),
-    ok.    
+    ok.
 
 compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN,
                             RStrategy, PressMethod) ->
diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl
index cc006f0..7801b6f 100644
--- a/src/leveled_imanifest.erl
+++ b/src/leveled_imanifest.erl
@@ -14,6 +14,7 @@
           append_lastkey/3,
           remove_entry/2,
           find_entry/2,
+          find_persistedentries/2,
           head_entry/1,
           to_list/1,
           from_list/1,
@@ -21,7 +22,6 @@
           writer/3,
           printer/1,
           complete_filex/0
-
         ]).
 
 -define(MANIFEST_FILEX, "man").
@@ -106,9 +106,26 @@ find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
 find_entry(SQN, [_TopEntry|Tail]) ->
     find_entry(SQN, Tail).
 
+-spec find_persistedentries(integer(), manifest()) -> list(manifest_entry()).
+%% @doc
+%% Find the entries in the manifest where all items are less than the
+%% persisted SQN in the ledger
+find_persistedentries(SQN, Manifest) ->
+    DropFun =
+        fun({ME_SQN, _FN, _ME_P, _LK}) ->
+            ME_SQN > SQN
+        end,
+    Entries = lists:dropwhile(DropFun, to_list(Manifest)),
+    case Entries of
+        [_Head|Tail] ->
+            Tail;
+        [] ->
+            []
+    end.
+
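
To illustrate the head-drop (all SQNs here invented): if the journal files
start at SQNs [4000, 3000, 2000, 1000, 1], newest first, and the persisted
SQN is 2999, the dropwhile discards the 4000 and 3000 entries as not yet
persisted.  The head of the remainder (2000) is also retained, since the
manifest records only each file's starting SQN and that file may still hold
entries above the persisted SQN.  Only the 1000 and 1 files are returned for
deletion:

    %% Hypothetical entries of the form {StartSQN, FileName, Pid, LastKey}
    Man = from_list([{4000, "j5", p5, lk5}, {3000, "j4", p4, lk4},
                     {2000, "j3", p3, lk3}, {1000, "j2", p2, lk2},
                     {1, "j1", p1, lk1}]),
    [{1000, "j2", p2, lk2}, {1, "j1", p1, lk1}] =
        find_persistedentries(2999, Man).
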
 
 -spec head_entry(manifest()) -> manifest_entry().
 %% @doc
-%% Return the head manifets entry (the most recent journal)
+%% Return the head manifest entry (the most recent journal)
 head_entry(Manifest) ->
     [{_SQNMarker, SQNL}|_Tail] = Manifest,
     [HeadEntry|_SQNL_Tail] = SQNL,
@@ -239,6 +256,17 @@ buildfromend_test() ->
     test_testmanifest(Man0),
     ?assertMatch(ManL, to_list(Man0)).
 
+findpersisted_test() ->
+    Man = from_list(build_testmanifest_aslist()),
+    FilesToDelete1 = find_persistedentries(2001, Man),
+    ?assertMatch(2, length(FilesToDelete1)),
+    FilesToDelete2 = find_persistedentries(3000, Man),
+    ?assertMatch(3, length(FilesToDelete2)),
+    FilesToDelete3 = find_persistedentries(2999, Man),
+    ?assertMatch(2, length(FilesToDelete3)),
+    FilesToDelete4 = find_persistedentries(999, Man),
+    ?assertMatch([], FilesToDelete4).
+
 buildrandomfashion_test() ->
     ManL0 = build_testmanifest_aslist(),
     RandMapFun =
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index fde2dda..740a2e1 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -95,6 +95,7 @@
           code_change/3,
           ink_start/1,
           ink_put/4,
+          ink_mput/3,
           ink_get/3,
           ink_fetch/3,
           ink_keycheck/3,
@@ -105,6 +106,7 @@
           ink_compactjournal/3,
           ink_compactioncomplete/1,
           ink_compactionpending/1,
+          ink_trim/2,
           ink_getmanifest/1,
           ink_updatemanifest/3,
           ink_printmanifest/1,
@@ -185,6 +187,16 @@ ink_start(InkerOpts) ->
 ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
     gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
 
+
+-spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}.
+%% @doc
+%% MPUT a series of object specifications, which will be converted into
+%% objects in the Ledger.  This should only be used when the Bookie is
+%% running in head_only mode.  The journal entries are kept only for handling
+%% consistency on startup
+ink_mput(Pid, PrimaryKey, ObjectChanges) ->
+    gen_server:call(Pid, {mput, PrimaryKey, ObjectChanges}, infinity).
+
 -spec ink_get(pid(),
                 {atom(), any(), any(), any()}|string(),
                 integer()) ->
@@ -361,10 +373,17 @@ ink_compactioncomplete(Pid) ->
 -spec ink_compactionpending(pid()) -> boolean().
 %% @doc
 %% Is there ongoing compaction work?  No compaction work should be initiated
-%5 if there is already some compaction work ongoing.
+%% if there is already some compaction work ongoing.
 ink_compactionpending(Pid) ->
     gen_server:call(Pid, compaction_pending, infinity).
 
+-spec ink_trim(pid(), integer()) -> ok.
+%% @doc
+%% Trim the Journal to just those files that contain entries since the
+%% Penciller's persisted SQN
+ink_trim(Pid, PersistedSQN) ->
+    gen_server:call(Pid, {trim, PersistedSQN}, infinity).
+
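
End to end, the trim path added by this patch chains as follows (a sketch
assembled from the code in this patch; the variable names are illustrative):

    %% leveled_bookie, on book_trimjournal/1 in head_only mode:
    PSQN = leveled_penciller:pcl_persistedsqn(Penciller),
    ok = leveled_inker:ink_trim(Inker, PSQN),
    %% leveled_inker then hands the work to its clerk as an async cast:
    ok = leveled_iclerk:clerk_trim(Clerk, self(), PSQN),
    %% and leveled_iclerk drops the journal files that are wholly persisted:
    [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
    FilesToDelete = leveled_imanifest:find_persistedentries(PSQN, Manifest),
    ok = update_inker(Inker, [], FilesToDelete).

No scoring, filtering or rewriting of values is involved, which is why a
trim is so much cheaper than a journal compaction run.
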
 -spec ink_getmanifest(pid()) -> list().
 %% @doc
 %% Allows the clerk to fetch the manifest at the point it starts a compaction
@@ -420,6 +439,11 @@ handle_call({put, Key, Object, KeyChanges}, _From, State) ->
         {_, UpdState, ObjSize} ->
             {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
     end;
+handle_call({mput, Key, ObjChanges}, _From, State) ->
+    case put_object(Key, head_only, ObjChanges, State) of
+        {_, UpdState, _ObjSize} ->
+            {reply, {ok, UpdState#state.journal_sqn}, UpdState}
+    end;
 handle_call({fetch, Key, SQN}, _From, State) ->
     case get_object(Key, SQN, State#state.manifest, true) of
         {{SQN, Key}, {Value, _IndexSpecs}} ->
@@ -503,6 +527,9 @@ handle_call(compaction_complete, _From, State) ->
     {reply, ok, State#state{compaction_pending=false}};
 handle_call(compaction_pending, _From, State) ->
     {reply, State#state.compaction_pending, State};
+handle_call({trim, PersistedSQN}, _From, State) ->
+    ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
+    {reply, ok, State};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(doom, _From, State) ->
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index d184ab6..ea06283 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -189,7 +189,8 @@
           pcl_registersnapshot/5,
           pcl_getstartupsequencenumber/1,
           pcl_checkbloomtest/2,
-          pcl_checkforwork/1]).
+          pcl_checkforwork/1,
+          pcl_persistedsqn/1]).
 
 -export([
           sst_rootpath/1,
@@ -504,6 +505,14 @@ pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
 pcl_releasesnapshot(Pid, Snapshot) ->
     gen_server:cast(Pid, {release_snapshot, Snapshot}).
 
+
+-spec pcl_persistedsqn(pid()) -> integer().
+%% @doc
+%% Return the persisted SQN, the highest SQN which has been persisted into the
+%% Ledger
+pcl_persistedsqn(Pid) ->
+    gen_server:call(Pid, persisted_sqn, infinity).
+
 -spec pcl_close(pid()) -> ok.
 %% @doc
 %% Close the penciller neatly, trying to persist to disk anything in the memory
@@ -781,7 +790,9 @@ handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
 handle_call(check_for_work, _From, State) ->
     {_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
                                                     ?LEVEL_SCALEFACTOR),
-    {reply, WC > 0, State}.
+    {reply, WC > 0, State};
+handle_call(persisted_sqn, _From, State) ->
+    {reply, State#state.persisted_sqn, State}.
 
 handle_cast({manifest_change, NewManifest}, State) ->
     NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl
index 9fe8dcf..1d8af15 100644
--- a/src/leveled_runner.erl
+++ b/src/leveled_runner.erl
@@ -553,17 +553,19 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                                                             SQN),
                         case InJournal of
                             probably ->
-                                ProxyObj = make_proxy_object(LK, JK,
-                                                                MD, V,
-                                                                InkerClone),
+                                ProxyObj =
+                                    make_proxy_object(Tag,
+                                                        LK, JK, MD, V,
+                                                        InkerClone),
                                 FoldObjectsFun(B, K, ProxyObj, Acc);
                             missing ->
                                 Acc
                         end;
                     {true, false} ->
-                        ProxyObj = make_proxy_object(LK, JK,
-                                                        MD, V,
-                                                        InkerClone),
+                        ProxyObj =
+                            make_proxy_object(Tag,
+                                                LK, JK, MD, V,
+                                                InkerClone),
                         FoldObjectsFun(B, K, ProxyObj, Acc);
                     false ->
                         R = leveled_bookie:fetch_value(InkerClone, JK),
@@ -581,7 +583,10 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
         end,
     AccFun.
 
-make_proxy_object(LK, JK, MD, V, InkerClone) ->
+
+make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
+    MD;
+make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
     Size = leveled_codec:get_size(LK, V),
     MDBin = leveled_codec:build_metadata_object(LK, MD),
     term_to_binary({proxy_object,
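
Because a head_only object's "head" is the stored Value itself, the proxy
short-circuit above means fold queries hand the Value straight to the fold
function.  A sketch of reading everything back, mirroring the new test below
(the fold function here is illustrative):

    FoldFun = fun(_Bucket, _Key, Value, Acc) -> [Value|Acc] end,
    {async, Runner} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldheads_allkeys, h,
                                                {FoldFun, []},
                                                false, false, false}),
    Values = Runner().
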
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
index 506704c..0cdf109 100644
--- a/test/end_to_end/tictac_SUITE.erl
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -8,7 +8,8 @@
             recent_aae_noaae/1,
             recent_aae_allaae/1,
             recent_aae_bucketaae/1,
-            recent_aae_expiry/1
+            recent_aae_expiry/1,
+            basic_headonly/1
             ]).
 
 all() -> [
@@ -17,7 +18,8 @@ all() -> [
             recent_aae_noaae,
             recent_aae_allaae,
             recent_aae_bucketaae,
-            recent_aae_expiry
+            recent_aae_expiry,
+            basic_headonly
             ].
 
 -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
@@ -1010,6 +1012,81 @@ recent_aae_expiry(_Config) ->
 
     true = length(DL4_0) == 0.
 
+
+basic_headonly(_Config) ->
+    % Load some AAE type objects into Leveled using the head_only mode.  This
+    % should allow for the items to be added in batches.  Confirm that the
+    % journal is garbage collected as expected, and that it is possible to
+    % perform a fold_heads style query
+    ObjectCount = 100000,
+
+    RootPathHO = testutil:reset_filestructure("testHO"),
+    StartOpts1 = [{root_path, RootPathHO},
+                    {max_pencillercachesize, 16000},
+                    {sync_strategy, sync},
+                    {head_only, true}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {B1, K1, V1, S1, MD} = {"Bucket",
+                            "Key1.1.4567.4321",
+                            "Value1",
+                            [],
+                            [{"MDK1", "MDV1"}]},
+    {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+    {unsupported_message, put} =
+        testutil:book_riakput(Bookie1, TestObject, TestSpec),
+
+    ObjectSpecFun =
+        fun(Op) ->
+            fun(N) ->
+                Bucket = <<"B", N:32/integer>>,
+                Key = <<"K", N:32/integer>>,
+                <<SegmentID:20/integer, _RestBS/bitstring>> =
+                    crypto:hash(md5, term_to_binary({Bucket, Key})),
+                <<Hash:32/integer, _RestBK/bitstring>> =
+                    crypto:hash(md5, <<Bucket/binary, Key/binary>>),
+                {Op, <<SegmentID:32/integer>>, Bucket, Key, Hash}
+            end
+        end,
+
+    ObjectSpecL = lists:map(ObjectSpecFun(add), lists:seq(1, ObjectCount)),
+    ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
+
+    FoldFun =
+        fun(_B, _K, V, {HashAcc, CountAcc}) ->
+            {HashAcc bxor V, CountAcc + 1}
+        end,
+    InitAcc = {0, 0},
+
+    RunnerDefinition =
+        {foldheads_allkeys, h, {FoldFun, InitAcc}, false, false, false},
+    {async, Runner1} =
+        leveled_bookie:book_returnfolder(Bookie1, RunnerDefinition),
+
+    SW1 = os:timestamp(),
+    {AccH1, AccC1} = Runner1(),
+    io:format("AccH and AccC of ~w ~w in ~w microseconds ~n",
+                [AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
+
+    true = AccC1 == ObjectCount,
+
+    ok = leveled_bookie:book_close(Bookie1).
+
+
+load_objectspecs([], _SliceSize, _Bookie) ->
+    ok;
+load_objectspecs(ObjectSpecL, SliceSize, Bookie)
+            when length(ObjectSpecL) < SliceSize ->
+    load_objectspecs(ObjectSpecL, length(ObjectSpecL), Bookie);
+load_objectspecs(ObjectSpecL, SliceSize, Bookie) ->
+    {Head, Tail} = lists:split(SliceSize, ObjectSpecL),
+    case leveled_bookie:book_mput(Bookie, Head) of
+        ok ->
+            load_objectspecs(Tail, SliceSize, Bookie);
+        pause ->
+            timer:sleep(10),
+            load_objectspecs(Tail, SliceSize, Bookie)
+    end.
+
+
 load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
                             SW_StartLoad, TreeSize, UnitMins,
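
As committed, the test asserts the fold results only; the journal garbage
collection mentioned in its opening comment would be driven through the new
book_trimjournal/1.  A sketch of that extra check (hypothetical, not part of
this patch):

    ok = leveled_bookie:book_trimjournal(Bookie1),
    %% The trim is handed to the inker's clerk asynchronously, so allow it
    %% a moment to delete the fully-persisted journal files before
    %% inspecting the journal manifest again.
    timer:sleep(1000).
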