Merge pull request #17 from martinsumner/mas-manifesttidy-inker
Mas manifesttidy inker
This commit is contained in:
commit
30c72c3bd6
5 changed files with 383 additions and 179 deletions
|
@ -65,6 +65,7 @@
|
|||
-export([cdb_open_writer/1,
|
||||
cdb_open_writer/2,
|
||||
cdb_open_reader/1,
|
||||
cdb_reopen_reader/2,
|
||||
cdb_get/2,
|
||||
cdb_put/3,
|
||||
cdb_mput/2,
|
||||
|
@ -125,6 +126,13 @@ cdb_open_writer(Filename, Opts) ->
|
|||
ok = gen_fsm:sync_send_event(Pid, {open_writer, Filename}, infinity),
|
||||
{ok, Pid}.
|
||||
|
||||
cdb_reopen_reader(Filename, LastKey) ->
|
||||
{ok, Pid} = gen_fsm:start(?MODULE, [#cdb_options{binary_mode=true}], []),
|
||||
ok = gen_fsm:sync_send_event(Pid,
|
||||
{open_reader, Filename, LastKey},
|
||||
infinity),
|
||||
{ok, Pid}.
|
||||
|
||||
cdb_open_reader(Filename) ->
|
||||
cdb_open_reader(Filename, #cdb_options{binary_mode=true}).
|
||||
|
||||
|
@ -241,6 +249,13 @@ starting({open_writer, Filename}, _From, State) ->
|
|||
starting({open_reader, Filename}, _From, State) ->
|
||||
leveled_log:log("CDB02", [Filename]),
|
||||
{Handle, Index, LastKey} = open_for_readonly(Filename, false),
|
||||
{reply, ok, reader, State#state{handle=Handle,
|
||||
last_key=LastKey,
|
||||
filename=Filename,
|
||||
hash_index=Index}};
|
||||
starting({open_reader, Filename, LastKey}, _From, State) ->
|
||||
leveled_log:log("CDB02", [Filename]),
|
||||
{Handle, Index, LastKey} = open_for_readonly(Filename, LastKey),
|
||||
{reply, ok, reader, State#state{handle=Handle,
|
||||
last_key=LastKey,
|
||||
filename=Filename,
|
||||
|
|
|
@ -194,7 +194,8 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
|
|||
FilesToDelete = lists:map(fun(C) ->
|
||||
{C#candidate.low_sqn,
|
||||
C#candidate.filename,
|
||||
C#candidate.journal}
|
||||
C#candidate.journal,
|
||||
undefined}
|
||||
end,
|
||||
BestRun1),
|
||||
leveled_log:log("IC002", [length(FilesToDelete)]),
|
||||
|
@ -274,7 +275,7 @@ scan_all_files(Manifest, FilterFun, FilterServer, MaxSQN) ->
|
|||
scan_all_files([], _FilterFun, _FilterServer, _MaxSQN, CandidateList) ->
|
||||
CandidateList;
|
||||
scan_all_files([Entry|Tail], FilterFun, FilterServer, MaxSQN, CandidateList) ->
|
||||
{LowSQN, FN, JournalP} = Entry,
|
||||
{LowSQN, FN, JournalP, _LK} = Entry,
|
||||
CpctPerc = check_single_file(JournalP,
|
||||
FilterFun,
|
||||
FilterServer,
|
||||
|
@ -390,7 +391,7 @@ update_inker(Inker, ManifestSlice, FilesToDelete) ->
|
|||
FilesToDelete),
|
||||
ok = leveled_inker:ink_compactioncomplete(Inker),
|
||||
leveled_log:log("IC007", []),
|
||||
lists:foreach(fun({_SQN, _FN, J2D}) ->
|
||||
lists:foreach(fun({_SQN, _FN, J2D, _LK}) ->
|
||||
leveled_cdb:cdb_deletepending(J2D,
|
||||
ManSQN,
|
||||
Inker)
|
||||
|
@ -415,7 +416,7 @@ compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN,
|
|||
ManSlice0;
|
||||
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN,
|
||||
_RStrategy, ManSlice0) ->
|
||||
ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0),
|
||||
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0),
|
||||
ManSlice1;
|
||||
compact_files([Batch|T], CDBopts, ActiveJournal0,
|
||||
FilterFun, FilterServer, MaxSQN,
|
||||
|
@ -511,23 +512,10 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
|
|||
ok ->
|
||||
{Journal1, ManSlice0};
|
||||
roll ->
|
||||
ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1),
|
||||
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1),
|
||||
write_values(KVCList, CDBopts, null, ManSlice1)
|
||||
end.
|
||||
|
||||
|
||||
generate_manifest_entry(ActiveJournal) ->
|
||||
{ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal),
|
||||
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
|
||||
case leveled_cdb:cdb_firstkey(PidR) of
|
||||
{StartSQN, _Type, _PK} ->
|
||||
[{StartSQN, NewFN, PidR}];
|
||||
empty ->
|
||||
leveled_log:log("IC013", [NewFN]),
|
||||
[]
|
||||
end.
|
||||
|
||||
|
||||
clear_waste(State) ->
|
||||
WP = State#state.waste_path,
|
||||
WRP = State#state.waste_retention_period,
|
||||
|
@ -728,12 +716,13 @@ compact_single_file_recovr_test() ->
|
|||
LedgerFun1,
|
||||
CompactFP,
|
||||
CDB} = compact_single_file_setup(),
|
||||
[{LowSQN, FN, PidR}] = compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
[{?STD_TAG, recovr}]),
|
||||
[{LowSQN, FN, PidR, _LastKey}] =
|
||||
compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
[{?STD_TAG, recovr}]),
|
||||
io:format("FN of ~s~n", [FN]),
|
||||
?assertMatch(2, LowSQN),
|
||||
?assertMatch(probably,
|
||||
|
@ -764,12 +753,13 @@ compact_single_file_retain_test() ->
|
|||
LedgerFun1,
|
||||
CompactFP,
|
||||
CDB} = compact_single_file_setup(),
|
||||
[{LowSQN, FN, PidR}] = compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
[{?STD_TAG, retain}]),
|
||||
[{LowSQN, FN, PidR, _LK}] =
|
||||
compact_files([Candidate],
|
||||
#cdb_options{file_path=CompactFP},
|
||||
LedgerFun1,
|
||||
LedgerSrv1,
|
||||
9,
|
||||
[{?STD_TAG, retain}]),
|
||||
io:format("FN of ~s~n", [FN]),
|
||||
?assertMatch(1, LowSQN),
|
||||
?assertMatch(probably,
|
||||
|
@ -847,7 +837,7 @@ compact_singlefile_totwosmallfiles_test() ->
|
|||
900,
|
||||
[{?STD_TAG, recovr}]),
|
||||
?assertMatch(2, length(ManifestSlice)),
|
||||
lists:foreach(fun({_SQN, _FN, CDB}) ->
|
||||
lists:foreach(fun({_SQN, _FN, CDB, _LK}) ->
|
||||
ok = leveled_cdb:cdb_deletepending(CDB),
|
||||
ok = leveled_cdb:cdb_destroy(CDB)
|
||||
end,
|
||||
|
|
223
src/leveled_imanifest.erl
Normal file
223
src/leveled_imanifest.erl
Normal file
|
@ -0,0 +1,223 @@
|
|||
%% -------- Inker Manifest ---------
|
||||
%%
|
||||
|
||||
|
||||
-module(leveled_imanifest).
|
||||
|
||||
-include("include/leveled.hrl").
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-export([
|
||||
generate_entry/1,
|
||||
add_entry/3,
|
||||
append_lastkey/3,
|
||||
remove_entry/2,
|
||||
find_entry/2,
|
||||
head_entry/1,
|
||||
to_list/1,
|
||||
from_list/1,
|
||||
reader/2,
|
||||
writer/3,
|
||||
printer/1,
|
||||
complete_filex/0
|
||||
|
||||
]).
|
||||
|
||||
-define(MANIFEST_FILEX, "man").
|
||||
-define(PENDING_FILEX, "pnd").
|
||||
-define(SKIP_WIDTH, 16).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% API
|
||||
%%%============================================================================
|
||||
|
||||
generate_entry(Journal) ->
|
||||
{ok, NewFN} = leveled_cdb:cdb_complete(Journal),
|
||||
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
|
||||
case leveled_cdb:cdb_firstkey(PidR) of
|
||||
{StartSQN, _Type, _PK} ->
|
||||
LastKey = leveled_cdb:cdb_lastkey(PidR),
|
||||
[{StartSQN, NewFN, PidR, LastKey}];
|
||||
empty ->
|
||||
leveled_log:log("IC013", [NewFN]),
|
||||
[]
|
||||
end.
|
||||
|
||||
add_entry(Manifest, Entry, ToEnd) ->
|
||||
{SQN, FN, PidR, LastKey} = Entry,
|
||||
StrippedName = filename:rootname(FN),
|
||||
case ToEnd of
|
||||
true ->
|
||||
prepend_entry({SQN, StrippedName, PidR, LastKey}, Manifest);
|
||||
false ->
|
||||
Man0 = [{SQN, StrippedName, PidR, LastKey}|to_list(Manifest)],
|
||||
Man1 = lists:reverse(lists:sort(Man0)),
|
||||
from_list(Man1)
|
||||
end.
|
||||
|
||||
append_lastkey(Manifest, Pid, LastKey) ->
|
||||
[{SQNMarker, SQNL}|ManifestTail] = Manifest,
|
||||
[{E_SQN, E_FN, E_P, E_LK}|SQNL_Tail] = SQNL,
|
||||
case {E_P, E_LK} of
|
||||
{Pid, empty} ->
|
||||
UpdEntry = {E_SQN, E_FN, E_P, LastKey},
|
||||
[{SQNMarker, [UpdEntry|SQNL_Tail]}|ManifestTail];
|
||||
_ ->
|
||||
Manifest
|
||||
end.
|
||||
|
||||
remove_entry(Manifest, Entry) ->
|
||||
{SQN, FN, _PidR, _LastKey} = Entry,
|
||||
leveled_log:log("I0013", [FN]),
|
||||
Man0 = lists:keydelete(SQN, 1, to_list(Manifest)),
|
||||
from_list(Man0).
|
||||
|
||||
find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker ->
|
||||
find_subentry(SQN, SubL);
|
||||
find_entry(SQN, [_TopEntry|Tail]) ->
|
||||
find_entry(SQN, Tail).
|
||||
|
||||
head_entry(Manifest) ->
|
||||
[{_SQNMarker, SQNL}|_Tail] = Manifest,
|
||||
[HeadEntry|_SQNL_Tail] = SQNL,
|
||||
HeadEntry.
|
||||
|
||||
to_list(Manifest) ->
|
||||
FoldFun =
|
||||
fun({_SQNMarker, SubL}, Acc) ->
|
||||
Acc ++ SubL
|
||||
end,
|
||||
lists:foldl(FoldFun, [], Manifest).
|
||||
|
||||
reader(SQN, RootPath) ->
|
||||
ManifestPath = leveled_inker:filepath(RootPath, manifest_dir),
|
||||
leveled_log:log("I0015", [ManifestPath, SQN]),
|
||||
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
||||
integer_to_list(SQN)
|
||||
++ ".man")),
|
||||
from_list(lists:reverse(lists:sort(binary_to_term(MBin)))).
|
||||
|
||||
|
||||
writer(Manifest, ManSQN, RootPath) ->
|
||||
ManPath = leveled_inker:filepath(RootPath, manifest_dir),
|
||||
NewFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
|
||||
TmpFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
|
||||
MBin = term_to_binary(to_list(Manifest), [compressed]),
|
||||
case filelib:is_file(NewFN) of
|
||||
false ->
|
||||
leveled_log:log("I0016", [ManSQN]),
|
||||
ok = file:write_file(TmpFN, MBin),
|
||||
ok = file:rename(TmpFN, NewFN),
|
||||
ok
|
||||
end.
|
||||
|
||||
printer(Manifest) ->
|
||||
lists:foreach(fun({SQN, FN, _PID, _LK}) ->
|
||||
leveled_log:log("I0017", [SQN, FN]) end,
|
||||
to_list(Manifest)).
|
||||
|
||||
complete_filex() ->
|
||||
?MANIFEST_FILEX.
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Internal Functions
|
||||
%%%============================================================================
|
||||
|
||||
from_list(Manifest) ->
|
||||
% Manifest should already be sorted with the highest SQN at the head
|
||||
% This will be maintained so that we can fold from the left, and find
|
||||
% more recently added entries quicker - under the assumptions that fresh
|
||||
% reads are more common than stale reads
|
||||
lists:foldr(fun prepend_entry/2, [], Manifest).
|
||||
|
||||
prepend_entry(Entry, AccL) ->
|
||||
{SQN, _FN, _PidR, _LastKey} = Entry,
|
||||
case AccL of
|
||||
[] ->
|
||||
[{SQN, [Entry]}];
|
||||
[{SQNMarker, SubL}|Tail] ->
|
||||
case length(SubL) < ?SKIP_WIDTH of
|
||||
true ->
|
||||
[{SQNMarker, [Entry|SubL]}|Tail];
|
||||
false ->
|
||||
[{SQN, [Entry]}|AccL]
|
||||
end
|
||||
end.
|
||||
|
||||
find_subentry(SQN, [{ME_SQN, _FN, ME_P, _LK}|_Tail]) when SQN >= ME_SQN ->
|
||||
ME_P;
|
||||
find_subentry(SQN, [_TopEntry|Tail]) ->
|
||||
find_subentry(SQN, Tail).
|
||||
|
||||
|
||||
%%%============================================================================
|
||||
%%% Test
|
||||
%%%============================================================================
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
build_testmanifest_aslist() ->
|
||||
ManifestMapFun =
|
||||
fun(N) ->
|
||||
NStr = integer_to_list(N),
|
||||
{max(1, N * 1000), "FN" ++ NStr, "pid" ++ NStr, "LK" ++ NStr}
|
||||
end,
|
||||
lists:map(ManifestMapFun, lists:reverse(lists:seq(0, 50))).
|
||||
|
||||
test_testmanifest(Man0) ->
|
||||
?assertMatch("pid0", find_entry(1, Man0)),
|
||||
?assertMatch("pid0", find_entry(2, Man0)),
|
||||
?assertMatch("pid1", find_entry(1001, Man0)),
|
||||
?assertMatch("pid20", find_entry(20000, Man0)),
|
||||
?assertMatch("pid20", find_entry(20001, Man0)),
|
||||
?assertMatch("pid20", find_entry(20999, Man0)),
|
||||
?assertMatch("pid50", find_entry(99999, Man0)).
|
||||
|
||||
buildfromlist_test() ->
|
||||
ManL = build_testmanifest_aslist(),
|
||||
Man0 = from_list(ManL),
|
||||
test_testmanifest(Man0),
|
||||
?assertMatch(ManL, to_list(Man0)).
|
||||
|
||||
buildfromend_test() ->
|
||||
ManL = build_testmanifest_aslist(),
|
||||
FoldFun =
|
||||
fun(E, Man) ->
|
||||
add_entry(Man, E, true)
|
||||
end,
|
||||
Man0 = lists:foldr(FoldFun, [], ManL),
|
||||
test_testmanifest(Man0),
|
||||
?assertMatch(ManL, to_list(Man0)).
|
||||
|
||||
buildrandomfashion_test() ->
|
||||
ManL0 = build_testmanifest_aslist(),
|
||||
RandMapFun =
|
||||
fun(X) ->
|
||||
{random:uniform(), X}
|
||||
end,
|
||||
ManL1 = lists:map(RandMapFun, ManL0),
|
||||
ManL2 = lists:sort(ManL1),
|
||||
|
||||
FoldFun =
|
||||
fun({_R, E}, Man) ->
|
||||
add_entry(Man, E, false)
|
||||
end,
|
||||
Man0 = lists:foldl(FoldFun, [], ManL2),
|
||||
|
||||
test_testmanifest(Man0),
|
||||
?assertMatch(ManL0, to_list(Man0)),
|
||||
|
||||
RandomEntry = lists:nth(random:uniform(50), ManL0),
|
||||
Man1 = remove_entry(Man0, RandomEntry),
|
||||
Man2 = add_entry(Man1, RandomEntry, false),
|
||||
|
||||
test_testmanifest(Man2),
|
||||
?assertMatch(ManL0, to_list(Man2)).
|
||||
|
||||
|
||||
-endif.
|
|
@ -110,7 +110,6 @@
|
|||
ink_close/1,
|
||||
ink_doom/1,
|
||||
build_dummy_journal/0,
|
||||
simple_manifest_reader/2,
|
||||
clean_testdir/1,
|
||||
filepath/2,
|
||||
filepath/3]).
|
||||
|
@ -122,7 +121,6 @@
|
|||
-define(COMPACT_FP, "post_compact").
|
||||
-define(WASTE_FP, "waste").
|
||||
-define(JOURNAL_FILEX, "cdb").
|
||||
-define(MANIFEST_FILEX, "man").
|
||||
-define(PENDING_FILEX, "pnd").
|
||||
-define(LOADING_PAUSE, 1000).
|
||||
-define(LOADING_BATCH, 1000).
|
||||
|
@ -168,7 +166,6 @@ ink_releasesnapshot(Pid, Snapshot) ->
|
|||
gen_server:cast(Pid, {release_snapshot, Snapshot}).
|
||||
|
||||
ink_confirmdelete(Pid, ManSQN) ->
|
||||
io:format("Confirm delete request received~n"),
|
||||
gen_server:call(Pid, {confirm_delete, ManSQN}).
|
||||
|
||||
ink_close(Pid) ->
|
||||
|
@ -244,10 +241,7 @@ init([InkerOpts]) ->
|
|||
|
||||
handle_call({put, Key, Object, KeyChanges}, _From, State) ->
|
||||
case put_object(Key, Object, KeyChanges, State) of
|
||||
{ok, UpdState, ObjSize} ->
|
||||
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState};
|
||||
{rolling, UpdState, ObjSize} ->
|
||||
ok = leveled_cdb:cdb_roll(State#state.active_journaldb),
|
||||
{_, UpdState, ObjSize} ->
|
||||
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
|
||||
end;
|
||||
handle_call({fetch, Key, SQN}, _From, State) ->
|
||||
|
@ -263,7 +257,7 @@ handle_call({get, Key, SQN}, _From, State) ->
|
|||
handle_call({key_check, Key, SQN}, _From, State) ->
|
||||
{reply, key_check(Key, SQN, State#state.manifest), State};
|
||||
handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) ->
|
||||
Manifest = lists:reverse(State#state.manifest),
|
||||
Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)),
|
||||
Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest),
|
||||
{reply, Reply, State};
|
||||
handle_call({register_snapshot, Requestor}, _From , State) ->
|
||||
|
@ -274,7 +268,6 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
|
|||
State#state.active_journaldb},
|
||||
State#state{registered_snapshots=Rs}};
|
||||
handle_call({confirm_delete, ManSQN}, _From, State) ->
|
||||
io:format("Confirm delete request to be processed~n"),
|
||||
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
|
||||
case SnapSQN >= ManSQN of
|
||||
true ->
|
||||
|
@ -284,32 +277,32 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
|
|||
end end,
|
||||
true,
|
||||
State#state.registered_snapshots),
|
||||
io:format("Confirm delete request complete with reply ~w~n", [Reply]),
|
||||
{reply, Reply, State};
|
||||
handle_call(get_manifest, _From, State) ->
|
||||
{reply, State#state.manifest, State};
|
||||
{reply, leveled_imanifest:to_list(State#state.manifest), State};
|
||||
handle_call({update_manifest,
|
||||
ManifestSnippet,
|
||||
DeletedFiles}, _From, State) ->
|
||||
Man0 = lists:foldl(fun(ManEntry, AccMan) ->
|
||||
remove_from_manifest(AccMan, ManEntry)
|
||||
end,
|
||||
State#state.manifest,
|
||||
DeletedFiles),
|
||||
Man1 = lists:foldl(fun(ManEntry, AccMan) ->
|
||||
add_to_manifest(AccMan, ManEntry) end,
|
||||
Man0,
|
||||
ManifestSnippet),
|
||||
DropFun =
|
||||
fun(E, Acc) ->
|
||||
leveled_imanifest:remove_entry(Acc, E)
|
||||
end,
|
||||
Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles),
|
||||
AddFun =
|
||||
fun(E, Acc) ->
|
||||
leveled_imanifest:add_entry(Acc, E, false)
|
||||
end,
|
||||
Man1 = lists:foldl(AddFun, Man0, ManifestSnippet),
|
||||
NewManifestSQN = State#state.manifest_sqn + 1,
|
||||
manifest_printer(Man1),
|
||||
simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path),
|
||||
leveled_imanifest:printer(Man1),
|
||||
leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path),
|
||||
{reply,
|
||||
{ok, NewManifestSQN},
|
||||
State#state{manifest=Man1,
|
||||
manifest_sqn=NewManifestSQN,
|
||||
pending_removals=DeletedFiles}};
|
||||
handle_call(print_manifest, _From, State) ->
|
||||
manifest_printer(State#state.manifest),
|
||||
leveled_imanifest:printer(State#state.manifest),
|
||||
{reply, ok, State};
|
||||
handle_call({compact,
|
||||
Checker,
|
||||
|
@ -359,8 +352,9 @@ terminate(Reason, State) ->
|
|||
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
|
||||
State#state.registered_snapshots),
|
||||
leveled_log:log("I0007", []),
|
||||
manifest_printer(State#state.manifest),
|
||||
ok = close_allmanifest(State#state.manifest)
|
||||
leveled_imanifest:printer(State#state.manifest),
|
||||
ManAsList = leveled_imanifest:to_list(State#state.manifest),
|
||||
ok = close_allmanifest(ManAsList)
|
||||
end.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -415,13 +409,14 @@ start_from_file(InkOpts) ->
|
|||
|
||||
put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||
NewSQN = State#state.journal_sqn + 1,
|
||||
ActiveJournal = State#state.active_journaldb,
|
||||
SW= os:timestamp(),
|
||||
{JournalKey, JournalBin} = leveled_codec:to_inkerkv(LedgerKey,
|
||||
NewSQN,
|
||||
Object,
|
||||
KeyChanges),
|
||||
T0 = timer:now_diff(os:timestamp(), SW),
|
||||
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
||||
case leveled_cdb:cdb_put(ActiveJournal,
|
||||
JournalKey,
|
||||
JournalBin) of
|
||||
ok ->
|
||||
|
@ -434,22 +429,27 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
|||
byte_size(JournalBin)};
|
||||
roll ->
|
||||
SWroll = os:timestamp(),
|
||||
LastKey = leveled_cdb:cdb_lastkey(ActiveJournal),
|
||||
ok = leveled_cdb:cdb_roll(ActiveJournal),
|
||||
Manifest0 = leveled_imanifest:append_lastkey(State#state.manifest,
|
||||
ActiveJournal,
|
||||
LastKey),
|
||||
CDBopts = State#state.cdb_options,
|
||||
ManEntry = start_new_activejournal(NewSQN,
|
||||
State#state.root_path,
|
||||
CDBopts),
|
||||
{_, _, NewJournalP} = ManEntry,
|
||||
NewManifest = add_to_manifest(State#state.manifest, ManEntry),
|
||||
ok = simple_manifest_writer(NewManifest,
|
||||
State#state.manifest_sqn + 1,
|
||||
State#state.root_path),
|
||||
{_, _, NewJournalP, _} = ManEntry,
|
||||
Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true),
|
||||
ok = leveled_imanifest:writer(Manifest1,
|
||||
State#state.manifest_sqn + 1,
|
||||
State#state.root_path),
|
||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||
JournalKey,
|
||||
JournalBin),
|
||||
leveled_log:log_timer("I0008", [], SWroll),
|
||||
{rolling,
|
||||
State#state{journal_sqn=NewSQN,
|
||||
manifest=NewManifest,
|
||||
manifest=Manifest1,
|
||||
manifest_sqn = State#state.manifest_sqn + 1,
|
||||
active_journaldb=NewJournalP},
|
||||
byte_size(JournalBin)}
|
||||
|
@ -457,7 +457,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
|
|||
|
||||
|
||||
get_object(LedgerKey, SQN, Manifest) ->
|
||||
JournalP = find_in_manifest(SQN, Manifest),
|
||||
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
|
||||
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
|
||||
SQN,
|
||||
to_fetch,
|
||||
|
@ -466,7 +466,7 @@ get_object(LedgerKey, SQN, Manifest) ->
|
|||
leveled_codec:from_inkerkv(Obj).
|
||||
|
||||
key_check(LedgerKey, SQN, Manifest) ->
|
||||
JournalP = find_in_manifest(SQN, Manifest),
|
||||
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
|
||||
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
|
||||
SQN,
|
||||
to_fetch,
|
||||
|
@ -478,7 +478,7 @@ build_manifest(ManifestFilenames,
|
|||
CDBopts) ->
|
||||
% Find the manifest with a highest Manifest sequence number
|
||||
% Open it and read it to get the current Confirmed Manifest
|
||||
ManifestRegex = "(?<MSQN>[0-9]+)\\." ++ ?MANIFEST_FILEX,
|
||||
ManifestRegex = "(?<MSQN>[0-9]+)\\." ++ leveled_imanifest:complete_filex(),
|
||||
ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames,
|
||||
ManifestRegex,
|
||||
'MSQN'),
|
||||
|
@ -488,7 +488,7 @@ build_manifest(ManifestFilenames,
|
|||
{[], 1};
|
||||
_ ->
|
||||
PersistedManSQN = lists:max(ValidManSQNs),
|
||||
M1 = simple_manifest_reader(PersistedManSQN,
|
||||
M1 = leveled_imanifest:reader(PersistedManSQN,
|
||||
RootPath),
|
||||
{M1, PersistedManSQN}
|
||||
end,
|
||||
|
@ -496,7 +496,11 @@ build_manifest(ManifestFilenames,
|
|||
% Open the manifest files, completing if necessary and ensure there is
|
||||
% a valid active journal at the head of the manifest
|
||||
OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts),
|
||||
{ActiveLowSQN, _FN, ActiveJournal} = lists:nth(1, OpenManifest),
|
||||
|
||||
{ActiveLowSQN,
|
||||
_FN,
|
||||
ActiveJournal,
|
||||
_LK} = leveled_imanifest:head_entry(OpenManifest),
|
||||
JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of
|
||||
empty ->
|
||||
ActiveLowSQN;
|
||||
|
@ -506,135 +510,135 @@ build_manifest(ManifestFilenames,
|
|||
|
||||
% Update the manifest if it has been changed by the process of laoding
|
||||
% the manifest (must also increment the manifest SQN).
|
||||
UpdManifestSQN = if
|
||||
length(OpenManifest) > length(Manifest) ->
|
||||
leveled_log:log("I0009", []),
|
||||
manifest_printer(OpenManifest),
|
||||
simple_manifest_writer(OpenManifest,
|
||||
ManifestSQN + 1,
|
||||
RootPath),
|
||||
ManifestSQN + 1;
|
||||
true ->
|
||||
leveled_log:log("I0010", []),
|
||||
manifest_printer(OpenManifest),
|
||||
ManifestSQN
|
||||
end,
|
||||
UpdManifestSQN =
|
||||
if
|
||||
length(OpenManifest) > length(Manifest) ->
|
||||
leveled_log:log("I0009", []),
|
||||
leveled_imanifest:printer(OpenManifest),
|
||||
NextSQN = ManifestSQN + 1,
|
||||
leveled_imanifest:writer(OpenManifest, NextSQN, RootPath),
|
||||
NextSQN;
|
||||
true ->
|
||||
leveled_log:log("I0010", []),
|
||||
leveled_imanifest:printer(OpenManifest),
|
||||
ManifestSQN
|
||||
end,
|
||||
{OpenManifest, UpdManifestSQN, JournalSQN, ActiveJournal}.
|
||||
|
||||
|
||||
close_allmanifest([]) ->
|
||||
ok;
|
||||
close_allmanifest([H|ManifestT]) ->
|
||||
{_, _, Pid} = H,
|
||||
{_, _, Pid, _} = H,
|
||||
ok = leveled_cdb:cdb_close(Pid),
|
||||
close_allmanifest(ManifestT).
|
||||
|
||||
|
||||
open_all_manifest([], RootPath, CDBOpts) ->
|
||||
leveled_log:log("I0011", []),
|
||||
add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts));
|
||||
leveled_imanifest:add_entry([],
|
||||
start_new_activejournal(1, RootPath, CDBOpts),
|
||||
true);
|
||||
open_all_manifest(Man0, RootPath, CDBOpts) ->
|
||||
Man1 = lists:reverse(lists:sort(Man0)),
|
||||
[{HeadSQN, HeadFN}|ManifestTail] = Man1,
|
||||
Man1 = leveled_imanifest:to_list(Man0),
|
||||
[{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1,
|
||||
OpenJournalFun =
|
||||
fun(ManEntry) ->
|
||||
case ManEntry of
|
||||
{LowSQN, FN, _, LK_RO} ->
|
||||
CFN = FN ++ "." ++ ?JOURNAL_FILEX,
|
||||
PFN = FN ++ "." ++ ?PENDING_FILEX,
|
||||
case filelib:is_file(CFN) of
|
||||
true ->
|
||||
{ok, Pid} = leveled_cdb:cdb_reopen_reader(CFN,
|
||||
LK_RO),
|
||||
{LowSQN, FN, Pid, LK_RO};
|
||||
false ->
|
||||
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
|
||||
{ok, Pid} = W,
|
||||
ok = leveled_cdb:cdb_roll(Pid),
|
||||
LK_WR = leveled_cdb:cdb_lastkey(Pid),
|
||||
{LowSQN, FN, Pid, LK_WR}
|
||||
end;
|
||||
_ ->
|
||||
ManEntry
|
||||
end
|
||||
end,
|
||||
OpenedTailAsList = lists:map(OpenJournalFun, ManifestTail),
|
||||
OpenedTail = leveled_imanifest:from_list(OpenedTailAsList),
|
||||
CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX,
|
||||
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
|
||||
Man2 = case filelib:is_file(CompleteHeadFN) of
|
||||
true ->
|
||||
leveled_log:log("I0012", [HeadFN]),
|
||||
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
|
||||
{LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR),
|
||||
add_to_manifest(add_to_manifest(ManifestTail,
|
||||
{HeadSQN, HeadFN, HeadR}),
|
||||
start_new_activejournal(LastSQN + 1,
|
||||
RootPath,
|
||||
CDBOpts));
|
||||
false ->
|
||||
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
|
||||
CDBOpts),
|
||||
add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadW})
|
||||
end,
|
||||
lists:map(fun(ManEntry) ->
|
||||
case ManEntry of
|
||||
{LowSQN, FN} ->
|
||||
CFN = FN ++ "." ++ ?JOURNAL_FILEX,
|
||||
PFN = FN ++ "." ++ ?PENDING_FILEX,
|
||||
case filelib:is_file(CFN) of
|
||||
true ->
|
||||
{ok,
|
||||
Pid} = leveled_cdb:cdb_open_reader(CFN),
|
||||
{LowSQN, FN, Pid};
|
||||
false ->
|
||||
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
|
||||
{ok, Pid} = W,
|
||||
ok = leveled_cdb:cdb_roll(Pid),
|
||||
{LowSQN, FN, Pid}
|
||||
end;
|
||||
_ ->
|
||||
ManEntry
|
||||
end end,
|
||||
Man2).
|
||||
case filelib:is_file(CompleteHeadFN) of
|
||||
true ->
|
||||
leveled_log:log("I0012", [HeadFN]),
|
||||
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
|
||||
LastKey = leveled_cdb:cdb_lastkey(HeadR),
|
||||
LastSQN = element(1, LastKey),
|
||||
ManToHead = leveled_imanifest:add_entry(OpenedTail,
|
||||
{HeadSQN,
|
||||
HeadFN,
|
||||
HeadR,
|
||||
LastKey},
|
||||
true),
|
||||
NewManEntry = start_new_activejournal(LastSQN + 1,
|
||||
RootPath,
|
||||
CDBOpts),
|
||||
leveled_imanifest:add_entry(ManToHead,
|
||||
NewManEntry,
|
||||
true);
|
||||
false ->
|
||||
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
|
||||
CDBOpts),
|
||||
leveled_imanifest:add_entry(OpenedTail,
|
||||
{HeadSQN, HeadFN, HeadW, HeadLK},
|
||||
true)
|
||||
end.
|
||||
|
||||
|
||||
start_new_activejournal(SQN, RootPath, CDBOpts) ->
|
||||
Filename = filepath(RootPath, SQN, new_journal),
|
||||
{ok, PidW} = leveled_cdb:cdb_open_writer(Filename, CDBOpts),
|
||||
{SQN, Filename, PidW}.
|
||||
|
||||
add_to_manifest(Manifest, Entry) ->
|
||||
{SQN, FN, PidR} = Entry,
|
||||
StrippedName = filename:rootname(FN),
|
||||
lists:reverse(lists:sort([{SQN, StrippedName, PidR}|Manifest])).
|
||||
|
||||
remove_from_manifest(Manifest, Entry) ->
|
||||
{SQN, FN, _PidR} = Entry,
|
||||
leveled_log:log("I0013", [FN]),
|
||||
lists:keydelete(SQN, 1, Manifest).
|
||||
|
||||
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
|
||||
Pid;
|
||||
find_in_manifest(SQN, [_Head|Tail]) ->
|
||||
find_in_manifest(SQN, Tail).
|
||||
|
||||
{SQN, Filename, PidW, empty}.
|
||||
|
||||
|
||||
%% Scan between sequence numbers applying FilterFun to each entry where
|
||||
%% FilterFun{K, V, Acc} -> Penciller Key List
|
||||
%% Load the output for the CDB file into the Penciller.
|
||||
|
||||
load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
|
||||
load_from_sequence(_MinSQN, _FilterFun, _PCL, []) ->
|
||||
ok;
|
||||
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|Rest])
|
||||
load_from_sequence(MinSQN, FilterFun, PCL, [{LowSQN, FN, Pid, _LK}|Rest])
|
||||
when LowSQN >= MinSQN ->
|
||||
load_between_sequence(MinSQN,
|
||||
MinSQN + ?LOADING_BATCH,
|
||||
FilterFun,
|
||||
Penciller,
|
||||
PCL,
|
||||
Pid,
|
||||
undefined,
|
||||
FN,
|
||||
Rest);
|
||||
load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) ->
|
||||
load_from_sequence(MinSQN, FilterFun, PCL, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
|
||||
case Rest of
|
||||
[] ->
|
||||
load_between_sequence(MinSQN,
|
||||
MinSQN + ?LOADING_BATCH,
|
||||
FilterFun,
|
||||
Penciller,
|
||||
PCL,
|
||||
Pid,
|
||||
undefined,
|
||||
FN,
|
||||
Rest);
|
||||
[{NextSQN, _NxtFN, _NxtPid}|_Rest] when NextSQN > MinSQN ->
|
||||
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
|
||||
load_between_sequence(MinSQN,
|
||||
MinSQN + ?LOADING_BATCH,
|
||||
FilterFun,
|
||||
Penciller,
|
||||
PCL,
|
||||
Pid,
|
||||
undefined,
|
||||
FN,
|
||||
Rest);
|
||||
_ ->
|
||||
load_from_sequence(MinSQN, FilterFun, Penciller, Rest)
|
||||
load_from_sequence(MinSQN, FilterFun, PCL, Rest)
|
||||
end.
|
||||
|
||||
|
||||
|
@ -713,36 +717,6 @@ filepath(CompactFilePath, NewSQN, compact_journal) ->
|
|||
++ "." ++ ?PENDING_FILEX).
|
||||
|
||||
|
||||
simple_manifest_reader(SQN, RootPath) ->
|
||||
ManifestPath = filepath(RootPath, manifest_dir),
|
||||
leveled_log:log("I0015", [ManifestPath, SQN]),
|
||||
{ok, MBin} = file:read_file(filename:join(ManifestPath,
|
||||
integer_to_list(SQN)
|
||||
++ ".man")),
|
||||
binary_to_term(MBin).
|
||||
|
||||
|
||||
simple_manifest_writer(Manifest, ManSQN, RootPath) ->
|
||||
ManPath = filepath(RootPath, manifest_dir),
|
||||
NewFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
|
||||
TmpFN = filename:join(ManPath,
|
||||
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
|
||||
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
|
||||
Manifest), [compressed]),
|
||||
case filelib:is_file(NewFN) of
|
||||
false ->
|
||||
leveled_log:log("I0016", [ManSQN]),
|
||||
ok = file:write_file(TmpFN, MBin),
|
||||
ok = file:rename(TmpFN, NewFN),
|
||||
ok
|
||||
end.
|
||||
|
||||
manifest_printer(Manifest) ->
|
||||
lists:foreach(fun({SQN, FN, _PID}) ->
|
||||
leveled_log:log("I0017", [SQN, FN]) end,
|
||||
Manifest).
|
||||
|
||||
initiate_penciller_snapshot(Bookie) ->
|
||||
{ok,
|
||||
{LedgerSnap, LedgerCache},
|
||||
|
@ -777,6 +751,7 @@ build_dummy_journal(KeyConvertF) ->
|
|||
ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})),
|
||||
ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})),
|
||||
ok = leveled_cdb:cdb_roll(J1),
|
||||
LK1 = leveled_cdb:cdb_lastkey(J1),
|
||||
lists:foldl(fun(X, Closed) ->
|
||||
case Closed of
|
||||
true -> true;
|
||||
|
@ -795,9 +770,10 @@ build_dummy_journal(KeyConvertF) ->
|
|||
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
|
||||
ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})),
|
||||
ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})),
|
||||
LK2 = leveled_cdb:cdb_lastkey(J2),
|
||||
ok = leveled_cdb:cdb_close(J2),
|
||||
Manifest = [{1, "../test/journal/journal_files/nursery_1"},
|
||||
{3, "../test/journal/journal_files/nursery_3"}],
|
||||
Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
|
||||
{3, "../test/journal/journal_files/nursery_3", "pid2", LK2}],
|
||||
ManifestBin = term_to_binary(Manifest),
|
||||
{ok, MF1} = file:open(filename:join(ManifestFP, "1.man"),
|
||||
[binary, raw, read, write]),
|
||||
|
@ -907,7 +883,7 @@ compact_journal_test() ->
|
|||
5000),
|
||||
timer:sleep(1000),
|
||||
CompactedManifest2 = ink_getmanifest(Ink1),
|
||||
R = lists:foldl(fun({_SQN, FN, _P}, Acc) ->
|
||||
R = lists:foldl(fun({_SQN, FN, _P, _LK}, Acc) ->
|
||||
case string:str(FN, "post_compact") of
|
||||
N when N > 0 ->
|
||||
true;
|
||||
|
|
|
@ -263,7 +263,7 @@
|
|||
{"SST09",
|
||||
{warn, "Read request exposes slot with bad CRC"}},
|
||||
{"SST10",
|
||||
{info, "Expansion sought to support pointer to pid ~w status ~w"}},
|
||||
{debug, "Expansion sought to support pointer to pid ~w status ~w"}},
|
||||
|
||||
{"CDB01",
|
||||
{info, "Opening file for writing with filename ~s"}},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue