Extend foldobjects to support proxy object

To allow for folds which probbaly don't need values to not always to
have to fetch the value
This commit is contained in:
martinsumner 2017-04-07 12:09:11 +00:00
parent 1146e843bc
commit b464e2e28c

View file

@ -69,7 +69,8 @@
empty_ledgercache/0, empty_ledgercache/0,
loadqueue_ledgercache/1, loadqueue_ledgercache/1,
push_ledgercache/2, push_ledgercache/2,
snapshot_store/5]). snapshot_store/5,
fetch_value/2]).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -447,7 +448,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
{reply, not_found, State}; {reply, not_found, State};
{active, TS} -> {active, TS} ->
Active = TS >= leveled_codec:integer_now(), Active = TS >= leveled_codec:integer_now(),
Object = fetch_value(LedgerKey, Seqn, State#state.inker), Object = fetch_value(State#state.inker, {LedgerKey, Seqn}),
GT1 = leveled_log:get_timing(GT0, SWg, fetch), GT1 = leveled_log:get_timing(GT0, SWg, fetch),
case {Active, Object} of case {Active, Object} of
{_, not_present} -> {_, not_present} ->
@ -531,6 +532,10 @@ handle_call({return_folder, FolderType}, _From, State) ->
{reply, {reply,
hashtree_query(State, Tag, JournalCheck), hashtree_query(State, Tag, JournalCheck),
State}; State};
{foldheads_allkeys, Tag, FoldHeadsFun} ->
{reply,
foldheads_allkeys(State, Tag, FoldHeadsFun),
State};
{foldobjects_allkeys, Tag, FoldObjectsFun} -> {foldobjects_allkeys, Tag, FoldObjectsFun} ->
{reply, {reply,
foldobjects_allkeys(State, Tag, FoldObjectsFun), foldobjects_allkeys(State, Tag, FoldObjectsFun),
@ -656,6 +661,16 @@ snapshot_store(State, SnapType, Query) ->
SnapType, SnapType,
Query). Query).
fetch_value(Inker, {Key, SQN}) ->
SW = os:timestamp(),
case leveled_inker:ink_fetch(Inker, Key, SQN) of
{ok, Value} ->
maybe_longrunning(SW, inker_fetch),
Value;
not_present ->
not_present
end.
%%%============================================================================ %%%============================================================================
%%% Internal functions %%% Internal functions
@ -803,21 +818,28 @@ hashtree_query(State, Tag, JournalCheck) ->
foldobjects_allkeys(State, Tag, FoldObjectsFun) -> foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag), EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
foldheads_allkeys(State, Tag, FoldHeadsFun) ->
StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).
foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) -> foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
foldobjects_byindex(State, Tag, Bucket, Field, FromTerm, ToTerm, FoldObjectsFun) -> foldobjects_byindex(State, Tag, Bucket,
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, Field, FromTerm, ToTerm, FoldObjectsFun) ->
FromTerm), StartKey =
EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
ToTerm), EndKey =
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun). leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
{ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store), {ok, LedgerSnapshot, JournalSnapshot} = snapshot_store(State, store),
{FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
true -> true ->
@ -826,7 +848,10 @@ foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun) ->
{FoldObjectsFun, []} {FoldObjectsFun, []}
end, end,
Folder = fun() -> Folder = fun() ->
AccFun = accumulate_objects(FoldFun, JournalSnapshot, Tag), AccFun = accumulate_objects(FoldFun,
JournalSnapshot,
Tag,
DeferredFetch),
Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
StartKey, StartKey,
EndKey, EndKey,
@ -943,8 +968,8 @@ set_options(Opts) ->
JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP, JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP, LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
ok =filelib:ensure_dir(JournalFP), ok = filelib:ensure_dir(JournalFP),
ok =filelib:ensure_dir(LedgerFP), ok = filelib:ensure_dir(LedgerFP),
{#inker_options{root_path = JournalFP, {#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy, reload_strategy = ReloadStrategy,
@ -993,16 +1018,6 @@ fetch_head(Key, Penciller, LedgerCache) ->
end end
end. end.
fetch_value(Key, SQN, Inker) ->
SW = os:timestamp(),
case leveled_inker:ink_fetch(Inker, Key, SQN) of
{ok, Value} ->
maybe_longrunning(SW, inker_fetch),
Value;
not_present ->
not_present
end.
accumulate_size() -> accumulate_size() ->
Now = leveled_codec:integer_now(), Now = leveled_codec:integer_now(),
@ -1041,28 +1056,47 @@ accumulate_hashes(JournalCheck, InkerClone) ->
end, end,
AccFun. AccFun.
accumulate_objects(FoldObjectsFun, InkerClone, Tag) ->
accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
Now = leveled_codec:integer_now(), Now = leveled_codec:integer_now(),
AccFun = fun(LK, V, Acc) -> AccFun =
case leveled_codec:is_active(LK, V, Now) of fun(LK, V, Acc) ->
case leveled_codec:is_active(LK, V, Now) of
true ->
{SQN, _St, _MH, MD} =
leveled_codec:striphead_to_details(V),
{B, K} =
case leveled_codec:from_ledgerkey(LK) of
{B0, K0} ->
{B0, K0};
{B0, K0, _T0} ->
{B0, K0}
end,
JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
case DeferredFetch of
true -> true ->
SQN = leveled_codec:strip_to_seqonly({LK, V}), Size = leveled_codec:get_size(LK, V),
{B, K} = case leveled_codec:from_ledgerkey(LK) of MDBin =
{B0, K0} -> {B0, K0}; leveled_codec:build_metadata_object(LK, MD),
{B0, K0, _T0} -> {B0, K0} Value = {proxy_object,
end, MDBin,
QK = leveled_codec:to_ledgerkey(B, K, Tag), Size,
R = leveled_inker:ink_fetch(InkerClone, QK, SQN), {fun fetch_value/2, InkerClone, JK}},
case R of FoldObjectsFun(B, K, Value, Acc);
{ok, Value} ->
FoldObjectsFun(B, K, Value, Acc);
not_present ->
Acc
end;
false -> false ->
Acc R = fetch_value(InkerClone, JK),
end case R of
end, not_present ->
Acc;
Value ->
FoldObjectsFun(B, K, Value, Acc)
end
end;
false ->
Acc
end
end,
AccFun. AccFun.