diff --git a/.gitignore b/.gitignore index 8031a86..b266ee3 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ .DS_Store rebar.lock test/test_area/* +cover +cover_* +.eqc-info diff --git a/current_counterexample.eqc b/current_counterexample.eqc index 775fd17..c4c9f59 100644 Binary files a/current_counterexample.eqc and b/current_counterexample.eqc differ diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 73c0c33..cfa4084 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -62,6 +62,7 @@ book_headonly/4, book_snapshot/4, book_compactjournal/2, + book_eqccompactjournal/2, book_islastcompactionpending/1, book_trimjournal/1, book_hotbackup/1, @@ -100,8 +101,8 @@ -include_lib("eunit/include/eunit.hrl"). -define(CACHE_SIZE, 2500). --define(MIN_CACHE_SIZE, 100). --define(MIN_PCL_CACHE_SIZE, 400). +-define(MIN_CACHE_SIZE, 1). +-define(MIN_PCL_CACHE_SIZE, 4). -define(MAX_PCL_CACHE_SIZE, 28000). % This is less than actual max - but COIN_SIDECOUNT -define(CACHE_SIZE_JITTER, 25). @@ -1005,6 +1006,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> -spec book_compactjournal(pid(), integer()) -> ok. +-spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1013,9 +1015,13 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% the scheduling of Journla compaction is called externally, so it is assumed %% in Riak it will be triggered by a vnode callback. -book_compactjournal(Pid, Timeout) -> +book_eqccompactjournal(Pid, Timeout) -> gen_server:call(Pid, {compact_journal, Timeout}, infinity). +book_compactjournal(Pid, Timeout) -> + {ok, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + ok. + %% @doc Check on progress of the last compaction book_islastcompactionpending(Pid) -> @@ -1371,10 +1377,10 @@ handle_call({return_runner, QueryType}, _From, State) -> fold_countdown = CountDown}}; handle_call({compact_journal, Timeout}, _From, State) when State#state.head_only == false -> - ok = leveled_inker:ink_compactjournal(State#state.inker, + R = leveled_inker:ink_compactjournal(State#state.inker, self(), Timeout), - {reply, ok, State}; + {reply, R, State}; handle_call(confirm_compact, _From, State) when State#state.head_only == false -> {reply, leveled_inker:ink_compactionpending(State#state.inker), State}; diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 5f6b09a..f0a2318 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -182,6 +182,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> %% @doc %% Stop the clerk clerk_stop(Pid) -> + unlink(Pid), gen_server:cast(Pid, stop). -spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok. diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 3f27e31..cee237a 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -348,7 +348,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> as_ink}, infinity). --spec ink_compactjournal(pid(), pid(), integer()) -> ok. +-spec ink_compactjournal(pid(), pid(), integer()) -> {ok, pid()}. %% @doc %% Trigger a compaction event. the compaction event will use a sqn check %% against the Ledger to see if a value can be compacted - if the penciller @@ -612,7 +612,7 @@ handle_call({compact, FilterFun, self(), Timeout), - {reply, ok, State#state{compaction_pending=true}}; + {reply, {ok, State#state.clerk}, State#state{compaction_pending=true}}; handle_call(compaction_complete, _From, State) -> {reply, ok, State#state{compaction_pending=false}}; handle_call(compaction_pending, _From, State) -> diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index d86f9c5..efe4441 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -72,7 +72,7 @@ -include("include/leveled.hrl"). --define(MAX_SLOTS, 256). +-define(MAX_SLOTS, 2). -define(LOOK_SLOTSIZE, 128). % Maximum of 128 -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE -define(NOLOOK_SLOTSIZE, 256). diff --git a/test/leveledjc_eqc.erl b/test/leveledjc_eqc.erl index eec9a64..18c80c8 100644 --- a/test/leveledjc_eqc.erl +++ b/test/leveledjc_eqc.erl @@ -24,7 +24,8 @@ -include_lib("eunit/include/eunit.hrl"). -include("../include/leveled.hrl"). --compile([export_all, nowarn_export_all]). +-compile([export_all, nowarn_export_all, {nowarn_deprecated_function, + [{gen_fsm, send_event, 2}]}]). -define(NUMTESTS, 1000). -define(QC_OUT(P), @@ -75,7 +76,7 @@ init_backend_args(#{dir := Dir, sut := Name} = S) -> case maps:get(start_opts, S, undefined) of undefined -> [ default(?RIAK_TAG, ?STD_TAG), %% Just test one tag at a time - [{root_path, Dir}, {log_level, error} | gen_opts()], Name ]; + [{root_path, Dir}, {log_level, error}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ]; Opts -> %% root_path is part of existing options [ maps:get(tag, S), Opts, Name ] @@ -93,6 +94,7 @@ init_backend_adapt(S, [Tag, Options, Name]) -> %% @doc init_backend - The actual operation %% Start the database and read data from disk init_backend(_Tag, Options, Name) -> + % Options0 = proplists:delete(log_level, Options), case leveled_bookie:book_start(Options) of {ok, Bookie} -> unlink(Bookie), @@ -133,9 +135,10 @@ stop(Pid) -> stop_next(S, _Value, [_Pid]) -> S#{leveled => undefined, - folders => [], - used_folders => [], - stop_folders => maps:get(folders, S, []) ++ maps:get(used_folders, S, [])}. + iclerk => undefined, + folders => [], + used_folders => [], + stop_folders => maps:get(folders, S, []) ++ maps:get(used_folders, S, [])}. stop_post(_S, [Pid], _Res) -> Mon = erlang:monitor(process, Pid), @@ -147,6 +150,75 @@ stop_post(_S, [Pid], _Res) -> end. +%% --- Operation: updateload --- +updateload_pre(S) -> + is_leveled_open(S). + +%% updateload for specific bucket (doesn't overlap with get/put/delete) +updateload_args(#{leveled := Pid, tag := Tag}) -> + ?LET(Categories, gen_categories(Tag), + ?LET({{Key, Bucket}, Value, IndexSpec, MetaData}, + {{gen_key(), <<"LoadBucket">>}, gen_val(), [{add, Cat, gen_index_value()} || Cat <- Categories ], []}, + case Tag of + ?STD_TAG -> [Pid, Bucket, Key, Value, Value, IndexSpec, Tag, MetaData]; + ?RIAK_TAG -> + Obj = testutil:riak_object(Bucket, Key, Value, MetaData), %% this has a side effect inserting a random nr + [Pid, Bucket, Key, Value, Obj, IndexSpec, Tag, MetaData] + end + )). + +updateload_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Value, _Obj, _, _, _]) -> + Pid == Leveled. + +updateload_adapt(#{leveled := Leveled}, [_, Bucket, Key, Value, Obj, Spec, Tag, MetaData]) -> + [ Leveled, Bucket, Key, Value, Obj, Spec, Tag, MetaData ]. + +%% @doc put - The actual operation +updateload(Pid, Bucket, Key, Value, Obj, Spec, Tag, MetaData) -> + Values = + case Tag of + ?STD_TAG -> multiply(100, Value); + ?RIAK_TAG -> + lists:map(fun(V) -> testutil:riak_object(Bucket, Key, V, MetaData) end, + multiply(100, Value)) + end ++ [Obj], + lists:map(fun(V) -> leveled_bookie:book_put(Pid, Bucket, Key, V, Spec, Tag) end, Values). + +multiply(1, _Value) -> + []; +multiply(N, Value) when N > 1 -> + <> = Value, + NewValue = <>, + [NewValue | multiply(N-1, NewValue)]. + +updateload_next(#{model := Model} = S, _V, [_Pid, Bucket, Key, _Value, Obj, Spec, _Tag, _MetaData]) -> + ?CMD_VALID(S, put, + begin + NewSpec = + case orddict:find({Bucket, Key}, Model) of + error -> merge_index_spec([], Spec); + {ok, {_, OldSpec}} -> + merge_index_spec(OldSpec, Spec) + end, + S#{model => orddict:store({Bucket, Key}, {Obj, NewSpec}, Model)} + end, + S). + +updateload_post(S, [_, _, _, _, _, _, _, _], Results) -> + lists:all(fun(Res) -> ?CMD_VALID(S, put, Res == ok, Res == {unsupported_message, put}) end, Results). + +updateload_features(#{previous_keys := PK} = S, [_Pid, Bucket, Key, _Value, _Obj, _, Tag, _], _Res) -> + ?CMD_VALID(S, put, + case + lists:member({Key, Bucket}, PK) of + true -> + [{updateload, update, Tag}]; + false -> + [{updateload, insert, Tag}] + end, + [{updateload, unsupported}]). + + %% --- Operation: put --- put_pre(S) -> is_leveled_open(S). @@ -496,6 +568,113 @@ kill(Pid) -> kill_next(S, Value, [Pid]) -> stop_next(S, Value, [Pid]). + + +%% --- Operation: compactisalive --- + +compactisalive_pre(S) -> + is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. + +compactisalive_args(#{iclerk := IClerk}) -> + [IClerk]. + +compactisalive_pre(#{iclerk := Pid}, [IClerk]) -> + Pid == IClerk. + +compactisalive(IClerk) -> + is_process_alive(IClerk). + +compactisalive_post(_S, [_IClerk], Res) -> + Res. + + +%% --- Operation: compacthappened --- + +compacthappened_pre(S) -> + is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. + +%% Commenting out args disables the operation +%% compacthappened_args(#{dir := DataDir}) -> +%% [DataDir]. + +compacthappened(DataDir) -> + PostCompact = filename:join(DataDir, "journal/journal_files/post_compact"), + case filelib:is_dir(PostCompact) of + true -> + {ok, Files} = file:list_dir(PostCompact), + Files; + false -> + [] + end. + +ledgerpersisted(DataDir) -> + LedgerPath = filename:join(DataDir, "ledger/ledger_files"), + case filelib:is_dir(LedgerPath) of + true -> + {ok, Files} = file:list_dir(LedgerPath), + Files; + false -> + [] + end. + +journalwritten(DataDir) -> + JournalPath = filename:join(DataDir, "journal/journal_files"), + case filelib:is_dir(JournalPath) of + true -> + {ok, Files} = file:list_dir(JournalPath), + Files; + false -> + [] + end. + +compacthappened_post(_S, [_DataDir], Res) -> + eq(Res, []). + + + +%% --- Operation: compact journal --- + +compact_pre(S) -> + is_leveled_open(S). + +compact_args(#{leveled := Pid}) -> + [Pid, nat()]. + +compact_pre(#{leveled := Leveled}, [Pid, _TS]) -> + Pid == Leveled. + +compact_adapt(#{leveled := Leveled}, [_Pid, TS]) -> + [ Leveled, TS ]. + +compact(Pid, TS) -> + {ok, IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS), + IClerk. + +compact_next(S, IClerk, [_Pid, _TS]) -> + case maps:get(iclerk, S, undefined) of + undefined -> + S#{iclerk => IClerk}; + _ -> + S + end. + +compact_post(S, [_Pid, _TS], Res) -> + case maps:get(iclerk, S, undefined) of + undefined -> + is_pid(Res); + IClerk -> + IClerk == Res + end. + +compact_features(S, [_Pid, _TS], _Res) -> + case maps:get(iclerk, S, undefined) of + undefined -> + [{compact, fresh}]; + _ -> + [{compact, repeat}] + end. + + %% Testing fold: %% Note async and sync mode! %% see https://github.com/martinsumner/riak_kv/blob/mas-2.2.5-tictactaae/src/riak_kv_leveled_backend.erl#L238-L419 @@ -816,11 +995,14 @@ stop_fold_features(S, [_, _], _) -> end ]. -weight(#{previous_keys := []}, Command) when Command == get; - Command == delete -> +weight(#{previous_keys := []}, get) -> + 1; +weight(#{previous_keys := []}, delete) -> 1; weight(S, C) when C == get; - C == put -> + C == put; + C == delete; + C == updateload -> ?CMD_VALID(S, put, 10, 1); weight(_S, stop) -> 1; @@ -849,7 +1031,8 @@ prop_db() -> eqc:dont_print_counterexample( ?LET(Shrinking, parameter(shrinking, false), ?FORALL({Kind, Cmds}, oneof([{seq, more_commands(20, commands(?MODULE))}, - {par, more_commands(2, parallel_commands(?MODULE))}]), + {par, more_commands(2, parallel_commands(?MODULE))} + ]), begin delete_level_data(Dir), ?IMPLIES(empty_dir(Dir), @@ -867,6 +1050,13 @@ prop_db() -> ], StartOptionFeatures = [ lists:keydelete(root_path, 1, Feature) || {start_options, Feature} <- call_features(history(RunResult)) ], + timer:sleep(1000), + CompactionFiles = compacthappened(Dir), + LedgerFiles = ledgerpersisted(Dir), + JournalFiles = journalwritten(Dir), + io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]), + + case whereis(maps:get(sut, initial_state())) of undefined -> delete_level_data(Dir); Pid when is_pid(Pid) -> @@ -885,6 +1075,9 @@ prop_db() -> measure(time_per_test, RunTime, aggregate(command_names(Cmds), collect(Kind, + measure(compaction_files, length(CompactionFiles), + measure(ledger_files, length(LedgerFiles), + measure(journal_files, length(JournalFiles), aggregate(with_title('Features'), CallFeatures, aggregate(with_title('Start Options'), StartOptionFeatures, features(CallFeatures, @@ -897,7 +1090,7 @@ prop_db() -> {data_cleanup, ?WHENFAIL(eqc:format("~s\n", [os:cmd("ls -Rl " ++ Dir)]), empty_dir(Dir))}, - {pid_cleanup, equals(Wait, [])}])))))))) + {pid_cleanup, equals(Wait, [])}]))))))))))) end)) end))). @@ -935,7 +1128,7 @@ gen_opts() -> {compression_method, elements([native, lz4])} , {compression_point, elements([on_compact, on_receipt])} %% , {max_journalsize, ?LET(N, nat(), 2048 + 1000 + 32 + 16 + 16 + N)} - , {cache_size, oneof([nat(), 2048, 2060, 5000])} + %% , {cache_size, oneof([nat(), 2048, 2060, 5000])} ]). options(GenList) -> @@ -952,7 +1145,7 @@ gen_bucket() -> elements([<<"bucket1">>, <<"bucket2">>, <<"bucket3">>]). gen_val() -> - noshrink(binary(32)). + noshrink(binary(256)). gen_categories(?RIAK_TAG) -> sublist(categories()); @@ -1044,11 +1237,16 @@ wait_for_procs(Known, Timeout) -> case erlang:processes() -- Known of [] -> []; Running -> - case Timeout > 0 of - true -> + if + Timeout > 100 -> timer:sleep(100), wait_for_procs(Known, Timeout - 100); - false -> + Timeout >= 0 -> + lists:map(fun(P) -> io:format("********* Sending timeout to ~w *********~n", [P]), gen_fsm:send_event(P, timeout) end, Running), + timer:sleep(100), + wait_for_procs(Known, Timeout - 100); + true -> + lists:foreach(fun(P) -> io:format("Process info : ~w~n", [process_info(P)]) end, Running), Running end end.