Merge pull request #288 from martinsumner/mas-i287-memory

Mas i287 memory

commit 21e0ce70e7

8 changed files with 250 additions and 108 deletions
Binary file not shown.
include/leveled.hrl

@@ -51,7 +51,8 @@
          :: leveled_sst:press_method(),
      log_options = leveled_log:get_opts()
          :: leveled_log:log_options(),
-     max_sstslots = 256 :: pos_integer()}).
+     max_sstslots = 256 :: pos_integer(),
+     pagecache_level = 1 :: pos_integer()}).
 
 -record(inker_options,
         {cdb_max_size :: integer() | undefined,

@@ -127,6 +127,8 @@
 -define(OPEN_LASTMOD_RANGE, {0, infinity}).
 -define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
 -define(SNAPTIMEOUT_LONG, 43200). % 12 hours
+-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
+-define(SST_PAGECACHELEVEL_LOOKUP, 4).
 -define(OPTION_DEFAULTS,
         [{root_path, undefined},
          {snapshot_bookie, undefined},
src/leveled_bookie.erl

@@ -1177,15 +1179,22 @@ init([Opts]) ->
             ok
     end,
 
-    {HeadOnly, HeadLookup} =
+    {HeadOnly, HeadLookup, SSTPageCacheLevel} =
         case proplists:get_value(head_only, Opts) of
             false ->
-                {false, true};
+                {false, true, ?SST_PAGECACHELEVEL_LOOKUP};
             with_lookup ->
-                {true, true};
+                {true, true, ?SST_PAGECACHELEVEL_LOOKUP};
             no_lookup ->
-                {true, false}
+                {true, false, ?SST_PAGECACHELEVEL_NOLOOKUP}
         end,
+    % Override the default page cache level - we want to load into the
+    % page cache many levels if we intend to support lookups, and only
+    % levels 0 and 1 otherwise
+    SSTOpts = PencillerOpts#penciller_options.sst_options,
+    SSTOpts0 = SSTOpts#sst_options{pagecache_level = SSTPageCacheLevel},
+    PencillerOpts0 =
+        PencillerOpts#penciller_options{sst_options = SSTOpts0},
 
     State0 = #state{cache_size=CacheSize,
                     is_snapshot=false,

@@ -1193,7 +1202,7 @@ init([Opts]) ->
                     head_lookup = HeadLookup},
 
     {Inker, Penciller} =
-        startup(InkerOpts, PencillerOpts, State0),
+        startup(InkerOpts, PencillerOpts0, State0),
 
     NewETS = ets:new(mem, [ordered_set]),
     leveled_log:log("B0001", [Inker, Penciller]),
src/leveled_cdb.erl

@@ -58,7 +58,8 @@
         {gen_fsm, sync_send_event, 2},
         {gen_fsm, send_event, 2},
         {gen_fsm, sync_send_all_state_event, 3},
-        {gen_fsm, send_all_state_event, 2}]}).
+        {gen_fsm, send_all_state_event, 2},
+        {gen_fsm, reply, 2}]}).
 -endif.
 
 -ifdef(slow_test).

@@ -113,7 +114,8 @@
          cdb_destroy/1,
          cdb_deletepending/1,
          cdb_deletepending/3,
-         cdb_isrolling/1]).
+         cdb_isrolling/1,
+         cdb_clerkcomplete/1]).
 
 -export([finished_rolling/1,
          hashtable_calc/2]).
@@ -409,6 +411,14 @@ cdb_keycheck(Pid, Key) ->
 cdb_isrolling(Pid) ->
     gen_fsm:sync_send_all_state_event(Pid, cdb_isrolling, infinity).
 
+-spec cdb_clerkcomplete(pid()) -> ok.
+%% @doc
+%% When an Inker's clerk has finished with a CDB process, then it will call
+%% complete.  Currently this will prompt hibernation, as the CDB process may
+%% not be needed for a period.
+cdb_clerkcomplete(Pid) ->
+    gen_fsm:send_all_state_event(Pid, clerk_complete).
+
 
 %%%============================================================================
 %%% gen_server callbacks
@@ -436,28 +446,31 @@ starting({open_writer, Filename}, _From, State) ->
     {WriteOps, UpdStrategy} = set_writeops(State#state.sync_strategy),
     leveled_log:log("CDB13", [WriteOps]),
     {ok, Handle} = file:open(Filename, WriteOps),
-    {reply, ok, writer, State#state{handle=Handle,
+    State0 = State#state{handle=Handle,
                                     sync_strategy = UpdStrategy,
                                     last_position=LastPosition,
                                     last_key=LastKey,
                                     filename=Filename,
-                                    hashtree=HashTree}};
+                                    hashtree=HashTree},
+    {reply, ok, writer, State0, hibernate};
 starting({open_reader, Filename}, _From, State) ->
     leveled_log:save(State#state.log_options),
     leveled_log:log("CDB02", [Filename]),
     {Handle, Index, LastKey} = open_for_readonly(Filename, false),
-    {reply, ok, reader, State#state{handle=Handle,
+    State0 = State#state{handle=Handle,
                                     last_key=LastKey,
                                     filename=Filename,
-                                    hash_index=Index}};
+                                    hash_index=Index},
+    {reply, ok, reader, State0, hibernate};
 starting({open_reader, Filename, LastKey}, _From, State) ->
     leveled_log:save(State#state.log_options),
     leveled_log:log("CDB02", [Filename]),
     {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
-    {reply, ok, reader, State#state{handle=Handle,
+    State0 = State#state{handle=Handle,
                                     last_key=LastKey,
                                     filename=Filename,
-                                    hash_index=Index}}.
+                                    hash_index=Index},
+    {reply, ok, reader, State0, hibernate}.
 
 writer({get_kv, Key}, _From, State) ->
     {reply,
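The hibernate atom in these replies is standard gen_fsm behaviour: an optional extra element in the return tuple that makes the process garbage-collect and discard its call stack before waiting for the next message, at the cost of a slower wake-up. A minimal sketch of the convention, as a hypothetical module that is not part of this PR:

%% idle_fsm.erl - sketch of the reply-then-hibernate convention used above
%% (hypothetical module, for illustration only)
-module(idle_fsm).
-behaviour(gen_fsm).
-export([start_link/0, ping/1]).
-export([init/1, idle/3, handle_event/3, handle_sync_event/4,
         handle_info/3, terminate/3, code_change/4]).

start_link() -> gen_fsm:start_link(?MODULE, [], []).

ping(Pid) -> gen_fsm:sync_send_event(Pid, ping, infinity).

init([]) -> {ok, idle, #{}}.

%% Replying with the extra 'hibernate' element fully garbage-collects the
%% process before it waits for its next message - useful for file processes
%% that may now sit idle for some time.
idle(ping, _From, StateData) ->
    {reply, pong, idle, StateData, hibernate}.

handle_event(_Msg, StateName, StateData) ->
    {next_state, StateName, StateData}.
handle_sync_event(_Msg, _From, StateName, StateData) ->
    {reply, ok, StateName, StateData}.
handle_info(_Msg, StateName, StateData) ->
    {next_state, StateName, StateData}.
terminate(_Reason, _StateName, _StateData) -> ok.
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.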
@@ -566,18 +579,16 @@ rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
     ets:delete(State#state.hashtree),
     {NewHandle, Index, LastKey} = open_for_readonly(NewName,
                                                     State#state.last_key),
+    State0 = State#state{handle=NewHandle,
+                         last_key=LastKey,
+                         filename=NewName,
+                         hash_index=Index},
     case State#state.deferred_delete of
         true ->
-            {reply, ok, delete_pending, State#state{handle=NewHandle,
-                                                    last_key=LastKey,
-                                                    filename=NewName,
-                                                    hash_index=Index}};
+            {reply, ok, delete_pending, State0};
         false ->
             leveled_log:log_timer("CDB18", [], SW),
-            {reply, ok, reader, State#state{handle=NewHandle,
-                                            last_key=LastKey,
-                                            filename=NewName,
-                                            hash_index=Index}}
+            {reply, ok, reader, State0, hibernate}
     end;
 rolling(check_hashtable, _From, State) ->
     {reply, false, rolling, State}.
@@ -618,33 +629,40 @@ reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
         _ ->
             {reply, lists:sublist(UpdAcc, SampleSize), reader, State}
     end;
-reader({direct_fetch, PositionList, Info}, _From, State) ->
+reader({direct_fetch, PositionList, Info}, From, State) ->
     H = State#state.handle,
-    FilterFalseKey = fun(Tpl) -> case element(1, Tpl) of
-                                     false ->
-                                         false;
-                                     _Key ->
-                                         {true, Tpl}
-                                 end end,
-    Reply =
-        case Info of
-            key_only ->
-                FM = lists:filtermap(
-                        fun(P) ->
-                            FilterFalseKey(extract_key(H, P)) end,
-                        PositionList),
-                lists:map(fun(T) -> element(1, T) end, FM);
-            key_size ->
-                lists:filtermap(
-                    fun(P) ->
-                        FilterFalseKey(extract_key_size(H, P)) end,
-                    PositionList);
-            key_value_check ->
-                BM = State#state.binary_mode,
-                lists:map(fun(P) -> extract_key_value_check(H, P, BM) end,
-                            PositionList)
-        end,
-    {reply, Reply, reader, State};
+    FilterFalseKey =
+        fun(Tpl) ->
+            case element(1, Tpl) of
+                false ->
+                    false;
+                _Key ->
+                    {true, Tpl}
+            end
+        end,
+    case Info of
+        key_only ->
+            FM = lists:filtermap(
+                    fun(P) ->
+                        FilterFalseKey(extract_key(H, P)) end,
+                    PositionList),
+            MapFun = fun(T) -> element(1, T) end,
+            {reply, lists:map(MapFun, FM), reader, State};
+        key_size ->
+            FilterFun = fun(P) -> FilterFalseKey(extract_key_size(H, P)) end,
+            {reply, lists:filtermap(FilterFun, PositionList), reader, State};
+        key_value_check ->
+            BM = State#state.binary_mode,
+            MapFun = fun(P) -> extract_key_value_check(H, P, BM) end,
+            % direct_fetch will occur in batches, so it doesn't make sense to
+            % hibernate the process that is likely to be used again.  However,
+            % a significant amount of unused binary references may have
+            % accumulated, so push a GC at this point
+            gen_fsm:reply(From, lists:map(MapFun, PositionList)),
+            garbage_collect(),
+            {next_state, reader, State}
+    end;
 reader(cdb_complete, _From, State) ->
     leveled_log:log("CDB05", [State#state.filename, reader, cdb_ccomplete]),
     ok = file:close(State#state.handle),
@@ -720,24 +738,21 @@ delete_pending(destroy, State) ->
 
 
 handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
-                  _From,
+                  From,
                   StateName,
                   State) ->
     {ok, EndPos0} = file:position(State#state.handle, eof),
-    {ok, StartPos0} = case StartPos of
-                          undefined ->
-                              file:position(State#state.handle,
-                                            ?BASE_POSITION);
-                          StartPos ->
-                              {ok, StartPos}
-                      end,
+    {ok, StartPos0} =
+        case StartPos of
+            undefined ->
+                file:position(State#state.handle, ?BASE_POSITION);
+            StartPos ->
+                {ok, StartPos}
+        end,
     file:position(State#state.handle, StartPos0),
-    file:advise(State#state.handle,
-                StartPos0,
-                EndPos0 - StartPos0,
-                sequential),
-    MaybeEnd = (check_last_key(State#state.last_key) == empty) or
-               (StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
+    MaybeEnd =
+        (check_last_key(State#state.last_key) == empty) or
+        (StartPos0 >= (EndPos0 - ?DWORD_SIZE)),
     {LastPosition, Acc2} =
         case MaybeEnd of
             true ->
@@ -749,12 +764,17 @@ handle_sync_event({cdb_scan, FilterFun, Acc, StartPos},
                              Acc,
                              State#state.last_key)
         end,
-    {ok, LastReadPos} = file:position(State#state.handle, cur),
-    file:advise(State#state.handle,
-                StartPos0,
-                LastReadPos - StartPos0,
-                dont_need),
-    {reply, {LastPosition, Acc2}, StateName, State};
+    % The scan may have created a lot of binary references, clear up the
+    % reference counters for this process here manually.  The cdb process
+    % may be inactive for a period after the scan, and so GC may not kick in
+    % otherwise
+    %
+    % garbage_collect/0 is used in preference to hibernate, as we're generally
+    % scanning in batches at startup - so the process will be needed straight
+    % away.
+    gen_fsm:reply(From, {LastPosition, Acc2}),
+    garbage_collect(),
+    {next_state, StateName, State};
 handle_sync_event(cdb_lastkey, _From, StateName, State) ->
     {reply, State#state.last_key, StateName, State};
 handle_sync_event(cdb_firstkey, _From, StateName, State) ->
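Replying with gen_fsm:reply/2 and then calling garbage_collect/0, as the scan handler above now does, unblocks the caller before the collection runs, so the caller does not pay for the GC. The effect on a process's off-heap binary references can be observed with process_info/2. A small sketch (hypothetical module, not part of this PR; erlang:garbage_collect/1 forces a collection in another process):

%% gc_probe.erl - sketch for observing binary references being released
%% (hypothetical module, for illustration only)
-module(gc_probe).
-export([binary_refs/1, gc_and_recount/1]).

%% Count the off-heap binaries a live process currently references.
binary_refs(Pid) ->
    {binary, Refs} = process_info(Pid, binary),
    length(Refs).

%% Force a collection in Pid and return reference counts before and after;
%% references the process no longer uses should be gone afterwards.
gc_and_recount(Pid) ->
    Before = binary_refs(Pid),
    true = erlang:garbage_collect(Pid),
    {Before, binary_refs(Pid)}.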
@@ -790,8 +810,8 @@ handle_sync_event(cdb_close, _From, StateName, State) ->
     file:close(State#state.handle),
     {stop, normal, ok, State}.
 
-handle_event(_Msg, StateName, State) ->
-    {next_state, StateName, State}.
+handle_event(clerk_complete, StateName, State) ->
+    {next_state, StateName, State, hibernate}.
 
 handle_info(_Msg, StateName, State) ->
     {next_state, StateName, State}.
@@ -2668,8 +2688,6 @@ getpositions_sample_test() ->
 
 
 nonsense_coverage_test() ->
-    {ok, Pid} = gen_fsm:start_link(?MODULE, [#cdb_options{}], []),
-    ok = gen_fsm:send_all_state_event(Pid, nonsense),
     ?assertMatch({next_state, reader, #state{}}, handle_info(nonsense,
                                                              reader,
                                                              #state{})),
src/leveled_iclerk.erl

@@ -361,13 +361,12 @@ handle_cast(scoring_complete, State) ->
             ok = CloseFun(FilterServer),
             ok = leveled_inker:ink_clerkcomplete(State#state.inker,
                                                  ManifestSlice,
-                                                 FilesToDelete),
-            {noreply, State#state{scoring_state = undefined}};
+                                                 FilesToDelete);
         false ->
             ok = CloseFun(FilterServer),
-            ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], []),
-            {noreply, State#state{scoring_state = undefined}}
-    end;
+            ok = leveled_inker:ink_clerkcomplete(State#state.inker, [], [])
+    end,
+    {noreply, State#state{scoring_state = undefined}, hibernate};
 handle_cast({trim, PersistedSQN, ManifestAsList}, State) ->
     FilesToDelete =
         leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList),
@@ -548,7 +547,8 @@ size_comparison_score(KeySizeList, FilterFun, FilterServer, MaxSQN) ->
     end.
 
 
-fetch_inbatches([], _BatchSize, _CDB, CheckedList) ->
+fetch_inbatches([], _BatchSize, CDB, CheckedList) ->
+    ok = leveled_cdb:cdb_clerkcomplete(CDB),
     CheckedList;
 fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) ->
     {Batch, Tail} = if
@@ -699,6 +699,11 @@ compact_files([Batch|T], CDBopts, ActiveJournal0,
                                                 ActiveJournal0,
                                                 ManSlice0,
                                                 PressMethod),
+    % The inker's clerk will no longer need these (potentially large) binaries,
+    % so force garbage collection at this point.  This will mean when we roll
+    % each CDB file there will be no remaining references to the binaries that
+    % have been transferred and the memory can immediately be cleared
+    garbage_collect(),
     compact_files(T, CDBopts, ActiveJournal1, FilterFun, FilterServer, MaxSQN,
                   RStrategy, PressMethod, ManSlice1).
 
@@ -763,7 +768,7 @@ filter_output(KVCs, FilterFun, FilterServer, MaxSQN, ReloadStrategy) ->
                         % strategy
                         [KVC0|Acc];
                     {false, retain} ->
-                        % If we have a retain startegy, it can't be
+                        % If we have a retain strategy, it can't be
                         % discarded - but the value part is no longer
                         % required as this version has been replaced
                         {JK0, JV0} =
src/leveled_pmanifest.erl

@@ -152,6 +152,11 @@ copy_manifest(Manifest) ->
 %% manifest.  The PidFun should be able to return the Pid of a file process
 %% (having started one).  The SQNFun will return the max sequence number
 %% of that file, if passed the Pid that owns it.
+%%
+%% The manifest is started from the basement first, and then the higher levels
+%% as the page cache will be loaded with each file, and it would be
+%% preferable to have the higher levels in the cache if memory is insufficient
+%% to load each level
 load_manifest(Manifest, LoadFun, SQNFun) ->
     UpdateLevelFun =
         fun(LevelIdx, {AccMaxSQN, AccMan, AccFL}) ->

@@ -171,7 +176,7 @@ load_manifest(Manifest, LoadFun, SQNFun) ->
         end,
     lists:foldl(UpdateLevelFun,
                 {0, Manifest, []},
-                lists:seq(0, Manifest#manifest.basement)).
+                lists:reverse(lists:seq(0, Manifest#manifest.basement))).
 
 -spec close_manifest(manifest(), fun()) -> ok.
 %% @doc
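The reversal above is what implements the basement-first comment: with a basement of 4, the fold now visits levels [4, 3, 2, 1, 0], so the small, frequently-read top levels are loaded last and hold the most recently touched pages if the OS cache comes under pressure. A one-line sketch of the ordering (Basement standing in for Manifest#manifest.basement):

%% Load order sketch: deepest level first, level 0 last
LoadOrder = fun(Basement) -> lists:reverse(lists:seq(0, Basement)) end.
%% LoadOrder(4) evaluates to [4, 3, 2, 1, 0]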
src/leveled_sst.erl

@@ -486,7 +486,9 @@ init([]) ->
 starting({sst_open, RootPath, Filename, OptsSST, Level}, _From, State) ->
     leveled_log:save(OptsSST#sst_options.log_options),
     {UpdState, Bloom} =
-        read_file(Filename, State#state{root_path=RootPath}),
+        read_file(Filename,
+                  State#state{root_path=RootPath},
+                  OptsSST#sst_options.pagecache_level >= Level),
     Summary = UpdState#state.summary,
     {reply,
      {ok, {Summary#summary.first_key, Summary#summary.last_key}, Bloom},
@@ -509,7 +511,8 @@ starting({sst_new,
     YBQ = Level =< 2,
     {UpdState, Bloom} =
         read_file(ActualFilename,
-                  State#state{root_path=RootPath, yield_blockquery=YBQ}),
+                  State#state{root_path=RootPath, yield_blockquery=YBQ},
+                  OptsSST#sst_options.pagecache_level >= Level),
     Summary = UpdState#state.summary,
     leveled_log:log_timer("SST08",
                           [ActualFilename, Level, Summary#summary.max_sqn],
@@ -573,7 +576,8 @@ starting(complete_l0startup, State) ->
                             % Important to empty this from state rather
                             % than carry it through to the next stage
                             new_slots=undefined,
-                            deferred_startup_tuple=undefined}),
+                            deferred_startup_tuple=undefined},
+                  true),
     Summary = UpdState#state.summary,
     Time4 = timer:now_diff(os:timestamp(), SW4),
 
@@ -671,7 +675,7 @@ reader({get_kvrange, StartKey, EndKey, ScanWidth, SegList, LowLastMod},
                 {_ID, none} ->
                     Cache;
                 {ID, Header} ->
-                    array:set(ID - 1, Header, Cache)
+                    array:set(ID - 1, binary:copy(Header), Cache)
             end
         end,
     BlockIdxC0 = lists:foldl(FoldFun, State#state.blockindex_cache, BIC),
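binary:copy/1 is used here (and again in fetch/4 below) because a header matched out of a larger binary is normally a sub-binary: a small term pointing into the original allocation, which keeps the entire source binary alive for as long as the cache holds it. Copying detaches the header so the large slot binary can be collected. A sketch (hypothetical module, not part of this PR):

%% subbin_demo.erl - why binary:copy/1 is applied before caching
%% (hypothetical module, for illustration only)
-module(subbin_demo).
-export([header/1]).

%% The match produces a sub-binary that pins all of BigBin in memory;
%% binary:copy/1 allocates a standalone binary, so BigBin can be freed
%% once no other references to it remain.
header(BigBin) when byte_size(BigBin) >= 16 ->
    <<Hdr:16/binary, _Rest/binary>> = BigBin,
    binary:copy(Hdr).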
@@ -717,8 +721,7 @@ reader(close, _From, State) ->
     {stop, normal, ok, State}.
 
 reader({switch_levels, NewLevel}, State) ->
-    erlang:garbage_collect(self()),
-    {next_state, reader, State#state{level = NewLevel}}.
+    {next_state, reader, State#state{level = NewLevel}, hibernate}.
 
 
 delete_pending({get_kv, LedgerKey, Hash}, _From, State) ->
@@ -792,8 +795,7 @@ handle_info(tidyup_after_startup, delete_pending, State) ->
 handle_info(tidyup_after_startup, StateName, State) ->
     case is_process_alive(State#state.starting_pid) of
         true ->
-            erlang:garbage_collect(self()),
-            {next_state, StateName, State};
+            {next_state, StateName, State, hibernate};
         false ->
             {stop, normal, State}
     end.
@@ -1062,7 +1064,9 @@ fetch(LedgerKey, Hash, State, Timings0) ->
             {Result, Header} =
                 binaryslot_get(SlotBin, LedgerKey, Hash, PressMethod, IdxModDate),
             BlockIndexCache =
-                array:set(SlotID - 1, Header, State#state.blockindex_cache),
+                array:set(SlotID - 1,
+                          binary:copy(Header),
+                          State#state.blockindex_cache),
             {_SW3, Timings3} =
                 update_timings(SW2, Timings2, noncached_block, false),
             {Result,
@@ -1217,9 +1221,10 @@ write_file(RootPath, Filename, SummaryBin, SlotsBin,
                     filename:join(RootPath, FinalName)),
     FinalName.
 
-read_file(Filename, State) ->
+read_file(Filename, State, LoadPageCache) ->
     {Handle, FileVersion, SummaryBin} =
-        open_reader(filename:join(State#state.root_path, Filename)),
+        open_reader(filename:join(State#state.root_path, Filename),
+                    LoadPageCache),
     UpdState0 = imp_fileversion(FileVersion, State),
     {Summary, Bloom, SlotList} = read_table_summary(SummaryBin),
     BlockIndexCache = array:new([{size, Summary#summary.size},
@@ -1271,12 +1276,18 @@ imp_fileversion(VersionInt, State) ->
     end,
     UpdState1.
 
-open_reader(Filename) ->
+open_reader(Filename, LoadPageCache) ->
     {ok, Handle} = file:open(Filename, [binary, raw, read]),
     {ok, Lengths} = file:pread(Handle, 0, 9),
     <<FileVersion:8/integer,
       SlotsLength:32/integer,
       SummaryLength:32/integer>> = Lengths,
+    case LoadPageCache of
+        true ->
+            file:advise(Handle, 9, SlotsLength, will_need);
+        false ->
+            ok
+    end,
     {ok, SummaryBin} = file:pread(Handle, SlotsLength + 9, SummaryLength),
     {Handle, FileVersion, SummaryBin}.
 
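file:advise/4 passes a posix_fadvise-style hint to the operating system; will_need asks for the given byte range to be read into the page cache ahead of use. Here the slots region (offset 9, after the 9-byte length header) is pre-warmed only when the file sits at a level within the configured pagecache_level. A sketch of the call in isolation (hypothetical module; on platforms without the underlying syscall the call may return an error tuple, so the result is matched loosely):

%% prewarm_demo.erl - priming the OS page cache for a whole file
%% (hypothetical module, for illustration only)
-module(prewarm_demo).
-export([prewarm/1]).

prewarm(Filename) ->
    {ok, Handle} = file:open(Filename, [binary, raw, read]),
    {ok, Size} = file:position(Handle, eof),
    _ = file:advise(Handle, 0, Size, will_need),
    file:close(Handle).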
@@ -2117,7 +2128,7 @@ crc_check_slot(FullBin) ->
       CRC32H:32/integer,
       Rest/binary>> = FullBin,
     PosBL0 = min(PosBL, byte_size(FullBin) - 12),
-    % If the position has been bit-flipped to beyond the maximum paossible
+    % If the position has been bit-flipped to beyond the maximum possible
     % length, use the maximum possible length
     <<Header:PosBL0/binary, Blocks/binary>> = Rest,
     case {hmac(Header), hmac(PosBL0)} of
@@ -2619,16 +2630,20 @@ generate_randomkeys(_Seqn, 0, Acc, _BucketLow, _BucketHigh) ->
 generate_randomkeys(Seqn, Count, Acc, BucketLow, BRange) ->
     BRand = leveled_rand:uniform(BRange),
     BNumber =
-        lists:flatten(io_lib:format("K~4..0B", [BucketLow + BRand])),
+        lists:flatten(io_lib:format("B~6..0B", [BucketLow + BRand])),
     KNumber =
-        lists:flatten(io_lib:format("K~6..0B", [leveled_rand:uniform(1000)])),
+        lists:flatten(io_lib:format("K~8..0B", [leveled_rand:uniform(1000000)])),
     LK = leveled_codec:to_ledgerkey("Bucket" ++ BNumber, "Key" ++ KNumber, o),
     Chunk = leveled_rand:rand_bytes(64),
     {_B, _K, MV, _H, _LMs} =
         leveled_codec:generate_ledgerkv(LK, Seqn, Chunk, 64, infinity),
+    MD = element(4, MV),
+    ?assertMatch(undefined, element(3, MD)),
+    MD0 = [{magic_md, [<<0:32/integer>>, base64:encode(Chunk)]}],
+    MV0 = setelement(4, MV, setelement(3, MD, MD0)),
     generate_randomkeys(Seqn + 1,
                         Count - 1,
-                        [{LK, MV}|Acc],
+                        [{LK, MV0}|Acc],
                         BucketLow,
                         BRange).
 
@@ -2650,6 +2665,7 @@ generate_indexkey(Term, Count) ->
                                     Count,
                                     infinity).
 
+
 form_slot_test() ->
     % If a skip key happens, mustn't switch to loookup by accident as could be
     % over the expected size
@@ -3232,13 +3248,17 @@ simple_persisted_test_bothformats() ->
     simple_persisted_tester(fun testsst_new/6).
 
 simple_persisted_tester(SSTNewFun) ->
+    Level = 3,
     {RP, Filename} = {?TEST_AREA, "simple_test"},
     KVList0 = generate_randomkeys(1, ?LOOK_SLOTSIZE * 32, 1, 20),
     KVList1 = lists:ukeysort(1, KVList0),
     [{FirstKey, _FV}|_Rest] = KVList1,
     {LastKey, _LV} = lists:last(KVList1),
     {ok, Pid, {FirstKey, LastKey}, _Bloom} =
-        SSTNewFun(RP, Filename, 1, KVList1, length(KVList1), native),
+        SSTNewFun(RP, Filename, Level, KVList1, length(KVList1), native),
 
+    B0 = check_binary_references(Pid),
+
     SW0 = os:timestamp(),
     lists:foreach(fun({K, V}) ->
                       ?assertMatch({K, V}, sst_get(Pid, K))
@@ -3335,9 +3355,44 @@ simple_persisted_tester(SSTNewFun) ->
     FetchedListB4 = lists:foldl(FoldFun, [], FetchListB4),
     ?assertMatch([{Eight000Key, _v800}], FetchedListB4),
 
+    B1 = check_binary_references(Pid),
+
     ok = sst_close(Pid),
+
+    io:format(user, "Reopen SST file~n", []),
+    OptsSST = #sst_options{press_method=native,
+                           log_options=leveled_log:get_opts()},
+    {ok, OpenP, {FirstKey, LastKey}, _Bloom} =
+        sst_open(RP, Filename ++ ".sst", OptsSST, Level),
+
+    B2 = check_binary_references(OpenP),
+
+    lists:foreach(fun({K, V}) ->
+                      ?assertMatch({K, V}, sst_get(OpenP, K)),
+                      ?assertMatch({K, V}, sst_get(OpenP, K))
+                  end,
+                  KVList1),
+
+    garbage_collect(OpenP),
+    B3 = check_binary_references(OpenP),
+    ?assertMatch(0, B2), % Opens with an empty cache
+    ?assertMatch(true, B3 > B2), % Now has headers in cache
+    ?assertMatch(false, B3 > B0 * 2),
+        % Not significantly bigger than when created new
+    ?assertMatch(false, B3 > B1 * 2),
+        % Not significantly bigger than when created new
+
+    ok = sst_close(OpenP),
     ok = file:delete(filename:join(RP, Filename ++ ".sst")).
 
+check_binary_references(Pid) ->
+    garbage_collect(Pid),
+    {binary, BinList} = process_info(Pid, binary),
+    TotalBinMem =
+        lists:foldl(fun({_R, BM, _RC}, Acc) -> Acc + BM end, 0, BinList),
+    io:format(user, "Total binary memory ~w~n", [TotalBinMem]),
+    TotalBinMem.
+
 key_dominates_test() ->
     KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},
     KV2 = {{o, "Bucket", "Key3", null}, {6, {active, infinity}, 0, []}},
test/end_to_end/riak_SUITE.erl

@@ -1214,8 +1214,8 @@ dollar_bucket_index(_Config) ->
 bigobject_memorycheck(_Config) ->
     RootPath = testutil:reset_filestructure(),
     {ok, Bookie} = leveled_bookie:book_start(RootPath,
-                                             100,
-                                             100000000,
+                                             200,
+                                             1000000000,
                                              testutil:sync_strategy()),
     Bucket = <<"B">>,
     IndexGen = fun() -> [] end,
@@ -1227,7 +1227,7 @@ bigobject_memorycheck(_Config) ->
         {Obj, Spc} = testutil:set_object(Bucket, Key, Value, IndexGen, []),
         testutil:book_riakput(Bookie, Obj, Spc)
     end,
-    lists:foreach(ObjPutFun, lists:seq(1, 600)),
+    lists:foreach(ObjPutFun, lists:seq(1, 700)),
     {ok, _Ink, Pcl} = leveled_bookie:book_returnactors(Bookie),
     {binary, BL} = process_info(Pcl, binary),
     {memory, M0} = process_info(Pcl, memory),
@@ -1235,5 +1235,54 @@ bigobject_memorycheck(_Config) ->
     io:format("Pcl binary memory ~w ~w memory ~w~n", [B0, length(BL), M0]),
     true = B0 < 500 * 4000,
     true = M0 < 500 * 4000,
+    % All processes
+    {_TotalCDBBinMem, _TotalCDBProcesses} = cdb_memory_check(),
     ok = leveled_bookie:book_close(Bookie),
-    testutil:reset_filestructure().
+    {ok, BookieR} = leveled_bookie:book_start(RootPath,
+                                              2000,
+                                              1000000000,
+                                              testutil:sync_strategy()),
+    {RS_TotalCDBBinMem, _RS_TotalCDBProcesses} = cdb_memory_check(),
+    true = RS_TotalCDBBinMem < 1024 * 1024,
+        % No binary object references exist after startup
+    ok = leveled_bookie:book_close(BookieR),
+    testutil:reset_filestructure().
+
+
+cdb_memory_check() ->
+    TotalCDBProcesses =
+        lists:filter(fun(P) ->
+                        {dictionary, PD} =
+                            process_info(P, dictionary),
+                        case lists:keyfind('$initial_call', 1, PD) of
+                            {'$initial_call', {leveled_cdb, init, 1}} ->
+                                true;
+                            _ ->
+                                false
+                        end
+                     end,
+                     processes()),
+    TotalCDBBinMem =
+        lists:foldl(fun(P, Acc) ->
+                        BinMem = calc_total_binary_memory(P),
+                        io:format("Memory for pid ~w is ~w~n", [P, BinMem]),
+                        BinMem + Acc
+                    end,
+                    0,
+                    TotalCDBProcesses),
+    io:format("Total binary memory ~w in ~w CDB processes~n",
+              [TotalCDBBinMem, length(TotalCDBProcesses)]),
+    {TotalCDBBinMem, TotalCDBProcesses}.
+
+calc_total_binary_memory(Pid) ->
+    {binary, BL} = process_info(Pid, binary),
+    TBM = lists:foldl(fun({_R, Sz, _C}, Acc) -> Acc + Sz end, 0, BL),
+    case TBM > 1000000 of
+        true ->
+            FilteredBL =
+                lists:filter(fun(BMD) -> element(2, BMD) > 1024 end, BL),
+            io:format("Big-ref details for ~w ~w~n", [Pid, FilteredBL]);
+        false ->
+            ok
+    end,
+    TBM.
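The process filter above works because OTP records the initiating {Module, Function, Arity} under the '$initial_call' key in every gen_* process dictionary, so all CDB file processes can be found without the suite holding their pids. An equivalent shell-style sketch (assumes a running leveled instance; processes that die mid-scan are skipped because the generator pattern fails to match on undefined):

%% Collect the pids of all processes started via leveled_cdb:init/1
CdbPids =
    [P || P <- processes(),
          {dictionary, D} <- [process_info(P, dictionary)],
          lists:keyfind('$initial_call', 1, D)
              =:= {'$initial_call', {leveled_cdb, init, 1}}].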