diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 46ce7e2..ebd6dc2 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -361,13 +361,15 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================ bucket_stats(Penciller, LedgerCache, Bucket) -> + PCLopts = #penciller_options{start_snapshot=true, + source_penciller=Penciller}, + {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Folder = fun() -> - PCLopts = #penciller_options{start_snapshot=true, - source_penciller=Penciller}, - {ok, LedgerSnapshot} = leveled_penciller:pcl_start(PCLopts), Increment = gb_trees:to_list(LedgerCache), + io:format("Length of increment in snapshot is ~w~n", + [length(Increment)]), ok = leveled_penciller:pcl_loadsnapshot(LedgerSnapshot, - Increment), + {infinity, Increment}), StartKey = {o, Bucket, null, null}, EndKey = {o, Bucket, null, null}, Acc = leveled_penciller:pcl_fetchkeys(LedgerSnapshot, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 96926c7..33e5e9b 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -151,7 +151,6 @@ check_single_file(CDB, FilterFun, FilterServer, MaxSQN, SampleSize, BatchSize) - FN = leveled_cdb:cdb_filename(CDB), PositionList = leveled_cdb:cdb_getpositions(CDB, SampleSize), KeySizeList = fetch_inbatches(PositionList, BatchSize, CDB, []), - io:format("KeySizeList ~w~n", [KeySizeList]), R0 = lists:foldl(fun(KS, {ActSize, RplSize}) -> {{SQN, PK}, Size} = KS, Check = FilterFun(FilterServer, PK, SQN), diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index bcf1023..946a118 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -594,7 +594,7 @@ load_from_sequence(MinSQN, FilterFun, Penciller, [{_LowSQN, FN, Pid}|Rest]) -> undefined, FN, Rest); - [{NextSQN, _FN, Pid}|_Rest] when NextSQN > MinSQN -> + [{NextSQN, _NxtFN, _NxtPid}|_Rest] when NextSQN > MinSQN -> load_between_sequence(MinSQN, MinSQN + ?LOADING_BATCH, FilterFun, diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index cff4a45..413205b 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -73,7 +73,7 @@ -record(state, {owner :: pid(), change_pending=false :: boolean(), - work_item :: #penciller_work{}}). + work_item :: #penciller_work{}|null}). %%%============================================================================ %%% API @@ -208,28 +208,23 @@ merge(WI) -> {WI#penciller_work.ledger_filepath, WI#penciller_work.next_sqn}) end, - case MergedFiles of - error -> - merge_failure; - _ -> - NewLevel = lists:sort(lists:append(MergedFiles, Others)), - UpdMFest2 = lists:keystore(SrcLevel + 1, - 1, - UpdMFest1, - {SrcLevel + 1, NewLevel}), - - ok = filelib:ensure_dir(WI#penciller_work.manifest_file), - {ok, Handle} = file:open(WI#penciller_work.manifest_file, - [binary, raw, write]), - ok = file:write(Handle, term_to_binary(UpdMFest2)), - ok = file:close(Handle), - case lists:member(SrcF, MergedFiles) of - true -> - {UpdMFest2, Candidates}; - false -> - %% Can rub out src file as it is not part of output - {UpdMFest2, Candidates ++ [SrcF]} - end + NewLevel = lists:sort(lists:append(MergedFiles, Others)), + UpdMFest2 = lists:keystore(SrcLevel + 1, + 1, + UpdMFest1, + {SrcLevel + 1, NewLevel}), + + ok = filelib:ensure_dir(WI#penciller_work.manifest_file), + {ok, Handle} = file:open(WI#penciller_work.manifest_file, + [binary, raw, write]), + ok = file:write(Handle, term_to_binary(UpdMFest2)), + ok = file:close(Handle), + case lists:member(SrcF, MergedFiles) of + true -> + {UpdMFest2, Candidates}; + false -> + %% Can rub out src file as it is not part of output + {UpdMFest2, Candidates ++ [SrcF]} end. @@ -317,6 +312,8 @@ do_merge(KL1, KL2, Level, {Filepath, MSN}, FileCounter, OutList) -> [Level + 1, FileCounter])), io:format("File to be created as part of MSN=~w Filename=~s~n", [MSN, FileName]), + % Attempt to trace intermittent eaccess failures + false = filelib:is_file(FileName), TS1 = os:timestamp(), {ok, Pid, Reply} = leveled_sft:sft_new(FileName, KL1, KL2, Level + 1), {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index 05ea41f..b785802 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -335,7 +335,7 @@ pcl_loadsnapshot(Pid, Increment) -> gen_server:call(Pid, {load_snapshot, Increment}, infinity). pcl_close(Pid) -> - gen_server:call(Pid, close). + gen_server:call(Pid, close, 60000). %%%============================================================================ @@ -493,6 +493,8 @@ handle_call({fetch_keys, StartKey, EndKey, AccFun, InitAcc}, SFTiter = initiate_rangequery_frommanifest(StartKey, EndKey, State#state.manifest), + io:format("Manifest for iterator of:~n"), + print_manifest(SFTiter), Acc = keyfolder(L0iter, SFTiter, StartKey, EndKey, {AccFun, InitAcc}), {reply, Acc, State}; handle_call(work_for_clerk, From, State) -> @@ -523,7 +525,8 @@ handle_call({load_snapshot, Increment}, _From, State) -> ok end, {Tree1, TreeSQN1} = roll_new_tree(Tree0, [Increment], TreeSQN0), - io:format("Snapshot loaded to start at SQN~w~n", [TreeSQN1]), + io:format("Snapshot loaded with increments to start at SQN=~w~n", + [TreeSQN1]), {reply, ok, State#state{levelzero_snapshot=Tree1, ledger_sqn=TreeSQN1, snapshot_fully_loaded=true}}; @@ -852,6 +855,7 @@ compare_to_sqn(Obj, SQN) -> %% Manifest lock - don't have two changes to the manifest happening %% concurrently +% TODO: Is this necessary now? manifest_locked(State) -> if @@ -930,11 +934,15 @@ return_work(State, From) -> roll_new_tree(Tree, [], HighSQN) -> {Tree, HighSQN}; roll_new_tree(Tree, [{SQN, KVList}|TailIncs], HighSQN) when SQN >= HighSQN -> - UpdTree = lists:foldl(fun({K, V}, TreeAcc) -> - gb_trees:enter(K, V, TreeAcc) end, - Tree, - KVList), - roll_new_tree(UpdTree, TailIncs, SQN); + R = lists:foldl(fun({Kx, Vx}, {TreeAcc, MaxSQN}) -> + UpdTree = gb_trees:enter(Kx, Vx, TreeAcc), + SQNx = leveled_bookie:strip_to_seqonly({Kx, Vx}), + {UpdTree, max(SQNx, MaxSQN)} + end, + {Tree, HighSQN}, + KVList), + {UpdTree, UpdSQN} = R, + roll_new_tree(UpdTree, TailIncs, UpdSQN); roll_new_tree(Tree, [_H|TailIncs], HighSQN) -> roll_new_tree(Tree, TailIncs, HighSQN). @@ -949,14 +957,14 @@ cache_tree_in_memcopy(MemCopy, Tree, SQN) -> SQN >= PushSQN -> Acc; true -> - Acc ++ {PushSQN, PushL} + Acc ++ [{PushSQN, PushL}] end end, [], MemCopy#l0snapshot.increments), #l0snapshot{ledger_sqn = SQN, increments = Incs, tree = Tree}; - _ -> + _CurrentSQN -> MemCopy end. @@ -1004,10 +1012,17 @@ print_manifest(Manifest) -> io:format("Manifest at Level ~w~n", [L]), Level = get_item(L, Manifest, []), lists:foreach(fun(M) -> - print_manifest_entry(M) end, + R = is_record(M, manifest_entry), + case R of + true -> + print_manifest_entry(M); + false -> + {_, M1} = M, + print_manifest_entry(M1) + end end, Level) end, - lists:seq(1, ?MAX_LEVELS - 1)). + lists:seq(0, ?MAX_LEVELS - 1)). print_manifest_entry(Entry) -> {S1, S2, S3} = leveled_bookie:print_key(Entry#manifest_entry.start_key), @@ -1045,7 +1060,7 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> end end, [], - lists:seq(1, ?MAX_LEVELS - 1)). + lists:seq(0, ?MAX_LEVELS - 1)). %% Looks to find the best choice for the next key across the levels (other %% than in-memory table) @@ -1054,7 +1069,7 @@ initiate_rangequery_frommanifest(StartKey, EndKey, Manifest) -> find_nextkey(QueryArray, StartKey, EndKey) -> find_nextkey(QueryArray, - 1, + 0, {null, null}, {fun leveled_sft:sft_getkvrange/4, StartKey, EndKey, 1}). @@ -1178,40 +1193,47 @@ keyfolder(IMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc}) -> % iterate only over the remaining keys in the SFT iterator keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc}); {IMMKey, IMMVal, NxtIMMiterator} -> - case {leveled_bookie:key_compare(EndKey, IMMKey, lt), - find_nextkey(SFTiterator, StartKey, EndKey)} of - {true, _} -> + case leveled_bookie:key_compare(EndKey, IMMKey, lt) of + true -> % There are no more keys in-range in the in-memory % iterator, so take action as if this iterator is empty % (see above) keyfolder(null, SFTiterator, StartKey, EndKey, {AccFun, Acc}); - {false, no_more_keys} -> - % No more keys in range in the persisted store, so use the - % in-memory KV as the next - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder(NxtIMMiterator, SFTiterator, - StartKey, EndKey, {AccFun, Acc1}); - {false, {NxtSFTiterator, {SFTKey, SFTVal}}} -> - % There is a next key, so need to know which is the next - % key between the two (and handle two keys with different - % sequence numbers). - case leveled_bookie:key_dominates({IMMKey, IMMVal}, - {SFTKey, SFTVal}) of - left_hand_first -> + false -> + case find_nextkey(SFTiterator, StartKey, EndKey) of + no_more_keys -> + % No more keys in range in the persisted store, so use the + % in-memory KV as the next Acc1 = AccFun(IMMKey, IMMVal, Acc), keyfolder(NxtIMMiterator, SFTiterator, StartKey, EndKey, {AccFun, Acc1}); - right_hand_first -> - Acc1 = AccFun(SFTKey, SFTVal, Acc), - keyfolder(IMMiterator, NxtSFTiterator, - StartKey, EndKey, {AccFun, Acc1}); - left_hand_dominant -> - Acc1 = AccFun(IMMKey, IMMVal, Acc), - keyfolder(NxtIMMiterator, NxtSFTiterator, - StartKey, EndKey, {AccFun, Acc1}) + {NxtSFTiterator, {SFTKey, SFTVal}} -> + % There is a next key, so need to know which is the + % next key between the two (and handle two keys + % with different sequence numbers). + case leveled_bookie:key_dominates({IMMKey, + IMMVal}, + {SFTKey, + SFTVal}) of + left_hand_first -> + Acc1 = AccFun(IMMKey, IMMVal, Acc), + keyfolder(NxtIMMiterator, SFTiterator, + StartKey, EndKey, + {AccFun, Acc1}); + right_hand_first -> + Acc1 = AccFun(SFTKey, SFTVal, Acc), + keyfolder(IMMiterator, NxtSFTiterator, + StartKey, EndKey, + {AccFun, Acc1}); + left_hand_dominant -> + Acc1 = AccFun(IMMKey, IMMVal, Acc), + keyfolder(NxtIMMiterator, NxtSFTiterator, + StartKey, EndKey, + {AccFun, Acc1}) + end end - end + end end. @@ -1373,10 +1395,12 @@ clean_subdir(DirPath) -> case filelib:is_dir(DirPath) of true -> {ok, Files} = file:list_dir(DirPath), - lists:foreach(fun(FN) -> file:delete(filename:join(DirPath, FN)), - io:format("Delete file ~s/~s~n", - [DirPath, FN]) - end, + lists:foreach(fun(FN) -> + File = filename:join(DirPath, FN), + case file:delete(File) of + ok -> io:format("Success deleting ~s~n", [File]); + _ -> io:format("Error deleting ~s~n", [File]) + end end, Files); false -> ok @@ -1556,35 +1580,19 @@ simple_server_test() -> ok = pcl_close(PCLr), clean_testdir(RootPath). -memcopy_test() -> +memcopy_updatecache1_test() -> KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "A"} end, + {X, null, "Val" ++ integer_to_list(X) ++ "A"}} + end, lists:seq(1, 1000)), KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "B"} end, + {X, null, "Val" ++ integer_to_list(X) ++ "B"}} + end, lists:seq(1001, 2000)), KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "C"} end, - lists:seq(1, 1000)), - MemCopy0 = #l0snapshot{}, - MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), - MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), - MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), - {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), - Size1 = gb_trees:size(Tree1), - ?assertMatch(2000, Size1), - ?assertMatch(3000, HighSQN1). - -memcopy_updatecache_test() -> - KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "A"} end, - lists:seq(1, 1000)), - KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "B"} end, - lists:seq(1001, 2000)), - KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), - "Value" ++ integer_to_list(X) ++ "C"} end, - lists:seq(1, 1000)), + {X, null, "Val" ++ integer_to_list(X) ++ "C"}} + end, + lists:seq(2001, 3000)), MemCopy0 = #l0snapshot{}, MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), @@ -1594,9 +1602,34 @@ memcopy_updatecache_test() -> MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1), ?assertMatch(0, length(MemCopy4#l0snapshot.increments)), Size2 = gb_trees:size(MemCopy4#l0snapshot.tree), - ?assertMatch(2000, Size2), + ?assertMatch(3000, Size2), ?assertMatch(3000, MemCopy4#l0snapshot.ledger_sqn). +memcopy_updatecache2_test() -> + KVL1 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + {X, null, "Val" ++ integer_to_list(X) ++ "A"}} + end, + lists:seq(1, 1000)), + KVL2 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + {X, null, "Val" ++ integer_to_list(X) ++ "B"}} + end, + lists:seq(1001, 2000)), + KVL3 = lists:map(fun(X) -> {"Key" ++ integer_to_list(X), + {X, null, "Val" ++ integer_to_list(X) ++ "C"}} + end, + lists:seq(1, 1000)), + MemCopy0 = #l0snapshot{}, + MemCopy1 = add_increment_to_memcopy(MemCopy0, 1000, KVL1), + MemCopy2 = add_increment_to_memcopy(MemCopy1, 2000, KVL2), + MemCopy3 = add_increment_to_memcopy(MemCopy2, 3000, KVL3), + ?assertMatch(0, MemCopy3#l0snapshot.ledger_sqn), + {Tree1, HighSQN1} = roll_new_tree(gb_trees:empty(), MemCopy3#l0snapshot.increments, 0), + MemCopy4 = cache_tree_in_memcopy(MemCopy3, Tree1, HighSQN1), + ?assertMatch(1, length(MemCopy4#l0snapshot.increments)), + Size2 = gb_trees:size(MemCopy4#l0snapshot.tree), + ?assertMatch(2000, Size2), + ?assertMatch(2000, MemCopy4#l0snapshot.ledger_sqn). + rangequery_manifest_test() -> {E1, E2, diff --git a/src/leveled_sft.erl b/src/leveled_sft.erl index 4a54eb6..dda8607 100644 --- a/src/leveled_sft.erl +++ b/src/leveled_sft.erl @@ -278,8 +278,6 @@ handle_call({sft_new, Filename, KL1, KL2, Level}, _From, State) -> {error, Reason} -> {reply, {error, Reason}, State}; {Handle, FileMD} -> - io:format("Creating file with inputs of size ~w ~w~n", - [length(KL1), length(KL2)]), {ReadHandle, UpdFileMD, KeyRemainders} = complete_file(Handle, FileMD, KL1, KL2, diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 7029e0d..2f30975 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -5,12 +5,14 @@ -export([simple_put_fetch_head/1, many_put_fetch_head/1, journal_compaction/1, - fetchput_snapshot/1]). + fetchput_snapshot/1, + load_and_count/1]). all() -> [simple_put_fetch_head, many_put_fetch_head, journal_compaction, - fetchput_snapshot]. + fetchput_snapshot, + load_and_count]. simple_put_fetch_head(_Config) -> @@ -216,6 +218,73 @@ fetchput_snapshot(_Config) -> reset_filestructure(). +load_and_count(_Config) -> + % Use artificially small files, and the load keys, counting they're all + % present + RootPath = reset_filestructure(), + StartOpts1 = #bookie_options{root_path=RootPath, max_journalsize=50000000}, + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + {TestObject, TestSpec} = generate_testobject(), + ok = leveled_bookie:book_riakput(Bookie1, TestObject, TestSpec), + check_bookie_forobject(Bookie1, TestObject), + io:format("Loading initial small objects~n"), + lists:foldl(fun(_X, Acc) -> + load_objects(5000, [Acc + 2], Bookie1, TestObject, + fun generate_multiple_smallobjects/2), + {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"), + if + Acc + 5000 == Count -> + ok + end, + Acc + 5000 end, + 0, + lists:seq(1, 20)), + check_bookie_forobject(Bookie1, TestObject), + io:format("Loading larger compressible objects~n"), + lists:foldl(fun(_X, Acc) -> + load_objects(5000, [Acc + 2], Bookie1, TestObject, + fun generate_multiple_compressibleobjects/2), + {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"), + if + Acc + 5000 == Count -> + ok + end, + Acc + 5000 end, + 100000, + lists:seq(1, 20)), + check_bookie_forobject(Bookie1, TestObject), + io:format("Replacing small objects~n"), + lists:foldl(fun(_X, Acc) -> + load_objects(5000, [Acc + 2], Bookie1, TestObject, + fun generate_multiple_smallobjects/2), + {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"), + if + Count == 200000 -> + ok + end, + Acc + 5000 end, + 0, + lists:seq(1, 20)), + check_bookie_forobject(Bookie1, TestObject), + io:format("Loading more small objects~n"), + lists:foldl(fun(_X, Acc) -> + load_objects(5000, [Acc + 2], Bookie1, TestObject, + fun generate_multiple_compressibleobjects/2), + {_Size, Count} = check_bucket_stats(Bookie1, "Bucket"), + if + Acc + 5000 == Count -> + ok + end, + Acc + 5000 end, + 200000, + lists:seq(1, 20)), + check_bookie_forobject(Bookie1, TestObject), + ok = leveled_bookie:book_close(Bookie1), + {ok, Bookie2} = leveled_bookie:book_start(StartOpts1), + {_BSize, 300000} = check_bucket_stats(Bookie2, "Bucket"), + ok = leveled_bookie:book_close(Bookie2), + reset_filestructure(). + reset_filestructure() -> RootPath = "test", @@ -236,7 +305,7 @@ check_bucket_stats(Bookie, Bucket) -> {B1Size, B1Count} = Folder1(), io:format("Bucket fold completed in ~w microseconds~n", [timer:now_diff(os:timestamp(), FoldSW1)]), - io:format("Bucket ~w has size ~w and count ~w~n", + io:format("Bucket ~s has size ~w and count ~w~n", [Bucket, B1Size, B1Count]), {B1Size, B1Count}. @@ -306,6 +375,26 @@ generate_testobject(B, K, V, Spec, MD) -> {#r_object{bucket=B, key=K, contents=[Content], vclock=[{'a',1}]}, Spec}. + +generate_multiple_compressibleobjects(Count, KeyNumber) -> + S1 = "111111111111111", + S2 = "222222222222222", + S3 = "333333333333333", + S4 = "aaaaaaaaaaaaaaa", + S5 = "AAAAAAAAAAAAAAA", + S6 = "GGGGGGGGGGGGGGG", + S7 = "===============", + S8 = "...............", + Selector = [{1, S1}, {2, S2}, {3, S3}, {4, S4}, + {5, S5}, {6, S6}, {7, S7}, {8, S8}], + L = lists:seq(1, 1024), + V = lists:foldl(fun(_X, Acc) -> + {_, Str} = lists:keyfind(random:uniform(8), 1, Selector), + Acc ++ Str end, + "", + L), + generate_multiple_objects(Count, KeyNumber, [], V). + generate_multiple_smallobjects(Count, KeyNumber) -> generate_multiple_objects(Count, KeyNumber, [], crypto:rand_bytes(512)).