diff --git a/include/leveled.hrl b/include/leveled.hrl
index 5681974..fc6a7f0 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -5,10 +5,15 @@
 -define(STD_TAG, o).
 %% Tag used for secondary index keys
 -define(IDX_TAG, i).
+%% Tag used for head-only objects
+-define(HEAD_TAG, h).
 %% Inker key type used for 'normal' objects
 -define(INKT_STND, stnd).
+%% Inker key type used for 'batch' objects
+-define(INKT_MPUT, mput).
+
 %% Inker key type used for objects which contain no value, only key changes
 %% This is used currently for objects formed under a 'retain' strategy on Inker
 %% compaction
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 3720ba3..a475dbd 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -51,6 +51,8 @@
         book_put/5,
         book_put/6,
         book_tempput/7,
+        book_mput/2,
+        book_mput/3,
         book_delete/4,
         book_get/3,
         book_get/4,
@@ -60,6 +62,7 @@
         book_snapshot/4,
         book_compactjournal/2,
         book_islastcompactionpending/1,
+        book_trimjournal/1,
         book_close/1,
         book_destroy/1]).
@@ -85,6 +88,7 @@
 -define(COMPRESSION_POINT, on_receipt).
 -define(TIMING_SAMPLESIZE, 100).
 -define(TIMING_SAMPLECOUNTDOWN, 10000).
+-define(DUMMY, dummy). % Dummy key used for mput operations

 -record(ledger_cache, {mem :: ets:tab(),
                        loader = leveled_tree:empty(?CACHE_TYPE)
@@ -101,6 +105,9 @@
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
+
+                head_only = false :: boolean(),
+
                 put_countdown = 0 :: integer(),
                 get_countdown = 0 :: integer(),
                 fold_countdown = 0 :: integer(),
@@ -310,6 +317,28 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
                     infinity).

+-spec book_mput(pid(), list(tuple())) -> ok|pause.
+%% @doc
+%%
+%% When the store is being run in head_only mode, batches of object specs may
+%% be inserted into the store using book_mput/2. ObjectSpecs should be
+%% of the form {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be
+%% stored within the HEAD of the object (in the Ledger), so the full object
+%% is retrievable using a HEAD request. The ObjectOp is either add or remove.
+book_mput(Pid, ObjectSpecs) ->
+    book_mput(Pid, ObjectSpecs, infinity).
+
+-spec book_mput(pid(), list(tuple()), infinity|integer()) -> ok|pause.
+%% @doc
+%%
+%% When the store is being run in head_only mode, batches of object specs may
+%% be inserted into the store using book_mput/3, with an expiry time (TTL)
+%% set against the batch. ObjectSpecs should be of the form
+%% {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be stored within
+%% the HEAD of the object (in the Ledger), so the full object is retrievable
+%% using a HEAD request.
+book_mput(Pid, ObjectSpecs, TTL) ->
+    gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
+
 -spec book_delete(pid(), any(), any(), list()) -> ok|pause.
 %% @doc
@@ -419,6 +448,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->

 -spec book_compactjournal(pid(), integer()) -> ok.
 -spec book_islastcompactionpending(pid()) -> boolean().
+-spec book_trimjournal(pid()) -> ok.

 %% @doc Call for compaction of the Journal
 %%
@@ -433,6 +463,13 @@ book_compactjournal(Pid, Timeout) ->
 book_islastcompactionpending(Pid) ->
     gen_server:call(Pid, confirm_compact, infinity).

+%% @doc Trim the journal when in head_only mode
+%%
+%% In head_only mode the journal can be trimmed of entries which are before
+%% the persisted SQN. This is much quicker than compacting the journal.
+
+book_trimjournal(Pid) ->
+    gen_server:call(Pid, trim, infinity).

 -spec book_close(pid()) -> ok.
 -spec book_destroy(pid()) -> ok.
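To make the book_mput contract concrete, a minimal usage sketch (illustrative only - the bucket, key, sub-key and value terms are assumed, and Bookie stands for a pid returned by book_start with head_only set):

    %% Each object spec is {ObjectOp, Bucket, Key, SubKey, Value},
    %% with ObjectOp being add or remove.
    ObjectSpecs =
        [{add, <<"B1">>, <<"K1">>, <<"SK1">>, <<"V1">>},
         {add, <<"B1">>, <<"K1">>, <<"SK2">>, <<"V2">>},
         {remove, <<"B1">>, <<"K0">>, <<"SK0">>, null}],
    %% book_mput returns ok, or pause when the Penciller is backed up
    case leveled_bookie:book_mput(Bookie, ObjectSpecs) of
        ok -> ok;
        pause -> timer:sleep(10)
    end.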
@@ -474,6 +511,8 @@ init([Opts]) ->
                                 limit_minutes = LimitMinutes,
                                 unit_minutes = UnitMinutes}
                 end,
+
+            HeadOnly = get_opt(head_only, Opts, false),

             {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
@@ -485,7 +524,8 @@
                         cache_size=CacheSize,
                         recent_aae=RecentAAE,
                         ledger_cache=#ledger_cache{mem = NewETS},
-                        is_snapshot=false}};
+                        is_snapshot=false,
+                        head_only=HeadOnly}};
         Bookie ->
             {ok, Penciller, Inker} =
                 book_snapshot(Bookie, store, undefined, true),
@@ -496,7 +536,8 @@
     end.

-handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
+handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
+                                        when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SW0 = os:timestamp(),
     {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
@@ -541,7 +582,34 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
                               put_countdown = CountDown,
                               slow_offer = true}}
     end;
-handle_call({get, Bucket, Key, Tag}, _From, State) ->
+handle_call({mput, ObjectSpecs, TTL}, From, State)
+                                        when State#state.head_only == true ->
+    {ok, SQN} =
+        leveled_inker:ink_mput(State#state.inker, dummy, {ObjectSpecs, TTL}),
+    Changes =
+        preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
+                               SQN, null, length(ObjectSpecs),
+                               {ObjectSpecs, TTL},
+                               false),
+    Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
+    case State#state.slow_offer of
+        true ->
+            gen_server:reply(From, pause);
+        false ->
+            gen_server:reply(From, ok)
+    end,
+    case maybepush_ledgercache(State#state.cache_size,
+                               Cache0,
+                               State#state.penciller) of
+        {ok, NewCache} ->
+            {noreply, State#state{ledger_cache = NewCache,
+                                  slow_offer = false}};
+        {returned, NewCache} ->
+            {noreply, State#state{ledger_cache = NewCache,
+                                  slow_offer = true}}
+    end;
+handle_call({get, Bucket, Key, Tag}, _From, State)
+                                        when State#state.head_only == false ->
     LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     SWh = os:timestamp(),
     HeadResult =
@@ -586,7 +654,11 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
         update_statetimings(get, Timings2, State#state.get_countdown),
     {reply, Reply, State#state{get_timings = Timings,
                                get_countdown = CountDown}};
-handle_call({head, Bucket, Key, Tag}, _From, State) ->
+handle_call({head, Bucket, Key, Tag}, _From, State)
+                                        when State#state.head_only == false ->
+    % Head requests are not possible when the status is head_only, as
+    % head_only objects are only retrievable via folds, not direct object
+    % access (there is no hash generated for the objects to accelerate lookup)
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@@ -634,17 +706,24 @@ handle_call({return_runner, QueryType}, _From, State) ->
         update_statetimings(fold, Timings1, State#state.fold_countdown),
     {reply, Runner, State#state{fold_timings = Timings,
                                 fold_countdown = CountDown}};
-handle_call({compact_journal, Timeout}, _From, State) ->
+handle_call({compact_journal, Timeout}, _From, State)
+                                        when State#state.head_only == false ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
                                           self(),
                                           Timeout),
     {reply, ok, State};
-handle_call(confirm_compact, _From, State) ->
+handle_call(confirm_compact, _From, State)
+                                        when State#state.head_only == false ->
     {reply, leveled_inker:ink_compactionpending(State#state.inker), State};
+handle_call(trim, _From, State) when State#state.head_only == true ->
+    PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
+    {reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
-    {stop, destroy, ok, State}.
+    {stop, destroy, ok, State};
+handle_call(Msg, _From, State) ->
+    {reply, {unsupported_message, element(1, Msg)}, State}.

 handle_cast(_Msg, State) ->
     {noreply, State}.
@@ -1118,6 +1197,9 @@ fetch_head(Key, Penciller, LedgerCache) ->
     end.

+preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, _A) ->
+    ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
+    {no_lookup, SQN, ObjChanges};
 preparefor_ledgercache(?INKT_KEYD,
                        LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
                        _AAE) ->
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index df4d491..4c10c65 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -62,6 +62,7 @@
         get_size/2,
         get_keyandobjhash/2,
         idx_indexspecs/5,
+        obj_objectspecs/3,
         aae_indexspecs/6,
         generate_uuid/0,
         integer_now/0,
@@ -222,7 +223,7 @@ from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
     {Bucket, Key, IdxVal};
 from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
     {Bucket, Key, IdxVal};
-from_ledgerkey({_Tag, Bucket, Key, null}) ->
+from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
     {Bucket, Key}.

 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
@@ -231,6 +232,9 @@
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.

+to_ledgerkey(Bucket, Key, Tag, SubKey) ->
+    {Tag, Bucket, Key, SubKey}.
+
 %% Return the Key, Value and Hash Option for this object. The hash option
 %% indicates whether the key would ever be looked up directly, and so if it
 %% requires an entry in the hash table
@@ -404,6 +408,8 @@ split_inkvalue(VBin) when is_binary(VBin) ->

 check_forinkertype(_LedgerKey, delete) ->
     ?INKT_TOMB;
+check_forinkertype(_LedgerKey, head_only) ->
+    ?INKT_MPUT;
 check_forinkertype(_LedgerKey, _Object) ->
     ?INKT_STND.

@@ -424,6 +430,14 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
 endkey_passed(EndKey, CheckingKey) ->
     EndKey < CheckingKey.

+
+obj_objectspecs(ObjectSpecs, SQN, TTL) ->
+    lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
+                    gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
+                end,
+                ObjectSpecs).
+
+
 idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
     lists:map(
         fun({IdxOp, IdxFld, IdxTrm}) ->
@@ -458,6 +472,19 @@ gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
                 {SQN, Status, no_lookup, null}}
     end.

+gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
+    Status =
+        case IdxOp of
+            add ->
+                {active, TTL};
+            remove ->
+                %% TODO: timestamps for delayed reaping
+                tomb
+        end,
+    {to_ledgerkey(Bucket, Key, ?HEAD_TAG, SubKey),
+        {SQN, Status, no_lookup, Value}}.
+
+
 -spec aae_indexspecs(false|recent_aae(),
                      any(), any(),
                      integer(), integer(),
@@ -611,7 +638,9 @@ get_size(PK, Value) ->
             Size;
         ?STD_TAG ->
             {_Hash, Size} = MD,
-            Size
+            Size;
+        ?HEAD_TAG ->
+            0
     end.

 -spec get_keyandobjhash(tuple(), tuple()) -> tuple().
@@ -641,12 +670,14 @@ get_objhash(Tag, ObjMetaData) ->

 build_metadata_object(PrimaryKey, MD) ->
-    {Tag, _Bucket, _Key, null} = PrimaryKey,
+    {Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
     case Tag of
         ?RIAK_TAG ->
             {SibData, Vclock, _Hash, _Size} = MD,
             riak_metadata_to_binary(Vclock, SibData);
         ?STD_TAG ->
+            MD;
+        ?HEAD_TAG ->
             MD
     end.
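As an illustration of what gen_headspec/7 produces (the values here are assumed, not taken from the change): an add spec for Bucket <<"B1">>, Key <<"K1">>, SubKey <<"SK1">> and Value <<"V1">>, arriving at SQN 10 with TTL infinity, becomes the ledger entry

    {{h, <<"B1">>, <<"K1">>, <<"SK1">>},             %% ?HEAD_TAG ledger key
     {10, {active, infinity}, no_lookup, <<"V1">>}}  %% no hash table entry

so the value lives wholly in the Ledger HEAD, and no_lookup means no hash is kept to accelerate direct access - which is why objects in this form are reachable only through folds.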
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl
index b884b29..5d65e8e 100644
--- a/src/leveled_iclerk.erl
+++ b/src/leveled_iclerk.erl
@@ -82,6 +82,7 @@
         clerk_new/1,
         clerk_compact/7,
         clerk_hashtablecalc/3,
+        clerk_trim/3,
         clerk_stop/1,
         code_change/3]).

@@ -144,6 +145,12 @@ clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
                               Inker,
                               TimeO}).

+-spec clerk_trim(pid(), pid(), integer()) -> ok.
+%% @doc
+%% Trim the Inker back to the persisted SQN
+clerk_trim(Pid, Inker, PersistedSQN) ->
+    gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
+
 -spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
 %% @doc
 %% Spawn a dedicated clerk for the process of calculating the binary view
@@ -235,6 +242,12 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
             ok = CloseFun(FilterServer),
             {noreply, State}
     end;
+handle_cast({trim, Inker, PersistedSQN}, State) ->
+    [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
+    FilesToDelete =
+        leveled_imanifest:find_persistedentries(PersistedSQN, Manifest),
+    ok = update_inker(Inker, [], FilesToDelete),
+    {noreply, State};
 handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
     {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
     ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
@@ -527,7 +540,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
                                                  Inker)
                   end,
                   FilesToDelete),
-    ok. 
+    ok.

 compact_files(BestRun, CDBopts, FilterFun, FilterServer, MaxSQN,
               RStrategy, PressMethod) ->
diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl
index cc006f0..7801b6f 100644
--- a/src/leveled_imanifest.erl
+++ b/src/leveled_imanifest.erl
@@ -14,6 +14,7 @@
         append_lastkey/3,
         remove_entry/2,
         find_entry/2,
+        find_persistedentries/2,
        head_entry/1,
         to_list/1,
         from_list/1,
@@ -21,7 +22,6 @@
         writer/3,
         printer/1,
         complete_filex/0
-
        ]).

 -define(MANIFEST_FILEX, "man").
@@ -106,9 +106,26 @@ find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
 find_entry(SQN, [_TopEntry|Tail]) ->
     find_entry(SQN, Tail).

+-spec find_persistedentries(integer(), manifest()) -> list(manifest_entry()).
+%% @doc
+%% Find the entries in the manifest where all items are less than the
+%% persisted SQN in the ledger
+find_persistedentries(SQN, Manifest) ->
+    DropFun =
+        fun({ME_SQN, _FN, _ME_P, _LK}) ->
+            ME_SQN > SQN
+        end,
+    Entries = lists:dropwhile(DropFun, to_list(Manifest)),
+    case Entries of
+        [_Head|Tail] ->
+            Tail;
+        [] ->
+            []
+    end.
+
 -spec head_entry(manifest()) -> manifest_entry().
 %% @doc
-%% Return the head manifets entry (the most recent journal)
+%% Return the head manifest entry (the most recent journal)
 head_entry(Manifest) ->
     [{_SQNMarker, SQNL}|_Tail] = Manifest,
     [HeadEntry|_SQNL_Tail] = SQNL,
@@ -239,6 +256,17 @@ buildfromend_test() ->
     test_testmanifest(Man0),
     ?assertMatch(ManL, to_list(Man0)).

+findpersisted_test() ->
+    Man = from_list(build_testmanifest_aslist()),
+    FilesToDelete1 = find_persistedentries(2001, Man),
+    ?assertMatch(2, length(FilesToDelete1)),
+    FilesToDelete2 = find_persistedentries(3000, Man),
+    ?assertMatch(3, length(FilesToDelete2)),
+    FilesToDelete3 = find_persistedentries(2999, Man),
+    ?assertMatch(2, length(FilesToDelete3)),
+    FilesToDelete4 = find_persistedentries(999, Man),
+    ?assertMatch([], FilesToDelete4).
+
 buildrandomfashion_test() ->
     ManL0 = build_testmanifest_aslist(),
     RandMapFun =
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index fde2dda..740a2e1 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -95,6 +95,7 @@
         code_change/3,
         ink_start/1,
         ink_put/4,
+        ink_mput/3,
         ink_get/3,
         ink_fetch/3,
         ink_keycheck/3,
@@ -105,6 +106,7 @@
         ink_compactjournal/3,
         ink_compactioncomplete/1,
         ink_compactionpending/1,
+        ink_trim/2,
         ink_getmanifest/1,
         ink_updatemanifest/3,
         ink_printmanifest/1,
@@ -185,6 +187,16 @@ ink_start(InkerOpts) ->
 ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
     gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).

+
+-spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}.
+%% @doc
+%% MPUT a series of object specifications, which will be converted into
+%% objects in the Ledger. This should only be used when the Bookie is
+%% running in head_only mode. The journal entries are kept only for handling
+%% consistency on startup.
+ink_mput(Pid, PrimaryKey, ObjectChanges) ->
+    gen_server:call(Pid, {mput, PrimaryKey, ObjectChanges}, infinity).
+
 -spec ink_get(pid(),
               {atom(), any(), any(), any()}|string(),
               integer()) ->
@@ -361,10 +373,17 @@ ink_compactioncomplete(Pid) ->

 -spec ink_compactionpending(pid()) -> boolean().
 %% @doc
 %% Is there ongoing compaction work? No compaction work should be initiated
-%5 if there is already some compaction work ongoing.
+%% if there is already some compaction work ongoing.
 ink_compactionpending(Pid) ->
     gen_server:call(Pid, compaction_pending, infinity).

+-spec ink_trim(pid(), integer()) -> ok.
+%% @doc
+%% Trim the Journal to just those files that contain entries since the
+%% Penciller's persisted SQN
+ink_trim(Pid, PersistedSQN) ->
+    gen_server:call(Pid, {trim, PersistedSQN}, infinity).
+
 -spec ink_getmanifest(pid()) -> list().
 %% @doc
 %% Allows the clerk to fetch the manifest at the point it starts a compaction
@@ -420,6 +439,11 @@ handle_call({put, Key, Object, KeyChanges}, _From, State) ->
         {_, UpdState, ObjSize} ->
             {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
     end;
+handle_call({mput, Key, ObjChanges}, _From, State) ->
+    case put_object(Key, head_only, ObjChanges, State) of
+        {_, UpdState, _ObjSize} ->
+            {reply, {ok, UpdState#state.journal_sqn}, UpdState}
+    end;
 handle_call({fetch, Key, SQN}, _From, State) ->
     case get_object(Key, SQN, State#state.manifest, true) of
         {{SQN, Key}, {Value, _IndexSpecs}} ->
@@ -503,6 +527,9 @@ handle_call(compaction_complete, _From, State) ->
     {reply, ok, State#state{compaction_pending=false}};
 handle_call(compaction_pending, _From, State) ->
     {reply, State#state.compaction_pending, State};
+handle_call({trim, PersistedSQN}, _From, State) ->
+    ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
+    {reply, ok, State};
 handle_call(close, _From, State) ->
     {stop, normal, ok, State};
 handle_call(doom, _From, State) ->
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index d184ab6..ea06283 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -189,7 +189,8 @@
         pcl_registersnapshot/5,
         pcl_getstartupsequencenumber/1,
         pcl_checkbloomtest/2,
-        pcl_checkforwork/1]).
+        pcl_checkforwork/1,
+        pcl_persistedsqn/1]).

 -export([
         sst_rootpath/1,
@@ -504,6 +505,14 @@ pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
 pcl_releasesnapshot(Pid, Snapshot) ->
     gen_server:cast(Pid, {release_snapshot, Snapshot}).

+
+-spec pcl_persistedsqn(pid()) -> integer().
+%% @doc
+%% Return the persisted SQN, the highest SQN which has been persisted into
+%% the Ledger
+pcl_persistedsqn(Pid) ->
+    gen_server:call(Pid, persisted_sqn, infinity).
+
 -spec pcl_close(pid()) -> ok.
 %% @doc
 %% Close the penciller neatly, trying to persist to disk anything in the memory
@@ -781,7 +790,9 @@ handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
 handle_call(check_for_work, _From, State) ->
     {_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
                                                  ?LEVEL_SCALEFACTOR),
-    {reply, WC > 0, State}.
+    {reply, WC > 0, State};
+handle_call(persisted_sqn, _From, State) ->
+    {reply, State#state.persisted_sqn, State}.

 handle_cast({manifest_change, NewManifest}, State) ->
     NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),
diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl
index 9fe8dcf..1d8af15 100644
--- a/src/leveled_runner.erl
+++ b/src/leveled_runner.erl
@@ -553,17 +553,19 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                                               SQN),
                         case InJournal of
                             probably ->
-                                ProxyObj = make_proxy_object(LK, JK,
-                                                             MD, V,
-                                                             InkerClone),
+                                ProxyObj =
+                                    make_proxy_object(Tag,
+                                                      LK, JK, MD, V,
+                                                      InkerClone),
                                 FoldObjectsFun(B, K, ProxyObj, Acc);
                             missing ->
                                 Acc
                         end;
                     {true, false} ->
-                        ProxyObj = make_proxy_object(LK, JK,
-                                                     MD, V,
-                                                     InkerClone),
+                        ProxyObj =
+                            make_proxy_object(Tag,
+                                              LK, JK, MD, V,
+                                              InkerClone),
                         FoldObjectsFun(B, K, ProxyObj, Acc);
                     false ->
                         R = leveled_bookie:fetch_value(InkerClone, JK),
@@ -581,7 +583,10 @@
         end,
     AccFun.

-make_proxy_object(LK, JK, MD, V, InkerClone) ->
+
+make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
+    MD;
+make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
     Size = leveled_codec:get_size(LK, V),
     MDBin = leveled_codec:build_metadata_object(LK, MD),
     term_to_binary({proxy_object,
diff --git a/test/end_to_end/tictac_SUITE.erl b/test/end_to_end/tictac_SUITE.erl
index 506704c..0cdf109 100644
--- a/test/end_to_end/tictac_SUITE.erl
+++ b/test/end_to_end/tictac_SUITE.erl
@@ -8,7 +8,8 @@
             recent_aae_noaae/1,
             recent_aae_allaae/1,
             recent_aae_bucketaae/1,
-            recent_aae_expiry/1
+            recent_aae_expiry/1,
+            basic_headonly/1
             ]).

 all() -> [
@@ -17,7 +18,8 @@
             recent_aae_noaae,
             recent_aae_allaae,
             recent_aae_bucketaae,
-            recent_aae_expiry
+            recent_aae_expiry,
+            basic_headonly
             ].

 -define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").

@@ -1010,6 +1012,81 @@ recent_aae_expiry(_Config) ->
     true = length(DL4_0) == 0.

+basic_headonly(_Config) ->
+    % Load some AAE type objects into Leveled using the head_only mode. This
+    % should allow for the items to be added in batches. Confirm that the
+    % journal is garbage collected as expected, and that it is possible to
+    % perform a fold_heads style query
+    ObjectCount = 100000,
+
+    RootPathHO = testutil:reset_filestructure("testHO"),
+    StartOpts1 = [{root_path, RootPathHO},
+                  {max_pencillercachesize, 16000},
+                  {sync_strategy, sync},
+                  {head_only, true}],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    {B1, K1, V1, S1, MD} = {"Bucket",
+                            "Key1.1.4567.4321",
+                            "Value1",
+                            [],
+                            [{"MDK1", "MDV1"}]},
+    {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
+    {unsupported_message, put} =
+        testutil:book_riakput(Bookie1, TestObject, TestSpec),
+
+    ObjectSpecFun =
+        fun(Op) ->
+            fun(N) ->
+                Bucket = <<"B", N:32/integer>>,
+                Key = <<"K", N:32/integer>>,
+                <<SegmentID:32/integer, _RestBS/binary>> =
+                    crypto:hash(md5, term_to_binary({Bucket, Key})),
+                <<Hash:32/integer, _RestBK/binary>> =
+                    crypto:hash(md5, <<N:32/integer>>),
+                {Op, <<SegmentID:32/integer>>, Bucket, Key, Hash}
+            end
+        end,
+
+    ObjectSpecL = lists:map(ObjectSpecFun(add), lists:seq(1, ObjectCount)),
+    ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
+
+    FoldFun =
+        fun(_B, _K, V, {HashAcc, CountAcc}) ->
+            {HashAcc bxor V, CountAcc + 1}
+        end,
+    InitAcc = {0, 0},
+
+    RunnerDefinition =
+        {foldheads_allkeys, h, {FoldFun, InitAcc}, false, false, false},
+    {async, Runner1} =
+        leveled_bookie:book_returnfolder(Bookie1, RunnerDefinition),
+
+    SW1 = os:timestamp(),
+    {AccH1, AccC1} = Runner1(),
+    io:format("AccH and AccC of ~w ~w in ~w microseconds ~n",
+              [AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
+
+    true = AccC1 == ObjectCount,
+
+    ok = leveled_bookie:book_close(Bookie1).
+
+
+load_objectspecs([], _SliceSize, _Bookie) ->
+    ok;
+load_objectspecs(ObjectSpecL, SliceSize, Bookie)
+                                    when length(ObjectSpecL) < SliceSize ->
+    load_objectspecs(ObjectSpecL, length(ObjectSpecL), Bookie);
+load_objectspecs(ObjectSpecL, SliceSize, Bookie) ->
+    {Head, Tail} = lists:split(SliceSize, ObjectSpecL),
+    case leveled_bookie:book_mput(Bookie, Head) of
+        ok ->
+            load_objectspecs(Tail, SliceSize, Bookie);
+        pause ->
+            timer:sleep(10),
+            load_objectspecs(Tail, SliceSize, Bookie)
+    end.
+
+
 load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
                          SW_StartLoad, TreeSize, UnitMins,
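Taken together, a head_only store is driven end to end as in this sketch (illustrative only - the root path and object spec values are assumed):

    {ok, Bookie} = leveled_bookie:book_start([{root_path, "/tmp/headonly"},
                                              {head_only, true}]),
    %% book_mput returns ok | pause; pause asks the caller to back off briefly
    _ = leveled_bookie:book_mput(Bookie,
                                 [{add, <<"B1">>, <<"K1">>, <<"SK1">>, 99}]),
    %% Once the Penciller's persisted SQN has passed the batch's SQN, the
    %% journal can be trimmed (cheaper than compaction) - only journal files
    %% whose entries all fall below the persisted SQN are removed
    ok = leveled_bookie:book_trimjournal(Bookie),
    ok = leveled_bookie:book_close(Bookie).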