From e2bb09b873da1c720f3c7df4ef1d5bee5913a546 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Mon, 26 Sep 2016 10:55:08 +0100 Subject: [PATCH] Snapshot testing Work to test the checking of sequence numbers in snapshots as required by the inkers clerk to calculate the percentage of a file which is compactable --- src/leveled_iclerk.erl | 65 +++++-- src/leveled_inker.erl | 23 ++- src/leveled_penciller.erl | 354 ++++++++++++++++++++++++++------------ 3 files changed, 311 insertions(+), 131 deletions(-) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 1fe049d..fee2417 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -12,16 +12,19 @@ handle_info/2, terminate/2, clerk_new/1, + clerk_compact/5, clerk_remove/2, clerk_stop/1, code_change/3]). -include_lib("eunit/include/eunit.hrl"). --define(BATCH_SIZE, 16) +-define(SAMPLE_SIZE, 200). +-define(BATCH_SIZE, 16). -define(BATCHES_TO_CHECK, 8). --record(state, {owner :: pid()}). +-record(state, {owner :: pid(), + penciller_snapshot :: pid()}). %%%============================================================================ %%% API @@ -36,6 +39,9 @@ clerk_remove(Pid, Removals) -> gen_server:cast(Pid, {remove, Removals}), ok. +clerk_compact(Pid, Manifest, ManifestSQN, Penciller, Timeout) -> + gen_server:cast(Pid, {compact, Manifest, ManifestSQN, Penciller, Timeout}). + clerk_stop(Pid) -> gen_server:cast(Pid, stop). @@ -49,6 +55,15 @@ init([]) -> handle_call({register, Owner}, _From, State) -> {reply, ok, State#state{owner=Owner}}. +handle_cast({compact, Manifest, _ManifestSQN, Penciller, _Timeout}, State) -> + PclOpts = #penciller_options{start_snapshot = true, + source_penciller = Penciller, + requestor = self()}, + PclSnap = leveled_penciller:pcl_start(PclOpts), + ok = leveled_penciller:pcl_loadsnapshot(PclSnap, []), + _CandidateList = scan_all_files(Manifest, PclSnap), + %% TODO - Lots + {noreply, State}; handle_cast({remove, _Removals}, State) -> {noreply, State}; handle_cast(stop, State) -> @@ -69,20 +84,38 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ -check_single_file(CDB, _PencilSnapshot, SampleSize, BatchSize) -> +check_single_file(CDB, PclSnap, SampleSize, BatchSize) -> PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), - KeySizeList. - %% TODO: - %% Need to check the penciller snapshot to see if these keys are at the - %% right sequence number - %% - %% The calculate the proportion (by value size) of the CDB which is at the - %% wrong sequence number to help determine eligibility for compaction - %% - %% BIG TODO: - %% Need to snapshot a penciller - + R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> + {{PK, SQN}, Size} = KS, + Chk = leveled_pcl:pcl_checksequencenumber(PclSnap, + PK, + SQN), + case Chk of + true -> + {ActSize + Size, RplSize}; + false -> + {ActSize, RplSize + Size} + end end, + {0, 0}, + KeySizeList), + {ActiveSize, ReplacedSize} = R0, + 100 * (ActiveSize / (ActiveSize + ReplacedSize)). + +scan_all_files(Manifest, Penciller) -> + scan_all_files(Manifest, Penciller, []). + +scan_all_files([], _Penciller, CandidateList) -> + CandidateList; +scan_all_files([{LowSQN, FN, JournalP}|Tail], Penciller, CandidateList) -> + CompactPerc = check_single_file(JournalP, + Penciller, + ?SAMPLE_SIZE, + ?BATCH_SIZE), + scan_all_files(Tail, Penciller, CandidateList ++ + [{LowSQN, FN, JournalP, CompactPerc}]). + fetch_inbatches([], _BatchSize, _CDB, CheckedList) -> CheckedList; fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> @@ -90,8 +123,8 @@ fetch_inbatches(PositionList, BatchSize, CDB, CheckedList) -> KL_List = leveled_cdb:direct_fetch(CDB, Batch, key_size), fetch_inbatches(Tail, BatchSize, CDB, CheckedList ++ KL_List). -window_closed(_Timeout) -> - true. + + %%%============================================================================ diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index c5ddf94..b7a54a6 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -104,6 +104,7 @@ ink_fetch/3, ink_loadpcl/4, ink_registersnapshot/2, + ink_compactjournal/3, ink_close/1, ink_print_manifest/1, build_dummy_journal/0, @@ -126,8 +127,10 @@ active_journaldb :: pid(), active_journaldb_sqn :: integer(), removed_journaldbs = [] :: list(), + registered_snapshots = [] :: list(), root_path :: string(), - cdb_options :: #cdb_options{}}). + cdb_options :: #cdb_options{}, + clerk :: pid()}). %%%============================================================================ @@ -155,6 +158,9 @@ ink_close(Pid) -> ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). +ink_compactjournal(Pid, Penciller, Timeout) -> + gen_server:call(Pid, {compact_journal, Penciller, Timeout}, infinty). + ink_print_manifest(Pid) -> gen_server:call(Pid, print_manifest, infinity). @@ -221,15 +227,18 @@ handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> State#state.active_journaldb}], Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), {reply, Reply, State}; -handle_call({register_snapshot, _Requestor}, _From , State) -> - %% TODO: Not yet implemented registration of snapshot - %% Should return manifest and register the snapshot +handle_call({register_snapshot, Requestor}, _From , State) -> + Rs = [{Requestor, + State#state.manifest_sqn}|State#state.registered_snapshots], {reply, {State#state.manifest, State#state.active_journaldb}, - State}; + State#state{registered_snapshots=Rs}}; handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; +handle_call({compact_journal, Penciller, Timeout}, _From, State) -> + leveled_iclerk:clerk_compact(Penciller, Timeout), + {reply, ok, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -256,6 +265,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ start_from_file(InkerOpts) -> + {ok, Clerk} = leveled_iclerk:clerk_new(self()), RootPath = InkerOpts#inker_options.root_path, CDBopts = InkerOpts#inker_options.cdb_options, JournalFP = filepath(RootPath, journal_dir), @@ -288,7 +298,8 @@ start_from_file(InkerOpts) -> active_journaldb = ActiveJournal, active_journaldb_sqn = LowActiveSQN, root_path = RootPath, - cdb_options = CDBopts}}. + cdb_options = CDBopts, + clerk = Clerk}}. put_object(PrimaryKey, Object, KeyChanges, State) -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 53c48fe..9747b9a 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -228,6 +228,7 @@ pcl_quickstart/1, pcl_pushmem/2, pcl_fetch/2, + pcl_checksequencenumber/3, pcl_workforclerk/1, pcl_requestmanifestchange/2, pcl_confirmdelete/2, @@ -297,6 +298,9 @@ pcl_pushmem(Pid, DumpList) -> pcl_fetch(Pid, Key) -> gen_server:call(Pid, {fetch, Key}, infinity). +pcl_checksequencenumber(Pid, Key, SQN) -> + gen_server:call(Pid, {check_sqn, Key, SQN}, infinity). + pcl_workforclerk(Pid) -> gen_server:call(Pid, work_for_clerk, infinity). @@ -334,9 +338,10 @@ init([PCLopts]) -> PCLopts#penciller_options.start_snapshot} of {undefined, true} -> SrcPenciller = PCLopts#penciller_options.source_penciller, - {ok, {LedgerSQN, - MemTableCopy, - Manifest}} = pcl_registersnapshot(SrcPenciller, self()), + {ok, + LedgerSQN, + Manifest, + MemTableCopy} = pcl_registersnapshot(SrcPenciller, self()), {ok, #state{memtable_copy=MemTableCopy, is_snapshot=true, @@ -349,7 +354,175 @@ init([PCLopts]) -> end. -handle_call({push_mem, DumpList}, From, State0) -> +handle_call({push_mem, DumpList}, From, State) -> + if + State#state.is_snapshot == true -> + {reply, bad_request, State}; + true -> + writer_call({push_mem, DumpList}, From, State) + end; +handle_call({confirm_delete, FileName}, _From, State) -> + if + State#state.is_snapshot == true -> + {reply, bad_request, State}; + true -> + writer_call({confirm_delete, FileName}, _From, State) + end; +handle_call(prompt_compaction, _From, State) -> + if + State#state.is_snapshot == true -> + {reply, bad_request, State}; + true -> + writer_call(prompt_compaction, _From, State) + end; +handle_call({manifest_change, WI}, _From, State) -> + if + State#state.is_snapshot == true -> + {reply, bad_request, State}; + true -> + writer_call({manifest_change, WI}, _From, State) + end; +handle_call({check_sqn, Key, SQN}, _From, State) -> + Obj = if + State#state.is_snapshot == true -> + fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot); + true -> + fetch(Key, + State#state.manifest, + State#state.memtable) + end, + Reply = case Obj of + not_present -> + false; + Obj -> + SQNToCompare = leveled_bookie:strip_to_seqonly(Obj), + if + SQNToCompare > SQN -> + false; + true -> + true + end + end, + {reply, Reply, State}; +handle_call({fetch, Key}, _From, State) -> + Reply = if + State#state.is_snapshot == true -> + fetch_snap(Key, + State#state.manifest, + State#state.levelzero_snapshot); + true -> + fetch(Key, + State#state.manifest, + State#state.memtable) + end, + {reply, Reply, State}; +handle_call(work_for_clerk, From, State) -> + {UpdState, Work} = return_work(State, From), + {reply, {Work, UpdState#state.backlog}, UpdState}; +handle_call(get_startup_sqn, _From, State) -> + {reply, State#state.ledger_sqn, State}; +handle_call({register_snapshot, Snapshot}, _From, State) -> + Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots], + {reply, + {ok, + State#state.ledger_sqn, + State#state.manifest, + State#state.memtable_copy}, + State#state{registered_snapshots = Rs}}; +handle_call({load_snapshot, Increment}, _From, State) -> + MemTableCopy = State#state.memtable_copy, + {Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree, + MemTableCopy#l0snapshot.increments, + MemTableCopy#l0snapshot.ledger_sqn), + if + TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn -> + pcl_updatesnapshotcache(State#state.source_penciller, + Tree0, + TreeSQN0) + end, + {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0), + io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]), + {reply, ok, State#state{levelzero_snapshot=Tree1, + ledger_sqn=TreeSQN1, + snapshot_fully_loaded=true}}; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. + +handle_cast({update_snapshotcache, Tree, SQN}, State) -> + MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), + {noreply, State#state{memtable_copy=MemTableC}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State) -> + %% When a Penciller shuts down it isn't safe to try an manage the safe + %% finishing of any outstanding work. The last commmitted manifest will + %% be used. + %% + %% Level 0 files lie outside of the manifest, and so if there is no L0 + %% file present it is safe to write the current contents of memory. If + %% there is a L0 file present - then the memory can be dropped (it is + %% recoverable from the ledger, and there should not be a lot to recover + %% as presumably the ETS file has been recently flushed, hence the presence + %% of a L0 file). + %% + %% The penciller should close each file in the unreferenced files, and + %% then each file in the manifest, and cast a close on the clerk. + %% The cast may not succeed as the clerk could be synchronously calling + %% the penciller looking for a manifest commit + %% + if + State#state.is_snapshot == true -> + ok; + true -> + leveled_pclerk:clerk_stop(State#state.clerk), + Dump = ets:tab2list(State#state.memtable), + case {State#state.levelzero_pending, + get_item(0, State#state.manifest, []), length(Dump)} of + {?L0PEND_RESET, [], L} when L > 0 -> + MSN = State#state.manifest_sqn + 1, + FileName = State#state.root_path + ++ "/" ++ ?FILES_FP ++ "/" + ++ integer_to_list(MSN) ++ "_0_0", + NewSFT = leveled_sft:sft_new(FileName ++ ".pnd", + Dump, + [], + 0), + {ok, L0Pid, {{[], []}, _SK, _HK}} = NewSFT, + io:format("Dump of memory on close to filename ~s~n", + [FileName]), + leveled_sft:sft_close(L0Pid), + file:rename(FileName ++ ".pnd", FileName ++ ".sft"); + {?L0PEND_RESET, [], L} when L == 0 -> + io:format("No keys to dump from memory when closing~n"); + {{true, L0Pid, _TS}, _, _} -> + leveled_sft:sft_close(L0Pid), + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]); + _ -> + io:format("No opportunity to persist memory before closing" + ++ " with ~w keys discarded~n", + [length(Dump)]) + end, + ok = close_files(0, State#state.manifest), + lists:foreach(fun({_FN, Pid, _SN}) -> + leveled_sft:sft_close(Pid) end, + State#state.unreferenced_files), + ok + end. + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +writer_call({push_mem, DumpList}, From, State0) -> % The process for pushing to memory is as follows % - Check that the inbound list does not contain any Keys with a lower % sequence number than any existing keys (assess_sqn/1) @@ -432,12 +605,7 @@ handle_call({push_mem, DumpList}, From, State0) -> io:format("Empty request pushed to Penciller~n"), {reply, ok, State0} end; -handle_call({fetch, Key}, _From, State) -> - {reply, fetch(Key, State#state.manifest, State#state.memtable), State}; -handle_call(work_for_clerk, From, State) -> - {UpdState, Work} = return_work(State, From), - {reply, {Work, UpdState#state.backlog}, UpdState}; -handle_call({confirm_delete, FileName}, _From, State) -> +writer_call({confirm_delete, FileName}, _From, State) -> Reply = confirm_delete(FileName, State#state.unreferenced_files, State#state.registered_snapshots), @@ -448,7 +616,7 @@ handle_call({confirm_delete, FileName}, _From, State) -> _ -> {reply, Reply, State} end; -handle_call(prompt_compaction, _From, State) -> +writer_call(prompt_compaction, _From, State) -> %% If there is a prompt immediately after a L0 async write event then %% there exists the potential for the prompt to stall the database. %% Should only accept prompts if there has been a safe wait from the @@ -480,99 +648,10 @@ handle_call(prompt_compaction, _From, State) -> true -> {reply, ok, State#state{backlog=false}} end; -handle_call({manifest_change, WI}, _From, State) -> +writer_call({manifest_change, WI}, _From, State) -> {ok, UpdState} = commit_manifest_change(WI, State), - {reply, ok, UpdState}; -handle_call(get_startup_sqn, _From, State) -> - {reply, State#state.ledger_sqn, State}; -handle_call({register_snapshot, Snapshot}, _From, State) -> - Rs = [{Snapshot, State#state.ledger_sqn}|State#state.registered_snapshots], - {reply, - {ok, - State#state.ledger_sqn, - State#state.manifest, - State#state.memtable_copy}, - State#state{registered_snapshots = Rs}}; -handle_call({load_snapshot, Increment}, _From, State) -> - MemTableCopy = State#state.memtable_copy, - {Tree0, TreeSQN0} = roll_new_tree(MemTableCopy#l0snapshot.tree, - MemTableCopy#l0snapshot.increments, - MemTableCopy#l0snapshot.ledger_sqn), - if - TreeSQN0 > MemTableCopy#l0snapshot.ledger_sqn -> - pcl_updatesnapshotcache(State#state.source_penciller, - Tree0, - TreeSQN0) - end, - {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0), - io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]), - {reply, ok, State#state{levelzero_snapshot=Tree1, - ledger_sqn=TreeSQN1, - snapshot_fully_loaded=true}}; -handle_call(close, _From, State) -> - {stop, normal, ok, State}. + {reply, ok, UpdState}. -handle_cast({update_snapshotcache, Tree, SQN}, State) -> - MemTableC = cache_tree_in_memcopy(State#state.memtable_copy, Tree, SQN), - {noreply, State#state{memtable_copy=MemTableC}}; -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - %% When a Penciller shuts down it isn't safe to try an manage the safe - %% finishing of any outstanding work. The last commmitted manifest will - %% be used. - %% - %% Level 0 files lie outside of the manifest, and so if there is no L0 - %% file present it is safe to write the current contents of memory. If - %% there is a L0 file present - then the memory can be dropped (it is - %% recoverable from the ledger, and there should not be a lot to recover - %% as presumably the ETS file has been recently flushed, hence the presence - %% of a L0 file). - %% - %% The penciller should close each file in the unreferenced files, and - %% then each file in the manifest, and cast a close on the clerk. - %% The cast may not succeed as the clerk could be synchronously calling - %% the penciller looking for a manifest commit - %% - leveled_pclerk:clerk_stop(State#state.clerk), - Dump = ets:tab2list(State#state.memtable), - case {State#state.levelzero_pending, - get_item(0, State#state.manifest, []), length(Dump)} of - {?L0PEND_RESET, [], L} when L > 0 -> - MSN = State#state.manifest_sqn + 1, - FileName = State#state.root_path - ++ "/" ++ ?FILES_FP ++ "/" - ++ integer_to_list(MSN) ++ "_0_0", - {ok, - L0Pid, - {{[], []}, _SK, _HK}} = leveled_sft:sft_new(FileName ++ ".pnd", - Dump, - [], - 0), - io:format("Dump of memory on close to filename ~s~n", [FileName]), - leveled_sft:sft_close(L0Pid), - file:rename(FileName ++ ".pnd", FileName ++ ".sft"); - {?L0PEND_RESET, [], L} when L == 0 -> - io:format("No keys to dump from memory when closing~n"); - {{true, L0Pid, _TS}, _, _} -> - leveled_sft:sft_close(L0Pid), - io:format("No opportunity to persist memory before closing " - ++ "with ~w keys discarded~n", [length(Dump)]); - _ -> - io:format("No opportunity to persist memory before closing " - ++ "with ~w keys discarded~n", [length(Dump)]) - end, - ok = close_files(0, State#state.manifest), - lists:foreach(fun({_FN, Pid, _SN}) -> leveled_sft:sft_close(Pid) end, - State#state.unreferenced_files), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. %%%============================================================================ @@ -744,6 +823,14 @@ roll_memory(State, MaxSize) -> end. +fetch_snap(Key, Manifest, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Value} -> + {Key, Value}; + none -> + fetch(Key, Manifest, 0, fun leveled_sft:sft_get/2) + end. + fetch(Key, Manifest, TID) -> case ets:lookup(TID, Key) of [Object] -> @@ -777,6 +864,7 @@ fetch(Key, Manifest, Level, FetchFun) -> ObjectFound end end. + %% Manifest lock - don't have two changes to the manifest happening %% concurrently @@ -862,7 +950,9 @@ roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> gb_trees:enter(K, V, TreeAcc) end, Tree, KVList), - roll_new_tree(UpdTree, TailIncs, SQN). + roll_new_tree(UpdTree, TailIncs, SQN); +roll_new_tree(Tree, [_H|TailIncs], HighSQN) -> + roll_new_tree(Tree, TailIncs, HighSQN). %% Update the memtable copy if the tree created advances the SQN cache_tree_in_memcopy(MemCopy, Tree, SQN) -> @@ -1146,9 +1236,9 @@ simple_server_test() -> ?assertMatch(R3, Key1), ?assertMatch(R4, Key2), S2 = pcl_pushmem(PCL, KL2), - if S2 == pause -> timer:sleep(2000); true -> ok end, + if S2 == pause -> timer:sleep(1000); true -> ok end, S3 = pcl_pushmem(PCL, [Key3]), - if S3 == pause -> timer:sleep(2000); true -> ok end, + if S3 == pause -> timer:sleep(1000); true -> ok end, R5 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), R6 = pcl_fetch(PCL, {o,"Bucket0002", "Key0002"}), R7 = pcl_fetch(PCL, {o,"Bucket0003", "Key0003"}), @@ -1163,7 +1253,7 @@ simple_server_test() -> 2001 -> %% Last push not persisted S3a = pcl_pushmem(PCL, [Key3]), - if S3a == pause -> timer:sleep(2000); true -> ok end, + if S3a == pause -> timer:sleep(1000); true -> ok end, ok; 2002 -> %% everything got persisted @@ -1182,11 +1272,11 @@ simple_server_test() -> ?assertMatch(R9, Key2), ?assertMatch(R10, Key3), S4 = pcl_pushmem(PCLr, KL3), - if S4 == pause -> timer:sleep(2000); true -> ok end, + if S4 == pause -> timer:sleep(1000); true -> ok end, S5 = pcl_pushmem(PCLr, [Key4]), - if S5 == pause -> timer:sleep(2000); true -> ok end, + if S5 == pause -> timer:sleep(1000); true -> ok end, S6 = pcl_pushmem(PCLr, KL4), - if S6 == pause -> timer:sleep(2000); true -> ok end, + if S6 == pause -> timer:sleep(1000); true -> ok end, R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), @@ -1195,6 +1285,52 @@ simple_server_test() -> ?assertMatch(R12, Key2), ?assertMatch(R13, Key3), ?assertMatch(R14, Key4), + SnapOpts = #penciller_options{start_snapshot = true, + source_penciller = PCLr, + requestor = self()}, + {ok, PclSnap} = pcl_start(SnapOpts), + ok = pcl_loadsnapshot(PclSnap, []), + ?assertMatch(Key1, pcl_fetch(PclSnap, {o,"Bucket0001", "Key0001"})), + ?assertMatch(Key2, pcl_fetch(PclSnap, {o,"Bucket0002", "Key0002"})), + ?assertMatch(Key3, pcl_fetch(PclSnap, {o,"Bucket0003", "Key0003"})), + ?assertMatch(Key4, pcl_fetch(PclSnap, {o,"Bucket0004", "Key0004"})), + ?assertMatch(true, pcl_checksequencenumber(PclSnap, + {o,"Bucket0001", "Key0001"}, + 1)), + ?assertMatch(true, pcl_checksequencenumber(PclSnap, + {o,"Bucket0002", "Key0002"}, + 1002)), + ?assertMatch(true, pcl_checksequencenumber(PclSnap, + {o,"Bucket0003", "Key0003"}, + 2002)), + ?assertMatch(true, pcl_checksequencenumber(PclSnap, + {o,"Bucket0004", "Key0004"}, + 3002)), + % Add some more keys and confirm that chekc sequence number still + % sees the old version in the previous snapshot, but will see the new version + % in a new snapshot + Key1A = {{o,"Bucket0001", "Key0001"}, {4002, {active, infinity}, null}}, + KL1A = lists:sort(leveled_sft:generate_randomkeys({4002, 2})), + S7 = pcl_pushmem(PCLr, [Key1A]), + if S7 == pause -> timer:sleep(1000); true -> ok end, + S8 = pcl_pushmem(PCLr, KL1A), + if S8 == pause -> timer:sleep(1000); true -> ok end, + ?assertMatch(true, pcl_checksequencenumber(PclSnap, + {o,"Bucket0001", "Key0001"}, + 1)), + ok = pcl_close(PclSnap), + {ok, PclSnap2} = pcl_start(SnapOpts), + ok = pcl_loadsnapshot(PclSnap2, []), + ?assertMatch(false, pcl_checksequencenumber(PclSnap2, + {o,"Bucket0001", "Key0001"}, + 1)), + ?assertMatch(true, pcl_checksequencenumber(PclSnap2, + {o,"Bucket0001", "Key0001"}, + 4002)), + ?assertMatch(true, pcl_checksequencenumber(PclSnap2, + {o,"Bucket0002", "Key0002"}, + 1002)), + ok = pcl_close(PclSnap2), ok = pcl_close(PCLr), clean_testdir(RootPath).