Scan over CDB file

Make scanning over a CDB file generic rather than specific to the read-in of
the active nursery log - opening it up to be called as an external function
to support other scanning behaviour.
This commit is contained in:
martinsumner 2016-09-09 15:58:19 +01:00
parent edfe9e3bed
commit 86666b1cb6
4 changed files with 108 additions and 23 deletions

View file

@ -53,9 +53,9 @@
%% - close the previously active journal file (writing the hashtree), and move
%% it to the historic journal
%%
%% Once the object has been persisted to the Journal, the Key and Metadata can
%% be added to the ledger. Initially this will be added to the Bookie's
%% in-memory view of recent changes only.
%% Once the object has been persisted to the Journal, the Key with Metadata
%% and the keychanges can be added to the ledger. Initially this will be
%% added to the Bookie's in-memory view of recent changes only.
%%
%% The Bookie's memory consists of an in-memory ets table. Periodically, the
%% current table is pushed to the Penciller for eventual persistence, and a
@ -107,6 +107,16 @@
-record(state, {inker :: pid(),
penciller :: pid()}).
%% An item to be persisted to the Journal. Field semantics inferred from
%% names - NOTE(review): confirm against usage elsewhere in the module.
-record(item, {primary_key :: term(),
               contents :: list(),
               metadatas :: list(),
               vclock,
               hash :: integer(),
               size :: integer(),
               key_changes :: list()}).
%%%============================================================================
%%% API

View file

@ -63,6 +63,7 @@
cdb_lastkey/1,
cdb_filename/1,
cdb_keycheck/2,
cdb_scan/4,
cdb_close/1,
cdb_complete/1]).
@ -123,6 +124,11 @@ cdb_close(Pid) ->
cdb_complete(Pid) ->
gen_server:call(Pid, cdb_complete, infinity).
%% Scan the CDB file from StartPosition, applying FilterFun to each
%% key/value pair read, threading InitAcc through as the accumulator.
%% Replies with {LastPosition, FinalAcc} (see the cdb_scan handle_call).
cdb_scan(Pid, StartPosition, FilterFun, InitAcc) ->
    ScanReq = {cdb_scan, StartPosition, FilterFun, InitAcc},
    gen_server:call(Pid, ScanReq, infinity).
%% Get the last key to be added to the file (which will have the highest
%% sequence number)
cdb_lastkey(Pid) ->
@ -232,6 +238,12 @@ handle_call(cdb_lastkey, _From, State) ->
{reply, State#state.last_key, State};
handle_call(cdb_filename, _From, State) ->
{reply, State#state.filename, State};
handle_call({cdb_scan, StartPos, FilterFun, Acc}, _From, State) ->
{LastPosition, Acc2} = scan_over_file(State#state.handle,
StartPos,
FilterFun,
Acc),
{reply, {LastPosition, Acc2}, State};
handle_call(cdb_close, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State#state{handle=closed}};
@ -346,7 +358,8 @@ dump(FileName, CRCCheck) ->
open_active_file(FileName) when is_list(FileName) ->
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
{ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}),
{LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position),
{LastPosition, {HashTree, LastKey}} = startup_scan_over_file(Handle,
Position),
case file:position(Handle, eof) of
{ok, LastPosition} ->
ok = file:close(Handle);
@ -617,32 +630,43 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) ->
%% Scan through the file until there is a failure to crc check an input, and
%% at that point return the position and the key dictionary scanned so far
scan_over_file(Handle, Position) ->
startup_scan_over_file(Handle, Position) ->
HashTree = array:new(256, {default, gb_trees:empty()}),
scan_over_file(Handle, Position, HashTree, empty).
scan_over_file(Handle, Position, fun startup_filter/4, {HashTree, empty}).
scan_over_file(Handle, Position, HashTree, LastKey) ->
%% Scan for key changes - scan over the file applying FilterFun
%% The FilterFun should accept as input:
%% - Key, Value, Position, Accumulator, outputting a new Accumulator
%% and a loop|stop instruction as a tuple i.e. {loop, Acc} or {stop, Acc}
scan_over_file(Handle, Position, FilterFun, Output) ->
case saferead_keyvalue(Handle) of
false ->
{Position, HashTree, LastKey};
{Position, Output};
{Key, ValueAsBin, KeyLength, ValueLength} ->
case crccheck_value(ValueAsBin) of
true ->
NewPosition = Position + KeyLength + ValueLength
+ ?DWORD_SIZE,
scan_over_file(Handle,
NewPosition,
put_hashtree(Key, Position, HashTree),
Key);
false ->
io:format("CRC check returned false on key of ~w ~n",
[Key]),
{Position, HashTree, LastKey}
case FilterFun(Key, ValueAsBin, Position, Output) of
{stop, UpdOutput} ->
{Position, UpdOutput};
{loop, UpdOutput} ->
scan_over_file(Handle, NewPosition, FilterFun, UpdOutput)
end;
eof ->
{Position, HashTree, LastKey}
{Position, Output}
end.
%% Specific filter to be used at startup to build a hashtree for an incomplete
%% cdb file, and returns at the end the hashtree and the final Key seen in the
%% journal
%% FilterFun used when scanning an incomplete cdb file at startup:
%% accumulates a hashtree of key positions and tracks the last key read,
%% halting the scan as soon as a value fails its CRC check.
startup_filter(Key, ValueAsBin, Position, {HashTree, LastKey}) ->
    case crccheck_value(ValueAsBin) of
        false ->
            %% CRC failure - assume end of good data; stop without updating
            {stop, {HashTree, LastKey}};
        true ->
            %% Valid entry - index its position and remember it as last key
            {loop, {put_hashtree(Key, Position, HashTree), Key}}
    end.
%% Read the Key/Value at this point, returning {ok, Key, Value}
%% catch expected exceptions associated with file corruption (or end) and

View file

@ -139,6 +139,9 @@ ink_put(Pid, PrimaryKey, Object, KeyChanges) ->
ink_get(Pid, PrimaryKey, SQN) ->
gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity).
%% Ask the Inker for the key changes from sequence number SQN onwards
%% (served by the fetch_keychanges handle_call clause).
ink_fetchkeychanges(Pid, SQN) ->
    FetchReq = {fetch_keychanges, SQN},
    gen_server:call(Pid, FetchReq, infinity).
