From 695638f34c123c85f3946420a1616509d0789c69 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Sat, 20 May 2017 12:25:06 +0100 Subject: [PATCH] Loop for get_positions The CDB FSM process can be blocked by get_positions for all positions, so loop around the index outside of the FSM process to allow for other messages to interleave. --- src/leveled_cdb.erl | 107 +++++++++++++++++++++----------------------- src/leveled_log.erl | 4 +- 2 files changed, 51 insertions(+), 60 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 7996322..4b7d68c 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -152,10 +152,37 @@ cdb_mput(Pid, KVList) -> %% SampleSize can be an integer or the atom all cdb_getpositions(Pid, SampleSize) -> - SW = os:timestamp(), - R = gen_fsm:sync_send_event(Pid, {get_positions, SampleSize}, infinity), - leveled_log:log_timer("CDB19", [SampleSize], SW), - R. + % Getting many positions from the index, especially getting all positions + % can take time (about 1s for all positions). Rather than queue all + % requests waiting for this to complete, loop over each of the 256 indexes + % outside of the FSM processing loop - to allow for other messages to be + % interleaved + case SampleSize of + all -> + FoldFun = + fun(Index, Acc) -> + cdb_getpositions_fromidx(Pid, all, Index, Acc) + end, + IdxList = lists:seq(0, 255), + lists:foldl(FoldFun, [], IdxList); + S0 -> + FoldFun = + fun({_R, Index}, Acc) -> + case length(Acc) of + S0 -> + Acc; + L when L < S0 -> + cdb_getpositions_fromidx(Pid, S0, Index, Acc) + end + end, + RandFun = fun(X) -> {random:uniform(), X} end, + SeededL = lists:map(RandFun, lists:seq(0, 255)), + SortedL = lists:keysort(1, SeededL), + lists:foldl(FoldFun, [], SortedL) + end. + +cdb_getpositions_fromidx(Pid, SampleSize, Index, Acc) -> + gen_fsm:sync_send_event(Pid, {get_positions, SampleSize, Index, Acc}). %% Info can be key_only, key_size (size being the size of the value) or %% key_value_check (with the check part indicating if the CRC is correct for @@ -355,8 +382,8 @@ rolling({key_check, Key}, _From, State) -> loose_presence), rolling, State}; -rolling({get_positions, _SampleSize}, _From, State) -> - {reply, [], rolling, State}; +rolling({get_positions, _SampleSize, _Index, SampleAcc}, _From, State) -> + {reply, SampleAcc, rolling, State}; rolling({return_hashtable, IndexList, HashTreeBin}, _From, State) -> SW = os:timestamp(), Handle = State#state.handle, @@ -408,28 +435,14 @@ reader({key_check, Key}, _From, State) -> State#state.binary_mode), reader, State}; -reader({get_positions, SampleSize}, _From, State) -> +reader({get_positions, SampleSize, Index, Acc}, _From, State) -> + {Index, {Pos, Count}} = lists:keyfind(Index, 1, State#state.hash_index), + UpdAcc = scan_index_returnpositions(State#state.handle, Pos, Count, Acc), case SampleSize of all -> - {reply, - scan_index(State#state.handle, - State#state.hash_index, - {fun scan_index_returnpositions/4, []}), - reader, - State}; + {reply, UpdAcc, reader, State}; _ -> - SeededL = lists:map(fun(X) -> {random:uniform(), X} end, - State#state.hash_index), - SortedL = lists:keysort(1, SeededL), - RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL), - {reply, - scan_index_forsample(State#state.handle, - RandomisedHashIndex, - fun scan_index_returnpositions/4, - [], - SampleSize), - reader, - State} + {reply, lists:sublist(UpdAcc, SampleSize), reader, State} end; reader({direct_fetch, PositionList, Info}, _From, State) -> H = State#state.handle, @@ -842,18 +855,21 @@ open_for_readonly(Filename, LastKeyKnown) -> load_index(Handle) -> Index = lists:seq(0, 255), - lists:map(fun(X) -> - file:position(Handle, {bof, ?DWORD_SIZE * X}), - {HashTablePos, Count} = read_next_2_integers(Handle), - {X, {HashTablePos, Count}} end, - Index). + LoadIndexFun = + fun(X) -> + file:position(Handle, {bof, ?DWORD_SIZE * X}), + {HashTablePos, Count} = read_next_2_integers(Handle), + {X, {HashTablePos, Count}} + end, + lists:map(LoadIndexFun, Index). %% Function to find the LastKey in the file find_lastkey(Handle, IndexCache) -> - {LastPosition, TotalKeys} = scan_index(Handle, - IndexCache, - {fun scan_index_findlast/4, - {0, 0}}), + ScanIndexFun = + fun({_X, {Pos, Count}}, {LastPos, KeyCount}) -> + scan_index_findlast(Handle, Pos, Count, {LastPos, KeyCount}) + end, + {LastPosition, TotalKeys} = lists:foldl(ScanIndexFun, {0, 0}, IndexCache), case TotalKeys of 0 -> empty; @@ -864,29 +880,6 @@ find_lastkey(Handle, IndexCache) -> end. -scan_index(Handle, IndexCache, {ScanFun, InitAcc}) -> - lists:foldl(fun({_X, {Pos, Count}}, Acc) -> - ScanFun(Handle, Pos, Count, Acc) - end, - InitAcc, - IndexCache). - -scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) -> - lists:sublist(Acc, SampleSize); -scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) -> - case length(Acc) of - L when L >= SampleSize -> - lists:sublist(Acc, SampleSize); - _ -> - {_X, {Pos, Count}} = CacheEntry, - scan_index_forsample(Handle, - Tail, - ScanFun, - ScanFun(Handle, Pos, Count, Acc), - SampleSize) - end. - - scan_index_findlast(Handle, Position, Count, {LastPosition, TotalKeys}) -> {ok, _} = file:position(Handle, Position), MaxPosFun = fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end, diff --git a/src/leveled_log.erl b/src/leveled_log.erl index 7681480..a65b587 100644 --- a/src/leveled_log.erl +++ b/src/leveled_log.erl @@ -316,9 +316,7 @@ {info, "After ~w PUTs total write time is ~w total sync time is ~w " ++ "and max write time is ~w and max sync time is ~w"}}, {"CDB18", - {info, "Handled return and write of hashtable"}}, - {"CDB19", - {info, "Scan of positions for SampleSize=~w"}} + {info, "Handled return and write of hashtable"}} ])).