diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index fbd2add..a8f4b6a 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -489,6 +489,10 @@ book_tempput(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) %% Index tags are not fetchable (as they will not be hashed), but are %% extractable via range query. %% +%% The extended-arity book_put functions support the addition of an object +%% TTL and a `sync` boolean to flush this PUT (and any other buffered PUTs to +%% disk when the sync_stategy is `none`. +%% %% The Bookie takes the request and passes it first to the Inker to add the %% request to the journal. %% @@ -529,8 +533,15 @@ book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag) -> leveled_codec:tag(), infinity|integer()) -> ok|pause. book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL) when is_atom(Tag) -> + book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, false). + +-spec book_put(pid(), leveled_codec:key(), leveled_codec:key(), any(), + leveled_codec:index_specs(), + leveled_codec:tag(), infinity|integer(), + boolean()) -> ok|pause. +book_put(Pid, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync) -> gen_server:call(Pid, - {put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, + {put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync}, infinity). @@ -1254,14 +1265,15 @@ init([Opts]) -> end. -handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL}, From, State) - when State#state.head_only == false -> +handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync}, + From, State) when State#state.head_only == false -> LedgerKey = leveled_codec:to_ledgerkey(Bucket, Key, Tag), SW0 = os:timestamp(), {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, LedgerKey, Object, - {IndexSpecs, TTL}), + {IndexSpecs, TTL}, + DataSync), {SW1, Timings1} = update_timings(SW0, {put, {inker, ObjSize}}, State#state.put_timings), Changes = preparefor_ledgercache(null, diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index ee4e946..313068e 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -96,6 +96,7 @@ cdb_reopen_reader/3, cdb_get/2, cdb_put/3, + cdb_put/4, cdb_mput/2, cdb_getpositions/2, cdb_directfetch/3, @@ -251,7 +252,15 @@ cdb_get(Pid, Key) -> %% It is assumed that the response to a "roll" will be to roll the file, which %% will close this file for writing after persisting the hashtree. cdb_put(Pid, Key, Value) -> - gen_fsm:sync_send_event(Pid, {put_kv, Key, Value}, infinity). + cdb_put(Pid, Key, Value, false). + +-spec cdb_put(pid(), any(), any(), boolean()) -> ok|roll. +%% @doc +%% See cdb_put/3. Addition of force-sync option, to be used when sync mode is +%% none to force a sync to disk on this particlar put. +cdb_put(Pid, Key, Value, Sync) -> + + gen_fsm:sync_send_event(Pid, {put_kv, Key, Value, Sync}, infinity). -spec cdb_mput(pid(), list()) -> ok|roll. %% @doc @@ -533,7 +542,7 @@ writer({key_check, Key}, _From, State) -> loose_presence), writer, State}; -writer({put_kv, Key, Value}, _From, State) -> +writer({put_kv, Key, Value, Sync}, _From, State) -> NewCount = State#state.current_count + 1, case NewCount >= State#state.max_count of true -> @@ -552,8 +561,10 @@ writer({put_kv, Key, Value}, _From, State) -> {reply, roll, writer, State}; {UpdHandle, NewPosition, HashTree} -> ok = - case State#state.sync_strategy of - riak_sync -> + case {State#state.sync_strategy, Sync} of + {riak_sync, _} -> + file:datasync(UpdHandle); + {none, true} -> file:datasync(UpdHandle); _ -> ok diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 9e338c2..abd4285 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -95,7 +95,7 @@ code_change/3, ink_start/1, ink_snapstart/1, - ink_put/4, + ink_put/5, ink_mput/3, ink_get/3, ink_fetch/3, @@ -197,16 +197,20 @@ ink_snapstart(InkerOpts) -> -spec ink_put(pid(), leveled_codec:ledger_key(), any(), - leveled_codec:journal_keychanges()) -> - {ok, integer(), integer()}. + leveled_codec:journal_keychanges(), + boolean()) -> + {ok, integer(), integer()}. %% @doc %% PUT an object into the journal, returning the sequence number for the PUT %% as well as the size of the object (information required by the ledger). %% %% KeyChanges is a tuple of {KeyChanges, TTL} where the TTL is an -%% expiry time (or infinity). -ink_put(Pid, PrimaryKey, Object, KeyChanges) -> - gen_server:call(Pid, {put, PrimaryKey, Object, KeyChanges}, infinity). +%% expiry time (or infinity). A sync option can be passed, to override a +%% sync_strategy of none for this particular PUT. +ink_put(Pid, PrimaryKey, Object, KeyChanges, DataSync) -> + gen_server:call(Pid, + {put, PrimaryKey, Object, KeyChanges, DataSync}, + infinity). -spec ink_mput(pid(), any(), {list(), integer()|infinity}) -> {ok, integer()}. @@ -491,15 +495,15 @@ init([LogOpts, InkerOpts]) -> end. -handle_call({put, Key, Object, KeyChanges}, _From, +handle_call({put, Key, Object, KeyChanges, DataSync}, _From, State=#state{is_snapshot=Snap}) when Snap == false -> - case put_object(Key, Object, KeyChanges, State) of + case put_object(Key, Object, KeyChanges, DataSync, State) of {_, UpdState, ObjSize} -> {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState} end; handle_call({mput, Key, ObjChanges}, _From, State=#state{is_snapshot=Snap}) when Snap == false -> - case put_object(Key, head_only, ObjChanges, State) of + case put_object(Key, head_only, ObjChanges, false, State) of {_, UpdState, _ObjSize} -> {reply, {ok, UpdState#state.journal_sqn}, UpdState} end; @@ -893,16 +897,17 @@ get_cdbopts(InkOpts)-> -spec put_object(leveled_codec:ledger_key(), any(), - leveled_codec:journal_keychanges(), + leveled_codec:journal_keychanges(), + boolean(), ink_state()) - -> {ok|rolling, ink_state(), integer()}. + -> {ok|rolling, ink_state(), integer()}. %% @doc %% Add the object to the current journal if it fits. If it doesn't fit, a new %% journal must be started, and the old journal is set to "roll" into a read %% only Journal. %% The reply contains the byte_size of the object, using the size calculated %% to store the object. -put_object(LedgerKey, Object, KeyChanges, State) -> +put_object(LedgerKey, Object, KeyChanges, Sync, State) -> NewSQN = State#state.journal_sqn + 1, ActiveJournal = State#state.active_journaldb, {JournalKey, JournalBin} = @@ -914,7 +919,8 @@ put_object(LedgerKey, Object, KeyChanges, State) -> State#state.compress_on_receipt), case leveled_cdb:cdb_put(ActiveJournal, JournalKey, - JournalBin) of + JournalBin, + Sync) of ok -> {ok, State#state{journal_sqn=NewSQN}, @@ -1397,7 +1403,7 @@ compact_journal_wastediscarded_test_() -> compact_journal_testto(WRP, ExpectedFiles) -> RootPath = "test/test_area/journal", - CDBopts = #cdb_options{max_size=300000}, + CDBopts = #cdb_options{max_size=300000, sync_strategy=none}, RStrategy = [{?STD_TAG, recovr}], InkOpts = #inker_options{root_path=RootPath, cdb_options=CDBopts, @@ -1414,7 +1420,8 @@ compact_journal_testto(WRP, ExpectedFiles) -> {ok, NewSQN1, ObjSize} = ink_put(Ink1, test_ledgerkey("KeyAA"), "TestValueAA", - {[], infinity}), + {[], infinity}, + true), ?assertMatch(NewSQN1, 5), ok = ink_printmanifest(Ink1), R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5), @@ -1427,14 +1434,16 @@ compact_journal_testto(WRP, ExpectedFiles) -> {ok, SQN, _} = ink_put(Ink1, test_ledgerkey(PK), leveled_rand:rand_bytes(10000), - {[], infinity}), + {[], infinity}, + false), {SQN, test_ledgerkey(PK)} end, FunnyLoop), {ok, NewSQN2, ObjSize} = ink_put(Ink1, test_ledgerkey("KeyBB"), "TestValueBB", - {[], infinity}), + {[], infinity}, + true), ?assertMatch(NewSQN2, 54), ActualManifest = ink_getmanifest(Ink1), ok = ink_printmanifest(Ink1), @@ -1513,7 +1522,7 @@ empty_manifest_test() -> compress_on_receipt=false}), ?assertMatch(not_present, ink_fetch(Ink2, key_converter("Key1"), 1)), {ok, SQN, Size} = - ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}), + ink_put(Ink2, key_converter("Key1"), "Value1", {[], infinity}, false), ?assertMatch(1, SQN), % This is the first key - so should have SQN of 1 ?assertMatch(true, Size > 0), {ok, V} = ink_fetch(Ink2, key_converter("Key1"), 1),