Support for random sampling

Makes the ability to get positions and the fetch directly by position
more generic - supporting the fetch of different flavours of
combinations, and requesting a sample of positions not just all
This commit is contained in:
martinsumner 2016-09-20 18:24:05 +01:00
parent aa7d235c4d
commit 66d6db4e11

View file

@ -6,18 +6,15 @@
%% The primary differences are:
%% - Support for incrementally writing a CDB file while keeping the hash table
%% in memory
%% - Support for merging of multiple CDB files with a key-checking function to
%% allow for compaction
%% - Automatic adding of a helper object that will keep a small proportion of
%% keys to be used when checking to see if the cdb file is a candidate for
%% compaction
%% - The ability to scan a database and accumulate all the Key, Values to
%% rebuild in-memory tables on startup
%% - The ability to scan a database in blocks of sequence numbers
%%
%% This is to be used in eleveledb, and in this context:
%% - Keys will be a Sequence Number
%% - Values will be a Checksum | Object | KeyAdditions
%% Where the KeyAdditions are all the Key changes required to be added to the
%% - Keys will be a combinatio of the PrimaryKey and the Sequence Number
%% - Values will be a serialised version on the whole object, and the
%% IndexChanges associated with the transaction
%% Where the IndexChanges are all the Key changes required to be added to the
%% ledger to complete the changes (the addition of postings and tombstones).
%%
%% This module provides functions to create and query a CDB (constant database).
@ -60,8 +57,8 @@
cdb_open_reader/1,
cdb_get/2,
cdb_put/3,
cdb_getpositions/1,
cdb_getkey/2,
cdb_getpositions/2,
cdb_directfetch/3,
cdb_lastkey/1,
cdb_filename/1,
cdb_keycheck/2,
@ -120,11 +117,15 @@ cdb_get(Pid, Key) ->
cdb_put(Pid, Key, Value) ->
gen_server:call(Pid, {put_kv, Key, Value}, infinity).
cdb_getpositions(Pid) ->
gen_server:call(Pid, get_positions, infinity).
%% SampleSize can be an integer or the atom all
cdb_getpositions(Pid, SampleSize) ->
gen_server:call(Pid, {get_positions, SampleSize}, infinity).
cdb_getkey(Pid, Position) ->
gen_server:call(Pid, {get_key, Position}, infinity).
%% Info can be key_only, key_size (size being the size of the value) or
%% key_value_check (with the check part indicating if the CRC is correct for
%% the value)
cdb_directfetch(Pid, PositionList, Info) ->
gen_server:call(Pid, {direct_fetch, PositionList, Info}, infinity).
cdb_close(Pid) ->
gen_server:call(Pid, cdb_close, infinity).
@ -255,13 +256,45 @@ handle_call(cdb_lastkey, _From, State) ->
{reply, State#state.last_key, State};
handle_call(cdb_filename, _From, State) ->
{reply, State#state.filename, State};
handle_call(get_positions, _From, State) ->
handle_call({get_positions, SampleSize}, _From, State) ->
case SampleSize of
all ->
{reply, scan_index(State#state.handle,
State#state.hash_index,
{fun scan_index_returnpositions/4, []}),
State};
handle_call({get_key, Position}, _From, State) ->
{reply, extract_key(State#state.handle, Position), State};
_ ->
SeededL = lists:map(fun(X) -> {random:uniform(), X} end,
State#state.hash_index),
SortedL = lists:keysort(1, SeededL),
RandomisedHashIndex = lists:map(fun({_R, X}) -> X end, SortedL),
{reply,
scan_index_forsample(State#state.handle,
RandomisedHashIndex,
fun scan_index_returnpositions/4,
[],
SampleSize),
State}
end;
handle_call({direct_fetch, PositionList, Info}, _From, State) ->
H = State#state.handle,
case Info of
key_only ->
KeyList = lists:map(fun(P) ->
extract_key(H, P) end,
PositionList),
{reply, KeyList, State};
key_size ->
KeySizeList = lists:map(fun(P) ->
extract_key_size(H, P) end,
PositionList),
{reply, KeySizeList, State};
key_value_check ->
KVCList = lists:map(fun(P) ->
extract_key_value_check(H, P) end,
PositionList),
{reply, KVCList, State}
end;
handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) ->
{ok, StartPos0} = case StartPos of
undefined ->
@ -611,7 +644,9 @@ load_index(Handle) ->
%% Function to find the LastKey in the file
find_lastkey(Handle, IndexCache) ->
LastPosition = scan_index(Handle, IndexCache, {fun scan_index_findlast/4, 0}),
LastPosition = scan_index(Handle,
IndexCache,
{fun scan_index_findlast/4, 0}),
{ok, _} = file:position(Handle, LastPosition),
{KeyLength, _ValueLength} = read_next_2_integers(Handle),
read_next_term(Handle, KeyLength).
@ -623,6 +658,22 @@ scan_index(Handle, IndexCache, {ScanFun, InitAcc}) ->
InitAcc,
IndexCache).
scan_index_forsample(_Handle, [], _ScanFun, Acc, SampleSize) ->
lists:sublist(Acc, SampleSize);
scan_index_forsample(Handle, [CacheEntry|Tail], ScanFun, Acc, SampleSize) ->
case length(Acc) of
L when L >= SampleSize ->
lists:sublist(Acc, SampleSize);
_ ->
{_X, {Pos, Count}} = CacheEntry,
scan_index_forsample(Handle,
Tail,
ScanFun,
ScanFun(Handle, Pos, Count, Acc),
SampleSize)
end.
scan_index_findlast(Handle, Position, Count, LastPosition) ->
{ok, _} = file:position(Handle, Position),
lists:foldl(fun({_Hash, HPos}, MaxPos) -> max(HPos, MaxPos) end,
@ -708,6 +759,18 @@ extract_key(Handle, Position) ->
{KeyLength, _ValueLength} = read_next_2_integers(Handle),
read_next_term(Handle, KeyLength).
extract_key_size(Handle, Position) ->
{ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle),
{read_next_term(Handle, KeyLength), ValueLength}.
extract_key_value_check(Handle, Position) ->
{ok, _} = file:position(Handle, Position),
{KeyLength, ValueLength} = read_next_2_integers(Handle),
K = read_next_term(Handle, KeyLength),
{Check, V} = read_next_term(Handle, ValueLength, crc, true),
{K, V, Check}.
%% Scan through the file until there is a failure to crc check an input, and
%% at that point return the position and the key dictionary scanned so far
startup_scan_over_file(Handle, Position) ->
@ -977,7 +1040,11 @@ write_key_value_pairs(Handle, [HeadPair|TailList], Acc) ->
write_hash_tables(Handle, HashTree) ->
Seq = lists:seq(0, 255),
{ok, StartPos} = file:position(Handle, cur),
{IndexList, HashTreeBin} = write_hash_tables(Seq, HashTree, StartPos, [], <<>>),
{IndexList, HashTreeBin} = write_hash_tables(Seq,
HashTree,
StartPos,
[],
<<>>),
ok = file:write(Handle, HashTreeBin),
{ok, EndPos} = file:position(Handle, cur),
ok = file:advise(Handle, StartPos, EndPos - StartPos, will_need),
@ -1003,7 +1070,8 @@ write_hash_tables([Index|Rest], HashTree, CurrPos, IndexList, HashTreeBin) ->
end,
NewSlotList = lists:foldl(Fn, SlotList, BinList),
NewSlotBin = lists:foldl(fun(X, Acc) -> <<Acc/binary, X/binary>> end,
NewSlotBin = lists:foldl(fun(X, Acc) ->
<<Acc/binary, X/binary>> end,
HashTreeBin,
NewSlotList),
write_hash_tables(Rest,
@ -1027,9 +1095,11 @@ build_binaryhashlist([{Hash, [Position|TailP]}|TailKV], BinList) ->
NewBin = <<HashLE:32, PosLE:32>>,
case TailP of
[] ->
build_binaryhashlist(TailKV, [{Hash, NewBin}|BinList]);
build_binaryhashlist(TailKV,
[{Hash, NewBin}|BinList]);
_ ->
build_binaryhashlist([{Hash, TailP}|TailKV], [{Hash, NewBin}|BinList])
build_binaryhashlist([{Hash, TailP}|TailKV],
[{Hash, NewBin}|BinList])
end.
%% Slot is zero based because it comes from a REM
@ -1502,15 +1572,29 @@ get_keys_byposition_simple_test() ->
KeyList = ["Key1", "Key2", "Key3"],
{ok, F2} = cdb_complete(P1),
{ok, P2} = cdb_open_reader(F2),
PositionList = cdb_getpositions(P2),
PositionList = cdb_getpositions(P2, all),
io:format("Position list of ~w~n", [PositionList]),
L1 = length(PositionList),
?assertMatch(L1, 3),
lists:foreach(fun(Pos) ->
Key = cdb_getkey(P2, Pos),
?assertMatch(3, length(PositionList)),
R1 = cdb_directfetch(P2, PositionList, key_only),
?assertMatch(3, length(R1)),
lists:foreach(fun(Key) ->
Check = lists:member(Key, KeyList),
?assertMatch(Check, true) end,
PositionList),
R1),
R2 = cdb_directfetch(P2, PositionList, key_size),
?assertMatch(3, length(R2)),
lists:foreach(fun({Key, _Size}) ->
Check = lists:member(Key, KeyList),
?assertMatch(Check, true) end,
R2),
R3 = cdb_directfetch(P2, PositionList, key_value_check),
?assertMatch(3, length(R3)),
lists:foreach(fun({Key, Value, Check}) ->
?assertMatch(Check, true),
{K, V} = cdb_get(P2, Key),
?assertMatch(K, Key),
?assertMatch(V, Value) end,
R3),
ok = cdb_close(P2),
ok = file:delete(F2).
@ -1534,11 +1618,19 @@ get_keys_byposition_manykeys_test() ->
SW3 = os:timestamp(),
io:format("CDB opened for read in ~w microseconds~n",
[timer:now_diff(SW3, SW2)]),
PositionList = cdb_getpositions(P2),
PositionList = cdb_getpositions(P2, all),
io:format("Positions fetched in ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW3)]),
L1 = length(PositionList),
?assertMatch(L1, KeyCount),
SampleList1 = cdb_getpositions(P2, 10),
?assertMatch(10, length(SampleList1)),
SampleList2 = cdb_getpositions(P2, KeyCount),
?assertMatch(KeyCount, length(SampleList2)),
SampleList3 = cdb_getpositions(P2, KeyCount + 1),
?assertMatch(KeyCount, length(SampleList3)),
ok = cdb_close(P2),
ok = file:delete(F2).