Add initial system test
Add some initial system tests. This highlighted issues:
- Files marked for deletion by compaction would be left orphaned: they were never closed and so never actually deleted (deletion now happens only on closure).
- On startup, the first few keys in each journal would not be re-loaded into the ledger.
parent 15f57a0b4a
commit 507428bd0b
9 changed files with 339 additions and 275 deletions
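The delete-by-closure approach can be seen in the leveled_cdb and clerk changes below: rather than deleting a compacted journal file directly, the clerk casts a delete_pending flag to the file's process, and the file is removed only when that process closes. Condensed into one place (a sketch of the clauses involved, taken from the diff below, not a complete module):

%% Sketch only - record fields (handle, filename, pending_delete) as in the
%% leveled_cdb state record shown in this diff.
cdb_deletepending(Pid) ->
    gen_server:cast(Pid, delete_pending).

handle_cast(delete_pending, State) ->
    %% Mark the file for deletion; nothing is removed yet
    {noreply, State#state{pending_delete = true}};
handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, State) ->
    %% Deletion happens only here, when the owning process closes
    case {State#state.handle, State#state.pending_delete} of
        {undefined, _} ->
            ok;
        {Handle, false} ->
            file:close(Handle);
        {Handle, true} ->
            file:close(Handle),
            file:delete(State#state.filename)
    end.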
@@ -42,6 +42,7 @@
-record(bookie_options,
{root_path :: string(),
cache_size :: integer(),
max_journalsize :: integer(),
metadata_extractor :: function(),
indexspec_converter :: function()}).
@@ -11,7 +11,7 @@
%% and frequent use of iterators)
%% - The Journal is an extended nursery log in leveldb terms. It is keyed
%% on the sequence number of the write
%% - The ledger is a LSM tree, where the key is the actaul object key, and
%% - The ledger is a merge tree, where the key is the actaul object key, and
%% the value is the metadata of the object including the sequence number
%%
%%

@@ -140,6 +140,7 @@
book_riakhead/3,
book_snapshotstore/3,
book_snapshotledger/3,
book_compactjournal/2,
book_close/1,
strip_to_keyonly/1,
strip_to_keyseqonly/1,

@@ -152,6 +153,8 @@
-define(CACHE_SIZE, 1000).
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
-define(SHUTDOWN_WAITS, 60).
-define(SHUTDOWN_PAUSE, 10000).

-record(state, {inker :: pid(),
penciller :: pid(),

@@ -188,6 +191,9 @@ book_snapshotstore(Pid, Requestor, Timeout) ->
book_snapshotledger(Pid, Requestor, Timeout) ->
gen_server:call(Pid, {snapshot, Requestor, ledger, Timeout}, infinity).

book_compactjournal(Pid, Timeout) ->
gen_server:call(Pid, {compact_journal, Timeout}, infinity).

book_close(Pid) ->
gen_server:call(Pid, close, infinity).

@@ -289,6 +295,11 @@ handle_call({snapshot, Requestor, SnapType, _Timeout}, _From, State) ->
ledger ->
{reply, {ok, LedgerSnapshot, null}, State}
end;
handle_call({compact_journal, Timeout}, _From, State) ->
ok = leveled_inker:ink_compactjournal(State#state.inker,
State#state.penciller,
Timeout),
{reply, ok, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.

@@ -300,7 +311,16 @@ handle_info(_Info, State) ->

terminate(Reason, State) ->
io:format("Bookie closing for reason ~w~n", [Reason]),
ok = leveled_inker:ink_close(State#state.inker),
WaitList = lists:duplicate(?SHUTDOWN_WAITS, ?SHUTDOWN_PAUSE),
ok = case shutdown_wait(WaitList, State#state.inker) of
false ->
io:format("Forcing close of inker following wait of "
++ "~w milliseconds~n",
[lists:sum(WaitList)]),
leveled_inker:ink_forceclose(State#state.inker);
true ->
ok
end,
ok = leveled_penciller:pcl_close(State#state.penciller).

code_change(_OldVsn, State, _Extra) ->

@@ -311,11 +331,30 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%============================================================================

shutdown_wait([], _Inker) ->
false;
shutdown_wait([TopPause|Rest], Inker) ->
case leveled_inker:ink_close(Inker) of
ok ->
true;
pause ->
io:format("Inker shutdown stil waiting process to complete~n"),
ok = timer:sleep(TopPause),
shutdown_wait(Rest, Inker)
end.

set_options(Opts) ->
%% TODO: Change the max size default, and allow setting through options
MaxJournalSize = case Opts#bookie_options.max_journalsize of
undefined ->
30000;
MS ->
MS
end,
{#inker_options{root_path = Opts#bookie_options.root_path ++
"/" ++ ?JOURNAL_FP,
cdb_options = #cdb_options{max_size=30000}},
cdb_options = #cdb_options{max_size=MaxJournalSize}},
#penciller_options{root_path=Opts#bookie_options.root_path ++
"/" ++ ?LEDGER_FP}}.

@@ -442,10 +481,10 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) ->
load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
{MinSQN, MaxSQN, Output} = Acc0,
{SQN, PK} = KeyInLedger,
io:format("Reloading changes with SQN=~w PK=~w~n", [SQN, PK]),
{Obj, IndexSpecs} = binary_to_term(ExtractFun(ValueInLedger)),
case SQN of
SQN when SQN < MinSQN ->
io:format("Skipping due to low SQN ~w~n", [SQN]),
{loop, Acc0};
SQN when SQN =< MaxSQN ->
%% TODO - get correct size in a more efficient manner

@@ -454,6 +493,8 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0, ExtractFun) ->
Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs),
{loop, {MinSQN, MaxSQN, Output ++ Changes}};
SQN when SQN > MaxSQN ->
io:format("Skipping as exceeded MaxSQN ~w with SQN ~w~n",
[MaxSQN, SQN]),
{stop, Acc0}
end.
@@ -66,7 +66,8 @@
cdb_scan/4,
cdb_close/1,
cdb_complete/1,
cdb_destroy/1]).
cdb_destroy/1,
cdb_deletepending/1]).

-include_lib("eunit/include/eunit.hrl").

@@ -84,7 +85,8 @@
filename :: string(),
handle :: file:fd(),
writer :: boolean(),
max_size :: integer()}).
max_size :: integer(),
pending_delete = false :: boolean()}).

%%%============================================================================

@@ -138,6 +140,9 @@ cdb_complete(Pid) ->
cdb_destroy(Pid) ->
gen_server:cast(Pid, destroy).

cdb_deletepending(Pid) ->
gen_server:cast(Pid, delete_pending).

%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to
%% continue from that point (calling function has to protect against) double
%% counting.

@@ -355,6 +360,8 @@ handle_cast(destroy, State) ->
ok = file:close(State#state.handle),
ok = file:delete(State#state.filename),
{noreply, State};
handle_cast(delete_pending, State) ->
{noreply, State#state{pending_delete = true}};
handle_cast(_Msg, State) ->
{noreply, State}.

@@ -362,11 +369,14 @@ handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, State) ->
case State#state.handle of
undefined ->
case {State#state.handle, State#state.pending_delete} of
{undefined, _} ->
ok;
Handle ->
file:close(Handle)
{Handle, false} ->
file:close(Handle);
{Handle, true} ->
file:close(Handle),
file:delete(State#state.filename)
end.

code_change(_OldVsn, State, _Extra) ->
@@ -80,7 +80,7 @@ init([IClerkOpts]) ->
end.

handle_call(_Msg, _From, State) ->
{reply, not_supprted, State}.
{reply, not_supported, State}.

handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
State) ->

@@ -111,11 +111,20 @@ handle_cast({compact, Checker, InitiateFun, FilterFun, Inker, _Timeout},
BestRun),
ok = leveled_inker:ink_updatemanifest(Inker,
ManifestSlice,
PromptDelete,
FilesToDelete),
{noreply, State};
ok = leveled_inker:ink_compactioncomplete(Inker),
case PromptDelete of
true ->
lists:foreach(fun({_SQN, _FN, J2D}) ->
leveled_cdb:cdb_deletepending(J2D) end,
FilesToDelete),
{noreply, State};
false ->
{noreply, State}
end;
Score ->
io:format("No compaction run as highest score=~w~n", [Score]),
ok = leveled_inker:ink_compactioncomplete(Inker),
{noreply, State}
end;
handle_cast({remove, _Removals}, State) ->

@@ -361,6 +370,9 @@ write_values([KVC|Rest], CDBopts, Journal0, ManSlice0) ->
FN = leveled_inker:filepath(FP,
SQN,
compact_journal),
io:format("Generate journal for compaction"
++ " with filename ~s~n",
[FN]),
leveled_cdb:cdb_open_writer(FN,
CDBopts);
_ ->
@@ -105,10 +105,12 @@
ink_loadpcl/4,
ink_registersnapshot/2,
ink_compactjournal/3,
ink_compactioncomplete/1,
ink_getmanifest/1,
ink_updatemanifest/4,
ink_updatemanifest/3,
ink_print_manifest/1,
ink_close/1,
ink_forceclose/1,
build_dummy_journal/0,
simple_manifest_reader/2,
clean_testdir/1,

@@ -135,7 +137,9 @@
registered_snapshots = [] :: list(),
root_path :: string(),
cdb_options :: #cdb_options{},
clerk :: pid()}).
clerk :: pid(),
compaction_pending = false :: boolean(),
is_snapshot = false :: boolean()}).

%%%============================================================================

@@ -158,7 +162,10 @@ ink_registersnapshot(Pid, Requestor) ->
gen_server:call(Pid, {snapshot, Requestor}, infinity).

ink_close(Pid) ->
gen_server:call(Pid, close, infinity).
gen_server:call(Pid, {close, false}, infinity).

ink_forceclose(Pid) ->
gen_server:call(Pid, {close, true}, infinity).

ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) ->
gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity).

@@ -172,7 +179,7 @@ ink_compactjournal(Pid, Penciller, Timeout) ->
CheckerInitiateFun,
CheckerFilterFun,
Timeout},
infiniy).
infinity).

%% Allows the Checker to be overriden in test, use something other than a
%% penciller

@@ -185,14 +192,16 @@ ink_compactjournal(Pid, Checker, InitiateFun, FilterFun, Timeout) ->
Timeout},
infinity).

ink_compactioncomplete(Pid) ->
gen_server:call(Pid, compaction_complete, infinity).

ink_getmanifest(Pid) ->
gen_server:call(Pid, get_manifest, infinity).

ink_updatemanifest(Pid, ManifestSnippet, PromptDeletion, DeletedFiles) ->
ink_updatemanifest(Pid, ManifestSnippet, DeletedFiles) ->
gen_server:call(Pid,
{update_manifest,
ManifestSnippet,
PromptDeletion,
DeletedFiles},
infinity).

@@ -213,7 +222,8 @@ init([InkerOpts]) ->
{ActiveJournalDB,
Manifest}} = ink_registersnapshot(SrcInker, Requestor),
{ok, #state{manifest=Manifest,
active_journaldb=ActiveJournalDB}};
active_journaldb=ActiveJournalDB,
is_snapshot=true}};
%% Need to do something about timeout
{_RootPath, false} ->
start_from_file(InkerOpts)

@@ -272,7 +282,6 @@ handle_call(get_manifest, _From, State) ->
{reply, State#state.manifest, State};
handle_call({update_manifest,
ManifestSnippet,
PromptDeletion,
DeletedFiles}, _From, State) ->
Man0 = lists:foldl(fun(ManEntry, AccMan) ->
Check = lists:member(ManEntry, DeletedFiles),

@@ -291,13 +300,7 @@ handle_call({update_manifest,
ManifestSnippet),
NewManifestSQN = State#state.manifest_sqn + 1,
ok = simple_manifest_writer(Man1, NewManifestSQN, State#state.root_path),
PendingRemovals = case PromptDeletion of
true ->
State#state.pending_removals ++
[{NewManifestSQN, DeletedFiles}];
_ ->
State#state.pending_removals
end,
PendingRemovals = [{NewManifestSQN, DeletedFiles}],
{reply, ok, State#state{manifest=Man1,
manifest_sqn=NewManifestSQN,
pending_removals=PendingRemovals}};

@@ -316,9 +319,16 @@ handle_call({compact,
FilterFun,
self(),
Timeout),
{reply, ok, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State};
{reply, ok, State#state{compaction_pending=true}};
handle_call(compaction_complete, _From, State) ->
{reply, ok, State#state{compaction_pending=false}};
handle_call({close, Force}, _From, State) ->
case {State#state.compaction_pending, Force} of
{true, false} ->
{reply, pause, State};
_ ->
{stop, normal, ok, State}
end;
handle_call(Msg, _From, State) ->
io:format("Unexpected message ~w~n", [Msg]),
{reply, error, State}.

@@ -330,12 +340,21 @@ handle_info(_Info, State) ->
{noreply, State}.

terminate(Reason, State) ->
io:format("Inker closing journal for reason ~w~n", [Reason]),
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
[State#state.journal_sqn, State#state.manifest_sqn]),
io:format("Manifest when closing is: ~n"),
manifest_printer(State#state.manifest),
close_allmanifest(State#state.manifest, State#state.active_journaldb).
case State#state.is_snapshot of
true ->
ok;
false ->
io:format("Inker closing journal for reason ~w~n", [Reason]),
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
[State#state.journal_sqn, State#state.manifest_sqn]),
io:format("Manifest when closing is: ~n"),
leveled_iclerk:clerk_stop(State#state.clerk),
lists:foreach(fun({Snap, _SQN}) -> ok = ink_close(Snap) end,
State#state.registered_snapshots),
manifest_printer(State#state.manifest),
close_allmanifest(State#state.manifest, State#state.active_journaldb),
close_allremovals(State#state.pending_removals)
end.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

@@ -597,6 +616,25 @@ close_allmanifest([H|ManifestT], ActiveJournal) ->
ok = leveled_cdb:cdb_close(Pid),
close_allmanifest(ManifestT, ActiveJournal).

close_allremovals([]) ->
ok;
close_allremovals([{ManifestSQN, Removals}|Tail]) ->
io:format("Closing removals at ManifestSQN=~w~n", [ManifestSQN]),
lists:foreach(fun({LowSQN, FN, Handle}) ->
io:format("Closing removed file with LowSQN=~w" ++
" and filename ~s~n",
[LowSQN, FN]),
if
is_pid(Handle) == true ->
ok = leveled_cdb:cdb_close(Handle);
true ->
io:format("Non pid in removal ~w - test~n",
[Handle])
end
end,
Removals),
close_allremovals(Tail).

roll_pending_journals([TopJournalSQN], Manifest, _RootPath)
when is_integer(TopJournalSQN) ->

@@ -621,22 +659,22 @@ load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) ->
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
when LowSQN >= MinSQN ->
io:format("Loading from filename ~s from SQN ~w~n", [FN, MinSQN]),
ok = load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
Pid,
undefined),
load_from_sequence(MinSQN, FilterFun, Penciller, ManTail);
{ok, LastMinSQN} = load_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FilterFun,
Penciller,
Pid,
undefined),
load_from_sequence(LastMinSQN, FilterFun, Penciller, ManTail);
load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) ->
load_from_sequence(MinSQN, FilterFun, Penciller, ManTail).

load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
InitAcc = {MinSQN, MaxSQN, []},
case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
{eof, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
{eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
ok;
{ok, AccMinSQN};
{LastPosition, {_AccMinSQN, _AccMaxSQN, AccKL}} ->
ok = push_to_penciller(Penciller, AccKL),
load_between_sequence(MaxSQN + 1,

@@ -748,7 +786,7 @@ initiate_penciller_snapshot(Penciller) ->
PclOpts = #penciller_options{start_snapshot = true,
source_penciller = Penciller,
requestor = self()},
FilterServer = leveled_penciller:pcl_start(PclOpts),
{ok, FilterServer} = leveled_penciller:pcl_start(PclOpts),
ok = leveled_penciller:pcl_loadsnapshot(FilterServer, []),
FilterServer.

@@ -793,6 +831,7 @@ clean_testdir(RootPath) ->
clean_subdir(filepath(RootPath, manifest_dir)).

clean_subdir(DirPath) ->
ok = filelib:ensure_dir(DirPath),
{ok, Files} = file:list_dir(DirPath),
lists:foreach(fun(FN) ->
File = filename:join(DirPath, FN),

@@ -921,9 +960,9 @@ rollafile_simplejournal_test() ->
?assertMatch(R2, {{54, "KeyBB"}, {"TestValueBB", []}}),
Man = ink_getmanifest(Ink1),
FakeMan = [{3, "test", dummy}, {1, "other_test", dummy}],
ok = ink_updatemanifest(Ink1, FakeMan, true, Man),
ok = ink_updatemanifest(Ink1, FakeMan, Man),
?assertMatch(FakeMan, ink_getmanifest(Ink1)),
ok = ink_updatemanifest(Ink1, Man, true, FakeMan),
ok = ink_updatemanifest(Ink1, Man, FakeMan),
?assertMatch({{5, "KeyAA"}, {"TestValueAA", []}},
ink_get(Ink1, "KeyAA", 5)),
?assertMatch({{54, "KeyBB"}, {"TestValueBB", []}},

@@ -964,7 +1003,6 @@ compact_journal_test() ->
timer:sleep(1000),
CompactedManifest = ink_getmanifest(Ink1),
?assertMatch(1, length(CompactedManifest)),
ink_updatemanifest(Ink1, ActualManifest, true, CompactedManifest),
ink_close(Ink1),
clean_testdir(RootPath).
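For the startup issue, the change in the load_from_sequence/load_between_sequence hunk above appears to be the relevant fix: on reaching the end of a journal file the scan now returns {ok, AccMinSQN} rather than a bare ok, and load_from_sequence carries that value forward when it moves on to the next journal file in the manifest, instead of restarting from the original MinSQN. Condensed from the new clauses above (a sketch, not the full functions):

%% Sketch of the new control flow only; helper functions and the non-eof
%% branch are unchanged from the diff above.
load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, FN, Pid}|ManTail])
                                            when LowSQN >= MinSQN ->
    {ok, LastMinSQN} = load_between_sequence(MinSQN,
                                             MinSQN + ?LOADING_BATCH,
                                             FilterFun, Penciller,
                                             Pid, undefined),
    %% Resume the next journal from the SQN handed back by the scan
    load_from_sequence(LastMinSQN, FilterFun, Penciller, ManTail);

load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) ->
    InitAcc = {MinSQN, MaxSQN, []},
    case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of
        {eof, {AccMinSQN, _AccMaxSQN, AccKL}} ->
            ok = push_to_penciller(Penciller, AccKL),
            {ok, AccMinSQN}  %% previously returned a bare ok
    end.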
@@ -1,197 +0,0 @@
-module(leveled_iterator).

-export([termiterator/3]).

-include_lib("eunit/include/eunit.hrl").

%% Takes a list of terms to iterate - the terms being sorted in Erlang term
%% order
%%
%% Helper Functions should have free functions -
%% {FolderFun, CompareFun, PointerCheck, PointerFetch}
%% FolderFun - function which takes the next item and the accumulator and
%% returns an updated accumulator. Note FolderFun can only increase the
%% accumulator by one entry each time
%% CompareFun - function which should be able to compare two keys (which are
%% not pointers), and return a winning item (or combination of items)
%% PointerCheck - function for differentiating between keys and pointer
%% PointerFetch - function that takes a pointer an EndKey (which may be
%% infinite) and returns a ne wslice of ordered results from that pointer
%%
%% Range can be for the form
%% {StartKey, EndKey, MaxKeys} where EndKey or MaxKeys can be infinite (but
%% not both)

termiterator(ListToIterate, HelperFuns, Range) ->
case Range of
{_, infinte, infinite} ->
bad_iterator;
_ ->
termiterator(null, ListToIterate, [], HelperFuns, Range)
end.

termiterator(HeadItem, [], Acc, HelperFuns, _) ->
case HeadItem of
null ->
Acc;
_ ->
{FolderFun, _, _, _} = HelperFuns,
FolderFun(Acc, HeadItem)
end;
termiterator(null, [NextItem|TailList], Acc, HelperFuns, Range) ->
%% Check that the NextItem is not a pointer before promoting to HeadItem
%% Cannot now promote a HeadItem which is a pointer
{_, _, PointerCheck, PointerFetch} = HelperFuns,
case PointerCheck(NextItem) of
{true, Pointer} ->
{_, EndKey, _} = Range,
NewSlice = PointerFetch(Pointer, EndKey),
ExtendedList = lists:merge(NewSlice, TailList),
termiterator(null, ExtendedList, Acc, HelperFuns, Range);
false ->
termiterator(NextItem, TailList, Acc, HelperFuns, Range)
end;
termiterator(HeadItem, [NextItem|TailList], Acc, HelperFuns, Range) ->
{FolderFun, CompareFun, PointerCheck, PointerFetch} = HelperFuns,
{_, EndKey, MaxItems} = Range,
%% HeadItem cannot be pointer, but NextItem might be, so check before
%% comparison
case PointerCheck(NextItem) of
{true, Pointer} ->
NewSlice = PointerFetch(Pointer, EndKey),
ExtendedList = lists:merge(NewSlice, [HeadItem|TailList]),
termiterator(null, ExtendedList, Acc, HelperFuns, Range);
false ->
%% Compare to see if Head and Next match, or if Head is a winner
%% to be added to accumulator
case CompareFun(HeadItem, NextItem) of
{match, StrongItem, _WeakItem} ->
%% Discard WeakItem, Strong Item might be an aggregation of
%% the items
termiterator(StrongItem, TailList, Acc, HelperFuns, Range);
{winner, HeadItem} ->
%% Add next item to accumulator, and proceed with next item
AccPlus = FolderFun(Acc, HeadItem),
case length(AccPlus) of
MaxItems ->
AccPlus;
_ ->
termiterator(NextItem, TailList, AccPlus,
HelperFuns,
{HeadItem, EndKey, MaxItems})
end
end
end.

%% Initial forms of keys supported are Index Keys and Object Keys
%%
%% All keys are of the form {Key, Value, SequenceNumber, State}
%%
%% The Key will be of the form:
%% {o, Bucket, Key} - for an Object Key
%% {i, Bucket, IndexName, IndexTerm, Key} - for an Index Key
%%
%% The value will be of the form:
%% {o, ObjectHash, [vector-clocks]} - for an Object Key
%% null - for an Index Key
%%
%% Sequence number is the sequence number the key was added, and the highest
%% sequence number in the list of keys for an index key.
%%
%% State can be one of the following:
%% live - an active key
%% tomb - a tombstone key
%% {timestamp, TS} - an active key to a certain timestamp
%% {pointer, Pointer} - to be added by iterators to indicate further data
%% available in the range from a particular source

pointercheck_indexkey(IndexKey) ->
case IndexKey of
{_Key, _Values, _Sequence, {pointer, Pointer}} ->
{true, Pointer};
_ ->
false
end.

folder_indexkey(Acc, IndexKey) ->
case IndexKey of
{_Key, _Value, _Sequence, tomb} ->
Acc;
{Key, _Value, _Sequence, live} ->
{i, _, _, _, ObjectKey} = Key,
lists:append(Acc, [ObjectKey])
end.

compare_indexkey(IndexKey1, IndexKey2) ->
{{i, Bucket1, Index1, Term1, Key1}, _Val1, Sequence1, _St1} = IndexKey1,
{{i, Bucket2, Index2, Term2, Key2}, _Val2, Sequence2, _St2} = IndexKey2,
case {Bucket1, Index1, Term1, Key1} of
{Bucket2, Index2, Term2, Key2} when Sequence1 >= Sequence2 ->
{match, IndexKey1, IndexKey2};
{Bucket2, Index2, Term2, Key2} ->
{match, IndexKey2, IndexKey1};
_ when IndexKey2 >= IndexKey1 ->
{winner, IndexKey1};
_ ->
{winner, IndexKey2}
end.

%% Unit tests

getnextslice(Pointer, _EndKey) ->
case Pointer of
{test, NewList} ->
NewList;
_ ->
[]
end.

iterateoverindexkeyswithnopointer_test() ->
Key1 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"},
null, 1, live},
Key2 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"},
null, 2, tomb},
Key3 = {{i, "pdsRecord", "familyName_bin", "1971SMITH", "10002"},
null, 2, live},
Key4 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10003"},
null, 2, live},
KeyList = lists:sort([Key1, Key2, Key3, Key4]),
HelperFuns = {fun folder_indexkey/2, fun compare_indexkey/2,
fun pointercheck_indexkey/1, fun getnextslice/2},
?assertMatch(["10002", "10003"],
termiterator(KeyList, HelperFuns, {"1971", "1973", infinite})).

iterateoverindexkeyswithpointer_test() ->
Key1 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"},
null, 1, live},
Key2 = {{i, "pdsRecord", "familyName_bin", "1972SMITH", "10001"},
null, 2, tomb},
Key3 = {{i, "pdsRecord", "familyName_bin", "1971SMITH", "10002"},
null, 2, live},
Key4 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10003"},
null, 2, live},
Key5 = {{i, "pdsRecord", "familyName_bin", "1972ZAFRIDI", "10004"},
null, 2, live},
Key6 = {{i, "pdsRecord", "familyName_bin", "1972JONES", "10004"},
null, 0, {pointer, {test, [Key5]}}},
KeyList = lists:sort([Key1, Key2, Key3, Key4, Key6]),
HelperFuns = {fun folder_indexkey/2, fun compare_indexkey/2,
fun pointercheck_indexkey/1, fun getnextslice/2},
?assertMatch(["10002", "10003", "10004"],
termiterator(KeyList, HelperFuns, {"1971", "1973", infinite})),
?assertMatch(["10002", "10003"],
termiterator(KeyList, HelperFuns, {"1971", "1973", 2})).
@@ -7,11 +7,16 @@
%% Ledger.
%% - The Penciller provides re-write (compaction) work up to be managed by
%% the Penciller's Clerk
%% - The Penciller mainatins a register of iterators who have requested
%% - The Penciller maintains a register of iterators who have requested
%% snapshots of the Ledger
%% - The accepts new dumps (in the form of lists of keys) from the Bookie, and
%% calls the Bookie once the process of pencilling this data in the Ledger is
%% complete - and the Bookie is free to forget about the data
%% - The Penciller's persistence of the ledger may not be reliable, in that it
%% may lose data but only in sequence from a particular sequence number. On
%% startup the Penciller will inform the Bookie of the highest sequence number
%% it has, and the Bookie should load any missing data from that point out of
%5 the journal.
%%
%% -------- LEDGER ---------
%%

@@ -78,18 +83,21 @@
%%
%% ---------- SNAPSHOT ----------
%%
%% Iterators may request a snapshot of the database. To provide a snapshot
%% the Penciller must snapshot the ETS table, and then send this with a copy
%% of the manifest.
%% Iterators may request a snapshot of the database. A snapshot is a cloned
%% Penciller seeded not from disk, but by the in-memory ETS table and the
%% in-memory manifest.

%% To provide a snapshot the Penciller must snapshot the ETS table. The
%% snapshot of the ETS table is managed by the Penciller storing a list of the
%% batches of Keys which have been pushed to the Penciller, and it is expected
%% that this will be converted by the clone into a gb_tree. The clone may
%% then update the master Penciller with the gb_tree to be cached and used by
%% other cloned processes.
%%
%% Iterators requesting snapshots are registered by the Penciller, so that SFT
%% files valid at the point of the snapshot until either the iterator is
%% Clones formed to support snapshots are registered by the Penciller, so that
%% SFT files valid at the point of the snapshot until either the iterator is
%% completed or has timed out.
%%
%% Snapshot requests may request a filtered view of the ETS table (whihc may
%% be quicker than requesting the full table), or requets a snapshot of only
%% the persisted part of the Ledger
%%
%% ---------- ON STARTUP ----------
%%
%% On Startup the Bookie with ask the Penciller to initiate the Ledger first.

@@ -105,15 +113,14 @@
%% ---------- ON SHUTDOWN ----------
%%
%% On a controlled shutdown the Penciller should attempt to write any in-memory
%% ETS table to disk into the special ..on_shutdown folder
%% ETS table to a L0 SFT file, assuming one is nto already pending. If one is
%% already pending then the Penciller will not persist this part of the Ledger.
%%
%% ---------- FOLDER STRUCTURE ----------
%%
%% The following folders are used by the Penciller
%% $ROOT/ledger_manifest/ - used for keeping manifest files
%% $ROOT/ledger_onshutdown/ - containing the persisted view of the ETS table
%% written on controlled shutdown
%% $ROOT/ledger_files/ - containing individual SFT files
%% $ROOT/ledger/ledger_manifest/ - used for keeping manifest files
%% $ROOT/ledger/ledger_files/ - containing individual SFT files
%%
%% In larger stores there could be a large number of files in the ledger_file
%% folder - perhaps o(1000). It is assumed that modern file systems should
@@ -408,19 +408,7 @@ create_levelzero(Inp1, Filename) ->
false ->
ets:tab2list(Inp1)
end,
Ext = filename:extension(Filename),
Components = filename:split(Filename),
{TmpFilename, PrmFilename} = case Ext of
[] ->
{filename:join(Components) ++ ".pnd",
filename:join(Components) ++ ".sft"};
Ext ->
%% This seems unnecessarily hard
DN = filename:dirname(Filename),
FP = lists:last(Components),
FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)),
{DN ++ "/" ++ FP_NOEXT ++ ".pnd", DN ++ "/" ++ FP_NOEXT ++ ".sft"}
end,
{TmpFilename, PrmFilename} = generate_filenames(Filename),
case create_file(TmpFilename) of
{error, Reason} ->
{error,

@@ -442,6 +430,23 @@ create_levelzero(Inp1, Filename) ->
oversized_file=InputSize>?MAX_KEYS}}
end.

generate_filenames(RootFilename) ->
Ext = filename:extension(RootFilename),
Components = filename:split(RootFilename),
case Ext of
[] ->
{filename:join(Components) ++ ".pnd",
filename:join(Components) ++ ".sft"};
Ext ->
%% This seems unnecessarily hard
DN = filename:dirname(RootFilename),
FP = lists:last(Components),
FP_NOEXT = lists:sublist(FP, 1, 1 + length(FP) - length(Ext)),
{DN ++ "/" ++ FP_NOEXT ++ "pnd", DN ++ "/" ++ FP_NOEXT ++ "sft"}
end.

%% Start a bare file with an initial header and no further details
%% Return the {Handle, metadata record}
create_file(FileName) when is_list(FileName) ->

@@ -1762,4 +1767,16 @@ big_iterator_test() ->
ok = file:close(Handle),
ok = file:delete(Filename).

filename_test() ->
FN1 = "../tmp/filename",
FN2 = "../tmp/filename.pnd",
FN3 = "../tmp/subdir/file_name.pend",
?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"},
generate_filenames(FN1)),
?assertMatch({"../tmp/filename.pnd", "../tmp/filename.sft"},
generate_filenames(FN2)),
?assertMatch({"../tmp/subdir/file_name.pnd",
"../tmp/subdir/file_name.sft"},
generate_filenames(FN3)).

-endif.
test/end_to_end/basic_SUITE.erl (new file, 135 lines)
@@ -0,0 +1,135 @@
-module(basic_SUITE).
-include_lib("common_test/include/ct.hrl").
-include("../include/leveled.hrl").
-export([all/0]).
-export([simple_put_fetch/1,
journal_compaction/1]).

all() -> [journal_compaction, simple_put_fetch].

simple_put_fetch(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath},
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = generate_testobject(),
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
TestObject#r_object.bucket,
TestObject#r_object.key),
ok = leveled_bookie:book_close(Bookie1),
StartOpts2 = #bookie_options{root_path=RootPath,
max_journalsize=3000000},
{ok, Bookie2} = leveled_bookie:book_start(StartOpts2),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
TestObject#r_object.bucket,
TestObject#r_object.key),
ObjList1 = generate_multiple_objects(5000, 2),
lists:foreach(fun({_RN, Obj, Spc}) ->
leveled_bookie:book_riakput(Bookie2, Obj, Spc) end,
ObjList1),
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie2,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList1),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
TestObject#r_object.bucket,
TestObject#r_object.key),
ok = leveled_bookie:book_close(Bookie2),
reset_filestructure().

journal_compaction(_Config) ->
RootPath = reset_filestructure(),
StartOpts1 = #bookie_options{root_path=RootPath,
max_journalsize=4000000},
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
{TestObject, TestSpec} = generate_testobject(),
ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
TestObject#r_object.bucket,
TestObject#r_object.key),
ObjList1 = generate_multiple_objects(5000, 2),
lists:foreach(fun({_RN, Obj, Spc}) ->
leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
ObjList1),
ChkList1 = lists:sublist(lists:sort(ObjList1), 100),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie1,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList1),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie1,
TestObject#r_object.bucket,
TestObject#r_object.key),
%% Now replace all the objects
ObjList2 = generate_multiple_objects(5000, 2),
lists:foreach(fun({_RN, Obj, Spc}) ->
leveled_bookie:book_riakput(Bookie1, Obj, Spc) end,
ObjList2),
ok = leveled_bookie:book_compactjournal(Bookie1, 30000),
ChkList3 = lists:sublist(lists:sort(ObjList2), 500),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie1,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList3),
ok = leveled_bookie:book_close(Bookie1),
% Restart
{ok, Bookie2} = leveled_bookie:book_start(StartOpts1),
{ok, TestObject} = leveled_bookie:book_riakget(Bookie2,
TestObject#r_object.bucket,
TestObject#r_object.key),
lists:foreach(fun({_RN, Obj, _Spc}) ->
R = leveled_bookie:book_riakget(Bookie2,
Obj#r_object.bucket,
Obj#r_object.key),
R = {ok, Obj} end,
ChkList3),
ok = leveled_bookie:book_close(Bookie2),
reset_filestructure().

reset_filestructure() ->
RootPath = "test",
filelib:ensure_dir(RootPath ++ "/journal/"),
filelib:ensure_dir(RootPath ++ "/ledger/"),
leveled_inker:clean_testdir(RootPath ++ "/journal"),
leveled_penciller:clean_testdir(RootPath ++ "/ledger"),
RootPath.

generate_testobject() ->
{B1, K1, V1, Spec1, MD} = {"Bucket1",
"Key1",
"Value1",
[],
{"MDK1", "MDV1"}},
Content = #r_content{metadata=MD, value=V1},
{#r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
Spec1}.

generate_multiple_objects(Count, KeyNumber) ->
generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(4096)).

generate_multiple_objects(0, _KeyNumber, ObjL, _Value) ->
ObjL;
generate_multiple_objects(Count, KeyNumber, ObjL, Value) ->
Obj = {"Bucket",
"Key" ++ integer_to_list(KeyNumber),
Value,
[],
[{"MDK", "MDV" ++ integer_to_list(KeyNumber)},
{"MDK2", "MDV" ++ integer_to_list(KeyNumber)}]},
{B1, K1, V1, Spec1, MD} = Obj,
Content = #r_content{metadata=MD, value=V1},
Obj1 = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]},
generate_multiple_objects(Count - 1,
KeyNumber + 1,
ObjL ++ [{random:uniform(), Obj1, Spec1}],
Value).
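basic_SUITE above is a standard common_test suite. One way to run it from an Erlang shell, assuming the leveled modules are already compiled and on the code path (the build tooling itself is not part of this commit, so this invocation is an assumption), might be:

%% Hypothetical invocation using the stock common_test runner; logs are
%% written to the current working directory by default.
ct:run_test([{dir, "test/end_to_end"}, {suite, basic_SUITE}]).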