Commit selective_sync

This commit is contained in:
Martin Sumner 2021-08-11 09:43:45 +01:00
parent 4ec9d14019
commit 4ec8d3e25c
3 changed files with 58 additions and 26 deletions

View file

@ -489,6 +489,10 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL)
%% Index tags are not fetchable (as they will not be hashed), but are %% Index tags are not fetchable (as they will not be hashed), but are
%% extractable via range query. %% extractable via range query.
%% %%
%% The extended-arity book_put functions support the addition of an object
%% TTL and a `sync` boolean to flush this PUT (and any other buffered PUTs to
%% disk when the sync_stategy is `none`.
%%
%% The Bookie takes the request and passes it first to the Inker to add the %% The Bookie takes the request and passes it first to the Inker to add the
%% request to the journal. %% request to the journal.
%% %%
@ -529,8 +533,15 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) ->
leveled_codec:tag(), infinity|integer()) -> ok|pause. leveled_codec:tag(), infinity|integer()) -> ok|pause.
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) -> book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) ->
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, false).
-spec book_put(pid(), leveled_codec:key(), leveled_codec:key(), any(),
leveled_codec:index_specs(),
leveled_codec:tag(), infinity|integer(),
boolean()) -> ok|pause.
book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync) ->
gen_server:call(Pid, gen_server:call(Pid,
{put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, {put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync},
infinity). infinity).
@ -1254,14 +1265,15 @@ init([Opts]) ->
end. end.
handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync},
when State#state.head_only == false -> From, State) when State#state.head_only == false ->
LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag),
SW0 = os:timestamp(), SW0 = os:timestamp(),
{ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker,
LedgerKey, LedgerKey,
Object, Object,
{IndexSpecs, TTL}), {IndexSpecs, TTL},
DataSync),
{SW1, Timings1} = {SW1, Timings1} =
update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings), update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings),
Changes = preparefor_ledgercache(null, Changes = preparefor_ledgercache(null,

View file

@ -96,6 +96,7 @@
cdb_reopen_reader/3, cdb_reopen_reader/3,
cdb_get/2, cdb_get/2,
cdb_put/3, cdb_put/3,
cdb_put/4,
cdb_mput/2, cdb_mput/2,
cdb_getpositions/2, cdb_getpositions/2,
cdb_directfetch/3, cdb_directfetch/3,
@ -251,7 +252,15 @@ cdb_get(Pid, Key) ->
%% It is assumed that the response to a "roll" will be to roll the file, which %% It is assumed that the response to a "roll" will be to roll the file, which
%% will close this file for writing after persisting the hashtree. %% will close this file for writing after persisting the hashtree.
cdb_put(Pid, Key, Value) -> cdb_put(Pid, Key, Value) ->
gen_fsm:sync_send_event(Pid, {put_kv, Key, Value}, infinity). cdb_put(Pid, Key, Value, false).
-spec cdb_put(pid(), any(), any(), boolean()) -> ok|roll.
%% @doc
%% See cdb_put/3. Addition of force-sync option, to be used when sync mode is
%% none to force a sync to disk on this particlar put.
cdb_put(Pid, Key, Value, Sync) ->
gen_fsm:sync_send_event(Pid, {put_kv, Key, Value, Sync}, infinity).
-spec cdb_mput(pid(), list()) -> ok|roll. -spec cdb_mput(pid(), list()) -> ok|roll.
%% @doc %% @doc
@ -533,7 +542,7 @@ writer({key_check, Key}, _From, State) ->
loose_presence), loose_presence),
writer, writer,
State}; State};
writer({put_kv, Key, Value}, _From, State) -> writer({put_kv, Key, Value, Sync}, _From, State) ->
NewCount = State#state.current_count + 1, NewCount = State#state.current_count + 1,
case NewCount >= State#state.max_count of case NewCount >= State#state.max_count of
true -> true ->
@ -552,8 +561,10 @@ writer({put_kv, Key, Value}, _From, State) ->
{reply, roll, writer, State}; {reply, roll, writer, State};
{UpdHandle, NewPosition, HashTree} -> {UpdHandle, NewPosition, HashTree} ->
ok = ok =
case State#state.sync_strategy of case {State#state.sync_strategy, Sync} of
riak_sync -> {riak_sync, _} ->
file:datasync(UpdHandle);
{none, true} ->
file:datasync(UpdHandle); file:datasync(UpdHandle);
_ -> _ ->
ok ok

