Logging - Phase 1

Abstract out logging and introduce a logbase
This commit is contained in:
martinsumner 2016-11-02 18:14:46 +00:00
parent 0572f43b8a
commit 7147ec0470
5 changed files with 247 additions and 129 deletions

View file

@ -256,8 +256,7 @@ init([Opts]) ->
{InkerOpts, PencillerOpts} = set_options(Opts), {InkerOpts, PencillerOpts} = set_options(Opts),
{Inker, Penciller} = startup(InkerOpts, PencillerOpts), {Inker, Penciller} = startup(InkerOpts, PencillerOpts),
CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE), CacheSize = get_opt(cache_size, Opts, ?CACHE_SIZE),
io:format("Bookie starting with Pcl ~w Ink ~w~n", leveled_log:log("B0001", [Inker, Penciller]),
[Penciller, Inker]),
{ok, #state{inker=Inker, {ok, #state{inker=Inker,
penciller=Penciller, penciller=Penciller,
cache_size=CacheSize, cache_size=CacheSize,
@ -269,8 +268,7 @@ init([Opts]) ->
Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT), Inker} = book_snapshotstore(Bookie, self(), ?SNAPSHOT_TIMEOUT),
ok = leveled_penciller:pcl_loadsnapshot(Penciller, ok = leveled_penciller:pcl_loadsnapshot(Penciller,
gb_trees:empty()), gb_trees:empty()),
io:format("Snapshot starting with Pcl ~w Ink ~w~n", leveled_log:log("B0002", [Inker, Penciller]),
[Penciller, Inker]),
{ok, #state{penciller=Penciller, {ok, #state{penciller=Penciller,
inker=Inker, inker=Inker,
ledger_cache=LedgerCache, ledger_cache=LedgerCache,
@ -396,7 +394,7 @@ handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
terminate(Reason, State) -> terminate(Reason, State) ->
io:format("Bookie closing for reason ~w~n", [Reason]), leveled_log:log("B0003", [Reason]),
WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE), WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE),
ok = shutdown_wait(WaitList, State#state.inker), ok = shutdown_wait(WaitList, State#state.inker),
ok = leveled_penciller:pcl_close(State#state.penciller). ok = leveled_penciller:pcl_close(State#state.penciller).
@ -414,8 +412,7 @@ bucket_stats(State, Bucket, Tag) ->
{LedgerSnapshot, LedgerCache}, {LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
[gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache), LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag),
@ -439,8 +436,7 @@ index_query(State,
{LedgerSnapshot, LedgerCache}, {LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
[gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache), LedgerCache),
StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
@ -476,8 +472,7 @@ hashtree_query(State, Tag, JournalCheck) ->
{LedgerSnapshot, LedgerCache}, {LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, SnapType), JournalSnapshot} = snapshot_store(State, SnapType),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
[gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache), LedgerCache),
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@ -505,8 +500,7 @@ foldobjects_allkeys(State, Tag, FoldObjectsFun) ->
{LedgerSnapshot, LedgerCache}, {LedgerSnapshot, LedgerCache},
JournalSnapshot} = snapshot_store(State, store), JournalSnapshot} = snapshot_store(State, store),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
[gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache), LedgerCache),
StartKey = leveled_codec:to_ledgerkey(null, null, Tag), StartKey = leveled_codec:to_ledgerkey(null, null, Tag),
@ -528,8 +522,7 @@ allkey_query(State, Tag) ->
{LedgerSnapshot, LedgerCache}, {LedgerSnapshot, LedgerCache},
_JournalSnapshot} = snapshot_store(State, ledger), _JournalSnapshot} = snapshot_store(State, ledger),
Folder = fun() -> Folder = fun() ->
io:format("Length of increment in snapshot is ~w~n", leveled_log:log("B0004", [gb_trees:size(LedgerCache)]),
[gb_trees:size(LedgerCache)]),
ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
LedgerCache), LedgerCache),
SK = leveled_codec:to_ledgerkey(null, null, Tag), SK = leveled_codec:to_ledgerkey(null, null, Tag),
@ -601,7 +594,7 @@ startup(InkerOpts, PencillerOpts) ->
{ok, Inker} = leveled_inker:ink_start(InkerOpts), {ok, Inker} = leveled_inker:ink_start(InkerOpts),
{ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts),
LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller),
io:format("LedgerSQN=~w at startup~n", [LedgerSQN]), leveled_log:log("B0005", [LedgerSQN]),
ok = leveled_inker:ink_loadpcl(Inker, ok = leveled_inker:ink_loadpcl(Inker,
LedgerSQN + 1, LedgerSQN + 1,
fun load_fun/5, fun load_fun/5,
@ -816,13 +809,12 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
Obj, VSize, IndexSpecs), Obj, VSize, IndexSpecs),
{loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; {loop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
MaxSQN -> MaxSQN ->
io:format("Reached end of load batch with SQN ~w~n", [SQN]), leveled_log:log("B0006", [SQN]),
Changes = preparefor_ledgercache(Type, PK, SQN, Changes = preparefor_ledgercache(Type, PK, SQN,
Obj, VSize, IndexSpecs), Obj, VSize, IndexSpecs),
{stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}}; {stop, {MinSQN, MaxSQN, addto_ledgercache(Changes, OutputTree)}};
SQN when SQN > MaxSQN -> SQN when SQN > MaxSQN ->
io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", leveled_log:log("B0007", [MaxSQN, SQN]),
[MaxSQN, SQN]),
{stop, Acc0} {stop, Acc0}
end. end.

