Broken WIP

This commit is contained in:
martinsumner 2017-01-17 16:30:04 +00:00
parent b2bb4ce73e
commit 3712c62a50
3 changed files with 141 additions and 101 deletions

View file

@ -415,7 +415,7 @@ compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN,
ManSlice0;
compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN,
_RStrategy, ManSlice0) ->
ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0),
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0),
ManSlice1;
compact_files([Batch|T], CDBopts, ActiveJournal0,
FilterFun, FilterServer, MaxSQN,
@ -511,23 +511,10 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) ->
ok ->
{Journal1, ManSlice0};
roll ->
ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1),
ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1),
write_values(KVCList, CDBopts, null, ManSlice1)
end.
generate_manifest_entry(ActiveJournal) ->
{ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal),
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
case leveled_cdb:cdb_firstkey(PidR) of
{StartSQN, _Type, _PK} ->
[{StartSQN, NewFN, PidR}];
empty ->
leveled_log:log("IC013", [NewFN]),
[]
end.
clear_waste(State) ->
WP = State#state.waste_path,
WRP = State#state.waste_retention_period,
@ -728,7 +715,8 @@ compact_single_file_recovr_test() ->
LedgerFun1,
CompactFP,
CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR}] = compact_files([Candidate],
[{LowSQN, FN, PidR, _LastKey}] =
compact_files([Candidate],
#cdb_options{file_path=CompactFP},
LedgerFun1,
LedgerSrv1,
@ -764,7 +752,8 @@ compact_single_file_retain_test() ->
LedgerFun1,
CompactFP,
CDB} = compact_single_file_setup(),
[{LowSQN, FN, PidR}] = compact_files([Candidate],
[{LowSQN, FN, PidR, _LK}] =
compact_files([Candidate],
#cdb_options{file_path=CompactFP},
LedgerFun1,
LedgerSrv1,
@ -847,7 +836,7 @@ compact_singlefile_totwosmallfiles_test() ->
900,
[{?STD_TAG, recovr}]),
?assertMatch(2, length(ManifestSlice)),
lists:foreach(fun({_SQN, _FN, CDB}) ->
lists:foreach(fun({_SQN, _FN, CDB, _LK}) ->
ok = leveled_cdb:cdb_deletepending(CDB),
ok = leveled_cdb:cdb_destroy(CDB)
end,

60
src/leveled_imanifest.erl Normal file
View file

@ -0,0 +1,60 @@
%% -------- Inker Manifest ---------
%%
-module(leveled_imanifest).
-include("include/leveled.hrl").
-include_lib("eunit/include/eunit.hrl").
-export([
generate_entry/1,
add_entry/2,
remove_entry/2,
find_entry/2
]).
%%%============================================================================
%%% API
%%%============================================================================
generate_entry(Journal) ->
{ok, NewFN} = leveled_cdb:cdb_complete(Journal),
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFN),
case leveled_cdb:cdb_firstkey(PidR) of
{StartSQN, _Type, _PK} ->
LastKey = leveled_cdb:cdb_lastkey(PidR),
[{StartSQN, NewFN, PidR, LastKey}];
empty ->
leveled_log:log("IC013", [NewFN]),
[]
end.
add_entry(Manifest, Entry) ->
{SQN, FN, PidR, LastKey} = Entry,
StrippedName = filename:rootname(FN),
lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])).
remove_entry(Manifest, Entry) ->
{SQN, FN, _PidR, _LastKey} = Entry,
leveled_log:log("I0013", [FN]),
lists:keydelete(SQN, 1, Manifest).
find_entry(SQN, [{LowSQN, _FN, Pid, _LK}|_Tail]) when SQN >= LowSQN ->
Pid;
find_entry(SQN, [_Head|Tail]) ->
find_entry(SQN, Tail).
%%%============================================================================
%%% Test
%%%============================================================================
-ifdef(TEST).
-endif.

View file

