diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 3794dab..18b018f 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -160,7 +160,7 @@ penciller :: pid(), cache_size :: integer(), back_pressure :: boolean(), - ledger_cache :: {gb_trees:tree(), list()}, + ledger_cache :: gb_trees:tree(), is_snapshot :: boolean()}). @@ -242,7 +242,7 @@ init([Opts]) -> {ok, #state{inker=Inker, penciller=Penciller, cache_size=CacheSize, - ledger_cache={gb_trees:empty(), []}, + ledger_cache=gb_trees:empty(), is_snapshot=false}}; Bookie -> {ok, @@ -397,15 +397,16 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) -> +bucket_stats(Penciller, LedgerCache, Bucket, Tag) -> PCLopts = #penciller_options{start_snapshot=true, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> + Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(ChangeList)]), + [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, ChangeList}), + {infinity, Increment}), StartKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), EndKey = leveled_codec:to_ledgerkey(Bucket, null, Tag), Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, @@ -418,7 +419,7 @@ bucket_stats(Penciller, {_ObjTree, ChangeList}, Bucket, Tag) -> end, {async, Folder}. 
-index_query(Penciller, {_ObjTree, ChangeList}, +index_query(Penciller, LedgerCache, Bucket, {IdxField, StartValue, EndValue}, {ReturnTerms, TermRegex}) -> @@ -426,10 +427,11 @@ index_query(Penciller, {_ObjTree, ChangeList}, source_penciller=Penciller}, {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> + Increment = gb_trees:to_list(LedgerCache), io:format("Length of increment in snapshot is ~w~n", - [length(ChangeList)]), + [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - {infinity, ChangeList}), + {infinity, Increment}), StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, IdxField, StartValue), EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG, @@ -491,9 +493,8 @@ startup(InkerOpts, PencillerOpts) -> {Inker, Penciller}. -fetch_head(Key, Penciller, {ObjTree, _ChangeList}) -> - - case gb_trees:lookup(Key, ObjTree) of +fetch_head(Key, Penciller, LedgerCache) -> + case gb_trees:lookup(Key, LedgerCache) of {value, Head} -> Head; none -> @@ -569,30 +570,20 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> [PrimaryChange] ++ ConvSpecs. addto_ledgercache(Changes, Cache) -> - {ObjectTree, ChangeList} = Cache, - {lists:foldl(fun({K, V}, Acc) -> - case leveled_codec:is_indexkey(K) of - false -> - gb_trees:enter(K, V, Acc); - true -> - Acc - end - end, - ObjectTree, - Changes), - ChangeList ++ Changes}. + lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, + Cache, + Changes). 
maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> - {_ObjectTree, ChangeList} = Cache, - CacheSize = length(ChangeList), + CacheSize = gb_trees:size(Cache), if CacheSize > MaxCacheSize -> case leveled_penciller:pcl_pushmem(Penciller, - ChangeList) of + gb_trees:to_list(Cache)) of ok -> - {ok, {gb_trees:empty(), []}}; + {ok, gb_trees:empty()}; pause -> - {pause, {gb_trees:empty(), []}}; + {pause, gb_trees:empty()}; refused -> {ok, Cache} end; diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 0ddc6d7..c346cc1 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -724,9 +724,10 @@ manifest_printer(Manifest) -> initiate_penciller_snapshot(Bookie) -> {ok, - {LedgerSnap, {_ObjTree, ChangeList}}, + {LedgerSnap, LedgerCache}, _} = leveled_bookie:book_snapshotledger(Bookie, self(), undefined), - ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, ChangeList), + ok = leveled_penciller:pcl_loadsnapshot(LedgerSnap, + gb_trees:to_list(LedgerCache)), MaxSQN = leveled_penciller:pcl_getstartupsequencenumber(LedgerSnap), {LedgerSnap, MaxSQN}. diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 32c70a3..2aa99c9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -246,6 +246,7 @@ pcl_loadsnapshot/2, pcl_getstartupsequencenumber/1, roll_new_tree/3, + roll_into_list/1, clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -377,7 +378,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) % then add to memory in the background before updating the loop state % - Push the update into memory (do_pushtomem/3) % - If we haven't got through quickcheck now need to check if there is a - % definite need to write a new L0 file (roll_memory/2). If all clear this + % definite need to write a new L0 file (roll_memory/3). If all clear this % will write the file in the background and allow a response to the user. 
% If not the change has still been made but the the L0 file will not have % been prompted - so the reply does not indicate failure but returns the @@ -414,7 +415,7 @@ handle_call({push_mem, DumpList}, From, State=#state{is_snapshot=Snap}) State1#state.memtable_copy, MaxSQN), - case roll_memory(State1, MaxTableSize) of + case roll_memory(State1, MaxTableSize, L0Snap) of {ok, L0Pend, ManSN, TableSize2} -> io:format("Push completed in ~w microseconds~n", [timer:now_diff(os:timestamp(), StartWatch)]), @@ -587,9 +588,16 @@ terminate(Reason, State) -> no_change -> State end, - Dump = ets:tab2list(UpdState#state.memtable), + % TODO: + % This next section (to the end of the case clause), appears to be + % pointless. It will persist the in-memory state to a SFT file, but on + % startup that file will be ignored as the manifest has not been updated + % + % Should we update the manifest, or stop trying to persist on closure? + Dump = roll_into_list(State#state.memtable_copy), case {UpdState#state.levelzero_pending, - get_item(0, UpdState#state.manifest, []), length(Dump)} of + get_item(0, UpdState#state.manifest, []), + length(Dump)} of {?L0PEND_RESET, [], L} when L > 0 -> MSN = UpdState#state.manifest_sqn + 1, FileName = UpdState#state.root_path @@ -616,6 +624,8 @@ terminate(Reason, State) -> ++ " with ~w keys discarded~n", [length(Dump)]) end, + + % Tidy shutdown of individual files ok = close_files(0, UpdState#state.manifest), lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end,
- % - There is no use of iterator, so a set could be used, but then the - % output of tab2list would need to be sorted - % TODO: - % - Test switching to [set, private] and sending the L0 snapshots to the - % sft_new cast - TID = ets:new(?MEMTABLE, [ordered_set]), + % There is no need to export this ets table (hence private) or iterate + % over it (hence set not ordered_set) + TID = ets:new(?MEMTABLE, [set, private]), {ok, Clerk} = leveled_pclerk:clerk_new(self()), InitState = #state{memtable=TID, clerk=Clerk, @@ -767,12 +771,17 @@ quickcheck_pushtomem(DumpList, TableSize, MaxSize) -> do_pushtomem(DumpList, MemTable, Snapshot, MaxSQN) -> SW = os:timestamp(), UpdSnapshot = add_increment_to_memcopy(Snapshot, MaxSQN, DumpList), + % Note that the DumpList must have been taken from a source which + % naturally de-duplicates the keys. It is not possible just to cache + % changes in a list (in the Bookie for example), as the insert method does + % not apply the list in order, and so it is not clear which of a duplicate + % key will be applied ets:insert(MemTable, DumpList), io:format("Push into memory timed at ~w microseconds~n", [timer:now_diff(os:timestamp(), SW)]), UpdSnapshot. 
-roll_memory(State, MaxSize) -> +roll_memory(State, MaxSize, MemTableCopy) -> case ets:info(State#state.memtable, size) of Size when Size > MaxSize -> L0 = get_item(0, State#state.manifest, []), @@ -784,7 +793,7 @@ roll_memory(State, MaxSize) -> ++ integer_to_list(MSN) ++ "_0_0", Opts = #sft_options{wait=false}, {ok, L0Pid} = leveled_sft:sft_new(FileName, - State#state.memtable, + MemTableCopy, [], 0, Opts), @@ -938,7 +947,6 @@ return_work(State, From) -> %% This takes the three parts of a memtable copy - the increments, the tree %% and the SQN at which the tree was formed, and outputs a new tree - roll_new_tree(Tree, [], HighSQN) -> {Tree, HighSQN}; roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> @@ -954,6 +962,14 @@ roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> roll_new_tree(Tree, [_H|TailIncs], HighSQN) -> roll_new_tree(Tree, TailIncs, HighSQN). +%% This takes the three parts of a memtable copy - the increments, the tree +%% and the SQN at which the tree was formed, and outputs a sorted list +roll_into_list(MemTableCopy) -> + {Tree, _SQN} = roll_new_tree(MemTableCopy#l0snapshot.tree, + MemTableCopy#l0snapshot.increments, + MemTableCopy#l0snapshot.ledger_sqn), + gb_trees:to_list(Tree). + %% Update the memtable copy if the tree created advances the SQN cache_tree_in_memcopy(MemCopy, Tree, SQN) -> case MemCopy#l0snapshot.ledger_sqn of @@ -1331,7 +1347,7 @@ rename_manifest_files(RootPath, NewMSN) -> filelib:is_file(OldFN), NewFN, filelib:is_file(NewFN)]), - file:rename(OldFN,NewFN). + ok = file:rename(OldFN,NewFN). 
filepath(RootPath, manifest) -> RootPath ++ "/" ++ ?MANIFEST_FP; @@ -1379,13 +1395,14 @@ confirm_delete(Filename, UnreferencedFiles, RegisteredSnapshots) -> assess_sqn([]) -> empty; -assess_sqn([HeadKV|[]]) -> - {leveled_codec:strip_to_seqonly(HeadKV), - leveled_codec:strip_to_seqonly(HeadKV)}; -assess_sqn([HeadKV|DumpList]) -> - {leveled_codec:strip_to_seqonly(HeadKV), - leveled_codec:strip_to_seqonly(lists:last(DumpList))}. +assess_sqn(DumpList) -> + assess_sqn(DumpList, infinity, 0). +assess_sqn([], MinSQN, MaxSQN) -> + {MinSQN, MaxSQN}; +assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> + SQN = leveled_codec:strip_to_seqonly(HeadKey), + assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). %%%============================================================================ %%% Test @@ -1499,11 +1516,6 @@ simple_server_test() -> max_inmemory_tablesize=1000}), TopSQN = pcl_getstartupsequencenumber(PCLr), Check = case TopSQN of - 2001 -> - %% Last push not persisted - S3a = pcl_pushmem(PCL, [Key3]), - if S3a == pause -> timer:sleep(1000); true -> ok end, - ok; 2002 -> %% everything got persisted ok; diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 1e846e2..af56fa9 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -183,7 +183,7 @@ -define(MERGE_SCANWIDTH, 8). -define(DELETE_TIMEOUT, 60000). -define(MAX_KEYS, ?SLOT_COUNT * ?BLOCK_COUNT * ?BLOCK_SIZE). - +-define(DISCARD_EXT, ".discarded"). 
-record(state, {version = ?CURRENT_VERSION :: tuple(), slot_index :: list(), @@ -387,7 +387,7 @@ create_levelzero(Inp1, Filename) -> true -> Inp1; false -> - ets:tab2list(Inp1) + leveled_penciller:roll_into_list(Inp1) end, {TmpFilename, PrmFilename} = generate_filenames(Filename), case create_file(TmpFilename) of @@ -510,6 +510,19 @@ complete_file(Handle, FileMD, KL1, KL2, Level, Rename) -> open_file(FileMD); {true, OldName, NewName} -> io:format("Renaming file from ~s to ~s~n", [OldName, NewName]), + case filelib:is_file(NewName) of + true -> + io:format("Filename ~s already exists~n", + [NewName]), + AltName = filename:join(filename:dirname(NewName), + filename:basename(NewName)) + ++ ?DISCARD_EXT, + io:format("Rename rogue filename ~s to ~s~n", + [NewName, AltName]), + ok = file:rename(NewName, AltName); + false -> + ok + end, ok = file:rename(OldName, NewName), open_file(FileMD#state{filename=NewName}) end, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index bf5a700..dac295f 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -192,6 +192,7 @@ fetchput_snapshot(_Config) -> io:format("Checked for replacement objects in active bookie" ++ ", old objects in snapshot~n"), + ok = filelib:ensure_dir(RootPath ++ "/ledger/ledger_files"), {ok, FNsA} = file:list_dir(RootPath ++ "/ledger/ledger_files"), ObjList3 = testutil:generate_objects(15000, 5002), lists:foreach(fun({_RN, Obj, Spc}) -> @@ -212,6 +213,7 @@ fetchput_snapshot(_Config) -> testutil:check_forlist(SnapBookie3, lists:nth(length(CLs2), CLs2)), testutil:check_formissinglist(SnapBookie2, ChkList3), testutil:check_formissinglist(SnapBookie2, lists:nth(length(CLs2), CLs2)), + testutil:check_forlist(Bookie2, ChkList2), testutil:check_forlist(SnapBookie3, ChkList2), testutil:check_forlist(SnapBookie2, ChkList1), io:format("Started new snapshot and check for new objects~n"), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 
0d472cc..c938d6a 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -53,14 +53,22 @@ check_forlist(Bookie, ChkList, Log) -> lists:foreach(fun({_RN, Obj, _Spc}) -> if Log == true -> - io:format("Fetching Key ~w~n", [Obj#r_object.key]); + io:format("Fetching Key ~s~n", [Obj#r_object.key]); true -> ok end, R = leveled_bookie:book_riakget(Bookie, Obj#r_object.bucket, Obj#r_object.key), - R = {ok, Obj} end, + ok = case R of + {ok, Obj} -> + ok; + not_found -> + io:format("Object not found for key ~s~n", + [Obj#r_object.key]), + error + end + end, ChkList), io:format("Fetch check took ~w microseconds checking list of length ~w~n", [timer:now_diff(os:timestamp(), SW), length(ChkList)]).