diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 6e95b53..5c8ae29 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -151,10 +151,8 @@ merge(SrcLevel, Manifest, RootPath) -> Src), {Man0, []}; _ -> - FilePath = leveled_penciller:filepath(RootPath, - NewSQN, - new_merge_files), - perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN) + RootPath = leveled_penciller:sst_rootpath(RootPath), + perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) end. notify_deletions([], _Penciller) -> @@ -199,11 +197,13 @@ do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) -> leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]), Additions; do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) -> - FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst", - [SinkLevel, length(Additions)])), + FileName = leveled_penciller:sst_filename(NewSQN, + SinkLevel, + length(Additions)), leveled_log:log("PC012", [NewSQN, FileName, SinkB]), TS1 = os:timestamp(), - case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of + case leveled_sst:sst_new(RP, FileName, + KL1, KL2, SinkB, SinkLevel, MaxSQN) of empty -> leveled_log:log("PC013", [FileName]), do_merge([], [], @@ -261,49 +261,54 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) -> merge_file_test() -> KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)), - {ok, PidL1_1, _} = leveled_sst:sst_new("../test/KL1_L1.sst", + {ok, PidL1_1, _} = leveled_sst:sst_new("../test/", + "KL1_L1.sst", 1, KL1_L1, undefined), KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)), - {ok, PidL2_1, _} = leveled_sst:sst_new("../test/KL1_L2.sst", + {ok, PidL2_1, _} = leveled_sst:sst_new("../test/", + "KL1_L2.sst", 2, KL1_L2, undefined), KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)), - {ok, PidL2_2, _} = leveled_sst:sst_new("../test/KL2_L2.sst", + {ok, PidL2_2, _} = leveled_sst:sst_new("../test/", + "KL2_L2.sst", 2, KL2_L2, undefined), KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)), - {ok, PidL2_3, _} = leveled_sst:sst_new("../test/KL3_L2.sst", + {ok, PidL2_3, _} = leveled_sst:sst_new("../test/", + "KL3_L2.sst", 2, KL3_L2, undefined), KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)), - {ok, PidL2_4, _} = leveled_sst:sst_new("../test/KL4_L2.sst", + {ok, PidL2_4, _} = leveled_sst:sst_new("../test/", + "KL4_L2.sst", 2, KL4_L2, undefined), E1 = #manifest_entry{owner = PidL1_1, - filename = "../test/KL1_L1.sst", + filename = "./KL1_L1.sst", end_key = lists:last(KL1_L1), start_key = lists:nth(1, KL1_L1)}, E2 = #manifest_entry{owner = PidL2_1, - filename = "../test/KL1_L2.sst", + filename = "./KL1_L2.sst", end_key = lists:last(KL1_L2), start_key = lists:nth(1, KL1_L2)}, E3 = #manifest_entry{owner = PidL2_2, - filename = "../test/KL2_L2.sst", + filename = "./KL2_L2.sst", end_key = lists:last(KL2_L2), start_key = lists:nth(1, KL2_L2)}, E4 = #manifest_entry{owner = PidL2_3, - filename = "../test/KL3_L2.sst", + filename = "./KL3_L2.sst", end_key = lists:last(KL3_L2), start_key = lists:nth(1, KL3_L2)}, E5 = #manifest_entry{owner = PidL2_4, - filename = "../test/KL4_L2.sst", + filename = "./KL4_L2.sst", end_key = lists:last(KL4_L2), start_key = lists:nth(1, KL4_L2)}, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 4bbf6ad..8d89420 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -189,7 +189,10 @@ pcl_getstartupsequencenumber/1]). -export([ - filepath/3, + sst_rootpath/1, + sst_filename/3]). + +-export([ clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -654,6 +657,19 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%============================================================================ +%%% Path functions +%%%============================================================================ + +sst_rootpath(RootPath) -> + FP = RootPath ++ "/" ++ ?FILES_FP, + filelib:ensure_dir(FP ++ "/"), + FP. + +sst_filename(ManSQN, Level, Count) -> + lists:flatten(io_lib:format("~w_~w_~w.sst", [ManSQN, Level, Count])). + + %%%============================================================================ %%% Internal functions %%%============================================================================ @@ -685,7 +701,9 @@ start_from_file(PCLopts) -> Manifest0 = leveled_pmanifest:open_manifest(RootPath), OpenFun = fun(FN) -> - {ok, Pid, {_FK, _LK}} = leveled_sst:sst_open(FN), + {ok, + Pid, + {_FK, _LK}} = leveled_sst:sst_open(sst_rootpath(RootPath), FN), Pid end, SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1, @@ -696,13 +714,12 @@ start_from_file(PCLopts) -> ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1), leveled_log:log("P0035", [ManSQN]), %% Find any L0 files - L0FN = filepath(RootPath, ManSQN + 1, new_merge_files) ++ "_0_0.sst", - case filelib:is_file(L0FN) of + L0FN = sst_filename(ManSQN + 1, 0, 0), + case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of true -> leveled_log:log("P0015", [L0FN]), - {ok, - L0Pid, - {L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN), + L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN), + {ok, L0Pid, {L0StartKey, L0EndKey}} = L0Open, L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid), L0Entry = #manifest_entry{start_key = L0StartKey, end_key = L0EndKey, @@ -786,11 +803,14 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN}, %% where as the Wait of true is used at shutdown roll_memory(State, false) -> - FileName = levelzero_filename(State), + ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, + RootPath = sst_rootpath(State#state.root_path), + FileName = sst_filename(ManSQN, 0, 0), leveled_log:log("P0019", [FileName, State#state.ledger_sqn]), PCL = self(), FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, - R = leveled_sst:sst_newlevelzero(FileName, + R = leveled_sst:sst_newlevelzero(RootPath, + FileName, length(State#state.levelzero_cache), FetchFun, PCL, @@ -798,21 +818,20 @@ roll_memory(State, false) -> {ok, Constructor, _} = R, Constructor; roll_memory(State, true) -> - FileName = levelzero_filename(State), + ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, + RootPath = sst_rootpath(State#state.root_path), + FileName = sst_filename(ManSQN, 0, 0), FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end, KVList = leveled_pmem:to_list(length(State#state.levelzero_cache), FetchFun), - R = leveled_sst:sst_new(FileName, 0, KVList, State#state.ledger_sqn), + R = leveled_sst:sst_new(RootPath, + FileName, + 0, + KVList, + State#state.ledger_sqn), {ok, Constructor, _} = R, Constructor. -levelzero_filename(State) -> - ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(ManSQN) ++ "_0_0", - FileName. - timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) -> SW = os:timestamp(), {R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index), @@ -1089,16 +1108,6 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange, end. -filepath(RootPath, files) -> - FP = RootPath ++ "/" ++ ?FILES_FP, - filelib:ensure_dir(FP ++ "/"), - FP. - -filepath(RootPath, NewMSN, new_merge_files) -> - filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN). - - - %%%============================================================================ %%% Test %%%============================================================================ @@ -1127,8 +1136,8 @@ generate_randomkeys(Count, SQN, Acc) -> clean_testdir(RootPath) -> - clean_subdir(leveled_pmanifest:filepath(RootPath, manifest)), - clean_subdir(filepath(RootPath, files)). + clean_subdir(sst_rootpath(RootPath)), + clean_subdir(filename:join(RootPath, ?MANIFEST_FP)). clean_subdir(DirPath) -> case filelib:is_dir(DirPath) of @@ -1458,18 +1467,19 @@ foldwithimm_simple_test() -> {{o, "Bucket1", "Key6", null}, 7}], AccB). create_file_test() -> - Filename = "../test/new_file.sst", - ok = file:write_file(Filename, term_to_binary("hello")), + {RP, Filename} = {"../test/", "new_file.sst"}, + ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")), KVL = lists:usort(generate_randomkeys(10000)), Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE), FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end, {ok, SP, - noreply} = leveled_sst:sst_newlevelzero(Filename, - 1, - FetchFun, - undefined, - 10000), + noreply} = leveled_sst:sst_newlevelzero(RP, + Filename, + 1, + FetchFun, + undefined, + 10000), lists:foreach(fun(X) -> case checkready(SP) of timeout -> @@ -1482,7 +1492,7 @@ create_file_test() -> io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]), ?assertMatch({o, _, _, _}, StartKey), ?assertMatch({o, _, _, _}, EndKey), - ?assertMatch("../test/new_file.sst", SrcFN), + ?assertMatch("./new_file.sst", SrcFN), ok = leveled_sst:sst_clear(SP), {ok, Bin} = file:read_file("../test/new_file.sst.discarded"), ?assertMatch("hello", binary_to_term(Bin)). diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 1d2b1c0..016af0a 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -90,10 +90,10 @@ delete_pending/2, delete_pending/3]). --export([sst_new/4, - sst_new/6, - sst_newlevelzero/5, - sst_open/1, +-export([sst_new/5, + sst_new/7, + sst_newlevelzero/6, + sst_open/2, sst_get/2, sst_get/3, sst_getkvrange/4, @@ -123,6 +123,7 @@ handle :: file:fd(), sst_timings :: tuple(), penciller :: pid(), + root_path, filename, blockindex_cache}). @@ -131,17 +132,20 @@ %%% API %%%============================================================================ -sst_open(Filename) -> +sst_open(RootPath, Filename) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), - case gen_fsm:sync_send_event(Pid, {sst_open, Filename}, infinity) of + case gen_fsm:sync_send_event(Pid, + {sst_open, RootPath, Filename}, + infinity) of {ok, {SK, EK}} -> {ok, Pid, {SK, EK}} end. -sst_new(Filename, Level, KVList, MaxSQN) -> +sst_new(RootPath, Filename, Level, KVList, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, {sst_new, + RootPath, Filename, Level, KVList, @@ -151,7 +155,7 @@ sst_new(Filename, Level, KVList, MaxSQN) -> {ok, Pid, {SK, EK}} end. -sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> +sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> {{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}), case MergedList of [] -> @@ -160,6 +164,7 @@ sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), case gen_fsm:sync_send_event(Pid, {sst_new, + RootPath, Filename, Level, MergedList, @@ -170,10 +175,11 @@ sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) -> end end. -sst_newlevelzero(Filename, Slots, FetchFun, Penciller, MaxSQN) -> +sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) -> {ok, Pid} = gen_fsm:start(?MODULE, [], []), gen_fsm:send_event(Pid, {sst_newlevelzero, + RootPath, Filename, Slots, FetchFun, @@ -228,14 +234,14 @@ sst_printtimings(Pid) -> init([]) -> {ok, starting, #state{}}. -starting({sst_open, Filename}, _From, State) -> - UpdState = read_file(Filename, State), +starting({sst_open, RootPath, Filename}, _From, State) -> + UpdState = read_file(Filename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, {reply, {ok, {Summary#summary.first_key, Summary#summary.last_key}}, reader, UpdState}; -starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) -> +starting({sst_new, RootPath, Filename, Level, KVList, MaxSQN}, _From, State) -> SW = os:timestamp(), {FirstKey, Length, @@ -247,8 +253,8 @@ starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) -> FirstKey, Length, MaxSQN), - ActualFilename = write_file(Filename, SummaryBin, SlotsBin), - UpdState = read_file(ActualFilename, State), + ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + UpdState = read_file(ActualFilename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, leveled_log:log_timer("SST08", [ActualFilename, Level, Summary#summary.max_sqn], @@ -258,8 +264,8 @@ starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) -> reader, UpdState#state{blockindex_cache = BlockIndex}}. -starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller, MaxSQN}, - State) -> +starting({sst_newlevelzero, RootPath, Filename, + Slots, FetchFun, Penciller, MaxSQN}, State) -> SW = os:timestamp(), KVList = leveled_pmem:to_list(Slots, FetchFun), {FirstKey, @@ -272,8 +278,8 @@ starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller, MaxSQN}, FirstKey, Length, MaxSQN), - ActualFilename = write_file(Filename, SummaryBin, SlotsBin), - UpdState = read_file(ActualFilename, State), + ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin), + UpdState = read_file(ActualFilename, State#state{root_path=RootPath}), Summary = UpdState#state.summary, leveled_log:log_timer("SST08", [ActualFilename, 0, Summary#summary.max_sqn], @@ -357,7 +363,8 @@ delete_pending({get_slots, SlotList}, _From, State) -> delete_pending(close, _From, State) -> leveled_log:log("SST07", [State#state.filename]), ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), + ok = file:delete(filename:join(State#state.root_path, + State#state.filename)), {stop, normal, ok, State}. delete_pending(timeout, State) -> @@ -368,7 +375,8 @@ delete_pending(timeout, State) -> delete_pending(close, State) -> leveled_log:log("SST07", [State#state.filename]), ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), + ok = file:delete(filename:join(State#state.root_path, + State#state.filename)), {stop, normal, State}. handle_sync_event(_Msg, _From, StateName, State) -> @@ -380,6 +388,8 @@ handle_event(_Msg, StateName, State) -> handle_info(_Msg, StateName, State) -> {next_state, StateName, State}. +terminate(normal, delete_pending, _State) -> + ok; terminate(Reason, _StateName, State) -> leveled_log:log("SST04", [Reason, State#state.filename]). @@ -497,31 +507,32 @@ fetch_range(StartKey, EndKey, ScanWidth, State) -> lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint. -write_file(Filename, SummaryBin, SlotsBin) -> +write_file(RootPath, Filename, SummaryBin, SlotsBin) -> SummaryLength = byte_size(SummaryBin), SlotsLength = byte_size(SlotsBin), {PendingName, FinalName} = generate_filenames(Filename), - ok = file:write_file(PendingName, + ok = file:write_file(filename:join(RootPath, PendingName), <>, [raw]), - case filelib:is_file(FinalName) of + case filelib:is_file(filename:join(RootPath, FinalName)) of true -> - AltName = filename:join(filename:dirname(FinalName), - filename:basename(FinalName)) + AltName = filename:join(RootPath, filename:basename(FinalName)) ++ ?DISCARD_EXT, leveled_log:log("SST05", [FinalName, AltName]), - ok = file:rename(FinalName, AltName); + ok = file:rename(filename:join(RootPath, FinalName), AltName); false -> ok end, - file:rename(PendingName, FinalName), + file:rename(filename:join(RootPath, PendingName), + filename:join(RootPath, FinalName)), FinalName. read_file(Filename, State) -> - {Handle, SummaryBin} = open_reader(Filename), + {Handle, SummaryBin} = open_reader(filename:join(State#state.root_path, + Filename)), {Summary, SlotList} = read_table_summary(SummaryBin), SlotCount = length(SlotList), BlockIndexCache = array:new([{size, SlotCount}, {default, none}]), @@ -537,6 +548,7 @@ read_file(Filename, State) -> filename = Filename}. open_reader(Filename) -> + io:format("Reading filename ~s~n", [Filename]), {ok, Handle} = file:open(Filename, [binary, raw, read]), {ok, Lengths} = file:pread(Handle, 0, 8), <> = Lengths, @@ -1364,8 +1376,8 @@ merge_test() -> KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)), KVL3 = lists:ukeymerge(1, KVL1, KVL2), SW0 = os:timestamp(), - {ok, P1, {FK1, LK1}} = sst_new("../test/level1_src", 1, KVL1, 6000), - {ok, P2, {FK2, LK2}} = sst_new("../test/level2_src", 2, KVL2, 3000), + {ok, P1, {FK1, LK1}} = sst_new("../test/", "level1_src", 1, KVL1, 6000), + {ok, P2, {FK2, LK2}} = sst_new("../test/", "level2_src", 2, KVL2, 3000), ExpFK1 = element(1, lists:nth(1, KVL1)), ExpLK1 = element(1, lists:last(KVL1)), ExpFK2 = element(1, lists:nth(1, KVL2)), @@ -1376,7 +1388,8 @@ merge_test() -> ?assertMatch(ExpLK2, LK2), ML1 = [{next, #manifest_entry{owner = P1}, FK1}], ML2 = [{next, #manifest_entry{owner = P2}, FK2}], - {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/level2_merge", + {ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/", + "level2_merge", ML1, ML2, false, @@ -1408,12 +1421,13 @@ merge_test() -> simple_persisted_range_test() -> - Filename = "../test/simple_test", + {RP, Filename} = {"../test/", "simple_test"}, KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 16, 1, 20), KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = sst_new(Filename, + {ok, Pid, {FirstKey, LastKey}} = sst_new(RP, + Filename, 1, KVList1, length(KVList1)), @@ -1450,12 +1464,13 @@ simple_persisted_range_test() -> simple_persisted_test() -> - Filename = "../test/simple_test", + {RP, Filename} = {"../test/", "simple_test"}, KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 32, 1, 20), KVList1 = lists:ukeysort(1, KVList0), [{FirstKey, _FV}|_Rest] = KVList1, {LastKey, _LV} = lists:last(KVList1), - {ok, Pid, {FirstKey, LastKey}} = sst_new(Filename, + {ok, Pid, {FirstKey, LastKey}} = sst_new(RP, + Filename, 1, KVList1, length(KVList1)), @@ -1558,7 +1573,7 @@ simple_persisted_test() -> ?assertMatch([{Eight000Key, _v800}], FetchedListB4), ok = sst_close(Pid), - ok = file:delete(Filename ++ ".sst"). + ok = file:delete(filename:join(RP, Filename ++ ".sst")). key_dominates_test() -> KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},