diff --git a/include/leveled.hrl b/include/leveled.hrl index 80cdc87..030fdd4 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -30,4 +30,26 @@ -record(penciller_options, {root_path :: string(), - max_inmemory_tablesize :: integer()}). \ No newline at end of file + max_inmemory_tablesize :: integer()}). + +-record(bookie_options, + {root_path :: string(), + cache_size :: integer(), + metadata_extractor :: function(), + indexspec_converter :: function()}). + +%% Temp location for records related to riak + +-record(r_content, { + metadata, + value :: term() + }). + +-record(r_object, { + bucket, + key, + contents :: [#r_content{}], + vclock, + updatemetadata=dict:store(clean, true, dict:new()), + updatevalue :: term()}). + \ No newline at end of file diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 75e938c..1b68d85 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -33,19 +33,23 @@ %% -------- PUT -------- %% %% A PUT request consists of -%% - A primary Key -%% - Metadata associated with the primary key (2i, vector clock, object size) -%% - A value -%% - A set of secondary key changes which should be made as part of the commit +%% - A Primary Key and a Value +%% - IndexSpecs - a set of secondary key changes associated with the +%% transaction %% %% The Bookie takes the place request and passes it first to the Inker to add %% the request to the ledger. %% -%% The inker will pass the request to the current (append only) CDB journal -%% file to persist the change. The call should return either 'ok' or 'roll'. -%% 'roll' indicates that the CDB file has insufficient capacity for +%% The inker will pass the PK/Value/IndexSpecs to the current (append only) +%% CDB journal file to persist the change. The call should return either 'ok' +%% or 'roll'. -'roll' indicates that the CDB file has insufficient capacity for %% this write. - +%% +%% (Note that storing the IndexSpecs will create some duplication with the +%% Metadata wrapped up within the Object value. This Value and the IndexSpecs +%% are compressed before storage, so this should provide some mitigation for +%% the duplication). +%% %% In resonse to a 'roll', the inker should: %% - start a new active journal file with an open_write_request, and then; %% - call to PUT the object in this file; @@ -53,13 +57,31 @@ %% - close the previously active journal file (writing the hashtree), and move %% it to the historic journal %% -%% Once the object has been persisted to the Journal, the Key with Metadata -%% and the keychanges can be added to the ledger. Initially this will be -%% added to the Bookie'sin-memory view of recent changes only. +%% The inker will also return the SQN which the change has been made at, as +%% well as the object size on disk within the Journal. %% -%% The Bookie's memory consists of an in-memory ets table. Periodically, the -%% current table is pushed to the Penciller for eventual persistence, and a -%% new table is started. +%% Once the object has been persisted to the Journal, the Ledger can be updated. +%% The Ledger is updated by the Bookie applying a function (passed in at +%% startup) to the Value to return the Object Metadata, a function to generate +%% a hash of the Value and also taking the Primary Key, the IndexSpecs, the +%% Sequence Number in the Journal and the Object Size (returned from the +%% Inker). +%% +%% The Bookie should generate a series of ledger key changes from this +%% information, using a function passed in at startup. For Riak this will be +%% of the form: +%% {{o, Bucket, Key}, +%% SQN, +%% {Hash, Size, {Riak_Metadata}}, +%% {active, TS}|{tomb, TS}} or +%% {{i, Bucket, IndexTerm, IndexField, Key}, +%% SQN, +%% null, +%% {active, TS}|{tomb, TS}} +%% +%% Recent Ledger changes are retained initially in the Bookies' memory (in an +%% in-memory ets table). Periodically, the current table is pushed to the +%% Penciller for eventual persistence, and a new table is started. %% %% This completes the non-deferrable work associated with a PUT %% @@ -86,6 +108,18 @@ %% %% e.g. Get all for SegmentID/Partition %% +%% +%% +%% -------- On Startup -------- +%% +%% On startup the Bookie must restart both the Inker to load the Journal, and +%% the Penciller to load the Ledger. Once the Penciller has started, the +%% Bookie should request the highest sequence number in the Ledger, and then +%% and try and rebuild any missing information from the Journal +%% +%% To rebuild the Ledger it requests the Inker to scan over the files from +%% the sequence number and re-generate the Ledger changes. + -module(leveled_bookie). @@ -99,22 +133,28 @@ handle_cast/2, handle_info/2, terminate/2, - code_change/3]). + code_change/3, + book_start/1, + book_put/4, + book_get/2, + book_head/2, + strip_to_keyonly/1, + strip_to_keyseqonly/1, + strip_to_seqonly/1, + strip_to_statusonly/1, + strip_to_details/1]). -include_lib("eunit/include/eunit.hrl"). +-define(CACHE_SIZE, 1000). -record(state, {inker :: pid(), - penciller :: pid()}). - --record(item, {primary_key :: term(), - contents :: list(), - metadatas :: list(), - vclock, - hash :: integer(), - size :: integer(), - key_changes :: list()}) - + penciller :: pid(), + metadata_extractor :: function(), + indexspec_converter :: function(), + cache_size :: integer(), + back_pressure :: boolean(), + ledger_cache :: gb_trees:tree()}). @@ -122,7 +162,17 @@ %%% API %%%============================================================================ +book_start(Opts) -> + gen_server:start(?MODULE, [Opts], []). +book_put(Pid, PrimaryKey, Object, IndexSpecs) -> + gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity). + +book_get(Pid, PrimaryKey) -> + gen_server:call(Pid, {get, PrimaryKey}, infinity). + +book_head(Pid, PrimaryKey) -> + gen_server:call(Pid, {head, PrimaryKey}, infinity). %%%============================================================================ %%% gen_server callbacks @@ -131,11 +181,81 @@ init([Opts]) -> {InkerOpts, PencillerOpts} = set_options(Opts), {Inker, Penciller} = startup(InkerOpts, PencillerOpts), - {ok, #state{inker=Inker, penciller=Penciller}}. + Extractor = if + Opts#bookie_options.metadata_extractor == undefined -> + fun extract_metadata/2; + true -> + Opts#bookie_options.metadata_extractor + end, + Converter = if + Opts#bookie_options.indexspec_converter == undefined -> + fun convert_indexspecs/3; + true -> + Opts#bookie_options.indexspec_converter + end, + CacheSize = if + Opts#bookie_options.cache_size == undefined -> + ?CACHE_SIZE; + true -> + Opts#bookie_options.cache_size + end, + {ok, #state{inker=Inker, + penciller=Penciller, + metadata_extractor=Extractor, + indexspec_converter=Converter, + cache_size=CacheSize, + ledger_cache=gb_trees:empty()}}. -handle_call(_, _From, State) -> - {reply, ok, State}. +handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> + {ok, SQN, ObjSize} = leveled_inker:ink_put(PrimaryKey, Object, IndexSpecs), + Changes = preparefor_ledgercache(PrimaryKey, + SQN, + Object, + ObjSize, + IndexSpecs), + Cache0 = addto_ledgercache(Changes, State#state.ledger_cache), + gen_server:reply(From, ok), + case maybepush_ledgercache(State#state.cache_size, + Cache0, + State#state.penciller) of + {ok, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, back_pressure=false}}; + {pause, NewCache} -> + {noreply, State#state{ledger_cache=NewCache, back_pressure=true}} + end; +handle_call({get, Key}, _From, State) -> + case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of + not_present -> + {reply, not_found, State}; + Head -> + {Key, Seqn, Status} = strip_to_details(Head), + case Status of + {tomb, _} -> + {reply, not_found, State}; + {active, _} -> + case fetch_value(Key, Seqn, State#state.inker) of + not_present -> + {reply, not_found, State}; + Object -> + {reply, {ok, Object}, State} + end + end + end; +handle_call({head, Key}, _From, State) -> + case fetch_head(Key, State#state.penciller, State#state.ledger_cache) of + not_present -> + {reply, not_found, State}; + Head -> + {Key, _Seqn, Status} = strip_to_details(Head), + case Status of + {tomb, _} -> + {reply, not_found, State}; + {active, _} -> + MD = strip_to_mdonly(Head), + {reply, {ok, MD}, State} + end + end. handle_cast(_Msg, State) -> {noreply, State}. @@ -154,18 +274,137 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================ -set_options(_Opts) -> - {#inker_options{}, #penciller_options{}}. +set_options(Opts) -> + {#inker_options{root_path=Opts#bookie_options.root_path}, + #penciller_options{root_path=Opts#bookie_options.root_path}}. startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), - KeyChanges = leveled_inker:ink_fetchkeychangesfrom(Inker, LedgerSQN), - ok = leveled_penciller:pcl_pushmem(Penciller, KeyChanges), + ok = leveled_inker:ink_loadpcl(LedgerSQN, fun load_fun/4, Penciller), {Inker, Penciller}. +fetch_head(Key, Penciller, Cache) -> + case gb_trees:lookup(Key, Cache) of + {value, Head} -> + Head; + none -> + case leveled_penciller:pcl_fetch(Penciller, Key) of + {Key, Head} -> + Head; + not_present -> + not_present + end + end. + +fetch_value(Key, SQN, Inker) -> + case leveled_inker:ink_fetch(Inker, Key, SQN) of + {ok, Value} -> + Value; + not_present -> + not_present + end. + +%% Format of a Key within the ledger is +%% {PrimaryKey, SQN, Metadata, Status} + +strip_to_keyonly({keyonly, K}) -> K; +strip_to_keyonly({K, _V}) -> K. + +strip_to_keyseqonly({K, {SeqN, _, _}}) -> {K, SeqN}. + +strip_to_statusonly({_, {_, St, _}}) -> St. + +strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. + +strip_to_details({K, {SeqN, St, _}}) -> {K, SeqN, St}. + +strip_to_mdonly({_, {_, _, MD}}) -> MD. + +get_metadatas(#r_object{contents=Contents}) -> + [Content#r_content.metadata || Content <- Contents]. + +set_vclock(Object=#r_object{}, VClock) -> Object#r_object{vclock=VClock}. + +vclock(#r_object{vclock=VClock}) -> VClock. + +to_binary(v0, Obj) -> + term_to_binary(Obj). + +hash(Obj=#r_object{}) -> + Vclock = vclock(Obj), + UpdObj = set_vclock(Obj, lists:sort(Vclock)), + erlang:phash2(to_binary(v0, UpdObj)). + +extract_metadata(Obj, Size) -> + {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. + +convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> + lists:map(fun({IndexOp, IndexField, IndexValue}) -> + Status = case IndexOp of + add -> + %% TODO: timestamp support + {active, infinity}; + remove -> + %% TODO: timestamps for delayed reaping + {tomb, infinity} + end, + {o, B, K} = PrimaryKey, + {{i, B, IndexField, IndexValue, K}, + {SQN, Status, null}} + end, + IndexSpecs). + + +preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> + PrimaryChange = {PK, + {SQN, + {active, infinity}, + extract_metadata(Obj, Size)}}, + SecChanges = convert_indexspecs(IndexSpecs, SQN, PK), + [PrimaryChange] ++ SecChanges. + +addto_ledgercache(Changes, Cache) -> + lists:foldl(fun({{K, V}, Acc}) -> gb_trees:enter(K, V, Acc) end, + Cache, + Changes). + +maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> + CacheSize = gb_trees:size(Cache), + if + CacheSize > MaxCacheSize -> + case leveled_penciller:pcl_pushmem(Penciller, + gb_trees:to_list(Cache)) of + ok -> + {ok, gb_trees:empty()}; + pause -> + {pause, gb_trees:empty()}; + refused -> + {ok, Cache} + end; + true -> + {ok, Cache} + end. + +load_fun(KeyInLedger, ValueInLedger, _Position, Acc0) -> + {MinSQN, MaxSQN, Output} = Acc0, + {SQN, PK} = KeyInLedger, + {Obj, IndexSpecs} = ValueInLedger, + case SQN of + SQN when SQN < MinSQN -> + {loop, Acc0}; + SQN when SQN =< MaxSQN -> + %% TODO - get correct size in a more efficient manner + %% Need to have compressed size + Size = byte_size(term_to_binary(ValueInLedger, [compressed])), + Changes = preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs), + {loop, {MinSQN, MaxSQN, Output ++ Changes}}; + SQN when SQN > MaxSQN -> + {stop, Acc0} + end. + %%%============================================================================ %%% Test diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index f6444c7..cba127d 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -82,7 +82,7 @@ hash_index = [] :: list(), filename :: string(), handle :: file:fd(), - writer :: boolean, + writer :: boolean(), max_size :: integer()}). @@ -124,9 +124,16 @@ cdb_close(Pid) -> cdb_complete(Pid) -> gen_server:call(Pid, cdb_complete, infinity). -cdb_scan(Pid, StartPosition, FilterFun, InitAcc) -> +%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to +%% continue from that point (calling function has to protect against) double +%% counting. +%% +%% LastPosition could be the atom complete when the last key processed was at +%% the end of the file. last_key must be defined in LoopState. + +cdb_scan(Pid, FilterFun, InitAcc, StartPosition) -> gen_server:call(Pid, - {cdb_scan, StartPosition, FilterFun, InitAcc}, + {cdb_scan, FilterFun, InitAcc, StartPosition}, infinity). %% Get the last key to be added to the file (which will have the highest @@ -238,15 +245,24 @@ handle_call(cdb_lastkey, _From, State) -> {reply, State#state.last_key, State}; handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; -handle_call({cdb_scan, StartPos, FilterFun, Acc}, _From, State) -> +handle_call({cdb_scan, FilterFun, Acc, StartPos}, _From, State) -> + {ok, StartPos0} = case StartPos of + undefined -> + file:position(State#state.handle, + ?BASE_POSITION); + StartPos -> + {ok, StartPos} + end, + ok = check_last_key(State#state.last_key), {LastPosition, Acc2} = scan_over_file(State#state.handle, - StartPos, + StartPos0, FilterFun, - Acc), + Acc, + State#state.last_key), {reply, {LastPosition, Acc2}, State}; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, ok, State#state{handle=closed}}; + {stop, normal, ok, State#state{handle=undefined}}; handle_call(cdb_complete, _From, State) -> case State#state.writer of true -> @@ -261,6 +277,9 @@ handle_call(cdb_complete, _From, State) -> ok = file:rename(State#state.filename, NewName), {stop, normal, {ok, NewName}, State}; false -> + ok = file:close(State#state.handle), + {stop, normal, {ok, State#state.filename}, State}; + undefined -> ok = file:close(State#state.handle), {stop, normal, {ok, State#state.filename}, State} end. @@ -275,7 +294,7 @@ handle_info(_Info, State) -> terminate(_Reason, State) -> case State#state.handle of - closed -> + undefined -> ok; Handle -> file:close(Handle) @@ -632,28 +651,36 @@ extract_kvpair(Handle, [Position|Rest], Key, Check) -> %% at that point return the position and the key dictionary scanned so far startup_scan_over_file(Handle, Position) -> HashTree = array:new(256, {default, gb_trees:empty()}), - scan_over_file(Handle, Position, fun startup_filter/4, {HashTree, empty}). + scan_over_file(Handle, + Position, + fun startup_filter/4, + {HashTree, empty}, + undefined). %% Scan for key changes - scan over file returning applying FilterFun %% The FilterFun should accept as input: %% - Key, Value, Position, Accumulator, outputting a new Accumulator %% and a loop|stop instruction as a tuple i.e. {loop, Acc} or {stop, Acc} -scan_over_file(Handle, Position, FilterFun, Output) -> +scan_over_file(Handle, Position, FilterFun, Output, LastKey) -> case saferead_keyvalue(Handle) of false -> {Position, Output}; {Key, ValueAsBin, KeyLength, ValueLength} -> NewPosition = Position + KeyLength + ValueLength + ?DWORD_SIZE, - case FilterFun(Key, ValueAsBin, Position, Output) of - {stop, UpdOutput} -> + case {FilterFun(Key, ValueAsBin, Position, Output), Key} of + {{stop, UpdOutput}, _} -> {Position, UpdOutput}; - {loop, UpdOutput} -> - scan_over_file(Handle, NewPosition, FilterFun, UpdOutput) - end; - eof -> - {Position, Output} + {{loop, UpdOutput}, LastKey} -> + {eof, UpdOutput}; + {{loop, UpdOutput}, _} -> + scan_over_file(Handle, + NewPosition, + FilterFun, + UpdOutput, + Key) + end end. %% Specific filter to be used at startup to build a hashtree for an incomplete @@ -668,6 +695,15 @@ startup_filter(Key, ValueAsBin, Position, {Hashtree, LastKey}) -> {stop, {Hashtree, LastKey}} end. +%% Confirm that the last key has been defined and set to a non-default value + +check_last_key(LastKey) -> + case LastKey of + undefined -> error; + empty -> error; + _ -> ok + end. + %% Read the Key/Value at this point, returning {ok, Key, Value} %% catch expected exceptiosn associated with file corruption (or end) and %% return eof @@ -765,7 +801,7 @@ read_next_term(Handle, Length, crc, Check) -> _ -> {false, binary_to_term(Bin)} end; - _ -> + false -> {ok, _} = file:position(Handle, {cur, 4}), {ok, Bin} = file:read(Handle, Length - 4), {unchecked, binary_to_term(Bin)} @@ -999,7 +1035,7 @@ key_value_to_record({Key, Value}) -> -ifdef(TEST). write_key_value_pairs_1_test() -> - {ok,Handle} = file:open("../test/test.cdb",write), + {ok,Handle} = file:open("../test/test.cdb",[write]), {_, HashTree} = write_key_value_pairs(Handle, [{"key1","value1"}, {"key2","value2"}]), @@ -1025,7 +1061,7 @@ write_key_value_pairs_1_test() -> write_hash_tables_1_test() -> - {ok, Handle} = file:open("../test/testx.cdb",write), + {ok, Handle} = file:open("../test/testx.cdb", [write]), R0 = array:new(256, {default, gb_trees:empty()}), R1 = array:set(64, gb_trees:insert(6383014720, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 56c982e..e5302c0 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -101,11 +101,14 @@ ink_start/1, ink_put/4, ink_get/3, + ink_fetch/3, + ink_loadpcl/4, ink_snap/1, ink_close/1, ink_print_manifest/1, build_dummy_journal/0, - simple_manifest_reader/2]). + simple_manifest_reader/2, + clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -114,7 +117,8 @@ -define(JOURNAL_FILEX, "cdb"). -define(MANIFEST_FILEX, "man"). -define(PENDING_FILEX, "pnd"). - +-define(LOADING_PAUSE, 5000). +-define(LOADING_BATCH, 1000). -record(state, {manifest = [] :: list(), manifest_sqn = 0 :: integer(), @@ -139,8 +143,8 @@ ink_put(Pid, PrimaryKey, Object, KeyChanges) -> ink_get(Pid, PrimaryKey, SQN) -> gen_server:call(Pid, {get, PrimaryKey, SQN}, infinity). -ink_fetchkeychanges(Pid, SQN) -> - gen_server:call(Pid, {fetch_keychanges, SQN}, infinity). +ink_fetch(Pid, PrimaryKey, SQN) -> + gen_server:call(Pid, {fetch, PrimaryKey, SQN}, infinity). ink_snap(Pid) -> gen_server:call(Pid, snapshot, infinity). @@ -148,6 +152,9 @@ ink_snap(Pid) -> ink_close(Pid) -> gen_server:call(Pid, close, infinity). +ink_loadpcl(Pid, MinSQN, FilterFun, Penciller) -> + gen_server:call(Pid, {load_pcl, MinSQN, FilterFun, Penciller}, infinity). + ink_print_manifest(Pid) -> gen_server:call(Pid, print_manifest, infinity). @@ -193,10 +200,10 @@ init([InkerOpts]) -> handle_call({put, Key, Object, KeyChanges}, From, State) -> case put_object(Key, Object, KeyChanges, State) of - {ok, UpdState} -> - {reply, {ok, UpdState#state.journal_sqn}, UpdState}; - {rolling, UpdState} -> - gen_server:reply(From, {ok, UpdState#state.journal_sqn}), + {ok, UpdState, ObjSize} -> + {reply, {ok, UpdState#state.journal_sqn, ObjSize}, UpdState}; + {rolling, UpdState, ObjSize} -> + gen_server:reply(From, {ok, UpdState#state.journal_sqn, ObjSize}), {NewManifest, NewManifestSQN} = roll_active_file(State#state.active_journaldb, State#state.manifest, @@ -207,12 +214,31 @@ handle_call({put, Key, Object, KeyChanges}, From, State) -> {blocked, UpdState} -> {reply, blocked, UpdState} end; +handle_call({fetch, Key, SQN}, _From, State) -> + case get_object(Key, + SQN, + State#state.manifest, + State#state.active_journaldb, + State#state.active_journaldb_sqn) of + {{SQN, Key}, {Value, _IndexSpecs}} -> + {reply, {ok, Value}, State}; + Other -> + io:format("Unexpected failure to fetch value for" ++ + "Key=~s SQN=~w with reason ~w", [Key, SQN, Other]), + {reply, not_present, State} + end; handle_call({get, Key, SQN}, _From, State) -> {reply, get_object(Key, SQN, State#state.manifest, State#state.active_journaldb, State#state.active_journaldb_sqn), State}; +handle_call({load_pcl, StartSQN, FilterFun, Penciller}, _From, State) -> + Manifest = State#state.manifest ++ [{State#state.active_journaldb_sqn, + dummy, + State#state.active_journaldb}], + Reply = load_from_sequence(StartSQN, FilterFun, Penciller, Manifest), + {reply, Reply, State}; handle_call(snapshot, _From , State) -> %% TODO: Not yet implemented registration of snapshot %% Should return manifest and register the snapshot @@ -223,12 +249,6 @@ handle_call(snapshot, _From , State) -> handle_call(print_manifest, _From, State) -> manifest_printer(State#state.manifest), {reply, ok, State}; -handle_call({fetch_keychanges, SQN}, _From, State) -> - KeyChanges = fetch_key_changes(SQN, - State#state.manifest, - State#state.active_journaldb, - State#state.active_journaldb_sqn), - {reply, KeyChanges, State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. @@ -257,11 +277,12 @@ code_change(_OldVsn, State, _Extra) -> put_object(PrimaryKey, Object, KeyChanges, State) -> NewSQN = State#state.journal_sqn + 1, Bin1 = term_to_binary({Object, KeyChanges}, [compressed]), + ObjSize = byte_size(Bin1), case leveled_cdb:cdb_put(State#state.active_journaldb, {NewSQN, PrimaryKey}, Bin1) of ok -> - {ok, State#state{journal_sqn=NewSQN}}; + {ok, State#state{journal_sqn=NewSQN}, ObjSize}; roll -> FileName = filepath(State#state.root_path, NewSQN, new_journal), CDBopts = State#state.cdb_options, @@ -270,9 +291,11 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> {NewSQN, PrimaryKey}, Bin1) of ok -> - {rolling, State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP, - active_journaldb_sqn=NewSQN}}; + {rolling, + State#state{journal_sqn=NewSQN, + active_journaldb=NewJournalP, + active_journaldb_sqn=NewSQN}, + ObjSize}; roll -> {blocked, State#state{journal_sqn=NewSQN, active_journaldb=NewJournalP, @@ -464,16 +487,49 @@ roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> RootPath). -fetch_key_changes(SQN, Manifest, ActiveJournal, ActiveSQN) -> - InitialChanges = case SQN of - SQN when SQN < ActiveSQN -> - fetch_key_changes(SQN, Manifest); - _ -> - [] - end, - RecentChanges = fetch_key_changes(SQN, ActiveJournal), - InitialChanges ++ RecentChanges. +%% Scan between sequence numbers applying FilterFun to each entry where +%% FilterFun{K, V, Acc} -> Penciller Key List +%% Load the output for the CDB file into the Penciller. +load_from_sequence(_MinSQN, _FilterFun, _Penciller, []) -> + ok; +load_from_sequence(MinSQN, FilterFun, Penciller, [{LowSQN, _FN, Pid}|ManTail]) + when MinSQN >= LowSQN -> + ok = load_between_sequence(MinSQN, + MinSQN + ?LOADING_BATCH, + FilterFun, + Penciller, + Pid, + undefined), + load_from_sequence(MinSQN, FilterFun, Penciller, ManTail); +load_from_sequence(MinSQN, FilterFun, Penciller, [_H|ManTail]) -> + load_from_sequence(MinSQN, FilterFun, Penciller, ManTail). + +load_between_sequence(MinSQN, MaxSQN, FilterFun, Penciller, CDBpid, StartPos) -> + InitAcc = {MinSQN, MaxSQN, []}, + case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitAcc, StartPos) of + {eof, Acc} -> + ok = push_to_penciller(Penciller, Acc), + ok; + {LastPosition, Acc} -> + ok = push_to_penciller(Penciller, Acc), + load_between_sequence(MaxSQN + 1, + MaxSQN + 1 + ?LOADING_BATCH, + FilterFun, + Penciller, + CDBpid, + LastPosition) + end. + +push_to_penciller(Penciller, KeyList) -> + R = leveled_penciler:pcl_pushmem(Penciller, KeyList), + if + R == pause -> + timer:sleep(?LOADING_PAUSE); + true -> + ok + end. + sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> lists:foldl(fun(FN, Acc) -> @@ -676,7 +732,7 @@ simplejournal_test() -> ?assertMatch(R1, {{1, "Key1"}, {"TestValue1", []}}), R2 = ink_get(Ink1, "Key1", 3), ?assertMatch(R2, {{3, "Key1"}, {"TestValue3", []}}), - {ok, NewSQN1} = ink_put(Ink1, "Key99", "TestValue99", []), + {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "Key99", "TestValue99", []), ?assertMatch(NewSQN1, 5), R3 = ink_get(Ink1, "Key99", 5), io:format("Result 3 is ~w~n", [R3]), @@ -691,18 +747,18 @@ rollafile_simplejournal_test() -> {ok, Ink1} = ink_start(#inker_options{root_path=RootPath, cdb_options=CDBopts}), FunnyLoop = lists:seq(1, 48), - {ok, NewSQN1} = ink_put(Ink1, "KeyAA", "TestValueAA", []), + {ok, NewSQN1, _ObjSize} = ink_put(Ink1, "KeyAA", "TestValueAA", []), ?assertMatch(NewSQN1, 5), ok = ink_print_manifest(Ink1), R0 = ink_get(Ink1, "KeyAA", 5), ?assertMatch(R0, {{5, "KeyAA"}, {"TestValueAA", []}}), lists:foreach(fun(X) -> - {ok, _} = ink_put(Ink1, - "KeyZ" ++ integer_to_list(X), - crypto:rand_bytes(10000), - []) end, + {ok, _, _} = ink_put(Ink1, + "KeyZ" ++ integer_to_list(X), + crypto:rand_bytes(10000), + []) end, FunnyLoop), - {ok, NewSQN2} = ink_put(Ink1, "KeyBB", "TestValueBB", []), + {ok, NewSQN2, _ObjSize} = ink_put(Ink1, "KeyBB", "TestValueBB", []), ?assertMatch(NewSQN2, 54), ok = ink_print_manifest(Ink1), R1 = ink_get(Ink1, "KeyAA", 5), diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 46fb8e9..09babae 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -168,7 +168,8 @@ pcl_confirmdelete/2, pcl_prompt/1, pcl_close/1, - pcl_getstartupsequencenumber/1]). + pcl_getstartupsequencenumber/1, + clean_testdir/1]). -include_lib("eunit/include/eunit.hrl"). @@ -835,7 +836,7 @@ assess_sqn(DumpList) -> assess_sqn([], MinSQN, MaxSQN) -> {MinSQN, MaxSQN}; assess_sqn([HeadKey|Tail], MinSQN, MaxSQN) -> - {_K, SQN} = leveled_sft:strip_to_key_seqn_only(HeadKey), + {_K, SQN} = leveled_bookie:strip_to_keyseqonly(HeadKey), assess_sqn(Tail, min(MinSQN, SQN), max(MaxSQN, SQN)). @@ -901,13 +902,13 @@ simple_server_test() -> clean_testdir(RootPath), {ok, PCL} = pcl_start(#penciller_options{root_path=RootPath, max_inmemory_tablesize=1000}), - Key1 = {{o,"Bucket0001", "Key0001"}, 1, {active, infinity}, null}, + Key1 = {{o,"Bucket0001", "Key0001"}, {1, {active, infinity}, null}}, KL1 = lists:sort(leveled_sft:generate_randomkeys({1000, 2})), - Key2 = {{o,"Bucket0002", "Key0002"}, 1002, {active, infinity}, null}, + Key2 = {{o,"Bucket0002", "Key0002"}, {1002, {active, infinity}, null}}, KL2 = lists:sort(leveled_sft:generate_randomkeys({1000, 1002})), - Key3 = {{o,"Bucket0003", "Key0003"}, 2002, {active, infinity}, null}, + Key3 = {{o,"Bucket0003", "Key0003"}, {2002, {active, infinity}, null}}, KL3 = lists:sort(leveled_sft:generate_randomkeys({1000, 2002})), - Key4 = {{o,"Bucket0004", "Key0004"}, 3002, {active, infinity}, null}, + Key4 = {{o,"Bucket0004", "Key0004"}, {3002, {active, infinity}, null}}, KL4 = lists:sort(leveled_sft:generate_randomkeys({1000, 3002})), ok = pcl_pushmem(PCL, [Key1]), R1 = pcl_fetch(PCL, {o,"Bucket0001", "Key0001"}), diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index d420829..d4b81fb 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -162,8 +162,6 @@ sft_getfilename/1, sft_setfordelete/2, sft_getmaxsequencenumber/1, - strip_to_keyonly/1, - strip_to_key_seqn_only/1, generate_randomkeys/1]). -include_lib("eunit/include/eunit.hrl"). @@ -203,7 +201,7 @@ filename = "not set" :: string(), handle :: file:fd(), background_complete = false :: boolean(), - background_failure = "Unknown" :: string(), + background_failure :: tuple(), oversized_file = false :: boolean(), ready_for_delete = false ::boolean(), penciller :: pid()}). @@ -458,7 +456,7 @@ create_file(FileName) when is_list(FileName) -> FileMD = #state{next_position=StartPos, filename=FileName}, {Handle, FileMD}; {error, Reason} -> - io:format("Error opening filename ~s with reason ~s", + io:format("Error opening filename ~s with reason ~w", [FileName, Reason]), {error, Reason} end. @@ -579,7 +577,7 @@ acc_list_keysonly(null, empty) -> acc_list_keysonly(null, RList) -> RList; acc_list_keysonly(R, RList) -> - lists:append(RList, [strip_to_key_seqn_only(R)]). + lists:append(RList, [leveled_bookie:strip_to_keyseqonly(R)]). acc_list_kv(null, empty) -> []; @@ -668,7 +666,7 @@ fetch_range(Handle, FileMD, StartKey, NearestKey, EndKey, FunList, scan_block([], StartKey, _EndKey, _FunList, _AccFun, Acc) -> {partial, Acc, StartKey}; scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> - K = strip_to_keyonly(HeadKV), + K = leveled_bookie:strip_to_keyonly(HeadKV), case K of K when K < StartKey, StartKey /= all -> scan_block(T, StartKey, EndKey, FunList, AccFun, Acc); @@ -676,11 +674,11 @@ scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> {complete, Acc}; _ -> case applyfuns(FunList, HeadKV) of - include -> + true -> %% Add result to the accumulator scan_block(T, StartKey, EndKey, FunList, AccFun, AccFun(HeadKV, Acc)); - skip -> + false -> scan_block(T, StartKey, EndKey, FunList, AccFun, Acc) end @@ -688,13 +686,13 @@ scan_block([HeadKV|T], StartKey, EndKey, FunList, AccFun, Acc) -> applyfuns([], _KV) -> - include; + true; applyfuns([HeadFun|OtherFuns], KV) -> case HeadFun(KV) of true -> applyfuns(OtherFuns, KV); false -> - skip + false end. fetch_keyvalue_fromblock([], _Key, _LengthList, _Handle, _StartOfSlot) -> @@ -1005,20 +1003,20 @@ create_slot(KL1, KL2, Level, BlockCount, SegLists, SerialisedSlot, LengthList, {BlockKeyList, Status, {LSNb, HSNb}, SegmentList, KL1b, KL2b} = create_block(KL1, KL2, Level), - case LowKey of + TrackingMetadata = case LowKey of null -> [NewLowKeyV|_] = BlockKeyList, - TrackingMetadata = {strip_to_keyonly(NewLowKeyV), - min(LSN, LSNb), max(HSN, HSNb), - strip_to_keyonly(last(BlockKeyList, - {last, LastKey})), - Status}; + {leveled_bookie:strip_to_keyonly(NewLowKeyV), + min(LSN, LSNb), max(HSN, HSNb), + leveled_bookie:strip_to_keyonly(last(BlockKeyList, + {last, LastKey})), + Status}; _ -> - TrackingMetadata = {LowKey, - min(LSN, LSNb), max(HSN, HSNb), - strip_to_keyonly(last(BlockKeyList, - {last, LastKey})), - Status} + {LowKey, + min(LSN, LSNb), max(HSN, HSNb), + leveled_bookie:strip_to_keyonly(last(BlockKeyList, + {last, LastKey})), + Status} end, SerialisedBlock = serialise_block(BlockKeyList), BlockLength = byte_size(SerialisedBlock), @@ -1034,11 +1032,6 @@ last([E|Es], PrevLast) -> last(E, Es, PrevLast). last(_, [E|Es], PrevLast) -> last(E, Es, PrevLast); last(E, [], _) -> E. -strip_to_keyonly({keyonly, K}) -> K; -strip_to_keyonly({K, _, _, _}) -> K. - -strip_to_key_seqn_only({K, SeqN, _, _}) -> {K, SeqN}. - serialise_block(BlockKeyList) -> term_to_binary(BlockKeyList, [{compressed, ?COMPRESSION_LEVEL}]). @@ -1060,7 +1053,7 @@ key_dominates(KL1, KL2, Level) -> Level). key_dominates_expanded([H1|T1], [], Level) -> - {_, _, St1, _} = H1, + St1 = leveled_bookie:strip_to_statusonly(H1), case maybe_reap_expiredkey(St1, Level) of true -> {skipped_key, maybe_expand_pointer(T1), []}; @@ -1068,7 +1061,7 @@ key_dominates_expanded([H1|T1], [], Level) -> {{next_key, H1}, maybe_expand_pointer(T1), []} end; key_dominates_expanded([], [H2|T2], Level) -> - {_, _, St2, _} = H2, + St2 = leveled_bookie:strip_to_statusonly(H2), case maybe_reap_expiredkey(St2, Level) of true -> {skipped_key, [], maybe_expand_pointer(T2)}; @@ -1076,8 +1069,8 @@ key_dominates_expanded([], [H2|T2], Level) -> {{next_key, H2}, [], maybe_expand_pointer(T2)} end; key_dominates_expanded([H1|T1], [H2|T2], Level) -> - {K1, Sq1, St1, _} = H1, - {K2, Sq2, St2, _} = H2, + {K1, Sq1, St1} = leveled_bookie:strip_to_details(H1), + {K2, Sq2, St2} = leveled_bookie:strip_to_details(H2), case K1 of K2 -> case Sq1 > Sq2 of @@ -1139,14 +1132,16 @@ pointer_append_queryresults(Results, QueryPid) -> end. -%% Update the sequence numbers -update_sequencenumbers({_, SN, _, _}, 0, 0) -> +%% Update the sequence numbers +update_sequencenumbers(Item, LSN, HSN) when is_tuple(Item) -> + update_sequencenumbers(leveled_bookie:strip_to_seqonly(Item), LSN, HSN); +update_sequencenumbers(SN, 0, 0) -> {SN, SN}; -update_sequencenumbers({_, SN, _, _}, LSN, HSN) when SN < LSN -> +update_sequencenumbers(SN, LSN, HSN) when SN < LSN -> {SN, HSN}; -update_sequencenumbers({_, SN, _, _}, LSN, HSN) when SN > HSN -> +update_sequencenumbers(SN, LSN, HSN) when SN > HSN -> {LSN, SN}; -update_sequencenumbers({_, _, _, _}, LSN, HSN) -> +update_sequencenumbers(_SN, LSN, HSN) -> {LSN, HSN}. @@ -1250,7 +1245,7 @@ merge_seglists({SegList1, SegList2, SegList3, SegList4}) -> lists:sort(Stage4). hash_for_segmentid(KV) -> - erlang:phash2(strip_to_keyonly(KV), ?MAX_SEG_HASH). + erlang:phash2(leveled_bookie:strip_to_keyonly(KV), ?MAX_SEG_HASH). %% Check for a given list of segments in the filter, returning in normal @@ -1396,30 +1391,6 @@ findremainder(BitStr, Factor) -> %%% Test %%%============================================================================ -speedtest_check_forsegment(_, 0, _, _) -> - true; -speedtest_check_forsegment(SegFilter, LoopCount, CRCCheck, IDsToCheck) -> - check_for_segments(SegFilter, gensegmentids(IDsToCheck), CRCCheck), - speedtest_check_forsegment(SegFilter, LoopCount - 1, CRCCheck, IDsToCheck). - -gensegmentids(Count) -> - gensegmentids([], Count). - -gensegmentids(GeneratedIDs, 0) -> - lists:sort(GeneratedIDs); -gensegmentids(GeneratedIDs, Count) -> - gensegmentids([random:uniform(1024*1024)|GeneratedIDs], Count - 1). - - -generate_randomsegfilter(BlockSize) -> - Block1 = gensegmentids(BlockSize), - Block2 = gensegmentids(BlockSize), - Block3 = gensegmentids(BlockSize), - Block4 = gensegmentids(BlockSize), - serialise_segment_filter(generate_segment_filter({Block1, - Block2, - Block3, - Block4})). generate_randomkeys({Count, StartSQN}) -> @@ -1433,8 +1404,8 @@ generate_randomkeys(Count, SQN, Acc) -> RandKey = {{o, lists:concat(["Bucket", random:uniform(1024)]), lists:concat(["Key", random:uniform(1024)])}, - SQN, - {active, infinity}, null}, + {SQN, + {active, infinity}, null}}, generate_randomkeys(Count - 1, SQN + 1, [RandKey|Acc]). generate_sequentialkeys(Count, Start) -> @@ -1447,76 +1418,71 @@ generate_sequentialkeys(Target, Incr, Acc) -> NextKey = {{o, "BucketSeq", lists:concat(["Key", KeyStr])}, - 5, - {active, infinity}, null}, + {5, + {active, infinity}, null}}, generate_sequentialkeys(Target, Incr + 1, [NextKey|Acc]). -dummy_test() -> - R = speedtest_check_forsegment(a, 0, b, c), - ?assertMatch(R, true), - _ = generate_randomsegfilter(8). - simple_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key3"}, 2, {active, infinity}, null}], - KeyList2 = [{{o, "Bucket1", "Key2"}, 3, {active, infinity}, null}], + KeyList1 = [{{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key3"}, {2, {active, infinity}, null}}], + KeyList2 = [{{o, "Bucket1", "Key2"}, {3, {active, infinity}, null}}], {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, KeyList2, 1), ?assertMatch(partial, ListStatus), [H1|T1] = MergedKeyList, - ?assertMatch(H1, {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}), + ?assertMatch(H1, {{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}), [H2|T2] = T1, - ?assertMatch(H2, {{o, "Bucket1", "Key2"}, 3, {active, infinity}, null}), - ?assertMatch(T2, [{{o, "Bucket1", "Key3"}, 2, {active, infinity}, null}]), + ?assertMatch(H2, {{o, "Bucket1", "Key2"}, {3, {active, infinity}, null}}), + ?assertMatch(T2, [{{o, "Bucket1", "Key3"}, {2, {active, infinity}, null}}]), ?assertMatch(SN, {1,3}). dominate_create_block_test() -> - KeyList1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key2"}, 2, {active, infinity}, null}], - KeyList2 = [{{o, "Bucket1", "Key2"}, 3, {tomb, infinity}, null}], + KeyList1 = [{{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key2"}, {2, {active, infinity}, null}}], + KeyList2 = [{{o, "Bucket1", "Key2"}, {3, {tomb, infinity}, null}}], {MergedKeyList, ListStatus, SN, _, _, _} = create_block(KeyList1, KeyList2, 1), ?assertMatch(partial, ListStatus), [K1, K2] = MergedKeyList, - ?assertMatch(K1, {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}), - ?assertMatch(K2, {{o, "Bucket1", "Key2"}, 3, {tomb, infinity}, null}), + ?assertMatch(K1, {{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}), + ?assertMatch(K2, {{o, "Bucket1", "Key2"}, {3, {tomb, infinity}, null}}), ?assertMatch(SN, {1,3}). sample_keylist() -> - KeyList1 = [{{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key3"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key5"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key7"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key9"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key1"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key3"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key5"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key7"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key9"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key1"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key3"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key5"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key7"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key9"}, 1, {active, infinity}, null}, - {{o, "Bucket4", "Key1"}, 1, {active, infinity}, null}], - KeyList2 = [{{o, "Bucket1", "Key2"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key4"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key6"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key8"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key9a"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key9b"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key9c"}, 1, {active, infinity}, null}, - {{o, "Bucket1", "Key9d"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key2"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key4"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key6"}, 1, {active, infinity}, null}, - {{o, "Bucket2", "Key8"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key2"}, 1, {active, infinity}, null}, - {{o, "Bucket3", "Key4"}, 3, {active, infinity}, null}, - {{o, "Bucket3", "Key6"}, 2, {active, infinity}, null}, - {{o, "Bucket3", "Key8"}, 1, {active, infinity}, null}], + KeyList1 = [{{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key3"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key5"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key7"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key9"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key1"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key3"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key5"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key7"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key9"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key1"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key3"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key5"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key7"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key9"}, {1, {active, infinity}, null}}, + {{o, "Bucket4", "Key1"}, {1, {active, infinity}, null}}], + KeyList2 = [{{o, "Bucket1", "Key2"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key4"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key6"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key8"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key9a"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key9b"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key9c"}, {1, {active, infinity}, null}}, + {{o, "Bucket1", "Key9d"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key2"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key4"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key6"}, {1, {active, infinity}, null}}, + {{o, "Bucket2", "Key8"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key2"}, {1, {active, infinity}, null}}, + {{o, "Bucket3", "Key4"}, {3, {active, infinity}, null}}, + {{o, "Bucket3", "Key6"}, {2, {active, infinity}, null}}, + {{o, "Bucket3", "Key8"}, {1, {active, infinity}, null}}], {KeyList1, KeyList2}. alternating_create_block_test() -> @@ -1528,12 +1494,12 @@ alternating_create_block_test() -> ?assertMatch(BlockSize, 32), ?assertMatch(ListStatus, complete), K1 = lists:nth(1, MergedKeyList), - ?assertMatch(K1, {{o, "Bucket1", "Key1"}, 1, {active, infinity}, null}), + ?assertMatch(K1, {{o, "Bucket1", "Key1"}, {1, {active, infinity}, null}}), K11 = lists:nth(11, MergedKeyList), - ?assertMatch(K11, {{o, "Bucket1", "Key9b"}, 1, {active, infinity}, null}), + ?assertMatch(K11, {{o, "Bucket1", "Key9b"}, {1, {active, infinity}, null}}), K32 = lists:nth(32, MergedKeyList), - ?assertMatch(K32, {{o, "Bucket4", "Key1"}, 1, {active, infinity}, null}), - HKey = {{o, "Bucket1", "Key0"}, 1, {active, infinity}, null}, + ?assertMatch(K32, {{o, "Bucket4", "Key1"}, {1, {active, infinity}, null}}), + HKey = {{o, "Bucket1", "Key0"}, {1, {active, infinity}, null}}, {_, ListStatus2, _, _, _, _} = create_block([HKey|KeyList1], KeyList2, 1), ?assertMatch(ListStatus2, full). @@ -1690,7 +1656,7 @@ initial_create_file_test() -> Result1 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key8"}), io:format("Result is ~w~n", [Result1]), ?assertMatch(Result1, {{o, "Bucket1", "Key8"}, - 1, {active, infinity}, null}), + {1, {active, infinity}, null}}), Result2 = fetch_keyvalue(UpdHandle, UpdFileMD, {o, "Bucket1", "Key88"}), io:format("Result is ~w~n", [Result2]), ?assertMatch(Result2, not_present), @@ -1705,18 +1671,18 @@ big_create_file_test() -> {Handle, FileMD, {_KL1Rem, _KL2Rem}} = complete_file(InitHandle, InitFileMD, KL1, KL2, 1), - [{K1, Sq1, St1, V1}|_] = KL1, - [{K2, Sq2, St2, V2}|_] = KL2, + [{K1, {Sq1, St1, V1}}|_] = KL1, + [{K2, {Sq2, St2, V2}}|_] = KL2, Result1 = fetch_keyvalue(Handle, FileMD, K1), Result2 = fetch_keyvalue(Handle, FileMD, K2), - ?assertMatch(Result1, {K1, Sq1, St1, V1}), - ?assertMatch(Result2, {K2, Sq2, St2, V2}), + ?assertMatch(Result1, {K1, {Sq1, St1, V1}}), + ?assertMatch(Result2, {K2, {Sq2, St2, V2}}), SubList = lists:sublist(KL2, 1000), FailedFinds = lists:foldl(fun(K, Acc) -> - {Kn, _, _, _} = K, + {Kn, {_, _, _}} = K, Rn = fetch_keyvalue(Handle, FileMD, Kn), case Rn of - {Kn, _, _, _} -> + {Kn, {_, _, _}} -> Acc; _ -> Acc + 1