Testing of Inker rolling Journal
Add test to show inker rolling journal. to achieve needs to make CDB size an option, and also alter the manifest sorting so that find_in_manifest actually works!
This commit is contained in:
parent
f0e1c1d7ea
commit
0d905639be
3 changed files with 216 additions and 61 deletions
|
@ -18,4 +18,12 @@
|
||||||
{start_key :: tuple(),
|
{start_key :: tuple(),
|
||||||
end_key :: tuple(),
|
end_key :: tuple(),
|
||||||
owner :: pid(),
|
owner :: pid(),
|
||||||
filename :: string()}).
|
filename :: string()}).
|
||||||
|
|
||||||
|
-record(cdb_options,
|
||||||
|
{max_size :: integer()}).
|
||||||
|
|
||||||
|
-record(inker_options,
|
||||||
|
{cdb_max_size :: integer(),
|
||||||
|
root_path :: string(),
|
||||||
|
cdb_options :: #cdb_options{}}).
|
|
@ -47,6 +47,7 @@
|
||||||
-module(leveled_cdb).
|
-module(leveled_cdb).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
-include("../include/leveled.hrl").
|
||||||
|
|
||||||
-export([init/1,
|
-export([init/1,
|
||||||
handle_call/3,
|
handle_call/3,
|
||||||
|
@ -55,6 +56,7 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3,
|
code_change/3,
|
||||||
cdb_open_writer/1,
|
cdb_open_writer/1,
|
||||||
|
cdb_open_writer/2,
|
||||||
cdb_open_reader/1,
|
cdb_open_reader/1,
|
||||||
cdb_get/2,
|
cdb_get/2,
|
||||||
cdb_put/3,
|
cdb_put/3,
|
||||||
|
@ -79,7 +81,8 @@
|
||||||
hash_index = [] :: list(),
|
hash_index = [] :: list(),
|
||||||
filename :: string(),
|
filename :: string(),
|
||||||
handle :: file:fd(),
|
handle :: file:fd(),
|
||||||
writer :: boolean}).
|
writer :: boolean,
|
||||||
|
max_size :: integer()}).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -87,7 +90,11 @@
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
cdb_open_writer(Filename) ->
|
cdb_open_writer(Filename) ->
|
||||||
{ok, Pid} = gen_server:start(?MODULE, [], []),
|
%% No options passed
|
||||||
|
cdb_open_writer(Filename, #cdb_options{}).
|
||||||
|
|
||||||
|
cdb_open_writer(Filename, Opts) ->
|
||||||
|
{ok, Pid} = gen_server:start(?MODULE, [Opts], []),
|
||||||
case gen_server:call(Pid, {cdb_open_writer, Filename}, infinity) of
|
case gen_server:call(Pid, {cdb_open_writer, Filename}, infinity) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, Pid};
|
{ok, Pid};
|
||||||
|
@ -96,7 +103,7 @@ cdb_open_writer(Filename) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
cdb_open_reader(Filename) ->
|
cdb_open_reader(Filename) ->
|
||||||
{ok, Pid} = gen_server:start(?MODULE, [], []),
|
{ok, Pid} = gen_server:start(?MODULE, [#cdb_options{}], []),
|
||||||
case gen_server:call(Pid, {cdb_open_reader, Filename}, infinity) of
|
case gen_server:call(Pid, {cdb_open_reader, Filename}, infinity) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, Pid};
|
{ok, Pid};
|
||||||
|
@ -134,27 +141,33 @@ cdb_keycheck(Pid, Key) ->
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
init([]) ->
|
init([Opts]) ->
|
||||||
{ok, #state{}}.
|
MaxSize = case Opts#cdb_options.max_size of
|
||||||
|
undefined ->
|
||||||
|
?MAX_FILE_SIZE;
|
||||||
|
M ->
|
||||||
|
M
|
||||||
|
end,
|
||||||
|
{ok, #state{max_size=MaxSize}}.
|
||||||
|
|
||||||
handle_call({cdb_open_writer, Filename}, _From, State) ->
|
handle_call({cdb_open_writer, Filename}, _From, State) ->
|
||||||
io:format("Opening file for writing with filename ~s~n", [Filename]),
|
io:format("Opening file for writing with filename ~s~n", [Filename]),
|
||||||
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
|
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
|
||||||
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
|
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
|
||||||
{reply, ok, State#state{handle=Handle,
|
{reply, ok, State#state{handle=Handle,
|
||||||
last_position=LastPosition,
|
last_position=LastPosition,
|
||||||
last_key=LastKey,
|
last_key=LastKey,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
hashtree=HashTree,
|
hashtree=HashTree,
|
||||||
writer=true}};
|
writer=true}};
|
||||||
handle_call({cdb_open_reader, Filename}, _From, State) ->
|
handle_call({cdb_open_reader, Filename}, _From, State) ->
|
||||||
io:format("Opening file for reading with filename ~s~n", [Filename]),
|
io:format("Opening file for reading with filename ~s~n", [Filename]),
|
||||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||||
Index = load_index(Handle),
|
Index = load_index(Handle),
|
||||||
{reply, ok, State#state{handle=Handle,
|
{reply, ok, State#state{handle=Handle,
|
||||||
filename=Filename,
|
filename=Filename,
|
||||||
writer=false,
|
writer=false,
|
||||||
hash_index=Index}};
|
hash_index=Index}};
|
||||||
handle_call({cdb_get, Key}, _From, State) ->
|
handle_call({cdb_get, Key}, _From, State) ->
|
||||||
case {State#state.writer, State#state.hash_index} of
|
case {State#state.writer, State#state.hash_index} of
|
||||||
{true, _} ->
|
{true, _} ->
|
||||||
|
@ -198,7 +211,8 @@ handle_call({cdb_put, Key, Value}, _From, State) ->
|
||||||
true ->
|
true ->
|
||||||
Result = put(State#state.handle,
|
Result = put(State#state.handle,
|
||||||
Key, Value,
|
Key, Value,
|
||||||
{State#state.last_position, State#state.hashtree}),
|
{State#state.last_position, State#state.hashtree},
|
||||||
|
State#state.max_size),
|
||||||
case Result of
|
case Result of
|
||||||
roll ->
|
roll ->
|
||||||
%% Key and value could not be written
|
%% Key and value could not be written
|
||||||
|
@ -230,7 +244,8 @@ handle_call(cdb_complete, _From, State) ->
|
||||||
%% Rename file
|
%% Rename file
|
||||||
NewName = filename:rootname(State#state.filename, ".pnd")
|
NewName = filename:rootname(State#state.filename, ".pnd")
|
||||||
++ ".cdb",
|
++ ".cdb",
|
||||||
io:format("Renaming file from ~s to ~s~n", [State#state.filename, NewName]),
|
io:format("Renaming file from ~s to ~s~n",
|
||||||
|
[State#state.filename, NewName]),
|
||||||
ok = file:rename(State#state.filename, NewName),
|
ok = file:rename(State#state.filename, NewName),
|
||||||
{stop, normal, {ok, NewName}, State};
|
{stop, normal, {ok, NewName}, State};
|
||||||
false ->
|
false ->
|
||||||
|
@ -349,19 +364,24 @@ open_active_file(FileName) when is_list(FileName) ->
|
||||||
%% Append to an active file a new key/value pair returning an updated
|
%% Append to an active file a new key/value pair returning an updated
|
||||||
%% dictionary of Keys and positions. Returns an updated Position
|
%% dictionary of Keys and positions. Returns an updated Position
|
||||||
%%
|
%%
|
||||||
put(FileName, Key, Value, {LastPosition, HashTree}) when is_list(FileName) ->
|
put(FileName, Key, Value, {LastPosition, HashTree}, MaxSize) when is_list(FileName) ->
|
||||||
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
|
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
|
||||||
put(Handle, Key, Value, {LastPosition, HashTree});
|
put(Handle, Key, Value, {LastPosition, HashTree}, MaxSize);
|
||||||
put(Handle, Key, Value, {LastPosition, HashTree}) ->
|
put(Handle, Key, Value, {LastPosition, HashTree}, MaxSize) ->
|
||||||
Bin = key_value_to_record({Key, Value}),
|
Bin = key_value_to_record({Key, Value}),
|
||||||
PotentialNewSize = LastPosition + byte_size(Bin),
|
PotentialNewSize = LastPosition + byte_size(Bin),
|
||||||
if PotentialNewSize > ?MAX_FILE_SIZE ->
|
if PotentialNewSize > MaxSize ->
|
||||||
roll;
|
roll;
|
||||||
true ->
|
true ->
|
||||||
ok = file:pwrite(Handle, LastPosition, Bin),
|
ok = file:pwrite(Handle, LastPosition, Bin),
|
||||||
{Handle, PotentialNewSize, put_hashtree(Key, LastPosition, HashTree)}
|
{Handle, PotentialNewSize, put_hashtree(Key, LastPosition, HashTree)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Should not be used for non-test PUTs by the inker - as the Max File Size
|
||||||
|
%% should be taken from the startup options not the default
|
||||||
|
put(FileName, Key, Value, {LastPosition, HashTree}) ->
|
||||||
|
put(FileName, Key, Value, {LastPosition, HashTree}, ?MAX_FILE_SIZE).
|
||||||
|
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% get(FileName,Key) -> {key,value}
|
%% get(FileName,Key) -> {key,value}
|
||||||
|
@ -393,10 +413,14 @@ get(Handle, Key, CRCCheck, Cache) when is_tuple(Handle) ->
|
||||||
Slot = hash_to_slot(Hash, Count),
|
Slot = hash_to_slot(Hash, Count),
|
||||||
{ok, _} = file:position(Handle, {cur, Slot * ?DWORD_SIZE}),
|
{ok, _} = file:position(Handle, {cur, Slot * ?DWORD_SIZE}),
|
||||||
LastHashPosition = HashTable + ((Count-1) * ?DWORD_SIZE),
|
LastHashPosition = HashTable + ((Count-1) * ?DWORD_SIZE),
|
||||||
LocList = lists:seq(FirstHashPosition, LastHashPosition, ?DWORD_SIZE),
|
LocList = lists:seq(FirstHashPosition,
|
||||||
|
LastHashPosition,
|
||||||
|
?DWORD_SIZE),
|
||||||
% Split list around starting slot.
|
% Split list around starting slot.
|
||||||
{L1, L2} = lists:split(Slot, LocList),
|
{L1, L2} = lists:split(Slot, LocList),
|
||||||
search_hash_table(Handle, lists:append(L2, L1), Hash, Key, CRCCheck)
|
search_hash_table(Handle,
|
||||||
|
lists:append(L2, L1),
|
||||||
|
Hash, Key, CRCCheck)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_index(Handle, Index, no_cache) ->
|
get_index(Handle, Index, no_cache) ->
|
||||||
|
@ -758,7 +782,11 @@ search_hash_table(Handle, [Entry|RestOfEntries], Hash, Key, CRCCheck) ->
|
||||||
end,
|
end,
|
||||||
case KV of
|
case KV of
|
||||||
missing ->
|
missing ->
|
||||||
search_hash_table(Handle, RestOfEntries, Hash, Key, CRCCheck);
|
search_hash_table(Handle,
|
||||||
|
RestOfEntries,
|
||||||
|
Hash,
|
||||||
|
Key,
|
||||||
|
CRCCheck);
|
||||||
_ ->
|
_ ->
|
||||||
KV
|
KV
|
||||||
end;
|
end;
|
||||||
|
@ -948,14 +976,24 @@ key_value_to_record({Key, Value}) ->
|
||||||
|
|
||||||
write_key_value_pairs_1_test() ->
|
write_key_value_pairs_1_test() ->
|
||||||
{ok,Handle} = file:open("../test/test.cdb",write),
|
{ok,Handle} = file:open("../test/test.cdb",write),
|
||||||
{_, HashTree} = write_key_value_pairs(Handle,[{"key1","value1"},{"key2","value2"}]),
|
{_, HashTree} = write_key_value_pairs(Handle,
|
||||||
|
[{"key1","value1"},
|
||||||
|
{"key2","value2"}]),
|
||||||
Hash1 = hash("key1"),
|
Hash1 = hash("key1"),
|
||||||
Index1 = hash_to_index(Hash1),
|
Index1 = hash_to_index(Hash1),
|
||||||
Hash2 = hash("key2"),
|
Hash2 = hash("key2"),
|
||||||
Index2 = hash_to_index(Hash2),
|
Index2 = hash_to_index(Hash2),
|
||||||
R0 = array:new(256, {default, gb_trees:empty()}),
|
R0 = array:new(256, {default, gb_trees:empty()}),
|
||||||
R1 = array:set(Index1, gb_trees:insert(Hash1, [0], array:get(Index1, R0)), R0),
|
R1 = array:set(Index1,
|
||||||
R2 = array:set(Index2, gb_trees:insert(Hash2, [30], array:get(Index2, R1)), R1),
|
gb_trees:insert(Hash1,
|
||||||
|
[0],
|
||||||
|
array:get(Index1, R0)),
|
||||||
|
R0),
|
||||||
|
R2 = array:set(Index2,
|
||||||
|
gb_trees:insert(Hash2,
|
||||||
|
[30],
|
||||||
|
array:get(Index2, R1)),
|
||||||
|
R1),
|
||||||
io:format("HashTree is ~w~n", [HashTree]),
|
io:format("HashTree is ~w~n", [HashTree]),
|
||||||
io:format("Expected HashTree is ~w~n", [R2]),
|
io:format("Expected HashTree is ~w~n", [R2]),
|
||||||
?assertMatch(R2, HashTree),
|
?assertMatch(R2, HashTree),
|
||||||
|
@ -965,8 +1003,16 @@ write_key_value_pairs_1_test() ->
|
||||||
write_hash_tables_1_test() ->
|
write_hash_tables_1_test() ->
|
||||||
{ok, Handle} = file:open("../test/testx.cdb",write),
|
{ok, Handle} = file:open("../test/testx.cdb",write),
|
||||||
R0 = array:new(256, {default, gb_trees:empty()}),
|
R0 = array:new(256, {default, gb_trees:empty()}),
|
||||||
R1 = array:set(64, gb_trees:insert(6383014720, [18], array:get(64, R0)), R0),
|
R1 = array:set(64,
|
||||||
R2 = array:set(67, gb_trees:insert(6383014723, [0], array:get(67, R1)), R1),
|
gb_trees:insert(6383014720,
|
||||||
|
[18],
|
||||||
|
array:get(64, R0)),
|
||||||
|
R0),
|
||||||
|
R2 = array:set(67,
|
||||||
|
gb_trees:insert(6383014723,
|
||||||
|
[0],
|
||||||
|
array:get(67, R1)),
|
||||||
|
R1),
|
||||||
Result = write_hash_tables(Handle, R2),
|
Result = write_hash_tables(Handle, R2),
|
||||||
io:format("write hash tables result of ~w ~n", [Result]),
|
io:format("write hash tables result of ~w ~n", [Result]),
|
||||||
?assertMatch(Result,[{67,16,2},{64,0,2}]),
|
?assertMatch(Result,[{67,16,2},{64,0,2}]),
|
||||||
|
@ -999,7 +1045,8 @@ find_open_slot_5_test() ->
|
||||||
|
|
||||||
full_1_test() ->
|
full_1_test() ->
|
||||||
List1 = lists:sort([{"key1","value1"},{"key2","value2"}]),
|
List1 = lists:sort([{"key1","value1"},{"key2","value2"}]),
|
||||||
create("../test/simple.cdb",lists:sort([{"key1","value1"},{"key2","value2"}])),
|
create("../test/simple.cdb",
|
||||||
|
lists:sort([{"key1","value1"},{"key2","value2"}])),
|
||||||
List2 = lists:sort(dump("../test/simple.cdb")),
|
List2 = lists:sort(dump("../test/simple.cdb")),
|
||||||
?assertMatch(List1,List2),
|
?assertMatch(List1,List2),
|
||||||
ok = file:delete("../test/simple.cdb").
|
ok = file:delete("../test/simple.cdb").
|
||||||
|
@ -1103,7 +1150,8 @@ search_hash_table_findinslot_test() ->
|
||||||
{"K4", "V4"}, {"K5", "V5"}, {"K6", "V6"}, {"K7", "V7"},
|
{"K4", "V4"}, {"K5", "V5"}, {"K6", "V6"}, {"K7", "V7"},
|
||||||
{"K8", "V8"}]),
|
{"K8", "V8"}]),
|
||||||
ok = from_dict("../test/hashtable1_test.cdb",D),
|
ok = from_dict("../test/hashtable1_test.cdb",D),
|
||||||
{ok, Handle} = file:open("../test/hashtable1_test.cdb", [binary, raw, read, write]),
|
{ok, Handle} = file:open("../test/hashtable1_test.cdb",
|
||||||
|
[binary, raw, read, write]),
|
||||||
Hash = hash(Key1),
|
Hash = hash(Key1),
|
||||||
Index = hash_to_index(Hash),
|
Index = hash_to_index(Hash),
|
||||||
{ok, _} = file:position(Handle, {bof, ?DWORD_SIZE*Index}),
|
{ok, _} = file:position(Handle, {bof, ?DWORD_SIZE*Index}),
|
||||||
|
@ -1124,12 +1172,17 @@ search_hash_table_findinslot_test() ->
|
||||||
{ok, _} = file:position(Handle, FirstHashPosition),
|
{ok, _} = file:position(Handle, FirstHashPosition),
|
||||||
FlipH3 = endian_flip(ReadH3),
|
FlipH3 = endian_flip(ReadH3),
|
||||||
FlipP3 = endian_flip(ReadP3),
|
FlipP3 = endian_flip(ReadP3),
|
||||||
RBin = <<FlipH3:32/integer, FlipP3:32/integer, 0:32/integer, 0:32/integer>>,
|
RBin = <<FlipH3:32/integer,
|
||||||
|
FlipP3:32/integer,
|
||||||
|
0:32/integer,
|
||||||
|
0:32/integer>>,
|
||||||
io:format("Replacement binary of ~w~n", [RBin]),
|
io:format("Replacement binary of ~w~n", [RBin]),
|
||||||
{ok, OldBin} = file:pread(Handle,
|
{ok, OldBin} = file:pread(Handle,
|
||||||
FirstHashPosition + (Slot -1) * ?DWORD_SIZE, 16),
|
FirstHashPosition + (Slot -1) * ?DWORD_SIZE, 16),
|
||||||
io:format("Bin to be replaced is ~w ~n", [OldBin]),
|
io:format("Bin to be replaced is ~w ~n", [OldBin]),
|
||||||
ok = file:pwrite(Handle, FirstHashPosition + (Slot -1) * ?DWORD_SIZE, RBin),
|
ok = file:pwrite(Handle,
|
||||||
|
FirstHashPosition + (Slot -1) * ?DWORD_SIZE,
|
||||||
|
RBin),
|
||||||
ok = file:close(Handle),
|
ok = file:close(Handle),
|
||||||
io:format("Find key following change to hash table~n"),
|
io:format("Find key following change to hash table~n"),
|
||||||
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)),
|
?assertMatch(missing, get("../test/hashtable1_test.cdb", Key1)),
|
||||||
|
|
|
@ -103,6 +103,7 @@
|
||||||
ink_get/3,
|
ink_get/3,
|
||||||
ink_snap/1,
|
ink_snap/1,
|
||||||
ink_close/1,
|
ink_close/1,
|
||||||
|
ink_print_manifest/1,
|
||||||
build_dummy_journal/0,
|
build_dummy_journal/0,
|
||||||
simple_manifest_reader/2]).
|
simple_manifest_reader/2]).
|
||||||
|
|
||||||
|
@ -121,15 +122,16 @@
|
||||||
active_journaldb :: pid(),
|
active_journaldb :: pid(),
|
||||||
active_journaldb_sqn :: integer(),
|
active_journaldb_sqn :: integer(),
|
||||||
removed_journaldbs = [] :: list(),
|
removed_journaldbs = [] :: list(),
|
||||||
root_path :: string()}).
|
root_path :: string(),
|
||||||
|
cdb_options :: #cdb_options{}}).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
ink_start(RootDir) ->
|
ink_start(InkerOpts) ->
|
||||||
gen_server:start(?MODULE, [RootDir], []).
|
gen_server:start(?MODULE, [InkerOpts], []).
|
||||||
|
|
||||||
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
|
ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
|
||||||
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
|
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity).
|
||||||
|
@ -143,11 +145,16 @@ ink_snap(Pid) ->
|
||||||
ink_close(Pid) ->
|
ink_close(Pid) ->
|
||||||
gen_server:call(Pid, close, infinity).
|
gen_server:call(Pid, close, infinity).
|
||||||
|
|
||||||
|
ink_print_manifest(Pid) ->
|
||||||
|
gen_server:call(Pid, print_manifest, infinity).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
init([RootPath]) ->
|
init([InkerOpts]) ->
|
||||||
|
RootPath = InkerOpts#inker_options.root_path,
|
||||||
|
CDBopts = InkerOpts#inker_options.cdb_options,
|
||||||
JournalFP = filepath(RootPath, journal_dir),
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
{ok, JournalFilenames} = case filelib:is_dir(JournalFP) of
|
{ok, JournalFilenames} = case filelib:is_dir(JournalFP) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -170,13 +177,15 @@ init([RootPath]) ->
|
||||||
ManifestSQN} = build_manifest(ManifestFilenames,
|
ManifestSQN} = build_manifest(ManifestFilenames,
|
||||||
JournalFilenames,
|
JournalFilenames,
|
||||||
fun simple_manifest_reader/2,
|
fun simple_manifest_reader/2,
|
||||||
RootPath),
|
RootPath,
|
||||||
|
CDBopts),
|
||||||
{ok, #state{manifest = Manifest,
|
{ok, #state{manifest = Manifest,
|
||||||
manifest_sqn = ManifestSQN,
|
manifest_sqn = ManifestSQN,
|
||||||
journal_sqn = JournalSQN,
|
journal_sqn = JournalSQN,
|
||||||
active_journaldb = ActiveJournal,
|
active_journaldb = ActiveJournal,
|
||||||
active_journaldb_sqn = LowActiveSQN,
|
active_journaldb_sqn = LowActiveSQN,
|
||||||
root_path = RootPath}}.
|
root_path = RootPath,
|
||||||
|
cdb_options = CDBopts}}.
|
||||||
|
|
||||||
|
|
||||||
handle_call({put, Key, Object, KeyChanges}, From, State) ->
|
handle_call({put, Key, Object, KeyChanges}, From, State) ->
|
||||||
|
@ -208,6 +217,9 @@ handle_call(snapshot, _From , State) ->
|
||||||
State#state.active_journaldb,
|
State#state.active_journaldb,
|
||||||
State#state.active_journaldb_sqn},
|
State#state.active_journaldb_sqn},
|
||||||
State};
|
State};
|
||||||
|
handle_call(print_manifest, _From, State) ->
|
||||||
|
manifest_printer(State#state.manifest),
|
||||||
|
{reply, ok, State};
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State) ->
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
|
@ -221,7 +233,8 @@ terminate(Reason, State) ->
|
||||||
io:format("Inker closing journal for reason ~w~n", [Reason]),
|
io:format("Inker closing journal for reason ~w~n", [Reason]),
|
||||||
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
|
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
|
||||||
[State#state.journal_sqn, State#state.manifest_sqn]),
|
[State#state.journal_sqn, State#state.manifest_sqn]),
|
||||||
io:format("Manifest when closing is ~w~n", [State#state.manifest]),
|
io:format("Manifest when closing is: ~n"),
|
||||||
|
manifest_printer(State#state.manifest),
|
||||||
close_allmanifest(State#state.manifest, State#state.active_journaldb).
|
close_allmanifest(State#state.manifest, State#state.active_journaldb).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -242,7 +255,8 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
|
||||||
{ok, State#state{journal_sqn=NewSQN}};
|
{ok, State#state{journal_sqn=NewSQN}};
|
||||||
roll ->
|
roll ->
|
||||||
FileName = filepath(State#state.root_path, NewSQN, new_journal),
|
FileName = filepath(State#state.root_path, NewSQN, new_journal),
|
||||||
{ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName),
|
CDBopts = State#state.cdb_options,
|
||||||
|
{ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName, CDBopts),
|
||||||
case leveled_cdb:cdb_put(NewJournalP,
|
case leveled_cdb:cdb_put(NewJournalP,
|
||||||
{NewSQN, PrimaryKey},
|
{NewSQN, PrimaryKey},
|
||||||
Bin1) of
|
Bin1) of
|
||||||
|
@ -265,13 +279,13 @@ roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
|
||||||
[JournalSQN] = sequencenumbers_fromfilenames([NewFilename],
|
[JournalSQN] = sequencenumbers_fromfilenames([NewFilename],
|
||||||
JournalRegex2,
|
JournalRegex2,
|
||||||
'SQN'),
|
'SQN'),
|
||||||
NewManifest = lists:append(Manifest, [{JournalSQN, NewFilename, PidR}]),
|
NewManifest = add_to_manifest(Manifest, {JournalSQN, NewFilename, PidR}),
|
||||||
NewManifestSQN = ManifestSQN + 1,
|
NewManifestSQN = ManifestSQN + 1,
|
||||||
ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
|
ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
|
||||||
{NewManifest, NewManifestSQN}.
|
{NewManifest, NewManifestSQN}.
|
||||||
|
|
||||||
get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) ->
|
get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) ->
|
||||||
if
|
Obj = if
|
||||||
SQN < ActiveJournalSQN ->
|
SQN < ActiveJournalSQN ->
|
||||||
JournalP = find_in_manifest(SQN, Manifest),
|
JournalP = find_in_manifest(SQN, Manifest),
|
||||||
if
|
if
|
||||||
|
@ -284,13 +298,30 @@ get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) ->
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey})
|
leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey})
|
||||||
|
end,
|
||||||
|
case Obj of
|
||||||
|
{{SQN, PK}, Bin} ->
|
||||||
|
{{SQN, PK}, binary_to_term(Bin)};
|
||||||
|
_ ->
|
||||||
|
Obj
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
build_manifest(ManifestFilenames,
|
build_manifest(ManifestFilenames,
|
||||||
JournalFilenames,
|
JournalFilenames,
|
||||||
ManifestRdrFun,
|
ManifestRdrFun,
|
||||||
RootPath) ->
|
RootPath) ->
|
||||||
|
build_manifest(ManifestFilenames,
|
||||||
|
JournalFilenames,
|
||||||
|
ManifestRdrFun,
|
||||||
|
RootPath,
|
||||||
|
#cdb_options{}).
|
||||||
|
|
||||||
|
build_manifest(ManifestFilenames,
|
||||||
|
JournalFilenames,
|
||||||
|
ManifestRdrFun,
|
||||||
|
RootPath,
|
||||||
|
CDBopts) ->
|
||||||
%% Setup root paths
|
%% Setup root paths
|
||||||
JournalFP = filepath(RootPath, journal_dir),
|
JournalFP = filepath(RootPath, journal_dir),
|
||||||
%% Find the manifest with a highest Manifest sequence number
|
%% Find the manifest with a highest Manifest sequence number
|
||||||
|
@ -336,7 +367,7 @@ build_manifest(ManifestFilenames,
|
||||||
integer_to_list(X)
|
integer_to_list(X)
|
||||||
++ "." ++
|
++ "." ++
|
||||||
?JOURNAL_FILEX,
|
?JOURNAL_FILEX,
|
||||||
Acc ++ [{X, FN}];
|
add_to_manifest(Acc, {X, FN});
|
||||||
true
|
true
|
||||||
-> Acc
|
-> Acc
|
||||||
end end,
|
end end,
|
||||||
|
@ -345,11 +376,12 @@ build_manifest(ManifestFilenames,
|
||||||
|
|
||||||
%% Enrich the manifest so it contains the Pid of any of the immutable
|
%% Enrich the manifest so it contains the Pid of any of the immutable
|
||||||
%% entries
|
%% entries
|
||||||
io:format("Manifest1 is ~w~n", [Manifest1]),
|
io:format("Manifest on startup is: ~n"),
|
||||||
Manifest2 = lists:map(fun({X, Y}) ->
|
manifest_printer(Manifest1),
|
||||||
FN = filename:join(JournalFP, Y),
|
Manifest2 = lists:map(fun({LowSQN, FN}) ->
|
||||||
{ok, Pid} = leveled_cdb:cdb_open_reader(FN),
|
FP = filename:join(JournalFP, FN),
|
||||||
{X, Y, Pid} end,
|
{ok, Pid} = leveled_cdb:cdb_open_reader(FP),
|
||||||
|
{LowSQN, FN, Pid} end,
|
||||||
Manifest1),
|
Manifest1),
|
||||||
|
|
||||||
%% Find any more recent mutable files that have a higher sequence number
|
%% Find any more recent mutable files that have a higher sequence number
|
||||||
|
@ -378,7 +410,8 @@ build_manifest(ManifestFilenames,
|
||||||
end,
|
end,
|
||||||
LowActiveSQN = TopSQNInManifest + 1,
|
LowActiveSQN = TopSQNInManifest + 1,
|
||||||
ActiveFN = filepath(RootPath, LowActiveSQN, new_journal),
|
ActiveFN = filepath(RootPath, LowActiveSQN, new_journal),
|
||||||
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
|
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN,
|
||||||
|
CDBopts),
|
||||||
{Manifest2,
|
{Manifest2,
|
||||||
{ActiveJournal, LowActiveSQN},
|
{ActiveJournal, LowActiveSQN},
|
||||||
TopSQNInManifest,
|
TopSQNInManifest,
|
||||||
|
@ -391,7 +424,8 @@ build_manifest(ManifestFilenames,
|
||||||
%% Need to work out highest sequence number in tail file to feed
|
%% Need to work out highest sequence number in tail file to feed
|
||||||
%% into opening of pending journal
|
%% into opening of pending journal
|
||||||
ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal),
|
ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal),
|
||||||
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN),
|
{ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN,
|
||||||
|
CDBopts),
|
||||||
{HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal),
|
{HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||||
{Manifest3,
|
{Manifest3,
|
||||||
{ActiveJournal, ActiveJournalSQN},
|
{ActiveJournal, ActiveJournalSQN},
|
||||||
|
@ -416,8 +450,8 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
|
||||||
{ok, NewFilename} = leveled_cdb:cdb_complete(PidW),
|
{ok, NewFilename} = leveled_cdb:cdb_complete(PidW),
|
||||||
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
|
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
|
||||||
roll_pending_journals(T,
|
roll_pending_journals(T,
|
||||||
lists:append(Manifest,
|
add_to_manifest(Manifest,
|
||||||
[{JournalSQN, NewFilename, PidR}]),
|
{JournalSQN, NewFilename, PidR}),
|
||||||
RootPath).
|
RootPath).
|
||||||
|
|
||||||
|
|
||||||
|
@ -437,6 +471,9 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
|
||||||
[],
|
[],
|
||||||
Filenames).
|
Filenames).
|
||||||
|
|
||||||
|
add_to_manifest(Manifest, Entry) ->
|
||||||
|
lists:reverse(lists:sort([Entry|Manifest])).
|
||||||
|
|
||||||
find_in_manifest(_SQN, []) ->
|
find_in_manifest(_SQN, []) ->
|
||||||
error;
|
error;
|
||||||
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
||||||
|
@ -483,7 +520,17 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
manifest_printer(Manifest) ->
|
||||||
|
lists:foreach(fun(X) ->
|
||||||
|
{SQN, FN} = case X of
|
||||||
|
{A, B, _PID} ->
|
||||||
|
{A, B};
|
||||||
|
{A, B} ->
|
||||||
|
{A, B}
|
||||||
|
end,
|
||||||
|
io:format("At SQN=~w journal has filename ~s~n",
|
||||||
|
[SQN, FN]) end,
|
||||||
|
Manifest).
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
|
@ -502,15 +549,15 @@ build_dummy_journal() ->
|
||||||
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
||||||
{K1, V1} = {"Key1", "TestValue1"},
|
{K1, V1} = {"Key1", "TestValue1"},
|
||||||
{K2, V2} = {"Key2", "TestValue2"},
|
{K2, V2} = {"Key2", "TestValue2"},
|
||||||
ok = leveled_cdb:cdb_put(J1, {1, K1}, V1),
|
ok = leveled_cdb:cdb_put(J1, {1, K1}, term_to_binary({V1, []})),
|
||||||
ok = leveled_cdb:cdb_put(J1, {2, K2}, V2),
|
ok = leveled_cdb:cdb_put(J1, {2, K2}, term_to_binary({V2, []})),
|
||||||
{ok, _} = leveled_cdb:cdb_complete(J1),
|
{ok, _} = leveled_cdb:cdb_complete(J1),
|
||||||
F2 = filename:join(JournalFP, "nursery_3.pnd"),
|
F2 = filename:join(JournalFP, "nursery_3.pnd"),
|
||||||
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
||||||
{K1, V3} = {"Key1", "TestValue3"},
|
{K1, V3} = {"Key1", "TestValue3"},
|
||||||
{K4, V4} = {"Key4", "TestValue4"},
|
{K4, V4} = {"Key4", "TestValue4"},
|
||||||
ok = leveled_cdb:cdb_put(J2, {3, K1}, V3),
|
ok = leveled_cdb:cdb_put(J2, {3, K1}, term_to_binary({V3, []})),
|
||||||
ok = leveled_cdb:cdb_put(J2, {4, K4}, V4),
|
ok = leveled_cdb:cdb_put(J2, {4, K4}, term_to_binary({V4, []})),
|
||||||
ok = leveled_cdb:cdb_close(J2),
|
ok = leveled_cdb:cdb_close(J2),
|
||||||
Manifest = {2, [{1, "nursery_1.cdb"}], []},
|
Manifest = {2, [{1, "nursery_1.cdb"}], []},
|
||||||
ManifestBin = term_to_binary(Manifest),
|
ManifestBin = term_to_binary(Manifest),
|
||||||
|
@ -558,8 +605,8 @@ another_buildmanifest_test() ->
|
||||||
{ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2),
|
{ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2),
|
||||||
{K5, V5} = {"Key5", "TestValue5"},
|
{K5, V5} = {"Key5", "TestValue5"},
|
||||||
{K6, V6} = {"Key6", "TestValue6"},
|
{K6, V6} = {"Key6", "TestValue6"},
|
||||||
ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, V5),
|
ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, term_to_binary({V5, []})),
|
||||||
ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, V6),
|
ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, term_to_binary({V6, []})),
|
||||||
ok = leveled_cdb:cdb_close(NewActiveJN),
|
ok = leveled_cdb:cdb_close(NewActiveJN),
|
||||||
%% Test setup - now build manifest
|
%% Test setup - now build manifest
|
||||||
Res = build_manifest(["1.man"],
|
Res = build_manifest(["1.man"],
|
||||||
|
@ -572,7 +619,7 @@ another_buildmanifest_test() ->
|
||||||
{Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res,
|
{Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res,
|
||||||
?assertMatch(HighSQN, 6),
|
?assertMatch(HighSQN, 6),
|
||||||
?assertMatch(ManSQN, 1),
|
?assertMatch(ManSQN, 1),
|
||||||
?assertMatch([{1, "nursery_1.cdb", _}, {3, "nursery_3.cdb", _}], Man),
|
?assertMatch([{3, "nursery_3.cdb", _}, {1, "nursery_1.cdb", _}], Man),
|
||||||
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
|
{ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal),
|
||||||
?assertMatch(ActSQN, 6),
|
?assertMatch(ActSQN, 6),
|
||||||
?assertMatch(ActJournalSQN, 5),
|
?assertMatch(ActJournalSQN, 5),
|
||||||
|
@ -599,5 +646,52 @@ empty_buildmanifest_test() ->
|
||||||
close_allmanifest(Man, ActJournal),
|
close_allmanifest(Man, ActJournal),
|
||||||
clean_testdir(RootPath).
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
simplejournal_test() ->
|
||||||
|
%% build up a database, and then open it through the gen_server wrap
|
||||||
|
%% Get and Put some keys
|
||||||
|
RootPath = "../test/inker",
|
||||||
|
build_dummy_journal(),
|
||||||
|
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||||
|
cdb_options=#cdb_options{}}),
|
||||||
|
R1 = ink_get(Ink1, "Key1", 1),
|
||||||
|
?assertMatch(R1, {{1, "Key1"}, {"TestValue1", []}}),
|
||||||
|
R2 = ink_get(Ink1, "Key1", 3),
|
||||||
|
?assertMatch(R2, {{3, "Key1"}, {"TestValue3", []}}),
|
||||||
|
{ok, NewSQN1} = ink_put(Ink1, "Key99", "TestValue99", []),
|
||||||
|
?assertMatch(NewSQN1, 5),
|
||||||
|
R3 = ink_get(Ink1, "Key99", 5),
|
||||||
|
io:format("Result 3 is ~w~n", [R3]),
|
||||||
|
?assertMatch(R3, {{5, "Key99"}, {"TestValue99", []}}),
|
||||||
|
ink_close(Ink1),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
rollafile_simplejournal_test() ->
|
||||||
|
RootPath = "../test/inker",
|
||||||
|
build_dummy_journal(),
|
||||||
|
CDBopts = #cdb_options{max_size=300000},
|
||||||
|
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||||
|
cdb_options=CDBopts}),
|
||||||
|
FunnyLoop = lists:seq(1, 48),
|
||||||
|
{ok, NewSQN1} = ink_put(Ink1, "KeyAA", "TestValueAA", []),
|
||||||
|
?assertMatch(NewSQN1, 5),
|
||||||
|
ok = ink_print_manifest(Ink1),
|
||||||
|
R0 = ink_get(Ink1, "KeyAA", 5),
|
||||||
|
?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}),
|
||||||
|
lists:foreach(fun(X) ->
|
||||||
|
{ok, _} = ink_put(Ink1,
|
||||||
|
"KeyZ" ++ integer_to_list(X),
|
||||||
|
crypto:rand_bytes(10000),
|
||||||
|
[]) end,
|
||||||
|
FunnyLoop),
|
||||||
|
{ok, NewSQN2} = ink_put(Ink1, "KeyBB", "TestValueBB", []),
|
||||||
|
?assertMatch(NewSQN2, 54),
|
||||||
|
ok = ink_print_manifest(Ink1),
|
||||||
|
R1 = ink_get(Ink1, "KeyAA", 5),
|
||||||
|
?assertMatch(R1, {{5, "KeyAA"}, {"TestValueAA", []}}),
|
||||||
|
R2 = ink_get(Ink1, "KeyBB", 54),
|
||||||
|
?assertMatch(R2, {{54, "KeyBB"}, {"TestValueBB", []}}),
|
||||||
|
ink_close(Ink1),
|
||||||
|
clean_testdir(RootPath).
|
||||||
|
|
||||||
|
|
||||||
-endif.
|
-endif.
|
Loading…
Add table
Add a link
Reference in a new issue