diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 5c56433..40d564c 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -123,6 +123,8 @@ -define(DELETE_TIMEOUT, 10000). -define(TIMING_SAMPLECOUNTDOWN, 1000). -define(TIMING_SAMPLESIZE, 100). +-define(MAX_OBJECT_SIZE, 1000000000). + % 1GB but really should be much smaller than this -record(state, {hashtree, last_position :: integer() | undefined, @@ -464,7 +466,8 @@ writer({put_kv, Key, Value}, _From, State) -> Value, {State#state.last_position, State#state.hashtree}, State#state.binary_mode, - State#state.max_size), + State#state.max_size, + State#state.last_key == empty), case Result of roll -> %% Key and value could not be written @@ -891,7 +894,7 @@ open_active_file(FileName) when is_list(FileName) -> -spec put(list()|file:io_device(), any(), any(), - {integer(), ets:tid()}, boolean(), integer()) + {integer(), ets:tid()}, boolean(), integer(), boolean()) -> roll|{file:io_device(), integer(), ets:tid()}. %% @doc %% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict} @@ -903,20 +906,28 @@ put(FileName, Value, {LastPosition, HashTree}, BinaryMode, - MaxSize) when is_list(FileName) -> + MaxSize, + IsEmpty) when is_list(FileName) -> {ok, Handle} = file:open(FileName, ?WRITE_OPS), - put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize); -put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) -> + put(Handle, Key, Value, {LastPosition, HashTree}, + BinaryMode, MaxSize, IsEmpty); +put(Handle, Key, Value, {LastPosition, HashTree}, + BinaryMode, MaxSize, IsEmpty) -> Bin = key_value_to_record({Key, Value}, BinaryMode), - PotentialNewSize = LastPosition + byte_size(Bin), - if - PotentialNewSize > MaxSize -> + ObjectSize = byte_size(Bin), + SizeWithinReason = ObjectSize < ?MAX_OBJECT_SIZE, + PotentialNewSize = LastPosition + ObjectSize, + case {IsEmpty, PotentialNewSize > MaxSize} of + {false, true} -> roll; - true -> - ok = file:pwrite(Handle, LastPosition, Bin), - {Handle, - PotentialNewSize, - put_hashtree(Key, LastPosition, HashTree)} + _ -> + if + SizeWithinReason -> + ok = file:pwrite(Handle, LastPosition, Bin), + {Handle, + PotentialNewSize, + put_hashtree(Key, LastPosition, HashTree)} + end end. @@ -1860,7 +1871,7 @@ create(FileName,KeyValueList) -> %% should be taken from the startup options not the default put(FileName, Key, Value, {LastPosition, HashTree}) -> put(FileName, Key, Value, {LastPosition, HashTree}, - ?BINARY_MODE, ?MAX_FILE_SIZE). + ?BINARY_MODE, ?MAX_FILE_SIZE, false). dump(FileName) -> diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 3f71933..ed7a8f3 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -157,7 +157,7 @@ clerk_trim(Pid, Inker, PersistedSQN) -> %% of the hastable in the CDB file - so that the file is not blocked during %% this calculation clerk_hashtablecalc(HashTree, StartPos, CDBpid) -> - {ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []), + {ok, Clerk} = gen_server:start_link(?MODULE, [#iclerk_options{}], []), gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}). -spec clerk_stop(pid()) -> ok. diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 32991d4..a2e105e 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -10,7 +10,8 @@ load_and_count_withdelete/1, space_clear_ondelete/1, is_empty_test/1, - many_put_fetch_switchcompression/1 + many_put_fetch_switchcompression/1, + bigjournal_littlejournal/1 ]). all() -> [ @@ -22,7 +23,8 @@ all() -> [ load_and_count_withdelete, space_clear_ondelete, is_empty_test, - many_put_fetch_switchcompression + many_put_fetch_switchcompression, + bigjournal_littlejournal ]. @@ -111,6 +113,32 @@ many_put_fetch_head(_Config) -> testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"), ok = leveled_bookie:book_destroy(Bookie3). +bigjournal_littlejournal(_Config) -> + RootPath = testutil:reset_filestructure(), + StartOpts1 = [{root_path, RootPath}, + {max_journalsize, 50000000}, + {max_pencillercachesize, 32000}, + {sync_strategy, testutil:sync_strategy()}, + {compression_point, on_compact}], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjL1 = + testutil:generate_objects(100, 1, [], + leveled_rand:rand_bytes(10000), + fun() -> [] end, <<"B">>), + testutil:riakload(Bookie1, ObjL1), + ok = leveled_bookie:book_close(Bookie1), + StartOpts2 = lists:ukeysort(1, [{max_journalsize, 5000}|StartOpts1]), + {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), + ObjL2 = + testutil:generate_objects(10, 1000, [], + leveled_rand:rand_bytes(10000), + fun() -> [] end, <<"B">>), + testutil:riakload(Bookie2, ObjL2), + testutil:check_forlist(Bookie2, ObjL1), + testutil:check_forlist(Bookie2, ObjL2), + ok = leveled_bookie:book_destroy(Bookie2). + + journal_compaction(_Config) -> journal_compaction_tester(false, 3600), journal_compaction_tester(false, undefined), diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index e7940d8..83c3de3 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -163,7 +163,20 @@ small_load_with2i(_Config) -> {foldobjects_bybucket, ?RIAK_TAG, "Bucket", all, {SumIntFun, 0}, true}, {async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ), Total1 = Sum1(), - true = Total1 > 100000, + io:format("Total from summing all I is ~w~n", [Total1]), + SumFromObjLFun = + fun(Obj, Acc) -> + {I, _Bin} = testutil:get_value_from_objectlistitem(Obj), + Acc + I + end, + ObjL1Total = + lists:foldl(SumFromObjLFun, 0, ObjL1), + ChkList1Total = + lists:foldl(SumFromObjLFun, 0, ChkList1), + io:format("Total in original object list ~w and from removed list ~w~n", + [ObjL1Total, ChkList1Total]), + + Total1 = ObjL1Total - ChkList1Total, ok = leveled_bookie:book_close(Bookie1), diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index 8501f25..176c065 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -50,6 +50,7 @@ foldkeysfun_returnbucket/3, sync_strategy/0, riak_object/4, + get_value_from_objectlistitem/1, numbered_key/1, fixed_bin_key/1]). @@ -478,6 +479,10 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) -> vclock=generate_vclock()}, Spec1}. +get_value_from_objectlistitem({_Int, Obj, _Spc}) -> + [Content] = Obj#r_object.contents, + Content#r_content.value. + update_some_objects(Bookie, ObjList, SampleSize) -> StartWatchA = os:timestamp(), ToUpdateList = lists:sublist(lists:sort(ObjList), SampleSize),