From 9be0f964061fb588dccc400caf06ab0d7102822a Mon Sep 17 00:00:00 2001 From: martinsumner Date: Fri, 14 Oct 2016 13:36:12 +0100 Subject: [PATCH] Or process calculation of the Hash Table When the journal CDB file is called to roll it now starts a new clerk to perform the hashtable calculation (which may take many seconds). This stops the store from getting blocked if there is an attempt to GET from the journal that has just been rolled. The journal file process now has a number of distinct states (reading, writing, pending_roll, closing). A future refactor may look to make leveled_cdb a gen_fsm rather than a gen_server. --- src/leveled_cdb.erl | 151 ++++++++++++++++++++++++----------------- src/leveled_iclerk.erl | 9 +++ 2 files changed, 98 insertions(+), 62 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 9b9af5a..e49138a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -67,8 +67,10 @@ cdb_close/1, cdb_complete/1, cdb_roll/1, + cdb_returnhashtable/3, cdb_destroy/1, - cdb_deletepending/1]). + cdb_deletepending/1, + hashtable_calc/2]). -include_lib("eunit/include/eunit.hrl"). @@ -79,6 +81,7 @@ -define(BINARY_MODE, false). -define(BASE_POSITION, 2048). -define(WRITE_OPS, [binary, raw, read, write]). +-define(PENDING_ROLL_WAIT, 30). -record(state, {hashtree, last_position :: integer(), @@ -88,6 +91,7 @@ handle :: file:fd(), writer :: boolean(), max_size :: integer(), + pending_roll = false :: boolean(), pending_delete = false :: boolean(), binary_mode = false :: boolean()}). @@ -135,7 +139,21 @@ cdb_directfetch(Pid, PositionList, Info) -> gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity). cdb_close(Pid) -> - gen_server:call(Pid, cdb_close, infinity). + cdb_close(Pid, ?PENDING_ROLL_WAIT). 
+ +cdb_close(Pid, WaitsLeft) -> + if + WaitsLeft > 0 -> + case gen_server:call(Pid, cdb_close, infinity) of + pending_roll -> + timer:sleep(1), + cdb_close(Pid, WaitsLeft - 1); + R -> + R + end; + true -> + gen_server:call(Pid, cdb_kill, infinity) + end. cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). @@ -143,6 +161,9 @@ cdb_complete(Pid) -> cdb_roll(Pid) -> gen_server:cast(Pid, cdb_roll). +cdb_returnhashtable(Pid, IndexList, HashTreeBin) -> + gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). + cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -210,46 +231,36 @@ handle_call({open_reader, Filename}, _From, State) -> writer=false, hash_index=Index}}; handle_call({get_kv, Key}, _From, State) -> - case {State#state.writer, State#state.hash_index} of - {true, _} -> + case State#state.writer of + true -> {reply, get_mem(Key, State#state.handle, State#state.hashtree), State}; - {false, []} -> + false -> {reply, - get(State#state.handle, Key), - State}; - {false, Cache} -> - {reply, - get_withcache(State#state.handle, Key, Cache), + get_withcache(State#state.handle, Key, State#state.hash_index), State} end; handle_call({key_check, Key}, _From, State) -> - case {State#state.writer, State#state.hash_index} of - {true, _} -> + case State#state.writer of + true -> {reply, get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), State}; - {false, []} -> - {reply, - get(State#state.handle, - Key, - loose_presence), - State}; - {false, Cache} -> + false -> {reply, get(State#state.handle, Key, loose_presence, - Cache), + State#state.hash_index), State} end; handle_call({put_kv, Key, Value}, _From, State) -> - case State#state.writer of - true -> + case {State#state.writer, State#state.pending_roll} of + {true, false} -> Result = put(State#state.handle, Key, Value, {State#state.last_position, State#state.hashtree}, @@ -265,7 +276,7 @@ handle_call({put_kv, Key, Value}, _From, State) -> last_key=Key, hashtree=HashTree}} 
end; - false -> + _ -> {reply, {error, read_only}, State} @@ -334,9 +345,14 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> empty -> {reply, {eof, Acc}, State} end; +handle_call(cdb_close, _From, State=#state{pending_roll=RollPending}) + when RollPending == true -> + {reply, pending_roll, State}; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=undefined}}; +handle_call(cdb_kill, _From, State) -> + {stop, killed, ok, State}; handle_call(cdb_complete, _From, State=#state{writer=Writer}) when Writer == true -> NewName = determine_new_filename(State#state.filename), @@ -347,7 +363,25 @@ handle_call(cdb_complete, _From, State=#state{writer=Writer}) {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}. + {stop, normal, {ok, State#state.filename}, State}; +handle_call({return_hashtable, IndexList, HashTreeBin}, + _From, + State=#state{pending_roll=RollPending}) when RollPending == true -> + Handle = State#state.handle, + {ok, BasePos} = file:position(Handle, State#state.last_position), + NewName = determine_new_filename(State#state.filename), + ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos), + ok = write_top_index_table(Handle, BasePos, IndexList), + file:close(Handle), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {NewHandle, Index, LastKey} = open_for_readonly(NewName), + {reply, ok, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + writer=false, + pending_roll=false, + hash_index=Index}}. 
handle_cast(destroy, State) -> @@ -355,22 +389,12 @@ handle_cast(destroy, State) -> ok = file:delete(State#state.filename), {noreply, State}; handle_cast(delete_pending, State) -> - {noreply, State#state{pending_delete = true}}; + {noreply, State#state{pending_delete=true}}; handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true -> - NewName = determine_new_filename(State#state.filename), - ok = close_file(State#state.handle, - State#state.hashtree, - State#state.last_position), - ok = rename_for_read(State#state.filename, NewName), - io:format("Opening file for reading with filename ~s~n", [NewName]), - {Handle, Index, LastKey} = open_for_readonly(NewName), - {noreply, State#state{handle=Handle, - last_key=LastKey, - filename=NewName, - writer=false, - hash_index=Index}}; -handle_cast(_Msg, State) -> - {noreply, State}. + ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree, + State#state.last_position, + self()), + {noreply, State#state{pending_roll=true}}. handle_info(_Info, State) -> {noreply, State}. @@ -670,6 +694,17 @@ fold_keys(Handle, FoldFun, Acc0) -> {FirstHashPosition, _} = read_next_2_integers(Handle), fold(Handle, FoldFun, Acc0, {256 * ?DWORD_SIZE, FirstHashPosition}, true). +hashtable_calc(HashTree, StartPos) -> + Seq = lists:seq(0, 255), + SWC = os:timestamp(), + {IndexList, HashTreeBin} = write_hash_tables(Seq, + HashTree, + StartPos, + [], + <<>>), + io:format("HashTree computed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWC)]), + {IndexList, HashTreeBin}. 
%%%%%%%%%%%%%%%%%%%% %% Internal functions @@ -756,18 +791,11 @@ scan_index_returnpositions(Handle, Position, Count, PosList0) -> %% the hash tables close_file(Handle, HashTree, BasePos) -> {ok, BasePos} = file:position(Handle, BasePos), - SW1 = os:timestamp(), - L2 = write_hash_tables(Handle, HashTree), - SW2 = os:timestamp(), - io:format("Hash Table write took ~w microseconds~n", - [timer:now_diff(SW2, SW1)]), - write_top_index_table(Handle, BasePos, L2), - io:format("Top Index Table write took ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW2)]), + IndexList = write_hash_tables(Handle, HashTree), + ok = write_top_index_table(Handle, BasePos, IndexList), file:close(Handle). - %% Fetch a list of positions by passing a key to the HashTree get_hashtree(Key, HashTree) -> Hash = hash(Key), @@ -897,7 +925,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> check_last_key(LastKey) -> case LastKey of - undefined -> error; empty -> empty; _ -> ok end. @@ -1096,23 +1123,20 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) -> %% corresponding to a key and the second word is a file pointer to the %% corresponding {key,value} tuple. write_hash_tables(Handle, HashTree) -> - Seq = lists:seq(0, 255), {ok, StartPos} = file:position(Handle, cur), - SWC = os:timestamp(), - {IndexList, HashTreeBin} = write_hash_tables(Seq, - HashTree, - StartPos, - [], - <<>>), - io:format("HashTree computed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), SWC)]), + {IndexList, HashTreeBin} = hashtable_calc(HashTree, StartPos), + ok = perform_write_hash_tables(Handle, HashTreeBin, StartPos), + IndexList. + +perform_write_hash_tables(Handle, HashTreeBin, StartPos) -> SWW = os:timestamp(), ok = file:write(Handle, HashTreeBin), - io:format("HashTree written in ~w microseconds~n", - [timer:now_diff(os:timestamp(), SWW)]), {ok, EndPos} = file:position(Handle, cur), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), - IndexList. 
+ io:format("HashTree written in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWW)]), + ok. + write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) -> {IndexList, HashTreeBin}; @@ -1217,7 +1241,8 @@ write_top_index_table(Handle, BasePos, List) -> CompleteList), {ok, _} = file:position(Handle, 0), ok = file:write(Handle, IndexBin), - ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need). + ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need), + ok. %% To make this compatible with original Bernstein format this endian flip %% and also the use of the standard hash function required. @@ -1622,9 +1647,11 @@ find_lastkey_test() -> ok = cdb_put(P1, "Key2", "Value2"), ?assertMatch("Key2", cdb_lastkey(P1)), ?assertMatch("Key1", cdb_firstkey(P1)), + probably = cdb_keycheck(P1, "Key2"), ok = cdb_close(P1), {ok, P2} = cdb_open_writer("../test/lastkey.pnd"), ?assertMatch("Key2", cdb_lastkey(P2)), + probably = cdb_keycheck(P2, "Key2"), {ok, F2} = cdb_complete(P2), {ok, P3} = cdb_open_reader(F2), ?assertMatch("Key2", cdb_lastkey(P3)), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 33e5e9b..8b58c4b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -13,6 +13,7 @@ terminate/2, clerk_new/1, clerk_compact/6, + clerk_hashtablecalc/3, clerk_stop/1, code_change/3]). @@ -56,6 +57,10 @@ clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) -> Inker, Timeout}). +clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> + {ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []), + gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}). + clerk_stop(Pid) -> gen_server:cast(Pid, stop). 
@@ -129,6 +134,10 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, ok = leveled_inker:ink_compactioncomplete(Inker), {noreply, State} end; +handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> + {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), + ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin), + {stop, normal, State}; handle_cast(stop, State) -> {stop, normal, State}.