Off-process calculation of the Hash Table

When the journal CDB file is asked to roll, it now starts a new clerk to
perform the hashtable calculation (which may take many seconds).  This
stops the store from being blocked if there is an attempt to GET from
the journal that has just been rolled.

The journal file process now has a number of distinct states (reading,
writing, pending_roll, closing).  A future refactor may look to make
leveled_cdb a gen_fsm rather than a gen_server.
This commit is contained in:
martinsumner 2016-10-14 13:36:12 +01:00
parent bbdac65f8d
commit 9be0f96406
2 changed files with 98 additions and 62 deletions

View file

@ -67,8 +67,10 @@
cdb_close/1,
cdb_complete/1,
cdb_roll/1,
cdb_returnhashtable/3,
cdb_destroy/1,
cdb_deletepending/1]).
cdb_deletepending/1,
hashtable_calc/2]).
-include_lib("eunit/include/eunit.hrl").
@ -79,6 +81,7 @@
-define(BINARY_MODE, false).
-define(BASE_POSITION, 2048).
-define(WRITE_OPS, [binary, raw, read, write]).
-define(PENDING_ROLL_WAIT, 30).
-record(state, {hashtree,
last_position :: integer(),
@ -88,6 +91,7 @@
handle :: file:fd(),
writer :: boolean(),
max_size :: integer(),
pending_roll = false :: boolean(),
pending_delete = false :: boolean(),
binary_mode = false :: boolean()}).
@ -135,7 +139,21 @@ cdb_directfetch(Pid, PositionList, Info) ->
gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity).
cdb_close(Pid) ->
gen_server:call(Pid, cdb_close, infinity).
cdb_close(Pid, ?PENDING_ROLL_WAIT).
%% Ask the CDB process to close, retrying while a roll is still pending.
%% Each time the process answers pending_roll we sleep for 1ms and try
%% again with one fewer attempt remaining.  Any other reply is returned to
%% the caller unchanged.  Once no attempts remain, cdb_kill is sent instead
%% of cdb_close and its reply is returned.
cdb_close(Pid, WaitsLeft) when WaitsLeft > 0 ->
    case gen_server:call(Pid, cdb_close, infinity) of
        pending_roll ->
            timer:sleep(1),
            cdb_close(Pid, WaitsLeft - 1);
        Reply ->
            Reply
    end;
cdb_close(Pid, _WaitsLeft) ->
    %% Attempts exhausted - stop waiting for the roll to complete
    gen_server:call(Pid, cdb_kill, infinity).