View file

@ -95,7 +95,7 @@
code_change/3, code_change/3,
ink_start/1, ink_start/1,
ink_snapstart/1, ink_snapstart/1,
ink_put/4, ink_put/5,
ink_mput/3, ink_mput/3,
ink_get/3, ink_get/3,
ink_fetch/3, ink_fetch/3,
@ -197,16 +197,20 @@ ink_snapstart(InkerOpts) ->
-spec ink_put(pid(), -spec ink_put(pid(),
leveled_codec:ledger_key(), leveled_codec:ledger_key(),
any(), any(),
leveled_codec:journal_keychanges()) -> leveled_codec:journal_keychanges(),
{ok, integer(), integer()}. boolean()) ->
{ok, integer(), integer()}.
%% @doc %% @doc
%% PUT an object into the journal, returning the sequence number for the PUT %% PUT an object into the journal, returning the sequence number for the PUT
%% as well as the size of the object (information required by the ledger). %% as well as the size of the object (information required by the ledger).
%% %%
%% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an %% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an
%% expiry time (or infinity). %% expiry time (or infinity). A sync option can be passed, to override a
ink_put(Pid, PrimaryKey, Object, KeyChanges) -> %% sync_strategy of none for this particular PUT.
gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity). ink_put(Pid, PrimaryKey, Object, KeyChanges, DataSync) ->
gen_server:call(Pid,
{put, PrimaryKey, Object, KeyChanges, DataSync},
infinity).
-spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}. -spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}.
@ -491,15 +495,15 @@ init([LogOpts, InkerOpts]) ->
end. end.
handle_call({put, Key, Object, KeyChanges}, _From, handle_call({put, Key, Object, KeyChanges, DataSync}, _From,
State=#state{is_snapshot=Snap}) when Snap == false -> State=#state{is_snapshot=Snap}) when Snap == false ->
case put_object(Key, Object, KeyChanges, State) of case put_object(Key, Object, KeyChanges, DataSync, State) of
{_, UpdState, ObjSize} -> {_, UpdState, ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState} {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}
end; end;
handle_call({mput, Key, ObjChanges}, _From, handle_call({mput, Key, ObjChanges}, _From,
State=#state{is_snapshot=Snap}) when Snap == false -> State=#state{is_snapshot=Snap}) when Snap == false ->
case put_object(Key, head_only, ObjChanges, State) of case put_object(Key, head_only, ObjChanges, false, State) of
{_, UpdState, _ObjSize} -> {_, UpdState, _ObjSize} ->
{reply, {ok, UpdState#state.journal_sqn}, UpdState} {reply, {ok, UpdState#state.journal_sqn}, UpdState}
end; end;
@ -893,16 +897,17 @@ get_cdbopts(InkOpts)->
-spec put_object(leveled_codec:ledger_key(), -spec put_object(leveled_codec:ledger_key(),
any(), any(),
leveled_codec:journal_keychanges(), leveled_codec:journal_keychanges(),
boolean(),
ink_state()) ink_state())
-> {ok|rolling, ink_state(), integer()}. -> {ok|rolling, ink_state(), integer()}.
%% @doc %% @doc
%% Add the object to the current journal if it fits. If it doesn't fit, a new %% Add the object to the current journal if it fits. If it doesn't fit, a new
%% journal must be started, and the old journal is set to "roll" into a read %% journal must be started, and the old journal is set to "roll" into a read
%% only Journal. %% only Journal.
%% The reply contains the byte_size of the object, using the size calculated %% The reply contains the byte_size of the object, using the size calculated
%% to store the object. %% to store the object.
put_object(LedgerKey, Object, KeyChanges, State) -> put_object(LedgerKey, Object, KeyChanges, Sync, State) ->
NewSQN = State#state.journal_sqn + 1, NewSQN = State#state.journal_sqn + 1,
ActiveJournal = State#state.active_journaldb, ActiveJournal = State#state.active_journaldb,
{JournalKey, JournalBin} = {JournalKey, JournalBin} =
@ -914,7 +919,8 @@ put_object(LedgerKey, Object, KeyChanges, State) ->
State#state.compress_on_receipt), State#state.compress_on_receipt),
case leveled_cdb:cdb_put(ActiveJournal, case leveled_cdb:cdb_put(ActiveJournal,
JournalKey, JournalKey,
JournalBin) of JournalBin,
Sync) of
ok -> ok ->
{ok, {ok,
State#state{journal_sqn=NewSQN}, State#state{journal_sqn=NewSQN},
@ -1397,7 +1403,7 @@ compact_journal_wastediscarded_test_() ->
compact_journal_testto(WRP, ExpectedFiles) -> compact_journal_testto(WRP, ExpectedFiles) ->
RootPath = "test/test_area/journal", RootPath = "test/test_area/journal",
CDBopts = #cdb_options{max_size=300000}, CDBopts = #cdb_options{max_size=300000, sync_strategy=none},
RStrategy = [{?STD_TAG, recovr}], RStrategy = [{?STD_TAG, recovr}],
InkOpts = #inker_options{root_path=RootPath, InkOpts = #inker_options{root_path=RootPath,
cdb_options=CDBopts, cdb_options=CDBopts,
@ -1414,7 +1420,8 @@ compact_journal_testto(WRP, ExpectedFiles) ->
{ok, NewSQN1, ObjSize} = ink_put(Ink1, {ok, NewSQN1, ObjSize} = ink_put(Ink1,
test_ledgerkey("KeyAA"), test_ledgerkey("KeyAA"),
"TestValueAA", "TestValueAA",
{[], infinity}), {[], infinity},
true),
?assertMatch(NewSQN1, 5), ?assertMatch(NewSQN1, 5),
ok = ink_printmanifest(Ink1), ok = ink_printmanifest(Ink1),
R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5),
@ -1427,14 +1434,16 @@ compact_journal_testto(WRP, ExpectedFiles) ->
{ok, SQN, _} = ink_put(Ink1, {ok, SQN, _} = ink_put(Ink1,
test_ledgerkey(PK), test_ledgerkey(PK),
leveled_rand:rand_bytes(10000), leveled_rand:rand_bytes(10000),
{[], infinity}), {[], infinity},
false),
{SQN, test_ledgerkey(PK)} {SQN, test_ledgerkey(PK)}
end, end,
FunnyLoop), FunnyLoop),
{ok, NewSQN2, ObjSize} = ink_put(Ink1, {ok, NewSQN2, ObjSize} = ink_put(Ink1,
test_ledgerkey("KeyBB"), test_ledgerkey("KeyBB"),
"TestValueBB", "TestValueBB",
{[], infinity}), {[], infinity},
true),
?assertMatch(NewSQN2, 54), ?assertMatch(NewSQN2, 54),
ActualManifest = ink_getmanifest(Ink1), ActualManifest = ink_getmanifest(Ink1),
ok = ink_printmanifest(Ink1), ok = ink_printmanifest(Ink1),
@ -1513,7 +1522,7 @@ empty_manifest_test() ->
compress_on_receipt=false}), compress_on_receipt=false}),
?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)), ?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)),
{ok, SQN, Size} = {ok, SQN, Size} =
ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}), ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}, false),
?assertMatch(1, SQN), % This is the first key - so should have SQN of 1 ?assertMatch(1, SQN), % This is the first key - so should have SQN of 1
?assertMatch(true, Size > 0), ?assertMatch(true, Size > 0),
{ok, V} = ink_fetch(Ink2, key_converter("Key1"), 1), {ok, V} = ink_fetch(Ink2, key_converter("Key1"), 1),