Basic GET/PUT and rolling in Inker

Add support to roll file on PUT in the inker
This commit is contained in:
martinsumner 2016-09-06 17:17:31 +01:00
parent f3a40e106d
commit f0e1c1d7ea
3 changed files with 40 additions and 21 deletions

View file

@ -59,7 +59,7 @@
%%
%% The Bookie's memory consists of an in-memory ets table. Periodically, the
%% current table is pushed to the Penciller for eventual persistence, and a
%% new tabble is started.
%% new table is started.
%%
%% This completes the non-deferrable work associated with a PUT
%%

View file

@ -71,6 +71,7 @@
-define(CRC_CHECK, true).
-define(MAX_FILE_SIZE, 3221225472).
-define(BASE_POSITION, 2048).
-define(WRITE_OPS, [binary, raw, read, write]).
-record(state, {hashtree,
last_position :: integer(),
@ -139,8 +140,7 @@ init([]) ->
handle_call({cdb_open_writer, Filename}, _From, State) ->
io:format("Opening file for writing with filename ~s~n", [Filename]),
{LastPosition, HashTree, LastKey} = open_active_file(Filename),
{ok, Handle} = file:open(Filename, [binary, raw, read,
write, delayed_write]),
{ok, Handle} = file:open(Filename, [sync | ?WRITE_OPS]),
{reply, ok, State#state{handle=Handle,
last_position=LastPosition,
last_key=LastKey,
@ -220,7 +220,7 @@ handle_call(cdb_filename, _From, State) ->
{reply, State#state.filename, State};
handle_call(cdb_close, _From, State) ->
ok = file:close(State#state.handle),
{stop, normal, ok, State};
{stop, normal, ok, State#state{handle=closed}};
handle_call(cdb_complete, _From, State) ->
case State#state.writer of
true ->
@ -247,7 +247,12 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State) ->
file:close(State#state.handle).
case State#state.handle of
closed ->
ok;
Handle ->
file:close(Handle)
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -270,7 +275,7 @@ from_dict(FileName,Dict) ->
%% this function creates a CDB
%%
create(FileName,KeyValueList) ->
{ok, Handle} = file:open(FileName, [binary, raw, read, write]),
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
{ok, _} = file:position(Handle, {bof, ?BASE_POSITION}),
{BasePos, HashTree} = write_key_value_pairs(Handle, KeyValueList),
close_file(Handle, HashTree, BasePos).
@ -324,7 +329,7 @@ dump(FileName, CRCCheck) ->
%% tuples as the write_key_value_pairs function, and the current position, and
%% the file handle
open_active_file(FileName) when is_list(FileName) ->
{ok, Handle} = file:open(FileName, [binary, raw, read, write]),
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
{ok, Position} = file:position(Handle, {bof, 256*?DWORD_SIZE}),
{LastPosition, HashTree, LastKey} = scan_over_file(Handle, Position),
case file:position(Handle, eof) of
@ -345,14 +350,12 @@ open_active_file(FileName) when is_list(FileName) ->
%% dictionary of Keys and positions. Returns an updated Position
%%
put(FileName, Key, Value, {LastPosition, HashTree}) when is_list(FileName) ->
{ok, Handle} = file:open(FileName,
[binary, raw, read, write, delayed_write]),
{ok, Handle} = file:open(FileName, ?WRITE_OPS),
put(Handle, Key, Value, {LastPosition, HashTree});
put(Handle, Key, Value, {LastPosition, HashTree}) ->
Bin = key_value_to_record({Key, Value}),
PotentialNewSize = LastPosition + byte_size(Bin),
if PotentialNewSize > ?MAX_FILE_SIZE ->
close_file(Handle, HashTree, LastPosition),
roll;
true ->
ok = file:pwrite(Handle, LastPosition, Bin),

View file

@ -102,6 +102,7 @@
ink_put/4,
ink_get/3,
ink_snap/1,
ink_close/1,
build_dummy_journal/0,
simple_manifest_reader/2]).
@ -139,6 +140,9 @@ ink_get(Pid, PrimaryKey, SQN) ->
ink_snap(Pid) ->
gen_server:call(Pid, snapshot, infinity).
ink_close(Pid) ->
gen_server:call(Pid, close, infinity).
%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
@ -200,7 +204,12 @@ handle_call({get, Key, SQN}, _From, State) ->
handle_call(snapshot, _From , State) ->
%% TODO: Not yet implemented registration of snapshot
%% Should return manifest and register the snapshot
{reply, State#state.manifest, State}.
{reply, {State#state.manifest,
State#state.active_journaldb,
State#state.active_journaldb_sqn},
State};
handle_call(close, _From, State) ->
{stop, normal, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@ -208,8 +217,12 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
terminate(Reason, State) ->
io:format("Inker closing journal for reason ~w~n", [Reason]),
io:format("Close triggered with journal_sqn=~w and manifest_sqn=~w~n",
[State#state.journal_sqn, State#state.manifest_sqn]),
io:format("Manifest when closing is ~w~n", [State#state.manifest]),
close_allmanifest(State#state.manifest, State#state.active_journaldb).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -235,21 +248,24 @@ put_object(PrimaryKey, Object, KeyChanges, State) ->
Bin1) of
ok ->
{rolling, State#state{journal_sqn=NewSQN,
active_journaldb=NewJournalP}};
active_journaldb=NewJournalP,
active_journaldb_sqn=NewSQN}};
roll ->
{blocked, State#state{journal_sqn=NewSQN,
active_journaldb=NewJournalP}}
active_journaldb=NewJournalP,
active_journaldb_sqn=NewSQN}}
end
end.
roll_active_file(OldActiveJournal, Manifest, ManifestSQN, RootPath) ->
io:format("Rolling old journal ~w~n", [OldActiveJournal]),
{ok, NewFilename} = leveled_cdb:cdb_complete(OldActiveJournal),
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
JournalRegex2 = "nursery_(?<SQN>[0-9]+)\\." ++ ?JOURNAL_FILEX,
[JournalSQN] = sequencenumbers_fromfilenames([NewFilename],
JournalRegex2,
'SQN'),
NewManifest = lists:append(Manifest, {JournalSQN, NewFilename, PidR}),
NewManifest = lists:append(Manifest, [{JournalSQN, NewFilename, PidR}]),
NewManifestSQN = ManifestSQN + 1,
ok = simple_manifest_writer(NewManifest, NewManifestSQN, RootPath),
{NewManifest, NewManifestSQN}.
@ -387,7 +403,7 @@ close_allmanifest([], ActiveJournal) ->
leveled_cdb:cdb_close(ActiveJournal);
close_allmanifest([H|ManifestT], ActiveJournal) ->
{_, _, Pid} = H,
leveled_cdb:cdb_close(Pid),
ok = leveled_cdb:cdb_close(Pid),
close_allmanifest(ManifestT, ActiveJournal).
@ -396,12 +412,12 @@ roll_pending_journals([TopJournalSQN], Manifest, _RootPath)
{TopJournalSQN, Manifest};
roll_pending_journals([JournalSQN|T], Manifest, RootPath) ->
Filename = filepath(RootPath, JournalSQN, new_journal),
PidW = leveled_cdb:cdb_open_writer(Filename),
{ok, PidW} = leveled_cdb:cdb_open_writer(Filename),
{ok, NewFilename} = leveled_cdb:cdb_complete(PidW),
{ok, PidR} = leveled_cdb:cdb_open_reader(NewFilename),
roll_pending_journals(T,
lists:append(Manifest,
{JournalSQN, NewFilename, PidR}),
[{JournalSQN, NewFilename, PidR}]),
RootPath).
@ -454,7 +470,7 @@ simple_manifest_writer(Manifest, ManSQN, RootPath) ->
NewFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?MANIFEST_FILEX),
TmpFN = filename:join(ManPath, integer_to_list(ManSQN) ++ ?PENDING_FILEX),
MBin = term_to_binary(Manifest),
case file:is_file(NewFN) of
case filelib:is_file(NewFN) of
true ->
io:format("Error - trying to write manifest for"
++ " ManifestSQN=~w which already exists~n", [ManSQN]),