diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 668b1c8..04adfda 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -139,7 +139,7 @@ cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). cdb_roll(Pid) -> - gen_server:call(Pid, cdb_roll, infinity). + gen_server:cast(Pid, cdb_roll). cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). @@ -344,11 +344,17 @@ handle_call(cdb_complete, _From, State=#state{writer=Writer}) {stop, normal, {ok, NewName}, State}; handle_call(cdb_complete, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, {ok, State#state.filename}, State}; -handle_call(cdb_roll, From, State=#state{writer=Writer}) - when Writer == true -> + {stop, normal, {ok, State#state.filename}, State}. + + +handle_cast(destroy, State) -> + ok = file:close(State#state.handle), + ok = file:delete(State#state.filename), + {noreply, State}; +handle_cast(delete_pending, State) -> + {noreply, State#state{pending_delete = true}}; +handle_cast(cdb_roll, State=#state{writer=Writer}) when Writer == true -> NewName = determine_new_filename(State#state.filename), - gen_server:reply(From, {ok, NewName}), ok = close_file(State#state.handle, State#state.hashtree, State#state.last_position), @@ -359,15 +365,7 @@ handle_call(cdb_roll, From, State=#state{writer=Writer}) last_key=LastKey, filename=NewName, writer=false, - hash_index=Index}}. - - -handle_cast(destroy, State) -> - ok = file:close(State#state.handle), - ok = file:delete(State#state.filename), - {noreply, State}; -handle_cast(delete_pending, State) -> - {noreply, State#state{pending_delete = true}}; + hash_index=Index}}; handle_cast(_Msg, State) -> {noreply, State}. 
diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 6f187a3..9403ef4 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -81,7 +81,12 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, State) -> % Need to fetch manifest at start rather than have it be passed in % Don't want to process a queued call waiting on an old manifest - Manifest = leveled_inker:ink_getmanifest(Inker), + Manifest = case leveled_inker:ink_getmanifest(Inker) of + [] -> + []; + [_Active|Tail] -> + Tail + end, MaxRunLength = State#state.max_run_length, {FilterServer, MaxSQN} = InitiateFun(Checker), CDBopts = State#state.cdb_options, @@ -159,7 +164,12 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - {0, 0}, KeySizeList), {ActiveSize, ReplacedSize} = R0, - Score = 100 * ActiveSize / (ActiveSize + ReplacedSize), + Score = case ActiveSize + ReplacedSize of + 0 -> + 100.0; + _ -> + 100 * ActiveSize / (ActiveSize + ReplacedSize) + end, io:format("Score for filename ~s is ~w~n", [FN, Score]), Score. @@ -219,6 +229,7 @@ assess_candidates(AllCandidates, MaxRunLength) -> end. assess_candidates([], _MaxRunLength, _CurrentRun0, BestAssessment) -> + io:format("Best run of ~w~n", [BestAssessment]), BestAssessment; assess_candidates([HeadC|Tail], MaxRunLength, CurrentRun0, BestAssessment) -> CurrentRun1 = choose_best_assessment(CurrentRun0 ++ [HeadC], diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 561ff64..471ea0b 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -9,33 +9,43 @@ %% responsible for scheduling compaction work to be carried out by the Inker's %% clerk. %% -%% -------- Journal --------- +%% -------- Journal Files --------- %% -%% The Journal is a series of files originally named as _nursery.cdb +%% The Journal is a series of files originally named as _ %% where the sequence number is the first object sequence number (key) within %% the given database file. 
The files will be named *.cdb at the point they %% have been made immutable (through a rename operation). Prior to this, they %% will originally start out as a *.pnd file. %% %% At some stage in the future compacted versions of old journal cdb files may -%% be produced. These files will be named -.cdb, and once -%% the manifest is updated the original _nursery.cdb (or -%% _.cdb) files they replace will be erased. +%% be produced. These files will be named -.cdb, and once +%% the manifest is updated the original _.cdb (or +%% _.cdb) files they replace will be erased. %% -%% The current Journal is made up of a set of files referenced in the manifest, -%% combined with a set of files of the form _nursery.[cdb|pnd] with -%% a higher Sequence Number compared to the files in the manifest. +%% The current Journal is made up of a set of files referenced in the manifest. +%% No PUTs are made to files which are not in the manifest. %% %% The Journal is ordered by sequence number from front to back both within %% and across files. %% %% On startup the Inker should open the manifest with the highest sequence %% number, and this will contain the list of filenames that make up the -%% non-recent part of the Journal. The Manifest is completed by opening these -%% files plus any other files with a higher sequence number. The file with -%% the highest sequence number is assumed to to be the active writer. Any file -%% with a lower sequence number and a *.pnd extension should be re-rolled into -%% a *.cdb file. +%% non-recent part of the Journal. All the filenames should then be opened. 
+%% How they are opened depends on the file extension: +%% +%% - If the file extension is *.cdb the file is opened read only +%% - If the file extension is *.pnd and the file is not the most recent in the +%% manifest, then the file should be completed before being opened read-only +%% - If the file extension is *.pnd the file is opened for writing +%% +%% -------- Manifest Files --------- +%% +%% The manifest is just saved as a straight term_to_binary blob, with a +%% filename ordered by the Manifest SQN. The Manifest is first saved with a +%% *.pnd extension, and then renamed to one with a *.man extension. +%% +%% On startup the *.man manifest file with the highest manifest sequence +%% number should be used. %% %% -------- Objects --------- %% @@ -45,47 +55,23 @@ %% - An object (an Erlang term) %% - A set of Key Deltas associated with the change %% -%% -------- Manifest --------- -%% -%% The Journal has a manifest which is the current record of which cdb files -%% are currently active in the Journal (i.e. following compaction). The -%% manifest holds this information through two lists - a list of files which -%% are definitely in the current manifest, and a list of files which have been -%% removed, but may still be present on disk. The use of two lists is to -%% avoid any circumsatnces where a compaction event has led to the deletion of -%% a Journal file with a higher sequence number than any in the remaining -%% manifest. -%% -%% A new manifest file is saved for every compaction event. The manifest files -%% are saved using the filename .man once saved. The ManifestSQN -%% is incremented once for every compaction event. 
-%% %% -------- Compaction --------- %% %% Compaction is a process whereby an Inker's clerk will: -%% - Request a snapshot of the Ledger, as well as the lowest sequence number -%% that is currently registerd by another snapshot owner -%% - Picks a Journal database file at random (not including the current -%% nursery log) -%% - Performs a random walk on keys and sequence numbers in the chosen CDB -%% file to extract a subset of 100 key and sequence number combinations from -%% the database -%% - Looks up the current sequence number for those keys in the Ledger -%% - If more than % (default n=20) of the keys are now at a higher sequence -%% number, then the database file is a candidate for compaction. In this case -%% each of the next 8 files in sequence should be checked until all those 8 -%% files have been checked or one of the files has been found to be below the -%% threshold. -%% - If a set of below-the-threshold files is found, the files are re-written -%% without any superceded values -%%- The clerk should then request that the Inker commit the manifest change -%% -%% -------- Inker's Clerk --------- -%% -%% +%% - Request a view of the current Inker manifest and a snapshot of the Ledger +%% - Test all files within the Journal to find the approximate compaction +%% potential percentage (the volume of the Journal that has been replaced) +%% - Attempts to find the optimal "run" of files to compact +%% - Compacts those files in the run, by rolling over the files re-writing +%% to a new Journal if and only if the Key is still present in the Ledger (or +%% the sequence number of the Key is higher than the SQN of the snapshot) +%% - Requests the Inker update the manifest with the new changes +%% - Instructs the files to destroy themselves when they are next closed %% +%% TODO: how to instruct the files to close is tbd %% + -module(leveled_inker). -behaviour(gen_server). 
@@ -132,7 +118,6 @@ manifest_sqn = 0 :: integer(), journal_sqn = 0 :: integer(), active_journaldb :: pid(), - active_journaldb_sqn :: integer(), pending_removals = [] :: list(), registered_snapshots = [] :: list(), root_path :: string(), @@ -222,11 +207,9 @@ init([InkerOpts]) -> {undefined, true} -> SrcInker = InkerOpts#inker_options.source_inker, {Manifest, - ActiveJournalDB, - ActiveJournalSQN} = ink_registersnapshot(SrcInker, self()), + ActiveJournalDB} = ink_registersnapshot(SrcInker, self()), {ok, #state{manifest=Manifest, active_journaldb=ActiveJournalDB, - active_journaldb_sqn=ActiveJournalSQN, source_inker=SrcInker, is_snapshot=true}}; %% Need to do something about timeout @@ -235,28 +218,16 @@ init([InkerOpts]) -> end. -handle_call({put, Key, Object, KeyChanges}, From, State) -> +handle_call({put, Key, Object, KeyChanges}, _From, State) -> case put_object(Key, Object, KeyChanges, State) of {ok, UpdState, ObjSize} -> {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}; {rolling, UpdState, ObjSize} -> - gen_server:reply(From, {ok, UpdState#state.journal_sqn, ObjSize}), - {NewManifest, - NewManifestSQN} = roll_active_file(State#state.active_journaldb, - State#state.manifest, - State#state.manifest_sqn, - State#state.root_path), - {noreply, UpdState#state{manifest=NewManifest, - manifest_sqn=NewManifestSQN}}; - {blocked, UpdState} -> - {reply, blocked, UpdState} + ok = leveled_cdb:cdb_roll(State#state.active_journaldb), + {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState} end; handle_call({fetch, Key, SQN}, _From, State) -> - case get_object(Key, - SQN, - State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn) of + case get_object(Key, SQN, State#state.manifest) of {{SQN, Key}, {Value, _IndexSpecs}} -> {reply, {ok, Value}, State}; Other -> @@ -265,16 +236,9 @@ handle_call({fetch, Key, SQN}, _From, State) -> {reply, not_present, State} end; handle_call({get, Key, SQN}, _From, State) -> - {reply, 
get_object(Key, - SQN, - State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn), State}; + {reply, get_object(Key, SQN, State#state.manifest), State}; handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> - Manifest = lists:reverse(State#state.manifest) - ++ [{State#state.active_journaldb_sqn, - dummy, - State#state.active_journaldb}], + Manifest = lists:reverse(State#state.manifest), Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), {reply, Reply, State}; handle_call({register_snapshot, Requestor}, _From , State) -> @@ -283,8 +247,7 @@ handle_call({register_snapshot, Requestor}, _From , State) -> io:format("Inker snapshot ~w registered at SQN ~w~n", [Requestor, State#state.manifest_sqn]), {reply, {State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn}, + State#state.active_journaldb}, State#state{registered_snapshots=Rs}}; handle_call({release_snapshot, Snapshot}, _From , State) -> Rs = lists:keydelete(Snapshot, 1, State#state.registered_snapshots), @@ -297,21 +260,16 @@ handle_call({update_manifest, ManifestSnippet, DeletedFiles}, _From, State) -> Man0 = lists:foldl(fun(ManEntry, AccMan) -> - Check = lists:member(ManEntry, DeletedFiles), - if - Check == false -> - AccMan ++ [ManEntry]; - true -> - AccMan - end + remove_from_manifest(AccMan, ManEntry) end, - [], - State#state.manifest), + State#state.manifest, + DeletedFiles), Man1 = lists:foldl(fun(ManEntry, AccMan) -> add_to_manifest(AccMan, ManEntry) end, Man0, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, + manifest_printer(Man1), ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), PendingRemovals = [{NewManifestSQN, DeletedFiles}], {reply, ok, State#state{manifest=Man1, @@ -365,8 +323,8 @@ terminate(Reason, State) -> lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, State#state.registered_snapshots), manifest_printer(State#state.manifest), - 
close_allmanifest(State#state.manifest, State#state.active_journaldb), - close_allremovals(State#state.pending_removals) + ok = close_allmanifest(State#state.manifest), + ok = close_allremovals(State#state.pending_removals) end. code_change(_OldVsn, State, _Extra) -> @@ -381,13 +339,10 @@ start_from_file(InkerOpts) -> RootPath = InkerOpts#inker_options.root_path, CDBopts = InkerOpts#inker_options.cdb_options, JournalFP = filepath(RootPath, journal_dir), - {ok, JournalFilenames} = case filelib:is_dir(JournalFP) of - true -> - file:list_dir(JournalFP); - false -> - filelib:ensure_dir(JournalFP), - {ok, []} - end, + filelib:ensure_dir(JournalFP), + CompactFP = filepath(RootPath, journal_compact_dir), + filelib:ensure_dir(CompactFP), + ManifestFP = filepath(RootPath, manifest_dir), {ok, ManifestFilenames} = case filelib:is_dir(ManifestFP) of true -> @@ -397,26 +352,21 @@ start_from_file(InkerOpts) -> {ok, []} end, - CompactFP = filepath(RootPath, journal_compact_dir), - filelib:ensure_dir(CompactFP), IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP}, IClerkOpts = #iclerk_options{inker = self(), cdb_options=IClerkCDBOpts}, {ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts), {Manifest, - {ActiveJournal, LowActiveSQN}, + ManifestSQN, JournalSQN, - ManifestSQN} = build_manifest(ManifestFilenames, - JournalFilenames, - fun simple_manifest_reader/2, + ActiveJournal} = build_manifest(ManifestFilenames, RootPath, CDBopts), - {ok, #state{manifest = lists:reverse(lists:keysort(1, Manifest)), + {ok, #state{manifest = Manifest, manifest_sqn = ManifestSQN, journal_sqn = JournalSQN, active_journaldb = ActiveJournal, - active_journaldb_sqn = LowActiveSQN, root_path = RootPath, cdb_options = CDBopts, clerk = Clerk}}. 
@@ -436,56 +386,32 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> ok -> {ok, State#state{journal_sqn=NewSQN}, ObjSize}; roll -> - FileName = filepath(State#state.root_path, NewSQN, new_journal), + SW = os:timestamp(), CDBopts = State#state.cdb_options, - {ok, NewJournalP} = leveled_cdb:cdb_open_writer(FileName, CDBopts), - case leveled_cdb:cdb_put(NewJournalP, - {NewSQN, PrimaryKey}, - Bin1) of - ok -> - {rolling, - State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP, - active_journaldb_sqn=NewSQN}, - ObjSize}; - roll -> - {blocked, State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP, - active_journaldb_sqn=NewSQN}} - end + ManEntry = start_new_activejournal(NewSQN, + State#state.root_path, + CDBopts), + {_, _, NewJournalP} = ManEntry, + NewManifest = add_to_manifest(State#state.manifest, ManEntry), + ok = simple_manifest_writer(NewManifest, + State#state.manifest_sqn + 1, + State#state.root_path), + ok = leveled_cdb:cdb_put(NewJournalP, {NewSQN, PrimaryKey}, Bin1), + io:format("Put to new active journal " ++ + "with manifest write took ~w microseconds~n", + [timer:now_diff(os:timestamp(),SW)]), + {rolling, + State#state{journal_sqn=NewSQN, + manifest=NewManifest, + manifest_sqn = State#state.manifest_sqn + 1, + active_journaldb=NewJournalP}, + ObjSize} end. -roll_active_file(ActiveJournal, Manifest, ManifestSQN, RootPath) -> - SW = os:timestamp(), - io:format("Rolling old journal ~w~n", [ActiveJournal]), - {ok, NewFilename} = leveled_cdb:cdb_roll(ActiveJournal), - JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, - [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], - JournalRegex2, - 'SQN'), - NewManifest = add_to_manifest(Manifest, - {JournalSQN, NewFilename, ActiveJournal}), - NewManifestSQN = ManifestSQN + 1, - ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), - io:format("Rolling old journal completed in ~w microseconds~n", - [timer:now_diff(os:timestamp(),SW)]), - {NewManifest, NewManifestSQN}. 
-get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> - Obj = if - SQN < ActiveJournalSQN -> - JournalP = find_in_manifest(SQN, Manifest), - if - JournalP == error -> - io:format("Unable to find SQN~w in Manifest~w~n", - [SQN, Manifest]), - error; - true -> - leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}) - end; - true -> - leveled_cdb:cdb_get(ActiveJournal, {SQN, PrimaryKey}) - end, +get_object(PrimaryKey, SQN, Manifest) -> + JournalP = find_in_manifest(SQN, Manifest), + Obj = leveled_cdb:cdb_get(JournalP, {SQN, PrimaryKey}), case Obj of {{SQN, PK}, Bin} -> {{SQN, PK}, binary_to_term(Bin)}; @@ -495,139 +421,129 @@ get_object(PrimaryKey, SQN, Manifest, ActiveJournal, ActiveJournalSQN) -> build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, - RootPath) -> - build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, - RootPath, - #cdb_options{}). - -build_manifest(ManifestFilenames, - JournalFilenames, - ManifestRdrFun, RootPath, CDBopts) -> - %% Find the manifest with a highest Manifest sequence number - %% Open it and read it to get the current Confirmed Manifest + % Find the manifest with a highest Manifest sequence number + % Open it and read it to get the current Confirmed Manifest ManifestRegex = "(?[0-9]+)\\." 
++ ?MANIFEST_FILEX, ValidManSQNs = sequencenumbers_fromfilenames(ManifestFilenames, ManifestRegex, 'MSQN'), - {JournalSQN1, - ConfirmedManifest, - Removed, + {Manifest, ManifestSQN} = case length(ValidManSQNs) of 0 -> - {0, [], [], 0}; + {[], 1}; _ -> PersistedManSQN = lists:max(ValidManSQNs), - M1 = ManifestRdrFun(PersistedManSQN, RootPath), - J1 = lists:foldl(fun({JSQN, _FN}, Acc) -> - max(JSQN, Acc) end, - 0, - M1), - {J1, M1, [], PersistedManSQN} + M1 = simple_manifest_reader(PersistedManSQN, + RootPath), + {M1, PersistedManSQN} end, - %% Find any more recent immutable files that have a higher sequence number - %% - the immutable files have already been rolled, and so have a completed - %% hashtree lookup - JournalRegex1 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, - UnremovedJournalFiles = lists:foldl(fun(FN, Acc) -> - case lists:member(FN, Removed) of - true -> - Acc; - false -> - Acc ++ [FN] - end end, - [], - JournalFilenames), - OtherSQNs_imm = sequencenumbers_fromfilenames(UnremovedJournalFiles, - JournalRegex1, - 'SQN'), - ExtendManifestFun = fun(X, Acc) -> - if - X > JournalSQN1 - -> - FN = filepath(RootPath, journal_dir) - ++ "nursery_" ++ - integer_to_list(X) - ++ "." 
++ - ?JOURNAL_FILEX, - add_to_manifest(Acc, {X, FN}); - true - -> Acc - end end, - Manifest1 = lists:foldl(ExtendManifestFun, - ConfirmedManifest, - lists:sort(OtherSQNs_imm)), + % Open the manifest files, completing if necessary and ensure there is + % a valid active journal at the head of the manifest + OpenManifest = open_all_manifest(Manifest, RootPath, CDBopts), + {ActiveLowSQN, _FN, ActiveJournal} = lists:nth(1, OpenManifest), + JournalSQN = case leveled_cdb:cdb_lastkey(ActiveJournal) of + empty -> + ActiveLowSQN; + {JSQN, _LastKey} -> + JSQN + end, - %% Enrich the manifest so it contains the Pid of any of the immutable - %% entries - io:format("Manifest on startup is: ~n"), - manifest_printer(Manifest1), - Manifest2 = lists:map(fun({LowSQN, FN}) -> - {ok, Pid} = leveled_cdb:cdb_open_reader(FN), - {LowSQN, FN, Pid} end, - Manifest1), - - %% Find any more recent mutable files that have a higher sequence number - %% Roll any mutable files which do not have the highest sequence number - %% to create the hashtree and complete the header entries - JournalRegex2 = "nursery_(?[0-9]+)\\." 
++ ?PENDING_FILEX, - OtherSQNs_pnd = sequencenumbers_fromfilenames(JournalFilenames, - JournalRegex2, - 'SQN'), - - case length(OtherSQNs_pnd) of - 0 -> - %% Need to create a new active writer, but also find the highest - %% SQN from within the confirmed manifest - TopSQNInManifest = - case length(Manifest2) of - 0 -> - %% Manifest is empty and no active writers - %% can be found so database is empty - 0; - _ -> - TM = lists:last(lists:keysort(1,Manifest2)), - {_SQN, _FN, TMPid} = TM, - {HighSQN, _HighKey} = leveled_cdb:cdb_lastkey(TMPid), - HighSQN - end, - LowActiveSQN = TopSQNInManifest + 1, - ActiveFN = filepath(RootPath, LowActiveSQN, new_journal), - {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN, - CDBopts), - {Manifest2, - {ActiveJournal, LowActiveSQN}, - TopSQNInManifest, - ManifestSQN}; - _ -> - {ActiveJournalSQN, - Manifest3} = roll_pending_journals(lists:sort(OtherSQNs_pnd), - Manifest2, - RootPath), - %% Need to work out highest sequence number in tail file to feed - %% into opening of pending journal - ActiveFN = filepath(RootPath, ActiveJournalSQN, new_journal), - {ok, ActiveJournal} = leveled_cdb:cdb_open_writer(ActiveFN, - CDBopts), - {HighestSQN, _HighestKey} = leveled_cdb:cdb_lastkey(ActiveJournal), - {Manifest3, - {ActiveJournal, ActiveJournalSQN}, - HighestSQN, - ManifestSQN} - end. + % Update the manifest if it has been changed by the process of loading + % the manifest (must also increment the manifest SQN). + UpdManifestSQN = if + length(OpenManifest) > length(Manifest) -> + io:format("Updated manifest on startup: ~n"), + manifest_printer(OpenManifest), + simple_manifest_writer(OpenManifest, + ManifestSQN + 1, + RootPath), + ManifestSQN + 1; + true -> + io:format("Unchanged manifest on startup: ~n"), + manifest_printer(OpenManifest), + ManifestSQN + end, + {OpenManifest, UpdManifestSQN, JournalSQN, ActiveJournal}. 
-close_allmanifest([], ActiveJournal) -> - leveled_cdb:cdb_close(ActiveJournal); -close_allmanifest([H|ManifestT], ActiveJournal) -> + +close_allmanifest([]) -> + ok; +close_allmanifest([H|ManifestT]) -> {_, _, Pid} = H, ok = leveled_cdb:cdb_close(Pid), - close_allmanifest(ManifestT, ActiveJournal). + close_allmanifest(ManifestT). + + +open_all_manifest([], RootPath, CDBOpts) -> + io:format("Manifest is empty, starting from manifest SQN 1~n"), + add_to_manifest([], start_new_activejournal(1, RootPath, CDBOpts)); +open_all_manifest(Man0, RootPath, CDBOpts) -> + Man1 = lists:reverse(lists:sort(Man0)), + [{HeadSQN, HeadFN}|ManifestTail] = Man1, + CompleteHeadFN = HeadFN ++ "." ++ ?JOURNAL_FILEX, + PendingHeadFN = HeadFN ++ "." ++ ?PENDING_FILEX, + Man2 = case filelib:is_file(CompleteHeadFN) of + true -> + io:format("Head manifest entry ~s is complete~n", + [HeadFN]), + {ok, HeadR} = leveled_cdb:cdb_open_reader(CompleteHeadFN), + {LastSQN, _LastPK} = leveled_cdb:cdb_lastkey(HeadR), + add_to_manifest(add_to_manifest(ManifestTail, + {HeadSQN, HeadFN, HeadR}), + start_new_activejournal(LastSQN + 1, + RootPath, + CDBOpts)); + false -> + {ok, HeadW} = leveled_cdb:cdb_open_writer(PendingHeadFN, + CDBOpts), + add_to_manifest(ManifestTail, {HeadSQN, HeadFN, HeadW}) + end, + lists:map(fun(ManEntry) -> + case ManEntry of + {LowSQN, FN} -> + CFN = FN ++ "." ++ ?JOURNAL_FILEX, + PFN = FN ++ "." ++ ?PENDING_FILEX, + case filelib:is_file(CFN) of + true -> + {ok, + Pid} = leveled_cdb:cdb_open_reader(CFN), + {LowSQN, FN, Pid}; + false -> + {ok, + Pid} = leveled_cdb:cdb_open_reader(PFN), + {LowSQN, FN, Pid} + end; + _ -> + ManEntry + end end, + Man2). + + +start_new_activejournal(SQN, RootPath, CDBOpts) -> + Filename = filepath(RootPath, SQN, new_journal), + {ok, PidW} = leveled_cdb:cdb_open_writer(Filename, CDBOpts), + {SQN, Filename, PidW}. 
+ +add_to_manifest(Manifest, Entry) -> + {SQN, FN, PidR} = Entry, + StrippedName = filename:rootname(FN), + lists:reverse(lists:sort([{SQN, StrippedName, PidR}|Manifest])). + +remove_from_manifest(Manifest, Entry) -> + {SQN, FN, _PidR} = Entry, + io:format("File ~s to be removed from manifest~n", [FN]), + lists:keydelete(SQN, 1, Manifest). + +find_in_manifest(_SQN, []) -> + error; +find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> + Pid; +find_in_manifest(SQN, [_Head|Tail]) -> + find_in_manifest(SQN, Tail). + close_allremovals([]) -> ok; @@ -649,20 +565,6 @@ close_allremovals([{ManifestSQN, Removals}|Tail]) -> close_allremovals(Tail). -roll_pending_journals([TopJournalSQN], Manifest, _RootPath) - when is_integer(TopJournalSQN) -> - {TopJournalSQN, Manifest}; -roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> - Filename = filepath(RootPath, JournalSQN, new_journal), - {ok, PidW} = leveled_cdb:cdb_open_writer(Filename), - {ok, NewFilename} = leveled_cdb:cdb_complete(PidW), - {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), - roll_pending_journals(T, - add_to_manifest(Manifest, - {JournalSQN, NewFilename, PidR}), - RootPath). - - %% Scan between sequence numbers applying FilterFun to each entry where %% FilterFun{K, V, Acc} -> Penciller Key List %% Load the output for the CDB file into the Penciller. @@ -757,15 +659,6 @@ sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> [], Filenames). -add_to_manifest(Manifest, Entry) -> - lists:reverse(lists:sort([Entry|Manifest])). - -find_in_manifest(_SQN, []) -> - error; -find_in_manifest(SQN, [{LowSQN, _FN, Pid}|_Tail]) when SQN >= LowSQN -> - Pid; -find_in_manifest(SQN, [_Head|Tail]) -> - find_in_manifest(SQN, Tail). 
filepath(RootPath, journal_dir) -> RootPath ++ "/" ++ ?FILES_FP ++ "/"; @@ -776,15 +669,22 @@ filepath(RootPath, journal_compact_dir) -> filepath(RootPath, NewSQN, new_journal) -> filename:join(filepath(RootPath, journal_dir), - "nursery_" - ++ integer_to_list(NewSQN) + integer_to_list(NewSQN) ++ "_" + ++ generate_uuid() ++ "." ++ ?PENDING_FILEX); filepath(CompactFilePath, NewSQN, compact_journal) -> filename:join(CompactFilePath, - "nursery_" - ++ integer_to_list(NewSQN) + integer_to_list(NewSQN) ++ "_" + ++ generate_uuid() ++ "." ++ ?PENDING_FILEX). +%% Credit to +%% https://github.com/afiskon/erlang-uuid-v4/blob/master/src/uuid.erl +generate_uuid() -> + <> = crypto:rand_bytes(16), + io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b", + [A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]). + simple_manifest_reader(SQN, RootPath) -> ManifestPath = filepath(RootPath, manifest_dir), io:format("Opening manifest file at ~s with SQN ~w~n", @@ -858,7 +758,9 @@ build_dummy_journal() -> {K2, V2} = {"Key2", "TestValue2"}, ok = leveled_cdb:cdb_put(J1, {1, K1}, term_to_binary({V1, []})), ok = leveled_cdb:cdb_put(J1, {2, K2}, term_to_binary({V2, []})), - {ok, _} = leveled_cdb:cdb_complete(J1), + ok = leveled_cdb:cdb_roll(J1), + _LK = leveled_cdb:cdb_lastkey(J1), + ok = leveled_cdb:cdb_close(J1), F2 = filename:join(JournalFP, "nursery_3.pnd"), {ok, J2} = leveled_cdb:cdb_open_writer(F2), {K1, V3} = {"Key1", "TestValue3"}, @@ -866,7 +768,8 @@ build_dummy_journal() -> ok = leveled_cdb:cdb_put(J2, {3, K1}, term_to_binary({V3, []})), ok = leveled_cdb:cdb_put(J2, {4, K4}, term_to_binary({V4, []})), ok = leveled_cdb:cdb_close(J2), - Manifest = [{1, "../test/journal/journal_files/nursery_1.cdb"}], + Manifest = [{1, "../test/journal/journal_files/nursery_1"}, + {3, "../test/journal/journal_files/nursery_3"}], ManifestBin = term_to_binary(Manifest), {ok, MF1} = file:open(filename:join(ManifestFP, "1.man"), [binary, raw, read, write]), @@ -891,146 +794,33 @@ 
clean_subdir(DirPath) -> end, Files). -simple_buildmanifest_test() -> - RootPath = "../test/journal", - build_dummy_journal(), - Res = build_manifest(["1.man"], - ["../test/journal/journal_files/nursery_1.cdb", - "../test/journal/journal_files/nursery_3.pnd"], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(HighSQN, 4), - ?assertMatch(ManSQN, 1), - ?assertMatch([{1, "../test/journal/journal_files/nursery_1.cdb", _}], Man), - {ActSQN, _ActK} = leveled_cdb:cdb_lastkey(ActJournal), - ?assertMatch(ActSQN, 4), - ?assertMatch(ActJournalSQN, 3), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). -another_buildmanifest_test() -> - %% There is a rolled jounral file which is not yet in the manifest - RootPath = "../test/journal", - build_dummy_journal(), - FN = filepath(RootPath, 3, new_journal), - {ok, FileToRoll} = leveled_cdb:cdb_open_writer(FN), - {ok, _} = leveled_cdb:cdb_complete(FileToRoll), - FN2 = filepath(RootPath, 5, new_journal), - {ok, NewActiveJN} = leveled_cdb:cdb_open_writer(FN2), - {K5, V5} = {"Key5", "TestValue5"}, - {K6, V6} = {"Key6", "TestValue6"}, - ok = leveled_cdb:cdb_put(NewActiveJN, {5, K5}, term_to_binary({V5, []})), - ok = leveled_cdb:cdb_put(NewActiveJN, {6, K6}, term_to_binary({V6, []})), - ok = leveled_cdb:cdb_close(NewActiveJN), - %% Test setup - now build manifest - Res = build_manifest(["1.man"], - ["../test/journal/journal_files/nursery_1.cdb", - "../test/journal/journal_files/nursery_3.cdb", - "../test/journal/journal_files/nursery_5.pnd"], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(HighSQN, 6), - ?assertMatch(ManSQN, 1), - ?assertMatch([{3, "../test/journal/journal_files/nursery_3.cdb", _}, - {1, "../test/journal/journal_files/nursery_1.cdb", _}], Man), - {ActSQN, _ActK} = 
leveled_cdb:cdb_lastkey(ActJournal), - ?assertMatch(ActSQN, 6), - ?assertMatch(ActJournalSQN, 5), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). - - -empty_buildmanifest_test() -> - RootPath = "../test/journal", - Res = build_manifest([], - [], - fun simple_manifest_reader/2, - RootPath), - io:format("Build manifest output is ~w~n", [Res]), - {Man, {ActJournal, ActJournalSQN}, HighSQN, ManSQN} = Res, - ?assertMatch(Man, []), - ?assertMatch(ManSQN, 0), - ?assertMatch(HighSQN, 0), - ?assertMatch(ActJournalSQN, 1), - empty = leveled_cdb:cdb_lastkey(ActJournal), - FN = leveled_cdb:cdb_filename(ActJournal), - %% The filename should be based on the next journal SQN (1) not 0 - ?assertMatch(FN, filepath(RootPath, 1, new_journal)), - close_allmanifest(Man, ActJournal), - clean_testdir(RootPath). - -simplejournal_test() -> - %% build up a database, and then open it through the gen_server wrap - %% Get and Put some keys - RootPath = "../test/journal", - build_dummy_journal(), - {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, - cdb_options=#cdb_options{}}), - R1 = ink_get(Ink1, "Key1", 1), - ?assertMatch(R1, {{1, "Key1"}, {"TestValue1", []}}), - R2 = ink_get(Ink1, "Key1", 3), - ?assertMatch(R2, {{3, "Key1"}, {"TestValue3", []}}), - {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "Key99", "TestValue99", []), - ?assertMatch(NewSQN1, 5), - R3 = ink_get(Ink1, "Key99", 5), - io:format("Result 3 is ~w~n", [R3]), - ?assertMatch(R3, {{5, "Key99"}, {"TestValue99", []}}), - ink_close(Ink1), - clean_testdir(RootPath). 
- -rollafile_simplejournal_test() -> +simple_inker_test() -> RootPath = "../test/journal", build_dummy_journal(), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), - FunnyLoop = lists:seq(1, 48), - {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), - ?assertMatch(NewSQN1, 5), - ok = ink_print_manifest(Ink1), - R0 = ink_get(Ink1, "KeyAA", 5), - ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), - lists:foreach(fun(X) -> - {ok, _, _} = ink_put(Ink1, - "KeyZ" ++ integer_to_list(X), - crypto:rand_bytes(10000), - []) end, - FunnyLoop), - {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), - ?assertMatch(NewSQN2, 54), - ok = ink_print_manifest(Ink1), - R1 = ink_get(Ink1, "KeyAA", 5), - ?assertMatch(R1, {{5, "KeyAA"}, {"TestValueAA", []}}), - R2 = ink_get(Ink1, "KeyBB", 54), - ?assertMatch(R2, {{54, "KeyBB"}, {"TestValueBB", []}}), - Man = ink_getmanifest(Ink1), - FakeMan = [{3, "test", dummy}, {1, "other_test", dummy}], - ok = ink_updatemanifest(Ink1, FakeMan, Man), - ?assertMatch(FakeMan, ink_getmanifest(Ink1)), - ok = ink_updatemanifest(Ink1, Man, FakeMan), - ?assertMatch({{5, "KeyAA"}, {"TestValueAA", []}}, - ink_get(Ink1, "KeyAA", 5)), - ?assertMatch({{54, "KeyBB"}, {"TestValueBB", []}}, - ink_get(Ink1, "KeyBB", 54)), + Obj1 = ink_get(Ink1, "Key1", 1), + ?assertMatch({{1, "Key1"}, {"TestValue1", []}}, Obj1), + Obj2 = ink_get(Ink1, "Key4", 4), + ?assertMatch({{4, "Key4"}, {"TestValue4", []}}, Obj2), ink_close(Ink1), clean_testdir(RootPath). 
+ compact_journal_test() -> RootPath = "../test/journal", build_dummy_journal(), CDBopts = #cdb_options{max_size=300000}, {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), - FunnyLoop = lists:seq(1, 48), {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), ?assertMatch(NewSQN1, 5), ok = ink_print_manifest(Ink1), R0 = ink_get(Ink1, "KeyAA", 5), ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), + FunnyLoop = lists:seq(1, 48), Checker = lists:map(fun(X) -> PK = "KeyZ" ++ integer_to_list(X), {ok, SQN, _} = ink_put(Ink1, @@ -1043,16 +833,63 @@ compact_journal_test() -> {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), - ?assertMatch(2, length(ActualManifest)), + ok = ink_print_manifest(Ink1), + ?assertMatch(3, length(ActualManifest)), ok = ink_compactjournal(Ink1, Checker, fun(X) -> {X, 55} end, fun(L, K, SQN) -> lists:member({SQN, K}, L) end, 5000), timer:sleep(1000), - CompactedManifest = ink_getmanifest(Ink1), - ?assertMatch(1, length(CompactedManifest)), + CompactedManifest1 = ink_getmanifest(Ink1), + ?assertMatch(2, length(CompactedManifest1)), + Checker2 = lists:sublist(Checker, 16), + ok = ink_compactjournal(Ink1, + Checker2, + fun(X) -> {X, 55} end, + fun(L, K, SQN) -> lists:member({SQN, K}, L) end, + 5000), + timer:sleep(1000), + CompactedManifest2 = ink_getmanifest(Ink1), + R = lists:foldl(fun({_SQN, FN, _P}, Acc) -> + case string:str(FN, "post_compact") of + N when N > 0 -> + true; + 0 -> + Acc + end end, + false, + CompactedManifest2), + ?assertMatch(true, R), + ?assertMatch(2, length(CompactedManifest2)), ink_close(Ink1), clean_testdir(RootPath). 
+empty_manifest_test() -> + RootPath = "../test/journal", + clean_testdir(RootPath), + CDBopts = #cdb_options{max_size=300000}, + {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts}), + ?assertMatch(not_present, ink_fetch(Ink1, "Key1", 1)), + ok = ink_compactjournal(Ink1, + [], + fun(X) -> {X, 55} end, + fun(L, K, SQN) -> lists:member({SQN, K}, L) end, + 5000), + timer:sleep(1000), + ?assertMatch(1, length(ink_getmanifest(Ink1))), + ok = ink_close(Ink1), + {ok, Ink2} = ink_start(#inker_options{root_path=RootPath, + cdb_options=CDBopts}), + ?assertMatch(not_present, ink_fetch(Ink2, "Key1", 1)), + {ok, SQN, Size} = ink_put(Ink2, "Key1", "Value1", []), + ?assertMatch(2, SQN), + ?assertMatch(true, Size > 0), + {ok, V} = ink_fetch(Ink2, "Key1", 2), + ?assertMatch("Value1", V), + ink_close(Ink2), + clean_testdir(RootPath). + + -endif. \ No newline at end of file