From f0e1c1d7eaa2512ddd91a08f39ece8b7e02ca332 Mon Sep 17 00:00:00 2001 From: martinsumner Date: Tue, 6 Sep 2016 17:17:31 +0100 Subject: [PATCH] Basic GET/PUT and rolling in Inker Add support to roll file on PUT in the inker --- src/leveled_bookie.erl | 4 ++-- src/leveled_cdb.erl | 21 ++++++++++++--------- src/leveled_inker.erl | 36 ++++++++++++++++++++++++++---------- 3 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 84f7b5a..0eab5bc 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -42,7 +42,7 @@ %% the request to the ledger. %% %% The inker will pass the request to the current (append only) CDB journal -%% fileto persist the change. The call should return either 'ok' or 'roll'. +%% file to persist the change. The call should return either 'ok' or 'roll'. %% 'roll' indicates that the CDB file has insufficient capacity for %% this write. @@ -59,7 +59,7 @@ %% %% The Bookie's memory consists of an in-memory ets table. Periodically, the %% current table is pushed to the Penciller for eventual persistence, and a -%% new tabble is started. +%% new table is started. %% %% This completes the non-deferrable work associated with a PUT %% diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 88c8223..2e63dbc 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -71,6 +71,7 @@ -define(CRC_CHECK, true). -define(MAX_FILE_SIZE, 3221225472). -define(BASE_POSITION, 2048). +-define(WRITE_OPS, [binary, raw, read, write]). -record(state, {hashtree, last_position :: integer(), @@ -139,8 +140,7 @@ init([]) -> handle_call({cdb_open_writer, Filename}, _From, State) -> io:format("Opening file for writing with filename ~s~n", [Filename]), {LastPosition, HashTree, LastKey} = open_active_file(Filename), - {ok, Handle} = file:open(Filename, [binary, raw, read, - write, delayed_write]), + {ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]), {reply, ok, State#state{handle=Handle, last_position=LastPosition, last_key=LastKey, @@ -220,7 +220,7 @@ handle_call(cdb_filename, _From, State) -> {reply, State#state.filename, State}; handle_call(cdb_close, _From, State) -> ok = file:close(State#state.handle), - {stop, normal, ok, State}; + {stop, normal, ok, State#state{handle=closed}}; handle_call(cdb_complete, _From, State) -> case State#state.writer of true -> @@ -247,7 +247,12 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> - file:close(State#state.handle). + case State#state.handle of + closed -> + ok; + Handle -> + file:close(Handle) + end. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -270,7 +275,7 @@ from_dict(FileName,Dict) -> %% this function creates a CDB %% create(FileName,KeyValueList) -> - {ok, Handle} = file:open(FileName, [binary, raw, read, write]), + {ok, Handle} = file:open(FileName, ?WRITE_OPS), {ok, _} = file:position(Handle, {bof, ?BASE_POSITION}), {BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList), close_file(Handle, HashTree, BasePos). @@ -324,7 +329,7 @@ dump(FileName, CRCCheck) -> %% tuples as the write_key_value_pairs function, and the current position, and %% the file handle open_active_file(FileName) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, [binary, raw, read, write]), + {ok, Handle} = file:open(FileName, ?WRITE_OPS), {ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}), {LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position), case file:position(Handle, eof) of @@ -345,14 +350,12 @@ open_active_file(FileName) when is_list(FileName) -> %% dictionary of Keys and positions. Returns an updated Position %% put(FileName, Key, Value, {LastPosition, HashTree}) when is_list(FileName) -> - {ok, Handle} = file:open(FileName, - [binary, raw, read, write, delayed_write]), + {ok, Handle} = file:open(FileName, ?WRITE_OPS), put(Handle, Key, Value, {LastPosition, HashTree}); put(Handle, Key, Value, {LastPosition, HashTree}) -> Bin = key_value_to_record({Key, Value}), PotentialNewSize = LastPosition + byte_size(Bin), if PotentialNewSize > ?MAX_FILE_SIZE -> - close_file(Handle, HashTree, LastPosition), roll; true -> ok = file:pwrite(Handle, LastPosition, Bin), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index b303f4e..de75666 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -102,6 +102,7 @@ ink_put/4, ink_get/3, ink_snap/1, + ink_close/1, build_dummy_journal/0, simple_manifest_reader/2]). @@ -139,6 +140,9 @@ ink_get(Pid, PrimaryKey, SQN) -> ink_snap(Pid) -> gen_server:call(Pid, snapshot, infinity). +ink_close(Pid) -> + gen_server:call(Pid, close, infinity). + %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -200,7 +204,12 @@ handle_call({get, Key, SQN}, _From, State) -> handle_call(snapshot, _From , State) -> %% TODO: Not yet implemented registration of snapshot %% Should return manifest and register the snapshot - {reply, State#state.manifest, State}. + {reply, {State#state.manifest, + State#state.active_journaldb, + State#state.active_journaldb_sqn}, + State}; +handle_call(close, _From, State) -> + {stop, normal, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -208,8 +217,12 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(Reason, State) -> + io:format("Inker closing journal for reason ~w~n", [Reason]), + io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n", + [State#state.journal_sqn, State#state.manifest_sqn]), + io:format("Manifest when closing is ~w~n", [State#state.manifest]), + close_allmanifest(State#state.manifest, State#state.active_journaldb). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -235,21 +248,24 @@ put_object(PrimaryKey, Object, KeyChanges, State) -> Bin1) of ok -> {rolling, State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP}}; + active_journaldb=NewJournalP, + active_journaldb_sqn=NewSQN}}; roll -> {blocked, State#state{journal_sqn=NewSQN, - active_journaldb=NewJournalP}} + active_journaldb=NewJournalP, + active_journaldb_sqn=NewSQN}} end end. roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) -> + io:format("Rolling old journal ~w~n", [OldActiveJournal]), {ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal), {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), JournalRegex2 = "nursery_(?[0-9]+)\\." ++ ?JOURNAL_FILEX, [JournalSQN] = sequencenumbers_fromfilenames([NewFilename], JournalRegex2, 'SQN'), - NewManifest = lists:append(Manifest, {JournalSQN, NewFilename, PidR}), + NewManifest = lists:append(Manifest, [{JournalSQN, NewFilename, PidR}]), NewManifestSQN = ManifestSQN + 1, ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath), {NewManifest, NewManifestSQN}. @@ -387,7 +403,7 @@ close_allmanifest([], ActiveJournal) -> leveled_cdb:cdb_close(ActiveJournal); close_allmanifest([H|ManifestT], ActiveJournal) -> {_, _, Pid} = H, - leveled_cdb:cdb_close(Pid), + ok = leveled_cdb:cdb_close(Pid), close_allmanifest(ManifestT, ActiveJournal). @@ -396,12 +412,12 @@ roll_pending_journals([TopJournalSQN], Manifest, _RootPath) {TopJournalSQN, Manifest}; roll_pending_journals([JournalSQN|T], Manifest, RootPath) -> Filename = filepath(RootPath, JournalSQN, new_journal), - PidW = leveled_cdb:cdb_open_writer(Filename), + {ok, PidW} = leveled_cdb:cdb_open_writer(Filename), {ok, NewFilename} = leveled_cdb:cdb_complete(PidW), {ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename), roll_pending_journals(T, lists:append(Manifest, - {JournalSQN, NewFilename, PidR}), + [{JournalSQN, NewFilename, PidR}]), RootPath). @@ -454,7 +470,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) -> NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX), TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX), MBin = term_to_binary(Manifest), - case file:is_file(NewFN) of + case filelib:is_file(NewFN) of true -> io:format("Error - trying to write manifest for" ++ " ManifestSQN=~w which already exists~n", [ManSQN]),