Skip to content

Commit 1cada5c

Browse files
committed
add scanner to upgrade nouveau indexes
1 parent c589dbb commit 1cada5c

5 files changed

Lines changed: 318 additions & 3 deletions

File tree

rel/overlay/etc/default.ini

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,7 @@ url = {{nouveau_url}}
10931093
;couch_scanner_plugin_find = false
10941094
;couch_scanner_plugin_conflict_finder = false
10951095
;couch_quickjs_scanner_plugin = false
1096+
;nouveau_index_upgrader = false
10961097

10971098
; The following [$plugin*] settings apply to all plugins
10981099

@@ -1211,6 +1212,11 @@ url = {{nouveau_url}}
12111212
; Defaults to undefined, which disables auto purging.
12121213
;deleted_document_ttl =
12131214

1215+
[nouveau_index_upgrader]
1216+
; Common scanner scheduling settings
1217+
;after = restart
1218+
;repeat = restart
1219+
12141220
[chttpd_auth_lockout]
12151221
; CouchDB can temporarily lock out IP addresses that repeatedly fail authentication
12161222
; mode can be set to one of three recognised values;

src/nouveau/src/nouveau_api.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
search/2,
3030
set_purge_seq/3,
3131
set_update_seq/3,
32+
supported_lucene_versions/0,
3233
jaxrs_error/2
3334
]).
3435

