Reformat of CDB

CDB was failing tests (was it always this way?).  There has been a
little bit of a patch-up of the test, but there are still some
potentially outstanding issues with scanning over a file when attempting
to read beyond the end of the file.

Tabbing reformatting and general tidy.

Concierge documentation development ongoing.
This commit is contained in:
martinsumner 2016-07-29 17:19:30 +01:00
parent c1f6a042d9
commit 28f612426a
2 changed files with 814 additions and 633 deletions

View file

@ -46,7 +46,17 @@
-module(leveled_cdb).
-export([from_dict/2,
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3,
cdb_open_writer/1,
cdb_open_reader/1,
from_dict/2,
create/2,
dump/1,
get/2,
@ -66,7 +76,88 @@
%% 3221225472 bytes = 3 * 1024 * 1024 * 1024 (3 GiB).
%% NOTE(review): presumably the size at which a file is treated as full; the
%% code that enforces it is not visible in this section - confirm.
-define(MAX_FILE_SIZE, 3221225472).
%% Byte offset 2048 from the beginning of the file. dump/2 seeks to
%% {bof, 2048} before it starts reading key/value pairs.
%% NOTE(review): looks like the size of the leading index area - confirm.
-define(BASE_POSITION, 2048).
%%
%% State held by the gen_server process that owns a single CDB file.
%%
%% init/1 creates the record with no fields set, so every field starts out as
%% 'undefined' until one of the open requests fills it in; the types below
%% say so explicitly.
%%   hashtree      - in-memory hash tree returned by open_active_file/1
%%                   (only set by the writer open request)
%%   last_position - position returned by open_active_file/1 (writer only)
%%   smallest_sqn  - NOTE(review): not set by any code visible here - confirm
%%   highest_sqn   - NOTE(review): not set by any code visible here - confirm
%%   filename      - name of the file that was opened
%%   handle        - raw file descriptor returned by file:open/2
%%   writer        - true when opened for writing, false when read-only
-record(state, {hashtree,
                last_position :: integer() | undefined,
                smallest_sqn :: integer() | undefined,
                highest_sqn :: integer() | undefined,
                filename :: string() | undefined,
                handle :: file:fd() | undefined,
                writer :: boolean() | undefined}).
%%%============================================================================
%%% API
%%%============================================================================
%% Start a fresh (unlinked) server process for this module and ask it to open
%% Filename for writing.
%% Returns {ok, Pid} when the server answers ok; any other answer from the
%% server is handed back to the caller unchanged.
cdb_open_writer(Filename) ->
    {ok, Server} = gen_server:start(?MODULE, [], []),
    OpenRequest = {cdb_open_writer, Filename},
    case gen_server:call(Server, OpenRequest, infinity) of
        ok -> {ok, Server};
        Failure -> Failure
    end.
%% Start a fresh (unlinked) server process for this module and ask it to open
%% Filename read-only.
%% Returns {ok, Pid} when the server answers ok; any other answer from the
%% server is handed back to the caller unchanged.
cdb_open_reader(Filename) ->
    {ok, Server} = gen_server:start(?MODULE, [], []),
    OpenRequest = {cdb_open_reader, Filename},
    case gen_server:call(Server, OpenRequest, infinity) of
        ok -> {ok, Server};
        Failure -> Failure
    end.
%cdb_get(Pid, Key) ->
% gen_server:call(Pid, {cdb_get, Key}, infinity).
%
%cdb_put(Pid, Key, Value) ->
% gen_server:call(Pid, {cdb_put, Key, Value}, infinity).
%
%cdb_close(Pid) ->
% gen_server:call(Pid, cdb_close, infinity).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
%% gen_server callback: start with an empty state record. No file is opened
%% here; that happens when a cdb_open_writer / cdb_open_reader call arrives.
init([]) ->
    EmptyState = #state{},
    {ok, EmptyState}.
%% gen_server callback handling the two synchronous open requests.
%%
%% {cdb_open_writer, Filename}
%%     Calls open_active_file/1 to obtain the last position and hash tree,
%%     then opens the file read/write with delayed_write and records the
%%     handle, position, filename and hash tree in the state (writer = true).
%% {cdb_open_reader, Filename}
%%     Opens the file read-only and records the handle and filename
%%     (writer = false).
%%
%% Both reply ok on success. If file:open/2 fails, the {error, Reason} tuple
%% is sent back as the reply and the server stops with reason normal. This
%% lets the API wrappers return the error to their caller, instead of the
%% server crashing on a badmatch and taking the caller's gen_server:call
%% down with it, and it leaves no idle server process behind.
handle_call({cdb_open_writer, Filename}, _From, State) ->
    io:format("Opening file for writing with filename ~s~n", [Filename]),
    {LastPosition, HashTree} = open_active_file(Filename),
    WriteOpts = [binary, raw, read, write, delayed_write],
    case file:open(Filename, WriteOpts) of
        {ok, Handle} ->
            {reply, ok, State#state{handle=Handle,
                                    last_position=LastPosition,
                                    filename=Filename,
                                    hashtree=HashTree,
                                    writer=true}};
        {error, _Reason} = Error ->
            {stop, normal, Error, State}
    end;
handle_call({cdb_open_reader, Filename}, _From, State) ->
    io:format("Opening file for reading with filename ~s~n", [Filename]),
    case file:open(Filename, [binary, raw, read]) of
        {ok, Handle} ->
            {reply, ok, State#state{handle=Handle,
                                    filename=Filename,
                                    writer=false}};
        {error, _Reason} = Error ->
            {stop, normal, Error, State}
    end.
