Merge pull request #186 from martinsumner/mas-i185-emptyjournal
Mas i185 emptyjournal
This commit is contained in:
commit
908f762a48
5 changed files with 75 additions and 18 deletions
|
@ -123,6 +123,8 @@
|
|||
-define(DELETE_TIMEOUT, 10000).
|
||||
-define(TIMING_SAMPLECOUNTDOWN, 1000).
|
||||
-define(TIMING_SAMPLESIZE, 100).
|
||||
-define(MAX_OBJECT_SIZE, 1000000000).
|
||||
% 1GB but really should be much smaller than this
|
||||
|
||||
-record(state, {hashtree,
|
||||
last_position :: integer() | undefined,
|
||||
|
@ -464,7 +466,8 @@ writer({put_kv, Key, Value}, _From, State) ->
|
|||
Value,
|
||||
{State#state.last_position, State#state.hashtree},
|
||||
State#state.binary_mode,
|
||||
State#state.max_size),
|
||||
State#state.max_size,
|
||||
State#state.last_key == empty),
|
||||
case Result of
|
||||
roll ->
|
||||
%% Key and value could not be written
|
||||
|
@ -891,7 +894,7 @@ open_active_file(FileName) when is_list(FileName) ->
|
|||
|
||||
-spec put(list()|file:io_device(),
|
||||
any(), any(),
|
||||
{integer(), ets:tid()}, boolean(), integer())
|
||||
{integer(), ets:tid()}, boolean(), integer(), boolean())
|
||||
-> roll|{file:io_device(), integer(), ets:tid()}.
|
||||
%% @doc
|
||||
%% put(Handle, Key, Value, {LastPosition, HashDict}) -> {NewPosition, KeyDict}
|
||||
|
@ -903,20 +906,28 @@ put(FileName,
|
|||
Value,
|
||||
{LastPosition, HashTree},
|
||||
BinaryMode,
|
||||
MaxSize) when is_list(FileName) ->
|
||||
MaxSize,
|
||||
IsEmpty) when is_list(FileName) ->
|
||||
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
|
||||
put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize);
|
||||
put(Handle, Key, Value, {LastPosition, HashTree}, BinaryMode, MaxSize) ->
|
||||
put(Handle, Key, Value, {LastPosition, HashTree},
|
||||
BinaryMode, MaxSize, IsEmpty);
|
||||
put(Handle, Key, Value, {LastPosition, HashTree},
|
||||
BinaryMode, MaxSize, IsEmpty) ->
|
||||
Bin = key_value_to_record({Key, Value}, BinaryMode),
|
||||
PotentialNewSize = LastPosition + byte_size(Bin),
|
||||
if
|
||||
PotentialNewSize > MaxSize ->
|
||||
ObjectSize = byte_size(Bin),
|
||||
SizeWithinReason = ObjectSize < ?MAX_OBJECT_SIZE,
|
||||
PotentialNewSize = LastPosition + ObjectSize,
|
||||
case {IsEmpty, PotentialNewSize > MaxSize} of
|
||||
{false, true} ->
|
||||
roll;
|
||||
true ->
|
||||
ok = file:pwrite(Handle, LastPosition, Bin),
|
||||
{Handle,
|
||||
PotentialNewSize,
|
||||
put_hashtree(Key, LastPosition, HashTree)}
|
||||
_ ->
|
||||
if
|
||||
SizeWithinReason ->
|
||||
ok = file:pwrite(Handle, LastPosition, Bin),
|
||||
{Handle,
|
||||
PotentialNewSize,
|
||||
put_hashtree(Key, LastPosition, HashTree)}
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
|
@ -1860,7 +1871,7 @@ create(FileName,KeyValueList) ->
|
|||
%% should be taken from the startup options not the default
|
||||
put(FileName, Key, Value, {LastPosition, HashTree}) ->
|
||||
put(FileName, Key, Value, {LastPosition, HashTree},
|
||||
?BINARY_MODE, ?MAX_FILE_SIZE).
|
||||
?BINARY_MODE, ?MAX_FILE_SIZE, false).
|
||||
|
||||
|
||||
dump(FileName) ->
|
||||
|
|
|
@ -157,7 +157,7 @@ clerk_trim(Pid, Inker, PersistedSQN) ->
|
|||
%% of the hastable in the CDB file - so that the file is not blocked during
|
||||
%% this calculation
|
||||
clerk_hashtablecalc(HashTree, StartPos, CDBpid) ->
|
||||
{ok, Clerk} = gen_server:start(?MODULE, [#iclerk_options{}], []),
|
||||
{ok, Clerk} = gen_server:start_link(?MODULE, [#iclerk_options{}], []),
|
||||
gen_server:cast(Clerk, {hashtable_calc, HashTree, StartPos, CDBpid}).
|
||||
|
||||
-spec clerk_stop(pid()) -> ok.
|
||||
|
|
|
@ -10,7 +10,8 @@
|
|||
load_and_count_withdelete/1,
|
||||
space_clear_ondelete/1,
|
||||
is_empty_test/1,
|
||||
many_put_fetch_switchcompression/1
|
||||
many_put_fetch_switchcompression/1,
|
||||
bigjournal_littlejournal/1
|
||||
]).
|
||||
|
||||
all() -> [
|
||||
|
@ -22,7 +23,8 @@ all() -> [
|
|||
load_and_count_withdelete,
|
||||
space_clear_ondelete,
|
||||
is_empty_test,
|
||||
many_put_fetch_switchcompression
|
||||
many_put_fetch_switchcompression,
|
||||
bigjournal_littlejournal
|
||||
].
|
||||
|
||||
|
||||
|
@ -111,6 +113,32 @@ many_put_fetch_head(_Config) ->
|
|||
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"),
|
||||
ok = leveled_bookie:book_destroy(Bookie3).
|
||||
|
||||
bigjournal_littlejournal(_Config) ->
|
||||
RootPath = testutil:reset_filestructure(),
|
||||
StartOpts1 = [{root_path, RootPath},
|
||||
{max_journalsize, 50000000},
|
||||
{max_pencillercachesize, 32000},
|
||||
{sync_strategy, testutil:sync_strategy()},
|
||||
{compression_point, on_compact}],
|
||||
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
|
||||
ObjL1 =
|
||||
testutil:generate_objects(100, 1, [],
|
||||
leveled_rand:rand_bytes(10000),
|
||||
fun() -> [] end, <<"B">>),
|
||||
testutil:riakload(Bookie1, ObjL1),
|
||||
ok = leveled_bookie:book_close(Bookie1),
|
||||
StartOpts2 = lists:ukeysort(1, [{max_journalsize, 5000}|StartOpts1]),
|
||||
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
|
||||
ObjL2 =
|
||||
testutil:generate_objects(10, 1000, [],
|
||||
leveled_rand:rand_bytes(10000),
|
||||
fun() -> [] end, <<"B">>),
|
||||
testutil:riakload(Bookie2, ObjL2),
|
||||
testutil:check_forlist(Bookie2, ObjL1),
|
||||
testutil:check_forlist(Bookie2, ObjL2),
|
||||
ok = leveled_bookie:book_destroy(Bookie2).
|
||||
|
||||
|
||||
journal_compaction(_Config) ->
|
||||
journal_compaction_tester(false, 3600),
|
||||
journal_compaction_tester(false, undefined),
|
||||
|
|
|
@ -163,7 +163,20 @@ small_load_with2i(_Config) ->
|
|||
{foldobjects_bybucket, ?RIAK_TAG, "Bucket", all, {SumIntFun, 0}, true},
|
||||
{async, Sum1} = leveled_bookie:book_returnfolder(Bookie1, BucketObjQ),
|
||||
Total1 = Sum1(),
|
||||
true = Total1 > 100000,
|
||||
io:format("Total from summing all I is ~w~n", [Total1]),
|
||||
SumFromObjLFun =
|
||||
fun(Obj, Acc) ->
|
||||
{I, _Bin} = testutil:get_value_from_objectlistitem(Obj),
|
||||
Acc + I
|
||||
end,
|
||||
ObjL1Total =
|
||||
lists:foldl(SumFromObjLFun, 0, ObjL1),
|
||||
ChkList1Total =
|
||||
lists:foldl(SumFromObjLFun, 0, ChkList1),
|
||||
io:format("Total in original object list ~w and from removed list ~w~n",
|
||||
[ObjL1Total, ChkList1Total]),
|
||||
|
||||
Total1 = ObjL1Total - ChkList1Total,
|
||||
|
||||
ok = leveled_bookie:book_close(Bookie1),
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@
|
|||
foldkeysfun_returnbucket/3,
|
||||
sync_strategy/0,
|
||||
riak_object/4,
|
||||
get_value_from_objectlistitem/1,
|
||||
numbered_key/1,
|
||||
fixed_bin_key/1]).
|
||||
|
||||
|
@ -478,6 +479,10 @@ set_object(Bucket, Key, Value, IndexGen, Indexes2Remove) ->
|
|||
vclock=generate_vclock()},
|
||||
Spec1}.
|
||||
|
||||
get_value_from_objectlistitem({_Int, Obj, _Spc}) ->
|
||||
[Content] = Obj#r_object.contents,
|
||||
Content#r_content.value.
|
||||
|
||||
update_some_objects(Bookie, ObjList, SampleSize) ->
|
||||
StartWatchA = os:timestamp(),
|
||||
ToUpdateList = lists:sublist(lists:sort(ObjList), SampleSize),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue