Merge pull request #57 from martinsumner/mas-metadatabin
Mas metadatabin
This commit is contained in:
commit
13a1574fa0
8 changed files with 106 additions and 53 deletions
|
@ -57,6 +57,7 @@
|
|||
snapshot_query,
|
||||
bookies_mem :: tuple(),
|
||||
source_penciller :: pid(),
|
||||
snapshot_longrunning = true :: boolean(),
|
||||
levelzero_cointoss = false :: boolean()}).
|
||||
|
||||
-record(iclerk_options,
|
||||
|
|
|
@ -631,9 +631,21 @@ snapshot_store(LedgerCache0, Penciller, Inker, SnapType, Query) ->
|
|||
LedgerCache#ledger_cache.index,
|
||||
LedgerCache#ledger_cache.min_sqn,
|
||||
LedgerCache#ledger_cache.max_sqn},
|
||||
LongRunning =
|
||||
case Query of
|
||||
undefined ->
|
||||
true;
|
||||
no_lookup ->
|
||||
true;
|
||||
_ ->
|
||||
% If a specific query has been defined, then not expected
|
||||
% to be long running
|
||||
false
|
||||
end,
|
||||
PCLopts = #penciller_options{start_snapshot = true,
|
||||
source_penciller = Penciller,
|
||||
snapshot_query = Query,
|
||||
snapshot_longrunning = LongRunning,
|
||||
bookies_mem = BookiesMem},
|
||||
{ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
|
||||
case SnapType of
|
||||
|
|
|
@ -445,57 +445,44 @@ build_metadata_object(PrimaryKey, MD) ->
|
|||
riak_extract_metadata(delete, Size) ->
|
||||
{delete, null, null, Size};
|
||||
riak_extract_metadata(ObjBin, Size) ->
|
||||
{Vclock, SibData} = riak_metadata_from_binary(ObjBin),
|
||||
{SibData, Vclock, erlang:phash2(ObjBin), Size}.
|
||||
{Vclock, SibBin} = riak_metadata_from_binary(ObjBin),
|
||||
{SibBin, Vclock, erlang:phash2(ObjBin), Size}.
|
||||
|
||||
%% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
%%% VclockBin/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
|
||||
riak_metadata_to_binary(Vclock, SibData) ->
|
||||
VclockBin = term_to_binary(Vclock),
|
||||
riak_metadata_to_binary(VclockBin, SibMetaBin) ->
|
||||
VclockLen = byte_size(VclockBin),
|
||||
% <<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
% VclockBin:VclockLen/binary, SibData:32/integer>>.
|
||||
SibCount = length(SibData),
|
||||
SibsBin = slimbin_contents(SibData),
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>>.
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer,
|
||||
VclockLen:32/integer, VclockBin/binary,
|
||||
SibMetaBin/binary>>.
|
||||
|
||||
riak_metadata_from_binary(V1Binary) ->
|
||||
<<?MAGIC:8/integer, ?V1_VERS:8/integer, VclockLen:32/integer,
|
||||
Rest/binary>> = V1Binary,
|
||||
% Just grab the Sibling count and not the full metadata
|
||||
% <<VclockBin:VclockLen/binary, SibCount:32/integer, _Rest/binary>> = Rest,
|
||||
% {binary_to_term(VclockBin), SibCount}.
|
||||
<<VclockBin:VclockLen/binary, SibCount:32/integer, SibsBin/binary>> = Rest,
|
||||
SibMetaBinList =
|
||||
SibMetaBin =
|
||||
case SibCount of
|
||||
SC when is_integer(SC) ->
|
||||
get_metadata_from_siblings(SibsBin, SibCount, [])
|
||||
get_metadata_from_siblings(SibsBin,
|
||||
SibCount,
|
||||
<<SibCount:32/integer>>)
|
||||
end,
|
||||
{binary_to_term(VclockBin), SibMetaBinList}.
|
||||
{VclockBin, SibMetaBin}.
|
||||
|
||||
% Fixes the value length for each sibling to be zero, and so includes no value
|
||||
slimbin_content(MetaBin) ->
|
||||
MetaLen = byte_size(MetaBin),
|
||||
<<0:32/integer, MetaLen:32/integer, MetaBin:MetaLen/binary>>.
|
||||
|
||||
slimbin_contents(SibMetaBinList) ->
|
||||
F = fun(MetaBin, Acc) ->
|
||||
<<Acc/binary, (slimbin_content(MetaBin))/binary>>
|
||||
end,
|
||||
lists:foldl(F, <<>>, SibMetaBinList).
|
||||
|
||||
get_metadata_from_siblings(<<>>, 0, SibMetaBinList) ->
|
||||
SibMetaBinList;
|
||||
get_metadata_from_siblings(<<>>, 0, SibMetaBin) ->
|
||||
SibMetaBin;
|
||||
get_metadata_from_siblings(<<ValLen:32/integer, Rest0/binary>>,
|
||||
SibCount,
|
||||
SibMetaBinList) ->
|
||||
SibMetaBin) ->
|
||||
<<_ValBin:ValLen/binary, MetaLen:32/integer, Rest1/binary>> = Rest0,
|
||||
<<MetaBin:MetaLen/binary, Rest2/binary>> = Rest1,
|
||||
get_metadata_from_siblings(Rest2,
|
||||
SibCount - 1,
|
||||
[MetaBin|SibMetaBinList]).
|
||||
<<SibMetaBin/binary,
|
||||
0:32/integer,
|
||||
MetaLen:32/integer,
|
||||
MetaBin:MetaLen/binary>>).
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -135,6 +135,12 @@
|
|||
{info, "Garbage collection on manifest removes key for filename ~s"}},
|
||||
{"P0037",
|
||||
{debug, "Merging of penciller L0 tree from size ~w complete"}},
|
||||
{"P0038",
|
||||
{info, "Timeout of snapshot with pid=~w at SQN=~w at TS ~w "
|
||||
++ "set to timeout=~w"}},
|
||||
{"P0039",
|
||||
{info, "Failed to relase pid=~w "
|
||||
++ "leaving SnapshotCount=~w and MinSQN=~w"}},
|
||||
|
||||
{"PC001",
|
||||
{info, "Penciller's clerk ~w started with owner ~w"}},
|
||||
|
|
|
@ -185,7 +185,7 @@
|
|||
pcl_close/1,
|
||||
pcl_doom/1,
|
||||
pcl_releasesnapshot/2,
|
||||
pcl_registersnapshot/4,
|
||||
pcl_registersnapshot/5,
|
||||
pcl_getstartupsequencenumber/1]).
|
||||
|
||||
-export([
|
||||
|
@ -214,7 +214,8 @@
|
|||
-define(COIN_SIDECOUNT, 5).
|
||||
-define(SLOW_FETCH, 20000).
|
||||
-define(ITERATOR_SCANWIDTH, 4).
|
||||
-define(SNAPSHOT_TIMEOUT, 3600).
|
||||
-define(SNAPSHOT_TIMEOUT_LONG, 3600).
|
||||
-define(SNAPSHOT_TIMEOUT_SHORT, 600).
|
||||
|
||||
-record(state, {manifest, % a manifest record from the leveled_manifest module
|
||||
persisted_sqn = 0 :: integer(), % The highest SQN persisted
|
||||
|
@ -305,9 +306,9 @@ pcl_confirmdelete(Pid, FileName, FilePid) ->
|
|||
pcl_getstartupsequencenumber(Pid) ->
|
||||
gen_server:call(Pid, get_startup_sqn, infinity).
|
||||
|
||||
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem) ->
|
||||
pcl_registersnapshot(Pid, Snapshot, Query, BookiesMem, LR) ->
|
||||
gen_server:call(Pid,
|
||||
{register_snapshot, Snapshot, Query, BookiesMem},
|
||||
{register_snapshot, Snapshot, Query, BookiesMem, LR},
|
||||
infinity).
|
||||
|
||||
pcl_releasesnapshot(Pid, Snapshot) ->
|
||||
|
@ -332,7 +333,12 @@ init([PCLopts]) ->
|
|||
PCLopts#penciller_options.bookies_mem} of
|
||||
{undefined, true, Query, BookiesMem} ->
|
||||
SrcPenciller = PCLopts#penciller_options.source_penciller,
|
||||
{ok, State} = pcl_registersnapshot(SrcPenciller, self(), Query, BookiesMem),
|
||||
LongRunning = PCLopts#penciller_options.snapshot_longrunning,
|
||||
{ok, State} = pcl_registersnapshot(SrcPenciller,
|
||||
self(),
|
||||
Query,
|
||||
BookiesMem,
|
||||
LongRunning),
|
||||
leveled_log:log("P0001", [self()]),
|
||||
{ok, State#state{is_snapshot=true,
|
||||
source_penciller=SrcPenciller}};
|
||||
|
@ -444,16 +450,24 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc, MaxKeys},
|
|||
{reply, Acc, State#state{levelzero_astree = L0AsList}};
|
||||
handle_call(get_startup_sqn, _From, State) ->
|
||||
{reply, State#state.persisted_sqn, State};
|
||||
handle_call({register_snapshot, Snapshot, Query, BookiesMem}, _From, State) ->
|
||||
handle_call({register_snapshot, Snapshot, Query, BookiesMem, LR}, _From, State) ->
|
||||
% Register and load a snapshot
|
||||
%
|
||||
% For setup of the snapshot to be efficient should pass a query
|
||||
% of (StartKey, EndKey) - this will avoid a fully copy of the penciller's
|
||||
% memory being required to be trasnferred to the clone. However, this
|
||||
% will not be a valid clone for fetch
|
||||
Timeout =
|
||||
case LR of
|
||||
true ->
|
||||
?SNAPSHOT_TIMEOUT_LONG;
|
||||
false ->
|
||||
?SNAPSHOT_TIMEOUT_SHORT
|
||||
end,
|
||||
|
||||
Manifest0 = leveled_pmanifest:add_snapshot(State#state.manifest,
|
||||
Snapshot,
|
||||
?SNAPSHOT_TIMEOUT),
|
||||
Timeout),
|
||||
|
||||
{BookieIncrTree, BookieIdx, MinSQN, MaxSQN} = BookiesMem,
|
||||
LM1Cache =
|
||||
|
|
|
@ -54,6 +54,7 @@
|
|||
-define(MAX_LEVELS, 8).
|
||||
-define(TREE_TYPE, idxt).
|
||||
-define(TREE_WIDTH, 8).
|
||||
-define(PHANTOM_PID, r2d_fail).
|
||||
|
||||
-record(manifest, {levels,
|
||||
% an array of lists or trees representing the manifest
|
||||
|
@ -266,9 +267,7 @@ mergefile_selector(Manifest, LevelIdx) ->
|
|||
ME.
|
||||
|
||||
add_snapshot(Manifest, Pid, Timeout) ->
|
||||
{MegaNow, SecNow, _} = os:timestamp(),
|
||||
TimeToTimeout = MegaNow * 1000000 + SecNow + Timeout,
|
||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, TimeToTimeout},
|
||||
SnapEntry = {Pid, Manifest#manifest.manifest_sqn, seconds_now(), Timeout},
|
||||
SnapList0 = [SnapEntry|Manifest#manifest.snapshots],
|
||||
ManSQN = Manifest#manifest.manifest_sqn,
|
||||
case Manifest#manifest.min_snapshot_sqn of
|
||||
|
@ -283,22 +282,34 @@ add_snapshot(Manifest, Pid, Timeout) ->
|
|||
|
||||
release_snapshot(Manifest, Pid) ->
|
||||
FilterFun =
|
||||
fun({P, SQN, TS}, {Acc, MinSQN}) ->
|
||||
fun({P, SQN, ST, TO}, {Acc, MinSQN, Found}) ->
|
||||
case P of
|
||||
Pid ->
|
||||
{Acc, MinSQN};
|
||||
{Acc, MinSQN, true};
|
||||
_ ->
|
||||
{[{P, SQN, TS}|Acc], min(SQN, MinSQN)}
|
||||
case seconds_now() > (ST + TO) of
|
||||
true ->
|
||||
leveled_log:log("P0038", [P, SQN, ST, TO]),
|
||||
{Acc, MinSQN, Found};
|
||||
false ->
|
||||
{[{P, SQN, ST, TO}|Acc], min(SQN, MinSQN), Found}
|
||||
end
|
||||
end
|
||||
end,
|
||||
{SnapList0, MinSnapSQN} = lists:foldl(FilterFun,
|
||||
{[], infinity},
|
||||
Manifest#manifest.snapshots),
|
||||
{SnapList0, MinSnapSQN, Hit} = lists:foldl(FilterFun,
|
||||
{[], infinity, false},
|
||||
Manifest#manifest.snapshots),
|
||||
case Hit of
|
||||
false ->
|
||||
leveled_log:log("P0039", [Pid, length(SnapList0), MinSnapSQN]);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
case SnapList0 of
|
||||
[] ->
|
||||
Manifest#manifest{snapshots = SnapList0,
|
||||
min_snapshot_sqn = 0};
|
||||
_ ->
|
||||
_ ->
|
||||
leveled_log:log("P0004", [SnapList0]),
|
||||
Manifest#manifest{snapshots = SnapList0,
|
||||
min_snapshot_sqn = MinSnapSQN}
|
||||
|
@ -317,7 +328,10 @@ ready_to_delete(Manifest, Filename) ->
|
|||
PDs = dict:erase(Filename, Manifest#manifest.pending_deletes),
|
||||
{true, Manifest#manifest{pending_deletes = PDs}};
|
||||
_N ->
|
||||
{false, Manifest}
|
||||
% If failed to delete then we should release a phantom pid
|
||||
% in case this is necessary to timeout any snapshots
|
||||
% This wll also trigger a log
|
||||
{false, release_snapshot(Manifest, ?PHANTOM_PID)}
|
||||
end.
|
||||
|
||||
check_for_work(Manifest, Thresholds) ->
|
||||
|
@ -627,6 +641,10 @@ open_manifestfile(RootPath, [TopManSQN|Rest]) ->
|
|||
open_manifestfile(RootPath, Rest)
|
||||
end.
|
||||
|
||||
seconds_now() ->
|
||||
{MegaNow, SecNow, _} = os:timestamp(),
|
||||
MegaNow * 1000000 + SecNow.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Test
|
||||
|
@ -973,6 +991,19 @@ snapshot_release_test() ->
|
|||
{Bool19, _Man20} = ready_to_delete(Man19, "Z3"),
|
||||
?assertMatch(true, Bool19).
|
||||
|
||||
|
||||
|
||||
snapshot_timeout_test() ->
|
||||
Man6 = element(7, initial_setup()),
|
||||
Man7 = add_snapshot(Man6, "pid_a1", 3600),
|
||||
?assertMatch(1, length(Man7#manifest.snapshots)),
|
||||
Man8 = release_snapshot(Man7, "pid_a1"),
|
||||
?assertMatch(0, length(Man8#manifest.snapshots)),
|
||||
Man9 = add_snapshot(Man8, "pid_a1", 0),
|
||||
timer:sleep(2001),
|
||||
?assertMatch(1, length(Man9#manifest.snapshots)),
|
||||
Man10 = release_snapshot(Man9, ?PHANTOM_PID),
|
||||
?assertMatch(0, length(Man10#manifest.snapshots)).
|
||||
|
||||
|
||||
|
||||
-endif.
|
|
@ -410,7 +410,9 @@ delete_pending(timeout, State) ->
|
|||
ok = leveled_penciller:pcl_confirmdelete(State#state.penciller,
|
||||
State#state.filename,
|
||||
self()),
|
||||
{next_state, delete_pending, State, ?DELETE_TIMEOUT};
|
||||
% If the next thing is another timeout - may be long-running snapshot, so
|
||||
% back-off
|
||||
{next_state, delete_pending, State, random:uniform(10) * ?DELETE_TIMEOUT};
|
||||
delete_pending(close, State) ->
|
||||
leveled_log:log("SST07", [State#state.filename]),
|
||||
ok = file:close(State#state.handle),
|
||||
|
|
|
@ -258,11 +258,11 @@ check_forobject(Bookie, TestObject) ->
|
|||
{ok, HeadBinary} = book_riakhead(Bookie,
|
||||
TestObject#r_object.bucket,
|
||||
TestObject#r_object.key),
|
||||
{_SibMetaBinList,
|
||||
{_SibMetaBin,
|
||||
Vclock,
|
||||
_Hash,
|
||||
size} = leveled_codec:riak_extract_metadata(HeadBinary, size),
|
||||
true = Vclock == TestObject#r_object.vclock.
|
||||
true = binary_to_term(Vclock) == TestObject#r_object.vclock.
|
||||
|
||||
check_formissingobject(Bookie, Bucket, Key) ->
|
||||
not_found = book_riakget(Bookie, Bucket, Key),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue