Change SST reference to split filename
The manifest and the logs are bloated by having the full file path for every filename in there - given the root path is constant. Could also cause issues if the mount point is ever changed.
This commit is contained in:
parent
3582f4bc34
commit
4c59342600
3 changed files with 121 additions and 91 deletions
|
@ -151,10 +151,8 @@ merge(SrcLevel, Manifest, RootPath) ->
|
||||||
Src),
|
Src),
|
||||||
{Man0, []};
|
{Man0, []};
|
||||||
_ ->
|
_ ->
|
||||||
FilePath = leveled_penciller:filepath(RootPath,
|
RootPath = leveled_penciller:sst_rootpath(RootPath),
|
||||||
NewSQN,
|
perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN)
|
||||||
new_merge_files),
|
|
||||||
perform_merge(Manifest, Src, SinkList, SrcLevel, FilePath, NewSQN)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
notify_deletions([], _Penciller) ->
|
notify_deletions([], _Penciller) ->
|
||||||
|
@ -199,11 +197,13 @@ do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Additions) ->
|
||||||
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
|
leveled_log:log("PC011", [NewSQN, SinkLevel, length(Additions)]),
|
||||||
Additions;
|
Additions;
|
||||||
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
|
do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, Additions) ->
|
||||||
FileName = lists:flatten(io_lib:format(RP ++ "_~w_~w.sst",
|
FileName = leveled_penciller:sst_filename(NewSQN,
|
||||||
[SinkLevel, length(Additions)])),
|
SinkLevel,
|
||||||
|
length(Additions)),
|
||||||
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
|
leveled_log:log("PC012", [NewSQN, FileName, SinkB]),
|
||||||
TS1 = os:timestamp(),
|
TS1 = os:timestamp(),
|
||||||
case leveled_sst:sst_new(FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN) of
|
case leveled_sst:sst_new(RP, FileName,
|
||||||
|
KL1, KL2, SinkB, SinkLevel, MaxSQN) of
|
||||||
empty ->
|
empty ->
|
||||||
leveled_log:log("PC013", [FileName]),
|
leveled_log:log("PC013", [FileName]),
|
||||||
do_merge([], [],
|
do_merge([], [],
|
||||||
|
@ -261,49 +261,54 @@ generate_randomkeys(Count, Acc, BucketLow, BRange) ->
|
||||||
|
|
||||||
merge_file_test() ->
|
merge_file_test() ->
|
||||||
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
|
KL1_L1 = lists:sort(generate_randomkeys(8000, 0, 1000)),
|
||||||
{ok, PidL1_1, _} = leveled_sst:sst_new("../test/KL1_L1.sst",
|
{ok, PidL1_1, _} = leveled_sst:sst_new("../test/",
|
||||||
|
"KL1_L1.sst",
|
||||||
1,
|
1,
|
||||||
KL1_L1,
|
KL1_L1,
|
||||||
undefined),
|
undefined),
|
||||||
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
|
KL1_L2 = lists:sort(generate_randomkeys(8000, 0, 250)),
|
||||||
{ok, PidL2_1, _} = leveled_sst:sst_new("../test/KL1_L2.sst",
|
{ok, PidL2_1, _} = leveled_sst:sst_new("../test/",
|
||||||
|
"KL1_L2.sst",
|
||||||
2,
|
2,
|
||||||
KL1_L2,
|
KL1_L2,
|
||||||
undefined),
|
undefined),
|
||||||
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
|
KL2_L2 = lists:sort(generate_randomkeys(8000, 250, 250)),
|
||||||
{ok, PidL2_2, _} = leveled_sst:sst_new("../test/KL2_L2.sst",
|
{ok, PidL2_2, _} = leveled_sst:sst_new("../test/",
|
||||||
|
"KL2_L2.sst",
|
||||||
2,
|
2,
|
||||||
KL2_L2,
|
KL2_L2,
|
||||||
undefined),
|
undefined),
|
||||||
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
|
KL3_L2 = lists:sort(generate_randomkeys(8000, 500, 250)),
|
||||||
{ok, PidL2_3, _} = leveled_sst:sst_new("../test/KL3_L2.sst",
|
{ok, PidL2_3, _} = leveled_sst:sst_new("../test/",
|
||||||
|
"KL3_L2.sst",
|
||||||
2,
|
2,
|
||||||
KL3_L2,
|
KL3_L2,
|
||||||
undefined),
|
undefined),
|
||||||
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
|
KL4_L2 = lists:sort(generate_randomkeys(8000, 750, 250)),
|
||||||
{ok, PidL2_4, _} = leveled_sst:sst_new("../test/KL4_L2.sst",
|
{ok, PidL2_4, _} = leveled_sst:sst_new("../test/",
|
||||||
|
"KL4_L2.sst",
|
||||||
2,
|
2,
|
||||||
KL4_L2,
|
KL4_L2,
|
||||||
undefined),
|
undefined),
|
||||||
|
|
||||||
E1 = #manifest_entry{owner = PidL1_1,
|
E1 = #manifest_entry{owner = PidL1_1,
|
||||||
filename = "../test/KL1_L1.sst",
|
filename = "./KL1_L1.sst",
|
||||||
end_key = lists:last(KL1_L1),
|
end_key = lists:last(KL1_L1),
|
||||||
start_key = lists:nth(1, KL1_L1)},
|
start_key = lists:nth(1, KL1_L1)},
|
||||||
E2 = #manifest_entry{owner = PidL2_1,
|
E2 = #manifest_entry{owner = PidL2_1,
|
||||||
filename = "../test/KL1_L2.sst",
|
filename = "./KL1_L2.sst",
|
||||||
end_key = lists:last(KL1_L2),
|
end_key = lists:last(KL1_L2),
|
||||||
start_key = lists:nth(1, KL1_L2)},
|
start_key = lists:nth(1, KL1_L2)},
|
||||||
E3 = #manifest_entry{owner = PidL2_2,
|
E3 = #manifest_entry{owner = PidL2_2,
|
||||||
filename = "../test/KL2_L2.sst",
|
filename = "./KL2_L2.sst",
|
||||||
end_key = lists:last(KL2_L2),
|
end_key = lists:last(KL2_L2),
|
||||||
start_key = lists:nth(1, KL2_L2)},
|
start_key = lists:nth(1, KL2_L2)},
|
||||||
E4 = #manifest_entry{owner = PidL2_3,
|
E4 = #manifest_entry{owner = PidL2_3,
|
||||||
filename = "../test/KL3_L2.sst",
|
filename = "./KL3_L2.sst",
|
||||||
end_key = lists:last(KL3_L2),
|
end_key = lists:last(KL3_L2),
|
||||||
start_key = lists:nth(1, KL3_L2)},
|
start_key = lists:nth(1, KL3_L2)},
|
||||||
E5 = #manifest_entry{owner = PidL2_4,
|
E5 = #manifest_entry{owner = PidL2_4,
|
||||||
filename = "../test/KL4_L2.sst",
|
filename = "./KL4_L2.sst",
|
||||||
end_key = lists:last(KL4_L2),
|
end_key = lists:last(KL4_L2),
|
||||||
start_key = lists:nth(1, KL4_L2)},
|
start_key = lists:nth(1, KL4_L2)},
|
||||||
|
|
||||||
|
|
|
@ -189,7 +189,10 @@
|
||||||
pcl_getstartupsequencenumber/1]).
|
pcl_getstartupsequencenumber/1]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
filepath/3,
|
sst_rootpath/1,
|
||||||
|
sst_filename/3]).
|
||||||
|
|
||||||
|
-export([
|
||||||
clean_testdir/1]).
|
clean_testdir/1]).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
@ -654,6 +657,19 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
|
|
||||||
|
%%%============================================================================
|
||||||
|
%%% Path functions
|
||||||
|
%%%============================================================================
|
||||||
|
|
||||||
|
sst_rootpath(RootPath) ->
|
||||||
|
FP = RootPath ++ "/" ++ ?FILES_FP,
|
||||||
|
filelib:ensure_dir(FP ++ "/"),
|
||||||
|
FP.
|
||||||
|
|
||||||
|
sst_filename(ManSQN, Level, Count) ->
|
||||||
|
lists:flatten(io_lib:format("~w_~w_~w.sst", [ManSQN, Level, Count])).
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -685,7 +701,9 @@ start_from_file(PCLopts) ->
|
||||||
Manifest0 = leveled_pmanifest:open_manifest(RootPath),
|
Manifest0 = leveled_pmanifest:open_manifest(RootPath),
|
||||||
OpenFun =
|
OpenFun =
|
||||||
fun(FN) ->
|
fun(FN) ->
|
||||||
{ok, Pid, {_FK, _LK}} = leveled_sst:sst_open(FN),
|
{ok,
|
||||||
|
Pid,
|
||||||
|
{_FK, _LK}} = leveled_sst:sst_open(sst_rootpath(RootPath), FN),
|
||||||
Pid
|
Pid
|
||||||
end,
|
end,
|
||||||
SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
|
SQNFun = fun leveled_sst:sst_getmaxsequencenumber/1,
|
||||||
|
@ -696,13 +714,12 @@ start_from_file(PCLopts) ->
|
||||||
ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1),
|
ManSQN = leveled_pmanifest:get_manifest_sqn(Manifest1),
|
||||||
leveled_log:log("P0035", [ManSQN]),
|
leveled_log:log("P0035", [ManSQN]),
|
||||||
%% Find any L0 files
|
%% Find any L0 files
|
||||||
L0FN = filepath(RootPath, ManSQN + 1, new_merge_files) ++ "_0_0.sst",
|
L0FN = sst_filename(ManSQN + 1, 0, 0),
|
||||||
case filelib:is_file(L0FN) of
|
case filelib:is_file(filename:join(sst_rootpath(RootPath), L0FN)) of
|
||||||
true ->
|
true ->
|
||||||
leveled_log:log("P0015", [L0FN]),
|
leveled_log:log("P0015", [L0FN]),
|
||||||
{ok,
|
L0Open = leveled_sst:sst_open(sst_rootpath(RootPath), L0FN),
|
||||||
L0Pid,
|
{ok, L0Pid, {L0StartKey, L0EndKey}} = L0Open,
|
||||||
{L0StartKey, L0EndKey}} = leveled_sst:sst_open(L0FN),
|
|
||||||
L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
|
L0SQN = leveled_sst:sst_getmaxsequencenumber(L0Pid),
|
||||||
L0Entry = #manifest_entry{start_key = L0StartKey,
|
L0Entry = #manifest_entry{start_key = L0StartKey,
|
||||||
end_key = L0EndKey,
|
end_key = L0EndKey,
|
||||||
|
@ -786,11 +803,14 @@ update_levelzero(L0Size, {PushedTree, PushedIdx, MinSQN, MaxSQN},
|
||||||
%% where as the Wait of true is used at shutdown
|
%% where as the Wait of true is used at shutdown
|
||||||
|
|
||||||
roll_memory(State, false) ->
|
roll_memory(State, false) ->
|
||||||
FileName = levelzero_filename(State),
|
ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
|
||||||
|
RootPath = sst_rootpath(State#state.root_path),
|
||||||
|
FileName = sst_filename(ManSQN, 0, 0),
|
||||||
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
|
leveled_log:log("P0019", [FileName, State#state.ledger_sqn]),
|
||||||
PCL = self(),
|
PCL = self(),
|
||||||
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
|
||||||
R = leveled_sst:sst_newlevelzero(FileName,
|
R = leveled_sst:sst_newlevelzero(RootPath,
|
||||||
|
FileName,
|
||||||
length(State#state.levelzero_cache),
|
length(State#state.levelzero_cache),
|
||||||
FetchFun,
|
FetchFun,
|
||||||
PCL,
|
PCL,
|
||||||
|
@ -798,21 +818,20 @@ roll_memory(State, false) ->
|
||||||
{ok, Constructor, _} = R,
|
{ok, Constructor, _} = R,
|
||||||
Constructor;
|
Constructor;
|
||||||
roll_memory(State, true) ->
|
roll_memory(State, true) ->
|
||||||
FileName = levelzero_filename(State),
|
ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
|
||||||
|
RootPath = sst_rootpath(State#state.root_path),
|
||||||
|
FileName = sst_filename(ManSQN, 0, 0),
|
||||||
FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
FetchFun = fun(Slot) -> lists:nth(Slot, State#state.levelzero_cache) end,
|
||||||
KVList = leveled_pmem:to_list(length(State#state.levelzero_cache),
|
KVList = leveled_pmem:to_list(length(State#state.levelzero_cache),
|
||||||
FetchFun),
|
FetchFun),
|
||||||
R = leveled_sst:sst_new(FileName, 0, KVList, State#state.ledger_sqn),
|
R = leveled_sst:sst_new(RootPath,
|
||||||
|
FileName,
|
||||||
|
0,
|
||||||
|
KVList,
|
||||||
|
State#state.ledger_sqn),
|
||||||
{ok, Constructor, _} = R,
|
{ok, Constructor, _} = R,
|
||||||
Constructor.
|
Constructor.
|
||||||
|
|
||||||
levelzero_filename(State) ->
|
|
||||||
ManSQN = leveled_pmanifest:get_manifest_sqn(State#state.manifest) + 1,
|
|
||||||
FileName = State#state.root_path
|
|
||||||
++ "/" ++ ?FILES_FP ++ "/"
|
|
||||||
++ integer_to_list(ManSQN) ++ "_0_0",
|
|
||||||
FileName.
|
|
||||||
|
|
||||||
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
|
timed_fetch_mem(Key, Hash, Manifest, L0Cache, L0Index, HeadTimer) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
{R, Level} = fetch_mem(Key, Hash, Manifest, L0Cache, L0Index),
|
||||||
|
@ -1089,16 +1108,6 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
filepath(RootPath, files) ->
|
|
||||||
FP = RootPath ++ "/" ++ ?FILES_FP,
|
|
||||||
filelib:ensure_dir(FP ++ "/"),
|
|
||||||
FP.
|
|
||||||
|
|
||||||
filepath(RootPath, NewMSN, new_merge_files) ->
|
|
||||||
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Test
|
%%% Test
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -1127,8 +1136,8 @@ generate_randomkeys(Count, SQN, Acc) ->
|
||||||
|
|
||||||
|
|
||||||
clean_testdir(RootPath) ->
|
clean_testdir(RootPath) ->
|
||||||
clean_subdir(leveled_pmanifest:filepath(RootPath, manifest)),
|
clean_subdir(sst_rootpath(RootPath)),
|
||||||
clean_subdir(filepath(RootPath, files)).
|
clean_subdir(filename:join(RootPath, ?MANIFEST_FP)).
|
||||||
|
|
||||||
clean_subdir(DirPath) ->
|
clean_subdir(DirPath) ->
|
||||||
case filelib:is_dir(DirPath) of
|
case filelib:is_dir(DirPath) of
|
||||||
|
@ -1458,18 +1467,19 @@ foldwithimm_simple_test() ->
|
||||||
{{o, "Bucket1", "Key6", null}, 7}], AccB).
|
{{o, "Bucket1", "Key6", null}, 7}], AccB).
|
||||||
|
|
||||||
create_file_test() ->
|
create_file_test() ->
|
||||||
Filename = "../test/new_file.sst",
|
{RP, Filename} = {"../test/", "new_file.sst"},
|
||||||
ok = file:write_file(Filename, term_to_binary("hello")),
|
ok = file:write_file(filename:join(RP, Filename), term_to_binary("hello")),
|
||||||
KVL = lists:usort(generate_randomkeys(10000)),
|
KVL = lists:usort(generate_randomkeys(10000)),
|
||||||
Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
|
Tree = leveled_tree:from_orderedlist(KVL, ?CACHE_TYPE),
|
||||||
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
|
FetchFun = fun(Slot) -> lists:nth(Slot, [Tree]) end,
|
||||||
{ok,
|
{ok,
|
||||||
SP,
|
SP,
|
||||||
noreply} = leveled_sst:sst_newlevelzero(Filename,
|
noreply} = leveled_sst:sst_newlevelzero(RP,
|
||||||
1,
|
Filename,
|
||||||
FetchFun,
|
1,
|
||||||
undefined,
|
FetchFun,
|
||||||
10000),
|
undefined,
|
||||||
|
10000),
|
||||||
lists:foreach(fun(X) ->
|
lists:foreach(fun(X) ->
|
||||||
case checkready(SP) of
|
case checkready(SP) of
|
||||||
timeout ->
|
timeout ->
|
||||||
|
@ -1482,7 +1492,7 @@ create_file_test() ->
|
||||||
io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
|
io:format("StartKey ~w EndKey ~w~n", [StartKey, EndKey]),
|
||||||
?assertMatch({o, _, _, _}, StartKey),
|
?assertMatch({o, _, _, _}, StartKey),
|
||||||
?assertMatch({o, _, _, _}, EndKey),
|
?assertMatch({o, _, _, _}, EndKey),
|
||||||
?assertMatch("../test/new_file.sst", SrcFN),
|
?assertMatch("./new_file.sst", SrcFN),
|
||||||
ok = leveled_sst:sst_clear(SP),
|
ok = leveled_sst:sst_clear(SP),
|
||||||
{ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
|
{ok, Bin} = file:read_file("../test/new_file.sst.discarded"),
|
||||||
?assertMatch("hello", binary_to_term(Bin)).
|
?assertMatch("hello", binary_to_term(Bin)).
|
||||||
|
|
|
@ -90,10 +90,10 @@
|
||||||
delete_pending/2,
|
delete_pending/2,
|
||||||
delete_pending/3]).
|
delete_pending/3]).
|
||||||
|
|
||||||
-export([sst_new/4,
|
-export([sst_new/5,
|
||||||
sst_new/6,
|
sst_new/7,
|
||||||
sst_newlevelzero/5,
|
sst_newlevelzero/6,
|
||||||
sst_open/1,
|
sst_open/2,
|
||||||
sst_get/2,
|
sst_get/2,
|
||||||
sst_get/3,
|
sst_get/3,
|
||||||
sst_getkvrange/4,
|
sst_getkvrange/4,
|
||||||
|
@ -123,6 +123,7 @@
|
||||||
handle :: file:fd(),
|
handle :: file:fd(),
|
||||||
sst_timings :: tuple(),
|
sst_timings :: tuple(),
|
||||||
penciller :: pid(),
|
penciller :: pid(),
|
||||||
|
root_path,
|
||||||
filename,
|
filename,
|
||||||
blockindex_cache}).
|
blockindex_cache}).
|
||||||
|
|
||||||
|
@ -131,17 +132,20 @@
|
||||||
%%% API
|
%%% API
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
|
||||||
sst_open(Filename) ->
|
sst_open(RootPath, Filename) ->
|
||||||
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
||||||
case gen_fsm:sync_send_event(Pid, {sst_open, Filename}, infinity) of
|
case gen_fsm:sync_send_event(Pid,
|
||||||
|
{sst_open, RootPath, Filename},
|
||||||
|
infinity) of
|
||||||
{ok, {SK, EK}} ->
|
{ok, {SK, EK}} ->
|
||||||
{ok, Pid, {SK, EK}}
|
{ok, Pid, {SK, EK}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
sst_new(Filename, Level, KVList, MaxSQN) ->
|
sst_new(RootPath, Filename, Level, KVList, MaxSQN) ->
|
||||||
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
||||||
case gen_fsm:sync_send_event(Pid,
|
case gen_fsm:sync_send_event(Pid,
|
||||||
{sst_new,
|
{sst_new,
|
||||||
|
RootPath,
|
||||||
Filename,
|
Filename,
|
||||||
Level,
|
Level,
|
||||||
KVList,
|
KVList,
|
||||||
|
@ -151,7 +155,7 @@ sst_new(Filename, Level, KVList, MaxSQN) ->
|
||||||
{ok, Pid, {SK, EK}}
|
{ok, Pid, {SK, EK}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
|
sst_new(RootPath, Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
|
||||||
{{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}),
|
{{Rem1, Rem2}, MergedList} = merge_lists(KL1, KL2, {IsBasement, Level}),
|
||||||
case MergedList of
|
case MergedList of
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -160,6 +164,7 @@ sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
|
||||||
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
||||||
case gen_fsm:sync_send_event(Pid,
|
case gen_fsm:sync_send_event(Pid,
|
||||||
{sst_new,
|
{sst_new,
|
||||||
|
RootPath,
|
||||||
Filename,
|
Filename,
|
||||||
Level,
|
Level,
|
||||||
MergedList,
|
MergedList,
|
||||||
|
@ -170,10 +175,11 @@ sst_new(Filename, KL1, KL2, IsBasement, Level, MaxSQN) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
sst_newlevelzero(Filename, Slots, FetchFun, Penciller, MaxSQN) ->
|
sst_newlevelzero(RootPath, Filename, Slots, FetchFun, Penciller, MaxSQN) ->
|
||||||
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
{ok, Pid} = gen_fsm:start(?MODULE, [], []),
|
||||||
gen_fsm:send_event(Pid,
|
gen_fsm:send_event(Pid,
|
||||||
{sst_newlevelzero,
|
{sst_newlevelzero,
|
||||||
|
RootPath,
|
||||||
Filename,
|
Filename,
|
||||||
Slots,
|
Slots,
|
||||||
FetchFun,
|
FetchFun,
|
||||||
|
@ -228,14 +234,14 @@ sst_printtimings(Pid) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, starting, #state{}}.
|
{ok, starting, #state{}}.
|
||||||
|
|
||||||
starting({sst_open, Filename}, _From, State) ->
|
starting({sst_open, RootPath, Filename}, _From, State) ->
|
||||||
UpdState = read_file(Filename, State),
|
UpdState = read_file(Filename, State#state{root_path=RootPath}),
|
||||||
Summary = UpdState#state.summary,
|
Summary = UpdState#state.summary,
|
||||||
{reply,
|
{reply,
|
||||||
{ok, {Summary#summary.first_key, Summary#summary.last_key}},
|
{ok, {Summary#summary.first_key, Summary#summary.last_key}},
|
||||||
reader,
|
reader,
|
||||||
UpdState};
|
UpdState};
|
||||||
starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) ->
|
starting({sst_new, RootPath, Filename, Level, KVList, MaxSQN}, _From, State) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
{FirstKey,
|
{FirstKey,
|
||||||
Length,
|
Length,
|
||||||
|
@ -247,8 +253,8 @@ starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) ->
|
||||||
FirstKey,
|
FirstKey,
|
||||||
Length,
|
Length,
|
||||||
MaxSQN),
|
MaxSQN),
|
||||||
ActualFilename = write_file(Filename, SummaryBin, SlotsBin),
|
ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
|
||||||
UpdState = read_file(ActualFilename, State),
|
UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
|
||||||
Summary = UpdState#state.summary,
|
Summary = UpdState#state.summary,
|
||||||
leveled_log:log_timer("SST08",
|
leveled_log:log_timer("SST08",
|
||||||
[ActualFilename, Level, Summary#summary.max_sqn],
|
[ActualFilename, Level, Summary#summary.max_sqn],
|
||||||
|
@ -258,8 +264,8 @@ starting({sst_new, Filename, Level, KVList, MaxSQN}, _From, State) ->
|
||||||
reader,
|
reader,
|
||||||
UpdState#state{blockindex_cache = BlockIndex}}.
|
UpdState#state{blockindex_cache = BlockIndex}}.
|
||||||
|
|
||||||
starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller, MaxSQN},
|
starting({sst_newlevelzero, RootPath, Filename,
|
||||||
State) ->
|
Slots, FetchFun, Penciller, MaxSQN}, State) ->
|
||||||
SW = os:timestamp(),
|
SW = os:timestamp(),
|
||||||
KVList = leveled_pmem:to_list(Slots, FetchFun),
|
KVList = leveled_pmem:to_list(Slots, FetchFun),
|
||||||
{FirstKey,
|
{FirstKey,
|
||||||
|
@ -272,8 +278,8 @@ starting({sst_newlevelzero, Filename, Slots, FetchFun, Penciller, MaxSQN},
|
||||||
FirstKey,
|
FirstKey,
|
||||||
Length,
|
Length,
|
||||||
MaxSQN),
|
MaxSQN),
|
||||||
ActualFilename = write_file(Filename, SummaryBin, SlotsBin),
|
ActualFilename = write_file(RootPath, Filename, SummaryBin, SlotsBin),
|
||||||
UpdState = read_file(ActualFilename, State),
|
UpdState = read_file(ActualFilename, State#state{root_path=RootPath}),
|
||||||
Summary = UpdState#state.summary,
|
Summary = UpdState#state.summary,
|
||||||
leveled_log:log_timer("SST08",
|
leveled_log:log_timer("SST08",
|
||||||
[ActualFilename, 0, Summary#summary.max_sqn],
|
[ActualFilename, 0, Summary#summary.max_sqn],
|
||||||
|
@ -357,7 +363,8 @@ delete_pending({get_slots, SlotList}, _From, State) ->
|
||||||
delete_pending(close, _From, State) ->
|
delete_pending(close, _From, State) ->
|
||||||
leveled_log:log("SST07", [State#state.filename]),
|
leveled_log:log("SST07", [State#state.filename]),
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
ok = file:delete(State#state.filename),
|
ok = file:delete(filename:join(State#state.root_path,
|
||||||
|
State#state.filename)),
|
||||||
{stop, normal, ok, State}.
|
{stop, normal, ok, State}.
|
||||||
|
|
||||||
delete_pending(timeout, State) ->
|
delete_pending(timeout, State) ->
|
||||||
|
@ -368,7 +375,8 @@ delete_pending(timeout, State) ->
|
||||||
delete_pending(close, State) ->
|
delete_pending(close, State) ->
|
||||||
leveled_log:log("SST07", [State#state.filename]),
|
leveled_log:log("SST07", [State#state.filename]),
|
||||||
ok = file:close(State#state.handle),
|
ok = file:close(State#state.handle),
|
||||||
ok = file:delete(State#state.filename),
|
ok = file:delete(filename:join(State#state.root_path,
|
||||||
|
State#state.filename)),
|
||||||
{stop, normal, State}.
|
{stop, normal, State}.
|
||||||
|
|
||||||
handle_sync_event(_Msg, _From, StateName, State) ->
|
handle_sync_event(_Msg, _From, StateName, State) ->
|
||||||
|
@ -380,6 +388,8 @@ handle_event(_Msg, StateName, State) ->
|
||||||
handle_info(_Msg, StateName, State) ->
|
handle_info(_Msg, StateName, State) ->
|
||||||
{next_state, StateName, State}.
|
{next_state, StateName, State}.
|
||||||
|
|
||||||
|
terminate(normal, delete_pending, _State) ->
|
||||||
|
ok;
|
||||||
terminate(Reason, _StateName, State) ->
|
terminate(Reason, _StateName, State) ->
|
||||||
leveled_log:log("SST04", [Reason, State#state.filename]).
|
leveled_log:log("SST04", [Reason, State#state.filename]).
|
||||||
|
|
||||||
|
@ -497,31 +507,32 @@ fetch_range(StartKey, EndKey, ScanWidth, State) ->
|
||||||
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint.
|
lists:foldl(FetchFun, [], SlotsToFetchBinList) ++ SlotsToPoint.
|
||||||
|
|
||||||
|
|
||||||
write_file(Filename, SummaryBin, SlotsBin) ->
|
write_file(RootPath, Filename, SummaryBin, SlotsBin) ->
|
||||||
SummaryLength = byte_size(SummaryBin),
|
SummaryLength = byte_size(SummaryBin),
|
||||||
SlotsLength = byte_size(SlotsBin),
|
SlotsLength = byte_size(SlotsBin),
|
||||||
{PendingName, FinalName} = generate_filenames(Filename),
|
{PendingName, FinalName} = generate_filenames(Filename),
|
||||||
ok = file:write_file(PendingName,
|
ok = file:write_file(filename:join(RootPath, PendingName),
|
||||||
<<SlotsLength:32/integer,
|
<<SlotsLength:32/integer,
|
||||||
SummaryLength:32/integer,
|
SummaryLength:32/integer,
|
||||||
SlotsBin/binary,
|
SlotsBin/binary,
|
||||||
SummaryBin/binary>>,
|
SummaryBin/binary>>,
|
||||||
[raw]),
|
[raw]),
|
||||||
case filelib:is_file(FinalName) of
|
case filelib:is_file(filename:join(RootPath, FinalName)) of
|
||||||
true ->
|
true ->
|
||||||
AltName = filename:join(filename:dirname(FinalName),
|
AltName = filename:join(RootPath, filename:basename(FinalName))
|
||||||
filename:basename(FinalName))
|
|
||||||
++ ?DISCARD_EXT,
|
++ ?DISCARD_EXT,
|
||||||
leveled_log:log("SST05", [FinalName, AltName]),
|
leveled_log:log("SST05", [FinalName, AltName]),
|
||||||
ok = file:rename(FinalName, AltName);
|
ok = file:rename(filename:join(RootPath, FinalName), AltName);
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
file:rename(PendingName, FinalName),
|
file:rename(filename:join(RootPath, PendingName),
|
||||||
|
filename:join(RootPath, FinalName)),
|
||||||
FinalName.
|
FinalName.
|
||||||
|
|
||||||
read_file(Filename, State) ->
|
read_file(Filename, State) ->
|
||||||
{Handle, SummaryBin} = open_reader(Filename),
|
{Handle, SummaryBin} = open_reader(filename:join(State#state.root_path,
|
||||||
|
Filename)),
|
||||||
{Summary, SlotList} = read_table_summary(SummaryBin),
|
{Summary, SlotList} = read_table_summary(SummaryBin),
|
||||||
SlotCount = length(SlotList),
|
SlotCount = length(SlotList),
|
||||||
BlockIndexCache = array:new([{size, SlotCount}, {default, none}]),
|
BlockIndexCache = array:new([{size, SlotCount}, {default, none}]),
|
||||||
|
@ -537,6 +548,7 @@ read_file(Filename, State) ->
|
||||||
filename = Filename}.
|
filename = Filename}.
|
||||||
|
|
||||||
open_reader(Filename) ->
|
open_reader(Filename) ->
|
||||||
|
io:format("Reading filename ~s~n", [Filename]),
|
||||||
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
{ok, Handle} = file:open(Filename, [binary, raw, read]),
|
||||||
{ok, Lengths} = file:pread(Handle, 0, 8),
|
{ok, Lengths} = file:pread(Handle, 0, 8),
|
||||||
<<SlotsLength:32/integer, SummaryLength:32/integer>> = Lengths,
|
<<SlotsLength:32/integer, SummaryLength:32/integer>> = Lengths,
|
||||||
|
@ -1364,8 +1376,8 @@ merge_test() ->
|
||||||
KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)),
|
KVL2 = lists:ukeysort(1, generate_randomkeys(1, N, 1, 20)),
|
||||||
KVL3 = lists:ukeymerge(1, KVL1, KVL2),
|
KVL3 = lists:ukeymerge(1, KVL1, KVL2),
|
||||||
SW0 = os:timestamp(),
|
SW0 = os:timestamp(),
|
||||||
{ok, P1, {FK1, LK1}} = sst_new("../test/level1_src", 1, KVL1, 6000),
|
{ok, P1, {FK1, LK1}} = sst_new("../test/", "level1_src", 1, KVL1, 6000),
|
||||||
{ok, P2, {FK2, LK2}} = sst_new("../test/level2_src", 2, KVL2, 3000),
|
{ok, P2, {FK2, LK2}} = sst_new("../test/", "level2_src", 2, KVL2, 3000),
|
||||||
ExpFK1 = element(1, lists:nth(1, KVL1)),
|
ExpFK1 = element(1, lists:nth(1, KVL1)),
|
||||||
ExpLK1 = element(1, lists:last(KVL1)),
|
ExpLK1 = element(1, lists:last(KVL1)),
|
||||||
ExpFK2 = element(1, lists:nth(1, KVL2)),
|
ExpFK2 = element(1, lists:nth(1, KVL2)),
|
||||||
|
@ -1376,7 +1388,8 @@ merge_test() ->
|
||||||
?assertMatch(ExpLK2, LK2),
|
?assertMatch(ExpLK2, LK2),
|
||||||
ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
|
ML1 = [{next, #manifest_entry{owner = P1}, FK1}],
|
||||||
ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
|
ML2 = [{next, #manifest_entry{owner = P2}, FK2}],
|
||||||
{ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/level2_merge",
|
{ok, P3, {{Rem1, Rem2}, FK3, LK3}} = sst_new("../test/",
|
||||||
|
"level2_merge",
|
||||||
ML1,
|
ML1,
|
||||||
ML2,
|
ML2,
|
||||||
false,
|
false,
|
||||||
|
@ -1408,12 +1421,13 @@ merge_test() ->
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_range_test() ->
|
simple_persisted_range_test() ->
|
||||||
Filename = "../test/simple_test",
|
{RP, Filename} = {"../test/", "simple_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 16, 1, 20),
|
KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 16, 1, 20),
|
||||||
KVList1 = lists:ukeysort(1, KVList0),
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
[{FirstKey, _FV}|_Rest] = KVList1,
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
{LastKey, _LV} = lists:last(KVList1),
|
{LastKey, _LV} = lists:last(KVList1),
|
||||||
{ok, Pid, {FirstKey, LastKey}} = sst_new(Filename,
|
{ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
|
||||||
|
Filename,
|
||||||
1,
|
1,
|
||||||
KVList1,
|
KVList1,
|
||||||
length(KVList1)),
|
length(KVList1)),
|
||||||
|
@ -1450,12 +1464,13 @@ simple_persisted_range_test() ->
|
||||||
|
|
||||||
|
|
||||||
simple_persisted_test() ->
|
simple_persisted_test() ->
|
||||||
Filename = "../test/simple_test",
|
{RP, Filename} = {"../test/", "simple_test"},
|
||||||
KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 32, 1, 20),
|
KVList0 = generate_randomkeys(1, ?SLOT_SIZE * 32, 1, 20),
|
||||||
KVList1 = lists:ukeysort(1, KVList0),
|
KVList1 = lists:ukeysort(1, KVList0),
|
||||||
[{FirstKey, _FV}|_Rest] = KVList1,
|
[{FirstKey, _FV}|_Rest] = KVList1,
|
||||||
{LastKey, _LV} = lists:last(KVList1),
|
{LastKey, _LV} = lists:last(KVList1),
|
||||||
{ok, Pid, {FirstKey, LastKey}} = sst_new(Filename,
|
{ok, Pid, {FirstKey, LastKey}} = sst_new(RP,
|
||||||
|
Filename,
|
||||||
1,
|
1,
|
||||||
KVList1,
|
KVList1,
|
||||||
length(KVList1)),
|
length(KVList1)),
|
||||||
|
@ -1558,7 +1573,7 @@ simple_persisted_test() ->
|
||||||
?assertMatch([{Eight000Key, _v800}], FetchedListB4),
|
?assertMatch([{Eight000Key, _v800}], FetchedListB4),
|
||||||
|
|
||||||
ok = sst_close(Pid),
|
ok = sst_close(Pid),
|
||||||
ok = file:delete(Filename ++ ".sst").
|
ok = file:delete(filename:join(RP, Filename ++ ".sst")).
|
||||||
|
|
||||||
key_dominates_test() ->
|
key_dominates_test() ->
|
||||||
KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},
|
KV1 = {{o, "Bucket", "Key1", null}, {5, {active, infinity}, 0, []}},
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue