Improved testing

Improve testing of Penciller to show startup and shutdown with push,
merging and fetch
This commit is contained in:
martinsumner 2016-09-08 14:21:30 +01:00
parent 0d905639be
commit edfe9e3bed
5 changed files with 193 additions and 23 deletions

View file

@ -26,4 +26,8 @@
-record(inker_options,
{cdb_max_size :: integer(),
root_path :: string(),
cdb_options :: #cdb_options{}}).
cdb_options :: #cdb_options{}}).
-record(penciller_options,
{root_path :: string(),
max_inmemory_tablesize :: integer()}).

View file

@ -90,3 +90,77 @@
-module(leveled_bookie).
-behaviour(gen_server).
-include("../include/leveled.hrl").
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-include_lib("eunit/include/eunit.hrl").
-record(state, {inker :: pid(),
penciller :: pid()}).
%%%============================================================================
%%% API
%%%============================================================================
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
init([Opts]) ->
{InkerOpts, PencillerOpts} = set_options(Opts),
{Inker, Penciller} = startup(InkerOpts, PencillerOpts),
{ok, #state{inker=Inker, penciller=Penciller}}.
handle_call(_, _From, State) ->
{reply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%============================================================================
%%% Internal functions
%%%============================================================================
set_options(_Opts) ->
{#inker_options{}, #penciller_options{}}.
startup(InkerOpts, PencillerOpts) ->
{ok, Inker} = leveled_inker:ink_start(InkerOpts),
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
KeyChanges = leveled_inker:ink_fetchkeychangesfrom(Inker, LedgerSQN),
ok = leveled_penciller:pcl_pushmem(Penciller, KeyChanges),
{Inker, Penciller}.
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
-endif.

View file

@ -539,7 +539,7 @@ manifest_printer(Manifest) ->
-ifdef(TEST).
build_dummy_journal() ->
RootPath = "../test/inker",
RootPath = "../test/journal",
JournalFP = filepath(RootPath, journal_dir),
ManifestFP = filepath(RootPath, manifest_dir),
ok = filelib:ensure_dir(RootPath),
@ -577,7 +577,7 @@ clean_subdir(DirPath) ->
Files).
simple_buildmanifest_test() ->
RootPath = "../test/inker",
RootPath = "../test/journal",
build_dummy_journal(),
Res = build_manifest(["1.man"],
["nursery_1.cdb", "nursery_3.pnd"],
@ -596,7 +596,7 @@ simple_buildmanifest_test() ->
another_buildmanifest_test() ->
%% There is a rolled jounral file which is not yet in the manifest
RootPath = "../test/inker",
RootPath = "../test/journal",
build_dummy_journal(),
FN = filepath(RootPath, 3, new_journal),
{ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN),
@ -628,7 +628,7 @@ another_buildmanifest_test() ->
empty_buildmanifest_test() ->
RootPath = "../test/inker/",
RootPath = "../test/journal",
Res = build_manifest([],
[],
fun simple_manifest_reader/2,
@ -649,7 +649,7 @@ empty_buildmanifest_test() ->
simplejournal_test() ->
%% build up a database, and then open it through the gen_server wrap
%% Get and Put some keys
RootPath = "../test/inker",
RootPath = "../test/journal",
build_dummy_journal(),
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
cdb_options=#cdb_options{}}),
@ -666,7 +666,7 @@ simplejournal_test() ->
clean_testdir(RootPath).
rollafile_simplejournal_test() ->
RootPath = "../test/inker",
RootPath = "../test/journal",
build_dummy_journal(),
CDBopts = #cdb_options{max_size=300000},
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,

View file

@ -197,15 +197,16 @@
levelzero_pending = ?L0PEND_RESET :: tuple(),
levelzero_snapshot = [] :: list(),
memtable,
backlog = false :: boolean()}).
backlog = false :: boolean(),
memtable_maxsize :: integer}).
%%%============================================================================
%%% API
%%%============================================================================
pcl_start(RootDir) ->
gen_server:start(?MODULE, [RootDir], []).
pcl_start(PCLopts) ->
gen_server:start(?MODULE, [PCLopts], []).
pcl_pushmem(Pid, DumpList) ->
%% Bookie to dump memory onto penciller
@ -236,10 +237,20 @@ pcl_close(Pid) ->
%%% gen_server callbacks
%%%============================================================================
init([RootPath]) ->
init([PCLopts]) ->
RootPath = PCLopts#penciller_options.root_path,
MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of
undefined ->
?MAX_TABLESIZE;
M ->
M
end,
TID = ets:new(?MEMTABLE, [ordered_set]),
{ok, Clerk} = leveled_clerk:clerk_new(self()),
InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath},
InitState = #state{memtable=TID,
clerk=Clerk,
root_path=RootPath,
memtable_maxsize=MaxTableSize},
%% Open manifest
ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/",
@ -320,7 +331,7 @@ init([RootPath]) ->
handle_call({push_mem, DumpList}, _From, State) ->
StartWatch = os:timestamp(),
Response = case assess_sqn(DumpList) of
{MinSQN, MaxSQN} when MaxSQN > MinSQN,
{MinSQN, MaxSQN} when MaxSQN >= MinSQN,
MinSQN >= State#state.ledger_sqn ->
io:format("SQN check completed in ~w microseconds~n",
[timer:now_diff(os:timestamp(),StartWatch)]),
@ -436,6 +447,10 @@ terminate(_Reason, State) ->
file:rename(FileName ++ ".pnd", FileName ++ ".sft");
{?L0PEND_RESET, [], L} when L == 0 ->
io:format("No keys to dump from memory when closing~n");
{{true, L0Pid, _TS}, _, _} ->
leveled_sft:sft_close(L0Pid),
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)]);
_ ->
io:format("No opportunity to persist memory before closing "
++ "with ~w keys discarded~n", [length(Dump)])
@ -481,7 +496,8 @@ push_to_memory(DumpList, State) ->
MemoryInsertion = do_push_to_mem(DumpList,
TableSize,
UpdState#state.memtable,
UpdState#state.levelzero_snapshot),
UpdState#state.levelzero_snapshot,
UpdState#state.memtable_maxsize),
case MemoryInsertion of
{twist, ApproxTableSize, UpdSnapshot} ->
@ -557,16 +573,16 @@ fetch(Key, Manifest, Level, FetchFun) ->
end
end.
do_push_to_mem(DumpList, TableSize, MemTable, Snapshot) ->
do_push_to_mem(DumpList, TableSize, MemTable, Snapshot, MaxSize) ->
SW = os:timestamp(),
UpdSnapshot = lists:append(Snapshot, DumpList),
ets:insert(MemTable, DumpList),
io:format("Push into memory timed at ~w microseconds~n",
[timer:now_diff(os:timestamp(), SW)]),
case TableSize + length(DumpList) of
ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE ->
ApproxTableSize when ApproxTableSize > MaxSize ->
case ets:info(MemTable, size) of
ActTableSize when ActTableSize > ?MAX_TABLESIZE ->
ActTableSize when ActTableSize > MaxSize ->
{roll, ActTableSize, UpdSnapshot};
ActTableSize ->
io:format("Table size is actually ~w~n", [ActTableSize]),
@ -769,14 +785,19 @@ rename_manifest_files(RootPath, NewMSN) ->
file:rename(filepath(RootPath, NewMSN, pending_manifest),
filepath(RootPath, NewMSN, current_manifest)).
filepath(RootPath, manifest) ->
RootPath ++ "/" ++ ?MANIFEST_FP;
filepath(RootPath, files) ->
RootPath ++ "/" ++ ?FILES_FP.
filepath(RootPath, NewMSN, pending_manifest) ->
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_"
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX;
filepath(RootPath, NewMSN, current_manifest) ->
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_"
filepath(RootPath, manifest) ++ "/" ++ "nonzero_"
++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX;
filepath(RootPath, NewMSN, new_merge_files) ->
RootPath ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(NewMSN).
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
update_deletions([], _NewMSN, UnreferencedFiles) ->
UnreferencedFiles;
@ -822,6 +843,24 @@ assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) ->
%%% Test
%%%============================================================================
-ifdef(TEST).
clean_testdir(RootPath) ->
clean_subdir(filepath(RootPath, manifest)),
clean_subdir(filepath(RootPath, files)).
clean_subdir(DirPath) ->
case filelib:is_dir(DirPath) of
true ->
{ok, Files} = file:list_dir(DirPath),
lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)),
io:format("Delete file ~s/~s~n",
[DirPath, FN])
end,
Files);
false ->
ok
end.
compaction_work_assessment_test() ->
L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}],
@ -854,4 +893,51 @@ confirm_delete_test() ->
?assertMatch(R2, false),
RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}],
R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3),
?assertMatch(R3, false).
?assertMatch(R3, false).
simple_server_test() ->
RootPath = "../test/ledger",
clean_testdir(RootPath),
{ok, PCL} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
Key1 = {{o,"Bucket0001", "Key0001"},1, {active, infinity}, null},
KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})),
Key2 = {{o,"Bucket0002", "Key0002"},1002, {active, infinity}, null},
KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
Key3 = {{o,"Bucket0003", "Key0003"},2002, {active, infinity}, null},
ok = pcl_pushmem(PCL, [Key1]),
R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
?assertMatch(R1, Key1),
ok = pcl_pushmem(PCL, KL1),
R2 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
?assertMatch(R2, Key1),
S1 = pcl_pushmem(PCL, [Key2]),
if S1 == pause -> timer:sleep(2); true -> ok end,
R3 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
R4 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}),
?assertMatch(R3, Key1),
?assertMatch(R4, Key2),
S2 = pcl_pushmem(PCL, KL2),
if S2 == pause -> timer:sleep(2000); true -> ok end,
S3 = pcl_pushmem(PCL, [Key3]),
if S3 == pause -> timer:sleep(2000); true -> ok end,
R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}),
R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}),
?assertMatch(R5, Key1),
?assertMatch(R6, Key2),
?assertMatch(R7, Key3),
ok = pcl_close(PCL),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
R8 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}),
R9 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}),
R10 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}),
?assertMatch(R8, Key1),
?assertMatch(R9, Key2),
?assertMatch(R10, Key3),
ok = pcl_close(PCLr),
clean_testdir(RootPath).
-endif.

View file

@ -200,7 +200,7 @@
filter_pointer :: integer(),
summ_pointer :: integer(),
summ_length :: integer(),
filename :: string(),
filename = "not set" :: string(),
handle :: file:fd(),
background_complete = false :: boolean(),
background_failure = "Unknown" :: string(),
@ -387,7 +387,12 @@ terminate(Reason, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename);
_ ->
ok = file:close(State#state.handle)
case State#state.handle of
undefined ->
ok;
Handle ->
file:close(Handle)
end
end.
code_change(_OldVsn, State, _Extra) ->
@ -520,6 +525,7 @@ complete_file(Handle, FileMD, KL1, KL2, Level, Rename) ->
false ->
open_file(FileMD);
{true, OldName, NewName} ->
io:format("Renaming file from ~s to ~s~n", [OldName, NewName]),
ok = file:rename(OldName, NewName),
open_file(FileMD#state{filename=NewName})
end,