Rename clerk and CDB Speed-Up

CDB did many "bitty" reads/writes when scanning or writing hash tables -
change these to bulk reads and writes to speed up.

CDB also added capabilities to fetch positions and get keys by position
to help support the iclerk role.
This commit is contained in:
martinsumner 2016-09-20 16:13:36 +01:00
parent c10eaa75cb
commit aa7d235c4d
4 changed files with 225 additions and 44 deletions

View file

@ -60,6 +60,8 @@
cdb_open_reader/1, cdb_open_reader/1,
cdb_get/2, cdb_get/2,
cdb_put/3, cdb_put/3,
cdb_getpositions/1,
cdb_getkey/2,
cdb_lastkey/1, cdb_lastkey/1,
cdb_filename/1, cdb_filename/1,
cdb_keycheck/2, cdb_keycheck/2,
@ -96,7 +98,7 @@ cdb_open_writer(Filename) ->
cdb_open_writer(Filename, Opts) -> cdb_open_writer(Filename, Opts) ->
{ok, Pid} = gen_server:start(?MODULE, [Opts], []), {ok, Pid} = gen_server:start(?MODULE, [Opts], []),
case gen_server:call(Pid, {cdb_open_writer, Filename}, infinity) of case gen_server:call(Pid, {open_writer, Filename}, infinity) of
ok -> ok ->
{ok, Pid}; {ok, Pid};
Error -> Error ->
@ -105,7 +107,7 @@ cdb_open_writer(Filename, Opts) ->
cdb_open_reader(Filename) -> cdb_open_reader(Filename) ->
{ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []), {ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []),
case gen_server:call(Pid, {cdb_open_reader, Filename}, infinity) of case gen_server:call(Pid, {open_reader, Filename}, infinity) of
ok -> ok ->
{ok, Pid}; {ok, Pid};
Error -> Error ->
@ -113,10 +115,16 @@ cdb_open_reader(Filename) ->
end. end.
cdb_get(Pid, Key) -> cdb_get(Pid, Key) ->
gen_server:call(Pid, {cdb_get, Key}, infinity). gen_server:call(Pid, {get_kv, Key}, infinity).
cdb_put(Pid, Key, Value) -> cdb_put(Pid, Key, Value) ->
gen_server:call(Pid, {cdb_put, Key, Value}, infinity). gen_server:call(Pid, {put_kv, Key, Value}, infinity).
%% Fetch the file positions of all records referenced by the CDB file's
%% hash index (served by the get_positions handle_call clause).
cdb_getpositions(Pid) ->
    gen_server:call(Pid, get_positions, infinity).
%% Fetch only the key stored at the given file Position (no value read).
cdb_getkey(Pid, Position) ->
    gen_server:call(Pid, {get_key, Position}, infinity).
