Loop for get_positions

The CDB FSM process can be blocked by get_positions for all positions,
so loop around the index outside of the FSM process to allow for other
messages to interleave.
This commit is contained in:
martinsumner 2017-05-20 12:25:06 +01:00
parent 319dc5f388
commit 695638f34c
2 changed files with 51 additions and 60 deletions

View file

@ -152,10 +152,37 @@ cdb_mput(Pid, KVList) ->
%% SampleSize can be an integer or the atom all
cdb_getpositions(Pid, SampleSize) ->
SW = os:timestamp(),
R = gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity),
leveled_log:log_timer("CDB19", [SampleSize], SW),
R.
% Getting many positions from the index, especially getting all positions
% can take time (about 1s for all positions). Rather than queue all
% requests waiting for this to complete, loop over each of the 256 indexes
% outside of the FSM processing loop - to allow for other messages to be
% interleaved
case SampleSize of
all ->
FoldFun =
fun(Index, Acc) ->
cdb_getpositions_fromidx(Pid, all, Index, Acc)
end,
IdxList = lists:seq(0, 255),
lists:foldl(FoldFun, [], IdxList);
S0 ->
FoldFun =
fun({_R, Index}, Acc) ->
case length(Acc) of
S0 ->
Acc;
L when L < S0 ->
cdb_getpositions_fromidx(Pid, S0, Index, Acc)
end
end,
RandFun = fun(X) -> {random:uniform(), X} end,
SeededL = lists:map(RandFun, lists:seq(0, 255)),
SortedL = lists:keysort(1, SeededL),
lists:foldl(FoldFun, [], SortedL)
end.
cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) ->
gen_fsm:sync_send_event(Pid, {get_positions, SampleSize, Index, Acc}).
%% Info can be key_only, key_size (size being the size of the value) or
%% key_value_check (with the check part indicating if the CRC is correct for
@ -355,8 +382,8 @@ rolling({key_check, Key}, _From, State) ->
loose_presence),
rolling,
State};
rolling({get_positions, _SampleSize}, _From, State) ->
{reply, [], rolling, State};
rolling({get_positions, _SampleSize, _Index, SampleAcc}, _From, State) ->
{reply, SampleAcc, rolling, State};
rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) ->
SW = os:timestamp(),
Handle = State#state.handle,
@ -408,28 +435,14 @@ reader({key_check, Key}, _From, State) ->
State#state.binary_mode),
reader,
State};
reader({get_positions, SampleSize}, _From, State) ->
reader({get_positions, SampleSize, Index, Acc}, _From, State) ->
{Index, {Pos, Count}} = lists:keyfind(Index, 1, State#state.hash_index),
UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc),
case SampleSize of
all ->
{reply,
scan_index(State#state.handle,
State#state.hash_index,
{fun scan_index_returnpositions/4, []}),
reader,
State};
{reply, UpdAcc, reader, State};
_ ->
SeededL = lists:map(fun(X) -> {random:uniform(), X} end,
State#state.hash_index),
SortedL = lists:keysort(1, SeededL),
RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL),
{reply,
scan_index_forsample(State#state.handle,
RandomisedHashIndex,
fun scan_index_returnpositions/4,
[],
SampleSize),
reader,
State}
{reply, lists:sublist(UpdAcc, SampleSize), reader, State}
end;
reader({direct_fetch, PositionList, Info}, _From, State) ->
H = State#state.handle,
@ -842,18 +855,21 @@ open_for_readonly(Filename, LastKeyKnown) ->
load_index(Handle) ->
Index = lists:seq(0, 255),
lists:map(fun(X) ->
LoadIndexFun =
fun(X) ->
file:position(Handle, {bof, ?DWORD_SIZE * X}),
{HashTablePos, Count} = read_next_2_integers(Handle),
{X, {HashTablePos, Count}} end,
Index).
{X, {HashTablePos, Count}}
end,
lists:map(LoadIndexFun, Index).
%% Function to find the LastKey in the file
find_lastkey(Handle, IndexCache) ->
{LastPosition, TotalKeys} = scan_index(Handle,
IndexCache,
{fun scan_index_findlast/4,
{0, 0}}),
ScanIndexFun =
fun({_X, {Pos, Count}}, {LastPos, KeyCount}) ->
scan_index_findlast(Handle, Pos, Count, {LastPos, KeyCount})
end,
{LastPosition, TotalKeys} = lists:foldl(ScanIndexFun, {0, 0}, IndexCache),
case TotalKeys of
0 ->
empty;
@ -864,29 +880,6 @@ find_lastkey(Handle, IndexCache) ->
end.
scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
lists:foldl(fun({_X, {Pos, Count}}, Acc) ->
ScanFun(Handle, Pos, Count, Acc)
end,
InitAcc,
IndexCache).
scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) ->
lists:sublist(Acc, SampleSize);
scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) ->
case length(Acc) of
L when L >= SampleSize ->
lists:sublist(Acc, SampleSize);
_ ->
{_X, {Pos, Count}} = CacheEntry,
scan_index_forsample(Handle,
Tail,
ScanFun,
ScanFun(Handle, Pos, Count, Acc),
SampleSize)
end.
scan_index_findlast(Handle, Position, Count, {LastPosition, TotalKeys}) ->
{ok, _} = file:position(Handle, Position),
MaxPosFun = fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,

View file

@ -316,9 +316,7 @@
{info, "After ~w PUTs total write time is ~w total sync time is ~w "
++ "and max write time is ~w and max sync time is ~w"}},
{"CDB18",
{info, "Handled return and write of hashtable"}},
{"CDB19",
{info, "Scan of positions for SampleSize=~w"}}
{info, "Handled return and write of hashtable"}}
])).