View file

@ -229,20 +229,21 @@ print_key(Key) ->
{?IDX_TAG, B, {F, _V}, _K} -> {?IDX_TAG, B, {F, _V}, _K} ->
{"Index", B, F} {"Index", B, F}
end, end,
{B_STR, FB} = check_for_string(B_TERM), B_STR = turn_to_string(B_TERM),
{C_STR, FC} = check_for_string(C_TERM), C_STR = turn_to_string(C_TERM),
{A_STR, B_STR, C_STR, FB, FC}. {A_STR, B_STR, C_STR}.
check_for_string(Item) -> turn_to_string(Item) ->
if if
is_binary(Item) == true -> is_binary(Item) == true ->
{binary_to_list(Item), "~s"}; binary_to_list(Item);
is_integer(Item) == true -> is_integer(Item) == true ->
{integer_to_list(Item), "~s"}; integer_to_list(Item);
is_list(Item) == true -> is_list(Item) == true ->
{Item, "~s"}; Item;
true -> true ->
{Item, "~w"} [Output] = io_lib:format("~w", [Item]),
Output
end. end.
@ -392,8 +393,8 @@ endkey_passed_test() ->
?assertMatch(true, endkey_passed(TestKey, K2)). ?assertMatch(true, endkey_passed(TestKey, K2)).
stringcheck_test() -> stringcheck_test() ->
?assertMatch({"Bucket", "~s"}, check_for_string("Bucket")), ?assertMatch("Bucket", turn_to_string("Bucket")),
?assertMatch({"Bucket", "~s"}, check_for_string(<<"Bucket">>)), ?assertMatch("Bucket", turn_to_string(<<"Bucket">>)),
?assertMatch({bucket, "~w"}, check_for_string(bucket)). ?assertMatch("bucket", turn_to_string(bucket)).
-endif. -endif.

171
src/leveled_log.erl Normal file
View file