cdb_close(Pid) -> cdb_close(Pid) ->
gen_server:call(Pid, cdb_close, infinity). gen_server:call(Pid, cdb_close, infinity).
@ -148,7 +156,7 @@ cdb_filename(Pid) ->
%% Check to see if the key is probably present, will return either %% Check to see if the key is probably present, will return either
%% probably or missing. Does not do a definitive check %% probably or missing. Does not do a definitive check
cdb_keycheck(Pid, Key) -> cdb_keycheck(Pid, Key) ->
gen_server:call(Pid, {cdb_keycheck, Key}, infinity). gen_server:call(Pid, {key_check, Key}, infinity).
%%%============================================================================ %%%============================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -163,7 +171,7 @@ init([Opts]) ->
end, end,
{ok, #state{max_size=MaxSize}}. {ok, #state{max_size=MaxSize}}.
handle_call({cdb_open_writer, Filename}, _From, State) -> handle_call({open_writer, Filename}, _From, State) ->
io:format("Opening file for writing with filename ~s~n", [Filename]), io:format("Opening file for writing with filename ~s~n", [Filename]),
{LastPosition, HashTree, LastKey} = open_active_file(Filename), {LastPosition, HashTree, LastKey} = open_active_file(Filename),
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
@ -173,7 +181,7 @@ handle_call({cdb_open_writer, Filename}, _From, State) ->
filename=Filename, filename=Filename,
hashtree=HashTree, hashtree=HashTree,
writer=true}}; writer=true}};
handle_call({cdb_open_reader, Filename}, _From, State) -> handle_call({open_reader, Filename}, _From, State) ->
io:format("Opening file for reading with filename ~s~n", [Filename]), io:format("Opening file for reading with filename ~s~n", [Filename]),
{ok, Handle} = file:open(Filename, [binary, raw, read]), {ok, Handle} = file:open(Filename, [binary, raw, read]),
Index = load_index(Handle), Index = load_index(Handle),
@ -183,7 +191,7 @@ handle_call({cdb_open_reader, Filename}, _From, State) ->
filename=Filename, filename=Filename,
writer=false, writer=false,
hash_index=Index}}; hash_index=Index}};
handle_call({cdb_get, Key}, _From, State) -> handle_call({get_kv, Key}, _From, State) ->
case {State#state.writer, State#state.hash_index} of case {State#state.writer, State#state.hash_index} of
{true, _} -> {true, _} ->
{reply, {reply,
@ -198,7 +206,7 @@ handle_call({cdb_get, Key}, _From, State) ->
get_withcache(State#state.handle, Key, Cache), get_withcache(State#state.handle, Key, Cache),
State} State}
end; end;
handle_call({cdb_keycheck, Key}, _From, State) -> handle_call({key_check, Key}, _From, State) ->
case {State#state.writer, State#state.hash_index} of case {State#state.writer, State#state.hash_index} of
{true, _} -> {true, _} ->
{reply, {reply,
@ -221,7 +229,7 @@ handle_call({cdb_keycheck, Key}, _From, State) ->
Cache), Cache),
State} State}
end; end;
handle_call({cdb_put, Key, Value}, _From, State) -> handle_call({put_kv, Key, Value}, _From, State) ->
case State#state.writer of case State#state.writer of
true -> true ->
Result = put(State#state.handle, Result = put(State#state.handle,
@ -247,6 +255,13 @@ handle_call(cdb_lastkey, _From, State) ->
{reply, State#state.last_key, State}; {reply, State#state.last_key, State};
handle_call(cdb_filename, _From, State) -> handle_call(cdb_filename, _From, State) ->
{reply, State#state.filename, State}; {reply, State#state.filename, State};
%% Scan the cached hash index and reply with the list of record positions
%% (scan_index_returnpositions/4 filters out null slots).
handle_call(get_positions, _From, State) ->
    {reply, scan_index(State#state.handle,
                        State#state.hash_index,
                        {fun scan_index_returnpositions/4, []}),
            State};
%% Reply with just the key read from the record at Position.
handle_call({get_key, Position}, _From, State) ->
    {reply, extract_key(State#state.handle, Position), State};
handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
{ok, StartPos0} = case StartPos of {ok, StartPos0} = case StartPos of
undefined -> undefined ->
@ -353,7 +368,6 @@ dump(FileName, CRCCheck) ->
Fn1 = fun(_I,Acc) -> Fn1 = fun(_I,Acc) ->
{KL,VL} = read_next_2_integers(Handle), {KL,VL} = read_next_2_integers(Handle),
Key = read_next_term(Handle, KL), Key = read_next_term(Handle, KL),
io:format("Key read of ~w~n", [Key]),
case read_next_term(Handle, VL, crc, CRCCheck) of case read_next_term(Handle, VL, crc, CRCCheck) of
{false, _} -> {false, _} ->
{ok, CurrLoc} = file:position(Handle, cur), {ok, CurrLoc} = file:position(Handle, cur),
@ -597,28 +611,33 @@ load_index(Handle) ->
%% Function to find the LastKey in the file %% Function to find the LastKey in the file
find_lastkey(Handle, IndexCache) -> find_lastkey(Handle, IndexCache) ->
LastPosition = scan_index(Handle, IndexCache), LastPosition = scan_index(Handle, IndexCache, {fun scan_index_findlast/4, 0}),
{ok, _} = file:position(Handle, LastPosition), {ok, _} = file:position(Handle, LastPosition),
{KeyLength, _ValueLength} = read_next_2_integers(Handle), {KeyLength, _ValueLength} = read_next_2_integers(Handle),
read_next_term(Handle, KeyLength). read_next_term(Handle, KeyLength).
scan_index(Handle, IndexCache) -> scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
lists:foldl(fun({_X, {Pos, Count}}, LastPosition) -> lists:foldl(fun({_X, {Pos, Count}}, Acc) ->
scan_index(Handle, Pos, 0, Count, LastPosition) end, ScanFun(Handle, Pos, Count, Acc)
0, end,
InitAcc,
IndexCache). IndexCache).
scan_index(_Handle, _Position, Count, Checks, LastPosition) scan_index_findlast(Handle, Position, Count, LastPosition) ->
when Count == Checks -> {ok, _} = file:position(Handle, Position),
LastPosition; lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
scan_index(Handle, Position, Count, Checks, LastPosition) -> LastPosition,
{ok, _} = file:position(Handle, Position + ?DWORD_SIZE * Count), read_next_n_integerpairs(Handle, Count)).
{_Hash, HPosition} = read_next_2_integers(Handle),
scan_index(Handle, scan_index_returnpositions(Handle, Position, Count, PosList0) ->
Position, {ok, _} = file:position(Handle, Position),
Count + 1 , lists:foldl(fun({Hash, HPosition}, PosList) ->
Checks, case Hash of
max(LastPosition, HPosition)). 0 -> PosList;
_ -> PosList ++ [HPosition]
end end,
PosList0,
read_next_n_integerpairs(Handle, Count)).
%% Take an active file and write the hash details necessary to close that %% Take an active file and write the hash details necessary to close that
@ -628,13 +647,14 @@ scan_index(Handle, Position, Count, Checks, LastPosition) ->
%% the hash tables %% the hash tables
close_file(Handle, HashTree, BasePos) -> close_file(Handle, HashTree, BasePos) ->
{ok, BasePos} = file:position(Handle, BasePos), {ok, BasePos} = file:position(Handle, BasePos),
SW = os:timestamp(), SW1 = os:timestamp(),
L2 = write_hash_tables(Handle, HashTree), L2 = write_hash_tables(Handle, HashTree),
SW2 = os:timestamp(),
io:format("Hash Table write took ~w microseconds~n", io:format("Hash Table write took ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW)]), [timer:now_diff(SW2, SW1)]),
write_top_index_table(Handle, BasePos, L2), write_top_index_table(Handle, BasePos, L2),
io:format("Top Index Table write took ~w microseconds~n", io:format("Top Index Table write took ~w microseconds~n",
[timer:now_diff(os:timestamp(),SW)]), [timer:now_diff(os:timestamp(),SW2)]),
file:close(Handle). file:close(Handle).
@ -683,6 +703,11 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) ->
extract_kvpair(Handle, Rest, Key, Check) extract_kvpair(Handle, Rest, Key, Check)
end. end.
%% Read back only the key stored at Position: seek to the record, read its
%% key/value length header, then read just the key term.
extract_key(Handle, Position) ->
    {ok, _} = file:position(Handle, Position),
    {KeyLen, _ValLen} = read_next_2_integers(Handle),
    read_next_term(Handle, KeyLen).
%% Scan through the file until there is a failure to crc check an input, and %% Scan through the file until there is a failure to crc check an input, and
%% at that point return the position and the key dictionary scanned so far %% at that point return the position and the key dictionary scanned so far
startup_scan_over_file(Handle, Position) -> startup_scan_over_file(Handle, Position) ->
@ -876,6 +901,17 @@ read_next_2_integers(Handle) ->
ReadError ReadError
end. end.
%% Bulk-read NumberOfPairs hash-table entries (pairs of 32-bit integers)
%% with a single file:read, rather than one small read per pair.
read_next_n_integerpairs(Handle, NumberOfPairs) ->
    {ok, Block} = file:read(Handle, ?DWORD_SIZE * NumberOfPairs),
    read_integerpairs(Block, []).
%% Decode a flat binary of big-endian 32-bit integer pairs into a list of
%% {Int1, Int2} tuples (each value passed through endian_flip/1), appended
%% after Pairs.
%%
%% The original appended each pair with Pairs ++ [..] inside the recursion
%% (O(n^2)) and needlessly rewrapped the tail as <<Rest/binary>>; instead
%% cons-accumulate and reverse once (O(n)).  A binary whose size is not a
%% multiple of 8 bytes still crashes with function_clause, as before.
read_integerpairs(Bin, Pairs) ->
    Pairs ++ decode_integerpairs(Bin, []).

%% Private accumulator loop; reverse at the end to preserve file order.
decode_integerpairs(<<>>, Acc) ->
    lists:reverse(Acc);
decode_integerpairs(<<Int1:32, Int2:32, Rest/binary>>, Acc) ->
    decode_integerpairs(Rest, [{endian_flip(Int1), endian_flip(Int2)} | Acc]).
%% Search the hash table for the matching hash and key. Be prepared for %% Search the hash table for the matching hash and key. Be prepared for
%% multiple keys to have the same hash value. %% multiple keys to have the same hash value.
%% %%
@ -941,17 +977,19 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) ->
write_hash_tables(Handle, HashTree) -> write_hash_tables(Handle, HashTree) ->
Seq = lists:seq(0, 255), Seq = lists:seq(0, 255),
{ok, StartPos} = file:position(Handle, cur), {ok, StartPos} = file:position(Handle, cur),
write_hash_tables(Seq, Handle, HashTree, StartPos, []). {IndexList, HashTreeBin} = write_hash_tables(Seq, HashTree, StartPos, [], <<>>),
ok = file:write(Handle, HashTreeBin),
write_hash_tables([], Handle, _, StartPos, IndexList) ->
{ok, EndPos} = file:position(Handle, cur), {ok, EndPos} = file:position(Handle, cur),
ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need),
IndexList; IndexList.
write_hash_tables([Index|Rest], Handle, HashTree, StartPos, IndexList) ->
write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) ->
{IndexList, HashTreeBin};
write_hash_tables([Index|Rest], HashTree, CurrPos, IndexList, HashTreeBin) ->
Tree = array:get(Index, HashTree), Tree = array:get(Index, HashTree),
case gb_trees:keys(Tree) of case gb_trees:keys(Tree) of
[] -> [] ->
write_hash_tables(Rest, Handle, HashTree, StartPos, IndexList); write_hash_tables(Rest, HashTree, CurrPos, IndexList, HashTreeBin);
_ -> _ ->
HashList = gb_trees:to_list(Tree), HashList = gb_trees:to_list(Tree),
BinList = build_binaryhashlist(HashList, []), BinList = build_binaryhashlist(HashList, []),
@ -965,10 +1003,14 @@ write_hash_tables([Index|Rest], Handle, HashTree, StartPos, IndexList) ->
end, end,
NewSlotList = lists:foldl(Fn, SlotList, BinList), NewSlotList = lists:foldl(Fn, SlotList, BinList),
{ok, CurrPos} = file:position(Handle, cur), NewSlotBin = lists:foldl(fun(X, Acc) -> <<Acc/binary, X/binary>> end,
file:write(Handle, NewSlotList), HashTreeBin,
write_hash_tables(Rest, Handle, HashTree, StartPos, NewSlotList),
[{Index, CurrPos, IndexLength}|IndexList]) write_hash_tables(Rest,
HashTree,
CurrPos + length(NewSlotList) * ?DWORD_SIZE,
[{Index, CurrPos, IndexLength}|IndexList],
NewSlotBin)
end. end.
%% The list created from the original HashTree may have duplicate positions %% The list created from the original HashTree may have duplicate positions
@ -1452,4 +1494,53 @@ find_lastkey_test() ->
ok = cdb_close(P3), ok = cdb_close(P3),
ok = file:delete("../test/lastkey.cdb"). ok = file:delete("../test/lastkey.cdb").
%% Write three keys, complete the file, then check that cdb_getpositions/1
%% returns one position per key and that cdb_getkey/2 resolves every
%% position back to one of the original keys.
get_keys_byposition_simple_test() ->
    {ok, P1} = cdb_open_writer("../test/poskey.pnd"),
    ok = cdb_put(P1, "Key1", "Value1"),
    ok = cdb_put(P1, "Key3", "Value3"),
    ok = cdb_put(P1, "Key2", "Value2"),
    KeyList = ["Key1", "Key2", "Key3"],
    {ok, F2} = cdb_complete(P1),
    {ok, P2} = cdb_open_reader(F2),
    PositionList = cdb_getpositions(P2),
    io:format("Position list of ~w~n", [PositionList]),
    %% ?assertMatch takes the expected pattern FIRST; the original reversed
    %% the arguments, which still passed (bound-variable match) but produced
    %% misleading failure reports.
    ?assertMatch(3, length(PositionList)),
    lists:foreach(fun(Pos) ->
                        Key = cdb_getkey(P2, Pos),
                        ?assertMatch(true, lists:member(Key, KeyList))
                  end,
                  PositionList),
    ok = cdb_close(P2),
    ok = file:delete(F2).
%% Build [{"KeyN", "ValueN"}, ..., {"Key1", "Value1"}] appended after
%% KVList (descending N, matching the original recursion's output order).
%%
%% The original accumulated with KVList ++ [KV] on every step, which is
%% O(n^2); a single comprehension over a descending sequence produces the
%% identical list in O(n).  It also terminates for Count =< 0 instead of
%% recursing forever on negative input.
generate_sequentialkeys(0, KVList) ->
    KVList;
generate_sequentialkeys(Count, KVList) ->
    KVList ++ [{"Key" ++ integer_to_list(N), "Value" ++ integer_to_list(N)}
                    || N <- lists:seq(Count, 1, -1)].
%% Write 1024 sequential keys and confirm cdb_getpositions/1 returns one
%% position per key, logging the duration of each phase.
get_keys_byposition_manykeys_test() ->
    KeyCount = 1024,
    {ok, P1} = cdb_open_writer("../test/poskeymany.pnd"),
    KVList = generate_sequentialkeys(KeyCount, []),
    lists:foreach(fun({K, V}) -> cdb_put(P1, K, V) end, KVList),
    SW1 = os:timestamp(),
    {ok, F2} = cdb_complete(P1),
    SW2 = os:timestamp(),
    io:format("CDB completed in ~w microseconds~n",
                [timer:now_diff(SW2, SW1)]),
    {ok, P2} = cdb_open_reader(F2),
    SW3 = os:timestamp(),
    io:format("CDB opened for read in ~w microseconds~n",
                [timer:now_diff(SW3, SW2)]),
    PositionList = cdb_getpositions(P2),
    io:format("Positions fetched in ~w microseconds~n",
                [timer:now_diff(os:timestamp(), SW3)]),
    %% Expected pattern first; the original reversed the ?assertMatch
    %% arguments (bound-variable match still passed, but failure output
    %% would be misleading).
    ?assertMatch(KeyCount, length(PositionList)),
    ok = cdb_close(P2),
    ok = file:delete(F2).
-endif. -endif.

90
src/leveled_iclerk.erl Normal file
View file

@ -0,0 +1,90 @@
-module(leveled_iclerk).
-behaviour(gen_server).
-include("../include/leveled.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
clerk_new/1,
clerk_compact/3,
clerk_remove/2,
clerk_stop/1,
code_change/3]).
-include_lib("eunit/include/eunit.hrl").
-define(KEYS_TO_CHECK, 100).
-record(state, {owner :: pid()}).
%%%============================================================================
%%% API
%%%============================================================================
%% Start a new inker clerk process and synchronously register Owner as the
%% process it reports to; returns {ok, ClerkPid}.
clerk_new(Owner) ->
    {ok, Clerk} = gen_server:start(?MODULE, [], []),
    ok = gen_server:call(Clerk, {register, Owner}, infinity),
    {ok, Clerk}.
%% Request an asynchronous compaction pass over the inker manifest.
%% gen_server:cast/2 always returns ok, so its result is returned directly
%% rather than via a trailing literal.
clerk_compact(Pid, InkerManifest, Penciller) ->
    gen_server:cast(Pid, {compact, InkerManifest, Penciller}).
%% Fire-and-forget notification of journal removals to the clerk.
%% gen_server:cast/2 always returns ok, so its result is returned directly.
clerk_remove(Pid, Removals) ->
    gen_server:cast(Pid, {remove, Removals}).
%% Ask the clerk to stop, asynchronously.
clerk_stop(Pid) ->
    gen_server:cast(Pid, stop).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
%% gen_server callback: start with an empty state; the owner pid is set
%% afterwards via the {register, Owner} call from clerk_new/1.
init([]) ->
    {ok, #state{}}.
%% Synchronous requests: currently only owner registration, storing the
%% owning process pid in the clerk state.
handle_call({register, Owner}, _From, State) ->
    UpdState = State#state{owner = Owner},
    {reply, ok, UpdState}.
%% Asynchronous clerk requests.
%%
%% BUG FIX: clerk_compact/3 casts the 3-tuple {compact, InkerManifest,
%% Penciller}, but this clause previously matched a 4-tuple including a
%% Timeout, so every compact request crashed the clerk with
%% function_clause.  Match the 3-tuple and pass infinity as the timeout
%% (journal_compact/4 is currently a no-op stub, so any timeout value
%% behaves identically).
handle_cast({compact, InkerManifest, Penciller}, State) ->
    ok = journal_compact(InkerManifest, Penciller, infinity, State#state.owner),
    {noreply, State};
%% Removals are accepted but not yet acted upon.
handle_cast({remove, _Removals}, State) ->
    {noreply, State};
handle_cast(stop, State) ->
    {stop, normal, State}.
%% Drain and ignore any unexpected raw messages.
handle_info(_Info, State) ->
    {noreply, State}.
%% No resources to release on shutdown.
terminate(_Reason, _State) ->
    ok.
%% Hot-upgrade hook; the state format is unchanged between versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%%============================================================================
%%% Internal functions
%%%============================================================================
%% Stub: journal compaction is not yet implemented; always reports success.
journal_compact(_InkerManifest, _Penciller, _Timeout, _Owner) ->
    ok.
%% Stub: intended to assess every file in the inker manifest for
%% compaction; currently a placeholder returning ok.
check_all_files(_InkerManifest) ->
    ok.
%% Stub: intended to report whether the compaction window has elapsed;
%% currently always true.
window_closed(_Timeout) ->
    true.
%%%============================================================================
%%% Test
%%%============================================================================

View file

@ -2,7 +2,7 @@
%% level and cleaning out of old files across a level %% level and cleaning out of old files across a level
-module(leveled_clerk). -module(leveled_pclerk).
-behaviour(gen_server). -behaviour(gen_server).

View file

@ -251,7 +251,7 @@ init([PCLopts]) ->
M M
end, end,
TID = ets:new(?MEMTABLE, [ordered_set]), TID = ets:new(?MEMTABLE, [ordered_set]),
{ok, Clerk} = leveled_clerk:clerk_new(self()), {ok, Clerk} = leveled_pclerk:clerk_new(self()),
InitState = #state{memtable=TID, InitState = #state{memtable=TID,
clerk=Clerk, clerk=Clerk,
root_path=RootPath, root_path=RootPath,
@ -435,7 +435,7 @@ terminate(_Reason, State) ->
%% The cast may not succeed as the clerk could be synchronously calling %% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit %% the penciller looking for a manifest commit
%% %%
leveled_clerk:clerk_stop(State#state.clerk), leveled_pclerk:clerk_stop(State#state.clerk),
Dump = ets:tab2list(State#state.memtable), Dump = ets:tab2list(State#state.memtable),
case {State#state.levelzero_pending, case {State#state.levelzero_pending,
get_item(0, State#state.manifest, []), length(Dump)} of get_item(0, State#state.manifest, []), length(Dump)} of
@ -499,7 +499,7 @@ push_to_memory(DumpList, State) ->
end, end,
%% Prompt clerk to ask about work - do this for every push_mem %% Prompt clerk to ask about work - do this for every push_mem
ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller), ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller),
MemoryInsertion = do_push_to_mem(DumpList, MemoryInsertion = do_push_to_mem(DumpList,
TableSize, TableSize,