diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 58d24b2..75e938c 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -53,9 +53,9 @@ %% - close the previously active journal file (writing the hashtree), and move %% it to the historic journal %% -%% Once the object has been persisted to the Journal, the Key and Metadata can -%% be added to the ledger. Initially this will be added to the Bookie's -%% in-memory view of recent changes only. +%% Once the object has been persisted to the Journal, the Key with Metadata +%% and the keychanges can be added to the ledger. Initially this will be +%% added to the Bookie's in-memory view of recent changes only. %% %% The Bookie's memory consists of an in-memory ets table. Periodically, the %% current table is pushed to the Penciller for eventual persistence, and a @@ -107,6 +107,16 @@ -record(state, {inker :: pid(), penciller :: pid()}). +-record(item, {primary_key :: term(), + contents :: list(), + metadatas :: list(), + vclock, + hash :: integer(), + size :: integer(), + key_changes :: list()}). + + + %%%============================================================================ %%% API diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 513de01..f6444c7 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -63,6 +63,7 @@ cdb_lastkey/1, cdb_filename/1, cdb_keycheck/2, + cdb_scan/4, cdb_close/1, cdb_complete/1]). @@ -123,6 +124,11 @@ cdb_close(Pid) -> cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). +cdb_scan(Pid, StartPosition, FilterFun, InitAcc) -> + gen_server:call(Pid, + {cdb_scan, StartPosition, FilterFun, InitAcc}, + infinity). 
+ %% Get the last key to be added to the file (which will have the highest %% sequence number) cdb_lastkey(Pid) -> @@ -232,6 +238,12 @@ handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; +handle_call({cdb_scan, StartPos, FilterFun, Acc}, _From, State) -> + Handle = State#state.handle, + %% Seek first: the scan reads sequentially from the handle's position + {ok, StartPos} = file:position(Handle, {bof, StartPos}), + {LastPosition, Acc2} = scan_over_file(Handle, StartPos, FilterFun, Acc), + {reply, {LastPosition, Acc2}, State}; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), {stop, normal, ok, State#state{handle=closed}}; @@ -346,7 +358,8 @@ dump(FileName, CRCCheck) -> open_active_file(FileName) when is_list(FileName) -> {ok, Handle} = file:open(FileName, ?WRITE_OPS), {ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}), - {LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position), + {LastPosition, {HashTree, LastKey}} = startup_scan_over_file(Handle, + Position), case file:position(Handle, eof) of {ok, LastPosition} -> ok = file:close(Handle); @@ -617,32 +630,43 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) -> %% Scan through the file until there is a failure to crc check an input, and %% at that point return the position and the key dictionary scanned so far -scan_over_file(Handle, Position) -> +startup_scan_over_file(Handle, Position) -> HashTree = array:new(256, {default, gb_trees:empty()}), - scan_over_file(Handle, Position, HashTree, empty). + scan_over_file(Handle, Position, fun startup_filter/4, {HashTree, empty}). -scan_over_file(Handle, Position, HashTree, LastKey) -> +%% Scan for key changes - scan over the file, applying FilterFun to each entry +%% The FilterFun should accept as input: +%% - Key, Value, Position, Accumulator, outputting a new Accumulator +%% and a loop|stop instruction as a tuple i.e. 
{loop, Acc} or {stop, Acc} + +scan_over_file(Handle, Position, FilterFun, Output) -> case saferead_keyvalue(Handle) of false -> - {Position, HashTree, LastKey}; + {Position, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> - case crccheck_value(ValueAsBin) of - true -> - NewPosition = Position + KeyLength + ValueLength + NewPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE, - scan_over_file(Handle, - NewPosition, - put_hashtree(Key, Position, HashTree), - Key); - false -> - io:format("CRC check returned false on key of ~w ~n", - [Key]), - {Position, HashTree, LastKey} + case FilterFun(Key, ValueAsBin, Position, Output) of + {stop, UpdOutput} -> + {Position, UpdOutput}; + {loop, UpdOutput} -> + scan_over_file(Handle, NewPosition, FilterFun, UpdOutput) end; eof -> - {Position, HashTree, LastKey} + {Position, Output} end. +%% Specific filter to be used at startup to build a hashtree for an incomplete +%% cdb file, and returns at the end the hashtree and the final Key seen in the +%% journal + +startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) -> + case crccheck_value(ValueAsBin) of + true -> + {loop, {put_hashtree(Key, Position, Hashtree), Key}}; + false -> + {stop, {Hashtree, LastKey}} + end. %% Read the Key/Value at this point, returning {ok, Key, Value} %% catch expected exceptiosn associated with file corruption (or end) and diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 25b839b..56c982e 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -139,6 +139,9 @@ ink_put(Pid, PrimaryKey, Object, KeyChanges) -> ink_get(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity). +ink_fetchkeychanges(Pid, SQN) -> + gen_server:call(Pid, {fetch_keychanges, SQN}, infinity). + ink_snap(Pid) -> gen_server:call(Pid, snapshot, infinity). 
@@ -220,6 +223,12 @@ handle_call(snapshot, _From , State) -> handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; +handle_call({fetch_keychanges, SQN}, _From, State) -> + KeyChanges = fetch_key_changes(SQN, + State#state.manifest, + State#state.active_journaldb, + State#state.active_journaldb_sqn), + {reply, KeyChanges, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -455,6 +464,16 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> RootPath). +fetch_key_changes(SQN, Manifest, ActiveJournal, ActiveSQN) -> + InitialChanges = case SQN of + SQN when SQN < ActiveSQN -> + fetch_key_changes(SQN, Manifest); + _ -> + [] + end, + RecentChanges = fetch_key_changes(SQN, ActiveJournal), + InitialChanges ++ RecentChanges. + sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> lists:foldl(fun(FN, Acc) -> diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 009cffb..46fb8e9 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -901,11 +901,14 @@ simple_server_test() -> clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), - Key1 = {{o,"Bucket0001", "Key0001"},1, {active, infinity}, null}, + Key1 = {{o,"Bucket0001", "Key0001"}, 1, {active, infinity}, null}, KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})), - Key2 = {{o,"Bucket0002", "Key0002"},1002, {active, infinity}, null}, + Key2 = {{o,"Bucket0002", "Key0002"}, 1002, {active, infinity}, null}, KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})), - Key3 = {{o,"Bucket0003", "Key0003"},2002, {active, infinity}, null}, + Key3 = {{o,"Bucket0003", "Key0003"}, 2002, {active, infinity}, null}, + KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})), + Key4 = {{o,"Bucket0004", "Key0004"}, 3002, {active, infinity}, null}, + KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})), ok = 
pcl_pushmem(PCL, [Key1]), R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), ?assertMatch(R1, Key1), @@ -931,12 +934,41 @@ simple_server_test() -> ok = pcl_close(PCL), {ok, PCLr} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), + TopSQN = pcl_getstartupsequencenumber(PCLr), + case TopSQN of + 2001 -> + %% Last push not persisted - replay it into the restarted Penciller + S3a = pcl_pushmem(PCLr, [Key3]), + if S3a == pause -> timer:sleep(2000); true -> ok end; + 2002 -> + %% everything got persisted + ok; + _ -> + io:format("Unexpected sequence number on restart ~w~n", [TopSQN]), + ok = pcl_close(PCLr), + clean_testdir(RootPath), + ?assertMatch(true, false) + end, R8 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), R9 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), R10 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), ?assertMatch(R8, Key1), ?assertMatch(R9, Key2), ?assertMatch(R10, Key3), + S4 = pcl_pushmem(PCLr, KL3), + if S4 == pause -> timer:sleep(2000); true -> ok end, + S5 = pcl_pushmem(PCLr, [Key4]), + if S5 == pause -> timer:sleep(2000); true -> ok end, + S6 = pcl_pushmem(PCLr, KL4), + if S6 == pause -> timer:sleep(2000); true -> ok end, + R11 = pcl_fetch(PCLr, {o,"Bucket0001", "Key0001"}), + R12 = pcl_fetch(PCLr, {o,"Bucket0002", "Key0002"}), + R13 = pcl_fetch(PCLr, {o,"Bucket0003", "Key0003"}), + R14 = pcl_fetch(PCLr, {o,"Bucket0004", "Key0004"}), + ?assertMatch(R11, Key1), + ?assertMatch(R12, Key2), + ?assertMatch(R13, Key3), + ?assertMatch(R14, Key4), ok = pcl_close(PCLr), clean_testdir(RootPath).