diff --git a/include/leveled.hrl b/include/leveled.hrl index 209a9b7..9b32415 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -66,6 +66,7 @@ {root_path :: string(), cache_size :: integer(), max_journalsize :: integer(), + max_pencillercachesize :: integer(), snapshot_bookie :: pid(), reload_strategy = [] :: list(), max_run_length :: integer()}). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ee63357..45a1884 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -514,7 +514,7 @@ set_options(Opts) -> AltStrategy = Opts#bookie_options.reload_strategy, ReloadStrategy = leveled_codec:inker_reload_strategy(AltStrategy), - + PCLL0CacheSize = Opts#bookie_options.max_pencillercachesize, JournalFP = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP, LedgerFP = Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP, {#inker_options{root_path = JournalFP, @@ -522,7 +522,8 @@ set_options(Opts) -> max_run_length = Opts#bookie_options.max_run_length, cdb_options = #cdb_options{max_size=MaxJournalSize, binary_mode=true}}, - #penciller_options{root_path = LedgerFP}}. + #penciller_options{root_path = LedgerFP, + max_inmemory_tablesize = PCLL0CacheSize}}. startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 17554d9..18459f9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -223,6 +223,7 @@ code_change/3, pcl_start/1, pcl_pushmem/2, + pcl_fetchlevelzero/2, pcl_fetch/2, pcl_fetchkeys/5, pcl_checksequencenumber/3, @@ -287,6 +288,9 @@ pcl_start(PCLopts) -> pcl_pushmem(Pid, DumpList) -> %% Bookie to dump memory onto penciller gen_server:call(Pid, {push_mem, DumpList}, infinity). + +pcl_fetchlevelzero(Pid, Slot) -> + gen_server:call(Pid, {fetch_levelzero, Slot}, infinity). pcl_fetch(Pid, Key) -> gen_server:call(Pid, {fetch, Key}, infinity). 
@@ -471,6 +475,8 @@ handle_call({load_snapshot, BookieIncrTree}, _From, State) -> levelzero_size=L0Size, ledger_sqn=LedgerSQN, snapshot_fully_loaded=true}}; +handle_call({fetch_levelzero, Slot}, _From, State) -> + {reply, lists:nth(Slot, State#state.levelzero_cache), State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -553,7 +559,7 @@ terminate(Reason, State) -> {false, [], 0} -> io:format("Level 0 cache empty at close of Penciller~n"); {false, [], _N} -> - L0Pid = roll_memory(UpdState, State#state.levelzero_cache, true), + L0Pid = roll_memory(UpdState, true), ok = leveled_sft:sft_close(L0Pid); _ -> io:format("No level zero action on close of Penciller~n") @@ -696,7 +702,7 @@ update_levelzero(L0Index, L0Size, PushedTree, LedgerSQN, L0Cache, State) -> Level0Free = length(get_item(0, State#state.manifest, [])) == 0, case {CacheTooBig, Level0Free} of {true, true} -> - L0Constructor = roll_memory(State, UpdL0Cache), + L0Constructor = roll_memory(UpdState, false), UpdState#state{levelzero_pending=true, levelzero_constructor=L0Constructor}; _ -> @@ -719,19 +725,46 @@ checkready(Pid) -> timeout end. -roll_memory(State, L0Cache) -> - roll_memory(State, L0Cache, false). +%% Casting a large object (the levelzero cache) to the gen_server did not lead +%% to an immediate return as expected. With 32K keys in the TreeList it could +%% take around 35-40ms. +%% +%% To avoid blocking this gen_server, the SFT file can request each item of the +%% cache one at a time. 
+%% +%% The Wait is set to false to use a cast when calling this in normal operation +%% whereas the Wait of true is used at shutdown -roll_memory(State, L0Cache, Wait) -> +roll_memory(State, false) -> + FileName = levelzero_filename(State), + io:format("Rolling level zero to file ~s~n", [FileName]), + Opts = #sft_options{wait=false}, + PCL = self(), + FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, + % FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, + R = leveled_sft:sft_newfroml0cache(FileName, + length(State#state.levelzero_cache), + FetchFun, + Opts), + {ok, Constructor, _} = R, + Constructor; +roll_memory(State, true) -> + FileName = levelzero_filename(State), + Opts = #sft_options{wait=true}, + FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, + R = leveled_sft:sft_newfroml0cache(FileName, + length(State#state.levelzero_cache), + FetchFun, + Opts), + {ok, Constructor, _} = R, + Constructor. + +levelzero_filename(State) -> MSN = State#state.manifest_sqn, FileName = State#state.root_path ++ "/" ++ ?FILES_FP ++ "/" ++ integer_to_list(MSN) ++ "_0_0", - Opts = #sft_options{wait=Wait}, - {ok, Constructor, _} = leveled_sft:sft_newfroml0cache(FileName, - L0Cache, - Opts), - Constructor. + FileName. 
fetch_mem(Key, Manifest, L0Index, L0Cache) -> @@ -1636,10 +1669,12 @@ create_file_test() -> ok = file:write_file(Filename, term_to_binary("hello")), KVL = lists:usort(leveled_sft:generate_randomkeys(10000)), Tree = gb_trees:from_orddict(KVL), + FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, {ok, SP, noreply} = leveled_sft:sft_newfroml0cache(Filename, - [Tree], + 1, + FetchFun, #sft_options{wait=false}), lists:foreach(fun(X) -> case checkready(SP) of diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index ae7bf09..e7b218a 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -38,7 +38,7 @@ -export([ add_to_index/5, - to_list/1, + to_list/2, new_index/0, check_levelzero/3, merge_trees/4 @@ -80,17 +80,19 @@ add_to_index(L0Index, L0Size, LevelMinus1, LedgerSQN, TreeList) -> end. -to_list(TreeList) -> +to_list(Slots, FetchFun) -> SW = os:timestamp(), - OutList = lists:foldr(fun(Tree, CompleteList) -> + SlotList = lists:reverse(lists:seq(1, Slots)), + FullList = lists:foldl(fun(Slot, Acc) -> + Tree = FetchFun(Slot), L = gb_trees:to_list(Tree), - lists:ukeymerge(1, CompleteList, L) + lists:ukeymerge(1, Acc, L) end, [], - TreeList), + SlotList), io:format("L0 cache converted to list of size ~w in ~w microseconds~n", - [length(OutList), timer:now_diff(os:timestamp(), SW)]), - OutList. + [length(FullList), timer:now_diff(os:timestamp(), SW)]), + FullList. 
new_index() -> @@ -237,7 +239,8 @@ compare_method_test() -> StartKey = {o, "Bucket0100", null, null}, EndKey = {o, "Bucket0200", null, null}, SWa = os:timestamp(), - DumpList = to_list(TreeList), + FetchFun = fun(Slot) -> lists:nth(Slot, TreeList) end, + DumpList = to_list(length(TreeList), FetchFun), Q0 = lists:foldl(fun({K, V}, Acc) -> P = leveled_codec:endkey_passed(EndKey, K), case {K, P} of diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 0004d0a..8f4a870 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -152,7 +152,7 @@ terminate/2, code_change/3, sft_new/4, - sft_newfroml0cache/3, + sft_newfroml0cache/4, sft_open/1, sft_get/2, sft_getkvrange/4, @@ -228,11 +228,11 @@ sft_new(Filename, KL1, KL2, LevelInfo) -> infinity), {ok, Pid, Reply}. -sft_newfroml0cache(Filename, L0Cache, Options) -> +sft_newfroml0cache(Filename, Slots, FetchFun, Options) -> {ok, Pid} = gen_server:start(?MODULE, [], []), case Options#sft_options.wait of true -> - KL1 = leveled_pmem:to_list(L0Cache), + KL1 = leveled_pmem:to_list(Slots, FetchFun), Reply = gen_server:call(Pid, {sft_new, Filename, @@ -243,10 +243,10 @@ sft_newfroml0cache(Filename, L0Cache, Options) -> {ok, Pid, Reply}; false -> gen_server:cast(Pid, - {sft_newfromcache, + {sft_newfroml0cache, Filename, - L0Cache, - #level{level=0}}), + Slots, + FetchFun}), {ok, Pid, noreply} end. @@ -355,10 +355,9 @@ handle_call({set_for_delete, Penciller}, _From, State) -> handle_call(get_maxsqn, _From, State) -> statecheck_onreply(State#state.highest_sqn, State). 
-handle_cast({sft_newfromcache, Filename, L0Cache, _LevelR=#level{level=L}}, - _State) when L == 0-> +handle_cast({sft_newfroml0cache, Filename, Slots, FetchFun}, _State) -> SW = os:timestamp(), - Inp1 = leveled_pmem:to_list(L0Cache), + Inp1 = leveled_pmem:to_list(Slots, FetchFun), {ok, State} = create_levelzero(Inp1, Filename), io:format("File creation of L0 file ~s took ~w microseconds~n", [Filename, timer:now_diff(os:timestamp(), SW)]), diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 16fcd0f..442390c 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -69,14 +69,16 @@ simple_put_fetch_head_delete(_Config) -> many_put_fetch_head(_Config) -> RootPath = testutil:reset_filestructure(), - StartOpts1 = #bookie_options{root_path=RootPath}, + StartOpts1 = #bookie_options{root_path=RootPath, + max_pencillercachesize=16000}, {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), testutil:check_forobject(Bookie1, TestObject), ok = leveled_bookie:book_close(Bookie1), StartOpts2 = #bookie_options{root_path=RootPath, - max_journalsize=1000000000}, + max_journalsize=1000000000, + max_pencillercachesize=32000}, {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:check_forobject(Bookie2, TestObject), GenList = [2, 20002, 40002, 60002, 80002,