%% gen_server callback: no casts are acted upon; whatever arrives is dropped
%% and the state is carried forward untouched.
handle_cast(_Request, CurrentState) ->
    {noreply, CurrentState}.
%% gen_server callback: plain messages sent to the process are dropped and
%% the state is carried forward untouched.
handle_info(_Message, CurrentState) ->
    {noreply, CurrentState}.
%% gen_server callback: nothing is cleaned up explicitly on shutdown; the
%% callback just returns ok.
terminate(_Why, _FinalState) ->
    ok.
%% gen_server callback: no state migration is performed on a code upgrade;
%% the existing state is returned as is.
code_change(_FromVersion, CurrentState, _Options) ->
    {ok, CurrentState}.
%%%============================================================================
%%% Internal functions
%%%============================================================================
%% from_dict(FileName,ListOfKeyValueTuples)
%% Given a filename and a dictionary, create a cdb
%% using the key value pairs from the dict.
@ -102,7 +193,6 @@ dump(FileName, CRCCheck) ->
end,
NumberOfPairs = lists:foldl(Fn, 0, lists:seq(0,255)) bsr 1,
io:format("Count of keys in db is ~w~n", [NumberOfPairs]),
{ok, _} = file:position(Handle, {bof, 2048}),
Fn1 = fun(_I,Acc) ->
{KL,VL} = read_next_2_integers(Handle),
@ -114,7 +204,8 @@ dump(FileName, CRCCheck) ->
Return = {crc_wonky, get(Handle, Key)};
{_, Value} ->
{ok, CurrLoc} = file:position(Handle, cur),
Return = case get(Handle, Key) of
Return =
case get(Handle, Key) of
{Key,Value} -> {Key ,Value};
X -> {wonky, X}
end
@ -180,7 +271,6 @@ get(FileNameOrHandle, Key) ->
get(FileName, Key, CRCCheck) when is_list(FileName), is_list(Key) ->
{ok,Handle} = file:open(FileName,[binary, raw, read]),
get(Handle,Key, CRCCheck);
get(Handle, Key, CRCCheck) when is_tuple(Handle), is_list(Key) ->
Hash = hash(Key),
Index = hash_to_index(Hash),
@ -236,7 +326,7 @@ get_nextkey(Handle, {Position, FirstHashPosition}) ->
end;
eof ->
nomorekeys
end.
end.
%% Fold over all of the objects in the file, applying FoldFun to each object
@ -260,21 +350,35 @@ fold(Handle, FoldFun, Acc0, {Position, FirstHashPosition}, KeyOnly) ->
case read_next_2_integers(Handle) of
{KeyLength, ValueLength} ->
NextKey = read_next_term(Handle, KeyLength),
NextPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE,
NextPosition = Position
+ KeyLength + ValueLength +
?DWORD_SIZE,
case KeyOnly of
true ->
fold(Handle, FoldFun, FoldFun(NextKey, Acc0),
{NextPosition, FirstHashPosition}, KeyOnly);
fold(Handle,
FoldFun,
FoldFun(NextKey, Acc0),
{NextPosition, FirstHashPosition},
KeyOnly);
false ->
case read_next_term(Handle, ValueLength, crc, ?CRC_CHECK) of
case read_next_term(Handle,
ValueLength,
crc,
?CRC_CHECK) of
{false, _} ->
io:format("Skipping value for Key ~w as CRC check failed~n",
[NextKey]),
fold(Handle, FoldFun, Acc0,
{NextPosition, FirstHashPosition}, KeyOnly);
io:format("Skipping value for Key ~w as CRC
check failed~n", [NextKey]),
fold(Handle,
FoldFun,
Acc0,
{NextPosition, FirstHashPosition},
KeyOnly);
{_, Value} ->
fold(Handle, FoldFun, FoldFun(NextKey, Value, Acc0),
{NextPosition, FirstHashPosition}, KeyOnly)
fold(Handle,
FoldFun,
FoldFun(NextKey, Value, Acc0),
{NextPosition, FirstHashPosition},
KeyOnly)
end
end;
eof ->
@ -369,11 +473,14 @@ scan_over_file(Handle, Position, HashTree) ->
{Key, ValueAsBin, KeyLength, ValueLength} ->
case crccheck_value(ValueAsBin) of
true ->
NewPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE,
scan_over_file(Handle, NewPosition,
NewPosition = Position + KeyLength + ValueLength
+ ?DWORD_SIZE,
scan_over_file(Handle,
NewPosition,
put_hashtree(Key, Position, HashTree));
false ->
io:format("CRC check returned false on key of ~w ~n", [Key]),
io:format("CRC check returned false on key of ~w ~n",
[Key]),
{Position, HashTree}
end;
eof ->
@ -391,12 +498,16 @@ saferead_keyvalue(Handle) ->
eof ->
false;
{KeyL, ValueL} ->
case read_next_term(Handle, KeyL) of
io:format("KeyL ~w ValueL ~w~n", [KeyL, ValueL]),
case safe_read_next_term(Handle, KeyL) of
{error, einval} ->
false;
eof ->
false;
false ->
false;
Key ->
io:format("Found Key of ~s~n", [Key]),
case file:read(Handle, ValueL) of
{error, einval} ->
false;
@ -408,6 +519,16 @@ saferead_keyvalue(Handle) ->
end
end.
%% Wrapper around read_next_term/2 that turns a badarg error into the atom
%% false. Any other result, and any other class or reason of exception,
%% passes through unchanged.
safe_read_next_term(Handle, Length) ->
    try
        read_next_term(Handle, Length)
    catch
        error:badarg ->
            false
    end.
%% The first four bytes of the value are the crc check
crccheck_value(Value) when byte_size(Value) >4 ->
<< Hash:32/integer, Tail/bitstring>> = Value,
@ -480,8 +601,7 @@ read_next_2_integers(Handle) ->
case file:read(Handle,?DWORD_SIZE) of
{ok, <<Int1:32,Int2:32>>} ->
{endian_flip(Int1), endian_flip(Int2)};
ReadError
->
ReadError ->
ReadError
end.
@ -557,8 +677,8 @@ write_hash_tables([Index|Rest], Handle, HashTree, StartPos, IndexList) ->
{L1, [<<0:32, 0:32>>|L2]} = lists:split(Slot1, AccSlotList),
lists:append(L1, [Binary|L2])
end,
NewSlotList = lists:foldl(Fn, SlotList, BinList),
NewSlotList = lists:foldl(Fn, SlotList, BinList),
{ok, CurrPos} = file:position(Handle, cur),
file:write(Handle, NewSlotList),
write_hash_tables(Rest, Handle, HashTree, StartPos,
@ -679,7 +799,7 @@ key_value_to_record({Key, Value}) ->
-ifdef(TEST).
write_key_value_pairs_1_test() ->
{ok,Handle} = file:open("test.cdb",write),
{ok,Handle} = file:open("../test/test.cdb",write),
{_, HashTree} = write_key_value_pairs(Handle,[{"key1","value1"},{"key2","value2"}]),
Hash1 = hash("key1"),
Index1 = hash_to_index(Hash1),
@ -691,18 +811,18 @@ write_key_value_pairs_1_test() ->
io:format("HashTree is ~w~n", [HashTree]),
io:format("Expected HashTree is ~w~n", [R2]),
?assertMatch(R2, HashTree),
ok = file:delete("test.cdb").
ok = file:delete("../test/test.cdb").
write_hash_tables_1_test() ->
{ok, Handle} = file:open("test.cdb",write),
{ok, Handle} = file:open("../test/testx.cdb",write),
R0 = array:new(256, {default, gb_trees:empty()}),
R1 = array:set(64, gb_trees:insert(6383014720, [18], array:get(64, R0)), R0),
R2 = array:set(67, gb_trees:insert(6383014723, [0], array:get(67, R1)), R1),
Result = write_hash_tables(Handle, R2),
io:format("write hash tables result of ~w ~n", [Result]),
?assertMatch(Result,[{67,16,2},{64,0,2}]),
ok = file:delete("test.cdb").
ok = file:delete("../test/testx.cdb").
find_open_slot_1_test() ->
List = [<<1:32,1:32>>,<<0:32,0:32>>,<<1:32,1:32>>,<<1:32,1:32>>],
@ -731,10 +851,10 @@ find_open_slot_5_test() ->
full_1_test() ->
List1 = lists:sort([{"key1","value1"},{"key2","value2"}]),
create("simple.cdb",lists:sort([{"key1","value1"},{"key2","value2"}])),
List2 = lists:sort(dump("simple.cdb")),
create("../test/simple.cdb",lists:sort([{"key1","value1"},{"key2","value2"}])),
List2 = lists:sort(dump("../test/simple.cdb")),
?assertMatch(List1,List2),
ok = file:delete("simple.cdb").
ok = file:delete("../test/simple.cdb").
full_2_test() ->
List1 = lists:sort([{lists:flatten(io_lib:format("~s~p",[Prefix,Plug])),
@ -742,34 +862,34 @@ full_2_test() ->
|| Plug <- lists:seq(1,2000),
Prefix <- ["dsd","so39ds","oe9%#*(","020dkslsldclsldowlslf%$#",
"tiep4||","qweq"]]),
create("full.cdb",List1),
List2 = lists:sort(dump("full.cdb")),
create("../test/full.cdb",List1),
List2 = lists:sort(dump("../test/full.cdb")),
?assertMatch(List1,List2),
ok = file:delete("full.cdb").
ok = file:delete("../test/full.cdb").
from_dict_test() ->
D = dict:new(),
D1 = dict:store("a","b",D),
D2 = dict:store("c","d",D1),
ok = from_dict("from_dict_test.cdb",D2),
ok = from_dict("../test/from_dict_test.cdb",D2),
io:format("Store created ~n", []),
KVP = lists:sort(dump("from_dict_test.cdb")),
KVP = lists:sort(dump("../test/from_dict_test.cdb")),
D3 = lists:sort(dict:to_list(D2)),
io:format("KVP is ~w~n", [KVP]),
io:format("D3 is ~w~n", [D3]),
?assertMatch(KVP, D3),
ok = file:delete("from_dict_test.cdb").
ok = file:delete("../test/from_dict_test.cdb").
to_dict_test() ->
D = dict:new(),
D1 = dict:store("a","b",D),
D2 = dict:store("c","d",D1),
ok = from_dict("from_dict_test.cdb",D2),
Dict = to_dict("from_dict_test.cdb"),
ok = from_dict("../test/from_dict_test1.cdb",D2),
Dict = to_dict("../test/from_dict_test1.cdb"),
D3 = lists:sort(dict:to_list(D2)),
D4 = lists:sort(dict:to_list(Dict)),
?assertMatch(D4,D3),
ok = file:delete("from_dict_test.cdb").
ok = file:delete("../test/from_dict_test1.cdb").
%% An empty binary carries no CRC prefix, so the check must report false.
crccheck_emptyvalue_test() ->
    EmptyValue = <<>>,
    Outcome = crccheck_value(EmptyValue),
    ?assertMatch(false, Outcome).
@ -807,23 +927,23 @@ activewrite_singlewrite_test() ->
Value = "some text as new value",
InitialD = dict:new(),
InitialD1 = dict:store("0001", "Initial value", InitialD),
ok = from_dict("test_mem.cdb", InitialD1),
ok = from_dict("../test/test_mem.cdb", InitialD1),
io:format("New db file created ~n", []),
{LastPosition, KeyDict} = open_active_file("test_mem.cdb"),
{LastPosition, KeyDict} = open_active_file("../test/test_mem.cdb"),
io:format("File opened as new active file "
"with LastPosition=~w ~n", [LastPosition]),
{_, _, UpdKeyDict} = put("test_mem.cdb", Key, Value, {LastPosition, KeyDict}),
{_, _, UpdKeyDict} = put("../test/test_mem.cdb", Key, Value, {LastPosition, KeyDict}),
io:format("New key and value added to active file ~n", []),
?assertMatch({Key, Value}, get_mem(Key, "test_mem.cdb", UpdKeyDict)),
ok = file:delete("test_mem.cdb").
?assertMatch({Key, Value}, get_mem(Key, "../test/test_mem.cdb", UpdKeyDict)),
ok = file:delete("../test/test_mem.cdb").
search_hash_table_findinslot_test() ->
Key1 = "key1", % this is in slot 3 if count is 8
D = dict:from_list([{Key1, "value1"}, {"K2", "V2"}, {"K3", "V3"},
{"K4", "V4"}, {"K5", "V5"}, {"K6", "V6"}, {"K7", "V7"},
{"K8", "V8"}]),
ok = from_dict("hashtable1_test.cdb",D),
{ok, Handle} = file:open("hashtable1_test.cdb", [binary, raw, read, write]),
ok = from_dict("../test/hashtable1_test.cdb",D),
{ok, Handle} = file:open("../test/hashtable1_test.cdb", [binary, raw, read, write]),
Hash = hash(Key1),
Index = hash_to_index(Hash),
{ok, _} = file:position(Handle, {bof, ?DWORD_SIZE*Index}),
@ -850,15 +970,15 @@ search_hash_table_findinslot_test() ->
ok = file:pwrite(Handle, FirstHashPosition + (Slot -1) * ?DWORD_SIZE, RBin),
ok = file:close(Handle),
io:format("Find key following change to hash table~n"),
?assertMatch(missing, get("hashtable1_test.cdb", Key1)),
ok = file:delete("hashtable1_test.cdb").
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)),
ok = file:delete("../test/hashtable1_test.cdb").
getnextkey_inclemptyvalue_test() ->
L = [{"K9", "V9"}, {"K2", "V2"}, {"K3", ""},
{"K4", "V4"}, {"K5", "V5"}, {"K6", "V6"}, {"K7", "V7"},
{"K8", "V8"}, {"K1", "V1"}],
ok = create("hashtable1_test.cdb", L),
{FirstKey, Handle, P1} = get_nextkey("hashtable1_test.cdb"),
ok = create("../test/hashtable2_test.cdb", L),
{FirstKey, Handle, P1} = get_nextkey("../test/hashtable2_test.cdb"),
io:format("Next position details of ~w~n", [P1]),
?assertMatch("K9", FirstKey),
{SecondKey, Handle, P2} = get_nextkey(Handle, P1),
@ -873,14 +993,14 @@ getnextkey_inclemptyvalue_test() ->
{LastKey, Info} = get_nextkey(Handle, P8),
?assertMatch(nomorekeys, Info),
?assertMatch("K1", LastKey),
ok = file:delete("hashtable1_test.cdb").
ok = file:delete("../test/hashtable2_test.cdb").
newactivefile_test() ->
{LastPosition, _} = open_active_file("activefile_test.cdb"),
{LastPosition, _} = open_active_file("../test/activefile_test.cdb"),
?assertMatch(256 * ?DWORD_SIZE, LastPosition),
Response = get_nextkey("activefile_test.cdb"),
Response = get_nextkey("../test/activefile_test.cdb"),
?assertMatch(nomorekeys, Response),
ok = file:delete("activefile_test.cdb").
ok = file:delete("../test/activefile_test.cdb").
emptyvalue_fromdict_test() ->
D = dict:new(),
@ -888,14 +1008,14 @@ emptyvalue_fromdict_test() ->
D2 = dict:store("K2", "", D1),
D3 = dict:store("K3", "V3", D2),
D4 = dict:store("K4", "", D3),
ok = from_dict("from_dict_test_ev.cdb",D4),
ok = from_dict("../test/from_dict_test_ev.cdb",D4),
io:format("Store created ~n", []),
KVP = lists:sort(dump("from_dict_test_ev.cdb")),
KVP = lists:sort(dump("../test/from_dict_test_ev.cdb")),
D_Result = lists:sort(dict:to_list(D4)),
io:format("KVP is ~w~n", [KVP]),
io:format("D_Result is ~w~n", [D_Result]),
?assertMatch(KVP, D_Result),
ok = file:delete("from_dict_test_ev.cdb").
ok = file:delete("../test/from_dict_test_ev.cdb").
fold_test() ->
K1 = {"Key1", 1},
@ -909,7 +1029,7 @@ fold_test() ->
K5 = {"Key1", 5},
V5 = 32,
D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3}, {K4, V4}, {K5, V5}]),
ok = from_dict("fold_test.cdb", D),
ok = from_dict("../test/fold_test.cdb", D),
FromSN = 2,
FoldFun = fun(K, V, Acc) ->
{_Key, Seq} = K,
@ -919,8 +1039,8 @@ fold_test() ->
Acc
end
end,
?assertMatch(56, fold("fold_test.cdb", FoldFun, 0)),
ok = file:delete("fold_test.cdb").
?assertMatch(56, fold("../test/fold_test.cdb", FoldFun, 0)),
ok = file:delete("../test/fold_test.cdb").
fold_keys_test() ->
K1 = {"Key1", 1},
@ -934,7 +1054,7 @@ fold_keys_test() ->
K5 = {"Key5", 5},
V5 = 32,
D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3}, {K4, V4}, {K5, V5}]),
ok = from_dict("fold_keys_test.cdb", D),
ok = from_dict("../test/fold_keys_test.cdb", D),
FromSN = 2,
FoldFun = fun(K, Acc) ->
{Key, Seq} = K,
@ -944,9 +1064,9 @@ fold_keys_test() ->
Acc
end
end,
Result = fold_keys("fold_keys_test.cdb", FoldFun, []),
Result = fold_keys("../test/fold_keys_test.cdb", FoldFun, []),
?assertMatch(["Key3", "Key4", "Key5"], lists:sort(Result)),
ok = file:delete("fold_keys_test.cdb").
ok = file:delete("../test/fold_keys_test.cdb").
fold2_test() ->
K1 = {"Key1", 1},
@ -963,7 +1083,7 @@ fold2_test() ->
V6 = 64,
D = dict:from_list([{K1, V1}, {K2, V2}, {K3, V3},
{K4, V4}, {K5, V5}, {K6, V6}]),
ok = from_dict("fold2_test.cdb", D),
ok = from_dict("../test/fold2_test.cdb", D),
FoldFun = fun(K, V, Acc) ->
{Key, Seq} = K,
case dict:find(Key, Acc) of
@ -978,8 +1098,8 @@ fold2_test() ->
RD = dict:new(),
RD1 = dict:store("Key1", {5, 32}, RD),
RD2 = dict:store("Key2", {1, 64}, RD1),
Result = fold("fold2_test.cdb", FoldFun, dict:new()),
Result = fold("../test/fold2_test.cdb", FoldFun, dict:new()),
?assertMatch(RD2, Result),
ok = file:delete("fold2_test.cdb").
ok = file:delete("../test/fold2_test.cdb").
-endif.

