diff --git a/.eqc-info b/.eqc-info new file mode 100644 index 0000000..7dba85a Binary files /dev/null and b/.eqc-info differ diff --git a/.gitignore b/.gitignore index 8031a86..b266ee3 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,6 @@ .DS_Store rebar.lock test/test_area/* +cover +cover_* +.eqc-info diff --git a/current_counterexample.eqc b/current_counterexample.eqc new file mode 100644 index 0000000..c4c9f59 Binary files /dev/null and b/current_counterexample.eqc differ diff --git a/rebar.config b/rebar.config index 352ec3f..9b92d91 100644 --- a/rebar.config +++ b/rebar.config @@ -11,8 +11,8 @@ {profiles, [{eqc, [{deps, [meck, fqc]}, - {erl_opts, [debug_info, {parse_transform, lager_transform}, {parse_transform, eqc_cover}]}, - {plugins, [rebar_eqc]}]} + {erl_opts, [debug_info, {parse_transform, eqc_cover}]}, + {extra_src_dirs, ["test"]}]} ]}. {deps, [ diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 9e547b4..a0fd287 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -62,6 +62,7 @@ book_headonly/4, book_snapshot/4, book_compactjournal/2, + book_eqccompactjournal/2, book_islastcompactionpending/1, book_trimjournal/1, book_hotbackup/1, @@ -100,8 +101,8 @@ -include_lib("eunit/include/eunit.hrl"). -define(CACHE_SIZE, 2500). --define(MIN_CACHE_SIZE, 100). --define(MIN_PCL_CACHE_SIZE, 400). +-define(MIN_CACHE_SIZE, 1). +-define(MIN_PCL_CACHE_SIZE, 4). -define(MAX_PCL_CACHE_SIZE, 28000). % This is less than actual max - but COIN_SIDECOUNT -define(CACHE_SIZE_JITTER, 25). @@ -1005,6 +1006,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> -spec book_compactjournal(pid(), integer()) -> ok|busy. +-spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}. -spec book_islastcompactionpending(pid()) -> boolean(). -spec book_trimjournal(pid()) -> ok. @@ -1013,8 +1015,13 @@ book_snapshot(Pid, SnapType, Query, LongRunning) -> %% the scheduling of Journla compaction is called externally, so it is assumed %% in Riak it will be triggered by a vnode callback. +book_eqccompactjournal(Pid, Timeout) -> + {_R, P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + {ok, P}. + book_compactjournal(Pid, Timeout) -> - gen_server:call(Pid, {compact_journal, Timeout}, infinity). + {R, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity), + R. %% @doc Check on progress of the last compaction diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index d25f6d9..858a5e9 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -347,7 +347,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> as_ink}, infinity). --spec ink_compactjournal(pid(), pid(), integer()) -> ok|busy. +-spec ink_compactjournal(pid(), pid(), integer()) -> {ok|busy, pid()}. %% @doc %% Trigger a compaction event. the compaction event will use a sqn check %% against the Ledger to see if a value can be compacted - if the penciller @@ -565,9 +565,10 @@ handle_call({compact, CloseFun, FilterFun}, _From, State) -> + Clerk = State#state.clerk, case State#state.compaction_pending of true -> - {reply, busy, State}; + {reply, {busy, Clerk}, State}; false -> Manifest = leveled_imanifest:to_list(State#state.manifest), leveled_iclerk:clerk_compact(State#state.clerk, @@ -576,7 +577,7 @@ handle_call({compact, CloseFun, FilterFun, Manifest), - {reply, ok, State#state{compaction_pending=true}} + {reply, {ok, Clerk}, State#state{compaction_pending=true}} end; handle_call(compaction_pending, _From, State) -> {reply, State#state.compaction_pending, State}; diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index d86f9c5..efe4441 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -72,7 +72,7 @@ -include("include/leveled.hrl"). --define(MAX_SLOTS, 256). +-define(MAX_SLOTS, 2). -define(LOOK_SLOTSIZE, 128). % Maximum of 128 -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE -define(NOLOOK_SLOTSIZE, 256). diff --git a/test/leveledjc_eqc.erl b/test/leveledjc_eqc.erl new file mode 100644 index 0000000..18c80c8 --- /dev/null +++ b/test/leveledjc_eqc.erl @@ -0,0 +1,1263 @@ +%% ------------------------------------------------------------------- +%% +%% leveld_eqc: basic statem for doing things to leveled +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(leveledjc_eqc). + +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_statem.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include("../include/leveled.hrl"). + +-compile([export_all, nowarn_export_all, {nowarn_deprecated_function, + [{gen_fsm, send_event, 2}]}]). + +-define(NUMTESTS, 1000). +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> + io:format(user, Str, Args) end, P)). + +-define(CMD_VALID(State, Cmd, True, False), + case is_valid_cmd(State, Cmd) of + true -> True; + false -> False + end). + + +eqc_test_() -> + Timeout = 50, + {timeout, max(2 * Timeout, Timeout + 10), + ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(Timeout, ?QC_OUT(prop_db()))))}. + +run() -> + run(?NUMTESTS). + +run(Count) -> + eqc:quickcheck(eqc:numtests(Count, prop_db())). + +check() -> + eqc:check(prop_db()). + +iff(B1, B2) -> B1 == B2. +implies(B1, B2) -> (not B1 orelse B2). + +%% start_opts should not be added to this map, it is added only when the system is started the first time. +initial_state() -> + #{dir => {var, dir}, + sut => sut, + leveled => undefined, %% to make adapt happy after failing pre/1 + counter => 0, + model => orddict:new(), + previous_keys => [], + deleted_keys => [], + folders => [] + }. + +%% --- Operation: init_backend --- +init_backend_pre(S) -> + not is_leveled_open(S). + +init_backend_args(#{dir := Dir, sut := Name} = S) -> + case maps:get(start_opts, S, undefined) of + undefined -> + [ default(?RIAK_TAG, ?STD_TAG), %% Just test one tag at a time + [{root_path, Dir}, {log_level, error}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ]; + Opts -> + %% root_path is part of existing options + [ maps:get(tag, S), Opts, Name ] + end. + +init_backend_pre(S, [Tag, Options, _]) -> + %% for shrinking + PreviousOptions = maps:get(start_opts, S, undefined), + maps:get(tag, S, Tag) == Tag andalso + PreviousOptions == undefined orelse PreviousOptions == Options. + +init_backend_adapt(S, [Tag, Options, Name]) -> + [ maps:get(tag, S, Tag), maps:get(start_opts, S, Options), Name]. + +%% @doc init_backend - The actual operation +%% Start the database and read data from disk +init_backend(_Tag, Options, Name) -> + % Options0 = proplists:delete(log_level, Options), + case leveled_bookie:book_start(Options) of + {ok, Bookie} -> + unlink(Bookie), + erlang:register(Name, Bookie), + Bookie; + Error -> Error + end. + +init_backend_next(S, LevelEdPid, [Tag, Options, _]) -> + S#{leveled => LevelEdPid, start_opts => Options, tag => Tag}. + +init_backend_post(_S, [_, _Options, _], LevelEdPid) -> + is_pid(LevelEdPid). + +init_backend_features(_S, [_Tag, Options, _], _Res) -> + [{start_options, Options}]. + + +%% --- Operation: stop --- +stop_pre(S) -> + is_leveled_open(S). + +%% @doc stop_args - Argument generator +stop_args(#{leveled := Pid}) -> + [Pid]. + +stop_pre(#{leveled := Leveled}, [Pid]) -> + %% check during shrinking + Pid == Leveled. + +stop_adapt(#{leveled := Leveled}, [_]) -> + [Leveled]. + +%% @doc stop - The actual operation +%% Stop the server, but the values are still on disk +stop(Pid) -> + ok = leveled_bookie:book_close(Pid). + +stop_next(S, _Value, [_Pid]) -> + S#{leveled => undefined, + iclerk => undefined, + folders => [], + used_folders => [], + stop_folders => maps:get(folders, S, []) ++ maps:get(used_folders, S, [])}. + +stop_post(_S, [Pid], _Res) -> + Mon = erlang:monitor(process, Pid), + receive + {'DOWN', Mon, _Type, Pid, _Info} -> + true + after 5000 -> + {still_a_pid, Pid} + end. + + +%% --- Operation: updateload --- +updateload_pre(S) -> + is_leveled_open(S). + +%% updateload for specific bucket (doesn't overlap with get/put/delete) +updateload_args(#{leveled := Pid, tag := Tag}) -> + ?LET(Categories, gen_categories(Tag), + ?LET({{Key, Bucket}, Value, IndexSpec, MetaData}, + {{gen_key(), <<"LoadBucket">>}, gen_val(), [{add, Cat, gen_index_value()} || Cat <- Categories ], []}, + case Tag of + ?STD_TAG -> [Pid, Bucket, Key, Value, Value, IndexSpec, Tag, MetaData]; + ?RIAK_TAG -> + Obj = testutil:riak_object(Bucket, Key, Value, MetaData), %% this has a side effect inserting a random nr + [Pid, Bucket, Key, Value, Obj, IndexSpec, Tag, MetaData] + end + )). + +updateload_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Value, _Obj, _, _, _]) -> + Pid == Leveled. + +updateload_adapt(#{leveled := Leveled}, [_, Bucket, Key, Value, Obj, Spec, Tag, MetaData]) -> + [ Leveled, Bucket, Key, Value, Obj, Spec, Tag, MetaData ]. + +%% @doc put - The actual operation +updateload(Pid, Bucket, Key, Value, Obj, Spec, Tag, MetaData) -> + Values = + case Tag of + ?STD_TAG -> multiply(100, Value); + ?RIAK_TAG -> + lists:map(fun(V) -> testutil:riak_object(Bucket, Key, V, MetaData) end, + multiply(100, Value)) + end ++ [Obj], + lists:map(fun(V) -> leveled_bookie:book_put(Pid, Bucket, Key, V, Spec, Tag) end, Values). + +multiply(1, _Value) -> + []; +multiply(N, Value) when N > 1 -> + <> = Value, + NewValue = <>, + [NewValue | multiply(N-1, NewValue)]. + +updateload_next(#{model := Model} = S, _V, [_Pid, Bucket, Key, _Value, Obj, Spec, _Tag, _MetaData]) -> + ?CMD_VALID(S, put, + begin + NewSpec = + case orddict:find({Bucket, Key}, Model) of + error -> merge_index_spec([], Spec); + {ok, {_, OldSpec}} -> + merge_index_spec(OldSpec, Spec) + end, + S#{model => orddict:store({Bucket, Key}, {Obj, NewSpec}, Model)} + end, + S). + +updateload_post(S, [_, _, _, _, _, _, _, _], Results) -> + lists:all(fun(Res) -> ?CMD_VALID(S, put, Res == ok, Res == {unsupported_message, put}) end, Results). + +updateload_features(#{previous_keys := PK} = S, [_Pid, Bucket, Key, _Value, _Obj, _, Tag, _], _Res) -> + ?CMD_VALID(S, put, + case + lists:member({Key, Bucket}, PK) of + true -> + [{updateload, update, Tag}]; + false -> + [{updateload, insert, Tag}] + end, + [{updateload, unsupported}]). + + +%% --- Operation: put --- +put_pre(S) -> + is_leveled_open(S). + +put_args(#{leveled := Pid, previous_keys := PK, tag := Tag}) -> + ?LET(Categories, gen_categories(Tag), + ?LET({{Key, Bucket}, Value, IndexSpec, MetaData}, + {gen_key_in_bucket(PK), gen_val(), [{add, Cat, gen_index_value()} || Cat <- Categories ], []}, + case Tag of + ?STD_TAG -> [Pid, Bucket, Key, Value, IndexSpec, elements([none, Tag])]; + ?RIAK_TAG -> + Obj = testutil:riak_object(Bucket, Key, Value, MetaData), + [Pid, Bucket, Key, Obj, IndexSpec, Tag] + end)). + +put_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Value, _, _]) -> + Pid == Leveled. + +put_adapt(#{leveled := Leveled}, [_, Bucket, Key, Value, Spec, Tag]) -> + [ Leveled, Bucket, Key, Value, Spec, Tag ]. + +%% @doc put - The actual operation +put(Pid, Bucket, Key, Value, Spec, none) -> + leveled_bookie:book_put(Pid, Bucket, Key, Value, Spec); +put(Pid, Bucket, Key, Value, Spec, Tag) -> + leveled_bookie:book_put(Pid, Bucket, Key, Value, Spec, Tag). + +put_next(#{model := Model, previous_keys := PK} = S, _Value, [_Pid, Bucket, Key, Value, Spec, _Tag]) -> + ?CMD_VALID(S, put, + begin + NewSpec = + case orddict:find({Bucket, Key}, Model) of + error -> merge_index_spec([], Spec); + {ok, {_, OldSpec}} -> + merge_index_spec(OldSpec, Spec) + end, + S#{model => orddict:store({Bucket, Key}, {Value, NewSpec}, Model), + previous_keys => PK ++ [{Key, Bucket}]} + end, + S). + +put_post(S, [_, _, _, _, _, _], Res) -> + ?CMD_VALID(S, put, eq(Res, ok), eq(Res, {unsupported_message, put})). + +put_features(#{previous_keys := PK} = S, [_Pid, Bucket, Key, _Value, _, Tag], _Res) -> + ?CMD_VALID(S, put, + case + lists:member({Key, Bucket}, PK) of + true -> + [{put, update, Tag}]; + false -> + [{put, insert, Tag}] + end, + [{put, unsupported}]). + +merge_index_spec(Spec, []) -> + Spec; +merge_index_spec(Spec, [{add, Cat, Idx} | Rest]) -> + merge_index_spec(lists:delete({Cat, Idx}, Spec) ++ [{Cat, Idx}], Rest); +merge_index_spec(Spec, [{remove, Cat, Idx} | Rest]) -> + merge_index_spec(lists:delete({Cat, Idx}, Spec), Rest). + + +%% --- Operation: get --- +get_pre(S) -> + is_leveled_open(S). + +get_args(#{leveled := Pid, previous_keys := PK, tag := Tag}) -> + ?LET({Key, Bucket}, gen_key_in_bucket(PK), + [Pid, Bucket, Key, case Tag of ?STD_TAG -> default(none, Tag); _ -> Tag end]). + +%% @doc get - The actual operation +get(Pid, Bucket, Key, none) -> + leveled_bookie:book_get(Pid, Bucket, Key); +get(Pid, Bucket, Key, Tag) -> + leveled_bookie:book_get(Pid, Bucket, Key, Tag). + +get_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Tag]) -> + Pid == Leveled. + +get_adapt(#{leveled := Leveled}, [_, Bucket, Key, Tag]) -> + [Leveled, Bucket, Key, Tag]. + +get_post(#{model := Model} = S, [_Pid, Bucket, Key, Tag], Res) -> + ?CMD_VALID(S, get, + case Res of + {ok, _} -> + {ok, {Value, _}} = orddict:find({Bucket, Key}, Model), + eq(Res, {ok, Value}); + not_found -> + %% Weird to be able to supply a tag, but must be STD_TAG... + Tag =/= ?STD_TAG orelse orddict:find({Bucket, Key}, Model) == error + end, + eq(Res, {unsupported_message, get})). + +get_features(#{deleted_keys := DK, previous_keys := PK}, [_Pid, Bucket, Key, _Tag], Res) -> + case Res of + not_found -> + [{get, not_found, deleted} || lists:member({Key, Bucket}, DK)] ++ + [{get, not_found, not_inserted} || not lists:member({Key, Bucket}, PK)]; + {ok, B} when is_binary(B) -> + [{get, found}]; + {unsupported_message, _} -> + [{get, unsupported}] + end. + +%% --- Operation: mput --- +mput_pre(S) -> + is_leveled_open(S). + +%% @doc mput_args - Argument generator +%% Specification says: duplicated should be removed +%% "%% The list should be de-duplicated before it is passed to the bookie." +%% Wether this means that keys should be unique or even Action and values is unclear. +%% Slack discussion: +%% `[{add, B1, K1, SK1}, {add, B1, K1, SK2}]` should be fine (same bucket and key, different subkey) +%% +%% Really weird to have to specify a value in case of a remove action +mput_args(#{leveled := Pid, previous_keys := PK}) -> + ?LET(Objs, list({gen_key_in_bucket(PK), nat()}), + [Pid, [ {weighted_default({5, add}, {1, remove}), Bucket, Key, SubKey, gen_val()} || {{Key, Bucket}, SubKey} <- Objs ]]). + + +mput_pre(#{leveled := Leveled}, [Pid, ObjSpecs]) -> + Pid == Leveled andalso no_key_dups(ObjSpecs) == ObjSpecs. + +mput_adapt(#{leveled := Leveled}, [_, ObjSpecs]) -> + [ Leveled, no_key_dups(ObjSpecs) ]. + +mput(Pid, ObjSpecs) -> + leveled_bookie:book_mput(Pid, ObjSpecs). + +mput_next(S, _, [_Pid, ObjSpecs]) -> + ?CMD_VALID(S, mput, + lists:foldl(fun({add, Bucket, Key, _SubKey, Value}, #{model := Model, previous_keys := PK} = Acc) -> + Acc#{model => orddict:store({Bucket, Key}, {Value, []}, Model), + previous_keys => PK ++ [{Key, Bucket}]}; + ({remove, Bucket, Key, _SubKey, _Value}, #{model := Model} = Acc) -> + Acc#{model => orddict:erase({Bucket, Key}, Model)} + end, S, ObjSpecs), + S). + +mput_post(S, [_, _], Res) -> + ?CMD_VALID(S, mput, eq(Res, ok), eq(Res, {unsupported_message, mput})). + +mput_features(S, [_Pid, ObjSpecs], _Res) -> + ?CMD_VALID(S, mput, + {mput, [ element(1, ObjSpec) || ObjSpec <- ObjSpecs ]}, + [{mput, unsupported}]). + +%% --- Operation: head --- +head_pre(S) -> + is_leveled_open(S). + +head_args(#{leveled := Pid, previous_keys := PK, tag := Tag}) -> + ?LET({Key, Bucket}, gen_key_in_bucket(PK), + [Pid, Bucket, Key, Tag]). + +head_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Tag]) -> + Pid == Leveled. + +head_adapt(#{leveled := Leveled}, [_, Bucket, Key, Tag]) -> + [Leveled, Bucket, Key, Tag]. + +head(Pid, Bucket, Key, none) -> + leveled_bookie:book_head(Pid, Bucket, Key); +head(Pid, Bucket, Key, Tag) -> + leveled_bookie:book_head(Pid, Bucket, Key, Tag). + +head_post(#{model := Model} = S, [_Pid, Bucket, Key, Tag], Res) -> + ?CMD_VALID(S, head, + case Res of + {ok, _MetaData} -> + orddict:find({Bucket, Key}, Model) =/= error; + not_found -> + %% Weird to be able to supply a tag, but must be STD_TAG... + implies(lists:member(maps:get(start_opts, S), [{head_only, with_lookup}]), + lists:member(Tag, [?STD_TAG, none, ?HEAD_TAG])) orelse + orddict:find({Bucket, Key}, Model) == error; + {unsupported_message, head} -> + Tag =/= ?HEAD_TAG + end, + eq(Res, {unsupported_message, head})). + +head_features(#{deleted_keys := DK, previous_keys := PK}, [_Pid, Bucket, Key, _Tag], Res) -> + case Res of + not_found -> + [{head, not_found, deleted} || lists:member({Key, Bucket}, DK)] ++ + [{head, not_found, not_inserted} || not lists:member({Key, Bucket}, PK)]; + {ok, {_, _, _}} -> %% Metadata + [{head, found}]; + {ok, Bin} when is_binary(Bin) -> + [{head, found_riak_object}]; + {unsupported_message, _} -> + [{head, unsupported}] + end. + + +%% --- Operation: delete --- +delete_pre(S) -> + is_leveled_open(S). + +delete_args(#{leveled := Pid, previous_keys := PK, tag := Tag}) -> + ?LET({Key, Bucket}, gen_key_in_bucket(PK), + [Pid, Bucket, Key, [], Tag]). + +delete_pre(#{leveled := Leveled, model := Model}, [Pid, Bucket, Key, Spec, _Tag]) -> + Pid == Leveled andalso + case orddict:find({Bucket, Key}, Model) of + error -> true; + {ok, {_, OldSpec}} -> + Spec == OldSpec + end. + +delete_adapt(#{leveled := Leveled, model := Model}, [_, Bucket, Key, Spec, Tag]) -> + NewSpec = + case orddict:find({Bucket, Key}, Model) of + error -> Spec; + {ok, {_, OldSpec}} -> + Spec == OldSpec + end, + [ Leveled, Bucket, Key, NewSpec, Tag ]. + +delete(Pid, Bucket, Key, Spec, ?STD_TAG) -> + leveled_bookie:book_delete(Pid, Bucket, Key, Spec); +delete(Pid, Bucket, Key, Spec, Tag) -> + leveled_bookie:book_put(Pid, Bucket, Key, delete, Spec, Tag). + +delete_next(#{model := Model, deleted_keys := DK} = S, _Value, [_Pid, Bucket, Key, _, _]) -> + ?CMD_VALID(S, delete, + S#{model => orddict:erase({Bucket, Key}, Model), + deleted_keys => DK ++ [{Key, Bucket} || orddict:is_key({Key, Bucket}, Model)]}, + S). + +delete_post(S, [_Pid, _Bucket, _Key, _, _], Res) -> + ?CMD_VALID(S, delete, + eq(Res, ok), + case Res of + {unsupported_message, _} -> true; + _ -> Res + end). + +delete_features(#{previous_keys := PK} = S, [_Pid, Bucket, Key, _, _], _Res) -> + ?CMD_VALID(S, delete, + case lists:member({Key, Bucket}, PK) of + true -> + [{delete, existing}]; + false -> + [{delete, none_existing}] + end, + [{delete, unsupported}]). + +%% --- Operation: is_empty --- +is_empty_pre(S) -> + is_leveled_open(S). + +is_empty_args(#{leveled := Pid, tag := Tag}) -> + [Pid, Tag]. + + +is_empty_pre(#{leveled := Leveled}, [Pid, _]) -> + Pid == Leveled. + +is_empty_adapt(#{leveled := Leveled}, [_, Tag]) -> + [Leveled, Tag]. + +%% @doc is_empty - The actual operation +is_empty(Pid, Tag) -> + leveled_bookie:book_isempty(Pid, Tag). + +is_empty_post(#{model := Model}, [_Pid, _Tag], Res) -> + Size = orddict:size(Model), + case Res of + true -> eq(0, Size); + false when Size == 0 -> expected_empty; + false when Size > 0 -> true + end. + +is_empty_features(_S, [_Pid, _], Res) -> + [{empty, Res}]. + +%% --- Operation: drop --- +drop_pre(S) -> + is_leveled_open(S). + +drop_args(#{leveled := Pid, dir := Dir} = S) -> + ?LET([Tag, _, Name], init_backend_args(S), + [Pid, Tag, [{root_path, Dir} | gen_opts()], Name]). + +drop_pre(#{leveled := Leveled} = S, [Pid, Tag, Opts, Name]) -> + Pid == Leveled andalso init_backend_pre(S, [Tag, Opts, Name]). + +drop_adapt(#{leveled := Leveled} = S, [_Pid, Tag, Opts, Name]) -> + [Leveled | init_backend_adapt(S, [Tag, Opts, Name])]. + +%% @doc drop - The actual operation +%% Remove fles from disk (directory structure may remain) and start a new clean database +drop(Pid, Tag, Opts, Name) -> + Mon = erlang:monitor(process, Pid), + ok = leveled_bookie:book_destroy(Pid), + receive + {'DOWN', Mon, _Type, Pid, _Info} -> + init_backend(Tag, Opts, Name) + after 5000 -> + {still_alive, Pid, Name} + end. + +drop_next(S, Value, [Pid, Tag, Opts, Name]) -> + S1 = stop_next(S, Value, [Pid]), + init_backend_next(S1#{model => orddict:new()}, + Value, [Tag, Opts, Name]). + +drop_post(_S, [_Pid, _Tag, _Opts, _], Res) -> + case is_pid(Res) of + true -> true; + false -> Res + end. + +drop_features(#{model := Model}, [_Pid, _Tag, _Opts, _], _Res) -> + Size = orddict:size(Model), + [{drop, empty} || Size == 0 ] ++ + [{drop, small} || Size > 0 andalso Size < 20 ] ++ + [{drop, medium} || Size >= 20 andalso Size < 1000 ] ++ + [{drop, large} || Size >= 1000 ]. + + + +%% --- Operation: kill --- +%% Test that killing the root Pid of leveled has the same effect as closing it nicely +%% that means, we don't loose data! Not even when parallel successful puts are going on. +kill_pre(S) -> + is_leveled_open(S). + +kill_args(#{leveled := Pid}) -> + [Pid]. + +kill_pre(#{leveled := Leveled}, [Pid]) -> + Pid == Leveled. + +kill_adapt(#{leveled := Leveled}, [_]) -> + [ Leveled ]. + +kill(Pid) -> + exit(Pid, kill), + timer:sleep(1). + +kill_next(S, Value, [Pid]) -> + stop_next(S, Value, [Pid]). + + + +%% --- Operation: compactisalive --- + +compactisalive_pre(S) -> + is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. + +compactisalive_args(#{iclerk := IClerk}) -> + [IClerk]. + +compactisalive_pre(#{iclerk := Pid}, [IClerk]) -> + Pid == IClerk. + +compactisalive(IClerk) -> + is_process_alive(IClerk). + +compactisalive_post(_S, [_IClerk], Res) -> + Res. + + +%% --- Operation: compacthappened --- + +compacthappened_pre(S) -> + is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined. + +%% Commenting out args disables the operation +%% compacthappened_args(#{dir := DataDir}) -> +%% [DataDir]. + +compacthappened(DataDir) -> + PostCompact = filename:join(DataDir, "journal/journal_files/post_compact"), + case filelib:is_dir(PostCompact) of + true -> + {ok, Files} = file:list_dir(PostCompact), + Files; + false -> + [] + end. + +ledgerpersisted(DataDir) -> + LedgerPath = filename:join(DataDir, "ledger/ledger_files"), + case filelib:is_dir(LedgerPath) of + true -> + {ok, Files} = file:list_dir(LedgerPath), + Files; + false -> + [] + end. + +journalwritten(DataDir) -> + JournalPath = filename:join(DataDir, "journal/journal_files"), + case filelib:is_dir(JournalPath) of + true -> + {ok, Files} = file:list_dir(JournalPath), + Files; + false -> + [] + end. + +compacthappened_post(_S, [_DataDir], Res) -> + eq(Res, []). + + + +%% --- Operation: compact journal --- + +compact_pre(S) -> + is_leveled_open(S). + +compact_args(#{leveled := Pid}) -> + [Pid, nat()]. + +compact_pre(#{leveled := Leveled}, [Pid, _TS]) -> + Pid == Leveled. + +compact_adapt(#{leveled := Leveled}, [_Pid, TS]) -> + [ Leveled, TS ]. + +compact(Pid, TS) -> + {ok, IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS), + IClerk. + +compact_next(S, IClerk, [_Pid, _TS]) -> + case maps:get(iclerk, S, undefined) of + undefined -> + S#{iclerk => IClerk}; + _ -> + S + end. + +compact_post(S, [_Pid, _TS], Res) -> + case maps:get(iclerk, S, undefined) of + undefined -> + is_pid(Res); + IClerk -> + IClerk == Res + end. + +compact_features(S, [_Pid, _TS], _Res) -> + case maps:get(iclerk, S, undefined) of + undefined -> + [{compact, fresh}]; + _ -> + [{compact, repeat}] + end. + + +%% Testing fold: +%% Note async and sync mode! +%% see https://github.com/martinsumner/riak_kv/blob/mas-2.2.5-tictactaae/src/riak_kv_leveled_backend.erl#L238-L419 + +%% --- Operation: index folding --- +indexfold_pre(S) -> + is_leveled_open(S). + +indexfold_args(#{leveled := Pid, counter := Counter, previous_keys := PK}) -> + ?LET({Key, Bucket}, gen_key_in_bucket(PK), + [Pid, default(Bucket, {Bucket, Key}), gen_foldacc(3), + ?LET({[N], M}, {gen_index_value(), choose(0,2)}, {gen_category(), [N], [N+M]}), + {bool(), + oneof([undefined, gen_index_value()])}, + Counter %% add a unique counter + ]). + +indexfold_pre(#{leveled := Leveled}, [Pid, _Constraint, _FoldAccT, _Range, _TermHandling, _Counter]) -> + %% Make sure we operate on an existing Pid when shrinking + %% Check start options validity as well? + Pid == Leveled. + +indexfold_adapt(#{leveled := Leveled}, [_, Constraint, FoldAccT, Range, TermHandling, Counter]) -> + %% Keep the counter! + [Leveled, Constraint, FoldAccT, Range, TermHandling, Counter]. + +indexfold(Pid, Constraint, FoldAccT, Range, {_, undefined} = TermHandling, _Counter) -> + {async, Folder} = leveled_bookie:book_indexfold(Pid, Constraint, FoldAccT, Range, TermHandling), + Folder; +indexfold(Pid, Constraint, FoldAccT, Range, {ReturnTerms, RegExp}, _Counter) -> + {ok, RE} = re:compile(RegExp), + {async, Folder} = leveled_bookie:book_indexfold(Pid, Constraint, FoldAccT, Range, {ReturnTerms, RE}), + Folder. + +indexfold_next(#{folders := Folders} = S, SymFolder, + [_, Constraint, {Fun, Acc}, {Category, From, To}, {ReturnTerms, RegExp}, Counter]) -> + ConstraintFun = + fun(B, K, Bool) -> + case Constraint of + {B, KStart} -> not Bool orelse K >= KStart; + B -> true; + _ -> false + end + end, + S#{folders => + Folders ++ + [#{counter => Counter, + type => indexfold, + folder => SymFolder, + reusable => true, + result => fun(Model) -> + Select = + lists:sort( + orddict:fold(fun({B, K}, {_V, Spec}, A) -> + [ {B, {Idx, K}} + || {Cat, Idx} <- Spec, + Idx >= From, Idx =< To, + Cat == Category, + ConstraintFun(B, K, Idx == From), + RegExp == undefined orelse string:find(Idx, RegExp) =/= nomatch + ] ++ A + end, [], Model)), + lists:foldl(fun({B, NK}, A) when ReturnTerms -> + Fun(B, NK, A); + ({B, {_, NK}}, A) -> + Fun(B, NK, A) + end, Acc, Select) + end + }], + counter => Counter + 1}. + +indexfold_post(_S, _, Res) -> + is_function(Res). + +indexfold_features(_S, [_Pid, Constraint, FoldAccT, _Range, {ReturnTerms, _}, _Counter], _Res) -> + [{foldAccT, FoldAccT}] ++ %% This will be extracted for printing later + [{index_fold, bucket} || not is_tuple(Constraint) ] ++ + [{index_fold, bucket_and_primary_key} || is_tuple(Constraint)] ++ + [{index_fold, return_terms, ReturnTerms} ]. + + + + +%% --- Operation: keylist folding --- +%% slack discussion: "`book_keylist` only passes `Bucket` and `Key` into the accumulator, ignoring SubKey - +%% so I don't think this can be used in head_only mode to return results that make sense" +%% +%% There are also keylist functions that take a specific bucket and range into account. Not considered yet. +keylistfold_pre(S) -> + is_leveled_open(S). + +keylistfold_args(#{leveled := Pid, counter := Counter, tag := Tag}) -> + [Pid, Tag, gen_foldacc(3), + Counter %% add a unique counter + ]. + +keylistfold_pre(#{leveled := Leveled}, [Pid, _Tag, _FoldAccT, _Counter]) -> + %% Make sure we operate on an existing Pid when shrinking + %% Check start options validity as well? + Pid == Leveled. + +keylistfold_adapt(#{leveled := Leveled}, [_, Tag, FoldAccT, Counter]) -> + %% Keep the counter! + [Leveled, Tag, FoldAccT, Counter]. + +keylistfold(Pid, Tag, FoldAccT, _Counter) -> + {async, Folder} = leveled_bookie:book_keylist(Pid, Tag, FoldAccT), + Folder. + +keylistfold_next(#{folders := Folders, model := Model} = S, SymFolder, + [_, _Tag, {Fun, Acc}, Counter]) -> + S#{folders => + Folders ++ + [#{counter => Counter, + type => keylist, + folder => SymFolder, + reusable => false, + result => fun(_) -> orddict:fold(fun({B, K}, _V, A) -> Fun(B, K, A) end, Acc, Model) end + }], + counter => Counter + 1}. + +keylistfold_post(_S, _, Res) -> + is_function(Res). + +keylistfold_features(_S, [_Pid, _Tag, FoldAccT, _Counter], _Res) -> + [{foldAccT, FoldAccT}]. %% This will be extracted for printing later + + +%% --- Operation: bucketlistfold --- +bucketlistfold_pre(S) -> + is_leveled_open(S). + +bucketlistfold_args(#{leveled := Pid, counter := Counter, tag := Tag}) -> + [Pid, Tag, gen_foldacc(2), elements([first, all]), Counter]. + +bucketlistfold_pre(#{leveled := Leveled}, [Pid, _Tag, _FoldAccT, _Constraints, _]) -> + Pid == Leveled. + +bucketlistfold_adapt(#{leveled := Leveled}, [_Pid, Tag, FoldAccT, Constraints, Counter]) -> + [Leveled, Tag, FoldAccT, Constraints, Counter]. + +bucketlistfold(Pid, Tag, FoldAccT, Constraints, _) -> + {async, Folder} = leveled_bookie:book_bucketlist(Pid, Tag, FoldAccT, Constraints), + Folder. + +bucketlistfold_next(#{folders := Folders} = S, SymFolder, + [_, _, {Fun, Acc}, Constraints, Counter]) -> + S#{folders => + Folders ++ + [#{counter => Counter, + type => bucketlist, + folder => SymFolder, + reusable => true, + result => fun(Model) -> + Bs = orddict:fold(fun({B, _K}, _V, A) -> A ++ [B || not lists:member(B, A)] end, [], Model), + case {Constraints, Bs} of + {all, _} -> + lists:foldl(fun(B, A) -> Fun(B, A) end, Acc, Bs); + {first, []} -> + Acc; + {first, [First|_]} -> + lists:foldl(fun(B, A) -> Fun(B, A) end, Acc, [First]) + end + end + }], + counter => Counter + 1}. + +bucketlistfold_post(_S, [_Pid, _Tag, _FoldAccT, _Constraints, _], Res) -> + is_function(Res). + +bucketlistfold_features(_S, [_Pid, _Tag, FoldAccT, _Constraints, _], _Res) -> + [ {foldAccT, FoldAccT} ]. + +%% --- Operation: objectfold --- +objectfold_pre(S) -> + is_leveled_open(S). + +objectfold_args(#{leveled := Pid, counter := Counter, tag := Tag}) -> + [Pid, Tag, gen_foldacc(4), bool(), Counter]. + +objectfold_pre(#{leveled := Leveled}, [Pid, _Tag, _FoldAccT, _Snapshot, _Counter]) -> + Leveled == Pid. + +objectfold_adapt(#{leveled := Leveled}, [_Pid, Tag, FoldAccT, Snapshot, Counter]) -> + [Leveled, Tag, FoldAccT, Snapshot, Counter]. + +objectfold(Pid, Tag, FoldAccT, Snapshot, _Counter) -> + {async, Folder} = leveled_bookie:book_objectfold(Pid, Tag, FoldAccT, Snapshot), + Folder. + +objectfold_next(#{folders := Folders, model := Model} = S, SymFolder, + [_Pid, _Tag, {Fun, Acc}, Snapshot, Counter]) -> + S#{folders => + Folders ++ + [#{counter => Counter, + type => objectfold, + folder => SymFolder, + reusable => not Snapshot, + result => fun(M) -> + OnModel = + case Snapshot of + true -> Model; + false -> M + end, + Objs = orddict:fold(fun({B, K}, {V, _}, A) -> [{B, K, V} | A] end, [], OnModel), + lists:foldr(fun({B, K, V}, A) -> Fun(B, K, V, A) end, Acc, Objs) + end + }], + counter => Counter + 1}. + +objectfold_post(_S, [_Pid, _Tag, _FoldAccT, _Snapshot, _Counter], Res) -> + is_function(Res). + +objectfold_features(_S, [_Pid, _Tag, FoldAccT, _Snapshot, _Counter], _Res) -> + [{foldAccT, FoldAccT}]. %% This will be extracted for printing later + + + + +%% --- Operation: fold_run --- +fold_run_pre(S) -> + maps:get(folders, S, []) =/= []. + +fold_run_args(#{folders := Folders}) -> + ?LET(#{counter := Counter, folder := Folder}, elements(Folders), + [Counter, Folder]). + +fold_run_pre(#{folders := Folders}, [Counter, _Folder]) -> + %% Ensure membership even under shrinking + %% Counter is fixed at first generation and does not shrink! + get_foldobj(Folders, Counter) =/= undefined. + +fold_run(_, Folder) -> + catch Folder(). + +fold_run_next(#{folders := Folders} = S, _Value, [Counter, _Folder]) -> + %% leveled_runner comment: "Iterators should de-register themselves from the Penciller on completion." + FoldObj = get_foldobj(Folders, Counter), + case FoldObj of + #{reusable := false} -> + UsedFolders = maps:get(used_folders, S, []), + S#{folders => Folders -- [FoldObj], + used_folders => UsedFolders ++ [FoldObj]}; + _ -> + S + end. + +fold_run_post(#{folders := Folders, leveled := Leveled, model := Model}, [Count, _], Res) -> + case Leveled of + undefined -> + is_exit(Res); + _ -> + #{result := ResFun} = get_foldobj(Folders, Count), + eq(Res, ResFun(Model)) + end. + +fold_run_features(#{folders := Folders, leveled := Leveled}, [Count, _Folder], Res) -> + #{type := Type} = get_foldobj(Folders, Count), + [ {fold_run, Type} || Leveled =/= undefined ] ++ + [ fold_run_on_stopped_leveled || Leveled == undefined ] ++ + [ {fold_run, found_list, length(Res)}|| is_list(Res) ] ++ + [ {fold_run, found_integer}|| is_integer(Res) ]. + + +%% --- Operation: fold_run on already used folder --- +%% A fold that has already ran to completion should results in an exception when re-used. +%% leveled_runner comment: "Iterators should de-register themselves from the Penciller on completion." +noreuse_fold_pre(S) -> + maps:get(used_folders, S, []) =/= []. + +noreuse_fold_args(#{used_folders := Folders}) -> + ?LET(#{counter := Counter, folder := Folder}, elements(Folders), + [Counter, Folder]). + +noreuse_fold_pre(S, [Counter, _Folder]) -> + %% Ensure membership even under shrinking + %% Counter is fixed at first generation and does not shrink! + lists:member(Counter, + [ maps:get(counter, Used) || Used <- maps:get(used_folders, S, []) ]). + +noreuse_fold(_, Folder) -> + catch Folder(). + +noreuse_fold_post(_S, [_, _], Res) -> + is_exit(Res). + +noreuse_fold_features(_, [_, _], _) -> + [ reuse_fold ]. + + +%% --- Operation: fold_run on folder that survived a crash --- +%% A fold that has already ran to completion should results in an exception when re-used. +stop_fold_pre(S) -> + maps:get(stop_folders, S, []) =/= []. + +stop_fold_args(#{stop_folders := Folders}) -> + ?LET(#{counter := Counter, folder := Folder}, elements(Folders), + [Counter, Folder]). + +stop_fold_pre(S, [Counter, _Folder]) -> + %% Ensure membership even under shrinking + %% Counter is fixed at first generation and does not shrink! + lists:member(Counter, + [ maps:get(counter, Used) || Used <- maps:get(stop_folders, S, []) ]). + +stop_fold(_, Folder) -> + catch Folder(). + +stop_fold_post(_S, [_Counter, _], Res) -> + is_exit(Res). + +stop_fold_features(S, [_, _], _) -> + [ case maps:get(leveled, S) of + undefined -> + stop_fold_when_closed; + _ -> + stop_fold_when_open + end ]. + + +weight(#{previous_keys := []}, get) -> + 1; +weight(#{previous_keys := []}, delete) -> + 1; +weight(S, C) when C == get; + C == put; + C == delete; + C == updateload -> + ?CMD_VALID(S, put, 10, 1); +weight(_S, stop) -> + 1; +weight(_, _) -> + 1. + + +is_valid_cmd(S, put) -> + not in_head_only_mode(S); +is_valid_cmd(S, delete) -> + is_valid_cmd(S, put); +is_valid_cmd(S, get) -> + not in_head_only_mode(S); +is_valid_cmd(S, head) -> + not lists:member({head_only, no_lookup}, maps:get(start_opts, S, [])); +is_valid_cmd(S, mput) -> + in_head_only_mode(S). + + + +%% @doc check that the implementation of leveled is equivalent to a +%% sorted dict at least +-spec prop_db() -> eqc:property(). +prop_db() -> + Dir = "./leveled_data", + eqc:dont_print_counterexample( + ?LET(Shrinking, parameter(shrinking, false), + ?FORALL({Kind, Cmds}, oneof([{seq, more_commands(20, commands(?MODULE))}, + {par, more_commands(2, parallel_commands(?MODULE))} + ]), + begin + delete_level_data(Dir), + ?IMPLIES(empty_dir(Dir), + ?ALWAYS(if Shrinking -> 10; true -> 1 end, + begin + Procs = erlang:processes(), + StartTime = erlang:system_time(millisecond), + + RunResult = execute(Kind, Cmds, [{dir, Dir}]), + %% Do not extract the 'state' from this tuple, since parallel commands + %% miss the notion of final state. + CallFeatures = [ Feature || Feature <- call_features(history(RunResult)), + not is_foldaccT(Feature), + not (is_tuple(Feature) andalso element(1, Feature) == start_options) + ], + StartOptionFeatures = [ lists:keydelete(root_path, 1, Feature) || {start_options, Feature} <- call_features(history(RunResult)) ], + + timer:sleep(1000), + CompactionFiles = compacthappened(Dir), + LedgerFiles = ledgerpersisted(Dir), + JournalFiles = journalwritten(Dir), + io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]), + + + case whereis(maps:get(sut, initial_state())) of + undefined -> delete_level_data(Dir); + Pid when is_pid(Pid) -> + leveled_bookie:book_destroy(Pid) + end, + + Wait = wait_for_procs(Procs, 1500), + RunTime = erlang:system_time(millisecond) - StartTime, + + %% Since in parallel commands we don't have access to the state, we retrieve functions + %% from the features + FoldAccTs = [ FoldAccT || Entry <- history(RunResult), + {foldAccT, FoldAccT} <- eqc_statem:history_features(Entry)], + + pretty_commands(?MODULE, Cmds, RunResult, + measure(time_per_test, RunTime, + aggregate(command_names(Cmds), + collect(Kind, + measure(compaction_files, length(CompactionFiles), + measure(ledger_files, length(LedgerFiles), + measure(journal_files, length(JournalFiles), + aggregate(with_title('Features'), CallFeatures, + aggregate(with_title('Start Options'), StartOptionFeatures, + features(CallFeatures, + conjunction([{result, + ?WHENFAIL([ begin + eqc:format("~p with acc ~p:\n~s\n", [F, Acc, + show_function(F)]) + end || {F, Acc} <- FoldAccTs ], + result(RunResult) == ok)}, + {data_cleanup, + ?WHENFAIL(eqc:format("~s\n", [os:cmd("ls -Rl " ++ Dir)]), + empty_dir(Dir))}, + {pid_cleanup, equals(Wait, [])}]))))))))))) + end)) + end))). + +history({H, _, _}) -> H. +result({_, _, Res}) -> Res. + +execute(seq, Cmds, Env) -> + run_commands(Cmds, Env); +execute(par, Cmds, Env) -> + run_parallel_commands(Cmds, Env). + +is_exit({'EXIT', _}) -> + true; +is_exit(Other) -> + {expected_exit, Other}. + +is_foldaccT({foldAccT, _}) -> + true; +is_foldaccT(_) -> + false. + +show_function(F) -> + case proplists:get_value(module, erlang:fun_info(F)) of + eqc_fun -> + eqc_fun:show_function(F); + _ -> + proplists:get_value(name, erlang:fun_info(F)) + end. + + +%% slack discussion: +%% `max_journalsize` should be at least 2048 + byte_size(smallest_object) + byte_size(smallest_object's key) + overhead (which is a few bytes per K/V pair). +gen_opts() -> + options([%% {head_only, elements([false, no_lookup, with_lookup])} we don't test head_only mode + {compression_method, elements([native, lz4])} + , {compression_point, elements([on_compact, on_receipt])} + %% , {max_journalsize, ?LET(N, nat(), 2048 + 1000 + 32 + 16 + 16 + N)} + %% , {cache_size, oneof([nat(), 2048, 2060, 5000])} + ]). + +options(GenList) -> + ?LET(Bools, vector(length(GenList), bool()), + [ Opt || {Opt, true} <- lists:zip(GenList, Bools)]). + + +gen_key() -> + binary(16). + +%% Cannot be atoms! +%% key() type specified: should be binary(). +gen_bucket() -> + elements([<<"bucket1">>, <<"bucket2">>, <<"bucket3">>]). + +gen_val() -> + noshrink(binary(256)). + +gen_categories(?RIAK_TAG) -> + sublist(categories()); +gen_categories(_) -> + %% for ?STD_TAG this seems to make little sense + []. + + +categories() -> + [dep, lib]. + +gen_category() -> + elements(categories()). + +gen_index_value() -> + %% Carefully selected to have one out-layer and several border cases. + [elements("arts")]. + +gen_key_in_bucket([]) -> + {gen_key(), gen_bucket()}; +gen_key_in_bucket(Previous) -> + ?LET({K, B}, elements(Previous), + frequency([{1, gen_key_in_bucket([])}, + {1, {K, gen_bucket()}}, + {2, {K, B}}])). + +gen_foldacc(2) -> + ?SHRINK(oneof([{eqc_fun:function2(int()), int()}, + {eqc_fun:function2(list(int())), list(int())}]), + [fold_buckets()]); +gen_foldacc(3) -> + ?SHRINK(oneof([{eqc_fun:function3(int()), int()}, + {eqc_fun:function3(list(int())), list(int())}]), + [fold_collect()]); +gen_foldacc(4) -> + ?SHRINK(oneof([{eqc_fun:function4(int()), int()}, + {eqc_fun:function4(list(int())), list(int())}]), + [fold_objects()]). + + + +fold_buckets() -> + {fun(B, Acc) -> [B | Acc] end, []}. + +fold_collect() -> + {fun(X, Y, Acc) -> [{X, Y} | Acc] end, []}. + +fold_objects() -> + {fun(X, Y, Z, Acc) -> [{X, Y, Z} | Acc] end, []}. + +%% This makes system fall over +fold_collect_no_acc() -> + fun(X, Y, Z) -> [{X, Y} | Z] end. + +fold_count() -> + {fun(_X, _Y, Z) -> Z + 1 end, 0}. + +fold_keys() -> + {fun(X, _Y, Z) -> [X | Z] end, []}. + + +empty_dir(Dir) -> + case file:list_dir(Dir) of + {error, enoent} -> true; + {ok, Ds} -> + lists:all(fun(D) -> empty_dir(filename:join(Dir, D)) end, Ds); + _ -> + false + end. + +get_foldobj([], _Counter) -> + undefined; +get_foldobj([#{counter := Counter} = Map | _Rest], Counter) -> + Map; +get_foldobj([_ | Rest], Counter) -> + get_foldobj(Rest, Counter). + + +%% Helper for all those preconditions that just check that leveled Pid +%% is populated in state. (We cannot check with is_pid, since that's +%% symbolic in test case generation!). +is_leveled_open(S) -> + maps:get(leveled, S, undefined) =/= undefined. + +in_head_only_mode(S) -> + proplists:get_value(head_only, maps:get(start_opts, S, []), false) =/= false. + +wait_for_procs(Known, Timeout) -> + case erlang:processes() -- Known of + [] -> []; + Running -> + if + Timeout > 100 -> + timer:sleep(100), + wait_for_procs(Known, Timeout - 100); + Timeout >= 0 -> + lists:map(fun(P) -> io:format("********* Sending timeout to ~w *********~n", [P]), gen_fsm:send_event(P, timeout) end, Running), + timer:sleep(100), + wait_for_procs(Known, Timeout - 100); + true -> + lists:foreach(fun(P) -> io:format("Process info : ~w~n", [process_info(P)]) end, Running), + Running + end + end. + +delete_level_data(Dir) -> + os:cmd("rm -rf " ++ Dir). + +%% Slack discussion: +%% `[{add, B1, K1, SK1}, {add, B1, K1, SK2}]` should be fine (same bucket and key, different subkey) +no_key_dups([]) -> + []; +no_key_dups([{_Action, Bucket, Key, SubKey, _Value} = E | Es]) -> + [E | no_key_dups([ {A, B, K, SK, V} || {A, B, K, SK, V} <- Es, + {B, K, SK} =/= {Bucket, Key, SubKey}])]. \ No newline at end of file