Add tests using fold_heads

Compare the inbuilt tictac_tree fold with using "proper" abstraction to achieve the same thing through fold_heads.
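
For instance, where the inbuilt approach is a single tictac_tree query, the fold_heads approach builds the same tree through a fold fun, along these lines (a sketch drawn from the amended tictac_SUITE test in this commit; Bookie2, TreeSize and HashFun are assumed to be in scope, with HashFun hashing the vector clock out of the proxy object head):

    FoldObjectsFun =
        fun(_Bucket, Key, Value, Acc) ->
            % Value here is a proxy object (head only), not the full object
            leveled_tictac:add_kv(Acc, Key, Value, HashFun)
        end,
    FoldQ = {foldheads_bybucket,
                o_rkv,
                "Bucket",
                {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
                false, true},
    {async, TreeFolder} = leveled_bookie:book_returnfolder(Bookie2, FoldQ),
    TreeFromHeads = TreeFolder(),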

The fold_heads method is slower (a lot more manipulation is required in the fold); expect it to require more than 2x the CPU.

However, this does give the flexibility to change the hash algorithm. This would allow a fold over a database of AAE trees (where the hash has been pre-computed using sha) to be compared with a fold over a database of leveled backends.
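
As a sketch of that flexibility (following the ApplyHash helper added to the tictac_SUITE test in this commit), the hash fun applied to the sorted vector clock can be swapped without changing the fold itself:

    ApplyHash =
        fun(HashFun) ->
            fun(_Key, Value) ->
                % Unpack the head from the proxy object returned by fold_heads
                {proxy_object, HeadBin, _Size, _FetchFun} = binary_to_term(Value),
                <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
                    Rest/binary>> = HeadBin,
                <<VclockBin:VclockLen/binary, _/binary>> = Rest,
                HashFun(lists:sort(binary_to_term(VclockBin)))
            end
        end,
    % Native hash for a leveled-to-leveled comparison
    NativeHash = ApplyHash(fun erlang:phash2/1),
    % sha-based hash to line up with trees built from pre-computed sha hashes
    AltHash = ApplyHash(fun(T) -> erlang:phash2(crypto:hash(sha, term_to_binary(T))) end),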

The fold_heads fold can also vary whether it checks for the presence of each object in the Inker. Normally we can take the speed advantage of not checking the Journal for presence, but periodically we can run the fold with the check switched on.
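
For example (a sketch against the new folder signature; Bookie and FoldHeadsFun are placeholders for an open bookie and a head-processing fold fun):

    % Fast path - CheckPresence is false, so the Journal is never consulted
    {async, FastFolder} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldheads_allkeys,
                                                o_rkv,
                                                FoldHeadsFun,
                                                false, true}),
    % Periodic audit - CheckPresence is true, so each head is only passed to
    % the fold fun if the key still checks as present in the Inker's Journal
    {async, AuditFolder} =
        leveled_bookie:book_returnfolder(Bookie,
                                            {foldheads_allkeys,
                                                o_rkv,
                                                FoldHeadsFun,
                                                true, true}),
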
Martin Sumner 2017-08-07 10:45:41 +01:00
parent dd20132892
commit 53ddc8950b
2 changed files with 207 additions and 121 deletions

src/leveled_bookie.erl

@@ -16,7 +16,7 @@
 %%
 %%
 %% -------- Actors ---------
 %%
 %% The store is fronted by a Bookie, who takes support from different actors:
 %% - An Inker who persists new data into the journal, and returns items from
 %% the journal based on sequence number

@@ -70,7 +70,7 @@
          loadqueue_ledgercache/1,
          push_ledgercache/2,
          snapshot_store/5,
          fetch_value/2]).

 -include_lib("eunit/include/eunit.hrl").

@@ -102,6 +102,7 @@
                 put_timing :: tuple() | undefined,
                 get_timing :: tuple() | undefined}).
 
+-type book_state() :: #state{}.
 
 %%%============================================================================
 %%% API

@@ -113,7 +114,7 @@
 %% provide this startup method.  This will start a KV store from the previous
 %% store at root path - or an empty one if there is no store at the path.
 %%
 %% Fiddling with the LedgerCacheSize and JournalSize may improve performance,
 %% but these are primarily exposed to support special situations (e.g. very
 %% low memory installations), there should not be huge variance in outcomes
 %% from modifying these numbers.
@@ -158,7 +159,7 @@ book_start(RootPath, LedgerCacheSize, JournalSize, SyncStrategy) ->
 %% up before deletion)
 %%
 %% TODO:
 %% The reload_strategy is exposed as currently no firm decision has been made
 %% about how recovery should work.  For instance if we were to trust everything
 %% as permanent in the Ledger once it is persisted, then there would be no
 %% need to retain a skinny history of key changes in the Journal after

@@ -200,7 +201,7 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
 %% - A Primary Key and a Value
 %% - IndexSpecs - a set of secondary key changes associated with the
 %% transaction
 %% - A tag indictaing the type of object.  Behaviour for metadata extraction,
 %% and ledger compaction will vary by type.  There are three currently
 %% implemented types i (Index), o (Standard), o_rkv (Riak).  Keys added with
 %% Index tags are not fetchable (as they will not be hashed), but are
@@ -314,9 +315,9 @@ book_head(Pid, Bucket, Key, Tag) ->
 %% objects with a given tag
 %% {tictactree_idx,
 %%          {Bucket, IdxField, StartValue, EndValue},
 %%          TreeSize,
 %%          PartitionFilter}
 %% -> compile a hashtree for the items on the index.  A partition filter is
 %% required to avoid adding an index entry in this vnode as a fallback.
 %% There is no de-duplicate of results, duplicate reuslts corrupt the tree.
 %% {tictactree_obj,
@@ -384,7 +385,7 @@ init([Opts]) ->
         undefined ->
             % Start from file not snapshot
             {InkerOpts, PencillerOpts} = set_options(Opts),
             CacheJitter = ?CACHE_SIZE div (100 div ?CACHE_SIZE_JITTER),
             CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE)
                             + erlang:phash2(self()) rem CacheJitter,

@@ -398,9 +399,9 @@ init([Opts]) ->
                                 limit_minutes = LimitMinutes,
                                 unit_minutes = UnitMinutes}
                 end,
             {Inker, Penciller} = startup(InkerOpts, PencillerOpts, RecentAAE),
             NewETS = ets:new(mem, [ordered_set]),
             leveled_log:log("B0001", [Inker, Penciller]),
             {ok, #state{inker=Inker,
@@ -467,12 +468,12 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
                        State#state.penciller,
                        State#state.ledger_cache) of
         not_present ->
             GT0 = leveled_log:get_timing(State#state.get_timing,
                                             SWh,
                                             head_not_present),
             {reply, not_found, State#state{get_timing=GT0}};
         Head ->
             GT0 = leveled_log:get_timing(State#state.get_timing,
                                             SWh,
                                             head_found),
             SWg = os:timestamp(),

@@ -518,8 +519,8 @@ handle_call({head, Bucket, Key, Tag}, _From, State) ->
             end
     end;
 handle_call({snapshot, _Requestor, SnapType, _Timeout}, _From, State) ->
     % TODO: clean-up passing of Requestor (which was previously just used in
     % logs) and so can now be ignored, and timeout which is ignored - but
     % probably shouldn't be.
     Reply = snapshot_store(State, SnapType),
     {reply, Reply, State};
@@ -595,13 +596,21 @@ handle_call({return_folder, FolderType}, _From, State) ->
                             TreeSize,
                             PartitionFilter),
                 State};
-        {foldheads_allkeys, Tag, FoldHeadsFun} ->
+        {foldheads_allkeys, Tag, FoldHeadsFun,
+                                CheckPresence, SnapPreFold} ->
             {reply,
-                foldheads_allkeys(State, Tag, FoldHeadsFun),
+                foldheads_allkeys(State, Tag,
+                                    FoldHeadsFun,
+                                    CheckPresence,
+                                    SnapPreFold),
                 State};
-        {foldheads_bybucket, Tag, Bucket, FoldHeadsFun} ->
+        {foldheads_bybucket, Tag, Bucket, FoldHeadsFun,
+                                CheckPresence, SnapPreFold} ->
             {reply,
-                foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun),
+                foldheads_bybucket(State, Tag, Bucket,
+                                    FoldHeadsFun,
+                                    CheckPresence,
+                                    SnapPreFold),
                 State};
         {foldobjects_allkeys, Tag, FoldObjectsFun} ->
             {reply,

@@ -622,7 +631,7 @@ handle_call({return_folder, FolderType}, _From, State) ->
                                         Field, FromTerm, ToTerm,
                                         FoldObjectsFun),
                 State}
     end;
 handle_call({compact_journal, Timeout}, _From, State) ->
     ok = leveled_inker:ink_compactjournal(State#state.inker,
@@ -703,9 +712,9 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
                     LedgerCache#ledger_cache.index,
                     LedgerCache#ledger_cache.min_sqn,
                     LedgerCache#ledger_cache.max_sqn},
     LongRunning =
         case Query of
             undefined ->
                 true;
             no_lookup ->
                 true;

@@ -728,7 +737,7 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
             {ok, LedgerSnapshot, JournalSnapshot};
         ledger ->
             {ok, LedgerSnapshot, null}
     end.

 snapshot_store(State, SnapType) ->
     snapshot_store(State, SnapType, undefined).

@@ -837,7 +846,7 @@ get_nextbucket(NextBucket, NextKey, Tag, LedgerSnapshot, BKList) ->
             leveled_log:log("B0010",[NB]),
             []
     end.

 index_query(State,
             Bucket,
@@ -955,7 +964,7 @@ tictactree(State, Tag, Bucket, Query, JournalCheck, TreeSize, Filter) ->
                                 Tag),
                 PassHashFun}
         end,
     AccFun = accumulate_tree(Filter,
                                 JournalCheck,
                                 JournalSnapshot,

@@ -965,7 +974,7 @@ tictactree(State, Tag, Bucket, Query, JournalCheck, TreeSize, Filter) ->
                                 EndKey,
                                 AccFun,
                                 Tree),
             % Close down snapshot when complete so as not to hold removed
             % files open
             ok = leveled_penciller:pcl_close(LedgerSnapshot),
@@ -982,22 +991,26 @@ tictactree(State, Tag, Bucket, Query, JournalCheck, TreeSize, Filter) ->
 foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun,
+                false, true).
 
-foldheads_allkeys(State, Tag, FoldHeadsFun) ->
+foldheads_allkeys(State, Tag, FoldHeadsFun, CheckPresence, SnapPreFold) ->
     StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(null, null, Tag),
-    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).
+    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun,
+                {true, CheckPresence}, SnapPreFold).
 
 foldobjects_bybucket(State, Tag, Bucket, FoldObjectsFun) ->
     StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
-    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun,
+                false, true).
 
-foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun) ->
+foldheads_bybucket(State, Tag, Bucket, FoldHeadsFun, CheckPresence, SnapPreFold) ->
     StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
     EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
-    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun, true).
+    foldobjects(State, Tag, StartKey, EndKey, FoldHeadsFun,
+                {true, CheckPresence}, SnapPreFold).
 
 foldobjects_byindex(State, Tag, Bucket,
                         Field, FromTerm, ToTerm, FoldObjectsFun) ->
@@ -1005,34 +1018,53 @@ foldobjects_byindex(State, Tag, Bucket,
         leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, FromTerm),
     EndKey =
         leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, Field, ToTerm),
-    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun, false).
+    foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun,
+                false, true).
 
-foldobjects(_State, Tag, StartKey, EndKey, FoldObjectsFun, DeferredFetch) ->
-    {FoldFun, InitAcc} = case is_tuple(FoldObjectsFun) of
-                                true ->
-                                    FoldObjectsFun;
-                                false ->
-                                    {FoldObjectsFun, []}
-                            end,
-    % For fold_objects the snapshot has been moved inside of the Folder
-    % function.
-    %
-    % fold_objects and fold_heads are called by the riak_kv_sweeper in Riak,
-    % and the sweeper prompts the fold before checking to see if the fold is
-    % ready to be run.  This may lead to the fold being called on an old
-    % snapshot.
-    Self = self(),
-    Folder =
-        fun() ->
-            {ok,
-                LedgerSnapshot,
-                JournalSnapshot} = book_snapshotstore(Self, Self, 5400),
+-spec foldobjects(book_state(), atom(), tuple(), tuple(), fun(),
+                    false|{true, boolean()}, boolean()) -> {async, fun()}.
+%% @doc
+%% The object folder should be passed DeferredFetch and SnapPreFold.
+%% DeferredFetch can either be false (which will return to the fold function
+%% the full object), or {true, CheckPresence} - in which case a proxy object
+%% will be created that if understood by the fold function will allow the fold
+%% function to work on the head of the object, and defer fetching the body in
+%% case such a fetch is unecessary.
+%% SnapPreFold determines if the snapshot of the database is done prior to
+%% returning the Folder function (SnapPreFold=true) or when the Folder function
+%% is called as Folder() {SnapPreFold=false}
+foldobjects(State, Tag, StartKey, EndKey, FoldObjectsFun,
+                DeferredFetch, SnapPreFold) ->
+    {FoldFun, InitAcc} =
+        case is_tuple(FoldObjectsFun) of
+            true ->
+                % FoldObjectsFun is already a tuple with a Fold function and an
+                % initial accumulator
+                FoldObjectsFun;
+            false ->
+                % no initial accumulatr passed, and so should be just a list
+                {FoldObjectsFun, []}
+        end,
+    SnapFun =
+        case SnapPreFold of
+            true ->
+                {ok, LS, JS} = snapshot_store(State, store, undefined),
+                fun() -> {ok, LS, JS} end;
+            false ->
+                Self = self(),
                 % Timeout will be ignored, as will Requestor
                 %
-                % This uses the external snapshot - as the snpshot will need
+                % This uses the external snapshot - as the snapshot will need
                 % to have consistent state between Bookie and Penciller when
                 % it is made.
+                fun() -> book_snapshotstore(Self, Self, 5400) end
+        end,
+    Folder =
+        fun() ->
+            {ok, LedgerSnapshot, JournalSnapshot} = SnapFun(),
             AccFun = accumulate_objects(FoldFun,
                                         JournalSnapshot,
                                         Tag,
@@ -1099,7 +1131,7 @@ readycache_forsnapshot(LedgerCache, Query) ->
                             min_sqn=LedgerCache#ledger_cache.min_sqn,
                             max_sqn=LedgerCache#ledger_cache.max_sqn};
         _ ->
             Idx =
                 case Query of
                     no_lookup ->
                         empty_index;

@@ -1141,21 +1173,21 @@ set_options(Opts) ->
     JournalSizeJitter = MaxJournalSize0 div (100 div ?JOURNAL_SIZE_JITTER),
     MaxJournalSize = MaxJournalSize0 -
                         erlang:phash2(self()) rem JournalSizeJitter,
     SyncStrat = get_opt(sync_strategy, Opts, sync),
     WRP = get_opt(waste_retention_period, Opts),
     AltStrategy = get_opt(reload_strategy, Opts, []),
     ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy),
     PCLL0CacheSize = get_opt(max_pencillercachesize, Opts),
     RootPath = get_opt(root_path, Opts),
     JournalFP = RootPath ++ "/" ++ ?JOURNAL_FP,
     LedgerFP = RootPath ++ "/" ++ ?LEDGER_FP,
     ok = filelib:ensure_dir(JournalFP),
     ok = filelib:ensure_dir(LedgerFP),
     {#inker_options{root_path = JournalFP,
                     reload_strategy = ReloadStrategy,
                     max_run_length = get_opt(max_run_length, Opts),

@@ -1181,7 +1213,7 @@ startup(InkerOpts, PencillerOpts, RecentAAE) ->
 fetch_head(Key, Penciller, LedgerCache) ->
     SW = os:timestamp(),
     CacheResult =
         case LedgerCache#ledger_cache.mem of
             undefined ->
                 [];

@@ -1239,7 +1271,7 @@ accumulate_tree(FilterFun, JournalCheck, InkerClone, HashFun) ->
     get_hashaccumulator(JournalCheck,
                         InkerClone,
                         AddKeyFun).

 get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
     Now = leveled_codec:integer_now(),
     AccFun =

@@ -1256,7 +1288,7 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) ->
                     false ->
                         Acc
                 end;
             _ ->
                 AddKeyFun(B, K, H, Acc)
         end;
     false ->
@@ -1292,30 +1324,25 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                     end,
                 JK = {leveled_codec:to_ledgerkey(B, K, Tag), SQN},
                 case DeferredFetch of
-                    true ->
+                    {true, true} ->
                         InJournal =
                             leveled_inker:ink_keycheck(InkerClone,
                                                         LK,
                                                         SQN),
                         case InJournal of
                             probably ->
-                                Size = leveled_codec:get_size(LK, V),
-                                MDBin =
-                                    leveled_codec:build_metadata_object(LK,
-                                                                        MD),
-                                Value = {proxy_object,
-                                            MDBin,
-                                            Size,
-                                            {fun fetch_value/2,
-                                                InkerClone,
-                                                JK}},
-                                FoldObjectsFun(B,
-                                                K,
-                                                term_to_binary(Value),
-                                                Acc);
+                                ProxyObj = make_proxy_object(LK, JK,
+                                                                MD, V,
+                                                                InkerClone),
+                                FoldObjectsFun(B, K, ProxyObj, Acc);
                             missing ->
                                 Acc
                         end;
+                    {true, false} ->
+                        ProxyObj = make_proxy_object(LK, JK,
+                                                        MD, V,
+                                                        InkerClone),
+                        FoldObjectsFun(B, K, ProxyObj, Acc);
                     false ->
                         R = fetch_value(InkerClone, JK),
                         case R of
@@ -1323,7 +1350,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
                                 Acc;
                             Value ->
                                 FoldObjectsFun(B, K, Value, Acc)
                         end
                 end;
             false ->

@@ -1332,6 +1359,13 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
         end,
     AccFun.
 
+make_proxy_object(LK, JK, MD, V, InkerClone) ->
+    Size = leveled_codec:get_size(LK, V),
+    MDBin = leveled_codec:build_metadata_object(LK, MD),
+    term_to_binary({proxy_object,
+                    MDBin,
+                    Size,
+                    {fun fetch_value/2, InkerClone, JK}}).
+
 check_presence(Key, Value, InkerClone) ->
     {LedgerKey, SQN} = leveled_codec:strip_to_keyseqonly({Key, Value}),
@@ -1455,7 +1489,7 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
     end.

 maybe_withjitter(CacheSize, MaxCacheSize) ->
     if
         CacheSize > MaxCacheSize ->
             R = leveled_rand:uniform(7 * MaxCacheSize),

@@ -1471,7 +1505,7 @@ maybe_withjitter(CacheSize, MaxCacheSize) ->
 get_loadfun(RecentAAE) ->
     PrepareFun =
         fun(Tag, PK, SQN, Obj, VS, IdxSpecs) ->
             preparefor_ledgercache(Tag, PK, SQN, Obj, VS, IdxSpecs, RecentAAE)
         end,

@@ -1480,11 +1514,11 @@ get_loadfun(RecentAAE) ->
             {MinSQN, MaxSQN, OutputTree} = Acc0,
             {SQN, InkTag, PK} = KeyInJournal,
             % VBin may already be a term
             {VBin, VSize} = ExtractFun(ValueInJournal),
             {Obj, IdxSpecs} = leveled_codec:split_inkvalue(VBin),
             case SQN of
                 SQN when SQN < MinSQN ->
                     {loop, Acc0};
                 SQN when SQN < MaxSQN ->
                     Chngs = PrepareFun(InkTag, PK, SQN, Obj, VSize, IdxSpecs),
                     {loop,

@@ -1549,7 +1583,7 @@ generate_multiple_objects(Count, KeyNumber, ObjL) ->
                                 KeyNumber + 1,
                                 ObjL ++ [{Key, Value, IndexSpec}]).

 ttl_test() ->
     RootPath = reset_filestructure(),
     {ok, Bookie1} = book_start([{root_path, RootPath}]),
@@ -1569,7 +1603,7 @@ ttl_test() ->
                         {ok, _} = book_head(Bookie1, "Bucket", K, ?STD_TAG)
                         end,
                 ObjL1),

     ObjL2 = generate_multiple_objects(100, 101),
     Past = leveled_codec:integer_now() - 300,
     lists:foreach(fun({K, V, S}) -> ok = book_tempput(Bookie1,

@@ -1585,7 +1619,7 @@ ttl_test() ->
                         not_found = book_head(Bookie1, "Bucket", K, ?STD_TAG)
                         end,
                 ObjL2),

     {async, BucketFolder} = book_returnfolder(Bookie1,
                                                 {bucket_stats, "Bucket"}),
     {_Size, Count} = BucketFolder(),

@@ -1600,7 +1634,7 @@ ttl_test() ->
                                                 {false, undefined}}),
     KeyList = IndexFolder(),
     ?assertMatch(20, length(KeyList)),

     {ok, Regex} = re:compile("f8"),
     {async,
         IndexFolderTR} = book_returnfolder(Bookie1,

@@ -1611,10 +1645,10 @@ ttl_test() ->
                                             {true, Regex}}),
     TermKeyList = IndexFolderTR(),
     ?assertMatch(10, length(TermKeyList)),

     ok = book_close(Bookie1),
     {ok, Bookie2} = book_start([{root_path, RootPath}]),

     {async,
         IndexFolderTR2} = book_returnfolder(Bookie2,
                                             {index_query,

@@ -1624,7 +1658,7 @@ ttl_test() ->
                                             {false, Regex}}),
     KeyList2 = IndexFolderTR2(),
     ?assertMatch(10, length(KeyList2)),

     lists:foreach(fun({K, _V, _S}) ->
                         not_found = book_get(Bookie2, "Bucket", K, ?STD_TAG)
                         end,

@@ -1633,7 +1667,7 @@ ttl_test() ->
                         not_found = book_head(Bookie2, "Bucket", K, ?STD_TAG)
                         end,
                 ObjL2),

     ok = book_close(Bookie2),
     reset_filestructure().
@@ -1729,7 +1763,7 @@ foldobjects_vs_hashtree_test() ->
                                                     ?STD_TAG,
                                                     false}),
     KeyHashList1 = lists:usort(HTFolder1()),

     FoldObjectsFun = fun(B, K, V, Acc) ->
                             [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
     {async, HTFolder2} =

@@ -1737,7 +1771,7 @@ foldobjects_vs_hashtree_test() ->
                         {foldobjects_allkeys, ?STD_TAG, FoldObjectsFun}),
     KeyHashList2 = HTFolder2(),
     ?assertMatch(KeyHashList1, lists:usort(KeyHashList2)),

     FoldHeadsFun =
         fun(B, K, ProxyV, Acc) ->
             {proxy_object,

@@ -1747,14 +1781,14 @@ foldobjects_vs_hashtree_test() ->
             V = FetchFun(Clone, JK),
             [{B, K, erlang:phash2(term_to_binary(V))}|Acc]
         end,

     {async, HTFolder3} =
         book_returnfolder(Bookie1,
                             {foldheads_allkeys, ?STD_TAG, FoldHeadsFun}),
     KeyHashList3 = HTFolder3(),
     ?assertMatch(KeyHashList1, lists:usort(KeyHashList3)),

     FoldHeadsFun2 =
         fun(B, K, ProxyV, Acc) ->
             {proxy_object,
                 MD,

@@ -1763,13 +1797,13 @@ foldobjects_vs_hashtree_test() ->
             {Hash, _Size} = MD,
             [{B, K, Hash}|Acc]
         end,

     {async, HTFolder4} =
         book_returnfolder(Bookie1,
                             {foldheads_allkeys, ?STD_TAG, FoldHeadsFun2}),
     KeyHashList4 = HTFolder4(),
     ?assertMatch(KeyHashList1, lists:usort(KeyHashList4)),

     ok = book_close(Bookie1),
     reset_filestructure().
@@ -1793,7 +1827,7 @@ foldobjects_vs_foldheads_bybucket_test() ->
                                                 ?STD_TAG,
                                                 Future) end,
                     ObjL2),

     FoldObjectsFun = fun(B, K, V, Acc) ->
                             [{B, K, erlang:phash2(term_to_binary(V))}|Acc] end,
     {async, HTFolder1A} =

@@ -1812,7 +1846,7 @@ foldobjects_vs_foldheads_bybucket_test() ->
     KeyHashList1B = HTFolder1B(),
     ?assertMatch(false,
                     lists:usort(KeyHashList1A) == lists:usort(KeyHashList1B)),

     FoldHeadsFun =
         fun(B, K, ProxyV, Acc) ->
             {proxy_object,

@@ -1822,7 +1856,7 @@ foldobjects_vs_foldheads_bybucket_test() ->
             V = FetchFun(Clone, JK),
             [{B, K, erlang:phash2(term_to_binary(V))}|Acc]
         end,

     {async, HTFolder2A} =
         book_returnfolder(Bookie1,
                             {foldheads_bybucket,

@@ -1841,7 +1875,7 @@ foldobjects_vs_foldheads_bybucket_test() ->
                     lists:usort(KeyHashList1A) == lists:usort(KeyHashList2A)),
     ?assertMatch(true,
                     lists:usort(KeyHashList1B) == lists:usort(KeyHashList2B)),

     ok = book_close(Bookie1),
     reset_filestructure().
@@ -1873,7 +1907,7 @@ scan_table_test() ->
                                             <<"F1-bin">>,
                                             <<"AA2">>),
     Tab0 = ets:new(mem, [ordered_set]),

     SK_A0 = leveled_codec:to_ledgerkey(<<"B1">>,
                                         null,
                                         ?IDX_TAG,

test/end_to_end/tictac_SUITE.erl

@@ -113,31 +113,83 @@ many_put_compare(_Config) ->
     % Now run the same query by putting the tree-building responsibility onto
     % the fold_objects_fun
 
-    HashFun =
-        fun(_Key, Value) ->
-            {proxy_object, HeadBin, _Size, _FetchFun} = binary_to_term(Value),
-            <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
-                Rest/binary>> = HeadBin,
-            <<VclockBin:VclockLen/binary, _NotNeeded/binary>> = Rest,
-            erlang:phash2(lists:sort(binary_to_term(VclockBin)))
+    ApplyHash =
+        fun(HashFun) ->
+            fun(_Key, Value) ->
+                {proxy_object, HeadBin, _Size, _FetchFun} = binary_to_term(Value),
+                <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
+                    Rest/binary>> = HeadBin,
+                <<VclockBin:VclockLen/binary, _NotNeeded/binary>> = Rest,
+                HashFun(lists:sort(binary_to_term(VclockBin)))
+            end
         end,
     FoldObjectsFun =
         fun(_Bucket, Key, Value, Acc) ->
-            leveled_tictac:add_kv(Acc, Key, Value, HashFun)
+            leveled_tictac:add_kv(Acc, Key, Value, ApplyHash(fun erlang:phash2/1))
         end,
-    FoldQ = {foldheads_bybucket,
+
+    FoldQ0 = {foldheads_bybucket,
                 o_rkv,
                 "Bucket",
-                {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)}},
-    {async, TreeAObjFolder} = leveled_bookie:book_returnfolder(Bookie2, FoldQ),
+                {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
+                false, true},
+    {async, TreeAObjFolder0} =
+        leveled_bookie:book_returnfolder(Bookie2, FoldQ0),
     SWB0Obj = os:timestamp(),
-    TreeAObj = TreeAObjFolder(),
-    io:format("Build tictac tree via object foldwith 200K objects in ~w~n",
+    TreeAObj0 = TreeAObjFolder0(),
+    io:format("Build tictac tree via object fold with no "++
+                "presence check and 200K objects in ~w~n",
             [timer:now_diff(os:timestamp(), SWB0Obj)]),
-    SegList0Obj = leveled_tictac:find_dirtyleaves(TreeA, TreeAObj),
-    io:format("Fold object compared with tictac fold has ~w diffs~n",
-                [length(SegList0Obj)]),
-    true = length(SegList0Obj) == 0,
+    true = length(leveled_tictac:find_dirtyleaves(TreeA, TreeAObj0)) == 0,
+
+    FoldQ1 = {foldheads_bybucket,
+                o_rkv,
+                "Bucket",
+                {FoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
+                true, true},
+    {async, TreeAObjFolder1} =
+        leveled_bookie:book_returnfolder(Bookie2, FoldQ1),
+    SWB1Obj = os:timestamp(),
+    TreeAObj1 = TreeAObjFolder1(),
+    io:format("Build tictac tree via object fold with "++
+                "presence check and 200K objects in ~w~n",
+            [timer:now_diff(os:timestamp(), SWB1Obj)]),
+    true = length(leveled_tictac:find_dirtyleaves(TreeA, TreeAObj1)) == 0,
+
+    % AAE trees within riak are based on a sha of the vector clock.  So to
+    % compare with an AAE tree we need to compare outputs when we're hashing
+    % a hash
+    AltHashFun =
+        fun(Term) ->
+            erlang:phash2(crypto:hash(sha, term_to_binary(Term)))
+        end,
+    AltFoldObjectsFun =
+        fun(_Bucket, Key, Value, Acc) ->
+            leveled_tictac:add_kv(Acc, Key, Value, ApplyHash(AltHashFun))
+        end,
+    AltFoldQ0 = {foldheads_bybucket,
+                    o_rkv,
+                    "Bucket",
+                    {AltFoldObjectsFun, leveled_tictac:new_tree(0, TreeSize)},
+                    false, true},
+    {async, TreeAAltObjFolder0} =
+        leveled_bookie:book_returnfolder(Bookie2, AltFoldQ0),
+    SWB2Obj = os:timestamp(),
+    TreeAAltObj = TreeAAltObjFolder0(),
+    io:format("Build tictac tree via object fold with no "++
+                "presence check and 200K objects and alt hash in ~w~n",
+            [timer:now_diff(os:timestamp(), SWB2Obj)]),
+    {async, TreeBAltObjFolder0} =
+        leveled_bookie:book_returnfolder(Bookie3, AltFoldQ0),
+    SWB3Obj = os:timestamp(),
+    TreeBAltObj = TreeBAltObjFolder0(),
+    io:format("Build tictac tree via object fold with no "++
+                "presence check and 200K objects and alt hash in ~w~n",
+            [timer:now_diff(os:timestamp(), SWB3Obj)]),
+    true =
+        length(leveled_tictac:find_dirtyleaves(TreeBAltObj, TreeAAltObj)) == 1,
 
     %% Finding differing keys
     FoldKeysFun =