ink_snap(Pid) ->
gen_server:call(Pid, snapshot, infinity).
@ -220,6 +223,12 @@ handle_call(snapshot, _From , State) ->
handle_call(print_manifest, _From, State) ->
manifest_printer(State#state.manifest),
{reply, ok, State};
handle_call({fetch_keychanges, SQN}, _From, State) ->
KeyChanges = fetch_key_changes(SQN,
State#state.manifest,
State#state.active_journaldb,
State#state.active_journaldb_sqn),
{reply, KeyChanges, State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
@ -455,6 +464,16 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
RootPath).
%% Fetch the key changes from sequence number SQN onwards, combining
%% changes found via the manifest (historic journals) with those in the
%% currently active journal.
%% Original used `case SQN of SQN when SQN < ActiveSQN` - a self-match
%% with a guard; a boolean case on the comparison is the idiomatic form.
fetch_key_changes(SQN, Manifest, ActiveJournal, ActiveSQN) ->
    %% Historic journals are only relevant when the requested SQN
    %% pre-dates the active journal's starting sequence number.
    InitialChanges =
        case SQN < ActiveSQN of
            true ->
                fetch_key_changes(SQN, Manifest);
            false ->
                []
        end,
    RecentChanges = fetch_key_changes(SQN, ActiveJournal),
    InitialChanges ++ RecentChanges.
sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
lists:foldl(fun(FN, Acc) ->

View file

@ -906,6 +906,9 @@ simple_server_test() ->
Key2 = {{o,"Bucket0002", "Key0002"}, 1002, {active, infinity}, null},
KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})),
Key3 = {{o,"Bucket0003", "Key0003"}, 2002, {active, infinity}, null},
KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})),
Key4 = {{o,"Bucket0004", "Key0004"}, 3002, {active, infinity}, null},
KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})),
ok = pcl_pushmem(PCL, [Key1]),
R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}),
?assertMatch(R1, Key1),
@ -931,12 +934,41 @@ simple_server_test() ->
ok = pcl_close(PCL),
{ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath,
max_inmemory_tablesize=1000}),
TopSQN = pcl_getstartupsequencenumber(PCLr),
case TopSQN of
2001 ->
%% Last push not persisted
S3a = pcl_pushmem(PCL, [Key3]),
if S3a == pause -> timer:sleep(2000); true -> ok end;
2002 ->
%% everything got persisted
ok;
_ ->
io:format("Unexpected sequence number on restart ~w~n", [TopSQN]),
ok = pcl_close(PCLr),
clean_testdir(RootPath),
?assertMatch(true, false)
end,
R8 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}),
R9 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}),
R10 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}),
?assertMatch(R8, Key1),
?assertMatch(R9, Key2),
?assertMatch(R10, Key3),
S4 = pcl_pushmem(PCLr, KL3),
if S4 == pause -> timer:sleep(2000); true -> ok end,
S5 = pcl_pushmem(PCLr, [Key4]),
if S5 == pause -> timer:sleep(2000); true -> ok end,
S6 = pcl_pushmem(PCLr, KL4),
if S6 == pause -> timer:sleep(2000); true -> ok end,
R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}),
R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}),
R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}),
R14 = pcl_fetch(PCLr, {o,"Bucket0004", "Key0004"}),
?assertMatch(R11, Key1),
?assertMatch(R12, Key2),
?assertMatch(R13, Key3),
?assertMatch(R14, Key4),
ok = pcl_close(PCLr),
clean_testdir(RootPath).