diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 3355153..b3672f9 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -60,6 +60,8 @@ cdb_open_reader/1, cdb_get/2, cdb_put/3, + cdb_getpositions/1, + cdb_getkey/2, cdb_lastkey/1, cdb_filename/1, cdb_keycheck/2, @@ -96,7 +98,7 @@ cdb_open_writer(Filename) -> cdb_open_writer(Filename, Opts) -> {ok, Pid} = gen_server:start(?MODULE, [Opts], []), - case gen_server:call(Pid, {cdb_open_writer, Filename}, infinity) of + case gen_server:call(Pid, {open_writer, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -105,7 +107,7 @@ cdb_open_writer(Filename, Opts) -> cdb_open_reader(Filename) -> {ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []), - case gen_server:call(Pid, {cdb_open_reader, Filename}, infinity) of + case gen_server:call(Pid, {open_reader, Filename}, infinity) of ok -> {ok, Pid}; Error -> @@ -113,10 +115,16 @@ cdb_open_reader(Filename) -> end. cdb_get(Pid, Key) -> - gen_server:call(Pid, {cdb_get, Key}, infinity). + gen_server:call(Pid, {get_kv, Key}, infinity). cdb_put(Pid, Key, Value) -> - gen_server:call(Pid, {cdb_put, Key, Value}, infinity). + gen_server:call(Pid, {put_kv, Key, Value}, infinity). + +cdb_getpositions(Pid) -> + gen_server:call(Pid, get_positions, infinity). + +cdb_getkey(Pid, Position) -> + gen_server:call(Pid, {get_key, Position}, infinity). cdb_close(Pid) -> gen_server:call(Pid, cdb_close, infinity). @@ -148,7 +156,7 @@ cdb_filename(Pid) -> %% Check to see if the key is probably present, will return either %% probably or missing. Does not do a definitive check cdb_keycheck(Pid, Key) -> - gen_server:call(Pid, {cdb_keycheck, Key}, infinity). + gen_server:call(Pid, {key_check, Key}, infinity). %%%============================================================================ %%% gen_server callbacks @@ -163,7 +171,7 @@ init([Opts]) -> end, {ok, #state{max_size=MaxSize}}. -handle_call({cdb_open_writer, Filename}, _From, State) -> +handle_call({open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), @@ -173,7 +181,7 @@ handle_call({cdb_open_writer, Filename}, _From, State) -> filename=Filename, hashtree=HashTree, writer=true}}; -handle_call({cdb_open_reader, Filename}, _From, State) -> +handle_call({open_reader, Filename}, _From, State) -> io:format("Opening file for reading with filename ~s~n", [Filename]), {ok, Handle} = file:open(Filename, [binary, raw, read]), Index = load_index(Handle), @@ -183,7 +191,7 @@ handle_call({cdb_open_reader, Filename}, _From, State) -> filename=Filename, writer=false, hash_index=Index}}; -handle_call({cdb_get, Key}, _From, State) -> +handle_call({get_kv, Key}, _From, State) -> case {State#state.writer, State#state.hash_index} of {true, _} -> {reply, @@ -198,7 +206,7 @@ handle_call({cdb_get, Key}, _From, State) -> get_withcache(State#state.handle, Key, Cache), State} end; -handle_call({cdb_keycheck, Key}, _From, State) -> +handle_call({key_check, Key}, _From, State) -> case {State#state.writer, State#state.hash_index} of {true, _} -> {reply, @@ -221,7 +229,7 @@ handle_call({cdb_keycheck, Key}, _From, State) -> Cache), State} end; -handle_call({cdb_put, Key, Value}, _From, State) -> +handle_call({put_kv, Key, Value}, _From, State) -> case State#state.writer of true -> Result = put(State#state.handle, @@ -247,6 +255,13 @@ handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; +handle_call(get_positions, _From, State) -> + {reply, scan_index(State#state.handle, + State#state.hash_index, + {fun scan_index_returnpositions/4, []}), + State}; +handle_call({get_key, Position}, _From, State) -> + {reply, extract_key(State#state.handle, Position), State}; handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> {ok, StartPos0} = case StartPos of undefined -> @@ -353,7 +368,6 @@ dump(FileName, CRCCheck) -> Fn1 = fun(_I,Acc) -> {KL,VL} = read_next_2_integers(Handle), Key = read_next_term(Handle, KL), - io:format("Key read of ~w~n", [Key]), case read_next_term(Handle, VL, crc, CRCCheck) of {false, _} -> {ok, CurrLoc} = file:position(Handle, cur), @@ -597,28 +611,33 @@ load_index(Handle) -> %% Function to find the LastKey in the file find_lastkey(Handle, IndexCache) -> - LastPosition = scan_index(Handle, IndexCache), + LastPosition = scan_index(Handle, IndexCache, {fun scan_index_findlast/4, 0}), {ok, _} = file:position(Handle, LastPosition), {KeyLength, _ValueLength} = read_next_2_integers(Handle), read_next_term(Handle, KeyLength). -scan_index(Handle, IndexCache) -> - lists:foldl(fun({_X, {Pos, Count}}, LastPosition) -> - scan_index(Handle, Pos, 0, Count, LastPosition) end, - 0, +scan_index(Handle, IndexCache, {ScanFun, InitAcc}) -> + lists:foldl(fun({_X, {Pos, Count}}, Acc) -> + ScanFun(Handle, Pos, Count, Acc) + end, + InitAcc, IndexCache). -scan_index(_Handle, _Position, Count, Checks, LastPosition) - when Count == Checks -> - LastPosition; -scan_index(Handle, Position, Count, Checks, LastPosition) -> - {ok, _} = file:position(Handle, Position + ?DWORD_SIZE * Count), - {_Hash, HPosition} = read_next_2_integers(Handle), - scan_index(Handle, - Position, - Count + 1 , - Checks, - max(LastPosition, HPosition)). +scan_index_findlast(Handle, Position, Count, LastPosition) -> + {ok, _} = file:position(Handle, Position), + lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end, + LastPosition, + read_next_n_integerpairs(Handle, Count)). + +scan_index_returnpositions(Handle, Position, Count, PosList0) -> + {ok, _} = file:position(Handle, Position), + lists:foldl(fun({Hash, HPosition}, PosList) -> + case Hash of + 0 -> PosList; + _ -> PosList ++ [HPosition] + end end, + PosList0, + read_next_n_integerpairs(Handle, Count)). %% Take an active file and write the hash details necessary to close that @@ -628,13 +647,14 @@ scan_index(Handle, Position, Count, Checks, LastPosition) -> %% the hash tables close_file(Handle, HashTree, BasePos) -> {ok, BasePos} = file:position(Handle, BasePos), - SW = os:timestamp(), + SW1 = os:timestamp(), L2 = write_hash_tables(Handle, HashTree), + SW2 = os:timestamp(), io:format("Hash Table write took ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW)]), + [timer:now_diff(SW2, SW1)]), write_top_index_table(Handle, BasePos, L2), io:format("Top Index Table write took ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW)]), + [timer:now_diff(os:timestamp(),SW2)]), file:close(Handle). @@ -683,6 +703,11 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) -> extract_kvpair(Handle, Rest, Key, Check) end. +extract_key(Handle, Position) -> + {ok, _} = file:position(Handle, Position), + {KeyLength, _ValueLength} = read_next_2_integers(Handle), + read_next_term(Handle, KeyLength). + %% Scan through the file until there is a failure to crc check an input, and %% at that point return the position and the key dictionary scanned so far startup_scan_over_file(Handle, Position) -> @@ -876,6 +901,17 @@ read_next_2_integers(Handle) -> ReadError end. +read_next_n_integerpairs(Handle, NumberOfPairs) -> + {ok, Block} = file:read(Handle, ?DWORD_SIZE * NumberOfPairs), + read_integerpairs(Block, []). + +read_integerpairs(<<>>, Pairs) -> + Pairs; +read_integerpairs(<>, Pairs) -> + read_integerpairs(<>, + Pairs ++ [{endian_flip(Int1), + endian_flip(Int2)}]). + %% Seach the hash table for the matching hash and key. Be prepared for %% multiple keys to have the same hash value. %% @@ -941,17 +977,19 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) -> write_hash_tables(Handle, HashTree) -> Seq = lists:seq(0, 255), {ok, StartPos} = file:position(Handle, cur), - write_hash_tables(Seq, Handle, HashTree, StartPos, []). - -write_hash_tables([], Handle, _, StartPos, IndexList) -> + {IndexList, HashTreeBin} = write_hash_tables(Seq, HashTree, StartPos, [], <<>>), + ok = file:write(Handle, HashTreeBin), {ok, EndPos} = file:position(Handle, cur), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), - IndexList; -write_hash_tables([Index|Rest], Handle, HashTree, StartPos, IndexList) -> + IndexList. + +write_hash_tables([], _HashTree, _CurrPos, IndexList, HashTreeBin) -> + {IndexList, HashTreeBin}; +write_hash_tables([Index|Rest], HashTree, CurrPos, IndexList, HashTreeBin) -> Tree = array:get(Index, HashTree), case gb_trees:keys(Tree) of [] -> - write_hash_tables(Rest, Handle, HashTree, StartPos, IndexList); + write_hash_tables(Rest, HashTree, CurrPos, IndexList, HashTreeBin); _ -> HashList = gb_trees:to_list(Tree), BinList = build_binaryhashlist(HashList, []), @@ -965,10 +1003,14 @@ write_hash_tables([Index|Rest], Handle, HashTree, StartPos, IndexList) -> end, NewSlotList = lists:foldl(Fn, SlotList, BinList), - {ok, CurrPos} = file:position(Handle, cur), - file:write(Handle, NewSlotList), - write_hash_tables(Rest, Handle, HashTree, StartPos, - [{Index, CurrPos, IndexLength}|IndexList]) + NewSlotBin = lists:foldl(fun(X, Acc) -> <> end, + HashTreeBin, + NewSlotList), + write_hash_tables(Rest, + HashTree, + CurrPos + length(NewSlotList) * ?DWORD_SIZE, + [{Index, CurrPos, IndexLength}|IndexList], + NewSlotBin) end. %% The list created from the original HashTree may have duplicate positions @@ -1452,4 +1494,53 @@ find_lastkey_test() -> ok = cdb_close(P3), ok = file:delete("../test/lastkey.cdb"). +get_keys_byposition_simple_test() -> + {ok, P1} = cdb_open_writer("../test/poskey.pnd"), + ok = cdb_put(P1, "Key1", "Value1"), + ok = cdb_put(P1, "Key3", "Value3"), + ok = cdb_put(P1, "Key2", "Value2"), + KeyList = ["Key1", "Key2", "Key3"], + {ok, F2} = cdb_complete(P1), + {ok, P2} = cdb_open_reader(F2), + PositionList = cdb_getpositions(P2), + io:format("Position list of ~w~n", [PositionList]), + L1 = length(PositionList), + ?assertMatch(L1, 3), + lists:foreach(fun(Pos) -> + Key = cdb_getkey(P2, Pos), + Check = lists:member(Key, KeyList), + ?assertMatch(Check, true) end, + PositionList), + ok = cdb_close(P2), + ok = file:delete(F2). + +generate_sequentialkeys(0, KVList) -> + KVList; +generate_sequentialkeys(Count, KVList) -> + KV = {"Key" ++ integer_to_list(Count), "Value" ++ integer_to_list(Count)}, + generate_sequentialkeys(Count - 1, KVList ++ [KV]). + +get_keys_byposition_manykeys_test() -> + KeyCount = 1024, + {ok, P1} = cdb_open_writer("../test/poskeymany.pnd"), + KVList = generate_sequentialkeys(KeyCount, []), + lists:foreach(fun({K, V}) -> cdb_put(P1, K, V) end, KVList), + SW1 = os:timestamp(), + {ok, F2} = cdb_complete(P1), + SW2 = os:timestamp(), + io:format("CDB completed in ~w microseconds~n", + [timer:now_diff(SW2, SW1)]), + {ok, P2} = cdb_open_reader(F2), + SW3 = os:timestamp(), + io:format("CDB opened for read in ~w microseconds~n", + [timer:now_diff(SW3, SW2)]), + PositionList = cdb_getpositions(P2), + io:format("Positions fetched in ~w microseconds~n", + [timer:now_diff(os:timestamp(), SW3)]), + L1 = length(PositionList), + ?assertMatch(L1, KeyCount), + ok = cdb_close(P2), + ok = file:delete(F2). + + -endif. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl new file mode 100644 index 0000000..3f95547 --- /dev/null +++ b/src/leveled_iclerk.erl @@ -0,0 +1,90 @@ + + +-module(leveled_iclerk). + +-behaviour(gen_server). + +-include("../include/leveled.hrl"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + clerk_new/1, + clerk_compact/3, + clerk_remove/2, + clerk_stop/1, + code_change/3]). + +-include_lib("eunit/include/eunit.hrl"). + +-define(KEYS_TO_CHECK, 100). + +-record(state, {owner :: pid()}). + +%%%============================================================================ +%%% API +%%%============================================================================ + +clerk_new(Owner) -> + {ok, Pid} = gen_server:start(?MODULE, [], []), + ok = gen_server:call(Pid, {register, Owner}, infinity), + {ok, Pid}. + +clerk_compact(Pid, InkerManifest, Penciller) -> + gen_server:cast(Pid, {compact, InkerManifest, Penciller}), + ok. + +clerk_remove(Pid, Removals) -> + gen_server:cast(Pid, {remove, Removals}), + ok. + +clerk_stop(Pid) -> + gen_server:cast(Pid, stop). + +%%%============================================================================ +%%% gen_server callbacks +%%%============================================================================ + +init([]) -> + {ok, #state{}}. + +handle_call({register, Owner}, _From, State) -> + {reply, ok, State#state{owner=Owner}}. + +handle_cast({compact, InkerManifest, Penciller, Timeout}, State) -> + ok = journal_compact(InkerManifest, Penciller, Timeout, State#state.owner), + {noreply, State}; +handle_cast({remove, _Removals}, State) -> + {noreply, State}; +handle_cast(stop, State) -> + {stop, normal, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +journal_compact(_InkerManifest, _Penciller, _Timeout, _Owner) -> + ok. + +check_all_files(_InkerManifest) -> + ok. + +window_closed(_Timeout) -> + true. + + +%%%============================================================================ +%%% Test +%%%============================================================================ diff --git a/src/leveled_clerk.erl b/src/leveled_pclerk.erl similarity index 99% rename from src/leveled_clerk.erl rename to src/leveled_pclerk.erl index 0197f7b..d3334da 100644 --- a/src/leveled_clerk.erl +++ b/src/leveled_pclerk.erl @@ -2,7 +2,7 @@ %% level and cleaning out of old files across a level --module(leveled_clerk). +-module(leveled_pclerk). -behaviour(gen_server). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a3feeef..8ec90c5 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -251,7 +251,7 @@ init([PCLopts]) -> M end, TID = ets:new(?MEMTABLE, [ordered_set]), - {ok, Clerk} = leveled_clerk:clerk_new(self()), + {ok, Clerk} = leveled_pclerk:clerk_new(self()), InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath, @@ -435,7 +435,7 @@ terminate(_Reason, State) -> %% The cast may not succeed as the clerk could be synchronously calling %% the penciller looking for a manifest commit %% - leveled_clerk:clerk_stop(State#state.clerk), + leveled_pclerk:clerk_stop(State#state.clerk), Dump = ets:tab2list(State#state.memtable), case {State#state.levelzero_pending, get_item(0, State#state.manifest, []), length(Dump)} of @@ -499,7 +499,7 @@ push_to_memory(DumpList, State) -> end, %% Prompt clerk to ask about work - do this for every push_mem - ok = leveled_clerk:clerk_prompt(UpdState#state.clerk, penciller), + ok = leveled_pclerk:clerk_prompt(UpdState#state.clerk, penciller), MemoryInsertion = do_push_to_mem(DumpList, TableSize,