diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 9b9af5a..e49138a 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -67,8 +67,10 @@ cdb_close/1, cdb_complete/1, cdb_roll/1, + cdb_returnhashtable/3, cdb_destroy/1, - cdb_deletepending/1]). + cdb_deletepending/1, + hashtable_calc/2]). -include_lib("eunit/include/eunit.hrl"). @@ -79,6 +81,7 @@ -define(BINARY_MODE, false). -define(BASE_POSITION, 2048). -define(WRITE_OPS, [binary, raw, read, write]). +-define(PENDING_ROLL_WAIT, 30). -record(state, {hashtree, last_position :: integer(), @@ -88,6 +91,7 @@ handle :: file:fd(), writer :: boolean(), max_size :: integer(), + pending_roll = false :: boolean(), pending_delete = false :: boolean(), binary_mode = false :: boolean()}). @@ -135,7 +139,21 @@ cdb_directfetch(Pid, PositionList, Info) -> gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity). cdb_close(Pid) -> - gen_server:call(Pid, cdb_close, infinity). + cdb_close(Pid, ?PENDING_ROLL_WAIT). + +cdb_close(Pid, WaitsLeft) -> + if + WaitsLeft > 0 -> + case gen_server:call(Pid, cdb_close, infinity) of + pending_roll -> + timer:sleep(1), + cdb_close(Pid, WaitsLeft - 1); + R -> + R + end; + true -> + gen_server:call(Pid, cdb_kill, infinity) + end. cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). @@ -143,6 +161,9 @@ cdb_complete(Pid) -> cdb_roll(Pid) -> gen_server:cast(Pid, cdb_roll). +cdb_returnhashtable(Pid, IndexList, HashTreeBin) -> + gen_server:call(Pid, {return_hashtable, IndexList, HashTreeBin}, infinity). + cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -210,46 +231,36 @@ handle_call({open_reader, Filename}, _From, State) -> writer=false, hash_index=Index}}; handle_call({get_kv, Key}, _From, State) -> - case {State#state.writer, State#state.hash_index} of - {true, _} -> + case State#state.writer of + true -> {reply, get_mem(Key, State#state.handle, State#state.hashtree), State}; - {false, []} -> + false -> {reply, - get(State#state.handle, Key), - State}; - {false, Cache} -> - {reply, - get_withcache(State#state.handle, Key, Cache), + get_withcache(State#state.handle, Key, State#state.hash_index), State} end; handle_call({key_check, Key}, _From, State) -> - case {State#state.writer, State#state.hash_index} of - {true, _} -> + case State#state.writer of + true -> {reply, get_mem(Key, State#state.handle, State#state.hashtree, loose_presence), State}; - {false, []} -> - {reply, - get(State#state.handle, - Key, - loose_presence), - State}; - {false, Cache} -> + false -> {reply, get(State#state.handle, Key, loose_presence, - Cache), + State#state.hash_index), State} end; handle_call({put_kv, Key, Value}, _From, State) -> - case State#state.writer of - true -> + case {State#state.writer, State#state.pending_roll} of + {true, false} -> Result = put(State#state.handle, Key, Value, {State#state.last_position, State#state.hashtree}, @@ -265,7 +276,7 @@ handle_call({put_kv, Key, Value}, _From, State) -> last_key=Key, hashtree=HashTree}} end; - false -> + _ -> {reply, {error, read_only}, State} @@ -334,9 +345,14 @@ handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> empty -> {reply, {eof, Acc}, State} end; +handle_call(cdb_close, _From, State=#state{pending_roll=RollPending}) + when RollPending == true -> + {reply, pending_roll, State}; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=undefined}}; +handle_call(cdb_kill, _From, State) -> + {stop, killed, ok, State}; handle_call(cdb_complete, _From, State=#state{writer=Writer}) when Writer == true -> NewName = determine_new_filename(State#state.filename), @@ -347,7 +363,25 @@ handle_call(cdb_complete, _From, State=#state{writer=Writer}) {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}. + {stop, normal, {ok, State#state.filename}, State}; +handle_call({return_hashtable, IndexList, HashTreeBin}, + _From, + State=#state{pending_roll=RollPending}) when RollPending == true -> + Handle = State#state.handle, + {ok, BasePos} = file:position(Handle, State#state.last_position), + NewName = determine_new_filename(State#state.filename), + ok = perform_write_hash_tables(Handle, HashTreeBin, BasePos), + ok = write_top_index_table(Handle, BasePos, IndexList), + file:close(Handle), + ok = rename_for_read(State#state.filename, NewName), + io:format("Opening file for reading with filename ~s~n", [NewName]), + {NewHandle, Index, LastKey} = open_for_readonly(NewName), + {reply, ok, State#state{handle=NewHandle, + last_key=LastKey, + filename=NewName, + writer=false, + pending_roll=false, + hash_index=Index}}. handle_cast(destroy, State) -> @@ -355,22 +389,12 @@ handle_cast(destroy, State) -> ok = file:delete(State#state.filename), {noreply, State}; handle_cast(delete_pending, State) -> - {noreply, State#state{pending_delete = true}}; + {noreply, State#state{pending_delete=true}}; handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true -> - NewName = determine_new_filename(State#state.filename), - ok = close_file(State#state.handle, - State#state.hashtree, - State#state.last_position), - ok = rename_for_read(State#state.filename, NewName), - io:format("Opening file for reading with filename ~s~n", [NewName]), - {Handle, Index, LastKey} = open_for_readonly(NewName), - {noreply, State#state{handle=Handle, - last_key=LastKey, - filename=NewName, - writer=false, - hash_index=Index}}; -handle_cast(_Msg, State) -> - {noreply, State}. + ok = leveled_iclerk:clerk_hashtablecalc(State#state.hashtree, + State#state.last_position, + self()), + {noreply, State#state{pending_roll=true}}. handle_info(_Info, State) -> {noreply, State}. @@ -670,6 +694,17 @@ fold_keys(Handle, FoldFun, Acc0) -> {FirstHashPosition, _} = read_next_2_integers(Handle), fold(Handle, FoldFun, Acc0, {256 * ?DWORD_SIZE, FirstHashPosition}, true). +hashtable_calc(HashTree, StartPos) -> + Seq = lists:seq(0, 255), + SWC = os:timestamp(), + {IndexList, HashTreeBin} = write_hash_tables(Seq, + HashTree, + StartPos, + [], + <<>>), + io:format("HashTree computed in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWC)]), + {IndexList, HashTreeBin}. %%%%%%%%%%%%%%%%%%%% %% Internal functions @@ -756,18 +791,11 @@ scan_index_returnpositions(Handle, Position, Count, PosList0) -> %% the hash tables close_file(Handle, HashTree, BasePos) -> {ok, BasePos} = file:position(Handle, BasePos), - SW1 = os:timestamp(), - L2 = write_hash_tables(Handle, HashTree), - SW2 = os:timestamp(), - io:format("Hash Table write took ~w microseconds~n", - [timer:now_diff(SW2, SW1)]), - write_top_index_table(Handle, BasePos, L2), - io:format("Top Index Table write took ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW2)]), + IndexList = write_hash_tables(Handle, HashTree), + ok = write_top_index_table(Handle, BasePos, IndexList), file:close(Handle). - %% Fetch a list of positions by passing a key to the HashTree get_hashtree(Key, HashTree) -> Hash = hash(Key), @@ -897,7 +925,6 @@ scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> check_last_key(LastKey) -> case LastKey of - undefined -> error; empty -> empty; _ -> ok end. @@ -1096,23 +1123,20 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) -> %% corresponding to a key and the second word is a file pointer to the %% corresponding {key,value} tuple. write_hash_tables(Handle, HashTree) -> - Seq = lists:seq(0, 255), {ok, StartPos} = file:position(Handle, cur), - SWC = os:timestamp(), - {IndexList, HashTreeBin} = write_hash_tables(Seq, - HashTree, - StartPos, - [], - <<>>), - io:format("HashTree computed in ~w microseconds~n", - [timer:now_diff(os:timestamp(), SWC)]), + {IndexList, HashTreeBin} = hashtable_calc(HashTree, StartPos), + ok = perform_write_hash_tables(Handle, HashTreeBin, StartPos), + IndexList. + +perform_write_hash_tables(Handle, HashTreeBin, StartPos) -> SWW = os:timestamp(), ok = file:write(Handle, HashTreeBin), - io:format("HashTree written in ~w microseconds~n", - [timer:now_diff(os:timestamp(), SWW)]), {ok, EndPos} = file:position(Handle, cur), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), - IndexList. + io:format("HashTree written in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SWW)]), + ok. + write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) -> {IndexList, HashTreeBin}; @@ -1217,7 +1241,8 @@ write_top_index_table(Handle, BasePos, List) -> CompleteList), {ok, _} = file:position(Handle, 0), ok = file:write(Handle, IndexBin), - ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need). + ok = file:advise(Handle, 0, ?DWORD_SIZE * 256, will_need), + ok. %% To make this compatible with original Bernstein format this endian flip %% and also the use of the standard hash function required. @@ -1622,9 +1647,11 @@ find_lastkey_test() -> ok = cdb_put(P1, "Key2", "Value2"), ?assertMatch("Key2", cdb_lastkey(P1)), ?assertMatch("Key1", cdb_firstkey(P1)), + probably = cdb_keycheck(P1, "Key2"), ok = cdb_close(P1), {ok, P2} = cdb_open_writer("../test/lastkey.pnd"), ?assertMatch("Key2", cdb_lastkey(P2)), + probably = cdb_keycheck(P2, "Key2"), {ok, F2} = cdb_complete(P2), {ok, P3} = cdb_open_reader(F2), ?assertMatch("Key2", cdb_lastkey(P3)), diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 33e5e9b..8b58c4b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -13,6 +13,7 @@ terminate/2, clerk_new/1, clerk_compact/6, + clerk_hashtablecalc/3, clerk_stop/1, code_change/3]). @@ -56,6 +57,10 @@ clerk_compact(Pid, Checker, InitiateFun, FilterFun, Inker, Timeout) -> Inker, Timeout}). +clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> + {ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []), + gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}). + clerk_stop(Pid) -> gen_server:cast(Pid, stop). @@ -129,6 +134,10 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, ok = leveled_inker:ink_compactioncomplete(Inker), {noreply, State} end; +handle_cast({hashtable_calc, HashTree, StartPos, CDBpid}, State) -> + {IndexList, HashTreeBin} = leveled_cdb:hashtable_calc(HashTree, StartPos), + ok = leveled_cdb:cdb_returnhashtable(CDBpid, IndexList, HashTreeBin), + {stop, normal, State}; handle_cast(stop, State) -> {stop, normal, State}.