@ -0,0 +1,171 @@
%% Module to abstract from choice of logger, and allow use of logReferences
%% for fast lookup
-module(leveled_log).
-include("include/leveled.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([log/2,
log_timer/3]).
-define(LOG_LEVEL, [info, warn, error, critical]).
-define(LOGBASE, dict:from_list([
{"G0001",
{info, "Generic log point"}},
{"D0001",
{debug, "Generic debug log"}},
{"B0001",
{info, "Bookie starting with Ink ~w Pcl ~w"}},
{"B0002",
{info, "Snapshot starting with Ink ~w Pcl ~w"}},
{"B0003",
{info, "Bookie closing for reason ~w"}},
{"B0004",
{info, "Length of increment in snapshot is ~w"}},
{"B0005",
{info, "LedgerSQN=~w at startup"}},
{"B0006",
{info, "Reached end of load batch with SQN ~w"}},
{"B0007",
{info, "Skipping as exceeded MaxSQN ~w with SQN ~w"}},
{"P0001",
{info, "Ledger snapshot ~w registered"}},
{"P0002",
{info, "Handling of push completed with L0 cache size now ~w"}},
{"P0003",
{info, "Ledger snapshot ~w released"}},
{"P0004",
{info, "Remaining ledger snapshots are ~w"}},
{"P0005",
{info, "Delete confirmed as file is removed from " ++ "
unreferenced files ~w"}},
{"P0006",
{info, "Orphaned reply after timeout on L0 file write ~s"}},
{"P0007",
{info, "Sent release message for cloned Penciller following close for "
++ "reason ~w"}},
{"P0008",
{info, "Penciller closing for reason ~w"}},
{"P0009",
{info, "Level 0 cache empty at close of Penciller"}},
{"P0010",
{info, "No level zero action on close of Penciller"}},
{"P0011",
{info, "Shutdown complete for Penciller"}},
{"P0012",
{info, "Store to be started based on manifest sequence number of ~w"}},
{"P0013",
{warn, "Seqence number of 0 indicates no valid manifest"}},
{"P0014",
{info, "Maximum sequence number of ~w found in nonzero levels"}},
{"P0015",
{info, "L0 file found ~s"}},
{"P0016",
{info, "L0 file had maximum sequence number of ~w"}},
{"P0017",
{info, "No L0 file found"}},
{"P0018",
{info, "Respone to push_mem of ~w ~s"}},
{"P0019",
{info, "Rolling level zero to filename ~s"}},
{"P0020",
{info, "Work at Level ~w to be scheduled for ~w with ~w "
++ "queue items outstanding"}},
{"P0021",
{info, "Allocation of work blocked as L0 pending"}},
{"P0022",
{info, "Manifest at Level ~w"}},
{"P0023",
{info, "Manifest entry of startkey ~s ~s ~s endkey ~s ~s ~s "
++ "filename=~s~n"}},
{"P0024",
{info, "Outstanding compaction work items of ~w at level ~w"}},
{"P0025",
{info, "Merge to sqn ~w from Level ~w completed"}},
{"P0026",
{info, "Merge has been commmitted at sequence number ~w"}},
{"P0027",
{info, "Rename of manifest from ~s ~w to ~s ~w"}},
{"P0028",
{info, "Adding cleared file ~s to deletion list"}},
{"PC001",
{info, "Penciller's clerk ~w started with owner ~w"}},
{"PC002",
{info, "Request for manifest change from clerk on closing"}},
{"PC003",
{info, "Confirmation of manifest change on closing"}},
{"PC004",
{info, "Prompted confirmation of manifest change"}},
{"PC005",
{info, "Penciller's Clerk ~w shutdown now complete for reason ~w"}},
{"PC006",
{info, "Work prompted but none needed ~w"}},
{"PC007",
{info, "Clerk prompting Penciller regarding manifest change"}},
{"PC008",
{info, "Merge from level ~w to merge into ~w files below"}},
{"PC009",
{info, "File ~s to simply switch levels to level ~w"}},
{"PC010",
{info, "Merge to be commenced for FileToMerge=~s with MSN=~w"}},
{"PC011",
{info, "Merge completed with MSN=~w Level=~w and FileCounter=~w"}},
{"PC012",
{info, "File to be created as part of MSN=~w Filename=~s"}},
{"PC013",
{warn, "Merge resulted in empty file ~s"}},
{"PC014",
{info, "Empty file ~s to be cleared"}},
{"PC015",
{info, "File created"}}
])).
log(LogReference, Subs) ->
{ok, {LogLevel, LogText}} = dict:find(LogReference, ?LOGBASE),
case lists:member(LogLevel, ?LOG_LEVEL) of
true ->
io:format(LogText ++ "~n", Subs);
false ->
ok
end.
log_timer(LogReference, Subs, StartTime) ->
{ok, {LogLevel, LogText}} = dict:find(LogReference, ?LOGBASE),
case lists:member(LogLevel, ?LOG_LEVEL) of
true ->
MicroS = timer:now_diff(os:timestamp(), StartTime),
{Unit, Time} = case MicroS of
MicroS when MicroS < 100 ->
{"microsec", MicroS};
MicroS ->
{"ms", MicroS div 1000}
end,
io:format(LogText ++ " with time taken ~w " ++ Unit ++ "~n",
Subs ++ [Time]);
false ->
ok
end.
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
log_test() ->
?assertMatch(ok, log("D0001", [])),
?assertMatch(ok, log_timer("D0001", [], os:timestamp())).
-endif.

View file

@ -80,7 +80,7 @@
clerk_new(Owner) -> clerk_new(Owner) ->
{ok, Pid} = gen_server:start(?MODULE, [], []), {ok, Pid} = gen_server:start(?MODULE, [], []),
ok = gen_server:call(Pid, {register, Owner}, infinity), ok = gen_server:call(Pid, {register, Owner}, infinity),
io:format("Penciller's clerk ~w started with owner ~w~n", [Pid, Owner]), leveled_log:log("PC001", [Pid, Owner]),
{ok, Pid}. {ok, Pid}.
clerk_manifestchange(Pid, Action, Closing) -> clerk_manifestchange(Pid, Action, Closing) ->
@ -104,7 +104,7 @@ handle_call({register, Owner}, _From, State) ->
State#state{owner=Owner}, State#state{owner=Owner},
?MIN_TIMEOUT}; ?MIN_TIMEOUT};
handle_call({manifest_change, return, true}, _From, State) -> handle_call({manifest_change, return, true}, _From, State) ->
io:format("Request for manifest change from clerk on closing~n"), leveled_log:log("PC002", []),
case State#state.change_pending of case State#state.change_pending of
true -> true ->
WI = State#state.work_item, WI = State#state.work_item,
@ -115,13 +115,13 @@ handle_call({manifest_change, return, true}, _From, State) ->
handle_call({manifest_change, confirm, Closing}, From, State) -> handle_call({manifest_change, confirm, Closing}, From, State) ->
case Closing of case Closing of
true -> true ->
io:format("Confirmation of manifest change on closing~n"), leveled_log:log("PC003", []),
WI = State#state.work_item, WI = State#state.work_item,
ok = mark_for_delete(WI#penciller_work.unreferenced_files, ok = mark_for_delete(WI#penciller_work.unreferenced_files,
State#state.owner), State#state.owner),
{stop, normal, ok, State}; {stop, normal, ok, State};
false -> false ->
io:format("Prompted confirmation of manifest change~n"), leveled_log:log("PC004", []),
gen_server:reply(From, ok), gen_server:reply(From, ok),
WI = State#state.work_item, WI = State#state.work_item,
ok = mark_for_delete(WI#penciller_work.unreferenced_files, ok = mark_for_delete(WI#penciller_work.unreferenced_files,
@ -149,8 +149,7 @@ handle_info(timeout, State=#state{change_pending=Pnd}) when Pnd == false ->
terminate(Reason, _State) -> terminate(Reason, _State) ->
io:format("Penciller's Clerk ~w shutdown now complete for reason ~w~n", leveled_log:log("PC005", [self(), Reason]).
[self(), Reason]).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -163,13 +162,13 @@ code_change(_OldVsn, State, _Extra) ->
requestandhandle_work(State) -> requestandhandle_work(State) ->
case leveled_penciller:pcl_workforclerk(State#state.owner) of case leveled_penciller:pcl_workforclerk(State#state.owner) of
none -> none ->
io:format("Work prompted but none needed ~w~n", [self()]), leveled_log:log("PC006", [self()]),
{false, ?MAX_TIMEOUT}; {false, ?MAX_TIMEOUT};
WI -> WI ->
{NewManifest, FilesToDelete} = merge(WI), {NewManifest, FilesToDelete} = merge(WI),
UpdWI = WI#penciller_work{new_manifest=NewManifest, UpdWI = WI#penciller_work{new_manifest=NewManifest,
unreferenced_files=FilesToDelete}, unreferenced_files=FilesToDelete},
io:format("Clerk prompting Penciller regarding manifest change~n"), leveled_log:log("PC007", []),
ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner, ok = leveled_penciller:pcl_promptmanifestchange(State#state.owner,
UpdWI), UpdWI),
{true, UpdWI} {true, UpdWI}
@ -186,8 +185,7 @@ merge(WI) ->
%% Need to work out if this is the top level %% Need to work out if this is the top level
%% And then tell merge process to create files at the top level %% And then tell merge process to create files at the top level
%% Which will include the reaping of expired tombstones %% Which will include the reaping of expired tombstones
io:format("Merge from level ~w to merge into ~w files below~n", leveled_log:log("PC008", [SrcLevel, length(Candidates)]),
[SrcLevel, length(Candidates)]),
MergedFiles = case length(Candidates) of MergedFiles = case length(Candidates) of
0 -> 0 ->
@ -195,7 +193,7 @@ merge(WI) ->
%% %%
%% TODO: need to think still about simply renaming when at %% TODO: need to think still about simply renaming when at
%% lower level %% lower level
io:format("File ~s to simply switch levels to level ~w~n", leveled_log:log("PC009",
[SrcF#manifest_entry.filename, SrcLevel + 1]), [SrcF#manifest_entry.filename, SrcLevel + 1]),
[SrcF]; [SrcF];
_ -> _ ->
@ -293,8 +291,7 @@ select_filetomerge(SrcLevel, Manifest) ->
%% The level is the level which the new files should be created at. %% The level is the level which the new files should be created at.
perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) -> perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) ->
io:format("Merge to be commenced for FileToMerge=~s with MSN=~w~n", leveled_log:log("PC010", [SrcFN, MSN]),
[SrcFN, MSN]),
PointerList = lists:map(fun(P) -> PointerList = lists:map(fun(P) ->
{next, P#manifest_entry.owner, all} end, {next, P#manifest_entry.owner, all} end,
CandidateList), CandidateList),
@ -306,14 +303,12 @@ perform_merge({SrcPid, SrcFN}, CandidateList, LevelInfo, {Filepath, MSN}) ->
[]). []).
do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) -> do_merge([], [], {SrcLevel, _IsB}, {_Filepath, MSN}, FileCounter, OutList) ->
io:format("Merge completed with MSN=~w Level=~w and FileCounter=~w~n", leveled_log:log("PC011", [MSN, SrcLevel, FileCounter]),
[MSN, SrcLevel, FileCounter]),
OutList; OutList;
do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) -> do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft", FileName = lists:flatten(io_lib:format(Filepath ++ "_~w_~w.sft",
[SrcLevel + 1, FileCounter])), [SrcLevel + 1, FileCounter])),
io:format("File to be created as part of MSN=~w Filename=~s~n", leveled_log:log("PC012", [MSN, FileName]),
[MSN, FileName]),
TS1 = os:timestamp(), TS1 = os:timestamp(),
LevelR = case IsB of LevelR = case IsB of
true -> true ->
@ -329,8 +324,8 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
LevelR), LevelR),
case Reply of case Reply of
{{[], []}, null, _} -> {{[], []}, null, _} ->
io:format("Merge resulted in empty file ~s~n", [FileName]), leveled_log:log("PC013", [FileName]),
io:format("Empty file ~s to be cleared~n", [FileName]), leveled_log:log("PC014", [FileName]),
ok = leveled_sft:sft_clear(Pid), ok = leveled_sft:sft_clear(Pid),
OutList; OutList;
{{KL1Rem, KL2Rem}, SmallestKey, HighestKey} -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} ->
@ -339,8 +334,7 @@ do_merge(KL1, KL2, {SrcLevel, IsB}, {Filepath, MSN}, FileCounter, OutList) ->
end_key=HighestKey, end_key=HighestKey,
owner=Pid, owner=Pid,
filename=FileName}]), filename=FileName}]),
MTime = timer:now_diff(os:timestamp(), TS1), leveled_log:log_timer("PC015", [], TS1),
io:format("File creation took ~w microseconds ~n", [MTime]),
do_merge(KL1Rem, KL2Rem, do_merge(KL1Rem, KL2Rem,
{SrcLevel, IsB}, {Filepath, MSN}, {SrcLevel, IsB}, {Filepath, MSN},
FileCounter + 1, ExtMan) FileCounter + 1, ExtMan)

