Support for 2i query part1

Added basic support for 2i query.  This involved some refactoring of the
test code to share functions between suites.

There is still a need for a Part 2, as no tests currently cover removal
of index entries.
martinsumner 2016-10-18 01:59:03 +01:00
parent ac0504e79e
commit 3e475f46e8
11 changed files with 682 additions and 288 deletions
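
As a usage note, a minimal sketch of how the new folder type might be driven,
assuming Bookie is a started leveled_bookie process, that book_returnfolder/2
accepts the {index_query, ...} tuple added below, and that the bucket, field
and term values are purely illustrative:

    Query = {index_query,
                <<"Bucket1">>,                                  % bucket to query
                {<<"idx1_bin">>, <<"value1">>, <<"value2">>},   % index field and term range
                {false, undefined}},                            % ReturnTerms=false, no term regex
    {async, Folder} = leveled_bookie:book_returnfolder(Bookie, Query),
    Keys = Folder().    % run the fold; returns the accumulated object keys

Passing {true, Regex} instead would route results through add_terms/3 and
return {IndexValue, ObjectKey} pairs, filtered by the regular expression.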

@@ -124,7 +124,7 @@
-behaviour(gen_server).
-include("../include/leveled.hrl").
-include("include/leveled.hrl").
-export([init/1,
        handle_call/3,
@@ -348,6 +348,17 @@ handle_call({return_folder, FolderType}, _From, State) ->
                                State#state.ledger_cache,
                                Bucket,
                                ?RIAK_TAG),
                State};
        {index_query,
                Bucket,
                {IdxField, StartValue, EndValue},
                {ReturnTerms, TermRegex}} ->
            {reply,
                index_query(State#state.penciller,
                                State#state.ledger_cache,
                                Bucket,
                                {IdxField, StartValue, EndValue},
                                {ReturnTerms, TermRegex}),
                State}
    end;
handle_call({compact_journal, Timeout}, _From, State) ->
@@ -408,6 +419,41 @@ bucket_stats(Penciller, LedgerCache, Bucket, Tag) ->
                end,
    {async, Folder}.

index_query(Penciller, LedgerCache,
                Bucket,
                {IdxField, StartValue, EndValue},
                {ReturnTerms, TermRegex}) ->
    PCLopts = #penciller_options{start_snapshot=true,
                                    source_penciller=Penciller},
    {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts),
    Folder = fun() ->
                Increment = gb_trees:to_list(LedgerCache),
                io:format("Length of increment in snapshot is ~w~n",
                            [length(Increment)]),
                ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot,
                                                        {infinity, Increment}),
                StartKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
                                                        IdxField, StartValue),
                EndKey = leveled_codec:to_ledgerkey(Bucket, null, ?IDX_TAG,
                                                        IdxField, EndValue),
                AddFun = case ReturnTerms of
                            true ->
                                fun add_terms/3;
                            _ ->
                                fun add_keys/3
                        end,
                AccFun = accumulate_index(TermRegex, AddFun),
                Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot,
                                                        StartKey,
                                                        EndKey,
                                                        AccFun,
                                                        []),
                ok = leveled_penciller:pcl_close(LedgerSnapshot),
                Acc
                end,
    {async, Folder}.

shutdown_wait([], _Inker) ->
    false;
shutdown_wait([TopPause|Rest], Inker) ->
@@ -476,6 +522,47 @@ accumulate_size(Key, Value, {Size, Count}) ->
            {Size, Count}
    end.

add_keys(ObjKey, _IdxValue, Acc) ->
    Acc ++ [ObjKey].

add_terms(ObjKey, IdxValue, Acc) ->
    Acc ++ [{IdxValue, ObjKey}].

accumulate_index(TermRe, AddFun) ->
    case TermRe of
        undefined ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value) of
                    true ->
                        {_Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        AddFun(ObjKey, IdxValue, Acc);
                    false ->
                        Acc
                end end;
        TermRe ->
            fun(Key, Value, Acc) ->
                case leveled_codec:is_active(Key, Value) of
                    true ->
                        {_Bucket,
                            ObjKey,
                            IdxValue} = leveled_codec:from_ledgerkey(Key),
                        case re:run(IdxValue, TermRe) of
                            nomatch ->
                                Acc;
                            _ ->
                                AddFun(ObjKey, IdxValue, Acc)
                        end;
                    false ->
                        Acc
                end end
    end.

preparefor_ledgercache(PK, SQN, Obj, Size, IndexSpecs) ->
    {Bucket, Key, PrimaryChange} = leveled_codec:generate_ledgerkv(PK,
                                                                    SQN,