Work on reload strategies
Further work on variable reload srategies wiht some unit test coverage. Also work on potentially supporting no_hash on PUT to journal files for objects which will never be directly fetched.
This commit is contained in:
parent
102cfe7f6f
commit
97087a6b2b
6 changed files with 387 additions and 139 deletions
|
@ -106,7 +106,6 @@
|
|||
ink_print_manifest/1,
|
||||
ink_close/1,
|
||||
ink_forceclose/1,
|
||||
create_value_for_cdb/1,
|
||||
build_dummy_journal/0,
|
||||
simple_manifest_reader/2,
|
||||
clean_testdir/1,
|
||||
|
@ -342,9 +341,9 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%============================================================================
|
||||
|
||||
start_from_file(InkerOpts) ->
|
||||
RootPath = InkerOpts#inker_options.root_path,
|
||||
CDBopts = InkerOpts#inker_options.cdb_options,
|
||||
start_from_file(InkOpts) ->
|
||||
RootPath = InkOpts#inker_options.root_path,
|
||||
CDBopts = InkOpts#inker_options.cdb_options,
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
filelib:ensure_dir(JournalFP),
|
||||
CompactFP = filepath(RootPath, journal_compact_dir),
|
||||
|
@ -360,8 +359,10 @@ start_from_file(InkerOpts) ->
|
|||
end,
|
||||
|
||||
IClerkCDBOpts = CDBopts#cdb_options{file_path = CompactFP},
|
||||
ReloadStrategy = InkOpts#inker_options.reload_strategy,
|
||||
IClerkOpts = #iclerk_options{inker = self(),
|
||||
cdb_options=IClerkCDBOpts},
|
||||
cdb_options=IClerkCDBOpts,
|
||||
reload_strategy = ReloadStrategy},
|
||||
{ok, Clerk} = leveled_iclerk:clerk_new(IClerkOpts),
|
||||
|
||||
{Manifest,
|
||||
|
@ -379,24 +380,18 @@ start_from_file(InkerOpts) ->
|
|||
clerk = Clerk}}.
|
||||
|
||||
|
||||
put_object(PrimaryKey, Object, KeyChanges, State) ->
|
||||
put_object(LedgerKey, Object, KeyChanges, State) ->
|
||||
NewSQN = State#state.journal_sqn + 1,
|
||||
{InkerType, HashOpt} = case Object of
|
||||
delete ->
|
||||
{tomb, no_hash};
|
||||
%delta ->
|
||||
% {keyd, no_hash
|
||||
_ ->
|
||||
{stnd, hash}
|
||||
end,
|
||||
Bin1 = create_value_for_cdb({Object, KeyChanges}),
|
||||
ObjSize = byte_size(Bin1),
|
||||
{JournalKey, JournalBin, HashOpt} = leveled_codec:to_inkerkv(LedgerKey,
|
||||
NewSQN,
|
||||
Object,
|
||||
KeyChanges),
|
||||
case leveled_cdb:cdb_put(State#state.active_journaldb,
|
||||
{NewSQN, InkerType, PrimaryKey},
|
||||
Bin1,
|
||||
JournalKey,
|
||||
JournalBin,
|
||||
HashOpt) of
|
||||
ok ->
|
||||
{ok, State#state{journal_sqn=NewSQN}, ObjSize};
|
||||
{ok, State#state{journal_sqn=NewSQN}, byte_size(JournalBin)};
|
||||
roll ->
|
||||
SW = os:timestamp(),
|
||||
CDBopts = State#state.cdb_options,
|
||||
|
@ -409,8 +404,8 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
|
|||
State#state.manifest_sqn + 1,
|
||||
State#state.root_path),
|
||||
ok = leveled_cdb:cdb_put(NewJournalP,
|
||||
{NewSQN, InkerType, PrimaryKey},
|
||||
Bin1,
|
||||
JournalKey,
|
||||
JournalBin,
|
||||
HashOpt),
|
||||
io:format("Put to new active journal " ++
|
||||
"with manifest write took ~w microseconds~n",
|
||||
|
@ -420,30 +415,18 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
|
|||
manifest=NewManifest,
|
||||
manifest_sqn = State#state.manifest_sqn + 1,
|
||||
active_journaldb=NewJournalP},
|
||||
ObjSize}
|
||||
byte_size(JournalBin)}
|
||||
end.
|
||||
|
||||
|
||||
create_value_for_cdb(Value) ->
|
||||
case Value of
|
||||
{Object, KeyChanges} ->
|
||||
term_to_binary({Object, KeyChanges}, [compressed]);
|
||||
Value when is_binary(Value) ->
|
||||
Value
|
||||
end.
|
||||
|
||||
|
||||
get_object(PrimaryKey, SQN, Manifest) ->
|
||||
get_object(LedgerKey, SQN, Manifest) ->
|
||||
JournalP = find_in_manifest(SQN, Manifest),
|
||||
Obj = leveled_cdb:cdb_get(JournalP, {SQN, stnd, PrimaryKey}),
|
||||
case Obj of
|
||||
{{SQN, stnd, PK}, Bin} when is_binary(Bin) ->
|
||||
{{SQN, PK}, binary_to_term(Bin)};
|
||||
{{SQN, stnd, PK}, Term} ->
|
||||
{{SQN, PK}, Term};
|
||||
_ ->
|
||||
Obj
|
||||
end.
|
||||
{InkerKey, _V, true} = leveled_codec:to_inkerkv(LedgerKey,
|
||||
SQN,
|
||||
to_fetch,
|
||||
null),
|
||||
Obj = leveled_cdb:cdb_get(JournalP, InkerKey),
|
||||
leveled_codec:from_inkerkv(Obj).
|
||||
|
||||
|
||||
build_manifest(ManifestFilenames,
|
||||
|
@ -771,6 +754,10 @@ initiate_penciller_snapshot(Bookie) ->
|
|||
-ifdef(TEST).
|
||||
|
||||
build_dummy_journal() ->
|
||||
F = fun(X) -> X end,
|
||||
build_dummy_journal(F).
|
||||
|
||||
build_dummy_journal(KeyConvertF) ->
|
||||
RootPath = "../test/journal",
|
||||
clean_testdir(RootPath),
|
||||
JournalFP = filepath(RootPath, journal_dir),
|
||||
|
@ -780,8 +767,8 @@ build_dummy_journal() ->
|
|||
ok = filelib:ensure_dir(ManifestFP),
|
||||
F1 = filename:join(JournalFP, "nursery_1.pnd"),
|
||||
{ok, J1} = leveled_cdb:cdb_open_writer(F1),
|
||||
{K1, V1} = {"Key1", "TestValue1"},
|
||||
{K2, V2} = {"Key2", "TestValue2"},
|
||||
{K1, V1} = {KeyConvertF("Key1"), "TestValue1"},
|
||||
{K2, V2} = {KeyConvertF("Key2"), "TestValue2"},
|
||||
ok = leveled_cdb:cdb_put(J1, {1, stnd, K1}, term_to_binary({V1, []})),
|
||||
ok = leveled_cdb:cdb_put(J1, {2, stnd, K2}, term_to_binary({V2, []})),
|
||||
ok = leveled_cdb:cdb_roll(J1),
|
||||
|
@ -789,8 +776,8 @@ build_dummy_journal() ->
|
|||
ok = leveled_cdb:cdb_close(J1),
|
||||
F2 = filename:join(JournalFP, "nursery_3.pnd"),
|
||||
{ok, J2} = leveled_cdb:cdb_open_writer(F2),
|
||||
{K1, V3} = {"Key1", "TestValue3"},
|
||||
{K4, V4} = {"Key4", "TestValue4"},
|
||||
{K1, V3} = {KeyConvertF("Key1"), "TestValue3"},
|
||||
{K4, V4} = {KeyConvertF("Key4"), "TestValue4"},
|
||||
ok = leveled_cdb:cdb_put(J2, {3, stnd, K1}, term_to_binary({V3, []})),
|
||||
ok = leveled_cdb:cdb_put(J2, {4, stnd, K4}, term_to_binary({V4, []})),
|
||||
ok = leveled_cdb:cdb_close(J2),
|
||||
|
@ -854,29 +841,37 @@ simple_inker_completeactivejournal_test() ->
|
|||
ink_close(Ink1),
|
||||
clean_testdir(RootPath).
|
||||
|
||||
|
||||
test_ledgerkey(Key) ->
|
||||
{o, "Bucket", Key, null}.
|
||||
|
||||
compact_journal_test() ->
|
||||
RootPath = "../test/journal",
|
||||
build_dummy_journal(),
|
||||
build_dummy_journal(fun test_ledgerkey/1),
|
||||
CDBopts = #cdb_options{max_size=300000},
|
||||
RStrategy = [{?STD_TAG, recovr}],
|
||||
{ok, Ink1} = ink_start(#inker_options{root_path=RootPath,
|
||||
cdb_options=CDBopts}),
|
||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []),
|
||||
cdb_options=CDBopts,
|
||||
reload_strategy=RStrategy}),
|
||||
{ok, NewSQN1, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyAA"),
|
||||
"TestValueAA", []),
|
||||
?assertMatch(NewSQN1, 5),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
R0 = ink_get(Ink1, "KeyAA", 5),
|
||||
?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}),
|
||||
R0 = ink_get(Ink1, test_ledgerkey("KeyAA"), 5),
|
||||
?assertMatch(R0, {{5, test_ledgerkey("KeyAA")}, {"TestValueAA", []}}),
|
||||
FunnyLoop = lists:seq(1, 48),
|
||||
Checker = lists:map(fun(X) ->
|
||||
PK = "KeyZ" ++ integer_to_list(X),
|
||||
{ok, SQN, _} = ink_put(Ink1,
|
||||
PK,
|
||||
test_ledgerkey(PK),
|
||||
crypto:rand_bytes(10000),
|
||||
[]),
|
||||
{SQN, PK}
|
||||
{SQN, test_ledgerkey(PK)}
|
||||
end,
|
||||
FunnyLoop),
|
||||
{ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []),
|
||||
{ok, NewSQN2, _ObjSize} = ink_put(Ink1,
|
||||
test_ledgerkey("KeyBB"),
|
||||
"TestValueBB", []),
|
||||
?assertMatch(NewSQN2, 54),
|
||||
ActualManifest = ink_getmanifest(Ink1),
|
||||
ok = ink_print_manifest(Ink1),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue