From 3712c62a5072d3869ebb3b4dc4213191567c331d Mon Sep 17 00:00:00 2001 From: martinsumner Date: Tue, 17 Jan 2017 16:30:04 +0000 Subject: [PATCH] Broken WIP --- src/leveled_iclerk.erl | 45 +++++-------- src/leveled_imanifest.erl | 60 +++++++++++++++++ src/leveled_inker.erl | 137 ++++++++++++++++++-------------------- 3 files changed, 141 insertions(+), 101 deletions(-) create mode 100644 src/leveled_imanifest.erl diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 46d6308..f66fcd0 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -415,7 +415,7 @@ compact_files([], _CDBopts, null, _FilterFun, _FilterServer, _MaxSQN, ManSlice0; compact_files([], _CDBopts, ActiveJournal0, _FilterFun, _FilterServer, _MaxSQN, _RStrategy, ManSlice0) -> - ManSlice1 = ManSlice0 ++ generate_manifest_entry(ActiveJournal0), + ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(ActiveJournal0), ManSlice1; compact_files([Batch|T], CDBopts, ActiveJournal0, FilterFun, FilterServer, MaxSQN, @@ -511,23 +511,10 @@ write_values(KVCList, CDBopts, Journal0, ManSlice0) -> ok -> {Journal1, ManSlice0}; roll -> - ManSlice1 = ManSlice0 ++ generate_manifest_entry(Journal1), + ManSlice1 = ManSlice0 ++ leveled_imanifest:generate_entry(Journal1), write_values(KVCList, CDBopts, null, ManSlice1) end. - - -generate_manifest_entry(ActiveJournal) -> - {ok, NewFN} = leveled_cdb:cdb_complete(ActiveJournal), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), - case leveled_cdb:cdb_firstkey(PidR) of - {StartSQN, _Type, _PK} -> - [{StartSQN, NewFN, PidR}]; - empty -> - leveled_log:log("IC013", [NewFN]), - [] - end. - clear_waste(State) -> WP = State#state.waste_path, WRP = State#state.waste_retention_period, @@ -728,12 +715,13 @@ compact_single_file_recovr_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP}, - LedgerFun1, - LedgerSrv1, - 9, - [{?STD_TAG, recovr}]), + [{LowSQN, FN, PidR, _LastKey}] = + compact_files([Candidate], + #cdb_options{file_path=CompactFP}, + LedgerFun1, + LedgerSrv1, + 9, + [{?STD_TAG, recovr}]), io:format("FN of ~s~n", [FN]), ?assertMatch(2, LowSQN), ?assertMatch(probably, @@ -764,12 +752,13 @@ compact_single_file_retain_test() -> LedgerFun1, CompactFP, CDB} = compact_single_file_setup(), - [{LowSQN, FN, PidR}] = compact_files([Candidate], - #cdb_options{file_path=CompactFP}, - LedgerFun1, - LedgerSrv1, - 9, - [{?STD_TAG, retain}]), + [{LowSQN, FN, PidR, _LK}] = + compact_files([Candidate], + #cdb_options{file_path=CompactFP}, + LedgerFun1, + LedgerSrv1, + 9, + [{?STD_TAG, retain}]), io:format("FN of ~s~n", [FN]), ?assertMatch(1, LowSQN), ?assertMatch(probably, @@ -847,7 +836,7 @@ compact_singlefile_totwosmallfiles_test() -> 900, [{?STD_TAG, recovr}]), ?assertMatch(2, length(ManifestSlice)), - lists:foreach(fun({_SQN, _FN, CDB}) -> + lists:foreach(fun({_SQN, _FN, CDB, _LK}) -> ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB) end, diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl new file mode 100644 index 0000000..c115785 --- /dev/null +++ b/src/leveled_imanifest.erl @@ -0,0 +1,60 @@ +%% -------- Inker Manifest --------- +%% + + +-module(leveled_imanifest). + +-include("include/leveled.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-export([ + generate_entry/1, + add_entry/2, + remove_entry/2, + find_entry/2 + + ]). + + +%%%============================================================================ +%%% API +%%%============================================================================ + +generate_entry(Journal) -> + {ok, NewFN} = leveled_cdb:cdb_complete(Journal), + {ok, PidR} = leveled_cdb:cdb_open_reader(NewFN), + case leveled_cdb:cdb_firstkey(PidR) of + {StartSQN, _Type, _PK} -> + LastKey = leveled_cdb:cdb_lastkey(PidR), + [{StartSQN, NewFN, PidR, LastKey}]; + empty -> + leveled_log:log("IC013", [NewFN]), + [] + end. + +add_entry(Manifest, Entry) -> + {SQN, FN, PidR, LastKey} = Entry, + StrippedName = filename:rootname(FN), + lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])). + +remove_entry(Manifest, Entry) -> + {SQN, FN, _PidR, _LastKey} = Entry, + leveled_log:log("I0013", [FN]), + lists:keydelete(SQN, 1, Manifest). + +find_entry(SQN, [{LowSQN, _FN, Pid, _LK}|_Tail]) when SQN >= LowSQN -> + Pid; +find_entry(SQN, [_Head|Tail]) -> + find_entry(SQN, Tail). + + + +%%%============================================================================ +%%% Test +%%%============================================================================ + +-ifdef(TEST). + + +-endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index f56ea20..c9d1a74 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -168,7 +168,6 @@ ink_releasesnapshot(Pid, Snapshot) -> gen_server:cast(Pid, {release_snapshot, Snapshot}). ink_confirmdelete(Pid, ManSQN) -> - io:format("Confirm delete request received~n"), gen_server:call(Pid, {confirm_delete, ManSQN}). ink_close(Pid) -> @@ -274,7 +273,6 @@ handle_call({register_snapshot, Requestor}, _From , State) -> State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; handle_call({confirm_delete, ManSQN}, _From, State) -> - io:format("Confirm delete request to be processed~n"), Reply = lists:foldl(fun({_R, SnapSQN}, Bool) -> case SnapSQN >= ManSQN of true -> @@ -284,7 +282,6 @@ handle_call({confirm_delete, ManSQN}, _From, State) -> end end, true, State#state.registered_snapshots), - io:format("Confirm delete request complete with reply ~w~n", [Reply]), {reply, Reply, State}; handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; @@ -292,12 +289,12 @@ handle_call({update_manifest, ManifestSnippet, DeletedFiles}, _From, State) -> Man0 = lists:foldl(fun(ManEntry, AccMan) -> - remove_from_manifest(AccMan, ManEntry) + leveled_imanifest:remove_entry(AccMan, ManEntry) end, State#state.manifest, DeletedFiles), Man1 = lists:foldl(fun(ManEntry, AccMan) -> - add_to_manifest(AccMan, ManEntry) end, + leveled_imanifest:add_entry(AccMan, ManEntry) end, Man0, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, @@ -439,7 +436,7 @@ put_object(LedgerKey, Object, KeyChanges, State) -> State#state.root_path, CDBopts), {_, _, NewJournalP} = ManEntry, - NewManifest = add_to_manifest(State#state.manifest, ManEntry), + NewManifest = leveled_imanifest:add_entry(State#state.manifest, ManEntry), ok = simple_manifest_writer(NewManifest, State#state.manifest_sqn + 1, State#state.root_path), @@ -457,7 +454,7 @@ put_object(LedgerKey, Object, KeyChanges, State) -> get_object(LedgerKey, SQN, Manifest) -> - JournalP = find_in_manifest(SQN, Manifest), + JournalP = leveled_imanifest:find_entry(SQN, Manifest), {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch, @@ -466,7 +463,7 @@ get_object(LedgerKey, SQN, Manifest) -> leveled_codec:from_inkerkv(Obj). key_check(LedgerKey, SQN, Manifest) -> - JournalP = find_in_manifest(SQN, Manifest), + JournalP = leveled_imanifest:find_entry(SQN, Manifest), {InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey, SQN, to_fetch, @@ -496,7 +493,8 @@ build_manifest(ManifestFilenames, % Open the manifest files, completing if necessary and ensure there is % a valid active journal at the head of the manifest OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts), - {ActiveLowSQN, _FN, ActiveJournal} = lists:nth(1, OpenManifest), + + {ActiveLowSQN, _FN, ActiveJournal, _LK} = lists:nth(1, OpenManifest), JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of empty -> ActiveLowSQN; @@ -506,95 +504,87 @@ build_manifest(ManifestFilenames, % Update the manifest if it has been changed by the process of laoding % the manifest (must also increment the manifest SQN). - UpdManifestSQN = if - length(OpenManifest) > length(Manifest) -> - leveled_log:log("I0009", []), - manifest_printer(OpenManifest), - simple_manifest_writer(OpenManifest, - ManifestSQN + 1, - RootPath), - ManifestSQN + 1; - true -> - leveled_log:log("I0010", []), - manifest_printer(OpenManifest), - ManifestSQN - end, + UpdManifestSQN = + if + length(OpenManifest) > length(Manifest) -> + leveled_log:log("I0009", []), + manifest_printer(OpenManifest), + NextSQN = ManifestSQN + 1, + simple_manifest_writer(OpenManifest, NextSQN, RootPath), + NextSQN; + true -> + leveled_log:log("I0010", []), + manifest_printer(OpenManifest), + ManifestSQN + end, {OpenManifest, UpdManifestSQN, JournalSQN, ActiveJournal}. close_allmanifest([]) -> ok; close_allmanifest([H|ManifestT]) -> - {_, _, Pid} = H, + {_, _, Pid, _} = H, ok = leveled_cdb:cdb_close(Pid), close_allmanifest(ManifestT). open_all_manifest([], RootPath, CDBOpts) -> leveled_log:log("I0011", []), - add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts)); + leveled_imanifest:add_entry([], start_new_activejournal(1, RootPath, CDBOpts)); open_all_manifest(Man0, RootPath, CDBOpts) -> Man1 = lists:reverse(lists:sort(Man0)), - [{HeadSQN, HeadFN}|ManifestTail] = Man1, + [{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1, CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, Man2 = case filelib:is_file(CompleteHeadFN) of true -> leveled_log:log("I0012", [HeadFN]), {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), - {LastSQN, _Type, _PK} = leveled_cdb:cdb_lastkey(HeadR), - add_to_manifest(add_to_manifest(ManifestTail, - {HeadSQN, HeadFN, HeadR}), - start_new_activejournal(LastSQN + 1, + LastKey = leveled_cdb:cdb_lastkey(HeadR), + LastSQN = element(1, LastKey), + ManToHead = leveled_imanifest:add_entry(ManifestTail, + {HeadSQN, + HeadFN, + HeadR, + LastKey}), + NewManEntry = start_new_activejournal(LastSQN + 1, RootPath, - CDBOpts)); + CDBOpts), + leveled_imanifest:add_entry(ManToHead, NewManEntry); false -> {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts), - add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadW}) + leveled_imanifest:add_entry(ManifestTail, + {HeadSQN, HeadFN, HeadW, HeadLK}) end, - lists:map(fun(ManEntry) -> - case ManEntry of - {LowSQN, FN} -> - CFN = FN ++ "." ++ ?JOURNAL_FILEX, - PFN = FN ++ "." ++ ?PENDING_FILEX, - case filelib:is_file(CFN) of - true -> - {ok, - Pid} = leveled_cdb:cdb_open_reader(CFN), - {LowSQN, FN, Pid}; - false -> - W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), - {ok, Pid} = W, - ok = leveled_cdb:cdb_roll(Pid), - {LowSQN, FN, Pid} - end; - _ -> - ManEntry - end end, - Man2). + OpenJournalFun = + fun(ManEntry) -> + case ManEntry of + {LowSQN, FN, _, LK_RO} -> + CFN = FN ++ "." ++ ?JOURNAL_FILEX, + PFN = FN ++ "." ++ ?PENDING_FILEX, + case filelib:is_file(CFN) of + true -> + {ok, Pid} = leveled_cdb:cdb_open_reader(CFN), + {LowSQN, FN, Pid, LK_RO}; + false -> + W = leveled_cdb:cdb_open_writer(PFN, CDBOpts), + {ok, Pid} = W, + ok = leveled_cdb:cdb_roll(Pid), + LK_WR = leveled_cdb:cdb_lastkey(Pid), + {LowSQN, FN, Pid, LK_WR} + end; + _ -> + ManEntry + end + end, + lists:map(OpenJournalFun, Man2). start_new_activejournal(SQN, RootPath, CDBOpts) -> Filename = filepath(RootPath, SQN, new_journal), {ok, PidW} = leveled_cdb:cdb_open_writer(Filename, CDBOpts), - {SQN, Filename, PidW}. - -add_to_manifest(Manifest, Entry) -> - {SQN, FN, PidR} = Entry, - StrippedName = filename:rootname(FN), - lists:reverse(lists:sort([{SQN, StrippedName, PidR}|Manifest])). - -remove_from_manifest(Manifest, Entry) -> - {SQN, FN, _PidR} = Entry, - leveled_log:log("I0013", [FN]), - lists:keydelete(SQN, 1, Manifest). - -find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> - Pid; -find_in_manifest(SQN, [_Head|Tail]) -> - find_in_manifest(SQN, Tail). - + {SQN, Filename, PidW, empty}. %% Scan between sequence numbers applying FilterFun to each entry where @@ -728,8 +718,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) -> integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX), TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX), - MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end, - Manifest), [compressed]), + MBin = term_to_binary(Manifest, [compressed]), case filelib:is_file(NewFN) of false -> leveled_log:log("I0016", [ManSQN]), @@ -739,7 +728,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) -> end. manifest_printer(Manifest) -> - lists:foreach(fun({SQN, FN, _PID}) -> + lists:foreach(fun({SQN, FN, _PID, _LK}) -> leveled_log:log("I0017", [SQN, FN]) end, Manifest). @@ -777,6 +766,7 @@ build_dummy_journal(KeyConvertF) -> ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})), ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})), ok = leveled_cdb:cdb_roll(J1), + LK1 = leveled_cdb:cdb_lastkey(J1), lists:foldl(fun(X, Closed) -> case Closed of true -> true; @@ -795,9 +785,10 @@ build_dummy_journal(KeyConvertF) -> {K4, V4} = {KeyConvertF("Key4"), "TestValue4"}, ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})), ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})), + LK2 = leveled_cdb:cdb_lastkey(J2), ok = leveled_cdb:cdb_close(J2), - Manifest = [{1, "../test/journal/journal_files/nursery_1"}, - {3, "../test/journal/journal_files/nursery_3"}], + Manifest = [{1, "../test/journal/journal_files/nursery_1", "pid1", LK1}, + {3, "../test/journal/journal_files/nursery_3", "pid2", LK2}], ManifestBin = term_to_binary(Manifest), {ok, MF1} = file:open(filename:join(ManifestFP, "1.man"), [binary, raw, read, write]),