diff --git a/include/leveled.hrl b/include/leveled.hrl index 321ff7c..0debd70 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -42,6 +42,7 @@ -record(bookie_options, {root_path :: string(), cache_size :: integer(), + max_journalsize :: integer(), metadata_extractor :: function(), indexspec_converter :: function()}). diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 948ea66..a3679d6 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -11,7 +11,7 @@ %% and frequent use of iterators) %% - The Journal is an extended nursery log in leveldb terms. It is keyed %% on the sequence number of the write -%% - The ledger is a LSM tree, where the key is the actaul object key, and +%% - The ledger is a merge tree, where the key is the actual object key, and %% the value is the metadata of the object including the sequence number %% %% @@ -140,6 +140,7 @@ book_riakhead/3, book_snapshotstore/3, book_snapshotledger/3, + book_compactjournal/2, book_close/1, strip_to_keyonly/1, strip_to_keyseqonly/1, @@ -152,6 +153,8 @@ -define(CACHE_SIZE, 1000). -define(JOURNAL_FP, "journal"). -define(LEDGER_FP, "ledger"). +-define(SHUTDOWN_WAITS, 60). +-define(SHUTDOWN_PAUSE, 10000). -record(state, {inker :: pid(), penciller :: pid(), @@ -188,6 +191,9 @@ book_snapshotstore(Pid, Requestor, Timeout) -> book_snapshotledger(Pid, Requestor, Timeout) -> gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity). +book_compactjournal(Pid, Timeout) -> + gen_server:call(Pid, {compact_journal, Timeout}, infinity). + book_close(Pid) -> gen_server:call(Pid, close, infinity). @@ -289,6 +295,11 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) -> ledger -> {reply, {ok, LedgerSnapshot, null}, State} end; +handle_call({compact_journal, Timeout}, _From, State) -> + ok = leveled_inker:ink_compactjournal(State#state.inker, + State#state.penciller, + Timeout), + {reply, ok, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -300,7 +311,16 @@ handle_info(_Info, State) -> {noreply, State}. terminate(Reason, State) -> io:format("Bookie closing for reason ~w~n", [Reason]), - ok = leveled_inker:ink_close(State#state.inker), + WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE), + ok = case shutdown_wait(WaitList, State#state.inker) of + false -> + io:format("Forcing close of inker following wait of " + ++ "~w milliseconds~n", + [lists:sum(WaitList)]), + leveled_inker:ink_forceclose(State#state.inker); + true -> + ok + end, ok = leveled_penciller:pcl_close(State#state.penciller). code_change(_OldVsn, State, _Extra) -> @@ -311,11 +331,30 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ +shutdown_wait([], _Inker) -> + false; +shutdown_wait([TopPause|Rest], Inker) -> + case leveled_inker:ink_close(Inker) of + ok -> + true; + pause -> + io:format("Inker shutdown still waiting for process to complete~n"), + ok = timer:sleep(TopPause), + shutdown_wait(Rest, Inker) + end. + + set_options(Opts) -> %% TODO: Change the max size default, and allow setting through options + MaxJournalSize = case Opts#bookie_options.max_journalsize of + undefined -> + 30000; + MS -> + MS + end, {#inker_options{root_path = Opts#bookie_options.root_path ++ "/" ++ ?JOURNAL_FP, - cdb_options = #cdb_options{max_size=30000}}, + cdb_options = #cdb_options{max_size=MaxJournalSize}}, #penciller_options{root_path=Opts#bookie_options.root_path ++ "/" ++ ?LEDGER_FP}}. 
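The bookie changes above add a configurable journal file size, a journal compaction trigger, and a patient shutdown loop. A minimal usage sketch of the new API surface (the root path and sizes here are illustrative assumptions, not values from this change):

%% Start a Bookie with the new max_journalsize option, trigger a journal
%% compaction, and close. book_close/1 will now retry ink_close/1 for up to
%% ?SHUTDOWN_WAITS * ?SHUTDOWN_PAUSE ms (60 * 10s) while compaction work is
%% outstanding, before falling back to ink_forceclose/1.
Opts = #bookie_options{root_path = "/tmp/leveled_demo",  % illustrative path
                       max_journalsize = 4000000},
{ok, Bookie} = leveled_bookie:book_start(Opts),
ok = leveled_bookie:book_compactjournal(Bookie, 30000),
ok = leveled_bookie:book_close(Bookie).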
@@ -442,10 +481,10 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> {MinSQN, MaxSQN, Output} = Acc0, {SQN, PK} = KeyInLedger, - io:format("Reloading changes with SQN=~w PK=~w~n", [SQN, PK]), {Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)), case SQN of SQN when SQN < MinSQN -> + io:format("Skipping due to low SQN ~w~n", [SQN]), {loop, Acc0}; SQN when SQN =< MaxSQN -> %% TODO - get correct size in a more efficient manner @@ -454,6 +493,8 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) -> Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), {loop, {MinSQN, MaxSQN, Output ++ Changes}}; SQN when SQN > MaxSQN -> + io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n", + [MaxSQN, SQN]), {stop, Acc0} end. diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 40bf5ef..31b4c53 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -66,7 +66,8 @@ cdb_scan/4, cdb_close/1, cdb_complete/1, - cdb_destroy/1]). + cdb_destroy/1, + cdb_deletepending/1]). -include_lib("eunit/include/eunit.hrl"). @@ -84,7 +85,8 @@ filename :: string(), handle :: file:fd(), writer :: boolean(), - max_size :: integer()}). + max_size :: integer(), + pending_delete = false :: boolean()}). %%%============================================================================ @@ -138,6 +140,9 @@ cdb_complete(Pid) -> cdb_destroy(Pid) -> gen_server:cast(Pid, destroy). +cdb_deletepending(Pid) -> + gen_server:cast(Pid, delete_pending). + %% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosition to %% continue from that point (calling function has to protect against double %% counting). @@ -355,6 +360,8 @@ handle_cast(destroy, State) -> ok = file:close(State#state.handle), ok = file:delete(State#state.filename), {noreply, State}; +handle_cast(delete_pending, State) -> + {noreply, State#state{pending_delete = true}}; handle_cast(_Msg, State) -> {noreply, State}. @@ -362,11 +369,14 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> - case State#state.handle of - undefined -> + case {State#state.handle, State#state.pending_delete} of + {undefined, _} -> ok; - Handle -> - file:close(Handle) + {Handle, false} -> + file:close(Handle); + {Handle, true} -> + file:close(Handle), + file:delete(State#state.filename) end. code_change(_OldVsn, State, _Extra) -> diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 49a4f43..756d96b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -80,7 +80,7 @@ init([IClerkOpts]) -> end. handle_call(_Msg, _From, State) -> - {reply, not_supprted, State}. + {reply, not_supported, State}. 
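The delete_pending flag above decouples retiring a journal file from unlinking it: the file is only deleted when the owning gen_server terminates, so any process still holding the Pid can keep reading until then. A sketch of the intended lifecycle (CDBPid stands for a file opened elsewhere; the binding is an assumption for illustration):

%% Mark the file for deletion - a cast, so it returns ok immediately and
%% only flips pending_delete in the server state.
ok = leveled_cdb:cdb_deletepending(CDBPid),
%% ... readers holding CDBPid may still fetch values here ...
%% On close, terminate/2 matches {Handle, true} and deletes the file.
ok = leveled_cdb:cdb_close(CDBPid).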
handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, State) -> @@ -111,11 +111,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout}, BestRun), ok = leveled_inker:ink_updatemanifest(Inker, ManifestSlice, - PromptDelete, FilesToDelete), - {noreply, State}; + ok = leveled_inker:ink_compactioncomplete(Inker), + case PromptDelete of + true -> + lists:foreach(fun({_SQN, _FN, J2D}) -> + leveled_cdb:cdb_deletepending(J2D) end, + FilesToDelete), + {noreply, State}; + false -> + {noreply, State} + end; Score -> io:format("No compaction run as highest score=~w~n", [Score]), + ok = leveled_inker:ink_compactioncomplete(Inker), {noreply, State} end; handle_cast({remove, _Removals}, State) -> @@ -361,6 +370,9 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) -> FN = leveled_inker:filepath(FP, SQN, compact_journal), + io:format("Generate journal for compaction" + ++ " with filename ~s~n", + [FN]), leveled_cdb:cdb_open_writer(FN, CDBopts); _ -> diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 56c1664..3fab0fa 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -105,10 +105,12 @@ ink_loadpcl/4, ink_registersnapshot/2, ink_compactjournal/3, + ink_compactioncomplete/1, ink_getmanifest/1, - ink_updatemanifest/4, + ink_updatemanifest/3, ink_print_manifest/1, ink_close/1, + ink_forceclose/1, build_dummy_journal/0, simple_manifest_reader/2, clean_testdir/1, @@ -135,7 +137,9 @@ registered_snapshots = [] :: list(), root_path :: string(), cdb_options :: #cdb_options{}, - clerk :: pid()}). + clerk :: pid(), + compaction_pending = false :: boolean(), + is_snapshot = false :: boolean()}). %%%============================================================================ @@ -158,7 +162,10 @@ ink_registersnapshot(Pid, Requestor) -> gen_server:call(Pid, {snapshot, Requestor}, infinity). ink_close(Pid) -> - gen_server:call(Pid, close, infinity). + gen_server:call(Pid, {close, false}, infinity). + +ink_forceclose(Pid) -> + gen_server:call(Pid, {close, true}, infinity). ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). @@ -172,7 +179,7 @@ ink_compactjournal(Pid, Penciller, Timeout) -> CheckerInitiateFun, CheckerFilterFun, Timeout}, - infiniy). + infinity). %% Allows the Checker to be overridden in test, to use something other than a %% penciller @@ -185,14 +192,16 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) -> Timeout}, infinity). +ink_compactioncomplete(Pid) -> + gen_server:call(Pid, compaction_complete, infinity). + ink_getmanifest(Pid) -> gen_server:call(Pid, get_manifest, infinity). -ink_updatemanifest(Pid, ManifestSnippet, PromptDeletion, DeletedFiles) -> +ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) -> gen_server:call(Pid, {update_manifest, ManifestSnippet, - PromptDeletion, DeletedFiles}, infinity). 
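As the comment above notes, ink_compactjournal/5 allows the Checker to be overridden so tests can score and filter journal keys without a running Penciller. A hedged sketch - the exact contracts of the initiate and filter funs are not shown in this diff, so the stand-ins below are assumptions:

%% Compact the journal against a dummy checker that treats every key as
%% still current.
InitiateFun = fun(Checker) -> Checker end,            % assumed contract
FilterFun = fun(_Checker, _SQN, _Key) -> true end,    % assumed contract
ok = leveled_inker:ink_compactjournal(Inker,
                                      dummy_checker,
                                      InitiateFun,
                                      FilterFun,
                                      30000).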
@@ -213,7 +222,8 @@ init([InkerOpts]) -> {ActiveJournalDB, Manifest}} = ink_registersnapshot(SrcInker, Requestor), {ok, #state{manifest=Manifest, - active_journaldb=ActiveJournalDB}}; + active_journaldb=ActiveJournalDB, + is_snapshot=true}}; %% Need to do something about timeout {_RootPath, false} -> start_from_file(InkerOpts) @@ -272,7 +282,6 @@ handle_call(get_manifest, _From, State) -> {reply, State#state.manifest, State}; handle_call({update_manifest, ManifestSnippet, - PromptDeletion, DeletedFiles}, _From, State) -> Man0 = lists:foldl(fun(ManEntry, AccMan) -> Check = lists:member(ManEntry, DeletedFiles), @@ -291,13 +300,7 @@ handle_call({update_manifest, ManifestSnippet), NewManifestSQN = State#state.manifest_sqn + 1, ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path), - PendingRemovals = case PromptDeletion of - true -> - State#state.pending_removals ++ - [{NewManifestSQN, DeletedFiles}]; - _ -> - State#state.pending_removals - end, + PendingRemovals = [{NewManifestSQN, DeletedFiles}], {reply, ok, State#state{manifest=Man1, manifest_sqn=NewManifestSQN, pending_removals=PendingRemovals}}; @@ -316,9 +319,16 @@ handle_call({compact, FilterFun, self(), Timeout), - {reply, ok, State}; -handle_call(close, _From, State) -> - {stop, normal, ok, State}; + {reply, ok, State#state{compaction_pending=true}}; +handle_call(compaction_complete, _From, State) -> + {reply, ok, State#state{compaction_pending=false}}; +handle_call({close, Force}, _From, State) -> + case {State#state.compaction_pending, Force} of + {true, false} -> + {reply, pause, State}; + _ -> + {stop, normal, ok, State} + end; handle_call(Msg, _From, State) -> io:format("Unexpected message ~w~n", [Msg]), {reply, error, State}. @@ -330,12 +340,21 @@ handle_info(_Info, State) -> {noreply, State}. terminate(Reason, State) -> - io:format("Inker closing journal for reason ~w~n", [Reason]), - io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", - [State#state.journal_sqn, State#state.manifest_sqn]), - io:format("Manifest when closing is: ~n"), - manifest_printer(State#state.manifest), - close_allmanifest(State#state.manifest, State#state.active_journaldb). + case State#state.is_snapshot of + true -> + ok; + false -> + io:format("Inker closing journal for reason ~w~n", [Reason]), + io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", + [State#state.journal_sqn, State#state.manifest_sqn]), + io:format("Manifest when closing is: ~n"), + leveled_iclerk:clerk_stop(State#state.clerk), + lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end, + State#state.registered_snapshots), + manifest_printer(State#state.manifest), + close_allmanifest(State#state.manifest, State#state.active_journaldb), + close_allremovals(State#state.pending_removals) + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -597,6 +616,25 @@ close_allmanifest([H|ManifestT], ActiveJournal) -> ok = leveled_cdb:cdb_close(Pid), close_allmanifest(ManifestT, ActiveJournal). +close_allremovals([]) -> + ok; +close_allremovals([{ManifestSQN, Removals}|Tail]) -> + io:format("Closing removals at ManifestSQN=~w~n", [ManifestSQN]), + lists:foreach(fun({LowSQN, FN, Handle}) -> + io:format("Closing removed file with LowSQN=~w" ++ + " and filename ~s~n", + [LowSQN, FN]), + if + is_pid(Handle) == true -> + ok = leveled_cdb:cdb_close(Handle); + true -> + io:format("Non pid in removal ~w - test~n", + [Handle]) + end + end, + Removals), + close_allremovals(Tail). 
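The {close, Force} protocol above is what the Bookie's shutdown_wait/2 loop consumes: ink_close/1 now returns pause while a compaction run is pending, and ink_forceclose/1 stops the Inker regardless. A minimal caller sketch, mirroring that loop:

%% Ask the Inker to close; if compaction is still pending, back off
%% (the Bookie uses ?SHUTDOWN_PAUSE, i.e. 10s) and then force it.
case leveled_inker:ink_close(Inker) of
    ok ->
        ok;
    pause ->
        ok = timer:sleep(10000),
        ok = leveled_inker:ink_forceclose(Inker)
end.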
+ roll_pending_journals([TopJournalSQN], Manifest, _RootPath) when is_integer(TopJournalSQN) -> @@ -621,22 +659,22 @@ load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail]) when LowSQN >= MinSQN -> io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]), - ok = load_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FilterFun, - Penciller, - Pid, - undefined), - load_from_sequence(MinSQN, FilterFun, Penciller, ManTail); + {ok, LastMinSQN} = load_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FilterFun, + Penciller, + Pid, + undefined), + load_from_sequence(LastMinSQN, FilterFun, Penciller, ManTail); load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) -> load_from_sequence(MinSQN, FilterFun, Penciller, ManTail). load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) -> InitAcc = {MinSQN, MaxSQN, []}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of - {eof, {_AccMinSQN, _AccMaxSQN, AccKL}} -> + {eof, {AccMinSQN, _AccMaxSQN, AccKL}} -> ok = push_to_penciller(Penciller, AccKL), - ok; + {ok, AccMinSQN}; {LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} -> ok = push_to_penciller(Penciller, AccKL), load_between_sequence(MaxSQN + 1, @@ -748,7 +786,7 @@ initiate_penciller_snapshot(Penciller) -> PclOpts = #penciller_options{start_snapshot = true, source_penciller = Penciller, requestor = self()}, - FilterServer = leveled_penciller:pcl_start(PclOpts), + {ok, FilterServer} = leveled_penciller:pcl_start(PclOpts), ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []), FilterServer. @@ -793,6 +831,7 @@ clean_testdir(RootPath) -> clean_subdir(filepath(RootPath, manifest_dir)). clean_subdir(DirPath) -> + ok = filelib:ensure_dir(DirPath), {ok, Files} = file:list_dir(DirPath), lists:foreach(fun(FN) -> File = filename:join(DirPath, FN), @@ -921,9 +960,9 @@ rollafile_simplejournal_test() -> ?assertMatch(R2, {{54, "KeyBB"}, {"TestValueBB", []}}), Man = ink_getmanifest(Ink1), FakeMan = [{3, "test", dummy}, {1, "other_test", dummy}], - ok = ink_updatemanifest(Ink1, FakeMan, true, Man), + ok = ink_updatemanifest(Ink1, FakeMan, Man), ?assertMatch(FakeMan, ink_getmanifest(Ink1)), - ok = ink_updatemanifest(Ink1, Man, true, FakeMan), + ok = ink_updatemanifest(Ink1, Man, FakeMan), ?assertMatch({{5, "KeyAA"}, {"TestValueAA", []}}, ink_get(Ink1, "KeyAA", 5)), ?assertMatch({{54, "KeyBB"}, {"TestValueBB", []}}, @@ -964,7 +1003,6 @@ compact_journal_test() -> timer:sleep(1000), CompactedManifest = ink_getmanifest(Ink1), ?assertMatch(1, length(CompactedManifest)), - ink_updatemanifest(Ink1, ActualManifest, true, CompactedManifest), ink_close(Ink1), clean_testdir(RootPath). diff --git a/src/leveled_iterator.erl b/src/leveled_iterator.erl deleted file mode 100644 index e065918..0000000 --- a/src/leveled_iterator.erl +++ /dev/null @@ -1,197 +0,0 @@ --module(leveled_iterator). - --export([termiterator/3]). - --include_lib("eunit/include/eunit.hrl"). - - -%% Takes a list of terms to iterate - the terms being sorted in Erlang term -%% order -%% -%% Helper Functions should have free functions - -%% {FolderFun, CompareFun, PointerCheck, PointerFetch} -%% FolderFun - function which takes the next item and the accumulator and -%% returns an updated accumulator. 
Note FolderFun can only increase the -%% accumulator by one entry each time -%% CompareFun - function which should be able to compare two keys (which are -%% not pointers), and return a winning item (or combination of items) -%% PointerCheck - function for differentiating between keys and pointer -%% PointerFetch - function that takes a pointer an EndKey (which may be -%% infinite) and returns a ne wslice of ordered results from that pointer -%% -%% Range can be for the form -%% {StartKey, EndKey, MaxKeys} where EndKey or MaxKeys can be infinite (but -%% not both) - - -termiterator(ListToIterate, HelperFuns, Range) -> - case Range of - {_, infinte, infinite} -> - bad_iterator; - _ -> - termiterator(null, ListToIterate, [], HelperFuns, Range) - end. - - -termiterator(HeadItem, [], Acc, HelperFuns, _) -> - case HeadItem of - null -> - Acc; - _ -> - {FolderFun, _, _, _} = HelperFuns, - FolderFun(Acc, HeadItem) - end; -termiterator(null, [NextItem|TailList], Acc, HelperFuns, Range) -> - %% Check that the NextItem is not a pointer before promoting to HeadItem - %% Cannot now promote a HeadItem which is a pointer - {_, _, PointerCheck, PointerFetch} = HelperFuns, - case PointerCheck(NextItem) of - {true, Pointer} -> - {_, EndKey, _} = Range, - NewSlice = PointerFetch(Pointer, EndKey), - ExtendedList = lists:merge(NewSlice, TailList), - termiterator(null, ExtendedList, Acc, HelperFuns, Range); - false -> - termiterator(NextItem, TailList, Acc, HelperFuns, Range) - end; -termiterator(HeadItem, [NextItem|TailList], Acc, HelperFuns, Range) -> - {FolderFun, CompareFun, PointerCheck, PointerFetch} = HelperFuns, - {_, EndKey, MaxItems} = Range, - %% HeadItem cannot be pointer, but NextItem might be, so check before - %% comparison - case PointerCheck(NextItem) of - {true, Pointer} -> - NewSlice = PointerFetch(Pointer, EndKey), - ExtendedList = lists:merge(NewSlice, [HeadItem|TailList]), - termiterator(null, ExtendedList, Acc, HelperFuns, Range); - false -> - %% Compare to see if Head and Next match, or if Head is a winner - %% to be added to accumulator - case CompareFun(HeadItem, NextItem) of - {match, StrongItem, _WeakItem} -> - %% Discard WeakItem, Strong Item might be an aggregation of - %% the items - termiterator(StrongItem, TailList, Acc, HelperFuns, Range); - {winner, HeadItem} -> - %% Add next item to accumulator, and proceed with next item - AccPlus = FolderFun(Acc, HeadItem), - case length(AccPlus) of - MaxItems -> - AccPlus; - _ -> - termiterator(NextItem, TailList, AccPlus, - HelperFuns, - {HeadItem, EndKey, MaxItems}) - end - end - end. - - -%% Initial forms of keys supported are Index Keys and Object Keys -%% -%% All keys are of the form {Key, Value, SequenceNumber, State} -%% -%% The Key will be of the form: -%% {o, Bucket, Key} - for an Object Key -%% {i, Bucket, IndexName, IndexTerm, Key} - for an Index Key -%% -%% The value will be of the form: -%% {o, ObjectHash, [vector-clocks]} - for an Object Key -%% null - for an Index Key -%% -%% Sequence number is the sequence number the key was added, and the highest -%% sequence number in the list of keys for an index key. 
-%% -%% State can be one of the following: -%% live - an active key -%% tomb - a tombstone key -%% {timestamp, TS} - an active key to a certain timestamp -%% {pointer, Pointer} - to be added by iterators to indicate further data -%% available in the range from a particular source - - -pointercheck_indexkey(IndexKey) -> - case IndexKey of - {_Key, _Values, _Sequence, {pointer, Pointer}} -> - {true, Pointer}; - _ -> - false - end. - -folder_indexkey(Acc, IndexKey) -> - case IndexKey of - {_Key, _Value, _Sequence, tomb} -> - Acc; - {Key, _Value, _Sequence, live} -> - {i, _, _, _, ObjectKey} = Key, - lists:append(Acc, [ObjectKey]) - end. - -compare_indexkey(IndexKey1, IndexKey2) -> - {{i, Bucket1, Index1, Term1, Key1}, _Val1, Sequence1, _St1} = IndexKey1, - {{i, Bucket2, Index2, Term2, Key2}, _Val2, Sequence2, _St2} = IndexKey2, - case {Bucket1, Index1, Term1, Key1} of - {Bucket2, Index2, Term2, Key2} when Sequence1 >= Sequence2 -> - {match, IndexKey1, IndexKey2}; - {Bucket2, Index2, Term2, Key2} -> - {match, IndexKey2, IndexKey1}; - _ when IndexKey2 >= IndexKey1 -> - {winner, IndexKey1}; - _ -> - {winner, IndexKey2} - end. - - - -%% Unit testsß - -getnextslice(Pointer, _EndKey) -> - case Pointer of - {test, NewList} -> - NewList; - _ -> - [] - end. - - -iterateoverindexkeyswithnopointer_test() -> - Key1 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"}, - null, 1, live}, - Key2 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"}, - null, 2, tomb}, - Key3 = {{i, "pdsRecord", "familyName_bin", "1971SMITH", "10002"}, - null, 2, live}, - Key4 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10003"}, - null, 2, live}, - KeyList = lists:sort([Key1, Key2, Key3, Key4]), - HelperFuns = {fun folder_indexkey/2, fun compare_indexkey/2, - fun pointercheck_indexkey/1, fun getnextslice/2}, - ?assertMatch(["10002", "10003"], - termiterator(KeyList, HelperFuns, {"1971", "1973", infinite})). - -iterateoverindexkeyswithpointer_test() -> - Key1 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"}, - null, 1, live}, - Key2 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"}, - null, 2, tomb}, - Key3 = {{i, "pdsRecord", "familyName_bin", "1971SMITH", "10002"}, - null, 2, live}, - Key4 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10003"}, - null, 2, live}, - Key5 = {{i, "pdsRecord", "familyName_bin", "1972ZAFRIDI", "10004"}, - null, 2, live}, - Key6 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10004"}, - null, 0, {pointer, {test, [Key5]}}}, - KeyList = lists:sort([Key1, Key2, Key3, Key4, Key6]), - HelperFuns = {fun folder_indexkey/2, fun compare_indexkey/2, - fun pointercheck_indexkey/1, fun getnextslice/2}, - ?assertMatch(["10002", "10003", "10004"], - termiterator(KeyList, HelperFuns, {"1971", "1973", infinite})), - ?assertMatch(["10002", "10003"], - termiterator(KeyList, HelperFuns, {"1971", "1973", 2})). - - - - - - diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 9747b9a..a1b1249 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -7,11 +7,16 @@ %% Ledger. 
%% - The Penciller provides re-write (compaction) work up to be managed by %% the Penciller's Clerk -%% - The Penciller mainatins a register of iterators who have requested +%% - The Penciller maintains a register of iterators who have requested %% snapshots of the Ledger %% - The Penciller accepts new dumps (in the form of lists of keys) from the Bookie, and %% calls the Bookie once the process of pencilling this data in the Ledger is %% complete - and the Bookie is free to forget about the data +%% - The Penciller's persistence of the ledger may not be reliable, in that it +%% may lose data, but only in sequence from a particular sequence number. On +%% startup the Penciller will inform the Bookie of the highest sequence number +%% it has, and the Bookie should load any missing data from that point out of +%% the journal. %% %% -------- LEDGER --------- %% @@ -78,18 +83,21 @@ %% %% ---------- SNAPSHOT ---------- %% -%% Iterators may request a snapshot of the database. To provide a snapshot -%% the Penciller must snapshot the ETS table, and then send this with a copy -%% of the manifest. +%% Iterators may request a snapshot of the database. A snapshot is a cloned +%% Penciller seeded not from disk, but by the in-memory ETS table and the +%% in-memory manifest. + +%% To provide a snapshot the Penciller must snapshot the ETS table. The +%% snapshot of the ETS table is managed by the Penciller storing a list of the +%% batches of Keys which have been pushed to the Penciller, and it is expected +%% that this will be converted by the clone into a gb_tree. The clone may +%% then update the master Penciller with the gb_tree to be cached and used by +%% other cloned processes. %% -%% Iterators requesting snapshots are registered by the Penciller, so that SFT -%% files valid at the point of the snapshot until either the iterator is +%% Clones formed to support snapshots are registered by the Penciller, so that +%% SFT files valid at the point of the snapshot are retained until either the iterator is %% completed or has timed out. %% -%% Snapshot requests may request a filtered view of the ETS table (whihc may -%% be quicker than requesting the full table), or requets a snapshot of only -%% the persisted part of the Ledger -%% %% ---------- ON STARTUP ---------- %% %% On Startup the Bookie will ask the Penciller to initiate the Ledger first. @@ -105,15 +113,14 @@ %% ---------- ON SHUTDOWN ---------- %% %% On a controlled shutdown the Penciller should attempt to write any in-memory -%% ETS table to disk into the special ..on_shutdown folder +%% ETS table to an L0 SFT file, assuming one is not already pending. If one is +%% already pending then the Penciller will not persist this part of the Ledger. %% %% ---------- FOLDER STRUCTURE ---------- %% %% The following folders are used by the Penciller -%% $ROOT/ledger_manifest/ - used for keeping manifest files -%% $ROOT/ledger_onshutdown/ - containing the persisted view of the ETS table -%% written on controlled shutdown -%% $ROOT/ledger_files/ - containing individual SFT files +%% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files +%% $ROOT/ledger/ledger_files/ - containing individual SFT files %% %% In larger stores there could be a large number of files in the ledger_file %% folder - perhaps o(1000). 
It is assumed that modern file systems should diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index ac1bf68..743d93e 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -408,19 +408,7 @@ create_levelzero(Inp1, Filename) -> false -> ets:tab2list(Inp1) end, - Ext = filename:extension(Filename), - Components = filename:split(Filename), - {TmpFilename, PrmFilename} = case Ext of - [] -> - {filename:join(Components) ++ ".pnd", - filename:join(Components) ++ ".sft"}; - Ext -> - %% This seems unnecessarily hard - DN = filename:dirname(Filename), - FP = lists:last(Components), - FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), - {DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"} - end, + {TmpFilename, PrmFilename} = generate_filenames(Filename), case create_file(TmpFilename) of {error, Reason} -> {error, @@ -442,6 +430,23 @@ create_levelzero(Inp1, Filename) -> oversized_file=InputSize>?MAX_KEYS}} end. + +generate_filenames(RootFilename) -> + Ext = filename:extension(RootFilename), + Components = filename:split(RootFilename), + case Ext of + [] -> + {filename:join(Components) ++ ".pnd", + filename:join(Components) ++ ".sft"}; + Ext -> + %% This seems unnecessarily hard + DN = filename:dirname(RootFilename), + FP = lists:last(Components), + FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)), + {DN ++ "/" ++ FP_NOEXT ++ "pnd", DN ++ "/" ++ FP_NOEXT ++ "sft"} + end. + + %% Start a bare file with an initial header and no further details %% Return the {Handle, metadata record} create_file(FileName) when is_list(FileName) -> @@ -1762,4 +1767,16 @@ big_iterator_test() -> ok = file:close(Handle), ok = file:delete(Filename). +filename_test() -> + FN1 = "../tmp/filename", + FN2 = "../tmp/filename.pnd", + FN3 = "../tmp/subdir/file_name.pend", + ?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"}, + generate_filenames(FN1)), + ?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"}, + generate_filenames(FN2)), + ?assertMatch({"../tmp/subdir/file_name.pnd", + "../tmp/subdir/file_name.sft"}, + generate_filenames(FN3)). + -endif. \ No newline at end of file diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl new file mode 100644 index 0000000..602514c --- /dev/null +++ b/test/end_to_end/basic_SUITE.erl @@ -0,0 +1,135 @@ +-module(basic_SUITE). +-include_lib("common_test/include/ct.hrl"). +-include("../include/leveled.hrl"). +-export([all/0]). +-export([simple_put_fetch/1, + journal_compaction/1]). + +all() -> [journal_compaction, simple_put_fetch]. 
+ +simple_put_fetch(_Config) -> + RootPath = reset_filestructure(), + StartOpts1 = #bookie_options{root_path=RootPath}, + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, TestSpec} = generate_testobject(), + ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie1, + TestObject#r_object.bucket, + TestObject#r_object.key), + ok = leveled_bookie:book_close(Bookie1), + StartOpts2 = #bookie_options{root_path=RootPath, + max_journalsize=3000000}, + {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie2, + TestObject#r_object.bucket, + TestObject#r_object.key), + ObjList1 = generate_multiple_objects(5000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie2, Obj, Spc) end, + ObjList1), + ChkList1 = lists:sublist(lists:sort(ObjList1), 100), + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie2, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList1), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie2, + TestObject#r_object.bucket, + TestObject#r_object.key), + ok = leveled_bookie:book_close(Bookie2), + reset_filestructure(). + +journal_compaction(_Config) -> + RootPath = reset_filestructure(), + StartOpts1 = #bookie_options{root_path=RootPath, + max_journalsize=4000000}, + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, TestSpec} = generate_testobject(), + ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie1, + TestObject#r_object.bucket, + TestObject#r_object.key), + ObjList1 = generate_multiple_objects(5000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, + ObjList1), + ChkList1 = lists:sublist(lists:sort(ObjList1), 100), + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie1, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList1), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie1, + TestObject#r_object.bucket, + TestObject#r_object.key), + %% Now replace all the objects + ObjList2 = generate_multiple_objects(5000, 2), + lists:foreach(fun({_RN, Obj, Spc}) -> + leveled_bookie:book_riakput(Bookie1, Obj, Spc) end, + ObjList2), + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + ChkList3 = lists:sublist(lists:sort(ObjList2), 500), + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie1, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList3), + ok = leveled_bookie:book_close(Bookie1), + % Restart + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + {ok, TestObject} = leveled_bookie:book_riakget(Bookie2, + TestObject#r_object.bucket, + TestObject#r_object.key), + lists:foreach(fun({_RN, Obj, _Spc}) -> + R = leveled_bookie:book_riakget(Bookie2, + Obj#r_object.bucket, + Obj#r_object.key), + R = {ok, Obj} end, + ChkList3), + ok = leveled_bookie:book_close(Bookie2), + reset_filestructure(). + + +reset_filestructure() -> + RootPath = "test", + filelib:ensure_dir(RootPath ++ "/journal/"), + filelib:ensure_dir(RootPath ++ "/ledger/"), + leveled_inker:clean_testdir(RootPath ++ "/journal"), + leveled_penciller:clean_testdir(RootPath ++ "/ledger"), + RootPath. 
+ +generate_testobject() -> + {B1, K1, V1, Spec1, MD} = {"Bucket1", + "Key1", + "Value1", + [], + {"MDK1", "MDV1"}}, + Content = #r_content{metadata=MD, value=V1}, + {#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, + Spec1}. + +generate_multiple_objects(Count, KeyNumber) -> + generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(4096)). + +generate_multiple_objects(0, _KeyNumber, ObjL, _Value) -> + ObjL; +generate_multiple_objects(Count, KeyNumber, ObjL, Value) -> + Obj = {"Bucket", + "Key" ++ integer_to_list(KeyNumber), + Value, + [], + [{"MDK", "MDV" ++ integer_to_list(KeyNumber)}, + {"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]}, + {B1, K1, V1, Spec1, MD} = Obj, + Content = #r_content{metadata=MD, value=V1}, + Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, + generate_multiple_objects(Count - 1, + KeyNumber + 1, + ObjL ++ [{random:uniform(), Obj1, Spec1}], + Value). + + + \ No newline at end of file