diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index b3672f9..40a4aaf 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -6,18 +6,15 @@ %% The primary differences are: %% - Support for incrementally writing a CDB file while keeping the hash table %% in memory -%% - Support for merging of multiple CDB files with a key-checking function to -%% allow for compaction -%% - Automatic adding of a helper object that will keep a small proportion of -%% keys to be used when checking to see if the cdb file is a candidate for -%% compaction %% - The ability to scan a database and accumulate all the Key, Values to %% rebuild in-memory tables on startup +%% - The ability to scan a database in blocks of sequence numbers %% %% This is to be used in eleveledb, and in this context: -%% - Keys will be a Sequence Number -%% - Values will be a Checksum | Object | KeyAdditions -%% Where the KeyAdditions are all the Key changes required to be added to the +%% - Keys will be a combination of the PrimaryKey and the Sequence Number +%% - Values will be a serialised version of the whole object, and the +%% IndexChanges associated with the transaction +%% Where the IndexChanges are all the Key changes required to be added to the %% ledger to complete the changes (the addition of postings and tombstones). %% %% This module provides functions to create and query a CDB (constant database). @@ -60,8 +57,8 @@ cdb_open_reader/1, cdb_get/2, cdb_put/3, - cdb_getpositions/1, - cdb_getkey/2, + cdb_getpositions/2, + cdb_directfetch/3, cdb_lastkey/1, cdb_filename/1, cdb_keycheck/2, @@ -120,11 +117,15 @@ cdb_get(Pid, Key) -> cdb_put(Pid, Key, Value) -> gen_server:call(Pid, {put_kv, Key, Value}, infinity). -cdb_getpositions(Pid) -> - gen_server:call(Pid, get_positions, infinity). +%% SampleSize can be an integer or the atom all +cdb_getpositions(Pid, SampleSize) -> + gen_server:call(Pid, {get_positions, SampleSize}, infinity). 
-cdb_getkey(Pid, Position) -> - gen_server:call(Pid, {get_key, Position}, infinity). +%% Info can be key_only, key_size (size being the size of the value) or +%% key_value_check (with the check part indicating if the CRC is correct for +%% the value) +cdb_directfetch(Pid, PositionList, Info) -> + gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity). cdb_close(Pid) -> gen_server:call(Pid, cdb_close, infinity). @@ -255,13 +256,45 @@ handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; -handle_call(get_positions, _From, State) -> - {reply, scan_index(State#state.handle, - State#state.hash_index, - {fun scan_index_returnpositions/4, []}), - State}; -handle_call({get_key, Position}, _From, State) -> - {reply, extract_key(State#state.handle, Position), State}; +handle_call({get_positions, SampleSize}, _From, State) -> + case SampleSize of + all -> + {reply, scan_index(State#state.handle, + State#state.hash_index, + {fun scan_index_returnpositions/4, []}), + State}; + _ -> + SeededL = lists:map(fun(X) -> {random:uniform(), X} end, + State#state.hash_index), + SortedL = lists:keysort(1, SeededL), + RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL), + {reply, + scan_index_forsample(State#state.handle, + RandomisedHashIndex, + fun scan_index_returnpositions/4, + [], + SampleSize), + State} + end; +handle_call({direct_fetch, PositionList, Info}, _From, State) -> + H = State#state.handle, + case Info of + key_only -> + KeyList = lists:map(fun(P) -> + extract_key(H, P) end, + PositionList), + {reply, KeyList, State}; + key_size -> + KeySizeList = lists:map(fun(P) -> + extract_key_size(H, P) end, + PositionList), + {reply, KeySizeList, State}; + key_value_check -> + KVCList = lists:map(fun(P) -> + extract_key_value_check(H, P) end, + PositionList), + {reply, KVCList, State} + end; handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) 
-> {ok, StartPos0} = case StartPos of undefined -> @@ -383,7 +416,7 @@ dump(FileName, CRCCheck) -> {ok, _} = file:position(Handle, CurrLoc), [Return | Acc] end, - lists:foldr(Fn1,[],lists:seq(0,NumberOfPairs-1)). + lists:foldr(Fn1, [], lists:seq(0, NumberOfPairs-1)). %% Open an active file - one for which it is assumed the hash tables have not %% yet been written @@ -611,7 +644,9 @@ load_index(Handle) -> %% Function to find the LastKey in the file find_lastkey(Handle, IndexCache) -> - LastPosition = scan_index(Handle, IndexCache, {fun scan_index_findlast/4, 0}), + LastPosition = scan_index(Handle, + IndexCache, + {fun scan_index_findlast/4, 0}), {ok, _} = file:position(Handle, LastPosition), {KeyLength, _ValueLength} = read_next_2_integers(Handle), read_next_term(Handle, KeyLength). @@ -623,6 +658,22 @@ scan_index(Handle, IndexCache, {ScanFun, InitAcc}) -> InitAcc, IndexCache). +scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) -> + lists:sublist(Acc, SampleSize); +scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) -> + case length(Acc) of + L when L >= SampleSize -> + lists:sublist(Acc, SampleSize); + _ -> + {_X, {Pos, Count}} = CacheEntry, + scan_index_forsample(Handle, + Tail, + ScanFun, + ScanFun(Handle, Pos, Count, Acc), + SampleSize) + end. + + scan_index_findlast(Handle, Position, Count, LastPosition) -> {ok, _} = file:position(Handle, Position), lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end, @@ -708,6 +759,18 @@ extract_key(Handle, Position) -> {KeyLength, _ValueLength} = read_next_2_integers(Handle), read_next_term(Handle, KeyLength). +extract_key_size(Handle, Position) -> + {ok, _} = file:position(Handle, Position), + {KeyLength, ValueLength} = read_next_2_integers(Handle), + {read_next_term(Handle, KeyLength), ValueLength}. 
+ +extract_key_value_check(Handle, Position) -> + {ok, _} = file:position(Handle, Position), + {KeyLength, ValueLength} = read_next_2_integers(Handle), + K = read_next_term(Handle, KeyLength), + {Check, V} = read_next_term(Handle, ValueLength, crc, true), + {K, V, Check}. + %% Scan through the file until there is a failure to crc check an input, and %% at that point return the position and the key dictionary scanned so far startup_scan_over_file(Handle, Position) -> @@ -977,7 +1040,11 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) -> write_hash_tables(Handle, HashTree) -> Seq = lists:seq(0, 255), {ok, StartPos} = file:position(Handle, cur), - {IndexList, HashTreeBin} = write_hash_tables(Seq, HashTree, StartPos, [], <<>>), + {IndexList, HashTreeBin} = write_hash_tables(Seq, + HashTree, + StartPos, + [], + <<>>), ok = file:write(Handle, HashTreeBin), {ok, EndPos} = file:position(Handle, cur), ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need), @@ -1003,7 +1070,8 @@ write_hash_tables([Index|Rest], HashTree, CurrPos, IndexList, HashTreeBin) -> end, NewSlotList = lists:foldl(Fn, SlotList, BinList), - NewSlotBin = lists:foldl(fun(X, Acc) -> <> end, + NewSlotBin = lists:foldl(fun(X, Acc) -> + <> end, HashTreeBin, NewSlotList), write_hash_tables(Rest, @@ -1027,9 +1095,11 @@ build_binaryhashlist([{Hash, [Position|TailP]}|TailKV], BinList) -> NewBin = <>, case TailP of [] -> - build_binaryhashlist(TailKV, [{Hash, NewBin}|BinList]); + build_binaryhashlist(TailKV, + [{Hash, NewBin}|BinList]); _ -> - build_binaryhashlist([{Hash, TailP}|TailKV], [{Hash, NewBin}|BinList]) + build_binaryhashlist([{Hash, TailP}|TailKV], + [{Hash, NewBin}|BinList]) end. 
%% Slot is zero based because it comes from a REM @@ -1502,15 +1572,29 @@ get_keys_byposition_simple_test() -> KeyList = ["Key1", "Key2", "Key3"], {ok, F2} = cdb_complete(P1), {ok, P2} = cdb_open_reader(F2), - PositionList = cdb_getpositions(P2), + PositionList = cdb_getpositions(P2, all), io:format("Position list of ~w~n", [PositionList]), - L1 = length(PositionList), - ?assertMatch(L1, 3), - lists:foreach(fun(Pos) -> - Key = cdb_getkey(P2, Pos), + ?assertMatch(3, length(PositionList)), + R1 = cdb_directfetch(P2, PositionList, key_only), + ?assertMatch(3, length(R1)), + lists:foreach(fun(Key) -> Check = lists:member(Key, KeyList), ?assertMatch(Check, true) end, - PositionList), + R1), + R2 = cdb_directfetch(P2, PositionList, key_size), + ?assertMatch(3, length(R2)), + lists:foreach(fun({Key, _Size}) -> + Check = lists:member(Key, KeyList), + ?assertMatch(Check, true) end, + R2), + R3 = cdb_directfetch(P2, PositionList, key_value_check), + ?assertMatch(3, length(R3)), + lists:foreach(fun({Key, Value, Check}) -> + ?assertMatch(Check, true), + {K, V} = cdb_get(P2, Key), + ?assertMatch(K, Key), + ?assertMatch(V, Value) end, + R3), ok = cdb_close(P2), ok = file:delete(F2). @@ -1534,11 +1618,19 @@ get_keys_byposition_manykeys_test() -> SW3 = os:timestamp(), io:format("CDB opened for read in ~w microseconds~n", [timer:now_diff(SW3, SW2)]), - PositionList = cdb_getpositions(P2), + PositionList = cdb_getpositions(P2, all), io:format("Positions fetched in ~w microseconds~n", [timer:now_diff(os:timestamp(), SW3)]), L1 = length(PositionList), ?assertMatch(L1, KeyCount), + + SampleList1 = cdb_getpositions(P2, 10), + ?assertMatch(10, length(SampleList1)), + SampleList2 = cdb_getpositions(P2, KeyCount), + ?assertMatch(KeyCount, length(SampleList2)), + SampleList3 = cdb_getpositions(P2, KeyCount + 1), + ?assertMatch(KeyCount, length(SampleList3)), + ok = cdb_close(P2), ok = file:delete(F2).