Updated model

This has inappropriate default parameter changes.
This commit is contained in:
Martin Sumner 2019-01-22 12:53:31 +00:00
parent b713ce60a8
commit a13a6ae45f
7 changed files with 231 additions and 23 deletions

3
.gitignore vendored
View file

@ -5,3 +5,6 @@
.DS_Store
rebar.lock
test/test_area/*
cover
cover_*
.eqc-info

Binary file not shown.

View file

@ -62,6 +62,7 @@
book_headonly/4,
book_snapshot/4,
book_compactjournal/2,
book_eqccompactjournal/2,
book_islastcompactionpending/1,
book_trimjournal/1,
book_hotbackup/1,
@ -100,8 +101,8 @@
-include_lib("eunit/include/eunit.hrl").
-define(CACHE_SIZE, 2500).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MIN_CACHE_SIZE, 1).
-define(MIN_PCL_CACHE_SIZE, 4).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(CACHE_SIZE_JITTER, 25).
@ -1005,6 +1006,7 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
-spec book_compactjournal(pid(), integer()) -> ok.
-spec book_eqccompactjournal(pid(), integer()) -> {ok, pid()}.
-spec book_islastcompactionpending(pid()) -> boolean().
-spec book_trimjournal(pid()) -> ok.
@ -1013,9 +1015,13 @@ book_snapshot(Pid, SnapType, Query, LongRunning) ->
%% the scheduling of Journla compaction is called externally, so it is assumed
%% in Riak it will be triggered by a vnode callback.
book_compactjournal(Pid, Timeout) ->
book_eqccompactjournal(Pid, Timeout) ->
gen_server:call(Pid, {compact_journal, Timeout}, infinity).
book_compactjournal(Pid, Timeout) ->
{ok, _P} = gen_server:call(Pid, {compact_journal, Timeout}, infinity),
ok.
%% @doc Check on progress of the last compaction
book_islastcompactionpending(Pid) ->
@ -1371,10 +1377,10 @@ handle_call({return_runner, QueryType}, _From, State) ->
fold_countdown = CountDown}};
handle_call({compact_journal, Timeout}, _From, State)
when State#state.head_only == false ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
R = leveled_inker:ink_compactjournal(State#state.inker,
self(),
Timeout),
{reply, ok, State};
{reply, R, State};
handle_call(confirm_compact, _From, State)
when State#state.head_only == false ->
{reply, leveled_inker:ink_compactionpending(State#state.inker), State};

View file

@ -182,6 +182,7 @@ clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
%% @doc
%% Stop the clerk
clerk_stop(Pid) ->
unlink(Pid),
gen_server:cast(Pid, stop).
-spec clerk_loglevel(pid(), leveled_log:log_level()) -> ok.

View file

@ -348,7 +348,7 @@ ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
as_ink},
infinity).
-spec ink_compactjournal(pid(), pid(), integer()) -> ok.
-spec ink_compactjournal(pid(), pid(), integer()) -> {ok, pid()}.
%% @doc
%% Trigger a compaction event. the compaction event will use a sqn check
%% against the Ledger to see if a value can be compacted - if the penciller
@ -612,7 +612,7 @@ handle_call({compact,
FilterFun,
self(),
Timeout),
{reply, ok, State#state{compaction_pending=true}};
{reply, {ok, State#state.clerk}, State#state{compaction_pending=true}};
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call(compaction_pending, _From, State) ->

View file

@ -72,7 +72,7 @@
-include("include/leveled.hrl").
-define(MAX_SLOTS, 256).
-define(MAX_SLOTS, 2).
-define(LOOK_SLOTSIZE, 128). % Maximum of 128
-define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE
-define(NOLOOK_SLOTSIZE, 256).

View file

@ -24,7 +24,8 @@
-include_lib("eunit/include/eunit.hrl").
-include("../include/leveled.hrl").
-compile([export_all, nowarn_export_all]).
-compile([export_all, nowarn_export_all, {nowarn_deprecated_function,
[{gen_fsm, send_event, 2}]}]).
-define(NUMTESTS, 1000).
-define(QC_OUT(P),
@ -75,7 +76,7 @@ init_backend_args(#{dir := Dir, sut := Name} = S) ->
case maps:get(start_opts, S, undefined) of
undefined ->
[ default(?RIAK_TAG, ?STD_TAG), %% Just test one tag at a time
[{root_path, Dir}, {log_level, error} | gen_opts()], Name ];
[{root_path, Dir}, {log_level, error}, {cache_size, 10}, {max_pencillercachesize, 40}, {max_journalsize, 20000} | gen_opts()], Name ];
Opts ->
%% root_path is part of existing options
[ maps:get(tag, S), Opts, Name ]
@ -93,6 +94,7 @@ init_backend_adapt(S, [Tag, Options, Name]) ->
%% @doc init_backend - The actual operation
%% Start the database and read data from disk
init_backend(_Tag, Options, Name) ->
% Options0 = proplists:delete(log_level, Options),
case leveled_bookie:book_start(Options) of
{ok, Bookie} ->
unlink(Bookie),
@ -133,6 +135,7 @@ stop(Pid) ->
stop_next(S, _Value, [_Pid]) ->
S#{leveled => undefined,
iclerk => undefined,
folders => [],
used_folders => [],
stop_folders => maps:get(folders, S, []) ++ maps:get(used_folders, S, [])}.
@ -147,6 +150,75 @@ stop_post(_S, [Pid], _Res) ->
end.
%% --- Operation: updateload ---
updateload_pre(S) ->
is_leveled_open(S).
%% updateload for specific bucket (doesn't overlap with get/put/delete)
updateload_args(#{leveled := Pid, tag := Tag}) ->
?LET(Categories, gen_categories(Tag),
?LET({{Key, Bucket}, Value, IndexSpec, MetaData},
{{gen_key(), <<"LoadBucket">>}, gen_val(), [{add, Cat, gen_index_value()} || Cat <- Categories ], []},
case Tag of
?STD_TAG -> [Pid, Bucket, Key, Value, Value, IndexSpec, Tag, MetaData];
?RIAK_TAG ->
Obj = testutil:riak_object(Bucket, Key, Value, MetaData), %% this has a side effect inserting a random nr
[Pid, Bucket, Key, Value, Obj, IndexSpec, Tag, MetaData]
end
)).
updateload_pre(#{leveled := Leveled}, [Pid, _Bucket, _Key, _Value, _Obj, _, _, _]) ->
Pid == Leveled.
updateload_adapt(#{leveled := Leveled}, [_, Bucket, Key, Value, Obj, Spec, Tag, MetaData]) ->
[ Leveled, Bucket, Key, Value, Obj, Spec, Tag, MetaData ].
%% @doc put - The actual operation
updateload(Pid, Bucket, Key, Value, Obj, Spec, Tag, MetaData) ->
Values =
case Tag of
?STD_TAG -> multiply(100, Value);
?RIAK_TAG ->
lists:map(fun(V) -> testutil:riak_object(Bucket, Key, V, MetaData) end,
multiply(100, Value))
end ++ [Obj],
lists:map(fun(V) -> leveled_bookie:book_put(Pid, Bucket, Key, V, Spec, Tag) end, Values).
multiply(1, _Value) ->
[];
multiply(N, Value) when N > 1 ->
<<Byte:8, Rest/binary>> = Value,
NewValue = <<Rest/binary, Byte:8>>,
[NewValue | multiply(N-1, NewValue)].
updateload_next(#{model := Model} = S, _V, [_Pid, Bucket, Key, _Value, Obj, Spec, _Tag, _MetaData]) ->
?CMD_VALID(S, put,
begin
NewSpec =
case orddict:find({Bucket, Key}, Model) of
error -> merge_index_spec([], Spec);
{ok, {_, OldSpec}} ->
merge_index_spec(OldSpec, Spec)
end,
S#{model => orddict:store({Bucket, Key}, {Obj, NewSpec}, Model)}
end,
S).
updateload_post(S, [_, _, _, _, _, _, _, _], Results) ->
lists:all(fun(Res) -> ?CMD_VALID(S, put, Res == ok, Res == {unsupported_message, put}) end, Results).
updateload_features(#{previous_keys := PK} = S, [_Pid, Bucket, Key, _Value, _Obj, _, Tag, _], _Res) ->
?CMD_VALID(S, put,
case
lists:member({Key, Bucket}, PK) of
true ->
[{updateload, update, Tag}];
false ->
[{updateload, insert, Tag}]
end,
[{updateload, unsupported}]).
%% --- Operation: put ---
put_pre(S) ->
is_leveled_open(S).
@ -496,6 +568,113 @@ kill(Pid) ->
kill_next(S, Value, [Pid]) ->
stop_next(S, Value, [Pid]).
%% --- Operation: compactisalive ---
compactisalive_pre(S) ->
is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined.
compactisalive_args(#{iclerk := IClerk}) ->
[IClerk].
compactisalive_pre(#{iclerk := Pid}, [IClerk]) ->
Pid == IClerk.
compactisalive(IClerk) ->
is_process_alive(IClerk).
compactisalive_post(_S, [_IClerk], Res) ->
Res.
%% --- Operation: compacthappened ---
compacthappened_pre(S) ->
is_leveled_open(S) andalso maps:get(iclerk, S, undefined) /= undefined.
%% Commenting out args disables the operation
%% compacthappened_args(#{dir := DataDir}) ->
%% [DataDir].
compacthappened(DataDir) ->
PostCompact = filename:join(DataDir, "journal/journal_files/post_compact"),
case filelib:is_dir(PostCompact) of
true ->
{ok, Files} = file:list_dir(PostCompact),
Files;
false ->
[]
end.
ledgerpersisted(DataDir) ->
LedgerPath = filename:join(DataDir, "ledger/ledger_files"),
case filelib:is_dir(LedgerPath) of
true ->
{ok, Files} = file:list_dir(LedgerPath),
Files;
false ->
[]
end.
journalwritten(DataDir) ->
JournalPath = filename:join(DataDir, "journal/journal_files"),
case filelib:is_dir(JournalPath) of
true ->
{ok, Files} = file:list_dir(JournalPath),
Files;
false ->
[]
end.
compacthappened_post(_S, [_DataDir], Res) ->
eq(Res, []).
%% --- Operation: compact journal ---
compact_pre(S) ->
is_leveled_open(S).
compact_args(#{leveled := Pid}) ->
[Pid, nat()].
compact_pre(#{leveled := Leveled}, [Pid, _TS]) ->
Pid == Leveled.
compact_adapt(#{leveled := Leveled}, [_Pid, TS]) ->
[ Leveled, TS ].
compact(Pid, TS) ->
{ok, IClerk} = leveled_bookie:book_eqccompactjournal(Pid, TS),
IClerk.
compact_next(S, IClerk, [_Pid, _TS]) ->
case maps:get(iclerk, S, undefined) of
undefined ->
S#{iclerk => IClerk};
_ ->
S
end.
compact_post(S, [_Pid, _TS], Res) ->
case maps:get(iclerk, S, undefined) of
undefined ->
is_pid(Res);
IClerk ->
IClerk == Res
end.
compact_features(S, [_Pid, _TS], _Res) ->
case maps:get(iclerk, S, undefined) of
undefined ->
[{compact, fresh}];
_ ->
[{compact, repeat}]
end.
%% Testing fold:
%% Note async and sync mode!
%% see https://github.com/martinsumner/riak_kv/blob/mas-2.2.5-tictactaae/src/riak_kv_leveled_backend.erl#L238-L419
@ -816,11 +995,14 @@ stop_fold_features(S, [_, _], _) ->
end ].
weight(#{previous_keys := []}, Command) when Command == get;
Command == delete ->
weight(#{previous_keys := []}, get) ->
1;
weight(#{previous_keys := []}, delete) ->
1;
weight(S, C) when C == get;
C == put ->
C == put;
C == delete;
C == updateload ->
?CMD_VALID(S, put, 10, 1);
weight(_S, stop) ->
1;
@ -849,7 +1031,8 @@ prop_db() ->
eqc:dont_print_counterexample(
?LET(Shrinking, parameter(shrinking, false),
?FORALL({Kind, Cmds}, oneof([{seq, more_commands(20, commands(?MODULE))},
{par, more_commands(2, parallel_commands(?MODULE))}]),
{par, more_commands(2, parallel_commands(?MODULE))}
]),
begin
delete_level_data(Dir),
?IMPLIES(empty_dir(Dir),
@ -867,6 +1050,13 @@ prop_db() ->
],
StartOptionFeatures = [ lists:keydelete(root_path, 1, Feature) || {start_options, Feature} <- call_features(history(RunResult)) ],
timer:sleep(1000),
CompactionFiles = compacthappened(Dir),
LedgerFiles = ledgerpersisted(Dir),
JournalFiles = journalwritten(Dir),
io:format("File counts: Compacted ~w Journal ~w Ledger ~w~n", [length(CompactionFiles), length(LedgerFiles), length(JournalFiles)]),
case whereis(maps:get(sut, initial_state())) of
undefined -> delete_level_data(Dir);
Pid when is_pid(Pid) ->
@ -885,6 +1075,9 @@ prop_db() ->
measure(time_per_test, RunTime,
aggregate(command_names(Cmds),
collect(Kind,
measure(compaction_files, length(CompactionFiles),
measure(ledger_files, length(LedgerFiles),
measure(journal_files, length(JournalFiles),
aggregate(with_title('Features'), CallFeatures,
aggregate(with_title('Start Options'), StartOptionFeatures,
features(CallFeatures,
@ -897,7 +1090,7 @@ prop_db() ->
{data_cleanup,
?WHENFAIL(eqc:format("~s\n", [os:cmd("ls -Rl " ++ Dir)]),
empty_dir(Dir))},
{pid_cleanup, equals(Wait, [])}]))))))))
{pid_cleanup, equals(Wait, [])}])))))))))))
end))
end))).
@ -935,7 +1128,7 @@ gen_opts() ->
{compression_method, elements([native, lz4])}
, {compression_point, elements([on_compact, on_receipt])}
%% , {max_journalsize, ?LET(N, nat(), 2048 + 1000 + 32 + 16 + 16 + N)}
, {cache_size, oneof([nat(), 2048, 2060, 5000])}
%% , {cache_size, oneof([nat(), 2048, 2060, 5000])}
]).
options(GenList) ->
@ -952,7 +1145,7 @@ gen_bucket() ->
elements([<<"bucket1">>, <<"bucket2">>, <<"bucket3">>]).
gen_val() ->
noshrink(binary(32)).
noshrink(binary(256)).
gen_categories(?RIAK_TAG) ->
sublist(categories());
@ -1044,11 +1237,16 @@ wait_for_procs(Known, Timeout) ->
case erlang:processes() -- Known of
[] -> [];
Running ->
case Timeout > 0 of
true ->
if
Timeout > 100 ->
timer:sleep(100),
wait_for_procs(Known, Timeout - 100);
false ->
Timeout >= 0 ->
lists:map(fun(P) -> io:format("********* Sending timeout to ~w *********~n", [P]), gen_fsm:send_event(P, timeout) end, Running),
timer:sleep(100),
wait_for_procs(Known, Timeout - 100);
true ->
lists:foreach(fun(P) -> io:format("Process info : ~w~n", [process_info(P)]) end, Running),
Running
end
end.