View file

@ -1,3 +1,71 @@
%% -------- Overview ---------
%%
%% The eleveleddb is based on the LSM-tree similar to leveldb, except that:
%% - Values are kept separately from Keys & Metadata
%% - Different file formats are used for value store (based on constant
%% database), and key store (based on sst)
%% - It is not intended to be general purpose, but be specifically suited for
%% use as a Riak backend in specific circumstances (relatively large values,
%% and frequent use of iterators)
%% - The Value store is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The Key Store is a LSM tree, where the key is the actual object key, and
%% the value is the metadata of the object including the sequence number
%%
%% -------- Concierge & Manifest ---------
%%
%% The concierge is responsible for opening up the store, and keeps a manifest
%% of where items can be found. The manifest keeps a mapping of:
%% - Sequence Number ranges and the PID of the Value Store file that contains
%% that range
%% - Key ranges to PID mappings for each level of the KeyStore
%%
%% -------- GET --------
%%
%% A GET request for Key and Metadata requires a lookup in the KeyStore only.
%% - The concierge should consult the manifest for the lowest level to find
%% the PID which may contain the Key
%% - The concierge should ask the file owner if the Key is present, if not
%% present lower levels should be consulted until the object is found
%%
%% If a value is required, when the Key/Metadata has been fetched from the
%% KeyStore, the sequence number should be taken, and matched in the ValueStore
%% manifest to find the right value.
%%
%% For recent PUTs the Key/Metadata is added into memory, and there is an
%% in-memory hash table for the entries in the most recent ValueStore CDB.
%%
%% -------- PUT --------
%%
%% A PUT request must be persisted to the open (and append only) CDB file which
%% acts as a transaction log to persist the change. The Key & Metadata needs
%% also to be placed in memory.
%%
%% Once the CDB file is full, the managing process should be requested to
%% complete the lookup hash, and a new CDB file be started.
%%
%% Once the in-memory
%%
%% -------- Snapshots (Key Only) --------
%%
%% If there is an iterator/snapshot request, the concierge will simply hand off
%% a copy of the manifest, and register the interest of the iterator at the
%% manifest sequence number at the time of the request. Iterators should
%% de-register themselves from the manager on completion. Iterators should be
%% automatically released after a timeout period. A file can be deleted if
%% there are no registered iterators from before the point the file was
%% removed from the manifest.
%%
%% -------- Snapshots (Key & Value) --------
%%
%%
%%
%% -------- Special Ops --------
%%
%% e.g. Get all for SegmentID/Partition
%%
%% -------- KeyStore ---------
%%
%% The concierge is responsible for controlling access to the store and
%% maintaining both an in-memory view and a persisted state of all the sft
%% files in use across the store.
@ -34,14 +102,7 @@
%% will call the manifest manager on a timeout to confirm that they are no
%% longer in use (by any iterators).
%%
%% If there is an iterator/snapshot request, the concierge will simply hand off
%% a copy of the manifest, and register the interest of the iterator at the
%% manifest sequence number at the time of the request. Iterators should
%% de-register themselves from the manager on completion. Iterators should be
%% automatically released after a timeout period. A file can be deleted if
%% there are no registered iterators from before the point the file was
%% removed from the manifest.
%%