Add lookup support in head_only mode
Originally, the ability to look up individual values when running in head_only mode had been disabled. This saved about 11% at PUT time (about 3 microseconds per PUT) on a MacBook. That saving is probably not sufficient to justify the extra work if this is used as an AAE KeyStore with Bitcask and LWW (when the current value needs to be looked up before adjusting), so support for HEAD requests on these keys has been re-added.
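As a rough illustration (not part of the commit), the sketch below shows the kind of lookup this change enables, mirroring the book_head/4 call added to the basic_headonly test in the diff. The book_mput/2 batch-load call and the literal values are assumptions for the example; the test itself loads object specs through its local load_objectspecs helper.

    %% Minimal sketch, assuming a head_only store; book_mput/2 and the
    %% literal values are illustrative assumptions, not from this commit.
    head_only_lookup_sketch() ->
        {ok, Bookie} =
            leveled_bookie:book_start([{root_path, "/tmp/testHO"},
                                        {head_only, true}]),
        %% Object spec shape as in the test: {Action, Bucket, Key, SubKey, Value}
        ObjectSpec = {add, <<"Segment">>, <<"Bucket">>, <<"Key">>, some_value},
        _ = leveled_bookie:book_mput(Bookie, [ObjectSpec]),
        %% With this commit a direct HEAD request returns the stored value;
        %% previously head_only objects were reachable only via folds.
        {ok, some_value} =
            leveled_bookie:book_head(Bookie, <<"Segment">>,
                                        {<<"Bucket">>, <<"Key">>}, h),
        ok = leveled_bookie:book_close(Bookie).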
parent 2b6281b2b5
commit 910ccb6072
5 changed files with 71 additions and 22 deletions
@@ -654,11 +654,7 @@ handle_call({get, Bucket, Key, Tag}, _From, State)
         update_statetimings(get, Timings2, State#state.get_countdown),
     {reply, Reply, State#state{get_timings = Timings,
                                 get_countdown = CountDown}};
-handle_call({head, Bucket, Key, Tag}, _From, State)
-                                when State#state.head_only == false ->
-    % Head requests are not possible when the status is head_only, as head_only
-    % objects are only retrievable via folds not direct object access (there
-    % is no hash generated for the objects to accelerate lookup)
+handle_call({head, Bucket, Key, Tag}, _From, State) ->
     SWp = os:timestamp(),
     LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
     case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@@ -98,6 +98,9 @@ segment_hash(Key) when is_binary(Key) ->
 segment_hash({?RIAK_TAG, Bucket, Key, null})
                 when is_binary(Bucket), is_binary(Key) ->
     segment_hash(<<Bucket/binary, Key/binary>>);
+segment_hash({?HEAD_TAG, Bucket, Key, SubKey})
+                when is_binary(Bucket), is_binary(Key), is_binary(SubKey) ->
+    segment_hash(<<Bucket/binary, Key/binary, SubKey/binary>>);
 segment_hash(Key) ->
     segment_hash(term_to_binary(Key)).
 
@@ -229,11 +232,11 @@ from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
 to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
     {?IDX_TAG, Bucket, {Field, Value}, Key}.
 
+to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG) ->
+    {?HEAD_TAG, Bucket, Key, SubKey};
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
-to_ledgerkey(Bucket, Key, Tag, SubKey) ->
-    {Tag, Bucket, Key, SubKey}.
 
 %% Return the Key, Value and Hash Option for this object. The hash option
 %% indicates whether the key would ever be looked up directly, and so if it
@@ -481,8 +484,8 @@ gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
                 %% TODO: timestamps for delayed reaping
                 tomb
         end,
-    {to_ledgerkey(Bucket, Key, ?HEAD_TAG, SubKey),
-        {SQN, Status, no_lookup, Value}}.
+    K = to_ledgerkey(Bucket, {Key, SubKey}, ?HEAD_TAG),
+    {K, {SQN, Status, segment_hash(K), Value}}.
 
 
 -spec aae_indexspecs(false|recent_aae(),
@@ -243,9 +243,9 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
             {noreply, State}
     end;
 handle_cast({trim, Inker, PersistedSQN}, State) ->
-    [_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
+    ManifestAsList = leveled_inker:ink_getmanifest(Inker),
     FilesToDelete =
-        leveled_imanifest:find_persistedentries(PersistedSQN, Manifest),
+        leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
     ok = update_inker(Inker, [], FilesToDelete),
     {noreply, State};
 handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
@@ -106,16 +106,16 @@ find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
 find_entry(SQN, [_TopEntry|Tail]) ->
     find_entry(SQN, Tail).
 
--spec find_persistedentries(integer(), manifest()) -> list(manifest_entry()).
+-spec find_persistedentries(integer(), list()) -> list(manifest_entry()).
 %% @doc
 %% Find the entries in the manifest where all items are < than the persisted
 %% SQN in the ledger
-find_persistedentries(SQN, Manifest) ->
+find_persistedentries(SQN, ManifestAsList) ->
     DropFun =
         fun({ME_SQN, _FN, _ME_P, _LK}) ->
             ME_SQN > SQN
         end,
-    Entries = lists:dropwhile(DropFun, to_list(Manifest)),
+    Entries = lists:dropwhile(DropFun, ManifestAsList),
     case Entries of
         [_Head|Tail] ->
             Tail;
@@ -258,13 +258,13 @@ buildfromend_test() ->
 
 findpersisted_test() ->
     Man = from_list(build_testmanifest_aslist()),
-    FilesToDelete1 = find_persistedentries(2001, Man),
+    FilesToDelete1 = find_persistedentries(2001, to_list(Man)),
     ?assertMatch(2, length(FilesToDelete1)),
-    FilesToDelete2 = find_persistedentries(3000, Man),
+    FilesToDelete2 = find_persistedentries(3000, to_list(Man)),
     ?assertMatch(3, length(FilesToDelete2)),
-    FilesToDelete3 = find_persistedentries(2999, Man),
+    FilesToDelete3 = find_persistedentries(2999, to_list(Man)),
     ?assertMatch(2, length(FilesToDelete3)),
-    FilesToDelete4 = find_persistedentries(999, Man),
+    FilesToDelete4 = find_persistedentries(999, to_list(Man)),
     ?assertMatch([], FilesToDelete4).
 
 buildrandomfashion_test() ->
@@ -1017,13 +1017,14 @@ basic_headonly(_Config) ->
     % should allow for the items to be added in batches. Confirm that the
     % journal is garbage collected as expected, and that it is possible to
     % perform a fold_heads style query
-    ObjectCount = 100000,
+    ObjectCount = 200000,
 
     RootPathHO = testutil:reset_filestructure("testHO"),
     StartOpts1 = [{root_path, RootPathHO},
                     {max_pencillercachesize, 16000},
                     {sync_strategy, sync},
-                    {head_only, true}],
+                    {head_only, true},
+                    {max_journalsize, 500000}],
     {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
     {B1, K1, V1, S1, MD} = {"Bucket",
                                 "Key1.1.4567.4321",
@@ -1037,7 +1038,7 @@ basic_headonly(_Config) ->
     ObjectSpecFun =
         fun(Op) ->
             fun(N) ->
-                Bucket = <<"B", N:32/integer>>,
+                Bucket = <<"B", N:8/integer>>,
                 Key = <<"K", N:32/integer>>,
                 <<SegmentID:20/integer, _RestBS/bitstring>> =
                     crypto:hash(md5, term_to_binary({Bucket, Key})),
@@ -1048,7 +1049,11 @@ basic_headonly(_Config) ->
         end,
 
     ObjectSpecL = lists:map(ObjectSpecFun(add), lists:seq(1, ObjectCount)),
+
+    SW0 = os:timestamp(),
     ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
+    io:format("Loaded an object count of ~w in ~w microseconds ~n",
+                [ObjectCount, timer:now_diff(os:timestamp(), SW0)]),
 
     FoldFun =
         fun(_B, _K, V, {HashAcc, CountAcc}) ->
@@ -1068,7 +1073,52 @@ basic_headonly(_Config) ->
 
     true = AccC1 == ObjectCount,
 
-    ok = leveled_bookie:book_close(Bookie1).
+    JFP = RootPathHO ++ "/journal/journal_files",
+    {ok, FNs} = file:list_dir(JFP),
+
+    ok = leveled_bookie:book_trimjournal(Bookie1),
+
+    WaitForTrimFun =
+        fun(N, _Acc) ->
+            {ok, PollFNs} = file:list_dir(JFP),
+            case length(PollFNs) < length(FNs) of
+                true ->
+                    true;
+                false ->
+                    timer:sleep(N * 1000),
+                    false
+            end
+        end,
+
+    true = lists:foldl(WaitForTrimFun, false, [1, 2, 3, 5, 8, 13]),
+
+    {ok, FinalFNs} = file:list_dir(JFP),
+
+    % If we allow HEAD_TAG to be suubject to a lookup, then test this here
+    [{add, SegmentID0, Bucket0, Key0, Hash0}|_Rest] = ObjectSpecL,
+    {ok, Hash0} =
+        leveled_bookie:book_head(Bookie1, SegmentID0, {Bucket0, Key0}, h),
+
+    ok = leveled_bookie:book_close(Bookie1),
+    {ok, FinalJournals} = file:list_dir(JFP),
+    io:format("Trim has reduced journal count from " ++
+                    "~w to ~w and ~w after restart~n",
+                [length(FNs), length(FinalFNs), length(FinalJournals)]),
+
+    {ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
+
+    {async, Runner2} =
+        leveled_bookie:book_returnfolder(Bookie2, RunnerDefinition),
+
+    {_AccH2, AccC2} = Runner2(),
+    true = AccC2 == ObjectCount,
+
+    {ok, Hash0} =
+        leveled_bookie:book_head(Bookie2, SegmentID0, {Bucket0, Key0}, h),
+
+    ok = leveled_bookie:book_close(Bookie2).
 
 
 load_objectspecs([], _SliceSize, _Bookie) ->