%% Synchronous request (no timeout) sending cdb_complete to the CDB process.
%% The handle_call clauses for cdb_complete stop the process and reply with
%% {ok, Filename}.
cdb_complete(Pid) ->
gen_server:call(Pid, cdb_complete, infinity).
@ -143,6 +161,9 @@ cdb_complete(Pid) ->
%% Asynchronous request: casts cdb_roll to the CDB process and returns
%% immediately, without waiting for the roll to finish.
cdb_roll(Pid) ->
gen_server:cast(Pid, cdb_roll).
%% Hand a computed hash table (IndexList and HashTreeBin) back to the CDB
%% process.  This is a synchronous call with no timeout; the matching
%% handle_call clause only accepts it while pending_roll is true, and replies
%% ok once the file has been written, renamed and reopened for reading.
cdb_returnhashtable(Pid, IndexList, HashTreeBin) ->
gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity).
%% Asynchronous request: casts destroy to the CDB process and returns
%% immediately.
cdb_destroy(Pid) ->
gen_server:cast(Pid, destroy).
@ -210,46 +231,36 @@ handle_call({open_reader, Filename}, _From, State) ->
writer=false,
hash_index=Index}};
handle_call({get_kv, Key}, _From, State) ->
case {State#state.writer, State#state.hash_index} of
{true, _} ->
case State#state.writer of
true ->
{reply,
get_mem(Key, State#state.handle, State#state.hashtree),
State};
{false, []} ->
false ->
{reply,
get(State#state.handle, Key),
State};
{false, Cache} ->
{reply,
get_withcache(State#state.handle, Key, Cache),
get_withcache(State#state.handle, Key, State#state.hash_index),
State}
end;
handle_call({key_check, Key}, _From, State) ->
case {State#state.writer, State#state.hash_index} of
{true, _} ->
case State#state.writer of
true ->
{reply,
get_mem(Key,
State#state.handle,
State#state.hashtree,
loose_presence),
State};
{false, []} ->
{reply,
get(State#state.handle,
Key,
loose_presence),
State};
{false, Cache} ->
false ->
{reply,
get(State#state.handle,
Key,
loose_presence,
Cache),
State#state.hash_index),
State}
end;
handle_call({put_kv, Key, Value}, _From, State) ->
case State#state.writer of
true ->
case {State#state.writer, State#state.pending_roll} of
{true, false} ->
Result = put(State#state.handle,
Key, Value,
{State#state.last_position, State#state.hashtree},
@ -265,7 +276,7 @@ handle_call({put_kv, Key, Value}, _From, State) ->
last_key=Key,
hashtree=HashTree}}
end;
false ->
_ ->
{reply,
{error, read_only},
State}
@ -334,9 +345,14 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
empty ->
{reply, {eof, Acc}, State}
end;
handle_call(cdb_close, _From, State=#state{pending_roll=RollPending})
when RollPending == true ->
{reply, pending_roll, State};
handle_call(cdb_close, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State#state{handle=undefined}};
handle_call(cdb_kill, _From, State) ->
{stop, killed, ok, State};
handle_call(cdb_complete, _From, State=#state{writer=Writer})
when Writer == true ->
NewName = determine_new_filename(State#state.filename),
@ -347,7 +363,25 @@ handle_call(cdb_complete, _From, State=#state{writer=Writer})
{stop, normal, {ok, NewName}, State};
handle_call(cdb_complete, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, {ok, State#state.filename}, State}.
{stop, normal, {ok, State#state.filename}, State};
%% Completes a pending roll using a hash table that was computed elsewhere.
%% Only accepted while pending_roll is true.  The hash tables are written
%% from last_position, the top index table is written, the file is closed
%% and renamed, and then reopened read-only.  The state is switched to
%% reader mode (writer=false, pending_roll=false) and ok is returned.
handle_call({return_hashtable, IndexList, HashTreeBin},
                _From,
                State=#state{pending_roll=true}) ->
    Handle = State#state.handle,
    {ok, BasePos} = file:position(Handle, State#state.last_position),
    NewName = determine_new_filename(State#state.filename),
    ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos),
    ok = write_top_index_table(Handle, BasePos, IndexList),
    %% Assert that the close succeeded (as the other cdb_close/cdb_complete
    %% clauses do) so that a failed close is not silently ignored before the
    %% file is renamed and reopened.
    ok = file:close(Handle),
    ok = rename_for_read(State#state.filename, NewName),
    io:format("Opening file for reading with filename ~s~n", [NewName]),
    {NewHandle, Index, LastKey} = open_for_readonly(NewName),
    {reply, ok, State#state{handle=NewHandle,
                                last_key=LastKey,
                                filename=NewName,
                                writer=false,
                                pending_roll=false,
                                hash_index=Index}}.
handle_cast(destroy, State) ->
@ -357,20 +391,10 @@ handle_cast(destroy, State) ->
handle_cast(delete_pending, State) ->
{noreply, State#state{pending_delete=true}};
handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true ->
NewName = determine_new_filename(State#state.filename),
ok = close_file(State#state.handle,
State#state.hashtree,
State#state.last_position),
ok = rename_for_read(State#state.filename, NewName),
io:format("Opening file for reading with filename ~s~n", [NewName]),
{Handle, Index, LastKey} = open_for_readonly(NewName),
{noreply, State#state{handle=Handle,
last_key=LastKey,
filename=NewName,
writer=false,
hash_index=Index}};
handle_cast(_Msg, State) ->
{noreply, State}.
ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree,
State#state.last_position,
self()),
{noreply, State#state{pending_roll=true}}.
%% Any non-call/non-cast message is ignored and the state left unchanged.
handle_info(_Info, State) ->
{noreply, State}.
@ -670,6 +694,17 @@ fold_keys(Handle, FoldFun, Acc0) ->
{FirstHashPosition, _} = read_next_2_integers(Handle),
fold(Handle, FoldFun, Acc0, {256 * ?DWORD_SIZE, FirstHashPosition}, true).
%% Calculate the hash tables for HashTree, assuming they will be placed at
%% file position StartPos.  No file handle is taken - the result is returned
%% as {IndexList, HashTreeBin} for the caller to write.  The time taken for
%% the calculation is printed in microseconds.
hashtable_calc(HashTree, StartPos) ->
    Slots = lists:seq(0, 255),
    Begin = os:timestamp(),
    {_Index, _Bin} = Computed = write_hash_tables(Slots,
                                                    HashTree,
                                                    StartPos,
                                                    [],
                                                    <<>>),
    ElapsedMicros = timer:now_diff(os:timestamp(), Begin),
    io:format("HashTree computed in ~w microseconds~n", [ElapsedMicros]),
    Computed.
%%%%%%%%%%%%%%%%%%%%
%% Internal functions
@ -756,18 +791,11 @@ scan_index_returnpositions(Handle, Position, Count, PosList0) ->
%% the hash tables
close_file(Handle, HashTree, BasePos) ->
{ok, BasePos} = file:position(Handle, BasePos),
SW1 = os:timestamp(),
L2 = write_hash_tables(Handle, HashTree),
SW2 = os:timestamp(),
io:format("Hash Table write took ~w microseconds~n",
[timer:now_diff(SW2, SW1)]),
write_top_index_table(Handle, BasePos, L2),
io:format("Top Index Table write took ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW2)]),
IndexList = write_hash_tables(Handle, HashTree),
ok = write_top_index_table(Handle, BasePos, IndexList),
file:close(Handle).
%% Fetch a list of positions by passing a key to the HashTree
get_hashtree(Key, HashTree) ->
Hash = hash(Key),
@ -897,7 +925,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) ->
check_last_key(LastKey) ->
case LastKey of
undefined -> error;
empty -> empty;
_ -> ok
end.
@ -1096,23 +1123,20 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) ->
%% corresponding to a key and the second word is a file pointer to the
%% corresponding {key,value} tuple.
write_hash_tables(Handle, HashTree) ->
Seq = lists:seq(0, 255),
{ok, StartPos} = file:position(Handle, cur),
SWC = os:timestamp(),
{IndexList, HashTreeBin} = write_hash_tables(Seq,
HashTree,
StartPos,
[],
<<>>),
io:format("HashTree computed in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWC)]),
{IndexList, HashTreeBin} = hashtable_calc(HashTree, StartPos),
ok = perform_write_hash_tables(Handle, HashTreeBin, StartPos),
IndexList.
perform_write_hash_tables(Handle, HashTreeBin, StartPos) ->
SWW = os:timestamp(),
ok = file:write(Handle, HashTreeBin),
io:format("HashTree written in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWW)]),
{ok, EndPos} = file:position(Handle, cur),
ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need),
IndexList.
io:format("HashTree written in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SWW)]),
ok.
write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) ->
{IndexList, HashTreeBin};
@ -1217,7 +1241,8 @@ write_top_index_table(Handle, BasePos, List) ->
CompleteList),
{ok, _} = file:position(Handle, 0),
ok = file:write(Handle, IndexBin),
ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need).
ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need),
ok.
%% To make this compatible with original Bernstein format this endian flip
%% and also the use of the standard hash function required.
@ -1622,9 +1647,11 @@ find_lastkey_test() ->
ok = cdb_put(P1, "Key2", "Value2"),
?assertMatch("Key2", cdb_lastkey(P1)),
?assertMatch("Key1", cdb_firstkey(P1)),
probably = cdb_keycheck(P1, "Key2"),
ok = cdb_close(P1),
{ok, P2} = cdb_open_writer("../test/lastkey.pnd"),
?assertMatch("Key2", cdb_lastkey(P2)),
probably = cdb_keycheck(P2, "Key2"),
{ok, F2} = cdb_complete(P2),
{ok, P3} = cdb_open_reader(F2),
?assertMatch("Key2", cdb_lastkey(P3)),

View file

@ -13,6 +13,7 @@
terminate/2,
clerk_new/1,
clerk_compact/6,
clerk_hashtablecalc/3,
clerk_stop/1,
code_change/3]).
@ -56,6 +57,10 @@ clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) ->
Inker,
Timeout}).
%% Start a fresh (unlinked) clerk process and cast it a hashtable_calc
%% request carrying the HashTree, the StartPos and the pid of the CDB process
%% the result should be returned to.  Returns the result of the cast.
%% NOTE(review): the clerk is started without a link or monitor, so the
%% caller is not told if the clerk fails - confirm this is intended.
clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
    Request = {hashtable_calc, HashTree, StartPos, CDBpid},
    {ok, Worker} = gen_server:start(?MODULE, [#iclerk_options{}], []),
    gen_server:cast(Worker, Request).
%% Asynchronous request: casts stop to the clerk; the stop clause of
%% handle_cast then terminates the clerk with reason normal.
clerk_stop(Pid) ->
gen_server:cast(Pid, stop).
@ -129,6 +134,10 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
ok = leveled_inker:ink_compactioncomplete(Inker),
{noreply, State}
end;
%% One-shot hash table calculation on behalf of a CDB process.  The hash
%% tables are computed in this clerk process, handed back to CDBpid with a
%% synchronous call (which must reply ok), and the clerk then stops normally.
handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) ->
{IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos),
ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin),
{stop, normal, State};
handle_cast(stop, State) ->
{stop, normal, State}.