@ -168,7 +168,6 @@ ink_releasesnapshot(Pid, Snapshot) ->
gen_server:cast(Pid, {release_snapshot, Snapshot}).
ink_confirmdelete(Pid, ManSQN) ->
io:format("Confirm delete request received~n"),
gen_server:call(Pid, {confirm_delete, ManSQN}).
ink_close(Pid) ->
@ -274,7 +273,6 @@ handle_call({register_snapshot, Requestor}, _From , State) ->
State#state.active_journaldb},
State#state{registered_snapshots=Rs}};
handle_call({confirm_delete, ManSQN}, _From, State) ->
io:format("Confirm delete request to be processed~n"),
Reply = lists:foldl(fun({_R, SnapSQN}, Bool) ->
case SnapSQN >= ManSQN of
true ->
@ -284,7 +282,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) ->
end end,
true,
State#state.registered_snapshots),
io:format("Confirm delete request complete with reply ~w~n", [Reply]),
{reply, Reply, State};
handle_call(get_manifest, _From, State) ->
{reply, State#state.manifest, State};
@ -292,12 +289,12 @@ handle_call({update_manifest,
ManifestSnippet,
DeletedFiles}, _From, State) ->
Man0 = lists:foldl(fun(ManEntry, AccMan) ->
remove_from_manifest(AccMan, ManEntry)
leveled_imanifest:remove_entry(AccMan, ManEntry)
end,
State#state.manifest,
DeletedFiles),
Man1 = lists:foldl(fun(ManEntry, AccMan) ->
add_to_manifest(AccMan, ManEntry) end,
leveled_imanifest:add_entry(AccMan, ManEntry) end,
Man0,
ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
@ -439,7 +436,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
State#state.root_path,
CDBopts),
{_, _, NewJournalP} = ManEntry,
NewManifest = add_to_manifest(State#state.manifest, ManEntry),
NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry),
ok = simple_manifest_writer(NewManifest,
State#state.manifest_sqn + 1,
State#state.root_path),
@ -457,7 +454,7 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
get_object(LedgerKey, SQN, Manifest) ->
JournalP = find_in_manifest(SQN, Manifest),
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
SQN,
to_fetch,
@ -466,7 +463,7 @@ get_object(LedgerKey, SQN, Manifest) ->
leveled_codec:from_inkerkv(Obj).
key_check(LedgerKey, SQN, Manifest) ->
JournalP = find_in_manifest(SQN, Manifest),
JournalP = leveled_imanifest:find_entry(SQN, Manifest),
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
SQN,
to_fetch,
@ -496,7 +493,8 @@ build_manifest(ManifestFilenames,
% Open the manifest files, completing if necessary and ensure there is
% a valid active journal at the head of the manifest
OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts),
{ActiveLowSQN, _FN, ActiveJournal} = lists:nth(1, OpenManifest),
{ActiveLowSQN, _FN, ActiveJournal, _LK} = lists:nth(1, OpenManifest),
JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of
empty ->
ActiveLowSQN;
@ -506,14 +504,14 @@ build_manifest(ManifestFilenames,
% Update the manifest if it has been changed by the process of laoding
% the manifest (must also increment the manifest SQN).
UpdManifestSQN = if
UpdManifestSQN =
if
length(OpenManifest) > length(Manifest) ->
leveled_log:log("I0009", []),
manifest_printer(OpenManifest),
simple_manifest_writer(OpenManifest,
ManifestSQN + 1,
RootPath),
ManifestSQN + 1;
NextSQN = ManifestSQN + 1,
simple_manifest_writer(OpenManifest, NextSQN, RootPath),
NextSQN;
true ->
leveled_log:log("I0010", []),
manifest_printer(OpenManifest),
@ -525,76 +523,68 @@ build_manifest(ManifestFilenames,
close_allmanifest([]) ->
ok;
close_allmanifest([H|ManifestT]) ->
{_, _, Pid} = H,
{_, _, Pid, _} = H,
ok = leveled_cdb:cdb_close(Pid),
close_allmanifest(ManifestT).
open_all_manifest([], RootPath, CDBOpts) ->
leveled_log:log("I0011", []),
add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts));
leveled_imanifest:add_entry([], start_new_activejournal(1, RootPath, CDBOpts));
open_all_manifest(Man0, RootPath, CDBOpts) ->
Man1 = lists:reverse(lists:sort(Man0)),
[{HeadSQN, HeadFN}|ManifestTail] = Man1,
[{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1,
CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX,
PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX,
Man2 = case filelib:is_file(CompleteHeadFN) of
true ->
leveled_log:log("I0012", [HeadFN]),
{ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN),
{LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR),
add_to_manifest(add_to_manifest(ManifestTail,
{HeadSQN, HeadFN, HeadR}),
start_new_activejournal(LastSQN + 1,
LastKey = leveled_cdb:cdb_lastkey(HeadR),
LastSQN = element(1, LastKey),
ManToHead = leveled_imanifest:add_entry(ManifestTail,
{HeadSQN,
HeadFN,
HeadR,
LastKey}),
NewManEntry = start_new_activejournal(LastSQN + 1,
RootPath,
CDBOpts));
CDBOpts),
leveled_imanifest:add_entry(ManToHead, NewManEntry);
false ->
{ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN,
CDBOpts),
add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadW})
leveled_imanifest:add_entry(ManifestTail,
{HeadSQN, HeadFN, HeadW, HeadLK})
end,
lists:map(fun(ManEntry) ->
OpenJournalFun =
fun(ManEntry) ->
case ManEntry of
{LowSQN, FN} ->
{LowSQN, FN, _, LK_RO} ->
CFN = FN ++ "." ++ ?JOURNAL_FILEX,
PFN = FN ++ "." ++ ?PENDING_FILEX,
case filelib:is_file(CFN) of
true ->
{ok,
Pid} = leveled_cdb:cdb_open_reader(CFN),
{LowSQN, FN, Pid};
{ok, Pid} = leveled_cdb:cdb_open_reader(CFN),
{LowSQN, FN, Pid, LK_RO};
false ->
W = leveled_cdb:cdb_open_writer(PFN, CDBOpts),
{ok, Pid} = W,
ok = leveled_cdb:cdb_roll(Pid),
{LowSQN, FN, Pid}
LK_WR = leveled_cdb:cdb_lastkey(Pid),
{LowSQN, FN, Pid, LK_WR}
end;
_ ->
ManEntry
end end,
Man2).
end
end,
lists:map(OpenJournalFun, Man2).
start_new_activejournal(SQN, RootPath, CDBOpts) ->
Filename = filepath(RootPath, SQN, new_journal),
{ok, PidW} = leveled_cdb:cdb_open_writer(Filename, CDBOpts),
{SQN, Filename, PidW}.
add_to_manifest(Manifest, Entry) ->
{SQN, FN, PidR} = Entry,
StrippedName = filename:rootname(FN),
lists:reverse(lists:sort([{SQN, StrippedName, PidR}|Manifest])).
remove_from_manifest(Manifest, Entry) ->
{SQN, FN, _PidR} = Entry,
leveled_log:log("I0013", [FN]),
lists:keydelete(SQN, 1, Manifest).
find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN ->
Pid;
find_in_manifest(SQN, [_Head|Tail]) ->
find_in_manifest(SQN, Tail).
{SQN, Filename, PidW, empty}.
%% Scan between sequence numbers applying FilterFun to each entry where
@ -728,8 +718,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX),
TmpFN = filename:join(ManPath,
integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
Manifest), [compressed]),
MBin = term_to_binary(Manifest, [compressed]),
case filelib:is_file(NewFN) of
false ->
leveled_log:log("I0016", [ManSQN]),
@ -739,7 +728,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
end.
manifest_printer(Manifest) ->
lists:foreach(fun({SQN, FN, _PID}) ->
lists:foreach(fun({SQN, FN, _PID, _LK}) ->
leveled_log:log("I0017", [SQN, FN]) end,
Manifest).
@ -777,6 +766,7 @@ build_dummy_journal(KeyConvertF) ->
ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})),
ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})),
ok = leveled_cdb:cdb_roll(J1),
LK1 = leveled_cdb:cdb_lastkey(J1),
lists:foldl(fun(X, Closed) ->
case Closed of
true -> true;
@ -795,9 +785,10 @@ build_dummy_journal(KeyConvertF) ->
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})),
ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})),
LK2 = leveled_cdb:cdb_lastkey(J2),
ok = leveled_cdb:cdb_close(J2),
Manifest = [{1, "../test/journal/journal_files/nursery_1"},
{3, "../test/journal/journal_files/nursery_3"}],
Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1},
{3, "../test/journal/journal_files/nursery_3", "pid2", LK2}],
ManifestBin = term_to_binary(Manifest),
{ok, MF1} = file:open(filename:join(ManifestFP, "1.man"),
[binary, raw, read, write]),