diff --git a/include/leveled.hrl b/include/leveled.hrl
index 481b8db..55192ca 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -46,8 +46,6 @@
         {root_path :: string(),
             cache_size :: integer(),
             max_journalsize :: integer(),
-            metadata_extractor :: function(),
-            indexspec_converter :: function(),
             snapshot_bookie :: pid()}).
 
 -record(iclerk_options,
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 18b018f..f95b37c 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -133,6 +133,7 @@
         terminate/2,
         code_change/3,
         book_start/1,
+        book_start/3,
         book_riakput/3,
         book_riakdelete/4,
         book_riakget/3,
@@ -169,6 +170,11 @@
 %%% API
 %%%============================================================================
 
+book_start(RootPath, LedgerCacheSize, JournalSize) ->
+    book_start(#bookie_options{root_path=RootPath,
+                                cache_size=LedgerCacheSize,
+                                max_journalsize=JournalSize}).
+
 book_start(Opts) ->
     gen_server:start(?MODULE, [Opts], []).
 
diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl
index 096e48f..be0bcf1 100644
--- a/src/leveled_codec.erl
+++ b/src/leveled_codec.erl
@@ -39,7 +39,6 @@
         strip_to_keyseqstatusonly/1,
         striphead_to_details/1,
         is_active/2,
-        is_indexkey/1,
         endkey_passed/2,
         key_dominates/2,
         print_key/1,
@@ -108,11 +107,6 @@ to_ledgerkey(Bucket, Key, Tag, Field, Value) when Tag == ?IDX_TAG ->
 to_ledgerkey(Bucket, Key, Tag) ->
     {Tag, Bucket, Key, null}.
 
-is_indexkey({Tag, _, _, _}) when Tag == ?IDX_TAG ->
-    true;
-is_indexkey(_Key) ->
-    false.
-
 hash(Obj) ->
     erlang:phash2(term_to_binary(Obj)).
 
diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl
index c346cc1..5be590b 100644
--- a/src/leveled_inker.erl
+++ b/src/leveled_inker.erl
@@ -695,7 +695,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
     TmpFN = filename:join(ManPath,
                             integer_to_list(ManSQN) ++ "." ++ ?PENDING_FILEX),
     MBin = term_to_binary(lists:map(fun({SQN, FN, _PID}) -> {SQN, FN} end,
-                                        Manifest)),
+                                        Manifest), [compressed]),
     case filelib:is_file(NewFN) of
         true ->
             io:format("Error - trying to write manifest for"
diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl
index 2aa99c9..13833f2 100644
--- a/src/leveled_penciller.erl
+++ b/src/leveled_penciller.erl
@@ -139,7 +139,10 @@
 %% - nonzero_.crr
 %%
 %% On startup, the Penciller should look for the nonzero_*.crr file with the
-%% highest such manifest sequence number.
+%% highest such manifest sequence number. This will be used as the starting
+%% manifest, together with any _0_0.sft file found at that Manifest SQN.
+%% Level zero files are not kept in the persisted manifest, and adding an L0
+%% file does not advance the Manifest SQN.
 %%
 %% The pace at which the store can accept updates will be dependent on the
 %% speed at which the Penciller's Clerk can merge files at lower levels plus
@@ -157,6 +160,27 @@
 %% table and build a new table starting with the remainder, and the keys from
 %% the latest push.
 %%
+%% Only a single L0 file may exist at any one moment in time. If pushes are
+%% received when memory is already over the maximum size, those pushes must
+%% still be held in memory.
+%%
+%% 1 - A L0 file is prompted to be created at ManifestSQN n
+%% 2 - The next push to memory will be stalled until the L0 write is reported
+%% as completed (as the memory needs to be flushed)
+%% 3 - The completion of the L0 file will cause a prompt to be cast to the
+%% clerk for it to look for work
+%% 4 - On completion of the merge (of the L0 file into L1, as this will be the
+%% highest priority work), the clerk will create a new manifest file at
+%% manifest SQN n+1
+%% 5 - The clerk will prompt the Penciller about the change, and the Penciller
+%% will then commit the change (by renaming the manifest file to be active, and
+%% advancing the in-memory state of the manifest and manifest SQN)
+%% 6 - The Penciller having committed the change will cast back to the Clerk
+%% to inform the Clerk that the change has been committed, and so it can carry
+%% on requesting new work
+%% 7 - If the Penciller now receives a push that takes memory over the max
+%% size, a new L0 file can now be created with the ManifestSQN of n+1
+%%
 %% ---------- NOTES ON THE USE OF ETS ----------
 %%
 %% Insertion into ETS is very fast, and so using ETS does not slow the PUT
@@ -177,7 +201,7 @@
 %% they need to iterate, or iterate through map functions scanning all the
 %% keys. The conversion may not be expensive, as we know loading into an ETS
 %% table is fast - but there may be some hidden overheads with creating and
-%5 destroying many ETS tables.
+%% destroying many ETS tables.
 %%
 %% A3 - keep a parallel list of lists of things that have gone in the ETS
 %% table in the format they arrived in
@@ -252,7 +276,8 @@
 -include_lib("eunit/include/eunit.hrl").
 
 -define(LEVEL_SCALEFACTOR, [{0, 0}, {1, 8}, {2, 64}, {3, 512},
-                                {4, 4096}, {5, 32768}, {6, 262144}, {7, infinity}]).
+                                {4, 4096}, {5, 32768}, {6, 262144},
+                                {7, infinity}]).
 -define(MAX_LEVELS, 8).
 -define(MAX_WORK_WAIT, 300).
 -define(MANIFEST_FP, "ledger_manifest").
@@ -383,9 +408,6 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap})
     % If not the change has still been made but the the L0 file will not have
     % been prompted - so the reply does not indicate failure but returns the
     % atom 'pause' to signal a loose desire for back-pressure to be applied.
-    % The only reason in this case why there should be a pause is if the
-    % manifest is locked pending completion of a manifest change - so reacting
-    % to the pause signal may not be sensible
     StartWatch = os:timestamp(),
     case assess_sqn(DumpList) of
         {MinSQN, MaxSQN} when MaxSQN >= MinSQN,
@@ -494,8 +516,6 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc},
     SFTiter = initiate_rangequery_frommanifest(StartKey,
                                                 EndKey,
                                                 State#state.manifest),
-    io:format("Manifest for iterator of:~n"),
-    print_manifest(SFTiter),
     Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}),
     {reply, Acc, State};
 handle_call(work_for_clerk, From, State) ->
@@ -588,18 +608,12 @@ terminate(Reason, State) ->
         no_change ->
             State
     end,
-    % TODO:
-    % This next section (to the end of the case clause), appears to be
-    % pointless. It will persist the in-memory state to a SFT file, but on
-    % startup that file will be ignored as the manifest has not bene updated
-    %
-    % Should we update the manifest, or stop trying to persist on closure?
     Dump = roll_into_list(State#state.memtable_copy),
     case {UpdState#state.levelzero_pending,
             get_item(0, UpdState#state.manifest, []),
             length(Dump)} of
         {?L0PEND_RESET, [], L} when L > 0 ->
-            MSN = UpdState#state.manifest_sqn + 1,
+            MSN = UpdState#state.manifest_sqn,
             FileName = UpdState#state.root_path
                         ++ "/" ++ ?FILES_FP ++ "/"
                         ++ integer_to_list(MSN) ++ "_0_0",
@@ -701,7 +715,7 @@ start_from_file(PCLopts) ->
 
     %% Find any L0 files
     L0FN = filepath(RootPath,
-                        TopManSQN + 1,
+                        TopManSQN,
                         new_merge_files) ++ "_0_0.sft",
     case filelib:is_file(L0FN) of
         true ->
@@ -785,9 +799,9 @@ roll_memory(State, MaxSize, MemTableCopy) ->
     case ets:info(State#state.memtable, size) of
         Size when Size > MaxSize ->
             L0 = get_item(0, State#state.manifest, []),
-            case {L0, manifest_locked(State)} of
-                {[], false} ->
-                    MSN = State#state.manifest_sqn + 1,
+            case L0 of
+                [] ->
+                    MSN = State#state.manifest_sqn,
                     FileName = State#state.root_path
                                 ++ "/" ++ ?FILES_FP ++ "/"
                                 ++ integer_to_list(MSN) ++ "_0_0",
@@ -798,10 +812,6 @@ roll_memory(State, MaxSize, MemTableCopy) ->
                                                 0,
                                                 Opts),
                     {ok, {true, L0Pid, os:timestamp()}, MSN, Size};
-                {[], true} ->
-                    {pause,
-                        "L0 file write blocked by change at sqn=~w~n",
-                        [State#state.manifest_sqn]};
                 _ ->
                     {pause,
                         "L0 file write blocked by L0 file in manifest~n",
@@ -870,23 +880,6 @@ compare_to_sqn(Obj, SQN) ->
     end.
 
 
-%% Manifest lock - don't have two changes to the manifest happening
-%% concurrently
-% TODO: Is this necessary now?
-
-manifest_locked(State) ->
-    if
-        length(State#state.ongoing_work) > 0 ->
-            true;
-        true ->
-            case State#state.levelzero_pending of
-                {true, _Pid, _TS} ->
-                    true;
-                _ ->
-                    false
-            end
-    end.
-
 %% Work out what the current work queue should be
 %%
 %% The work queue should have a lower level work at the front, and no work
@@ -905,8 +898,24 @@ return_work(State, From) ->
             io:format("Work at Level ~w to be scheduled for ~w with ~w " ++
                         "queue items outstanding~n",
                         [SrcLevel, From, length(OtherWork)]),
-            case {manifest_locked(State), State#state.ongoing_work} of
-                {false, _} ->
+            case {element(1, State#state.levelzero_pending),
+                    State#state.ongoing_work} of
+                {true, _} ->
+                    % Once the L0 file is completed there will be more work
+                    % - so don't be busy doing other work now
+                    io:format("Allocation of work blocked as L0 pending~n"),
+                    {State, none};
+                {_, [OutstandingWork]} ->
+                    % Still awaiting a response
+                    io:format("Ongoing work requested by ~w " ++
+                                "but work outstanding from Level ~w " ++
+                                "and Clerk ~w at sequence number ~w~n",
+                                [From,
+                                    OutstandingWork#penciller_work.src_level,
+                                    OutstandingWork#penciller_work.clerk,
+                                    OutstandingWork#penciller_work.next_sqn]),
+                    {State, none};
+                _ ->
                     %% No work currently outstanding
                     %% Can allocate work
                     NextSQN = State#state.manifest_sqn + 1,
@@ -923,22 +932,7 @@ return_work(State, From) ->
                                                 start_time = os:timestamp(),
                                                 ledger_filepath = FP,
                                                 manifest_file = ManFile},
-                    {State#state{ongoing_work=[WI]}, WI};
-                {true, [OutstandingWork]} ->
-                    %% Still awaiting a response
-                    io:format("Ongoing work requested by ~w " ++
-                        "but work outstanding from Level ~w " ++
-                        "and Clerk ~w at sequence number ~w~n",
-                        [From,
-                        OutstandingWork#penciller_work.src_level,
-                        OutstandingWork#penciller_work.clerk,
-                        OutstandingWork#penciller_work.next_sqn]),
-                    {State, none};
-                {true, _} ->
-                    %% Manifest locked
-                    io:format("Manifest locked but no work outstanding " ++
-                        "with clerk~n"),
-                    {State, none}
+                    {State#state{ongoing_work=[WI]}, WI}
             end;
         _ ->
             {State, none}
@@ -1313,11 +1307,12 @@ commit_manifest_change(ReturnedWorkItem, State) ->
         {NewMSN, _From} ->
             MTime = timer:now_diff(os:timestamp(),
                                     SentWorkItem#penciller_work.start_time),
+            WISrcLevel = SentWorkItem#penciller_work.src_level,
             io:format("Merge to sqn ~w completed in ~w microseconds " ++
-                        "at Level ~w~n",
+                        "from Level ~w~n",
                         [SentWorkItem#penciller_work.next_sqn,
                             MTime,
-                            SentWorkItem#penciller_work.src_level]),
+                            WISrcLevel]),
             ok = rename_manifest_files(RootPath, NewMSN),
             FilesToDelete = ReturnedWorkItem#penciller_work.unreferenced_files,
             UnreferencedFilesUpd = update_deletions(FilesToDelete,
@@ -1326,10 +1321,26 @@ commit_manifest_change(ReturnedWorkItem, State) ->
             io:format("Merge has been commmitted at sequence number ~w~n",
                         [NewMSN]),
             NewManifest = ReturnedWorkItem#penciller_work.new_manifest,
-            print_manifest(NewManifest),
+
+            CurrL0 = get_item(0, State#state.manifest, []),
+            % If the work isn't L0 work, then we may have an uncommitted
+            % manifest change at L0 - so add this back into the Manifest loop
+            % state
+            RevisedManifest = case {WISrcLevel, CurrL0} of
+                                    {0, _} ->
+                                        NewManifest;
+                                    {_, []} ->
+                                        NewManifest;
+                                    {_, [L0ManEntry]} ->
+                                        lists:keystore(0,
+                                                        1,
+                                                        NewManifest,
+                                                        {0, [L0ManEntry]})
+                                end,
+            print_manifest(RevisedManifest),
             {ok, State#state{ongoing_work=[],
                                 manifest_sqn=NewMSN,
-                                manifest=NewManifest,
+                                manifest=RevisedManifest,
                                 unreferenced_files=UnreferencedFilesUpd}};
         {MaybeWrongMSN, From} ->
             io:format("Merge commit at sqn ~w not matched to expected" ++
@@ -1508,6 +1519,7 @@ simple_server_test() ->
     ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
     maybe_pause_push(pcl_pushmem(PCL, KL2)),
     maybe_pause_push(pcl_pushmem(PCL, [Key3])),
+    ?assertMatch(Key1, pcl_fetch(PCL, {o,"Bucket0001", "Key0001", null})),
     ?assertMatch(Key2, pcl_fetch(PCL, {o,"Bucket0002", "Key0002", null})),
     ?assertMatch(Key3, pcl_fetch(PCL, {o,"Bucket0003", "Key0003", null})),
 
diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl
index af56fa9..7669660 100644
--- a/src/leveled_sft.erl
+++ b/src/leveled_sft.erl
@@ -282,9 +282,6 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) ->
                                                         FileMD,
                                                         KL1, KL2,
                                                         Level),
-    {KL1Rem, KL2Rem} = KeyRemainders,
-    io:format("File created with remainders of size ~w ~w~n",
-                [length(KL1Rem), length(KL2Rem)]),
     {reply, {KeyRemainders,
                 UpdFileMD#state.smallest_key,
                 UpdFileMD#state.highest_key},
@@ -334,7 +331,10 @@ handle_call(get_maxsqn, _From, State) ->
     {reply, State#state.highest_sqn, State}.
 
 handle_cast({sft_new, Filename, Inp1, [], 0}, _State) ->
+    SW = os:timestamp(),
     {ok, State} = create_levelzero(Inp1, Filename),
+    io:format("File creation of L0 file ~s took ~w microseconds~n",
+                [Filename, timer:now_diff(os:timestamp(), SW)]),
     {noreply, State};
 handle_cast(_Msg, State) ->
     {noreply, State}.
diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl
index 3b8701f..00a34af 100644
--- a/test/end_to_end/iterator_SUITE.erl
+++ b/test/end_to_end/iterator_SUITE.erl
@@ -40,9 +40,7 @@ simple_load_with2i(_Config) ->
 
 simple_querycount(_Config) ->
     RootPath = testutil:reset_filestructure(),
-    StartOpts1 = #bookie_options{root_path=RootPath,
-                                    max_journalsize=50000000},
-    {ok, Book1} = leveled_bookie:book_start(StartOpts1),
+    {ok, Book1} = leveled_bookie:book_start(RootPath, 4000, 50000000),
     {TestObject, TestSpec} = testutil:generate_testobject(),
     ok = leveled_bookie:book_riakput(Book1, TestObject, TestSpec),
     testutil:check_forobject(Book1, TestObject),
@@ -91,7 +89,7 @@ simple_querycount(_Config) ->
                                         Book1,
                                         ?KEY_ONLY),
     ok = leveled_bookie:book_close(Book1),
-    {ok, Book2} = leveled_bookie:book_start(StartOpts1),
+    {ok, Book2} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     Index1Count = count_termsonindex("Bucket",
                                         "idx1_bin",
                                         Book2,
@@ -206,7 +204,7 @@ simple_querycount(_Config) ->
                     end,
                     R9),
     ok = leveled_bookie:book_close(Book2),
-    {ok, Book3} = leveled_bookie:book_start(StartOpts1),
+    {ok, Book3} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         R = leveled_bookie:book_returnfolder(Book3,
                                                                 {index_query,
@@ -223,7 +221,7 @@ simple_querycount(_Config) ->
                     end,
                     R9),
     ok = leveled_bookie:book_riakput(Book3, Obj9, Spc9),
-    {ok, Book4} = leveled_bookie:book_start(StartOpts1),
+    {ok, Book4} = leveled_bookie:book_start(RootPath, 2000, 50000000),
     lists:foreach(fun({IdxF, IdxT, X}) ->
                         R = leveled_bookie:book_returnfolder(Book4,
                                                                 {index_query,
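
For reference, a minimal usage sketch of the new leveled_bookie:book_start/3 convenience wrapper introduced above, as it might be run from an Erlang shell with leveled and its test utilities on the code path. The root path here is a hypothetical example; the cache size and journal size mirror the values used in iterator_SUITE above.

    %% book_start/3 wraps book_start/1, building a #bookie_options{} record
    %% from the root path, ledger cache size and maximum journal size.
    {ok, Bookie} = leveled_bookie:book_start("/tmp/leveled_example", 2000, 50000000),
    {TestObject, TestSpec} = testutil:generate_testobject(),
    ok = leveled_bookie:book_riakput(Bookie, TestObject, TestSpec),
    ok = leveled_bookie:book_close(Bookie).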