From 8dfeb520ef985b2583fb17f9a2899bbd65dc6267 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Fri, 7 Oct 2016 18:07:03 +0100 Subject: [PATCH] Inker Refactor Inker refactored to block on manifest write. If this is inefficient the manifest write can be converted to an append only operation. Waiting on the manifest write makes the logic at startup much easier to manage. --- src/leveled_cdb.erl | 26 +- src/leveled_iclerk.erl | 15 +- src/leveled_inker.erl | 677 ++++++++++++++++------------------------- 3 files changed, 282 insertions(+), 436 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 668b1c8..04adfda 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -139,7 +139,7 @@ cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). cdb_roll(Pid) -> - gen_server:call(Pid, cdb_roll, infinity). + gen_server:cast(Pid, cdb_roll). cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -344,11 +344,17 @@ handle_call(cdb_complete, _From, State=#state{writer=Writer}) {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; -handle_call(cdb_roll, From, State=#state{writer=Writer}) - when Writer == true -> + {stop, normal, {ok, State#state.filename}, State}. + + +handle_cast(destroy, State) -> + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {noreply, State}; +handle_cast(delete_pending, State) -> + {noreply, State#state{pending_delete = true}}; +handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true -> NewName = determine_new_filename(State#state.filename), - gen_server:reply(From, {ok, NewName}), ok = close_file(State#state.handle, State#state.hashtree, State#state.last_position), @@ -359,15 +365,7 @@ handle_call(cdb_roll, From, State=#state{writer=Writer}) last_key=LastKey, filename=NewName, writer=false, - hash_index=Index}}. 
- - -handle_cast(destroy, State) -> - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {noreply, State}; -handle_cast(delete_pending, State) -> - {noreply, State#state{pending_delete = true}}; + hash_index=Index}}; handle_cast(_Msg, State) -> {noreply, State}. diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 6f187a3..9403ef4 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -81,7 +81,12 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, State) -> % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - Manifest = leveled_inker:ink_getmanifest(Inker), + Manifest = case leveled_inker:ink_getmanifest(Inker) of + [] -> + []; + [_Active|Tail] -> + Tail + end, MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, @@ -159,7 +164,12 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - {0, 0}, KeySizeList), {ActiveSize, ReplacedSize} = R0, - Score = 100 * ActiveSize / (ActiveSize + ReplacedSize), + Score = case ActiveSize + ReplacedSize of + 0 -> + 100.0; + _ -> + 100 * ActiveSize / (ActiveSize + ReplacedSize) + end, io:format("Score for filename ~s is ~w~n", [FN, Score]), Score. @@ -219,6 +229,7 @@ assess_candidates(AllCandidates, MaxRunLength) -> end. assess_candidates([], _MaxRunLength, _CurrentRun0, BestAssessment) -> + io:format("Best run of ~w~n", [BestAssessment]), BestAssessment; assess_candidates([HeadC|Tail], MaxRunLength, CurrentRun0, BestAssessment) -> CurrentRun1 = choose_best_assessment(CurrentRun0 ++ [HeadC], diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 561ff64..471ea0b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -9,33 +9,43 @@ %% responsible for scheduling compaction work to be carried out by the Inker's %% clerk. 
%% -%% -------- Journal --------- +%% -------- Journal Files --------- %% -%% The Journal is a series of files originally named as _nursery.cdb +%% The Journal is a series of files originally named as _ %% where the sequence number is the first object sequence number (key) within %% the given database file. The files will be named *.cdb at the point they %% have been made immutable (through a rename operation). Prior to this, they %% will originally start out as a *.pnd file. %% %% At some stage in the future compacted versions of old journal cdb files may -%% be produced. These files will be named -.cdb, and once -%% the manifest is updated the original _nursery.cdb (or -%% _.cdb) files they replace will be erased. +%% be produced. These files will be named -.cdb, and once +%% the manifest is updated the original _.cdb (or +%% _.cdb) files they replace will be erased. %% -%% The current Journal is made up of a set of files referenced in the manifest, -%% combined with a set of files of the form _nursery.[cdb|pnd] with -%% a higher Sequence Number compared to the files in the manifest. +%% The current Journal is made up of a set of files referenced in the manifest. +%% No PUTs are made to files which are not in the manifest. %% %% The Journal is ordered by sequence number from front to back both within %% and across files. %% %% On startup the Inker should open the manifest with the highest sequence %% number, and this will contain the list of filenames that make up the -%% non-recent part of the Journal. The Manifest is completed by opening these -%% files plus any other files with a higher sequence number. The file with -%% the highest sequence number is assumed to to be the active writer. Any file -%% with a lower sequence number and a *.pnd extension should be re-rolled into -%% a *.cdb file. +%% non-recent part of the Journal. All the filenames should then be opened. 
+%% How they are opened depends on the file extension: +%% +%% - If the file extension is *.cdb the file is opened read only +%% - If the file extension is *.pnd and the file is not the most recent in the +%% manifest, then the file should be completed before being opened read-only +%% - If the file extension is *.pnd the file is opened for writing +%% +%% -------- Manifest Files --------- +%% +%% The manifest is just saved as a straight term_to_binary blob, with a +%% filename ordered by the Manifest SQN. The Manifest is first saved with a +%% *.pnd extension, and then renamed to one with a *.man extension. +%% +%% On startup the *.man manifest file with the highest manifest sequence +%% number should be used. %% %% -------- Objects --------- %% @@ -45,47 +55,23 @@ %% - An object (an Erlang term) %% - A set of Key Deltas associated with the change %% -%% -------- Manifest --------- -%% -%% The Journal has a manifest which is the current record of which cdb files -%% are currently active in the Journal (i.e. following compaction). The -%% manifest holds this information through two lists - a list of files which -%% are definitely in the current manifest, and a list of files which have been -%% removed, but may still be present on disk. The use of two lists is to -%% avoid any circumsatnces where a compaction event has led to the deletion of -%% a Journal file with a higher sequence number than any in the remaining -%% manifest. -%% -%% A new manifest file is saved for every compaction event. The manifest files -%% are saved using the filename .man once saved. The ManifestSQN -%% is incremented once for every compaction event. 
-%% %% -------- Compaction --------- %% %% Compaction is a process whereby an Inker's clerk will: -%% - Request a snapshot of the Ledger, as well as the lowest sequence number -%% that is currently registerd by another snapshot owner -%% - Picks a Journal database file at random (not including the current -%% nursery log) -%% - Performs a random walk on keys and sequence numbers in the chosen CDB -%% file to extract a subset of 100 key and sequence number combinations from -%% the database -%% - Looks up the current sequence number for those keys in the Ledger -%% - If more than % (default n=20) of the keys are now at a higher sequence -%% number, then the database file is a candidate for compaction. In this case -%% each of the next 8 files in sequence should be checked until all those 8 -%% files have been checked or one of the files has been found to be below the -%% threshold. -%% - If a set of below-the-threshold files is found, the files are re-written -%% without any superceded values -%%- The clerk should then request that the Inker commit the manifest change -%% -%% -------- Inker's Clerk --------- -%% -%% +%% - Request a view of the current Inker manifest and a snapshot of the Ledger +%% - Test all files within the Journal to find the approximate compaction +%% potential percentage (the volume of the Journal that has been replaced) +%% - Attempts to find the optimal "run" of files to compact +%% - Compacts those files in the run, by rolling over the files re-writing +%% to a new Journal if and only if the Key is still present in the Ledger (or +%% the sequence number of the Key is higher than the SQN of the snapshot) +%% - Requests the Inker update the manifest with the new changes +%% - Instructs the files to destroy themselves when they are next closed %% +%% TODO: how to instruct the files to close is tbd %% + -module(leveled_inker). -behaviour(gen_server). 
@@ -132,7 +118,6 @@ manifest_sqn = 0 :: integer(), journal_sqn = 0 :: integer(), active_journaldb :: pid(), - active_journaldb_sqn :: integer(), pending_removals = [] :: list(), registered_snapshots = [] :: list(), root_path :: string(), @@ -222,11 +207,9 @@ init([InkerOpts]) -> {undefined, true} -> SrcInker = InkerOpts#inker_options.source_inker, {Manifest, - ActiveJournalDB, - ActiveJournalSQN} = ink_registersnapshot(SrcInker, self()), + ActiveJournalDB} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, - active_journaldb_sqn=ActiveJournalSQN, source_inker=SrcInker, is_snapshot=true}}; %% Need to do something about timeout @@ -235,28 +218,16 @@ init([InkerOpts]) -> end. -handle_call({put, Key, Object, KeyChanges}, From, State) -> +handle_call({put, Key, Object, KeyChanges}, _From, State) -> case put_object(Key, Object, KeyChanges, State) of {ok, UpdState, ObjSize} -> {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}; {rolling, UpdState, ObjSize} -> - gen_server:reply(From, {ok, UpdState#state.journal_sqn, ObjSize}), - {NewManifest, - NewManifestSQN} = roll_active_file(State#state.active_journaldb, - State#state.manifest, - State#state.manifest_sqn, - State#state.root_path), - {noreply, UpdState#state{manifest=NewManifest, - manifest_sqn=NewManifestSQN}}; - {blocked, UpdState} -> - {reply, blocked, UpdState} + ok = leveled_cdb:cdb_roll(State#state.active_journaldb), + {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState} end; handle_call({fetch, Key, SQN}, _From, State) -> - case get_object(Key, - SQN, - State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn) of + case get_object(Key, SQN, State#state.manifest) of {{SQN, Key}, {Value, _IndexSpecs}} -> {reply, {ok, Value}, State}; Other -> @@ -265,16 +236,9 @@ handle_call({fetch, Key, SQN}, _From, State) -> {reply, not_present, State} end; handle_call({get, Key, SQN}, _From, State) -> - {reply, 
get_object(Key, - SQN, - State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn), State}; + {reply, get_object(Key, SQN, State#state.manifest), State}; handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> - Manifest = lists:reverse(State#state.manifest) - ++ [{State#state.active_journaldb_sqn, - dummy, - State#state.active_journaldb}], + Manifest = lists:reverse(State#state.manifest), Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> @@ -283,8 +247,7 @@ handle_call({register_snapshot, Requestor}, _From , State) -> io:format("Inker snapshot ~w registered at SQN ~w~n", [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn}, + State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; handle_call({release_snapshot, Snapshot}, _From , State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), @@ -297,21 +260,16 @@ handle_call({update_manifest, ManifestSnippet, DeletedFiles}, _From, State) -> Man0 = lists:foldl(fun(ManEntry, AccMan) -> - Check = lists:member(ManEntry, DeletedFiles), - if - Check == false -> - AccMan ++ [ManEntry]; - true -> - AccMan - end + remove_from_manifest(AccMan, ManEntry) end, - [], - State#state.manifest), + State#state.manifest, + DeletedFiles), Man1 = lists:foldl(fun(ManEntry, AccMan) -> add_to_manifest(AccMan, ManEntry) end, Man0, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, + manifest_printer(Man1), ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), PendingRemovals = [{NewManifestSQN, DeletedFiles}], {reply, ok, State#state{manifest=Man1, @@ -365,8 +323,8 @@ terminate(Reason, State) -> lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), manifest_printer(State#state.manifest), - 
close_allmanifest(State#state.manifest, State#state.active_journaldb), - close_allremovals(State#state.pending_removals) + ok = close_allmanifest(State#state.manifest), + ok = close_allremovals(State#state.pending_removals) end. code_change(_OldVsn, State, _Extra) -> @@ -381,13 +339,10 @@ start_from_file(InkerOpts) -> RootPath = InkerOpts#inker_options.root_path, CDBopts = InkerOpts#inker_options.cdb_options, JournalFP = filepath(RootPath, journal_dir), - {ok, JournalFilenames} = case filelib:is_dir(JournalFP) of - true -> - file:list_dir(JournalFP); - false -> - filelib:ensure_dir(JournalFP), - {ok, []} - end, + filelib:ensure_dir(JournalFP), + CompactFP = filepath(RootPath, journal_compact_dir), + filelib:ensure_dir(CompactFP), + ManifestFP = filepath(RootPath, manifest_dir), {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of true -> @@ -397,26 +352,21 @@ start_from_file(InkerOpts) -> {ok, []} end, - CompactFP = filepath(RootPath, journal_compact_dir), - filelib:ensure_dir(CompactFP), IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), {Manifest, - {ActiveJournal, LowActiveSQN}, + ManifestSQN, JournalSQN, - ManifestSQN} = build_manifest(ManifestFilenames, - JournalFilenames, - fun simple_manifest_reader/2, + ActiveJournal} = build_manifest(ManifestFilenames, RootPath, CDBopts), - {ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)), + {ok, #state{manifest = Manifest, manifest_sqn = ManifestSQN, journal_sqn = JournalSQN, active_journaldb = ActiveJournal, - active_journaldb_sqn = LowActiveSQN, root_path = RootPath, cdb_options = CDBopts, clerk = Clerk}}. 
@@ -436,56 +386,32 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> ok -> {ok, State#state{journal_sqn=NewSQN}, ObjSize}; roll -> - FileName = filepath(State#state.root_path, NewSQN, new_journal), + SW = os:timestamp(), CDBopts = State#state.cdb_options, - {ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName, CDBopts), - case leveled_cdb:cdb_put(NewJournalP, - {NewSQN, PrimaryKey}, - Bin1) of - ok -> - {rolling, - State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP, - active_journaldb_sqn=NewSQN}, - ObjSize}; - roll -> - {blocked, State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP, - active_journaldb_sqn=NewSQN}} - end + ManEntry = start_new_activejournal(NewSQN, + State#state.root_path, + CDBopts), + {_, _, NewJournalP} = ManEntry, + NewManifest = add_to_manifest(State#state.manifest, ManEntry), + ok = simple_manifest_writer(NewManifest, + State#state.manifest_sqn + 1, + State#state.root_path), + ok = leveled_cdb:cdb_put(NewJournalP, {NewSQN, PrimaryKey}, Bin1), + io:format("Put to new active journal " ++ + "with manifest write took ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), + {rolling, + State#state{journal_sqn=NewSQN, + manifest=NewManifest, + manifest_sqn = State#state.manifest_sqn + 1, + active_journaldb=NewJournalP}, + ObjSize} end. -roll_active_file(ActiveJournal, Manifest, ManifestSQN, RootPath) -> - SW = os:timestamp(), - io:format("Rolling old journal ~w~n", [ActiveJournal]), - {ok, NewFilename} = leveled_cdb:cdb_roll(ActiveJournal), - JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, - [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], - JournalRegex2, - 'SQN'), - NewManifest = add_to_manifest(Manifest, - {JournalSQN, NewFilename, ActiveJournal}), - NewManifestSQN = ManifestSQN + 1, - ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), - io:format("Rolling old journal completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW)]), - {NewManifest, NewManifestSQN}. 
-get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> - Obj = if - SQN < ActiveJournalSQN -> - JournalP = find_in_manifest(SQN, Manifest), - if - JournalP == error -> - io:format("Unable to find SQN~w in Manifest~w~n", - [SQN, Manifest]), - error; - true -> - leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}) - end; - true -> - leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey}) - end, +get_object(PrimaryKey, SQN, Manifest) -> + JournalP = find_in_manifest(SQN, Manifest), + Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}), case Obj of {{SQN, PK}, Bin} -> {{SQN, PK}, binary_to_term(Bin)}; @@ -495,139 +421,129 @@ get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, - RootPath) -> - build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, - RootPath, - #cdb_options{}). - -build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, RootPath, CDBopts) -> - %% Find the manifest with a highest Manifest sequence number - %% Open it and read it to get the current Confirmed Manifest + % Find the manifest with a highest Manifest sequence number + % Open it and read it to get the current Confirmed Manifest ManifestRegex = "(?[0-9]+)\\." 
++ ?MANIFEST_FILEX, ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames, ManifestRegex, 'MSQN'), - {JournalSQN1, - ConfirmedManifest, - Removed, + {Manifest, ManifestSQN} = case length(ValidManSQNs) of 0 -> - {0, [], [], 0}; + {[], 1}; _ -> PersistedManSQN = lists:max(ValidManSQNs), - M1 = ManifestRdrFun(PersistedManSQN, RootPath), - J1 = lists:foldl(fun({JSQN, _FN}, Acc) -> - max(JSQN, Acc) end, - 0, - M1), - {J1, M1, [], PersistedManSQN} + M1 = simple_manifest_reader(PersistedManSQN, + RootPath), + {M1, PersistedManSQN} end, - %% Find any more recent immutable files that have a higher sequence number - %% - the immutable files have already been rolled, and so have a completed - %% hashtree lookup - JournalRegex1 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, - UnremovedJournalFiles = lists:foldl(fun(FN, Acc) -> - case lists:member(FN, Removed) of - true -> - Acc; - false -> - Acc ++ [FN] - end end, - [], - JournalFilenames), - OtherSQNs_imm = sequencenumbers_fromfilenames(UnremovedJournalFiles, - JournalRegex1, - 'SQN'), - ExtendManifestFun = fun(X, Acc) -> - if - X > JournalSQN1 - -> - FN = filepath(RootPath, journal_dir) - ++ "nursery_" ++ - integer_to_list(X) - ++ "." 
++ - ?JOURNAL_FILEX, - add_to_manifest(Acc, {X, FN}); - true - -> Acc - end end, - Manifest1 = lists:foldl(ExtendManifestFun, - ConfirmedManifest, - lists:sort(OtherSQNs_imm)), + % Open the manifest files, completing if necessary and ensure there is + % a valid active journal at the head of the manifest + OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts), + {ActiveLowSQN, _FN, ActiveJournal} = lists:nth(1, OpenManifest), + JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of + empty -> + ActiveLowSQN; + {JSQN, _LastKey} -> + JSQN + end, - %% Enrich the manifest so it contains the Pid of any of the immutable - %% entries - io:format("Manifest on startup is: ~n"), - manifest_printer(Manifest1), - Manifest2 = lists:map(fun({LowSQN, FN}) -> - {ok, Pid} = leveled_cdb:cdb_open_reader(FN), - {LowSQN, FN, Pid} end, - Manifest1), - - %% Find any more recent mutable files that have a higher sequence number - %% Roll any mutable files which do not have the highest sequence number - %% to create the hashtree and complete the header entries - JournalRegex2 = "nursery_(?[0-9]+)\\." 
++ ?PENDING_FILEX, - OtherSQNs_pnd = sequencenumbers_fromfilenames(JournalFilenames, - JournalRegex2, - 'SQN'), - - case length(OtherSQNs_pnd) of - 0 -> - %% Need to create a new active writer, but also find the highest - %% SQN from within the confirmed manifest - TopSQNInManifest = - case length(Manifest2) of - 0 -> - %% Manifest is empty and no active writers - %% can be found so database is empty - 0; - _ -> - TM = lists:last(lists:keysort(1,Manifest2)), - {_SQN, _FN, TMPid} = TM, - {HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid), - HighSQN - end, - LowActiveSQN = TopSQNInManifest + 1, - ActiveFN = filepath(RootPath, LowActiveSQN, new_journal), - {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN, - CDBopts), - {Manifest2, - {ActiveJournal, LowActiveSQN}, - TopSQNInManifest, - ManifestSQN}; - _ -> - {ActiveJournalSQN, - Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd), - Manifest2, - RootPath), - %% Need to work out highest sequence number in tail file to feed - %% into opening of pending journal - ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal), - {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN, - CDBopts), - {HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal), - {Manifest3, - {ActiveJournal, ActiveJournalSQN}, - HighestSQN, - ManifestSQN} - end. + % Update the manifest if it has been changed by the process of loading + % the manifest (must also increment the manifest SQN). + UpdManifestSQN = if + length(OpenManifest) > length(Manifest) -> + io:format("Updated manifest on startup: ~n"), + manifest_printer(OpenManifest), + simple_manifest_writer(OpenManifest, + ManifestSQN + 1, + RootPath), + ManifestSQN + 1; + true -> + io:format("Unchanged manifest on startup: ~n"), + manifest_printer(OpenManifest), + ManifestSQN + end, + {OpenManifest, UpdManifestSQN, JournalSQN, ActiveJournal}. 
-close_allmanifest([], ActiveJournal) -> - leveled_cdb:cdb_close(ActiveJournal); -close_allmanifest([H|ManifestT], ActiveJournal) -> + +close_allmanifest([]) -> + ok; +close_allmanifest([H|ManifestT]) -> {_, _, Pid} = H, ok = leveled_cdb:cdb_close(Pid), - close_allmanifest(ManifestT, ActiveJournal). + close_allmanifest(ManifestT). + + +open_all_manifest([], RootPath, CDBOpts) -> + io:format("Manifest is empty, starting from manifest SQN 1~n"), + add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts)); +open_all_manifest(Man0, RootPath, CDBOpts) -> + Man1 = lists:reverse(lists:sort(Man0)), + [{HeadSQN, HeadFN}|ManifestTail] = Man1, + CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, + PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, + Man2 = case filelib:is_file(CompleteHeadFN) of + true -> + io:format("Head manifest entry ~s is complete~n", + [HeadFN]), + {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), + {LastSQN, _LastPK} = leveled_cdb:cdb_lastkey(HeadR), + add_to_manifest(add_to_manifest(ManifestTail, + {HeadSQN, HeadFN, HeadR}), + start_new_activejournal(LastSQN + 1, + RootPath, + CDBOpts)); + false -> + {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, + CDBOpts), + add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadW}) + end, + lists:map(fun(ManEntry) -> + case ManEntry of + {LowSQN, FN} -> + CFN = FN ++ "." ++ ?JOURNAL_FILEX, + PFN = FN ++ "." ++ ?PENDING_FILEX, + case filelib:is_file(CFN) of + true -> + {ok, + Pid} = leveled_cdb:cdb_open_reader(CFN), + {LowSQN, FN, Pid}; + false -> + {ok, + Pid} = leveled_cdb:cdb_open_reader(PFN), + {LowSQN, FN, Pid} + end; + _ -> + ManEntry + end end, + Man2). + + +start_new_activejournal(SQN, RootPath, CDBOpts) -> + Filename = filepath(RootPath, SQN, new_journal), + {ok, PidW} = leveled_cdb:cdb_open_writer(Filename, CDBOpts), + {SQN, Filename, PidW}. 
+ +add_to_manifest(Manifest, Entry) -> + {SQN, FN, PidR} = Entry, + StrippedName = filename:rootname(FN), + lists:reverse(lists:sort([{SQN, StrippedName, PidR}|Manifest])). + +remove_from_manifest(Manifest, Entry) -> + {SQN, FN, _PidR} = Entry, + io:format("File ~s to be removed from manifest~n", [FN]), + lists:keydelete(SQN, 1, Manifest). + +find_in_manifest(_SQN, []) -> + error; +find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> + Pid; +find_in_manifest(SQN, [_Head|Tail]) -> + find_in_manifest(SQN, Tail). + close_allremovals([]) -> ok; @@ -649,20 +565,6 @@ close_allremovals([{ManifestSQN, Removals}|Tail]) -> close_allremovals(Tail). -roll_pending_journals([TopJournalSQN], Manifest, _RootPath) - when is_integer(TopJournalSQN) -> - {TopJournalSQN, Manifest}; -roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> - Filename = filepath(RootPath, JournalSQN, new_journal), - {ok, PidW} = leveled_cdb:cdb_open_writer(Filename), - {ok, NewFilename} = leveled_cdb:cdb_complete(PidW), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), - roll_pending_journals(T, - add_to_manifest(Manifest, - {JournalSQN, NewFilename, PidR}), - RootPath). - - %% Scan between sequence numbers applying FilterFun to each entry where %% FilterFun{K, V, Acc} -> Penciller Key List %% Load the output for the CDB file into the Penciller. @@ -757,15 +659,6 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> [], Filenames). -add_to_manifest(Manifest, Entry) -> - lists:reverse(lists:sort([Entry|Manifest])). - -find_in_manifest(_SQN, []) -> - error; -find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> - Pid; -find_in_manifest(SQN, [_Head|Tail]) -> - find_in_manifest(SQN, Tail). 
filepath(RootPath, journal_dir) -> RootPath ++ "/" ++ ?FILES_FP ++ "/"; @@ -776,15 +669,22 @@ filepath(RootPath, journal_compact_dir) -> filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), - "nursery_" - ++ integer_to_list(NewSQN) + integer_to_list(NewSQN) ++ "_" + ++ generate_uuid() ++ "." ++ ?PENDING_FILEX); filepath(CompactFilePath, NewSQN, compact_journal) -> filename:join(CompactFilePath, - "nursery_" - ++ integer_to_list(NewSQN) + integer_to_list(NewSQN) ++ "_" + ++ generate_uuid() ++ "." ++ ?PENDING_FILEX). +%% Credit to +%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl +generate_uuid() -> + <<A:32, B:16, C:16, D:16, E:48>> = crypto:rand_bytes(16), + io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", + [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). + simple_manifest_reader(SQN, RootPath) -> ManifestPath = filepath(RootPath, manifest_dir), io:format("Opening manifest file at ~s with SQN ~w~n", @@ -858,7 +758,9 @@ build_dummy_journal() -> {K2, V2} = {"Key2", "TestValue2"}, ok = leveled_cdb:cdb_put(J1, {1, K1}, term_to_binary({V1, []})), ok = leveled_cdb:cdb_put(J1, {2, K2}, term_to_binary({V2, []})), - {ok, _} = leveled_cdb:cdb_complete(J1), + ok = leveled_cdb:cdb_roll(J1), + _LK = leveled_cdb:cdb_lastkey(J1), + ok = leveled_cdb:cdb_close(J1), F2 = filename:join(JournalFP, "nursery_3.pnd"), {ok, J2} = leveled_cdb:cdb_open_writer(F2), {K1, V3} = {"Key1", "TestValue3"}, @@ -866,7 +768,8 @@ build_dummy_journal() -> ok = leveled_cdb:cdb_put(J2, {3, K1}, term_to_binary({V3, []})), ok = leveled_cdb:cdb_put(J2, {4, K4}, term_to_binary({V4, []})), ok = leveled_cdb:cdb_close(J2), - Manifest = [{1, "../test/journal/journal_files/nursery_1.cdb"}], + Manifest = [{1, "../test/journal/journal_files/nursery_1"}, + {3, "../test/journal/journal_files/nursery_3"}], ManifestBin = term_to_binary(Manifest), {ok, MF1} = file:open(filename:join(ManifestFP, "1.man"), [binary, raw, read, write]), @@ -891,146 +794,33 @@ 
clean_subdir(DirPath) -> end, Files). -simple_buildmanifest_test() -> - RootPath = "../test/journal", - build_dummy_journal(), - Res = build_manifest(["1.man"], - ["../test/journal/journal_files/nursery_1.cdb", - "../test/journal/journal_files/nursery_3.pnd"], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(HighSQN, 4), - ?assertMatch(ManSQN, 1), - ?assertMatch([{1, "../test/journal/journal_files/nursery_1.cdb", _}], Man), - {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), - ?assertMatch(ActSQN, 4), - ?assertMatch(ActJournalSQN, 3), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). -another_buildmanifest_test() -> - %% There is a rolled jounral file which is not yet in the manifest - RootPath = "../test/journal", - build_dummy_journal(), - FN = filepath(RootPath, 3, new_journal), - {ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN), - {ok, _} = leveled_cdb:cdb_complete(FileToRoll), - FN2 = filepath(RootPath, 5, new_journal), - {ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2), - {K5, V5} = {"Key5", "TestValue5"}, - {K6, V6} = {"Key6", "TestValue6"}, - ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, term_to_binary({V5, []})), - ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, term_to_binary({V6, []})), - ok = leveled_cdb:cdb_close(NewActiveJN), - %% Test setup - now build manifest - Res = build_manifest(["1.man"], - ["../test/journal/journal_files/nursery_1.cdb", - "../test/journal/journal_files/nursery_3.cdb", - "../test/journal/journal_files/nursery_5.pnd"], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(HighSQN, 6), - ?assertMatch(ManSQN, 1), - ?assertMatch([{3, "../test/journal/journal_files/nursery_3.cdb", _}, - {1, "../test/journal/journal_files/nursery_1.cdb", _}], Man), - {ActSQN, _ActK} = 
leveled_cdb:cdb_lastkey(ActJournal), - ?assertMatch(ActSQN, 6), - ?assertMatch(ActJournalSQN, 5), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). - - -empty_buildmanifest_test() -> - RootPath = "../test/journal", - Res = build_manifest([], - [], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(Man, []), - ?assertMatch(ManSQN, 0), - ?assertMatch(HighSQN, 0), - ?assertMatch(ActJournalSQN, 1), - empty = leveled_cdb:cdb_lastkey(ActJournal), - FN = leveled_cdb:cdb_filename(ActJournal), - %% The filename should be based on the next journal SQN (1) not 0 - ?assertMatch(FN, filepath(RootPath, 1, new_journal)), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). - -simplejournal_test() -> - %% build up a database, and then open it through the gen_server wrap - %% Get and Put some keys - RootPath = "../test/journal", - build_dummy_journal(), - {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=#cdb_options{}}), - R1 = ink_get(Ink1, "Key1", 1), - ?assertMatch(R1, {{1, "Key1"}, {"TestValue1", []}}), - R2 = ink_get(Ink1, "Key1", 3), - ?assertMatch(R2, {{3, "Key1"}, {"TestValue3", []}}), - {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "Key99", "TestValue99", []), - ?assertMatch(NewSQN1, 5), - R3 = ink_get(Ink1, "Key99", 5), - io:format("Result 3 is ~w~n", [R3]), - ?assertMatch(R3, {{5, "Key99"}, {"TestValue99", []}}), - ink_close(Ink1), - clean_testdir(RootPath). 
- -rollafile_simplejournal_test() -> +simple_inker_test() -> RootPath = "../test/journal", build_dummy_journal(), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), - FunnyLoop = lists:seq(1, 48), - {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), - ?assertMatch(NewSQN1, 5), - ok = ink_print_manifest(Ink1), - R0 = ink_get(Ink1, "KeyAA", 5), - ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), - lists:foreach(fun(X) -> - {ok, _, _} = ink_put(Ink1, - "KeyZ" ++ integer_to_list(X), - crypto:rand_bytes(10000), - []) end, - FunnyLoop), - {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), - ?assertMatch(NewSQN2, 54), - ok = ink_print_manifest(Ink1), - R1 = ink_get(Ink1, "KeyAA", 5), - ?assertMatch(R1, {{5, "KeyAA"}, {"TestValueAA", []}}), - R2 = ink_get(Ink1, "KeyBB", 54), - ?assertMatch(R2, {{54, "KeyBB"}, {"TestValueBB", []}}), - Man = ink_getmanifest(Ink1), - FakeMan = [{3, "test", dummy}, {1, "other_test", dummy}], - ok = ink_updatemanifest(Ink1, FakeMan, Man), - ?assertMatch(FakeMan, ink_getmanifest(Ink1)), - ok = ink_updatemanifest(Ink1, Man, FakeMan), - ?assertMatch({{5, "KeyAA"}, {"TestValueAA", []}}, - ink_get(Ink1, "KeyAA", 5)), - ?assertMatch({{54, "KeyBB"}, {"TestValueBB", []}}, - ink_get(Ink1, "KeyBB", 54)), + Obj1 = ink_get(Ink1, "Key1", 1), + ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), + Obj2 = ink_get(Ink1, "Key4", 4), + ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), ink_close(Ink1), clean_testdir(RootPath). 
+ compact_journal_test() -> RootPath = "../test/journal", build_dummy_journal(), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), - FunnyLoop = lists:seq(1, 48), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), ?assertMatch(NewSQN1, 5), ok = ink_print_manifest(Ink1), R0 = ink_get(Ink1, "KeyAA", 5), ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), + FunnyLoop = lists:seq(1, 48), Checker = lists:map(fun(X) -> PK = "KeyZ" ++ integer_to_list(X), {ok, SQN, _} = ink_put(Ink1, @@ -1043,16 +833,63 @@ compact_journal_test() -> {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), - ?assertMatch(2, length(ActualManifest)), + ok = ink_print_manifest(Ink1), + ?assertMatch(3, length(ActualManifest)), ok = ink_compactjournal(Ink1, Checker, fun(X) -> {X, 55} end, fun(L, K, SQN) -> lists:member({SQN, K}, L) end, 5000), timer:sleep(1000), - CompactedManifest = ink_getmanifest(Ink1), - ?assertMatch(1, length(CompactedManifest)), + CompactedManifest1 = ink_getmanifest(Ink1), + ?assertMatch(2, length(CompactedManifest1)), + Checker2 = lists:sublist(Checker, 16), + ok = ink_compactjournal(Ink1, + Checker2, + fun(X) -> {X, 55} end, + fun(L, K, SQN) -> lists:member({SQN, K}, L) end, + 5000), + timer:sleep(1000), + CompactedManifest2 = ink_getmanifest(Ink1), + R = lists:foldl(fun({_SQN, FN, _P}, Acc) -> + case string:str(FN, "post_compact") of + N when N > 0 -> + true; + 0 -> + Acc + end end, + false, + CompactedManifest2), + ?assertMatch(true, R), + ?assertMatch(2, length(CompactedManifest2)), ink_close(Ink1), clean_testdir(RootPath). 
+empty_manifest_test() -> + RootPath = "../test/journal", + clean_testdir(RootPath), + CDBopts = #cdb_options{max_size=300000}, + {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts}), + ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)), + ok = ink_compactjournal(Ink1, + [], + fun(X) -> {X, 55} end, + fun(L, K, SQN) -> lists:member({SQN, K}, L) end, + 5000), + timer:sleep(1000), + ?assertMatch(1, length(ink_getmanifest(Ink1))), + ok = ink_close(Ink1), + {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts}), + ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), + {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []), + ?assertMatch(2, SQN), + ?assertMatch(true, Size > 0), + {ok, V} = ink_fetch(Ink2, "Key1", 2), + ?assertMatch("Value1", V), + ink_close(Ink2), + clean_testdir(RootPath). + + -endif. \ No newline at end of file