@@ -214,6 +215,18 @@ set_seq(#index{} = Index, ReqBody) ->
214215
send_error(Reason)
215216
end.
216217

218+
supported_lucene_versions() ->
219+
Resp = send_if_enabled(<<"/">>, [], <<"GET">>),
220+
case Resp of
221+
{ok, 200, _, RespBody} ->
222+
Json = jiffy:decode(RespBody, [return_maps]),
223+
{ok, maps:get(<<"supported_lucene_versions">>, Json, [])};
224+
{ok, StatusCode, _, RespBody} ->
225+
{error, jaxrs_error(StatusCode, RespBody)};
226+
{error, Reason} ->
227+
send_error(Reason)
228+
end.
229+
217230
%% private functions
218231

219232
index_path(Path) when is_binary(Path) ->

src/nouveau/src/nouveau_fabric_search.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
-module(nouveau_fabric_search).
1717

18-
-export([go/4]).
18+
-export([go/3, go/4]).
1919

2020
-include_lib("mem3/include/mem3.hrl").
2121
-include_lib("couch/include/couch_db.hrl").
@@ -38,12 +38,12 @@ go(DbName, GroupId, IndexName, QueryArgs0) when is_binary(GroupId) ->
3838
go(DbName, #doc{} = DDoc, IndexName, QueryArgs0) ->
3939
case nouveau_util:design_doc_to_index(DbName, DDoc, IndexName) of
4040
{ok, Index} ->
41-
go(DbName, DDoc, IndexName, QueryArgs0, Index);
41+
go(DbName, QueryArgs0, Index);
4242
{error, Reason} ->
4343
{error, Reason}
4444
end.
4545

46-
go(DbName, #doc{} = _DDoc, _IndexName, QueryArgs0, Index) ->
46+
go(DbName, QueryArgs0, Index) ->
4747
Shards = get_shards(DbName, QueryArgs0),
4848
{PackedBookmark, #{limit := Limit, sort := Sort} = QueryArgs1} =
4949
maps:take(bookmark, QueryArgs0),
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(nouveau_index_upgrader).
14+
-behaviour(couch_scanner_plugin).
15+
16+
-export([
17+
start/2,
18+
resume/2,
19+
complete/1,
20+
checkpoint/1,
21+
db/2,
22+
ddoc/3
23+
]).
24+
25+
-include("nouveau.hrl").
26+
-include_lib("couch_scanner/include/couch_scanner_plugin.hrl").
27+
28+
start(ScanId, #{}) ->
29+
St = init_config(ScanId),
30+
case should_run(St) of
31+
true ->
32+
?INFO("Starting.", [], St),
33+
{ok, St};
34+
false ->
35+
?INFO("Not starting.", [], St),
36+
skip
37+
end.
38+
39+
resume(ScanId, #{}) ->
40+
St = init_config(ScanId),
41+
case should_run(St) of
42+
true ->
43+
?INFO("Resuming.", [], St),
44+
{ok, St};
45+
false ->
46+
?INFO("Not resuming.", [], St),
47+
skip
48+
end.
49+
50+
complete(St) ->
51+
?INFO("Completed", [], St),
52+
{ok, #{}}.
53+
54+
checkpoint(_St) ->
55+
{ok, #{}}.
56+
57+
db(St, _DbName) ->
58+
{ok, St}.
59+
60+
ddoc(St, _DbName, #doc{id = <<"_design/_", _/binary>>}) ->
61+
{ok, St};
62+
ddoc(St, DbName, #doc{} = DDoc0) ->
63+
case update_ddoc_versions(DDoc0) of
64+
DDoc0 ->
65+
ok;
66+
DDoc1 ->
67+
Indexes = nouveau_util:design_doc_to_indexes(DbName, DDoc1),
68+
case upgrade_indexes(DbName, Indexes) of
69+
true ->
70+
save_ddoc(DbName, DDoc1);
71+
false ->
72+
ok
73+
end
74+
end,
75+
{ok, St}.
76+
77+
upgrade_indexes(_DbName, []) ->
78+
true;
79+
upgrade_indexes(DbName, [Index | Rest]) ->
80+
case upgrade_index(DbName, Index) of
81+
true ->
82+
upgrade_indexes(DbName, Rest);
83+
false ->
84+
false
85+
end.
86+
87+
upgrade_index(DbName, #index{} = Index) ->
88+
?INFO("Upgrading ~s/~s/~s to version ~B", [
89+
DbName,
90+
Index#index.ddoc_id,
91+
Index#index.name,
92+
?TARGET_LUCENE_VERSION
93+
]),
94+
case
95+
nouveau_fabric_search:go(
96+
DbName,
97+
#{query => <<"*:*">>, bookmark => null, sort => null, limit => 1},
98+
Index#index{lucene_version = ?TARGET_LUCENE_VERSION}
99+
)
100+
of
101+
{ok, _SearchResults} ->
102+
true;
103+
{error, _Reason} ->
104+
false
105+
end.
106+
107+
update_ddoc_versions(#doc{} = Doc) ->
108+
#doc{body = {Fields0}} = Doc,
109+
{Indexes0} = couch_util:get_value(<<"nouveau">>, Fields0),
110+
Indexes1 = lists:map(fun update_version/1, Indexes0),
111+
Fields1 = couch_util:set_value(<<"nouveau">>, Fields0, {Indexes1}),
112+
Doc#doc{body = {Fields1}}.
113+
114+
save_ddoc(DbName, #doc{} = DDoc) ->
115+
{Pid, Ref} = spawn_monitor(fun() ->
116+
case fabric:update_doc(DbName, DDoc, [?ADMIN_CTX]) of
117+
{ok, _} ->
118+
exit(ok);
119+
Else ->
120+
exit(Else)
121+
end
122+
end),
123+
receive
124+
{'DOWN', Ref, process, Pid, ok} ->
125+
?INFO(
126+
"Updated ~s/~s indexes to version ~B", [DbName, DDoc#doc.id, ?TARGET_LUCENE_VERSION]
127+
);
128+
{'DOWN', Ref, process, Pid, Else} ->
129+
?INFO("Failed to update ~s/~s for reason ~p", [DbName, DDoc#doc.id, Else])
130+
end.
131+
132+
update_version({IndexName, {Index}}) ->
133+
{IndexName, {couch_util:set_value(<<"lucene_version">>, Index, ?TARGET_LUCENE_VERSION)}}.
134+
135+
init_config(ScanId) ->
136+
#{sid => ScanId}.
137+
138+
should_run(St) ->
139+
couch_scanner_util:on_first_node() andalso upgrade_supported(St).
140+
141+
upgrade_supported(St) ->
142+
case nouveau_api:supported_lucene_versions() of
143+
{ok, Versions} ->
144+
case lists:member(?TARGET_LUCENE_VERSION, Versions) of
145+
true ->
146+
?INFO(
147+
"Nouveau server supports upgrades to Lucene ~B",
148+
[?TARGET_LUCENE_VERSION],
149+
St
150+
),
151+
true;
152+
false ->
153+
?WARN(
154+
"Nouveau server does not support upgrades to Lucene ~B",
155+
[?TARGET_LUCENE_VERSION],
156+
St
157+
),
158+
false
159+
end;
160+
{error, Reason} ->
161+
?ERR(
162+
"Nouveau server upgrade check failed for reason ~p", [Reason], St
163+
),
164+
false
165+
end.
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2+
% use this file except in compliance with the License. You may obtain a copy of
3+
% the License at
4+
%
5+
% http://www.apache.org/licenses/LICENSE-2.0
6+
%
7+
% Unless required by applicable law or agreed to in writing, software
8+
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9+
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10+
% License for the specific language governing permissions and limitations under
11+
% the License.
12+
13+
-module(nouveau_index_upgrader_tests).
14+
15+
-include_lib("couch/include/couch_eunit.hrl").
16+
-include_lib("couch/include/couch_db.hrl").
17+
-include_lib("nouveau/include/nouveau.hrl").
18+
19+
-define(PLUGIN, nouveau_index_upgrader).
20+
21+
nouveau_index_upgrader_test_() ->
22+
{
23+
foreach,
24+
fun setup/0,
25+
fun teardown/1,
26+
[
27+
?TDEF_FE(t_upgrade_legacy_index, 10),
28+
?TDEF_FE(t_dont_upgrade_latest_index, 10)
29+
]
30+
}.
31+
32+
setup() ->
33+
{module, _} = code:ensure_loaded(?PLUGIN),
34+
meck:new(?PLUGIN, [passthrough]),
35+
meck:new(couch_scanner_server, [passthrough]),
36+
meck:new(couch_scanner_util, [passthrough]),
37+
meck:new(nouveau_api, [passthrough]),
38+
meck:expect(nouveau_api, supported_lucene_versions, fun() ->
39+
{ok, [?LEGACY_LUCENE_VERSION, ?TARGET_LUCENE_VERSION]}
40+
end),
41+
Ctx = test_util:start_couch([fabric, couch_scanner]),
42+
DbName = ?tempdb(),
43+
ok = fabric:create_db(DbName, [{q, "2"}, {n, "1"}]),
44+
config:set(atom_to_list(?PLUGIN), "max_batch_items", "1", false),
45+
reset_stats(),
46+
{Ctx, DbName}.
47+
48+
teardown({Ctx, DbName}) ->
49+
config_delete_section("couch_scanner"),
50+
config_delete_section("couch_scanner_plugins"),
51+
config_delete_section(atom_to_list(?PLUGIN)),
52+
couch_scanner:reset_checkpoints(),
53+
couch_scanner:resume(),
54+
fabric:delete_db(DbName),
55+
test_util:stop_couch(Ctx),
56+
meck:unload().
57+
58+
t_upgrade_legacy_index({_, DbName}) ->
59+
DDocId = <<"_design/foo">>,
60+
IndexName = <<"bar">>,
61+
ok = add_ddoc(DbName, DDocId, IndexName, ?LEGACY_LUCENE_VERSION),
62+
meck:reset(couch_scanner_server),
63+
meck:reset(?PLUGIN),
64+
meck:new(nouveau_fabric_search, [passthrough]),
65+
meck:expect(nouveau_fabric_search, go, fun(_DbName, _Args, _Index) -> {ok, []} end),
66+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
67+
wait_exit(10000),
68+
?assertEqual(1, num_calls(start, 2)),
69+
?assertEqual(1, num_calls(complete, 1)),
70+
?assertEqual(?TARGET_LUCENE_VERSION, get_lucene_version(DbName, DDocId, IndexName)),
71+
ok.
72+
73+
t_dont_upgrade_latest_index({_, DbName}) ->
74+
DDocId = <<"_design/foo">>,
75+
IndexName = <<"bar">>,
76+
ok = add_ddoc(DbName, DDocId, IndexName, ?TARGET_LUCENE_VERSION),
77+
meck:reset(couch_scanner_server),
78+
meck:reset(?PLUGIN),
79+
config:set("couch_scanner_plugins", atom_to_list(?PLUGIN), "true", false),
80+
wait_exit(10000),
81+
?assertEqual(1, num_calls(start, 2)),
82+
?assertEqual(1, num_calls(complete, 1)),
83+
?assertEqual(?TARGET_LUCENE_VERSION, get_lucene_version(DbName, DDocId, IndexName)),
84+
ok.
85+
86+
reset_stats() ->
87+
Counters = [
88+
[couchdb, query_server, process_error_exits],
89+
[couchdb, query_server, process_errors],
90+
[couchdb, query_server, process_exits]
91+
],
92+
[reset_counter(C) || C <- Counters].
93+
94+
reset_counter(Counter) ->
95+
case couch_stats:sample(Counter) of
96+
0 ->
97+
ok;
98+
N when is_integer(N), N > 0 ->
99+
couch_stats:decrement_counter(Counter, N)
100+
end.
101+
102+
config_delete_section(Section) ->
103+
[config:delete(K, V, false) || {K, V} <- config:get(Section)].
104+
105+
add_ddoc(DbName, DDocId, IndexName, LuceneVersion) ->
106+
{ok, _} = fabric:update_doc(DbName, mkddoc(DDocId, IndexName, LuceneVersion), [?ADMIN_CTX]),
107+
ok.
108+
109+
get_lucene_version(DbName, DDocId, IndexName) ->
110+
{ok, #doc{body = {Props}}} = fabric:open_doc(DbName, DDocId, [?ADMIN_CTX]),
111+
{Indexes} = couch_util:get_value(<<"nouveau">>, Props),
112+
{Index} = couch_util:get_value(IndexName, Indexes),
113+
couch_util:get_value(<<"lucene_version">>, Index).
114+
115+
mkddoc(DocId, IndexName, LuceneVersion) ->
116+
Body = #{
117+
<<"_id">> => DocId,
118+
<<"nouveau">> => #{
119+
IndexName => #{
120+
<<"lucene_version">> => LuceneVersion,
121+
<<"index">> => <<"function(doc){}">>
122+
}
123+
}
124+
},
125+
jiffy:decode(jiffy:encode(Body)).
126+
127+
num_calls(Fun, Args) ->
128+
meck:num_calls(?PLUGIN, Fun, Args).
129+
130+
wait_exit(MSec) ->
131+
meck:wait(couch_scanner_server, handle_info, [{'EXIT', '_', '_'}, '_'], MSec).

0 commit comments

Comments
 (0)