Initial head_only features

Initial commit to add head_only mode to leveled.  This allows leveled to receive batches of object changes where those objects will exist only in the Penciller's Ledger (once they have been persisted within the Ledger).

The aim is to significantly reduce the cost of compaction.  Also, the objects are not directly accessible (they can only be accessed through folds).  Again this makes life easier during merging in the LSM tree (as no bloom filters have to be created).
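
A minimal sketch of the intended usage, based on the new API and the basic_headonly test below (option values and bindings are illustrative; a pause return from book_mput indicates backpressure):

    {ok, Bookie} = leveled_bookie:book_start([{root_path, "pathHO"},
                                              {head_only, true}]),
    %% Each object spec is a {ObjectOp, Bucket, Key, SubKey, Value} tuple
    ok = leveled_bookie:book_mput(Bookie,
                                  [{add, <<"B1">>, <<"K1">>, <<"SK1">>, <<"V1">>}]),
    %% Values can only be read back through folds, e.g. over all heads
    FoldFun = fun(_B, _K, V, Acc) -> [V|Acc] end,
    {async, Runner} =
        leveled_bookie:book_returnfolder(Bookie,
                                         {foldheads_allkeys, h,
                                          {FoldFun, []},
                                          false, false, false}),
    Values = Runner(),
    ok = leveled_bookie:book_close(Bookie).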
Martin Sumner 2018-02-15 16:14:46 +00:00
parent 4d3151752f
commit 2b6281b2b5
9 changed files with 304 additions and 25 deletions

View file

@@ -5,10 +5,15 @@
-define(STD_TAG, o).
%% Tag used for secondary index keys
-define(IDX_TAG, i).
%% Tag used for head-only objects
-define(HEAD_TAG, h).
%% Inker key type used for 'normal' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'batch' objects
-define(INKT_MPUT, mput).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy on Inker
%% compaction

View file

@@ -51,6 +51,8 @@
book_put/5,
book_put/6,
book_tempput/7,
book_mput/2,
book_mput/3,
book_delete/4,
book_get/3,
book_get/4,
@@ -60,6 +62,7 @@
book_snapshot/4,
book_compactjournal/2,
book_islastcompactionpending/1,
book_trimjournal/1,
book_close/1,
book_destroy/1]).
@@ -85,6 +88,7 @@
-define(COMPRESSION_POINT, on_receipt).
-define(TIMING_SAMPLESIZE, 100).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-record(ledger_cache, {mem :: ets:tab(),
loader = leveled_tree:empty(?CACHE_TYPE)
@@ -101,6 +105,9 @@
ledger_cache = #ledger_cache{},
is_snapshot :: boolean() | undefined,
slow_offer = false :: boolean(),
head_only = false :: boolean(),
put_countdown = 0 :: integer(),
get_countdown = 0 :: integer(),
fold_countdown = 0 :: integer(),
@@ -310,6 +317,28 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) ->
infinity).
-spec book_mput(pid(), list(tuple())) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
%% be inserted into the store using book_mput/2. ObjectSpecs should be
%% of the form {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be
%% stored within the HEAD of the object (in the Ledger), and is retrievable
%% only via fold_heads queries. The ObjectOp is either add or remove.
book_mput(Pid, ObjectSpecs) ->
book_mput(Pid, ObjectSpecs, infinity).
-spec book_mput(pid(), list(tuple()), infinity|integer()) -> ok|pause.
%% @doc
%%
%% When the store is being run in head_only mode, batches of object specs may
%% be inserted into the store using book_mput/3, with a TTL set against the
%% batch. ObjectSpecs should be of the form
%% {ObjectOp, Bucket, Key, SubKey, Value}. The Value will be stored within
%% the HEAD of the object (in the Ledger), and is retrievable only via
%% fold_heads queries.
book_mput(Pid, ObjectSpecs, TTL) ->
gen_server:call(Pid, {mput, ObjectSpecs, TTL}, infinity).
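%% A hypothetical usage sketch (bindings are illustrative, not part of this
%% commit):
%%   Specs = [{add, <<"B1">>, <<"K1">>, <<"SK1">>, <<"V1">>},
%%            {remove, <<"B1">>, <<"K1">>, <<"SK0">>, null}],
%%   ok = leveled_bookie:book_mput(Bookie, Specs, infinity).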
-spec book_delete(pid(), any(), any(), list()) -> ok|pause.
%% @doc
@@ -419,6 +448,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
-spec book_compactjournal(pid(), integer()) -> ok.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
%% @doc Call for compaction of the Journal
%%
@@ -433,6 +463,13 @@ book_compactjournal(Pid, Timeout) ->
book_islastcompactionpending(Pid) ->
gen_server:call(Pid, confirm_compact, infinity).
%% @doc Trim the journal when in head_only mode
%%
%% In head_only mode the journal can be trimmed of entries which are before
%% the persisted SQN. This is much quicker than compacting the journal.
book_trimjournal(Pid) ->
gen_server:call(Pid, trim, infinity).
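%% A hypothetical example - called periodically in place of journal
%% compaction when running head_only:
%%   ok = leveled_bookie:book_trimjournal(Bookie).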
-spec book_close(pid()) -> ok.
-spec book_destroy(pid()) -> ok.
@@ -475,6 +512,8 @@ init([Opts]) ->
unit_minutes = UnitMinutes}
end,
HeadOnly = get_opt(head_only, Opts, false),
{Inker, Penciller} =
startup(InkerOpts, PencillerOpts, RecentAAE),
@@ -485,7 +524,8 @@ init([Opts]) ->
cache_size=CacheSize,
recent_aae=RecentAAE,
ledger_cache=#ledger_cache{mem = NewETS},
is_snapshot=false}};
is_snapshot=false,
head_only=HeadOnly}};
Bookie ->
{ok, Penciller, Inker} =
book_snapshot(Bookie, store, undefined, true),
@@ -496,7 +536,8 @@ init([Opts]) ->
end.
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State)
when State#state.head_only == false ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SW0 = os:timestamp(),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
@@ -541,7 +582,34 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) ->
put_countdown = CountDown,
slow_offer = true}}
end;
handle_call({get, Bucket, Key, Tag}, _From, State) ->
handle_call({mput, ObjectSpecs, TTL}, From, State)
when State#state.head_only == true ->
{ok, SQN} =
leveled_inker:ink_mput(State#state.inker, dummy, {ObjectSpecs, TTL}),
Changes =
preparefor_ledgercache(?INKT_MPUT, ?DUMMY,
SQN, null, length(ObjectSpecs),
{ObjectSpecs, TTL},
false),
Cache0 = addto_ledgercache(Changes, State#state.ledger_cache),
case State#state.slow_offer of
true ->
gen_server:reply(From, pause);
false ->
gen_server:reply(From, ok)
end,
case maybepush_ledgercache(State#state.cache_size,
Cache0,
State#state.penciller) of
{ok, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
slow_offer = false}};
{returned, NewCache} ->
{noreply, State#state{ledger_cache = NewCache,
slow_offer = true}}
end;
handle_call({get, Bucket, Key, Tag}, _From, State)
when State#state.head_only == false ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SWh = os:timestamp(),
HeadResult =
@@ -586,7 +654,11 @@ handle_call({get, Bucket, Key, Tag}, _From, State) ->
update_statetimings(get, Timings2, State#state.get_countdown),
{reply, Reply, State#state{get_timings = Timings,
get_countdown = CountDown}};
handle_call({head, Bucket, Key, Tag}, _From, State) ->
handle_call({head, Bucket, Key, Tag}, _From, State)
when State#state.head_only == false ->
% Head requests are not possible when the status is head_only, as head_only
% objects are only retrievable via folds, not by direct object access (there
% is no hash generated for the objects to accelerate lookup)
SWp = os:timestamp(),
LK = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
case fetch_head(LK, State#state.penciller, State#state.ledger_cache) of
@@ -634,17 +706,24 @@ handle_call({return_runner, QueryType}, _From, State) ->
update_statetimings(fold, Timings1, State#state.fold_countdown),
{reply, Runner, State#state{fold_timings = Timings,
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State) ->
handle_call({compact_journal, Timeout}, _From, State)
when State#state.head_only == false ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
self(),
Timeout),
{reply, ok, State};
handle_call(confirm_compact, _From, State) ->
handle_call(confirm_compact, _From, State)
when State#state.head_only == false ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};
handle_call(trim, _From, State) when State#state.head_only == true ->
PSQN = leveled_penciller:pcl_persistedsqn(State#state.penciller),
{reply, leveled_inker:ink_trim(State#state.inker, PSQN), State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(destroy, _From, State=#state{is_snapshot=Snp}) when Snp == false ->
{stop, destroy, ok, State}.
{stop, destroy, ok, State};
handle_call(Msg, _From, State) ->
{reply, {unsupported_message, element(1, Msg)}, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -1118,6 +1197,9 @@ fetch_head(Key, Penciller, LedgerCache) ->
end.
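%% Object specs from an mput batch enter the ledger cache as no_lookup
%% changes, so no hash is indexed for direct access to these keys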
preparefor_ledgercache(?INKT_MPUT, ?DUMMY, SQN, _O, _S, {ObjSpecs, TTL}, _A) ->
ObjChanges = leveled_codec:obj_objectspecs(ObjSpecs, SQN, TTL),
{no_lookup, SQN, ObjChanges};
preparefor_ledgercache(?INKT_KEYD,
LedgerKey, SQN, _Obj, _Size, {IdxSpecs, TTL},
_AAE) ->

View file

@@ -62,6 +62,7 @@
get_size/2,
get_keyandobjhash/2,
idx_indexspecs/5,
obj_objectspecs/3,
aae_indexspecs/6,
generate_uuid/0,
integer_now/0,
@@ -222,7 +223,7 @@ from_ledgerkey({?IDX_TAG, ?ALL_BUCKETS, {_IdxFld, IdxVal}, {Bucket, Key}}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({?IDX_TAG, Bucket, {_IdxFld, IdxVal}, Key}) ->
{Bucket, Key, IdxVal};
from_ledgerkey({_Tag, Bucket, Key, null}) ->
from_ledgerkey({_Tag, Bucket, Key, _SubKey}) ->
{Bucket, Key}.
to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
@@ -231,6 +232,9 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
to_ledgerkey(Bucket, Key, Tag) ->
{Tag, Bucket, Key, null}.
to_ledgerkey(Bucket, Key, Tag, SubKey) ->
{Tag, Bucket, Key, SubKey}.
%% Return the Key, Value and Hash Option for this object. The hash option
%% indicates whether the key would ever be looked up directly, and so if it
%% requires an entry in the hash table
@@ -404,6 +408,8 @@ split_inkvalue(VBin) when is_binary(VBin) ->
check_forinkertype(_LedgerKey, delete) ->
?INKT_TOMB;
check_forinkertype(_LedgerKey, head_only) ->
?INKT_MPUT;
check_forinkertype(_LedgerKey, _Object) ->
?INKT_STND.
@@ -424,6 +430,14 @@ endkey_passed({EK1, EK2, EK3, null}, {CK1, CK2, CK3, _}) ->
endkey_passed(EndKey, CheckingKey) ->
EndKey < CheckingKey.
obj_objectspecs(ObjectSpecs, SQN, TTL) ->
lists:map(fun({IdxOp, Bucket, Key, SubKey, Value}) ->
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL)
end,
ObjectSpecs).
idx_indexspecs(IndexSpecs, Bucket, Key, SQN, TTL) ->
lists:map(
fun({IdxOp, IdxFld, IdxTrm}) ->
@@ -458,6 +472,19 @@ gen_indexspec(Bucket, Key, IdxOp, IdxField, IdxTerm, SQN, TTL) ->
{SQN, Status, no_lookup, null}}
end.
gen_headspec(Bucket, Key, IdxOp, SubKey, Value, SQN, TTL) ->
Status =
case IdxOp of
add ->
{active, TTL};
remove ->
%% TODO: timestamps for delayed reaping
tomb
end,
{to_ledgerkey(Bucket, Key, ?HEAD_TAG, SubKey),
{SQN, Status, no_lookup, Value}}.
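%% An illustrative example (values hypothetical):
%%   gen_headspec(<<"B">>, <<"K">>, add, <<"SK">>, <<"V">>, 42, infinity)
%% returns
%%   {{h, <<"B">>, <<"K">>, <<"SK">>}, {42, {active, infinity}, no_lookup, <<"V">>}}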
-spec aae_indexspecs(false|recent_aae(),
any(), any(),
integer(), integer(),
@@ -611,7 +638,9 @@ get_size(PK, Value) ->
Size;
?STD_TAG ->
{_Hash, Size} = MD,
Size
Size;
?HEAD_TAG ->
0
end.
-spec get_keyandobjhash(tuple(), tuple()) -> tuple().
@@ -641,12 +670,14 @@ get_objhash(Tag, ObjMetaData) ->
build_metadata_object(PrimaryKey, MD) ->
{Tag, _Bucket, _Key, null} = PrimaryKey,
{Tag, _Bucket, _Key, _SubKey} = PrimaryKey,
case Tag of
?RIAK_TAG ->
{SibData, Vclock, _Hash, _Size} = MD,
riak_metadata_to_binary(Vclock, SibData);
?STD_TAG ->
MD;
?HEAD_TAG ->
MD
end.

View file

@@ -82,6 +82,7 @@
clerk_new/1,
clerk_compact/7,
clerk_hashtablecalc/3,
clerk_trim/3,
clerk_stop/1,
code_change/3]).
@@ -144,6 +145,12 @@ clerk_compact(Pid, Checker, InitiateFun, CloseFun, FilterFun, Inker, TimeO) ->
Inker,
TimeO}).
-spec clerk_trim(pid(), pid(), integer()) -> ok.
%% @doc
%% Trim the Inker back to the persisted SQN
clerk_trim(Pid, Inker, PersistedSQN) ->
gen_server:cast(Pid, {trim, Inker, PersistedSQN}).
-spec clerk_hashtablecalc(ets:tid(), integer(), pid()) -> ok.
%% @doc
%% Spawn a dedicated clerk for the process of calculating the binary view
@@ -235,6 +242,12 @@ handle_cast({compact, Checker, InitiateFun, CloseFun, FilterFun, Inker, _TO},
ok = CloseFun(FilterServer),
{noreply, State}
end;
handle_cast({trim, Inker, PersistedSQN}, State) ->
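%% the head of the manifest is the currently active journal, which is
%% never eligible for trimming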
[_Active|Manifest] = leveled_inker:ink_getmanifest(Inker),
FilesToDelete =
leveled_imanifest:find_persistedentries(PersistedSQN, Manifest),
ok = update_inker(Inker, [], FilesToDelete),
{noreply, State};
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
@@ -527,7 +540,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
Inker)
end,
FilesToDelete),
ok.
ok.
compact_files(BestRun, CDBopts, FilterFun, FilterServer,
MaxSQN, RStrategy, PressMethod) ->

View file

@@ -14,6 +14,7 @@
append_lastkey/3,
remove_entry/2,
find_entry/2,
find_persistedentries/2,
head_entry/1,
to_list/1,
from_list/1,
@@ -21,7 +22,6 @@
writer/3,
printer/1,
complete_filex/0
]).
-define(MANIFEST_FILEX, "man").
@@ -106,9 +106,26 @@ find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
find_entry(SQN, [_TopEntry|Tail]) ->
find_entry(SQN, Tail).
-spec find_persistedentries(integer(), manifest()) -> list(manifest_entry()).
%% @doc
%% Find the entries in the manifest where all items are below the persisted
%% SQN in the Ledger
find_persistedentries(SQN, Manifest) ->
DropFun =
fun({ME_SQN, _FN, _ME_P, _LK}) ->
ME_SQN > SQN
end,
Entries = lists:dropwhile(DropFun, to_list(Manifest)),
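%% the first remaining entry starts at or below the persisted SQN but may
%% still contain entries above it, so only the tail is safe to delete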
case Entries of
[_Head|Tail] ->
Tail;
[] ->
[]
end.
-spec head_entry(manifest()) -> manifest_entry().
%% @doc
%% Return the head manifets entry (the most recent journal)
%% Return the head manifest entry (the most recent journal)
head_entry(Manifest) ->
[{_SQNMarker, SQNL}|_Tail] = Manifest,
[HeadEntry|_SQNL_Tail] = SQNL,
@@ -239,6 +256,17 @@ buildfromend_test() ->
test_testmanifest(Man0),
?assertMatch(ManL, to_list(Man0)).
findpersisted_test() ->
Man = from_list(build_testmanifest_aslist()),
FilesToDelete1 = find_persistedentries(2001, Man),
?assertMatch(2, length(FilesToDelete1)),
FilesToDelete2 = find_persistedentries(3000, Man),
?assertMatch(3, length(FilesToDelete2)),
FilesToDelete3 = find_persistedentries(2999, Man),
?assertMatch(2, length(FilesToDelete3)),
FilesToDelete4 = find_persistedentries(999, Man),
?assertMatch([], FilesToDelete4).
buildrandomfashion_test() ->
ManL0 = build_testmanifest_aslist(),
RandMapFun =

View file

@@ -95,6 +95,7 @@
code_change/3,
ink_start/1,
ink_put/4,
ink_mput/3,
ink_get/3,
ink_fetch/3,
ink_keycheck/3,
@@ -105,6 +106,7 @@
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_compactionpending/1,
ink_trim/2,
ink_getmanifest/1,
ink_updatemanifest/3,
ink_printmanifest/1,
@@ -185,6 +187,16 @@ ink_start(InkerOpts) ->
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
-spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}.
%% @doc
%% MPUT a series of object specifications, which will be converted into
%% objects in the Ledger. This should only be used when the Bookie is
%% running in head_only mode. The journal entries are kept only for handling
%% consistency on startup.
ink_mput(Pid, PrimaryKey, ObjectChanges) ->
gen_server:call(Pid, {mput, PrimaryKey, ObjectChanges}, infinity).
-spec ink_get(pid(),
{atom(), any(), any(), any()}|string(),
integer()) ->
@@ -361,10 +373,17 @@ ink_compactioncomplete(Pid) ->
-spec ink_compactionpending(pid()) -> boolean().
%% @doc
%% Is there ongoing compaction work? No compaction work should be initiated
%5 if there is already some compaction work ongoing.
%% if there is already some compaction work ongoing.
ink_compactionpending(Pid) ->
gen_server:call(Pid, compaction_pending, infinity).
-spec ink_trim(pid(), integer()) -> ok.
%% @doc
%% Trim the Journal to just those files that contain entries since the
%% Penciller's persisted SQN
ink_trim(Pid, PersistedSQN) ->
gen_server:call(Pid, {trim, PersistedSQN}, infinity).
-spec ink_getmanifest(pid()) -> list().
%% @doc
%% Allows the clerk to fetch the manifest at the point it starts a compaction
@@ -420,6 +439,11 @@ handle_call({put, Key, Object, KeyChanges}, _From, State) ->
{_, UpdState, ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
end;
handle_call({mput, Key, ObjChanges}, _From, State) ->
case put_object(Key, head_only, ObjChanges, State) of
{_, UpdState, _ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn}, UpdState}
end;
handle_call({fetch, Key, SQN}, _From, State) ->
case get_object(Key, SQN, State#state.manifest, true) of
{{SQN, Key}, {Value, _IndexSpecs}} ->
@@ -503,6 +527,9 @@ handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call(compaction_pending, _From, State) ->
{reply, State#state.compaction_pending, State};
handle_call({trim, PersistedSQN}, _From, State) ->
ok = leveled_iclerk:clerk_trim(State#state.clerk, self(), PersistedSQN),
{reply, ok, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
handle_call(doom, _From, State) ->

View file

@@ -189,7 +189,8 @@
pcl_registersnapshot/5,
pcl_getstartupsequencenumber/1,
pcl_checkbloomtest/2,
pcl_checkforwork/1]).
pcl_checkforwork/1,
pcl_persistedsqn/1]).
-export([
sst_rootpath/1,
@@ -504,6 +505,14 @@ pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
pcl_releasesnapshot(Pid, Snapshot) ->
gen_server:cast(Pid, {release_snapshot, Snapshot}).
-spec pcl_persistedsqn(pid()) -> integer().
%% @doc
%% Return the persisted SQN, the highest SQN which has been persisted into the
%% Ledger
pcl_persistedsqn(Pid) ->
gen_server:call(Pid, persisted_sqn, infinity).
-spec pcl_close(pid()) -> ok.
%% @doc
%% Close the penciller neatly, trying to persist to disk anything in the memory
@@ -781,7 +790,9 @@ handle_call({checkbloom_fortest, Key, Hash}, _From, State) ->
handle_call(check_for_work, _From, State) ->
{_WL, WC} = leveled_pmanifest:check_for_work(State#state.manifest,
?LEVEL_SCALEFACTOR),
{reply, WC > 0, State}.
{reply, WC > 0, State};
handle_call(persisted_sqn, _From, State) ->
{reply, State#state.persisted_sqn, State}.
handle_cast({manifest_change, NewManifest}, State) ->
NewManSQN = leveled_pmanifest:get_manifest_sqn(NewManifest),

View file

@@ -553,17 +553,19 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
SQN),
case InJournal of
probably ->
ProxyObj = make_proxy_object(LK, JK,
MD, V,
InkerClone),
ProxyObj =
make_proxy_object(Tag,
LK, JK, MD, V,
InkerClone),
FoldObjectsFun(B, K, ProxyObj, Acc);
missing ->
Acc
end;
{true, false} ->
ProxyObj = make_proxy_object(LK, JK,
MD, V,
InkerClone),
ProxyObj =
make_proxy_object(Tag,
LK, JK, MD, V,
InkerClone),
FoldObjectsFun(B, K, ProxyObj, Acc);
false ->
R = leveled_bookie:fetch_value(InkerClone, JK),
@@ -581,7 +583,10 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) ->
end,
AccFun.
make_proxy_object(LK, JK, MD, V, InkerClone) ->
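%% For head_only objects the metadata fetched from the Ledger is the complete
%% stored value, so it is returned as-is rather than wrapped as a proxy object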
make_proxy_object(?HEAD_TAG, _LK, _JK, MD, _V, _InkerClone) ->
MD;
make_proxy_object(_Tag, LK, JK, MD, V, InkerClone) ->
Size = leveled_codec:get_size(LK, V),
MDBin = leveled_codec:build_metadata_object(LK, MD),
term_to_binary({proxy_object,

View file

@@ -8,7 +8,8 @@
recent_aae_noaae/1,
recent_aae_allaae/1,
recent_aae_bucketaae/1,
recent_aae_expiry/1
recent_aae_expiry/1,
basic_headonly/1
]).
all() -> [
@@ -17,7 +18,8 @@ all() -> [
recent_aae_noaae,
recent_aae_allaae,
recent_aae_bucketaae,
recent_aae_expiry
recent_aae_expiry,
basic_headonly
].
-define(LMD_FORMAT, "~4..0w~2..0w~2..0w~2..0w~2..0w").
@@ -1010,6 +1012,81 @@ recent_aae_expiry(_Config) ->
true = length(DL4_0) == 0.
basic_headonly(_Config) ->
% Load some AAE type objects into Leveled using the head_only mode. This
% should allow for the items to be added in batches. Confirm that the
% journal is garbage collected as expected, and that it is possible to
% perform a fold_heads style query
ObjectCount = 100000,
RootPathHO = testutil:reset_filestructure("testHO"),
StartOpts1 = [{root_path, RootPathHO},
{max_pencillercachesize, 16000},
{sync_strategy, sync},
{head_only, true}],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{B1, K1, V1, S1, MD} = {"Bucket",
"Key1.1.4567.4321",
"Value1",
[],
[{"MDK1", "MDV1"}]},
{TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD),
{unsupported_message, put} =
testutil:book_riakput(Bookie1, TestObject, TestSpec),
ObjectSpecFun =
fun(Op) ->
fun(N) ->
Bucket = <<"B", N:32/integer>>,
Key = <<"K", N:32/integer>>,
<<SegmentID:20/integer, _RestBS/bitstring>> =
crypto:hash(md5, term_to_binary({Bucket, Key})),
<<Hash:32/integer, _RestBN/bitstring>> =
crypto:hash(md5, <<N:32/integer>>),
{Op, <<SegmentID:32/integer>>, Bucket, Key, Hash}
end
end,
ObjectSpecL = lists:map(ObjectSpecFun(add), lists:seq(1, ObjectCount)),
ok = load_objectspecs(ObjectSpecL, 32, Bookie1),
FoldFun =
fun(_B, _K, V, {HashAcc, CountAcc}) ->
{HashAcc bxor V, CountAcc + 1}
end,
InitAcc = {0, 0},
RunnerDefinition =
{foldheads_allkeys, h, {FoldFun, InitAcc}, false, false, false},
{async, Runner1} =
leveled_bookie:book_returnfolder(Bookie1, RunnerDefinition),
SW1 = os:timestamp(),
{AccH1, AccC1} = Runner1(),
io:format("AccH and AccC of ~w ~w in ~w microseconds ~n",
[AccH1, AccC1, timer:now_diff(os:timestamp(), SW1)]),
true = AccC1 == ObjectCount,
ok = leveled_bookie:book_close(Bookie1).
load_objectspecs([], _SliceSize, _Bookie) ->
ok;
load_objectspecs(ObjectSpecL, SliceSize, Bookie)
when length(ObjectSpecL) < SliceSize ->
load_objectspecs(ObjectSpecL, length(ObjectSpecL), Bookie);
load_objectspecs(ObjectSpecL, SliceSize, Bookie) ->
{Head, Tail} = lists:split(SliceSize, ObjectSpecL),
case leveled_bookie:book_mput(Bookie, Head) of
ok ->
load_objectspecs(Tail, SliceSize, Bookie);
pause ->
timer:sleep(10),
load_objectspecs(Tail, SliceSize, Bookie)
end.
load_and_check_recentaae(Book1A, Book1B, Book1C, Book1D,
SW_StartLoad, TreeSize, UnitMins,