View file

@ -338,9 +338,8 @@ init([PCLopts]) ->
PCLopts#penciller_options.start_snapshot} of PCLopts#penciller_options.start_snapshot} of
{undefined, true} -> {undefined, true} ->
SrcPenciller = PCLopts#penciller_options.source_penciller, SrcPenciller = PCLopts#penciller_options.source_penciller,
io:format("Registering ledger snapshot~n"),
{ok, State} = pcl_registersnapshot(SrcPenciller, self()), {ok, State} = pcl_registersnapshot(SrcPenciller, self()),
io:format("Ledger snapshot registered~n"), leveled_log:log("P0001", [self()]),
{ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}}; {ok, State#state{is_snapshot=true, source_penciller=SrcPenciller}};
%% Need to do something about timeout %% Need to do something about timeout
{_RootPath, false} -> {_RootPath, false} ->
@ -421,10 +420,7 @@ handle_call({push_mem, PushedTree}, From, State=#state{is_snapshot=Snap})
State#state.levelzero_cache, State#state.levelzero_cache,
State) State)
end, end,
io:format("Handling of push completed in ~w microseconds with " leveled_log:log_timer("P0002", [S#state.levelzero_size], SW),
++ "L0 cache size now ~w~n",
[timer:now_diff(os:timestamp(), SW),
S#state.levelzero_size]),
{noreply, S}; {noreply, S};
handle_call({fetch, Key}, _From, State) -> handle_call({fetch, Key}, _From, State) ->
{reply, {reply,
@ -489,8 +485,8 @@ handle_cast({manifest_change, WI}, State) ->
{noreply, UpdState}; {noreply, UpdState};
handle_cast({release_snapshot, Snapshot}, State) -> handle_cast({release_snapshot, Snapshot}, State) ->
Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots),
io:format("Ledger snapshot ~w released~n", [Snapshot]), leveled_log:log("P0003", [Snapshot]),
io:format("Remaining ledger snapshots are ~w~n", [Rs]), leveled_log:log("P0004", [Rs]),
{noreply, State#state{registered_snapshots=Rs}}; {noreply, State#state{registered_snapshots=Rs}};
handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap}) handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
when Snap == false -> when Snap == false ->
@ -500,9 +496,7 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
case Reply of case Reply of
{true, Pid} -> {true, Pid} ->
UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files), UF1 = lists:keydelete(FileName, 1, State#state.unreferenced_files),
io:format("Filename ~s removed from unreferenced files as delete " leveled_log:log("P0005", [FileName]),
++ "is confirmed - file should now close~n",
[FileName]),
ok = leveled_sft:sft_deleteconfirmed(Pid), ok = leveled_sft:sft_deleteconfirmed(Pid),
{noreply, State#state{unreferenced_files=UF1}}; {noreply, State#state{unreferenced_files=UF1}};
_ -> _ ->
@ -511,13 +505,12 @@ handle_cast({confirm_delete, FileName}, State=#state{is_snapshot=Snap})
handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) -> handle_info({_Ref, {ok, SrcFN, _StartKey, _EndKey}}, State) ->
io:format("Orphaned reply after timeout on L0 file write ~s~n", [SrcFN]), leveled_log:log("P0006", [SrcFN]),
{noreply, State}. {noreply, State}.
terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true -> terminate(Reason, State=#state{is_snapshot=Snap}) when Snap == true ->
ok = pcl_releasesnapshot(State#state.source_penciller, self()), ok = pcl_releasesnapshot(State#state.source_penciller, self()),
io:format("Sent release message for cloned Penciller following close for " leveled_log:log("P0007", [Reason]),
++ "reason ~w~n", [Reason]),
ok; ok;
terminate(Reason, State) -> terminate(Reason, State) ->
%% When a Penciller shuts down it isn't safe to try an manage the safe %% When a Penciller shuts down it isn't safe to try an manage the safe
@ -536,7 +529,7 @@ terminate(Reason, State) ->
%% The cast may not succeed as the clerk could be synchronously calling %% The cast may not succeed as the clerk could be synchronously calling
%% the penciller looking for a manifest commit %% the penciller looking for a manifest commit
%% %%
io:format("Penciller closing for reason - ~w~n", [Reason]), leveled_log:log("P0008", [Reason]),
MC = leveled_pclerk:clerk_manifestchange(State#state.clerk, MC = leveled_pclerk:clerk_manifestchange(State#state.clerk,
return, return,
true), true),
@ -557,12 +550,12 @@ terminate(Reason, State) ->
{true, [], _} -> {true, [], _} ->
ok = leveled_sft:sft_close(State#state.levelzero_constructor); ok = leveled_sft:sft_close(State#state.levelzero_constructor);
{false, [], 0} -> {false, [], 0} ->
io:format("Level 0 cache empty at close of Penciller~n"); leveled_log:log("P0009", []);
{false, [], _N} -> {false, [], _N} ->
L0Pid = roll_memory(UpdState, true), L0Pid = roll_memory(UpdState, true),
ok = leveled_sft:sft_close(L0Pid); ok = leveled_sft:sft_close(L0Pid);
_ -> _ ->
io:format("No level zero action on close of Penciller~n") leveled_log:log("P0010", [])
end, end,
% Tidy shutdown of individual files % Tidy shutdown of individual files
@ -570,7 +563,7 @@ terminate(Reason, State) ->
lists:foreach(fun({_FN, Pid, _SN}) -> lists:foreach(fun({_FN, Pid, _SN}) ->
ok = leveled_sft:sft_close(Pid) end, ok = leveled_sft:sft_close(Pid) end,
UpdState#state.unreferenced_files), UpdState#state.unreferenced_files),
io:format("Shutdown complete for Penciller~n"), leveled_log:log("P0011", []),
ok. ok.
@ -622,12 +615,10 @@ start_from_file(PCLopts) ->
TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end, TopManSQN = lists:foldl(fun(X, MaxSQN) -> max(X, MaxSQN) end,
0, 0,
ValidManSQNs), ValidManSQNs),
io:format("Store to be started based on " ++ leveled_log:log("P0012", [TopManSQN]),
"manifest sequence number of ~w~n", [TopManSQN]),
ManUpdate = case TopManSQN of ManUpdate = case TopManSQN of
0 -> 0 ->
io:format("Seqence number of 0 indicates no valid " ++ leveled_log:log("P0013", []),
"manifest~n"),
{[], 0}; {[], 0};
_ -> _ ->
CurrManFile = filepath(InitState#state.root_path, CurrManFile = filepath(InitState#state.root_path,
@ -639,14 +630,13 @@ start_from_file(PCLopts) ->
end, end,
{UpdManifest, MaxSQN} = ManUpdate, {UpdManifest, MaxSQN} = ManUpdate,
io:format("Maximum sequence number of ~w found in nonzero levels~n", leveled_log:log("P0014", [MaxSQN]),
[MaxSQN]),
%% Find any L0 files %% Find any L0 files
L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft", L0FN = filepath(RootPath, TopManSQN, new_merge_files) ++ "_0_0.sft",
case filelib:is_file(L0FN) of case filelib:is_file(L0FN) of
true -> true ->
io:format("L0 file found ~s~n", [L0FN]), leveled_log:log("P0015", [L0FN]),
{ok, {ok,
L0Pid, L0Pid,
{L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN), {L0StartKey, L0EndKey}} = leveled_sft:sft_open(L0FN),
@ -659,8 +649,7 @@ start_from_file(PCLopts) ->
1, 1,
UpdManifest, UpdManifest,
{0, [ManifestEntry]}), {0, [ManifestEntry]}),
io:format("L0 file had maximum sequence number of ~w~n", leveled_log:log("P0016", [L0SQN]),
[L0SQN]),
LedgerSQN = max(MaxSQN, L0SQN), LedgerSQN = max(MaxSQN, L0SQN),
{ok, {ok,
InitState#state{manifest=UpdManifest2, InitState#state{manifest=UpdManifest2,
@ -668,7 +657,7 @@ start_from_file(PCLopts) ->
ledger_sqn=LedgerSQN, ledger_sqn=LedgerSQN,
persisted_sqn=LedgerSQN}}; persisted_sqn=LedgerSQN}};
false -> false ->
io:format("No L0 file found~n"), leveled_log:log("P0017", []),
{ok, {ok,
InitState#state{manifest=UpdManifest, InitState#state{manifest=UpdManifest,
manifest_sqn=TopManSQN, manifest_sqn=TopManSQN,
@ -678,10 +667,7 @@ start_from_file(PCLopts) ->
log_pushmem_reply(From, Reply, SW) -> log_pushmem_reply(From, Reply, SW) ->
io:format("Respone to push_mem of ~w ~s took ~w microseconds~n", leveled_log:log_timer("P0018", [element(1,Reply), element(2,Reply)], SW),
[element(1, Reply),
element(2, Reply),
timer:now_diff(os:timestamp(), SW)]),
gen_server:reply(From, element(1, Reply)). gen_server:reply(From, element(1, Reply)).
@ -737,7 +723,7 @@ checkready(Pid) ->
roll_memory(State, false) -> roll_memory(State, false) ->
FileName = levelzero_filename(State), FileName = levelzero_filename(State),
io:format("Rolling level zero to file ~s~n", [FileName]), leveled_log:log("P0019", [FileName]),
Opts = #sft_options{wait=false}, Opts = #sft_options{wait=false},
PCL = self(), PCL = self(),
FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end, FetchFun = fun(Slot) -> pcl_fetchlevelzero(PCL, Slot) end,
@ -832,9 +818,7 @@ return_work(State, From) ->
L when L > 0 -> L when L > 0 ->
[{SrcLevel, Manifest}|OtherWork] = WorkQ, [{SrcLevel, Manifest}|OtherWork] = WorkQ,
Backlog = length(OtherWork), Backlog = length(OtherWork),
io:format("Work at Level ~w to be scheduled for ~w with ~w " ++ leveled_log:log("P0020", [SrcLevel, From, Backlog]),
"queue items outstanding~n",
[SrcLevel, From, Backlog]),
IsBasement = if IsBasement = if
SrcLevel + 1 == BasementL -> SrcLevel + 1 == BasementL ->
true; true;
@ -845,7 +829,7 @@ return_work(State, From) ->
true -> true ->
% Once the L0 file is completed there will be more work % Once the L0 file is completed there will be more work
% - so don't be busy doing other work now % - so don't be busy doing other work now
io:format("Allocation of work blocked as L0 pending~n"), leveled_log:log("P0021", []),
{State, none}; {State, none};
false -> false ->
%% No work currently outstanding %% No work currently outstanding
@ -910,7 +894,7 @@ open_all_filesinmanifest({Manifest, TopSQN}, Level) ->
print_manifest(Manifest) -> print_manifest(Manifest) ->
lists:foreach(fun(L) -> lists:foreach(fun(L) ->
io:format("Manifest at Level ~w~n", [L]), leveled_log:log("P0022", [L]),
Level = get_item(L, Manifest, []), Level = get_item(L, Manifest, []),
lists:foreach(fun print_manifest_entry/1, Level) lists:foreach(fun print_manifest_entry/1, Level)
end, end,
@ -918,16 +902,10 @@ print_manifest(Manifest) ->
ok. ok.
print_manifest_entry(Entry) -> print_manifest_entry(Entry) ->
{S1, S2, S3, {S1, S2, S3} = leveled_codec:print_key(Entry#manifest_entry.start_key),
FS2, FS3} = leveled_codec:print_key(Entry#manifest_entry.start_key), {E1, E2, E3} = leveled_codec:print_key(Entry#manifest_entry.end_key),
{E1, E2, E3, leveled_log:log("P0023",
FE2, FE3} = leveled_codec:print_key(Entry#manifest_entry.end_key), [S1, S2, S3, E1, E2, E3, Entry#manifest_entry.filename]).
io:format("Manifest entry of " ++
"startkey ~s " ++ FS2 ++ " " ++ FS3 ++
" endkey ~s " ++ FE2 ++ " " ++ FE3 ++
" filename=~s~n",
[S1, S2, S3, E1, E2, E3,
Entry#manifest_entry.filename]).
initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) ->
CompareFun = fun(M) -> CompareFun = fun(M) ->
@ -1051,9 +1029,6 @@ find_nextkey(QueryArray, LCnt, {BestKeyLevel, BestKV}, QueryFunT) ->
% the best key % the best key
% But we also need to remove the dominated key from the % But we also need to remove the dominated key from the
% lower level in the query array % lower level in the query array
io:format("Key at level ~w with SQN ~w is better than " ++
"key at lower level ~w with SQN ~w~n",
[LCnt, SQN, BKL, BestSQN]),
OldBestEntry = lists:keyfind(BKL, 1, QueryArray), OldBestEntry = lists:keyfind(BKL, 1, QueryArray),
{BKL, [{BestKey, BestVal}|BestTail]} = OldBestEntry, {BKL, [{BestKey, BestVal}|BestTail]} = OldBestEntry,
find_nextkey(lists:keyreplace(BKL, find_nextkey(lists:keyreplace(BKL,
@ -1149,8 +1124,7 @@ assess_workqueue(WorkQ, LevelToAssess, Man, BasementLevel) ->
maybe_append_work(WorkQ, Level, Manifest, maybe_append_work(WorkQ, Level, Manifest,
MaxFiles, FileCount) MaxFiles, FileCount)
when FileCount > MaxFiles -> when FileCount > MaxFiles ->
io:format("Outstanding compaction work items of ~w at level ~w~n", leveled_log:log("P0024", [FileCount - MaxFiles, Level]),
[FileCount - MaxFiles, Level]),
lists:append(WorkQ, [{Level, Manifest}]); lists:append(WorkQ, [{Level, Manifest}]);
maybe_append_work(WorkQ, _Level, _Manifest, maybe_append_work(WorkQ, _Level, _Manifest,
_MaxFiles, _FileCount) -> _MaxFiles, _FileCount) ->
@ -1183,24 +1157,19 @@ commit_manifest_change(ReturnedWorkItem, State) ->
RootPath = State#state.root_path, RootPath = State#state.root_path,
UnreferencedFiles = State#state.unreferenced_files, UnreferencedFiles = State#state.unreferenced_files,
case {SentWorkItem#penciller_work.next_sqn, if
SentWorkItem#penciller_work.clerk} of NewMSN == SentWorkItem#penciller_work.next_sqn ->
{NewMSN, _From} ->
MTime = timer:now_diff(os:timestamp(),
SentWorkItem#penciller_work.start_time),
WISrcLevel = SentWorkItem#penciller_work.src_level, WISrcLevel = SentWorkItem#penciller_work.src_level,
io:format("Merge to sqn ~w completed in ~w microseconds " ++ leveled_log:log_timer("P0025",
"from Level ~w~n", [SentWorkItem#penciller_work.next_sqn,
[SentWorkItem#penciller_work.next_sqn, WISrcLevel],
MTime, SentWorkItem#penciller_work.start_time),
WISrcLevel]),
ok = rename_manifest_files(RootPath, NewMSN), ok = rename_manifest_files(RootPath, NewMSN),
FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files, FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
UnreferencedFilesUpd = update_deletions(FilesToDelete, UnreferencedFilesUpd = update_deletions(FilesToDelete,
NewMSN, NewMSN,
UnreferencedFiles), UnreferencedFiles),
io:format("Merge has been commmitted at sequence number ~w~n", leveled_log:log("P0026", [NewMSN]),
[NewMSN]),
NewManifest = ReturnedWorkItem#penciller_work.new_manifest, NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
CurrL0 = get_item(0, State#state.manifest, []), CurrL0 = get_item(0, State#state.manifest, []),
@ -1221,23 +1190,15 @@ commit_manifest_change(ReturnedWorkItem, State) ->
{ok, State#state{ongoing_work=[], {ok, State#state{ongoing_work=[],
manifest_sqn=NewMSN, manifest_sqn=NewMSN,
manifest=RevisedManifest, manifest=RevisedManifest,
unreferenced_files=UnreferencedFilesUpd}}; unreferenced_files=UnreferencedFilesUpd}}
{MaybeWrongMSN, From} ->
io:format("Merge commit at sqn ~w not matched to expected" ++
" sqn ~w from Clerk ~w~n",
[NewMSN, MaybeWrongMSN, From]),
{error, State}
end. end.
rename_manifest_files(RootPath, NewMSN) -> rename_manifest_files(RootPath, NewMSN) ->
OldFN = filepath(RootPath, NewMSN, pending_manifest), OldFN = filepath(RootPath, NewMSN, pending_manifest),
NewFN = filepath(RootPath, NewMSN, current_manifest), NewFN = filepath(RootPath, NewMSN, current_manifest),
io:format("Rename of manifest from ~s ~w to ~s ~w~n", leveled_log:log("P0027", [OldFN, filelib:is_file(OldFN),
[OldFN, NewFN, filelib:is_file(NewFN)]),
filelib:is_file(OldFN),
NewFN,
filelib:is_file(NewFN)]),
ok = file:rename(OldFN,NewFN). ok = file:rename(OldFN,NewFN).
filepath(RootPath, manifest) -> filepath(RootPath, manifest) ->
@ -1257,8 +1218,7 @@ filepath(RootPath, NewMSN, new_merge_files) ->
update_deletions([], _NewMSN, UnreferencedFiles) -> update_deletions([], _NewMSN, UnreferencedFiles) ->
UnreferencedFiles; UnreferencedFiles;
update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) -> update_deletions([ClearedFile|Tail], MSN, UnreferencedFiles) ->
io:format("Adding cleared file ~s to deletion list ~n", leveled_log:log("P0028", [ClearedFile#manifest_entry.filename]),
[ClearedFile#manifest_entry.filename]),
update_deletions(Tail, update_deletions(Tail,
MSN, MSN,
lists:append(UnreferencedFiles, lists:append(UnreferencedFiles,