From edfe9e3bed1dd0c73e3e0c98cad3cd2392690f46 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Thu, 8 Sep 2016 14:21:30 +0100 Subject: [PATCH] Improved testing Improve testing of Penciller to show startup and shutdown with push, merging and fetch --- include/leveled.hrl | 6 +- src/leveled_bookie.erl | 74 +++++++++++++++++++++++++ src/leveled_inker.erl | 12 ++-- src/leveled_penciller.erl | 114 +++++++++++++++++++++++++++++++++----- src/leveled_sft.erl | 10 +++- 5 files changed, 193 insertions(+), 23 deletions(-) diff --git a/include/leveled.hrl b/include/leveled.hrl index f68a12d..80cdc87 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -26,4 +26,8 @@ -record(inker_options, {cdb_max_size :: integer(), root_path :: string(), - cdb_options :: #cdb_options{}}). \ No newline at end of file + cdb_options :: #cdb_options{}}). + +-record(penciller_options, + {root_path :: string(), + max_inmemory_tablesize :: integer()}). \ No newline at end of file diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 0eab5bc..58d24b2 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -90,3 +90,77 @@ -module(leveled_bookie). +-behaviour(gen_server). + +-include("../include/leveled.hrl"). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-include_lib("eunit/include/eunit.hrl"). + + +-record(state, {inker :: pid(), + penciller :: pid()}). + + +%%%============================================================================ +%%% API +%%%============================================================================ + + + +%%%============================================================================ +%%% gen_server callbacks +%%%============================================================================ + +init([Opts]) -> + {InkerOpts, PencillerOpts} = set_options(Opts), + {Inker, Penciller} = startup(InkerOpts, PencillerOpts), + {ok, #state{inker=Inker, penciller=Penciller}}. + + +handle_call(_, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%============================================================================ +%%% Internal functions +%%%============================================================================ + +set_options(_Opts) -> + {#inker_options{}, #penciller_options{}}. + +startup(InkerOpts, PencillerOpts) -> + {ok, Inker} = leveled_inker:ink_start(InkerOpts), + {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), + LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), + KeyChanges = leveled_inker:ink_fetchkeychangesfrom(Inker, LedgerSQN), + ok = leveled_penciller:pcl_pushmem(Penciller, KeyChanges), + {Inker, Penciller}. + + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + +-endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 6513e72..25b839b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -539,7 +539,7 @@ manifest_printer(Manifest) -> -ifdef(TEST). build_dummy_journal() -> - RootPath = "../test/inker", + RootPath = "../test/journal", JournalFP = filepath(RootPath, journal_dir), ManifestFP = filepath(RootPath, manifest_dir), ok = filelib:ensure_dir(RootPath), @@ -577,7 +577,7 @@ clean_subdir(DirPath) -> Files). simple_buildmanifest_test() -> - RootPath = "../test/inker", + RootPath = "../test/journal", build_dummy_journal(), Res = build_manifest(["1.man"], ["nursery_1.cdb", "nursery_3.pnd"], @@ -596,7 +596,7 @@ simple_buildmanifest_test() -> another_buildmanifest_test() -> %% There is a rolled jounral file which is not yet in the manifest - RootPath = "../test/inker", + RootPath = "../test/journal", build_dummy_journal(), FN = filepath(RootPath, 3, new_journal), {ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN), @@ -628,7 +628,7 @@ another_buildmanifest_test() -> empty_buildmanifest_test() -> - RootPath = "../test/inker/", + RootPath = "../test/journal", Res = build_manifest([], [], fun simple_manifest_reader/2, @@ -649,7 +649,7 @@ empty_buildmanifest_test() -> simplejournal_test() -> %% build up a database, and then open it through the gen_server wrap %% Get and Put some keys - RootPath = "../test/inker", + RootPath = "../test/journal", build_dummy_journal(), {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=#cdb_options{}}), @@ -666,7 +666,7 @@ simplejournal_test() -> clean_testdir(RootPath). rollafile_simplejournal_test() -> - RootPath = "../test/inker", + RootPath = "../test/journal", build_dummy_journal(), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 6fe6eac..009cffb 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -197,15 +197,16 @@ levelzero_pending = ?L0PEND_RESET :: tuple(), levelzero_snapshot = [] :: list(), memtable, - backlog = false :: boolean()}). + backlog = false :: boolean(), + memtable_maxsize :: integer}). %%%============================================================================ %%% API %%%============================================================================ -pcl_start(RootDir) -> - gen_server:start(?MODULE, [RootDir], []). +pcl_start(PCLopts) -> + gen_server:start(?MODULE, [PCLopts], []). pcl_pushmem(Pid, DumpList) -> %% Bookie to dump memory onto penciller @@ -236,10 +237,20 @@ pcl_close(Pid) -> %%% gen_server callbacks %%%============================================================================ -init([RootPath]) -> +init([PCLopts]) -> + RootPath = PCLopts#penciller_options.root_path, + MaxTableSize = case PCLopts#penciller_options.max_inmemory_tablesize of + undefined -> + ?MAX_TABLESIZE; + M -> + M + end, TID = ets:new(?MEMTABLE, [ordered_set]), {ok, Clerk} = leveled_clerk:clerk_new(self()), - InitState = #state{memtable=TID, clerk=Clerk, root_path=RootPath}, + InitState = #state{memtable=TID, + clerk=Clerk, + root_path=RootPath, + memtable_maxsize=MaxTableSize}, %% Open manifest ManifestPath = InitState#state.root_path ++ "/" ++ ?MANIFEST_FP ++ "/", @@ -320,7 +331,7 @@ init([RootPath]) -> handle_call({push_mem, DumpList}, _From, State) -> StartWatch = os:timestamp(), Response = case assess_sqn(DumpList) of - {MinSQN, MaxSQN} when MaxSQN > MinSQN, + {MinSQN, MaxSQN} when MaxSQN >= MinSQN, MinSQN >= State#state.ledger_sqn -> io:format("SQN check completed in ~w microseconds~n", [timer:now_diff(os:timestamp(),StartWatch)]), @@ -436,6 +447,10 @@ terminate(_Reason, State) -> file:rename(FileName ++ ".pnd", FileName ++ ".sft"); {?L0PEND_RESET, [], L} when L == 0 -> io:format("No keys to dump from memory when closing~n"); + {{true, L0Pid, _TS}, _, _} -> + leveled_sft:sft_close(L0Pid), + io:format("No opportunity to persist memory before closing " + ++ "with ~w keys discarded~n", [length(Dump)]); _ -> io:format("No opportunity to persist memory before closing " ++ "with ~w keys discarded~n", [length(Dump)]) @@ -481,7 +496,8 @@ push_to_memory(DumpList, State) -> MemoryInsertion = do_push_to_mem(DumpList, TableSize, UpdState#state.memtable, - UpdState#state.levelzero_snapshot), + UpdState#state.levelzero_snapshot, + UpdState#state.memtable_maxsize), case MemoryInsertion of {twist, ApproxTableSize, UpdSnapshot} -> @@ -557,16 +573,16 @@ fetch(Key, Manifest, Level, FetchFun) -> end end. -do_push_to_mem(DumpList, TableSize, MemTable, Snapshot) -> +do_push_to_mem(DumpList, TableSize, MemTable, Snapshot, MaxSize) -> SW = os:timestamp(), UpdSnapshot = lists:append(Snapshot, DumpList), ets:insert(MemTable, DumpList), io:format("Push into memory timed at ~w microseconds~n", [timer:now_diff(os:timestamp(), SW)]), case TableSize + length(DumpList) of - ApproxTableSize when ApproxTableSize > ?MAX_TABLESIZE -> + ApproxTableSize when ApproxTableSize > MaxSize -> case ets:info(MemTable, size) of - ActTableSize when ActTableSize > ?MAX_TABLESIZE -> + ActTableSize when ActTableSize > MaxSize -> {roll, ActTableSize, UpdSnapshot}; ActTableSize -> io:format("Table size is actually ~w~n", [ActTableSize]), @@ -769,14 +785,19 @@ rename_manifest_files(RootPath, NewMSN) -> file:rename(filepath(RootPath, NewMSN, pending_manifest), filepath(RootPath, NewMSN, current_manifest)). +filepath(RootPath, manifest) -> + RootPath ++ "/" ++ ?MANIFEST_FP; +filepath(RootPath, files) -> + RootPath ++ "/" ++ ?FILES_FP. + filepath(RootPath, NewMSN, pending_manifest) -> - RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_" + filepath(RootPath, manifest) ++ "/" ++ "nonzero_" ++ integer_to_list(NewMSN) ++ "." ++ ?PENDING_FILEX; filepath(RootPath, NewMSN, current_manifest) -> - RootPath ++ "/" ++ ?MANIFEST_FP ++ "/" ++ "nonzero_" + filepath(RootPath, manifest) ++ "/" ++ "nonzero_" ++ integer_to_list(NewMSN) ++ "." ++ ?CURRENT_FILEX; filepath(RootPath, NewMSN, new_merge_files) -> - RootPath ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(NewMSN). + filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN). update_deletions([], _NewMSN, UnreferencedFiles) -> UnreferencedFiles; @@ -822,6 +843,24 @@ assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> %%% Test %%%============================================================================ +-ifdef(TEST). + +clean_testdir(RootPath) -> + clean_subdir(filepath(RootPath, manifest)), + clean_subdir(filepath(RootPath, files)). + +clean_subdir(DirPath) -> + case filelib:is_dir(DirPath) of + true -> + {ok, Files} = file:list_dir(DirPath), + lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)), + io:format("Delete file ~s/~s~n", + [DirPath, FN]) + end, + Files); + false -> + ok + end. compaction_work_assessment_test() -> L0 = [{{o, "B1", "K1"}, {o, "B3", "K3"}, dummy_pid}], @@ -854,4 +893,51 @@ confirm_delete_test() -> ?assertMatch(R2, false), RegisteredIterators3 = [{dummy_pid, 9}, {dummy_pid, 12}], R3 = confirm_delete(Filename, UnreferencedFiles, RegisteredIterators3), - ?assertMatch(R3, false). \ No newline at end of file + ?assertMatch(R3, false). + + +simple_server_test() -> + RootPath = "../test/ledger", + clean_testdir(RootPath), + {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000}), + Key1 = {{o,"Bucket0001", "Key0001"},1, {active, infinity}, null}, + KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})), + Key2 = {{o,"Bucket0002", "Key0002"},1002, {active, infinity}, null}, + KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})), + Key3 = {{o,"Bucket0003", "Key0003"},2002, {active, infinity}, null}, + ok = pcl_pushmem(PCL, [Key1]), + R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), + ?assertMatch(R1, Key1), + ok = pcl_pushmem(PCL, KL1), + R2 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), + ?assertMatch(R2, Key1), + S1 = pcl_pushmem(PCL, [Key2]), + if S1 == pause -> timer:sleep(2); true -> ok end, + R3 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), + R4 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}), + ?assertMatch(R3, Key1), + ?assertMatch(R4, Key2), + S2 = pcl_pushmem(PCL, KL2), + if S2 == pause -> timer:sleep(2000); true -> ok end, + S3 = pcl_pushmem(PCL, [Key3]), + if S3 == pause -> timer:sleep(2000); true -> ok end, + R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), + R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}), + R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}), + ?assertMatch(R5, Key1), + ?assertMatch(R6, Key2), + ?assertMatch(R7, Key3), + ok = pcl_close(PCL), + {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, + max_inmemory_tablesize=1000}), + R8 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), + R9 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), + R10 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), + ?assertMatch(R8, Key1), + ?assertMatch(R9, Key2), + ?assertMatch(R10, Key3), + ok = pcl_close(PCLr), + clean_testdir(RootPath). + +-endif. \ No newline at end of file diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 12e36e5..d420829 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -200,7 +200,7 @@ filter_pointer :: integer(), summ_pointer :: integer(), summ_length :: integer(), - filename :: string(), + filename = "not set" :: string(), handle :: file:fd(), background_complete = false :: boolean(), background_failure = "Unknown" :: string(), @@ -387,7 +387,12 @@ terminate(Reason, State) -> ok = file:close(State#state.handle), ok = file:delete(State#state.filename); _ -> - ok = file:close(State#state.handle) + case State#state.handle of + undefined -> + ok; + Handle -> + file:close(Handle) + end end. code_change(_OldVsn, State, _Extra) -> @@ -520,6 +525,7 @@ complete_file(Handle, FileMD, KL1, KL2, Level, Rename) -> false -> open_file(FileMD); {true, OldName, NewName} -> + io:format("Renaming file from ~s to ~s~n", [OldName, NewName]), ok = file:rename(OldName, NewName), open_file(FileMD#state{filename=NewName}) end,