diff --git a/src/leveled_imanifest.erl b/src/leveled_imanifest.erl index 045be1a..5243c99 100644 --- a/src/leveled_imanifest.erl +++ b/src/leveled_imanifest.erl @@ -10,13 +10,24 @@ -export([ generate_entry/1, - add_entry/2, + add_entry/3, append_lastkey/3, remove_entry/2, - find_entry/2 + find_entry/2, + head_entry/1, + to_list/1, + from_list/1, + reader/2, + writer/3, + printer/1, + complete_filex/0 ]). +-define(MANIFEST_FILEX, "man"). +-define(PENDING_FILEX, "pnd"). +-define(SKIP_WIDTH, 16). + %%%============================================================================ %%% API @@ -34,16 +45,25 @@ generate_entry(Journal) -> [] end. -add_entry(Manifest, Entry) -> +add_entry(Manifest, Entry, ToEnd) -> {SQN, FN, PidR, LastKey} = Entry, StrippedName = filename:rootname(FN), - lists:reverse(lists:sort([{SQN, StrippedName, PidR, LastKey}|Manifest])). + case ToEnd of + true -> + prepend_entry({SQN, StrippedName, PidR, LastKey}, Manifest); + false -> + Man0 = [{SQN, StrippedName, PidR, LastKey}|to_list(Manifest)], + Man1 = lists:reverse(lists:sort(Man0)), + from_list(Man1) + end. append_lastkey(Manifest, Pid, LastKey) -> - [{SQN, Filename, PidR, LK}|ManifestTail] = Manifest, - case {PidR, LK} of + [{SQNMarker, SQNL}|ManifestTail] = Manifest, + [{E_SQN, E_FN, E_P, E_LK}|SQNL_Tail] = SQNL, + case {E_P, E_LK} of {Pid, empty} -> - [{SQN, Filename, PidR, LastKey}|ManifestTail]; + UpdEntry = {E_SQN, E_FN, E_P, LastKey}, + [{SQNMarker, [UpdEntry|SQNL_Tail]}|ManifestTail]; _ -> Manifest end. @@ -51,20 +71,153 @@ append_lastkey(Manifest, Pid, LastKey) -> remove_entry(Manifest, Entry) -> {SQN, FN, _PidR, _LastKey} = Entry, leveled_log:log("I0013", [FN]), - lists:keydelete(SQN, 1, Manifest). + Man0 = lists:keydelete(SQN, 1, to_list(Manifest)), + from_list(Man0). -find_entry(SQN, [{LowSQN, _FN, Pid, _LK}|_Tail]) when SQN >= LowSQN -> - Pid; -find_entry(SQN, [_Head|Tail]) -> +find_entry(SQN, [{SQNMarker, SubL}|_Tail]) when SQN >= SQNMarker -> + find_subentry(SQN, SubL); +find_entry(SQN, [_TopEntry|Tail]) -> find_entry(SQN, Tail). +head_entry(Manifest) -> + [{_SQNMarker, SQNL}|_Tail] = Manifest, + [HeadEntry|_SQNL_Tail] = SQNL, + HeadEntry. + +to_list(Manifest) -> + FoldFun = + fun({_SQNMarker, SubL}, Acc) -> + Acc ++ SubL + end, + lists:foldl(FoldFun, [], Manifest). + +reader(SQN, RootPath) -> + ManifestPath = leveled_inker:filepath(RootPath, manifest_dir), + leveled_log:log("I0015", [ManifestPath, SQN]), + {ok, MBin} = file:read_file(filename:join(ManifestPath, + integer_to_list(SQN) + ++ ".man")), + from_list(lists:reverse(lists:sort(binary_to_term(MBin)))). + + +writer(Manifest, ManSQN, RootPath) -> + ManPath = leveled_inker:filepath(RootPath, manifest_dir), + NewFN = filename:join(ManPath, + integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX), + TmpFN = filename:join(ManPath, + integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX), + MBin = term_to_binary(to_list(Manifest), [compressed]), + case filelib:is_file(NewFN) of + false -> + leveled_log:log("I0016", [ManSQN]), + ok = file:write_file(TmpFN, MBin), + ok = file:rename(TmpFN, NewFN), + ok + end. + +printer(Manifest) -> + lists:foreach(fun({SQN, FN, _PID, _LK}) -> + leveled_log:log("I0017", [SQN, FN]) end, + to_list(Manifest)). + +complete_filex() -> + ?MANIFEST_FILEX. +%%%============================================================================ +%%% Internal Functions +%%%============================================================================ + +from_list(Manifest) -> + % Manifest should already be sorted with the highest SQN at the head + % This will be maintained so that we can fold from the left, and find + % more recently added entries quicker - under the assumptions that fresh + % reads are more common than stale reads + lists:foldr(fun prepend_entry/2, [], Manifest). + +prepend_entry(Entry, AccL) -> + {SQN, _FN, _PidR, _LastKey} = Entry, + case AccL of + [] -> + [{SQN, [Entry]}]; + [{SQNMarker, SubL}|Tail] -> + case length(SubL) < ?SKIP_WIDTH of + true -> + [{SQNMarker, [Entry|SubL]}|Tail]; + false -> + [{SQN, [Entry]}|AccL] + end + end. + +find_subentry(SQN, [{ME_SQN, _FN, ME_P, _LK}|_Tail]) when SQN >= ME_SQN -> + ME_P; +find_subentry(SQN, [_TopEntry|Tail]) -> + find_subentry(SQN, Tail). + + %%%============================================================================ %%% Test %%%============================================================================ -ifdef(TEST). +build_testmanifest_aslist() -> + ManifestMapFun = + fun(N) -> + NStr = integer_to_list(N), + {max(1, N * 1000), "FN" ++ NStr, "pid" ++ NStr, "LK" ++ NStr} + end, + lists:map(ManifestMapFun, lists:reverse(lists:seq(0, 50))). + +test_testmanifest(Man0) -> + ?assertMatch("pid0", find_entry(1, Man0)), + ?assertMatch("pid0", find_entry(2, Man0)), + ?assertMatch("pid1", find_entry(1001, Man0)), + ?assertMatch("pid20", find_entry(20000, Man0)), + ?assertMatch("pid20", find_entry(20001, Man0)), + ?assertMatch("pid20", find_entry(20999, Man0)), + ?assertMatch("pid50", find_entry(99999, Man0)). + +buildfromlist_test() -> + ManL = build_testmanifest_aslist(), + Man0 = from_list(ManL), + test_testmanifest(Man0), + ?assertMatch(ManL, to_list(Man0)). + +buildfromend_test() -> + ManL = build_testmanifest_aslist(), + FoldFun = + fun(E, Man) -> + add_entry(Man, E, true) + end, + Man0 = lists:foldr(FoldFun, [], ManL), + test_testmanifest(Man0), + ?assertMatch(ManL, to_list(Man0)). + +buildrandomfashion_test() -> + ManL0 = build_testmanifest_aslist(), + RandMapFun = + fun(X) -> + {random:uniform(), X} + end, + ManL1 = lists:map(RandMapFun, ManL0), + ManL2 = lists:sort(ManL1), + + FoldFun = + fun({_R, E}, Man) -> + add_entry(Man, E, false) + end, + Man0 = lists:foldl(FoldFun, [], ManL2), + + test_testmanifest(Man0), + ?assertMatch(ManL0, to_list(Man0)), + + RandomEntry = lists:nth(random:uniform(50), ManL0), + Man1 = remove_entry(Man0, RandomEntry), + Man2 = add_entry(Man1, RandomEntry, false), + + test_testmanifest(Man2), + ?assertMatch(ManL0, to_list(Man2)). + -endif. \ No newline at end of file diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index c404745..abb7851 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -110,7 +110,6 @@ ink_close/1, ink_doom/1, build_dummy_journal/0, - simple_manifest_reader/2, clean_testdir/1, filepath/2, filepath/3]). @@ -122,7 +121,6 @@ -define(COMPACT_FP, "post_compact"). -define(WASTE_FP, "waste"). -define(JOURNAL_FILEX, "cdb"). --define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). -define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). @@ -259,7 +257,7 @@ handle_call({get, Key, SQN}, _From, State) -> handle_call({key_check, Key, SQN}, _From, State) -> {reply, key_check(Key, SQN, State#state.manifest), State}; handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> - Manifest = lists:reverse(State#state.manifest), + Manifest = lists:reverse(leveled_imanifest:to_list(State#state.manifest)), Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> @@ -281,29 +279,30 @@ handle_call({confirm_delete, ManSQN}, _From, State) -> State#state.registered_snapshots), {reply, Reply, State}; handle_call(get_manifest, _From, State) -> - {reply, State#state.manifest, State}; + {reply, leveled_imanifest:to_list(State#state.manifest), State}; handle_call({update_manifest, ManifestSnippet, DeletedFiles}, _From, State) -> - Man0 = lists:foldl(fun(ManEntry, AccMan) -> - leveled_imanifest:remove_entry(AccMan, ManEntry) - end, - State#state.manifest, - DeletedFiles), - Man1 = lists:foldl(fun(ManEntry, AccMan) -> - leveled_imanifest:add_entry(AccMan, ManEntry) end, - Man0, - ManifestSnippet), + DropFun = + fun(E, Acc) -> + leveled_imanifest:remove_entry(Acc, E) + end, + Man0 = lists:foldl(DropFun, State#state.manifest, DeletedFiles), + AddFun = + fun(E, Acc) -> + leveled_imanifest:add_entry(Acc, E, false) + end, + Man1 = lists:foldl(AddFun, Man0, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, - manifest_printer(Man1), - simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), + leveled_imanifest:printer(Man1), + leveled_imanifest:writer(Man1, NewManifestSQN, State#state.root_path), {reply, {ok, NewManifestSQN}, State#state{manifest=Man1, manifest_sqn=NewManifestSQN, pending_removals=DeletedFiles}}; handle_call(print_manifest, _From, State) -> - manifest_printer(State#state.manifest), + leveled_imanifest:printer(State#state.manifest), {reply, ok, State}; handle_call({compact, Checker, @@ -353,8 +352,9 @@ terminate(Reason, State) -> lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), leveled_log:log("I0007", []), - manifest_printer(State#state.manifest), - ok = close_allmanifest(State#state.manifest) + leveled_imanifest:printer(State#state.manifest), + ManAsList = leveled_imanifest:to_list(State#state.manifest), + ok = close_allmanifest(ManAsList) end. code_change(_OldVsn, State, _Extra) -> @@ -439,10 +439,10 @@ put_object(LedgerKey, Object, KeyChanges, State) -> State#state.root_path, CDBopts), {_, _, NewJournalP, _} = ManEntry, - Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry), - ok = simple_manifest_writer(Manifest1, - State#state.manifest_sqn + 1, - State#state.root_path), + Manifest1 = leveled_imanifest:add_entry(Manifest0, ManEntry, true), + ok = leveled_imanifest:writer(Manifest1, + State#state.manifest_sqn + 1, + State#state.root_path), ok = leveled_cdb:cdb_put(NewJournalP, JournalKey, JournalBin), @@ -478,7 +478,7 @@ build_manifest(ManifestFilenames, CDBopts) -> % Find the manifest with a highest Manifest sequence number % Open it and read it to get the current Confirmed Manifest - ManifestRegex = "(?[0-9]+)\\." ++ ?MANIFEST_FILEX, + ManifestRegex = "(?[0-9]+)\\." ++ leveled_imanifest:complete_filex(), ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames, ManifestRegex, 'MSQN'), @@ -488,7 +488,7 @@ build_manifest(ManifestFilenames, {[], 1}; _ -> PersistedManSQN = lists:max(ValidManSQNs), - M1 = simple_manifest_reader(PersistedManSQN, + M1 = leveled_imanifest:reader(PersistedManSQN, RootPath), {M1, PersistedManSQN} end, @@ -497,7 +497,10 @@ build_manifest(ManifestFilenames, % a valid active journal at the head of the manifest OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts), - {ActiveLowSQN, _FN, ActiveJournal, _LK} = lists:nth(1, OpenManifest), + {ActiveLowSQN, + _FN, + ActiveJournal, + _LK} = leveled_imanifest:head_entry(OpenManifest), JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of empty -> ActiveLowSQN; @@ -511,13 +514,13 @@ build_manifest(ManifestFilenames, if length(OpenManifest) > length(Manifest) -> leveled_log:log("I0009", []), - manifest_printer(OpenManifest), + leveled_imanifest:printer(OpenManifest), NextSQN = ManifestSQN + 1, - simple_manifest_writer(OpenManifest, NextSQN, RootPath), + leveled_imanifest:writer(OpenManifest, NextSQN, RootPath), NextSQN; true -> leveled_log:log("I0010", []), - manifest_printer(OpenManifest), + leveled_imanifest:printer(OpenManifest), ManifestSQN end, {OpenManifest, UpdManifestSQN, JournalSQN, ActiveJournal}. @@ -533,9 +536,11 @@ close_allmanifest([H|ManifestT]) -> open_all_manifest([], RootPath, CDBOpts) -> leveled_log:log("I0011", []), - leveled_imanifest:add_entry([], start_new_activejournal(1, RootPath, CDBOpts)); + leveled_imanifest:add_entry([], + start_new_activejournal(1, RootPath, CDBOpts), + true); open_all_manifest(Man0, RootPath, CDBOpts) -> - Man1 = lists:reverse(lists:sort(Man0)), + Man1 = leveled_imanifest:to_list(Man0), [{HeadSQN, HeadFN, _IgnorePid, HeadLK}|ManifestTail] = Man1, OpenJournalFun = fun(ManEntry) -> @@ -559,7 +564,8 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> ManEntry end end, - OpenedTail = lists:map(OpenJournalFun, ManifestTail), + OpenedTailAsList = lists:map(OpenJournalFun, ManifestTail), + OpenedTail = leveled_imanifest:from_list(OpenedTailAsList), CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, case filelib:is_file(CompleteHeadFN) of @@ -572,16 +578,20 @@ open_all_manifest(Man0, RootPath, CDBOpts) -> {HeadSQN, HeadFN, HeadR, - LastKey}), + LastKey}, + true), NewManEntry = start_new_activejournal(LastSQN + 1, RootPath, CDBOpts), - leveled_imanifest:add_entry(ManToHead, NewManEntry); + leveled_imanifest:add_entry(ManToHead, + NewManEntry, + true); false -> {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, CDBOpts), leveled_imanifest:add_entry(OpenedTail, - {HeadSQN, HeadFN, HeadW, HeadLK}) + {HeadSQN, HeadFN, HeadW, HeadLK}, + true) end. @@ -707,35 +717,6 @@ filepath(CompactFilePath, NewSQN, compact_journal) -> ++ "." ++ ?PENDING_FILEX). -simple_manifest_reader(SQN, RootPath) -> - ManifestPath = filepath(RootPath, manifest_dir), - leveled_log:log("I0015", [ManifestPath, SQN]), - {ok, MBin} = file:read_file(filename:join(ManifestPath, - integer_to_list(SQN) - ++ ".man")), - binary_to_term(MBin). - - -simple_manifest_writer(Manifest, ManSQN, RootPath) -> - ManPath = filepath(RootPath, manifest_dir), - NewFN = filename:join(ManPath, - integer_to_list(ManSQN) ++ "." ++ ?MANIFEST_FILEX), - TmpFN = filename:join(ManPath, - integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX), - MBin = term_to_binary(Manifest, [compressed]), - case filelib:is_file(NewFN) of - false -> - leveled_log:log("I0016", [ManSQN]), - ok = file:write_file(TmpFN, MBin), - ok = file:rename(TmpFN, NewFN), - ok - end. - -manifest_printer(Manifest) -> - lists:foreach(fun({SQN, FN, _PID, _LK}) -> - leveled_log:log("I0017", [SQN, FN]) end, - Manifest). - initiate_penciller_snapshot(Bookie) -> {ok, {LedgerSnap, LedgerCache},