Basic working
Some basic tests working - but still outstanding issues.
This commit is contained in:
parent
76bdd83346
commit
13c81f0ed1
5 changed files with 51 additions and 27 deletions
|
@ -251,6 +251,8 @@
|
||||||
{info, "Completed creation of ~s at level ~w with max sqn ~w"}},
|
{info, "Completed creation of ~s at level ~w with max sqn ~w"}},
|
||||||
{"SST09",
|
{"SST09",
|
||||||
{warn, "Read request exposes slot with bad CRC"}},
|
{warn, "Read request exposes slot with bad CRC"}},
|
||||||
|
{"SST10",
|
||||||
|
{info, "Expansion sought to support pointer to pid ~w status ~w"}},
|
||||||
|
|
||||||
{"CDB01",
|
{"CDB01",
|
||||||
{info, "Opening file for writing with filename ~s"}},
|
{info, "Opening file for writing with filename ~s"}},
|
||||||
|
|
|
@ -49,7 +49,8 @@
|
||||||
is_basement/2,
|
is_basement/2,
|
||||||
dump_pidmap/1,
|
dump_pidmap/1,
|
||||||
levelzero_present/1,
|
levelzero_present/1,
|
||||||
pointer_convert/2
|
pointer_convert/2,
|
||||||
|
delete_confirmed/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
@ -220,7 +221,7 @@ range_lookup(Manifest, Level, StartKey, EndKey) ->
|
||||||
merge_lookup(Manifest, Level, StartKey, EndKey) ->
|
merge_lookup(Manifest, Level, StartKey, EndKey) ->
|
||||||
MapFun =
|
MapFun =
|
||||||
fun({{_Level, LastKey, FN}, FirstKey}) ->
|
fun({{_Level, LastKey, FN}, FirstKey}) ->
|
||||||
Owner = dict:fetch(FN, Manifest#manifest.pidmap),
|
{Owner, _DelSQN} = dict:fetch(FN, Manifest#manifest.pidmap),
|
||||||
#manifest_entry{filename = FN,
|
#manifest_entry{filename = FN,
|
||||||
owner = Owner,
|
owner = Owner,
|
||||||
start_key = FirstKey,
|
start_key = FirstKey,
|
||||||
|
@ -231,10 +232,9 @@ merge_lookup(Manifest, Level, StartKey, EndKey) ->
|
||||||
pointer_convert(Manifest, EntryList) ->
|
pointer_convert(Manifest, EntryList) ->
|
||||||
MapFun =
|
MapFun =
|
||||||
fun(Entry) ->
|
fun(Entry) ->
|
||||||
{next,
|
{Pid, _DelSQN} = dict:fetch(Entry#manifest_entry.filename,
|
||||||
dict:fetch(Entry#manifest_entry.filename,
|
Manifest#manifest.pidmap),
|
||||||
Manifest#manifest.pidmap),
|
{next, Pid, all}
|
||||||
all}
|
|
||||||
end,
|
end,
|
||||||
lists:map(MapFun, EntryList).
|
lists:map(MapFun, EntryList).
|
||||||
|
|
||||||
|
@ -333,6 +333,11 @@ levelzero_present(Manifest) ->
|
||||||
true
|
true
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
delete_confirmed(Manifest, Filename) ->
|
||||||
|
PidMap = dict:erase(Filename, Manifest#manifest.pidmap),
|
||||||
|
% Would be better to clear ETS at this point rather than on lookup?
|
||||||
|
Manifest#manifest{pidmap = PidMap}.
|
||||||
|
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
%%% Internal Functions
|
%%% Internal Functions
|
||||||
%%%============================================================================
|
%%%============================================================================
|
||||||
|
@ -418,7 +423,9 @@ ready_to_delete(SnapList, FileDeleteSQN, Now) ->
|
||||||
lists:foldl(FilterFun, true, SnapList).
|
lists:foldl(FilterFun, true, SnapList).
|
||||||
|
|
||||||
filepath(RootPath, manifest) ->
|
filepath(RootPath, manifest) ->
|
||||||
RootPath ++ "/" ++ ?MANIFEST_FP ++ "/".
|
MFP = RootPath ++ "/" ++ ?MANIFEST_FP ++ "/",
|
||||||
|
filelib:ensure_dir(MFP),
|
||||||
|
MFP.
|
||||||
|
|
||||||
filepath(RootPath, NewMSN, current_manifest) ->
|
filepath(RootPath, NewMSN, current_manifest) ->
|
||||||
filepath(RootPath, manifest) ++ "nonzero_"
|
filepath(RootPath, manifest) ++ "nonzero_"
|
||||||
|
@ -455,7 +462,7 @@ key_lookup(Manifest, Level, {LastKey, LastFN}, KeyToFind, ManSQN, GC) ->
|
||||||
Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN),
|
Active = (ManSQN >= ActiveSQN) and (ManSQN < TombSQN),
|
||||||
case Active of
|
case Active of
|
||||||
true ->
|
true ->
|
||||||
InRange = KeyToFind >= FirstKey,
|
InRange = (KeyToFind >= FirstKey) or (KeyToFind == all),
|
||||||
case InRange of
|
case InRange of
|
||||||
true ->
|
true ->
|
||||||
NextFN;
|
NextFN;
|
||||||
|
@ -665,5 +672,16 @@ rangequery_manifest_test() ->
|
||||||
RL3_1B = range_lookup(Man13, 1, SK3, EK3),
|
RL3_1B = range_lookup(Man13, 1, SK3, EK3),
|
||||||
?assertMatch([{next, "pid_y4", {o, "Bucket1", "K994", null}}], RL3_1B).
|
?assertMatch([{next, "pid_y4", {o, "Bucket1", "K994", null}}], RL3_1B).
|
||||||
|
|
||||||
|
levelzero_present_test() ->
|
||||||
|
E0 = #manifest_entry{start_key={i, "Bucket1", {"Idx1", "Fld1"}, "K8"},
|
||||||
|
end_key={o, "Bucket1", "Key996", null},
|
||||||
|
filename="Z0",
|
||||||
|
owner="pid_z0"},
|
||||||
|
|
||||||
|
Man0 = new_manifest(),
|
||||||
|
?assertMatch(false, levelzero_present(Man0)),
|
||||||
|
% insert_manifest_entry(Manifest, ManSQN, Level, Entry)
|
||||||
|
Man1 = insert_manifest_entry(Man0, 1, 0, E0),
|
||||||
|
?assertMatch(true, levelzero_present(Man1)).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
|
@ -184,7 +184,8 @@ perform_merge(Manifest, Src, SinkList, SrcLevel, RootPath, NewSQN) ->
|
||||||
Entry)
|
Entry)
|
||||||
end,
|
end,
|
||||||
Man1 = lists:foldl(RemoveFun, Man0, SinkList),
|
Man1 = lists:foldl(RemoveFun, Man0, SinkList),
|
||||||
leveled_manifest:remove_manifest_entry(Man1, NewSQN, SrcLevel, Src).
|
Man2 = leveled_manifest:remove_manifest_entry(Man1, NewSQN, SrcLevel, Src),
|
||||||
|
{Man2, [Src|SinkList]}.
|
||||||
|
|
||||||
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) ->
|
do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, Counter, Man0) ->
|
||||||
leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]),
|
leveled_log:log("PC011", [NewSQN, SinkLevel, Counter]),
|
||||||
|
@ -273,12 +274,12 @@ merge_file_test() ->
|
||||||
|
|
||||||
Man0 = leveled_manifest:new_manifest(),
|
Man0 = leveled_manifest:new_manifest(),
|
||||||
Man1 = leveled_manifest:insert_manifest_entry(Man0, 1, 2, E1),
|
Man1 = leveled_manifest:insert_manifest_entry(Man0, 1, 2, E1),
|
||||||
Man2 = leveled_manifest:insert_manifest_entry(Man1, 1, 2, E1),
|
Man2 = leveled_manifest:insert_manifest_entry(Man1, 1, 2, E2),
|
||||||
Man3 = leveled_manifest:insert_manifest_entry(Man2, 1, 2, E1),
|
Man3 = leveled_manifest:insert_manifest_entry(Man2, 1, 2, E3),
|
||||||
Man4 = leveled_manifest:insert_manifest_entry(Man3, 1, 2, E1),
|
Man4 = leveled_manifest:insert_manifest_entry(Man3, 1, 2, E4),
|
||||||
Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E1),
|
Man5 = leveled_manifest:insert_manifest_entry(Man4, 2, 1, E5),
|
||||||
|
|
||||||
Man6 = perform_merge(Man5, E1, [E2, E3, E4, E5], 1, "../test", 3),
|
{Man6, _Dels} = perform_merge(Man5, E1, [E2, E3, E4, E5], 1, "../test", 3),
|
||||||
|
|
||||||
?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)).
|
?assertMatch(3, leveled_manifest:get_manifest_sqn(Man6)).
|
||||||
|
|
||||||
|
|
|
@ -498,7 +498,9 @@ handle_cast({confirm_delete, Filename}, State=#state{is_snapshot=Snap})
|
||||||
{true, Pid} ->
|
{true, Pid} ->
|
||||||
leveled_log:log("P0005", [Filename]),
|
leveled_log:log("P0005", [Filename]),
|
||||||
ok = leveled_sst:sst_deleteconfirmed(Pid),
|
ok = leveled_sst:sst_deleteconfirmed(Pid),
|
||||||
{noreply, State};
|
Man0 = leveled_manifest:delete_confirmed(State#state.manifest,
|
||||||
|
Filename),
|
||||||
|
{noreply, State#state{manifest=Man0}};
|
||||||
{false, _Pid} ->
|
{false, _Pid} ->
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
@ -545,9 +547,10 @@ terminate(Reason, State) ->
|
||||||
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
ok = leveled_pclerk:clerk_close(State#state.clerk),
|
||||||
|
|
||||||
leveled_log:log("P0008", [Reason]),
|
leveled_log:log("P0008", [Reason]),
|
||||||
L0 = leveled_manifest:key_lookup(State#state.manifest, 0, all),
|
L0_Present = leveled_manifest:key_lookup(State#state.manifest, 0, all),
|
||||||
case {State#state.levelzero_pending, L0} of
|
L0_Left = State#state.levelzero_size > 0,
|
||||||
{false, false} ->
|
case {State#state.levelzero_pending, L0_Present, L0_Left} of
|
||||||
|
{false, false, true} ->
|
||||||
L0Pid = roll_memory(State, true),
|
L0Pid = roll_memory(State, true),
|
||||||
ok = leveled_sst:sst_close(L0Pid);
|
ok = leveled_sst:sst_close(L0Pid);
|
||||||
StatusTuple ->
|
StatusTuple ->
|
||||||
|
@ -719,7 +722,7 @@ roll_memory(State, true) ->
|
||||||
Constructor.
|
Constructor.
|
||||||
|
|
||||||
levelzero_filename(State) ->
|
levelzero_filename(State) ->
|
||||||
ManSQN = leveled_manifest:get_manifest_sqn(State#state.manifest),
|
ManSQN = leveled_manifest:get_manifest_sqn(State#state.manifest) + 1,
|
||||||
FileName = State#state.root_path
|
FileName = State#state.root_path
|
||||||
++ "/" ++ ?FILES_FP ++ "/"
|
++ "/" ++ ?FILES_FP ++ "/"
|
||||||
++ integer_to_list(ManSQN) ++ "_0_0",
|
++ integer_to_list(ManSQN) ++ "_0_0",
|
||||||
|
@ -1000,7 +1003,9 @@ keyfolder({[{IMMKey, IMMVal}|NxIMMiterator], SSTiterator}, KeyRange,
|
||||||
|
|
||||||
|
|
||||||
filepath(RootPath, files) ->
|
filepath(RootPath, files) ->
|
||||||
RootPath ++ "/" ++ ?FILES_FP.
|
FP = RootPath ++ "/" ++ ?FILES_FP,
|
||||||
|
filelib:ensure_dir(FP ++ "/"),
|
||||||
|
FP.
|
||||||
|
|
||||||
filepath(RootPath, NewMSN, new_merge_files) ->
|
filepath(RootPath, NewMSN, new_merge_files) ->
|
||||||
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
|
filepath(RootPath, files) ++ "/" ++ integer_to_list(NewMSN).
|
||||||
|
@ -1135,11 +1140,13 @@ simple_server_test() ->
|
||||||
?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})),
|
?assertMatch(Key2, pcl_fetch(PCLr, {o,"Bucket0002", "Key0002", null})),
|
||||||
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
|
?assertMatch(Key3, pcl_fetch(PCLr, {o,"Bucket0003", "Key0003", null})),
|
||||||
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
|
?assertMatch(Key4, pcl_fetch(PCLr, {o,"Bucket0004", "Key0004", null})),
|
||||||
|
|
||||||
SnapOpts = #penciller_options{start_snapshot = true,
|
SnapOpts = #penciller_options{start_snapshot = true,
|
||||||
source_penciller = PCLr},
|
source_penciller = PCLr},
|
||||||
{ok, PclSnap} = pcl_start(SnapOpts),
|
{ok, PclSnap} = pcl_start(SnapOpts),
|
||||||
leveled_bookie:load_snapshot(PclSnap,
|
leveled_bookie:load_snapshot(PclSnap,
|
||||||
leveled_bookie:empty_ledgercache()),
|
leveled_bookie:empty_ledgercache()),
|
||||||
|
|
||||||
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
|
?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001", null})),
|
||||||
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
|
?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002", null})),
|
||||||
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
|
?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003", null})),
|
||||||
|
@ -1171,6 +1178,7 @@ simple_server_test() ->
|
||||||
% Add some more keys and confirm that check sequence number still
|
% Add some more keys and confirm that check sequence number still
|
||||||
% sees the old version in the previous snapshot, but will see the new version
|
% sees the old version in the previous snapshot, but will see the new version
|
||||||
% in a new snapshot
|
% in a new snapshot
|
||||||
|
|
||||||
Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
|
Key1A_Pre = {{o,"Bucket0001", "Key0001", null},
|
||||||
{4005, {active, infinity}, null}},
|
{4005, {active, infinity}, null}},
|
||||||
Key1A = add_missing_hash(Key1A_Pre),
|
Key1A = add_missing_hash(Key1A_Pre),
|
||||||
|
@ -1184,7 +1192,7 @@ simple_server_test() ->
|
||||||
null},
|
null},
|
||||||
1)),
|
1)),
|
||||||
ok = pcl_close(PclSnap),
|
ok = pcl_close(PclSnap),
|
||||||
|
|
||||||
{ok, PclSnap2} = pcl_start(SnapOpts),
|
{ok, PclSnap2} = pcl_start(SnapOpts),
|
||||||
leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()),
|
leveled_bookie:load_snapshot(PclSnap2, leveled_bookie:empty_ledgercache()),
|
||||||
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
?assertMatch(false, pcl_checksequencenumber(PclSnap2,
|
||||||
|
@ -1199,12 +1207,6 @@ simple_server_test() ->
|
||||||
"Key0001",
|
"Key0001",
|
||||||
null},
|
null},
|
||||||
4005)),
|
4005)),
|
||||||
|
|
||||||
io:format("Snap2 B2 K2 ~w~n",
|
|
||||||
[pcl_fetch(PclSnap2, {o, "Bucket0002", "Key0002", null})]),
|
|
||||||
io:format("r B2 K2 ~w~n",
|
|
||||||
[pcl_fetch(PCLr, {o, "Bucket0002", "Key0002", null})]),
|
|
||||||
|
|
||||||
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
|
?assertMatch(true, pcl_checksequencenumber(PclSnap2,
|
||||||
{o,
|
{o,
|
||||||
"Bucket0002",
|
"Bucket0002",
|
||||||
|
|
|
@ -1203,6 +1203,7 @@ expand_list_by_pointer({pointer, SSTPid, Slot, StartKey, EndKey}, Tail, Width) -
|
||||||
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
|
ExpPointers = leveled_sst:sst_getslots(SSTPid, AccPointers),
|
||||||
lists:append(ExpPointers, AccTail);
|
lists:append(ExpPointers, AccTail);
|
||||||
expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) ->
|
expand_list_by_pointer({next, SSTPid, StartKey, EndKey}, Tail, Width) ->
|
||||||
|
leveled_log:log("SST10", [SSTPid, is_pid(SSTPid)]),
|
||||||
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
|
ExpPointer = leveled_sst:sst_getkvrange(SSTPid, StartKey, EndKey, Width),
|
||||||
ExpPointer ++ Tail.
|
ExpPointer ++ Tail.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue