diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 1b68d85..25326f2 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -135,18 +135,20 @@ terminate/2, code_change/3, book_start/1, - book_put/4, - book_get/2, - book_head/2, + book_riakput/3, + book_riakget/3, + book_riakhead/3, + book_close/1, strip_to_keyonly/1, strip_to_keyseqonly/1, strip_to_seqonly/1, - strip_to_statusonly/1, - strip_to_details/1]). + strip_to_statusonly/1]). -include_lib("eunit/include/eunit.hrl"). -define(CACHE_SIZE, 1000). +-define(JOURNAL_FP, "journal"). +-define(LEDGER_FP, "ledger"). -record(state, {inker :: pid(), penciller :: pid(), @@ -165,15 +167,21 @@ book_start(Opts) -> gen_server:start(?MODULE, [Opts], []). -book_put(Pid, PrimaryKey, Object, IndexSpecs) -> +book_riakput(Pid, Object, IndexSpecs) -> + PrimaryKey = {o, Object#r_object.bucket, Object#r_object.key}, gen_server:call(Pid, {put, PrimaryKey, Object, IndexSpecs}, infinity). -book_get(Pid, PrimaryKey) -> +book_riakget(Pid, Bucket, Key) -> + PrimaryKey = {o, Bucket, Key}, gen_server:call(Pid, {get, PrimaryKey}, infinity). -book_head(Pid, PrimaryKey) -> +book_riakhead(Pid, Bucket, Key) -> + PrimaryKey = {o, Bucket, Key}, gen_server:call(Pid, {head, PrimaryKey}, infinity). +book_close(Pid) -> + gen_server:call(Pid, close, infinity). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -208,7 +216,10 @@ init([Opts]) -> handle_call({put, PrimaryKey, Object, IndexSpecs}, From, State) -> - {ok, SQN, ObjSize} = leveled_inker:ink_put(PrimaryKey, Object, IndexSpecs), + {ok, SQN, ObjSize} = leveled_inker:ink_put(State#state.inker, + PrimaryKey, + Object, + IndexSpecs), Changes = preparefor_ledgercache(PrimaryKey, SQN, Object, @@ -229,7 +240,7 @@ handle_call({get, Key}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {Key, Seqn, Status} = strip_to_details(Head), + {Seqn, Status, _MD} = striphead_to_details(Head), case Status of {tomb, _} -> {reply, not_found, State}; @@ -247,15 +258,17 @@ handle_call({head, Key}, _From, State) -> not_present -> {reply, not_found, State}; Head -> - {Key, _Seqn, Status} = strip_to_details(Head), + {_Seqn, Status, MD} = striphead_to_details(Head), case Status of {tomb, _} -> {reply, not_found, State}; {active, _} -> - MD = strip_to_mdonly(Head), - {reply, {ok, MD}, State} + OMD = build_metadata_object(Key, MD), + {reply, {ok, OMD}, State} end - end. + end; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -263,8 +276,10 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(Reason, State) -> + io:format("Bookie closing for reason ~w~n", [Reason]), + ok = leveled_inker:ink_close(State#state.inker), + ok = leveled_penciller:pcl_close(State#state.penciller). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -275,14 +290,18 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ set_options(Opts) -> - {#inker_options{root_path=Opts#bookie_options.root_path}, - #penciller_options{root_path=Opts#bookie_options.root_path}}. + %% TODO: Change the max size default, and allow setting through options + {#inker_options{root_path = Opts#bookie_options.root_path ++ + "/" ++ ?JOURNAL_FP, + cdb_options = #cdb_options{max_size=30000}}, + #penciller_options{root_path=Opts#bookie_options.root_path ++ + "/" ++ ?LEDGER_FP}}. startup(InkerOpts, PencillerOpts) -> {ok, Inker} = leveled_inker:ink_start(InkerOpts), {ok, Penciller} = leveled_penciller:pcl_start(PencillerOpts), LedgerSQN = leveled_penciller:pcl_getstartupsequencenumber(Penciller), - ok = leveled_inker:ink_loadpcl(LedgerSQN, fun load_fun/4, Penciller), + ok = leveled_inker:ink_loadpcl(Inker, LedgerSQN, fun load_fun/4, Penciller), {Inker, Penciller}. @@ -319,9 +338,7 @@ strip_to_statusonly({_, {_, St, _}}) -> St. strip_to_seqonly({_, {SeqN, _, _}}) -> SeqN. -strip_to_details({K, {SeqN, St, _}}) -> {K, SeqN, St}. - -strip_to_mdonly({_, {_, _, MD}}) -> MD. +striphead_to_details({SeqN, St, MD}) -> {SeqN, St, MD}. get_metadatas(#r_object{contents=Contents}) -> [Content#r_content.metadata || Content <- Contents]. @@ -341,6 +358,14 @@ hash(Obj=#r_object{}) -> extract_metadata(Obj, Size) -> {get_metadatas(Obj), vclock(Obj), hash(Obj), Size}. +build_metadata_object(PrimaryKey, Head) -> + {o, Bucket, Key} = PrimaryKey, + {MD, VC, _, _} = Head, + Contents = lists:foldl(fun(X, Acc) -> Acc ++ [#r_content{metadata=X}] end, + [], + MD), + #r_object{contents=Contents, bucket=Bucket, key=Key, vclock=VC}. + convert_indexspecs(IndexSpecs, SQN, PrimaryKey) -> lists:map(fun({IndexOp, IndexField, IndexValue}) -> Status = case IndexOp of @@ -367,7 +392,7 @@ preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) -> [PrimaryChange] ++ SecChanges. addto_ledgercache(Changes, Cache) -> - lists:foldl(fun({{K, V}, Acc}) -> gb_trees:enter(K, V, Acc) end, + lists:foldl(fun({K, V}, Acc) -> gb_trees:enter(K, V, Acc) end, Cache, Changes). @@ -412,4 +437,26 @@ load_fun(KeyInLedger, ValueInLedger, _Position, Acc0) -> -ifdef(TEST). +reset_filestructure() -> + RootPath = "../test", + leveled_inker:clean_testdir(RootPath ++ "/" ++ ?JOURNAL_FP), + leveled_penciller:clean_testdir(RootPath ++ "/" ++ ?LEDGER_FP), + RootPath. + +single_key_test() -> + RootPath = reset_filestructure(), + {ok, Bookie} = book_start(#bookie_options{root_path=RootPath}), + {B1, K1, V1, Spec1, MD} = {"Bucket1", + "Key1", + "Value1", + [], + {"MDK1", "MDV1"}}, + Content = #r_content{metadata=MD, value=V1}, + Object = #r_object{bucket=B1, key=K1, contents=[Content], vclock=[{'a',1}]}, + ok = book_riakput(Bookie, Object, Spec1), + {ok, F1} = book_riakget(Bookie, B1, K1), + ?assertMatch(F1, Object), + ok = book_close(Bookie), + reset_filestructure(). + -endif. \ No newline at end of file