diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 81a5a36..f56ce38 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -69,7 +69,8 @@ empty_ledgercache/0, loadqueue_ledgercache/1, push_ledgercache/2, - snapshot_store/5]). + snapshot_store/5, + fetch_value/2]). -include_lib("eunit/include/eunit.hrl"). @@ -447,7 +448,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) -> {reply, not_found, State}; {active, TS} -> Active = TS >= leveled_codec:integer_now(), - Object = fetch_value(LedgerKey, Seqn, State#state.inker), + Object = fetch_value(State#state.inker, {LedgerKey, Seqn}), GT1 = leveled_log:get_timing(GT0, SWg, fetch), case {Active, Object} of {_, not_present} -> @@ -531,6 +532,10 @@ handle_call({return_folder, FolderType}, _From, State) -> {reply, hashtree_query(State, Tag, JournalCheck), State}; + {foldheads_allkeys, Tag, FoldHeadsFun} -> + {reply, + foldheads_allkeys(State, Tag, FoldHeadsFun), + State}; {foldobjects_allkeys, Tag, FoldObjectsFun} -> {reply, foldobjects_allkeys(State, Tag, FoldObjectsFun), @@ -656,6 +661,16 @@ snapshot_store(State, SnapType, Query) -> SnapType, Query). +fetch_value(Inker, {Key, SQN}) -> + SW = os:timestamp(), + case leveled_inker:ink_fetch(Inker, Key, SQN) of + {ok, Value} -> + maybe_longrunning(SW, inker_fetch), + Value; + not_present -> + not_present + end. + %%%============================================================================ %%% Internal functions @@ -803,21 +818,28 @@ hashtree_query(State, Tag, JournalCheck) -> foldobjects_allkeys(State, Tag, FoldObjectsFun) -> StartKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false). + +foldheads_allkeys(State, Tag, FoldHeadsFun) -> + StartKey = leveled_codec:to_ledgerkey(null, null, Tag), + EndKey = leveled_codec:to_ledgerkey(null, null, Tag), + foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true). foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) -> StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false). -foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) -> - StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, - FromTerm), - EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, - ToTerm), - foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). +foldobjects_byindex(State, Tag, Bucket, + Field, FromTerm, ToTerm, FoldObjectsFun) -> + StartKey = + leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm), + EndKey = + leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm), + foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false). -foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> + +foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) -> {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store), {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of true -> @@ -826,7 +848,10 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) -> {FoldObjectsFun, []} end, Folder = fun() -> - AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag), + AccFun = accumulate_objects(FoldFun, + JournalSnapshot, + Tag, + DeferredFetch), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, StartKey, EndKey, @@ -943,8 +968,8 @@ set_options(Opts) -> JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, - ok =filelib:ensure_dir(JournalFP), - ok =filelib:ensure_dir(LedgerFP), + ok = filelib:ensure_dir(JournalFP), + ok = filelib:ensure_dir(LedgerFP), {#inker_options{root_path = JournalFP, reload_strategy = ReloadStrategy, @@ -993,16 +1018,6 @@ fetch_head(Key, Penciller, LedgerCache) -> end end. -fetch_value(Key, SQN, Inker) -> - SW = os:timestamp(), - case leveled_inker:ink_fetch(Inker, Key, SQN) of - {ok, Value} -> - maybe_longrunning(SW, inker_fetch), - Value; - not_present -> - not_present - end. - accumulate_size() -> Now = leveled_codec:integer_now(), @@ -1041,28 +1056,47 @@ accumulate_hashes(JournalCheck, InkerClone) -> end, AccFun. -accumulate_objects(FoldObjectsFun, InkerClone, Tag) -> + +accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> Now = leveled_codec:integer_now(), - AccFun = fun(LK, V, Acc) -> - case leveled_codec:is_active(LK, V, Now) of + AccFun = + fun(LK, V, Acc) -> + case leveled_codec:is_active(LK, V, Now) of + true -> + {SQN, _St, _MH, MD} = + leveled_codec:striphead_to_details(V), + {B, K} = + case leveled_codec:from_ledgerkey(LK) of + {B0, K0} -> + {B0, K0}; + {B0, K0, _T0} -> + {B0, K0} + end, + JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN}, + case DeferredFetch of true -> - SQN = leveled_codec:strip_to_seqonly({LK, V}), - {B, K} = case leveled_codec:from_ledgerkey(LK) of - {B0, K0} -> {B0, K0}; - {B0, K0, _T0} -> {B0, K0} - end, - QK = leveled_codec:to_ledgerkey(B, K, Tag), - R = leveled_inker:ink_fetch(InkerClone, QK, SQN), - case R of - {ok, Value} -> - FoldObjectsFun(B, K, Value, Acc); - not_present -> - Acc - end; + Size = leveled_codec:get_size(LK, V), + MDBin = + leveled_codec:build_metadata_object(LK, MD), + Value = {proxy_object, + MDBin, + Size, + {fun fetch_value/2, InkerClone, JK}}, + FoldObjectsFun(B, K, Value, Acc); false -> - Acc - end - end, + R = fetch_value(InkerClone, JK), + case R of + not_present -> + Acc; + Value -> + FoldObjectsFun(B, K, Value, Acc) + + end + end; + false -> + Acc + end + end, AccFun.