Optional lookup in head_only mode
Allow a decision to be made at startup as to whether ObjectSpecs can be looked up directly when running in head_only mode.
Parent: 7751be1431
Commit: 70dfb77088
2 changed files with 92 additions and 34 deletions

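With this change the head_only startup option takes one of three values rather than a boolean: false, with_lookup or no_lookup. Both non-false values run the store in head_only mode; only with_lookup keeps individual ObjectSpecs directly fetchable via HEAD requests. A minimal usage sketch, based on the API calls exercised in the test changes below (the headonly_demo function name and root_path value are illustrative; the {add, SegmentID, Bucket, Key, Hash} spec shape is the one used by the test):

    headonly_demo() ->
        {ok, Bookie} =
            leveled_bookie:book_start([{root_path, "/tmp/headonly_demo"},
                                        {head_only, with_lookup}]),
        Hash = erlang:phash2({<<"B1">>, <<"K1">>}),
        % head_only stores are loaded through batches of ObjectSpecs
        ok = leveled_bookie:book_mput(Bookie,
                                        [{add, 1, <<"B1">>, <<"K1">>, Hash}]),
        % with_lookup keeps specs directly fetchable by key; had the store
        % been started with no_lookup, the same call would return
        % {unsupported_message, head}
        {ok, Hash} = leveled_bookie:book_head(Bookie, 1, {<<"B1">>, <<"K1">>}, h),
        ok = leveled_bookie:book_close(Bookie).
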
@@ -101,12 +101,13 @@
 -record(state, {inker :: pid() | undefined,
                 penciller :: pid() | undefined,
                 cache_size :: integer() | undefined,
-                recent_aae :: false | #recent_aae{} | undefined,
+                recent_aae :: recent_aae(),
                 ledger_cache = #ledger_cache{},
                 is_snapshot :: boolean() | undefined,
                 slow_offer = false :: boolean(),
 
                 head_only = false :: boolean(),
+                head_lookup = true :: boolean(),
 
                 put_countdown = 0 :: integer(),
                 get_countdown = 0 :: integer(),

@@ -144,6 +145,7 @@
 -type fold_timings() :: no_timing|#fold_timings{}.
 -type head_timings() :: no_timing|#head_timings{}.
 -type timing_types() :: head|get|put|fold.
+-type recent_aae() :: false|#recent_aae{}|undefined.
 
 %%%============================================================================
 %%% API

@@ -512,20 +514,30 @@ init([Opts]) ->
                                 unit_minutes = UnitMinutes}
             end,
 
-    HeadOnly = get_opt(head_only, Opts, false),
+    {HeadOnly, HeadLookup} =
+        case get_opt(head_only, Opts, false) of
+            false ->
+                {false, true};
+            with_lookup ->
+                {true, true};
+            no_lookup ->
+                {true, false}
+        end,
 
+    State0 = #state{cache_size=CacheSize,
+                    recent_aae=RecentAAE,
+                    is_snapshot=false,
+                    head_only=HeadOnly,
+                    head_lookup = HeadLookup},
+
     {Inker, Penciller} =
-        startup(InkerOpts, PencillerOpts, RecentAAE),
+        startup(InkerOpts, PencillerOpts, State0),
 
     NewETS = ets:new(mem, [ordered_set]),
     leveled_log:log("B0001", [Inker, Penciller]),
-    {ok, #state{inker=Inker,
-                penciller=Penciller,
-                cache_size=CacheSize,
-                recent_aae=RecentAAE,
-                ledger_cache=#ledger_cache{mem = NewETS},
-                is_snapshot=false,
-                head_only=HeadOnly}};
+    {ok, State0#state{inker=Inker,
+                        penciller=Penciller,
+                        ledger_cache=#ledger_cache{mem = NewETS}}};
 Bookie ->
     {ok, Penciller, Inker} =
         book_snapshot(Bookie, store, undefined, true),

@@ -552,7 +564,7 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
                                     Object,
                                     ObjSize,
                                     {IndexSpecs, TTL},
-                                    State#state.recent_aae),
+                                    State),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     {_SW2, Timings2} = update_timings(SW1, {put, mem}, Timings1),
 

@@ -590,7 +602,7 @@ handle_call({mput, ObjectSpecs, TTL}, From, State)
         preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
                                 SQN, null, length(ObjectSpecs),
                                 {ObjectSpecs, TTL},
-                                false),
+                                State),
     Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
     case State#state.slow_offer of
         true ->

@@ -654,7 +666,8 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         update_statetimings(get, Timings2, State#state.get_countdown),
     {reply, Reply, State#state{get_timings = Timings,
                                 get_countdown = CountDown}};
-handle_call({head, Bucket, Key, Tag}, _From, State) ->
+handle_call({head, Bucket, Key, Tag}, _From, State)
+                                        when State#state.head_lookup == true ->
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of

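With head_lookup set to false, a head request no longer matches the guarded clause above. The test changes below expect {unsupported_message, head} in that case, consistent with the request falling through to a catch-all clause of roughly this shape (a sketch; the catch-all itself is not shown in this diff):

    handle_call(Msg, _From, State) ->
        % any request not handled by an earlier clause is rejected with
        % its message type
        {reply, {unsupported_message, element(1, Msg)}, State}.
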
@@ -1156,14 +1169,14 @@ set_options(Opts) ->
                             levelzero_cointoss = true,
                             compression_method = CompressionMethod}}.
 
-startup(InkerOpts, PencillerOpts, RecentAAE) ->
+startup(InkerOpts, PencillerOpts, State) ->
     {ok, Inker} = leveled_inker:ink_start(InkerOpts),
     {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
     LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
     leveled_log:log("B0005", [LedgerSQN]),
     ok = leveled_inker:ink_loadpcl(Inker,
                                     LedgerSQN + 1,
-                                    get_loadfun(RecentAAE),
+                                    get_loadfun(State),
                                     Penciller),
     {Inker, Penciller}.
 

@@ -1193,25 +1206,34 @@ fetch_head(Key, Penciller, LedgerCache) ->
     end.
 
 
-preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, _A) ->
+-spec preparefor_ledgercache(atom(), any(), integer(), any(),
+                                integer(), tuple(), book_state()) ->
+                                    {integer()|no_lookup, integer(), list()}.
+%% @doc
+%% Prepare an object and its related key changes for addition to the Ledger
+%% via the Ledger Cache.
+preparefor_ledgercache(?INKT_MPUT,
+                        ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL},
+                        _State) ->
     ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
     {no_lookup, SQN, ObjChanges};
 preparefor_ledgercache(?INKT_KEYD,
                         LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
-                        _AAE) ->
+                        _State) ->
     {Bucket, Key} = leveled_codec:from_ledgerkey(LedgerKey),
     KeyChanges =
         leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL),
     {no_lookup, SQN, KeyChanges};
 preparefor_ledgercache(_InkTag,
                         LedgerKey, SQN, Obj, Size, {IdxSpecs, TTL},
-                        AAE) ->
+                        State) ->
     {Bucket, Key, MetaValue, {KeyH, ObjH}, LastMods} =
         leveled_codec:generate_ledgerkv(LedgerKey, SQN, Obj, Size, TTL),
     KeyChanges =
         [{LedgerKey, MetaValue}] ++
         leveled_codec:idx_indexspecs(IdxSpecs, Bucket, Key, SQN, TTL) ++
-        leveled_codec:aae_indexspecs(AAE, Bucket, Key, SQN, ObjH, LastMods),
+        leveled_codec:aae_indexspecs(State#state.recent_aae,
+                                        Bucket, Key, SQN, ObjH, LastMods),
     {KeyH, SQN, KeyChanges}.
 
 

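The first element of the returned tuple signals whether the key should be reachable by direct lookup: a real hash for normal objects, no_lookup for MPUT ObjectSpecs and key-only deltas. A sketch of how addto_ledgercache might consume that flag (its body is outside these hunks, so the field names and the index helper here are assumptions): all KeyChanges enter the cache memory, but only hash-tagged entries would be registered in the lookup index.

    addto_ledgercache({H, SQN, KeyChanges}, Cache) ->
        ets:insert(Cache#ledger_cache.mem, KeyChanges),
        % assumed: the index helper ignores a no_lookup tag, so these
        % entries are never individually fetchable
        UpdIndex = leveled_pmem:prepare_for_index(Cache#ledger_cache.index, H),
        Cache#ledger_cache{index = UpdIndex,
                            min_sqn = min(SQN, Cache#ledger_cache.min_sqn),
                            max_sqn = max(SQN, Cache#ledger_cache.max_sqn)}.
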
@@ -1270,10 +1292,10 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
     end.
 
 
-get_loadfun(RecentAAE) ->
+get_loadfun(State) ->
     PrepareFun =
         fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
-            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
+            preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, State)
         end,
     LoadFun =
         fun(KeyInJournal, ValueInJournal, _Pos, Acc0, ExtractFun) ->

@@ -1832,7 +1854,7 @@ foldobjects_vs_foldheads_bybucket_testto() ->
                                     {foldheads_bybucket,
                                         ?STD_TAG,
                                         "BucketB",
-                                        {"Key", "Key4zzzz"},
+                                        {"Key", "Key4|"},
                                         FoldHeadsFun,
                                         true, false, false}),
     KeyHashList2E = HTFolder2E(),

@@ -1841,13 +1863,17 @@ foldobjects_vs_foldheads_bybucket_testto() ->
                                     {foldheads_bybucket,
                                         ?STD_TAG,
                                         "BucketB",
-                                        {"Key5", <<"all">>},
+                                        {"Key5", "Key|"},
                                         FoldHeadsFun,
                                         true, false, false}),
     KeyHashList2F = HTFolder2F(),
 
     ?assertMatch(true, length(KeyHashList2E) > 0),
     ?assertMatch(true, length(KeyHashList2F) > 0),
+    io:format("Length of 2B ~w 2E ~w 2F ~w~n",
+                [length(KeyHashList2B),
+                    length(KeyHashList2E),
+                    length(KeyHashList2F)]),
     ?assertMatch(true,
                 lists:usort(KeyHashList2B) ==
                     lists:usort(KeyHashList2E ++ KeyHashList2F)),

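The two range changes above swap ad-hoc end keys for a "|" sentinel. Since $| (124) sorts after $z (122), "Key4|" is an inclusive upper bound for every "Key4"-prefixed key, and "Key|" closes the whole "Key"-prefixed range; for example:

    % "|" sorts after any alphanumeric character, so:
    true = "Key4|" > "Key4zzzz".
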
(The hunks above apply to the first changed file, the bookie module; the
remaining hunks apply to the second changed file, the end-to-end test suite.)

@@ -1013,17 +1013,21 @@ recent_aae_expiry(_Config) ->
 
 
 basic_headonly(_Config) ->
+    ObjectCount = 200000,
+    basic_headonly_test(ObjectCount, with_lookup),
+    basic_headonly_test(ObjectCount, no_lookup).
+
+
+basic_headonly_test(ObjectCount, HeadOnly) ->
     % Load some AAE type objects into Leveled using the read_only mode. This
     % should allow for the items to be added in batches. Confirm that the
     % journal is garbage collected as expected, and that it is possible to
     % perform a fold_heads style query
-    ObjectCount = 200000,
 
     RootPathHO = testutil:reset_filestructure("testHO"),
     StartOpts1 = [{root_path, RootPathHO},
                     {max_pencillercachesize, 16000},
                     {sync_strategy, sync},
-                    {head_only, true},
+                    {head_only, HeadOnly},
                     {max_journalsize, 500000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {B1, K1, V1, S1, MD} = {"Bucket",

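The generation of ObjectSpecL sits outside the hunks shown. Judging by the {add, SegmentID0, Bucket0, Key0, Hash0} pattern matched further down, each spec pairs a segment-style bucket with a key, sub-key and hash value; a hypothetical generator (not the suite's actual code) might look like:

    ObjectSpecL =
        lists:map(
            fun(I) ->
                Key = "Key" ++ integer_to_list(I),
                Hash = erlang:phash2({B1, Key}),
                SegmentID = Hash band 255,  % coarse segment keyspace
                {add, SegmentID, B1, Key, Hash}
            end,
            lists:seq(1, ObjectCount)),
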
@@ -1052,8 +1056,8 @@ basic_headonly(_Config) ->
 
     SW0 = os:timestamp(),
     ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
-    io:format("Loaded an object count of ~w in ~w microseconds ~n",
-                [ObjectCount, timer:now_diff(os:timestamp(), SW0)]),
+    io:format("Loaded an object count of ~w in ~w microseconds with ~w~n",
+                [ObjectCount, timer:now_diff(os:timestamp(), SW0), HeadOnly]),
 
     FoldFun =
         fun(_B, _K, V, {HashAcc, CountAcc}) ->

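load_objectspecs/3 is a suite helper whose body is also outside these hunks. Given that leveled_bookie:book_mput/2 is specified to return ok | pause, a plausible shape is a batching loop that backs off briefly whenever the store signals pressure (a sketch under that assumption; the maybe_pause helper is hypothetical):

    load_objectspecs([], _BatchSize, _Bookie) ->
        ok;
    load_objectspecs(Specs, BatchSize, Bookie) when length(Specs) =< BatchSize ->
        ok = maybe_pause(leveled_bookie:book_mput(Bookie, Specs));
    load_objectspecs(Specs, BatchSize, Bookie) ->
        {Batch, Rest} = lists:split(BatchSize, Specs),
        ok = maybe_pause(leveled_bookie:book_mput(Bookie, Batch)),
        load_objectspecs(Rest, BatchSize, Bookie).

    maybe_pause(ok) ->
        ok;
    maybe_pause(pause) ->
        % assumed back-off when the store asks the loader to pause
        timer:sleep(10),
        ok.
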
@@ -1068,7 +1072,7 @@ basic_headonly(_Config) ->
 
     SW1 = os:timestamp(),
     {AccH1, AccC1} = Runner1(),
-    io:format("AccH and AccC of ~w ~w in ~w microseconds ~n",
+    io:format("AccH and AccC of ~w ~w in ~w microseconds~n",
                 [AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
 
     true = AccC1 == ObjectCount,

@@ -1094,10 +1098,24 @@ basic_headonly(_Config) ->
 
     {ok, FinalFNs} = file:list_dir(JFP),
 
-    % If we allow HEAD_TAG to be subject to a lookup, then test this here
     [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
-    {ok, Hash0} =
-        leveled_bookie:book_head(Bookie1, SegmentID0, {Bucket0, Key0}, h),
+    case HeadOnly of
+        with_lookup ->
+            % If we allow HEAD_TAG to be subject to a lookup, then test this
+            % here
+            {ok, Hash0} =
+                leveled_bookie:book_head(Bookie1,
+                                            SegmentID0,
+                                            {Bucket0, Key0},
+                                            h);
+        no_lookup ->
+            {unsupported_message, head} =
+                leveled_bookie:book_head(Bookie1,
+                                            SegmentID0,
+                                            {Bucket0, Key0},
+                                            h)
+    end,
 
     ok = leveled_bookie:book_close(Bookie1),
     {ok, FinalJournals} = file:list_dir(JFP),

@@ -1113,8 +1131,22 @@ basic_headonly(_Config) ->
     {_AccH2, AccC2} = Runner2(),
     true = AccC2 == ObjectCount,
 
-    {ok, Hash0} =
-        leveled_bookie:book_head(Bookie2, SegmentID0, {Bucket0, Key0}, h),
+    case HeadOnly of
+        with_lookup ->
+            % If we allow HEAD_TAG to be subject to a lookup, then test this
+            % here
+            {ok, Hash0} =
+                leveled_bookie:book_head(Bookie2,
+                                            SegmentID0,
+                                            {Bucket0, Key0},
+                                            h);
+        no_lookup ->
+            {unsupported_message, head} =
+                leveled_bookie:book_head(Bookie2,
+                                            SegmentID0,
+                                            {Bucket0, Key0},
+                                            h)
+    end,
 
     ok = leveled_bookie:book_close(Bookie2).
|
|
Loading…
Add table
Add a link
Reference in a new issue