diff --git a/api/version/version.go b/api/version/version.go index d37d92f850d9..d47275b2a25e 100644 --- a/api/version/version.go +++ b/api/version/version.go @@ -26,7 +26,7 @@ import ( var ( // MinClusterVersion is the min cluster version this etcd binary is compatible with. MinClusterVersion = "3.0.0" - Version = "3.5.26" + Version = "3.5.31" APIVersion = "unknown" // Git SHA Value will be set during build diff --git a/bill-of-materials.json b/bill-of-materials.json index 16bc625ed19b..04b1969d09bf 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -562,7 +562,7 @@ "licenses": [ { "type": "Apache License 2.0", - "confidence": 1 + "confidence": 0.9647812166488794 } ] }, @@ -589,7 +589,7 @@ "licenses": [ { "type": "Apache License 2.0", - "confidence": 1 + "confidence": 0.9647812166488794 } ] }, @@ -598,7 +598,7 @@ "licenses": [ { "type": "Apache License 2.0", - "confidence": 1 + "confidence": 0.9647812166488794 } ] }, @@ -607,7 +607,7 @@ "licenses": [ { "type": "Apache License 2.0", - "confidence": 1 + "confidence": 0.9647812166488794 } ] }, diff --git a/client/pkg/fileutil/fileutil_test.go b/client/pkg/fileutil/fileutil_test.go index 1804163823ff..fc1deb62f18b 100644 --- a/client/pkg/fileutil/fileutil_test.go +++ b/client/pkg/fileutil/fileutil_test.go @@ -20,7 +20,6 @@ import ( "io/ioutil" "math/rand" "os" - "os/user" "path/filepath" "runtime" "strings" @@ -42,13 +41,7 @@ func TestIsDirWriteable(t *testing.T) { if err = os.Chmod(tmpdir, 0444); err != nil { t.Fatalf("unexpected os.Chmod error: %v", err) } - me, err := user.Current() - if err != nil { - // err can be non-nil when cross compiled - // http://stackoverflow.com/questions/20609415/cross-compiling-user-current-not-implemented-on-linux-amd64 - t.Skipf("failed to get current user: %v", err) - } - if me.Name == "root" || runtime.GOOS == "windows" { + if os.Getuid() == 0 || runtime.GOOS == "windows" { // ideally we should check CAP_DAC_OVERRIDE. // but it does not matter for tests. // Chmod is not supported under windows. diff --git a/client/v3/naming/endpoints/endpoints.go b/client/v3/naming/endpoints/endpoints.go index 9e975259dd09..099387094193 100644 --- a/client/v3/naming/endpoints/endpoints.go +++ b/client/v3/naming/endpoints/endpoints.go @@ -15,11 +15,8 @@ type Endpoint struct { // Since etcd 3.1 Addr string - // Metadata is the information associated with Addr, which may be used - // to make load balancing decision. + // Metadata is the information associated with Addr. // Since etcd 3.1 - // - // Deprecated: The field is deprecated and will be removed in 3.7. Metadata any } diff --git a/client/v3/naming/resolver/resolver.go b/client/v3/naming/resolver/resolver.go index d2ce187903f0..f059fc49c8de 100644 --- a/client/v3/naming/resolver/resolver.go +++ b/client/v3/naming/resolver/resolver.go @@ -98,8 +98,7 @@ func convertToGRPCEndpoint(ups map[string]*endpoints.Update) []gresolver.Endpoin ep := gresolver.Endpoint{ Addresses: []gresolver.Address{ { - Addr: up.Endpoint.Addr, - Metadata: up.Endpoint.Metadata, + Addr: up.Endpoint.Addr, }, }, } diff --git a/client/v3/retry_interceptor.go b/client/v3/retry_interceptor.go index 8c50dcfa93d7..c63910d78afc 100644 --- a/client/v3/retry_interceptor.go +++ b/client/v3/retry_interceptor.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" "google.golang.org/grpc/status" ) @@ -41,6 +42,8 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) + var p peer.Peer + grpcOpts = append(grpcOpts, grpc.Peer(&p)) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. if callOpts.max == 0 { @@ -63,6 +66,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien c.GetLogger().Warn( "retrying of unary invoker failed", zap.String("target", cc.Target()), + zap.String("peer", p.String()), zap.Uint("attempt", attempt), zap.Error(lastErr), ) diff --git a/etcdctl/ctlv3/command/ep_command.go b/etcdctl/ctlv3/command/ep_command.go index 5de3a990d939..423a0c71943b 100644 --- a/etcdctl/ctlv3/command/ep_command.go +++ b/etcdctl/ctlv3/command/ep_command.go @@ -246,6 +246,7 @@ func endpointsFromCluster(cmd *cobra.Command) []string { } sec := secureCfgFromCmd(cmd) + au := authCfgFromCmd(cmd) dt := dialTimeoutFromCmd(cmd) ka := keepAliveTimeFromCmd(cmd) kat := keepAliveTimeoutFromCmd(cmd) @@ -255,7 +256,7 @@ func endpointsFromCluster(cmd *cobra.Command) []string { } // exclude auth for not asking needless password (MemberList() doesn't need authentication) - cfg, err := newClientCfg(eps, dt, ka, kat, sec, nil) + cfg, err := newClientCfg(eps, dt, ka, kat, sec, au) if err != nil { cobrautl.ExitWithError(cobrautl.ExitError, err) } diff --git a/etcdctl/ctlv3/command/util.go b/etcdctl/ctlv3/command/util.go index cd15fd339521..524b12ea1423 100644 --- a/etcdctl/ctlv3/command/util.go +++ b/etcdctl/ctlv3/command/util.go @@ -65,7 +65,7 @@ func argify(s string) []string { } if args[i][0] == '\'' { // 'single-quoted string' - args[i] = args[i][1 : len(args)-1] + args[i] = args[i][1 : len(args[i])-1] } else if args[i][0] == '"' { // "double quoted string" if _, err := fmt.Sscanf(args[i], "%q", &args[i]); err != nil { diff --git a/etcdctl/ctlv3/command/util_test.go b/etcdctl/ctlv3/command/util_test.go new file mode 100644 index 000000000000..6bc92989b4fa --- /dev/null +++ b/etcdctl/ctlv3/command/util_test.go @@ -0,0 +1,78 @@ +// Copyright 2026 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "reflect" + "testing" +) + +func TestArgify(t *testing.T) { + tests := []struct { + name string + input string + expected []string + }{ + { + name: "empty string", + input: "", + expected: nil, + }, + { + name: "simple args", + input: "foo bar baz", + expected: []string{"foo", "bar", "baz"}, + }, + { + name: "single-quoted string", + input: "'hello world'", + expected: []string{"hello world"}, + }, + { + name: "double-quoted string", + input: `"hello world"`, + expected: []string{"hello world"}, + }, + { + name: "mixed args", + input: "put 'my key' 'my value'", + expected: []string{"put", "my key", "my value"}, + }, + { + name: "empty single-quoted string", + input: "''", + expected: []string{""}, + }, + { + name: "empty double-quoted string", + input: `""`, + expected: []string{""}, + }, + { + name: "double-quoted with escape", + input: `"hello\"world"`, + expected: []string{`hello"world`}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := argify(tt.input) + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("argify(%q) = %v, want %v", tt.input, result, tt.expected) + } + }) + } +} diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 0fa1879ed4f9..ffd3a81854fa 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -24,6 +24,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/pkg/v3/cobrautl" "go.etcd.io/etcd/pkg/v3/idutil" "go.etcd.io/etcd/pkg/v3/pbutil" "go.etcd.io/etcd/raft/v3/raftpb" @@ -68,7 +69,9 @@ func NewBackupCommand() *cobra.Command { } func doBackup(cmd *cobra.Command, args []string) { - HandleBackup(withV3, dataDir, backupDir, walDir, backupWalDir) + if err := HandleBackup(withV3, dataDir, backupDir, walDir, backupWalDir); err != nil { + cobrautl.ExitWithError(cobrautl.ExitError, err) + } } type desiredCluster struct { @@ -103,6 +106,10 @@ func newDesiredCluster() desiredCluster { func HandleBackup(withV3 bool, srcDir string, destDir string, srcWAL string, destWAL string) error { lg := GetLogger() + if err := validateDataDir(srcDir); err != nil { + return err + } + srcSnap := datadir.ToSnapDir(srcDir) destSnap := datadir.ToSnapDir(destDir) diff --git a/etcdutl/etcdutl/common.go b/etcdutl/etcdutl/common.go index 281058bc24e9..1394ef396dc1 100644 --- a/etcdutl/etcdutl/common.go +++ b/etcdutl/etcdutl/common.go @@ -15,12 +15,28 @@ package etcdutl import ( + "os" + "go.etcd.io/etcd/client/pkg/v3/logutil" "go.etcd.io/etcd/pkg/v3/cobrautl" + "go.etcd.io/etcd/server/v3/datadir" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// validateDataDir checks if the data directory's backend database file exists. +func validateDataDir(dataDir string) error { + dbPath := datadir.ToBackendFileName(dataDir) + return validateFilePath(dbPath) +} + +// validateFilePath checks if the specified file exists. +// It returns an error encountered by os.Stat, such as os.ErrNotExist, os.ErrPermission, etc. +func validateFilePath(filePath string) error { + _, err := os.Stat(filePath) + return err +} + func GetLogger() *zap.Logger { config := logutil.DefaultZapLoggerConfig config.Encoding = "console" diff --git a/etcdutl/etcdutl/defrag_command.go b/etcdutl/etcdutl/defrag_command.go index 1660dd7071a2..d47b41362b7f 100644 --- a/etcdutl/etcdutl/defrag_command.go +++ b/etcdutl/etcdutl/defrag_command.go @@ -50,8 +50,13 @@ func defragCommandFunc(cmd *cobra.Command, args []string) { } func DefragData(dataDir string) error { - var be backend.Backend - lg := GetLogger() + var ( + be backend.Backend + lg = GetLogger() + ) + if err := validateDataDir(dataDir); err != nil { + return err + } bch := make(chan struct{}) dbDir := datadir.ToBackendFileName(dataDir) go func() { diff --git a/etcdutl/etcdutl/snapshot_command.go b/etcdutl/etcdutl/snapshot_command.go index a7d9397bccc1..27249a59696e 100644 --- a/etcdutl/etcdutl/snapshot_command.go +++ b/etcdutl/etcdutl/snapshot_command.go @@ -109,6 +109,9 @@ func SnapshotStatusCommandFunc(cmd *cobra.Command, args []string) { err := fmt.Errorf("snapshot status requires exactly one argument") cobrautl.ExitWithError(cobrautl.ExitBadArgs, err) } + if err := validateFilePath(args[0]); err != nil { + cobrautl.ExitWithError(cobrautl.ExitError, err) + } printer := initPrinterFromCmd(cmd) lg := GetLogger() diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index a684d0b5e55a..a56a9570d512 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -239,50 +239,42 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { m := mustListenCMux(lg, tlsInfo) grpcl := m.Match(cmux.HTTP2()) + httpl := mustMatchHTTPListener(m, tlsInfo) defer func() { grpcl.Close() lg.Info("stop listening gRPC proxy client requests", zap.String("address", grpcProxyListenAddr)) }() client := mustNewClient(lg) + grpcServer := newGRPCProxyServer(lg, client) + + errc := make(chan error, 3) + + // NOTE: + // Start gRPC + cmux before creating proxyClient. + // + // proxyClient dials the proxy endpoint with a 5-second timeout. If cmux is not + // serving yet, the self-dial can time out because the gRPC path is not being + // accepted/dispatched. + // + // It is safe to start cmux before the HTTP server goroutine: HTTP has already + // been matched/registered with cmux, so accepted HTTP connections are queued + // and served once http.Serve starts. + startServe(errc, func() error { return grpcServer.Serve(grpcl) }) + startServe(errc, m.Serve) // The proxy client is used for self-healthchecking. // TODO: The mechanism should be refactored to use internal connection. - var proxyClient *clientv3.Client - if grpcProxyAdvertiseClientURL != "" { - proxyClient = mustNewProxyClient(lg, tlsInfo) - } - httpClient := mustNewHTTPClient(lg) + // + // Create it after gRPC/cmux serving goroutines have started + proxyClient := newProxyHealthClient(lg, tlsInfo) - srvhttp, httpl := mustHTTPListener(lg, m, tlsInfo, client, proxyClient) + httpClient := mustNewHTTPClient() + srvhttp := mustHTTPServer(lg, tlsInfo, httpClient, client, proxyClient) - if err := http2.ConfigureServer(srvhttp, &http2.Server{ - MaxConcurrentStreams: maxConcurrentStreams, - }); err != nil { - lg.Fatal("Failed to configure the http server", zap.Error(err)) - } + startServe(errc, func() error { return srvhttp.Serve(httpl) }) - errc := make(chan error, 3) - go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() - go func() { errc <- srvhttp.Serve(httpl) }() - go func() { errc <- m.Serve() }() - if len(grpcProxyMetricsListenAddr) > 0 { - mhttpl := mustMetricsListener(lg, tlsInfo) - go func() { - mux := http.NewServeMux() - grpcproxy.HandleMetrics(mux, httpClient, client.Endpoints()) - grpcproxy.HandleHealth(lg, mux, client) - grpcproxy.HandleProxyMetrics(mux) - grpcproxy.HandleProxyHealth(lg, mux, proxyClient) - lg.Info("gRPC proxy server metrics URL serving") - herr := http.Serve(mhttpl, mux) - if herr != nil { - lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr)) - } else { - lg.Info("gRPC proxy server metrics URL returned") - } - }() - } + maybeServeMetrics(lg, tlsInfo, httpClient, client, proxyClient) lg.Info("started gRPC proxy", zap.String("address", grpcProxyListenAddr)) @@ -375,11 +367,21 @@ func mustNewProxyClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client return client } +func newProxyHealthClient(lg *zap.Logger, tls *transport.TLSInfo) *clientv3.Client { + if grpcProxyAdvertiseClientURL == "" { + return nil + } + return mustNewProxyClient(lg, tls) +} + func newProxyClientCfg(lg *zap.Logger, eps []string, tls *transport.TLSInfo) (*clientv3.Config, error) { cfg := clientv3.Config{ Endpoints: eps, DialTimeout: 5 * time.Second, Logger: lg, + DialOptions: []grpc.DialOption{ + grpc.WithBlock(), //nolint:staticcheck // TODO: remove for a supported version + }, } if tls != nil { clientTLS, err := tls.ClientConfig() @@ -532,14 +534,20 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { return server } -func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c *clientv3.Client, proxy *clientv3.Client) (*http.Server, net.Listener) { - httpClient := mustNewHTTPClient(lg) +func mustMatchHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) net.Listener { + if tlsinfo == nil { + return m.Match(cmux.HTTP1()) + } + return m.Match(cmux.Any()) +} + +func mustHTTPServer(lg *zap.Logger, tlsinfo *transport.TLSInfo, httpClient *http.Client, c *clientv3.Client, proxyClient *clientv3.Client) *http.Server { httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) grpcproxy.HandleMetrics(httpmux, httpClient, c.Endpoints()) grpcproxy.HandleHealth(lg, httpmux, c) grpcproxy.HandleProxyMetrics(httpmux) - grpcproxy.HandleProxyHealth(lg, httpmux, proxy) + grpcproxy.HandleProxyHealth(lg, httpmux, proxyClient) if grpcProxyEnablePprof { for p, h := range debugutil.PProfHandlers() { httpmux.Handle(p, h) @@ -550,9 +558,14 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c Handler: httpmux, ErrorLog: log.New(ioutil.Discard, "net/http", 0), } + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + lg.Fatal("Failed to configure the http server", zap.Error(err)) + } if tlsinfo == nil { - return srvhttp, m.Match(cmux.HTTP1()) + return srvhttp } srvTLS, err := tlsinfo.ServerConfig() @@ -560,10 +573,35 @@ func mustHTTPListener(lg *zap.Logger, m cmux.CMux, tlsinfo *transport.TLSInfo, c lg.Fatal("failed to set up TLS", zap.Error(err)) } srvhttp.TLSConfig = srvTLS - return srvhttp, m.Match(cmux.Any()) + return srvhttp +} + +func maybeServeMetrics(lg *zap.Logger, tlsinfo *transport.TLSInfo, httpClient *http.Client, c *clientv3.Client, proxyClient *clientv3.Client) { + if len(grpcProxyMetricsListenAddr) == 0 { + return + } + mhttpl := mustMetricsListener(lg, tlsinfo) + go func() { + mux := http.NewServeMux() + grpcproxy.HandleMetrics(mux, httpClient, c.Endpoints()) + grpcproxy.HandleHealth(lg, mux, c) + grpcproxy.HandleProxyMetrics(mux) + grpcproxy.HandleProxyHealth(lg, mux, proxyClient) + lg.Info("gRPC proxy server metrics URL serving") + herr := http.Serve(mhttpl, mux) + if herr != nil { + lg.Fatal("gRPC proxy server metrics URL returned", zap.Error(herr)) + } else { + lg.Info("gRPC proxy server metrics URL returned") + } + }() +} + +func startServe(errc chan<- error, serve func() error) { + go func() { errc <- serve() }() } -func mustNewHTTPClient(lg *zap.Logger) *http.Client { +func mustNewHTTPClient() *http.Client { transport, err := newHTTPTransport(grpcProxyCA, grpcProxyCert, grpcProxyKey) if err != nil { fmt.Fprintln(os.Stderr, err) diff --git a/server/etcdserver/api/etcdhttp/peer.go b/server/etcdserver/api/etcdhttp/peer.go index badc98634b1f..a71e70d5ad28 100644 --- a/server/etcdserver/api/etcdhttp/peer.go +++ b/server/etcdserver/api/etcdhttp/peer.go @@ -21,12 +21,14 @@ import ( "strconv" "strings" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/lease/leasehttp" + "google.golang.org/grpc/metadata" "go.uber.org/zap" ) @@ -135,7 +137,14 @@ func (h *peerMemberPromoteHandler) ServeHTTP(w http.ResponseWriter, r *http.Requ return } - resp, err := h.server.PromoteMember(r.Context(), id) + // reconstruct gRPC metadata from HTTP header (if present) so admin check can pass + ctx := r.Context() + if token := r.Header.Get("Authorization"); token != "" { + md := metadata.New(map[string]string{rpctypes.TokenFieldNameGRPC: token}) + ctx = metadata.NewIncomingContext(ctx, md) + } + + resp, err := h.server.PromoteMember(ctx, id) if err != nil { switch err { case membership.ErrIDNotFound: diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 448fbb2cc7ff..87f1cc74afc6 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -555,12 +555,29 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, c.Lock() defer c.Unlock() - c.members[id].RaftAttributes = raftAttr + var ( + m *Member + ok bool + ) + if m, ok = c.members[id]; !ok { + c.lg.Info("Skipped updating non-existent member in v2store", + zap.String("cluster-id", c.cid.String()), + zap.String("local-member-id", c.localID.String()), + zap.String("updated-remote-peer-id", id.String()), + zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs), + zap.Bool("updated-remote-peer-is-learner", raftAttr.IsLearner), + zap.Bool("should-apply-v3", bool(shouldApplyV3)), + ) + return + } + // `MemberUpdateRequest` only supports updating PeerURLs. + m.RaftAttributes.PeerURLs = raftAttr.PeerURLs + if c.v2store != nil { - mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) + mustUpdateMemberInStore(c.lg, c.v2store, m) } if c.be != nil && shouldApplyV3 { - unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) + unsafeSaveMemberToBackend(c.lg, c.be, m) } c.lg.Info( diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index d65103cccee9..c8e866be9dea 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -1415,3 +1415,72 @@ func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { }) } } + +func TestUpdateRaftAttributes(t *testing.T) { + clientURLs := []string{"http://127.0.0.1:2379"} + oldPeerURLs := []string{"http://127.0.0.1:2380"} + newPeerURLs := []string{"http://127.0.0.1:2382"} + testCases := []struct { + name string + members []*Member + updateMemberID types.ID + wantMembers map[types.ID]*Member + }{ + { + name: "update an existing voting member", + members: []*Member{ + newTestMember(1, oldPeerURLs, "1", clientURLs), + newTestMember(2, oldPeerURLs, "2", clientURLs), + }, + updateMemberID: 2, + wantMembers: map[types.ID]*Member{ + 1: newTestMember(1, oldPeerURLs, "1", clientURLs), + 2: newTestMember(2, newPeerURLs, "2", clientURLs), + }, + }, + { + name: "update an existing learner member", + members: []*Member{ + newTestMember(1, oldPeerURLs, "1", clientURLs), + newTestMemberAsLearner(2, oldPeerURLs, "2", clientURLs), + }, + updateMemberID: 2, + wantMembers: map[types.ID]*Member{ + 1: newTestMember(1, oldPeerURLs, "1", clientURLs), + 2: newTestMemberAsLearner(2, newPeerURLs, "2", clientURLs), + }, + }, + { + name: "update a non-exist member", + members: []*Member{ + newTestMember(1, oldPeerURLs, "1", clientURLs), + newTestMember(2, oldPeerURLs, "2", clientURLs), + }, + updateMemberID: 3, + wantMembers: map[types.ID]*Member{ + 1: newTestMember(1, oldPeerURLs, "1", clientURLs), + 2: newTestMember(2, oldPeerURLs, "2", clientURLs), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lg := zaptest.NewLogger(t) + c := newTestCluster(t, tc.members) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + c.SetBackend(be) + for _, m := range tc.members { + unsafeSaveMemberToBackend(lg, be, m) + } + be.ForceCommit() + + c.UpdateRaftAttributes(tc.updateMemberID, RaftAttributes{PeerURLs: newPeerURLs}, true) + + mst, _ := mustReadMembersFromBackend(lg, c.be) + require.Equal(t, tc.wantMembers, mst, tc.name) + require.Equal(t, tc.wantMembers, c.members) + }) + } +} diff --git a/server/etcdserver/api/v3rpc/auth.go b/server/etcdserver/api/v3rpc/auth.go index d986037a1b4c..c5c28a8f9594 100644 --- a/server/etcdserver/api/v3rpc/auth.go +++ b/server/etcdserver/api/v3rpc/auth.go @@ -164,3 +164,18 @@ func (as *AuthServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChan } return resp, nil } + +type AuthAdmin struct { + ag AuthGetter +} + +// isPermitted verifies the user has admin privilege. +// Only users with "root" role are permitted. +func (aa *AuthAdmin) isPermitted(ctx context.Context) error { + authInfo, err := aa.ag.AuthInfoFromCtx(ctx) + if err != nil { + return err + } + + return aa.ag.AuthStore().IsAdminPermitted(authInfo) +} diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index 6947903a3865..28e1e4422e3a 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -44,16 +44,16 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer } chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), - newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, + newUnaryInterceptor(s), } if interceptor != nil { chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) } chainStreamInterceptors := []grpc.StreamServerInterceptor{ - newStreamInterceptor(s), grpc_prometheus.StreamServerInterceptor, + newStreamInterceptor(s), } if s.Cfg.ExperimentalEnableDistributedTracing { diff --git a/server/etcdserver/api/v3rpc/key.go b/server/etcdserver/api/v3rpc/key.go index d1a7ee633455..2354f211aa89 100644 --- a/server/etcdserver/api/v3rpc/key.go +++ b/server/etcdserver/api/v3rpc/key.go @@ -27,6 +27,7 @@ import ( type kvServer struct { hdr header kv etcdserver.RaftKV + aa *AuthAdmin // maxTxnOps is the max operations per txn. // e.g suppose maxTxnOps = 128. // Txn.Success can have at most 128 operations, @@ -35,7 +36,7 @@ type kvServer struct { } func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { - return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps} + return &kvServer{hdr: newHeader(s), kv: s, aa: &AuthAdmin{s}, maxTxnOps: s.Cfg.MaxTxnOps} } func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -102,6 +103,10 @@ func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, } func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + if err := s.aa.isPermitted(ctx); err != nil { + return nil, togRPCError(err) + } + resp, err := s.kv.Compact(ctx, r) if err != nil { return nil, togRPCError(err) diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 0bc8ae824607..0876d257571c 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -280,6 +280,22 @@ func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error { return ams.ag.AuthStore().IsAdminPermitted(authInfo) } +func (ams *authMaintenanceServer) requireAuthInfo(ctx context.Context) error { + if !ams.ag.AuthStore().IsAuthEnabled() { + return nil + } + + authInfo, err := ams.ag.AuthInfoFromCtx(ctx) + if err != nil { + return err + } + + if authInfo == nil { + return auth.ErrUserEmpty + } + return nil +} + func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) { if err := ams.isAuthenticated(ctx); err != nil { return nil, err @@ -311,14 +327,37 @@ func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVReques return ams.maintenanceServer.HashKV(ctx, r) } +func (ams *authMaintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) { + switch ar.GetAction() { + case pb.AlarmRequest_GET: + if err := ams.requireAuthInfo(ctx); err != nil { + return nil, togRPCError(err) + } + default: + if err := ams.isAuthenticated(ctx); err != nil { + return nil, togRPCError(err) + } + } + return ams.maintenanceServer.Alarm(ctx, ar) +} + func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) { + if err := ams.isAuthenticated(ctx); err != nil { + return nil, togRPCError(err) + } return ams.maintenanceServer.Status(ctx, ar) } func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) { + if err := ams.isAuthenticated(ctx); err != nil { + return nil, togRPCError(err) + } return ams.maintenanceServer.MoveLeader(ctx, tr) } func (ams *authMaintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { + if err := ams.isAuthenticated(ctx); err != nil { + return nil, togRPCError(err) + } return ams.maintenanceServer.Downgrade(ctx, r) } diff --git a/server/etcdserver/api/v3rpc/member.go b/server/etcdserver/api/v3rpc/member.go index 54fcc24843d4..2d1fe27d81b2 100644 --- a/server/etcdserver/api/v3rpc/member.go +++ b/server/etcdserver/api/v3rpc/member.go @@ -88,12 +88,12 @@ func (cs *ClusterServer) MemberUpdate(ctx context.Context, r *pb.MemberUpdateReq } func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest) (*pb.MemberListResponse, error) { - if r.Linearizable { - if err := cs.server.LinearizableReadNotify(ctx); err != nil { - return nil, togRPCError(err) - } + members, err := cs.server.MemberList(ctx, r) + if err != nil { + return nil, togRPCError(err) } - membs := membersToProtoMembers(cs.cluster.Members()) + + membs := membersToProtoMembers(members) return &pb.MemberListResponse{Header: cs.header(), Members: membs}, nil } diff --git a/server/etcdserver/apply_auth.go b/server/etcdserver/apply_auth.go index beafa967ba95..2f90b14f3c10 100644 --- a/server/etcdserver/apply_auth.go +++ b/server/etcdserver/apply_auth.go @@ -65,25 +65,34 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membersh } func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { - if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { + if err := checkPutAuth(aa.as, &aa.authInfo, aa.lessor, r); err != nil { return nil, nil, err } - if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil { + return aa.applierV3.Put(ctx, txn, r) +} + +func checkPutAuth(as auth.AuthStore, ai *auth.AuthInfo, lessor lease.Lessor, r *pb.PutRequest) error { + if err := as.IsPutPermitted(ai, r.Key); err != nil { + return err + } + + if err := checkLeasePuts(as, ai, lessor, lease.LeaseID(r.Lease)); err != nil { // The specified lease is already attached with a key that cannot // be written by this user. It means the user cannot revoke the // lease so attaching the lease to the newly written key should // be forbidden. - return nil, nil, err + return err } if r.PrevKv { - err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil) + err := as.IsRangePermitted(ai, r.Key, nil) if err != nil { - return nil, nil, err + return err } } - return aa.applierV3.Put(ctx, txn, r) + + return nil } func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -107,7 +116,7 @@ func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest return aa.applierV3.DeleteRange(txn, r) } -func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.RequestOp) error { +func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, lessor lease.Lessor, reqs []*pb.RequestOp) error { for _, requ := range reqs { switch tv := requ.Request.(type) { case *pb.RequestOp_RequestRange: @@ -124,10 +133,9 @@ func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.Req continue } - if err := as.IsPutPermitted(ai, tv.RequestPut.Key); err != nil { + if err := checkPutAuth(as, ai, lessor, tv.RequestPut); err != nil { return err } - case *pb.RequestOp_RequestDeleteRange: if tv.RequestDeleteRange == nil { continue @@ -144,56 +152,65 @@ func checkTxnReqsPermission(as auth.AuthStore, ai *auth.AuthInfo, reqs []*pb.Req if err != nil { return err } + case *pb.RequestOp_RequestTxn: + if tv.RequestTxn == nil { + continue + } + + err := checkTxnAuth(as, ai, lessor, tv.RequestTxn) + if err != nil { + return err + } } } return nil } -func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error { +func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, lessor lease.Lessor, rt *pb.TxnRequest) error { for _, c := range rt.Compare { if err := as.IsRangePermitted(ai, c.Key, c.RangeEnd); err != nil { return err } } - if err := checkTxnReqsPermission(as, ai, rt.Success); err != nil { + if err := checkTxnReqsPermission(as, ai, lessor, rt.Success); err != nil { return err } - return checkTxnReqsPermission(as, ai, rt.Failure) + return checkTxnReqsPermission(as, ai, lessor, rt.Failure) } func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { - if err := checkTxnAuth(aa.as, &aa.authInfo, rt); err != nil { + if err := checkTxnAuth(aa.as, &aa.authInfo, aa.lessor, rt); err != nil { return nil, nil, err } return aa.applierV3.Txn(ctx, rt) } func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { - if err := aa.checkLeasePuts(lease.LeaseID(lc.ID)); err != nil { + if err := checkLeasePuts(aa.as, &aa.authInfo, aa.lessor, lease.LeaseID(lc.ID)); err != nil { return nil, err } return aa.applierV3.LeaseRevoke(lc) } -func (aa *authApplierV3) checkLeasePuts(leaseID lease.LeaseID) error { - l := aa.lessor.Lookup(leaseID) +func checkLeasePuts(as auth.AuthStore, ai *auth.AuthInfo, lessor lease.Lessor, leaseID lease.LeaseID) error { + l := lessor.Lookup(leaseID) if l != nil { - return aa.checkLeasePutsKeys(l) + return checkLeasePutsKeys(as, ai, l) } return nil } -func (aa *authApplierV3) checkLeasePutsKeys(l *lease.Lease) error { +func checkLeasePutsKeys(as auth.AuthStore, ai *auth.AuthInfo, l *lease.Lease) error { // early return for most-common scenario of either disabled auth or admin user. // IsAdminPermitted also checks whether auth is enabled - if err := aa.as.IsAdminPermitted(&aa.authInfo); err == nil { + if err := as.IsAdminPermitted(ai); err == nil { return nil } for _, key := range l.Keys() { - if err := aa.as.IsPutPermitted(&aa.authInfo, []byte(key)); err != nil { + if err := as.IsPutPermitted(ai, []byte(key)); err != nil { return err } } diff --git a/server/etcdserver/apply_auth_test.go b/server/etcdserver/apply_auth_test.go index 68681c7f1bec..1acd704f2a2b 100644 --- a/server/etcdserver/apply_auth_test.go +++ b/server/etcdserver/apply_auth_test.go @@ -53,24 +53,24 @@ func TestCheckLeasePutsKeys(t *testing.T) { defer as.AuthDisable() aa := authApplierV3{as: as} - assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is disabled, should allow puts") + assert.NoError(t, checkLeasePutsKeys(aa.as, &aa.authInfo, lease.NewLease(lease.LeaseID(1), 3600)), "auth is disabled, should allow puts") assert.NoError(t, enableAuthAndCreateRoot(aa.as), "error while enabling auth") aa.authInfo = auth.AuthInfo{Username: "root"} - assert.NoError(t, aa.checkLeasePutsKeys(lease.NewLease(lease.LeaseID(1), 3600)), "auth is enabled, should allow puts for root") + assert.NoError(t, checkLeasePutsKeys(aa.as, &aa.authInfo, lease.NewLease(lease.LeaseID(1), 3600)), "auth is enabled, should allow puts for root") l := lease.NewLease(lease.LeaseID(1), 3600) l.SetLeaseItem(lease.LeaseItem{Key: "a"}) aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 0} - assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrUserEmpty, "auth is enabled, should not allow bob, non existing at rev 0") + assert.ErrorIs(t, checkLeasePutsKeys(aa.as, &aa.authInfo, l), auth.ErrUserEmpty, "auth is enabled, should not allow bob, non existing at rev 0") aa.authInfo = auth.AuthInfo{Username: "bob", Revision: 1} - assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrAuthOldRevision, "auth is enabled, old revision") + assert.ErrorIs(t, checkLeasePutsKeys(aa.as, &aa.authInfo, l), auth.ErrAuthOldRevision, "auth is enabled, old revision") aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()} - assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob does not have permissions, bob does not exist") + assert.ErrorIs(t, checkLeasePutsKeys(aa.as, &aa.authInfo, l), auth.ErrPermissionDenied, "auth is enabled, bob does not have permissions, bob does not exist") _, err := aa.as.UserAdd(&pb.AuthUserAddRequest{Name: "bob", Options: &authpb.UserAddOptions{NoPassword: true}}) assert.NoError(t, err, "bob should be added without error") aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()} - assert.ErrorIs(t, aa.checkLeasePutsKeys(l), auth.ErrPermissionDenied, "auth is enabled, bob exists yet does not have permissions") + assert.ErrorIs(t, checkLeasePutsKeys(aa.as, &aa.authInfo, l), auth.ErrPermissionDenied, "auth is enabled, bob exists yet does not have permissions") // allow bob to access "a" _, err = aa.as.RoleAdd(&pb.AuthRoleAddRequest{Name: "bobsrole"}) @@ -91,7 +91,7 @@ func TestCheckLeasePutsKeys(t *testing.T) { assert.NoError(t, err, "bob should be granted bobsrole without error") aa.authInfo = auth.AuthInfo{Username: "bob", Revision: aa.as.Revision()} - assert.NoError(t, aa.checkLeasePutsKeys(l), "bob should be able to access key 'a'") + assert.NoError(t, checkLeasePutsKeys(aa.as, &aa.authInfo, l), "bob should be able to access key 'a'") } diff --git a/server/etcdserver/apply_auth_txn_test.go b/server/etcdserver/apply_auth_txn_test.go new file mode 100644 index 000000000000..b352a523fb44 --- /dev/null +++ b/server/etcdserver/apply_auth_txn_test.go @@ -0,0 +1,394 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcdserver + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + + "go.etcd.io/etcd/api/v3/authpb" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/auth" + "go.etcd.io/etcd/server/v3/lease" + "go.etcd.io/etcd/server/v3/mvcc/backend" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" +) + +// checkTxnAuth variables setup. +var ( + inRangeCompare = &pb.Compare{ + Key: []byte("foo"), + RangeEnd: []byte("zoo"), + } + outOfRangeCompare = &pb.Compare{ + Key: []byte("boo"), + RangeEnd: []byte("zoo"), + } + nilRequestPut = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: nil, + }, + } + inRangeRequestPut = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("foo"), + }, + }, + } + outOfRangeRequestPut = &pb.RequestOp{ + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("boo"), + }, + }, + } + nilRequestRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestRange{ + RequestRange: nil, + }, + } + inRangeRequestRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("foo"), + RangeEnd: []byte("zoo"), + }, + }, + } + outOfRangeRequestRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("boo"), + RangeEnd: []byte("zoo"), + }, + }, + } + nilRequestDeleteRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: nil, + }, + } + inRangeRequestDeleteRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte("foo"), + RangeEnd: []byte("zoo"), + PrevKv: true, + }, + }, + } + outOfRangeRequestDeleteRange = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte("boo"), + RangeEnd: []byte("zoo"), + PrevKv: true, + }, + }, + } + outOfRangeRequestDeleteRangeKvFalse = &pb.RequestOp{ + Request: &pb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: &pb.DeleteRangeRequest{ + Key: []byte("boo"), + RangeEnd: []byte("zoo"), + PrevKv: false, + }, + }, + } +) + +func setupAuth(t *testing.T, be backend.Backend) auth.AuthStore { + lg := zaptest.NewLogger(t) + + simpleTokenTTLDefault := 300 * time.Second + tokenTypeSimple := "simple" + dummyIndexWaiter := func(index uint64) <-chan struct{} { + ch := make(chan struct{}, 1) + go func() { + ch <- struct{}{} + }() + return ch + } + + tp, _ := auth.NewTokenProvider(zaptest.NewLogger(t), tokenTypeSimple, dummyIndexWaiter, simpleTokenTTLDefault) + + as := auth.NewAuthStore(lg, be, tp, 4) + + // create "root" user and "foo" user with limited range + _, err := as.RoleAdd(&pb.AuthRoleAddRequest{Name: "root"}) + require.NoError(t, err) + + _, err = as.RoleAdd(&pb.AuthRoleAddRequest{Name: "rw"}) + require.NoError(t, err) + + _, err = as.RoleGrantPermission(&pb.AuthRoleGrantPermissionRequest{ + Name: "rw", + Perm: &authpb.Permission{ + PermType: authpb.READWRITE, + Key: []byte("foo"), + RangeEnd: []byte("zoo"), + }, + }) + require.NoError(t, err) + + _, err = as.UserAdd(&pb.AuthUserAddRequest{Name: "root", Password: "foo"}) + require.NoError(t, err) + + _, err = as.UserAdd(&pb.AuthUserAddRequest{Name: "foo", Password: "foo"}) + require.NoError(t, err) + + _, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "root", Role: "root"}) + require.NoError(t, err) + + _, err = as.UserGrantRole(&pb.AuthUserGrantRoleRequest{User: "foo", Role: "rw"}) + require.NoError(t, err) + + err = as.AuthEnable() + require.NoError(t, err) + + return as +} + +func TestCheckTxnAuth(t *testing.T) { + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + as := setupAuth(t, be) + + tests := []struct { + name string + txnRequest *pb.TxnRequest + err error + }{ + { + name: "Out of range compare is unauthorized", + txnRequest: &pb.TxnRequest{ + Compare: []*pb.Compare{outOfRangeCompare}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "In range compare is authorized", + txnRequest: &pb.TxnRequest{ + Compare: []*pb.Compare{inRangeCompare}, + }, + err: nil, + }, + { + name: "Nil request range is always authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{nilRequestRange}, + }, + err: nil, + }, + { + name: "Range request in range is authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestRange}, + Failure: []*pb.RequestOp{inRangeRequestRange}, + }, + err: nil, + }, + { + name: "Range request out of range success case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestRange}, + Failure: []*pb.RequestOp{inRangeRequestRange}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Range request out of range failure case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestRange}, + Failure: []*pb.RequestOp{outOfRangeRequestRange}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Nil Put request is always authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{nilRequestPut}, + }, + err: nil, + }, + { + name: "Put request in range in authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestPut}, + Failure: []*pb.RequestOp{inRangeRequestPut}, + }, + err: nil, + }, + { + name: "Put request out of range success case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestPut}, + Failure: []*pb.RequestOp{inRangeRequestPut}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Put request out of range failure case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestPut}, + Failure: []*pb.RequestOp{outOfRangeRequestPut}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Nil delete request is authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{nilRequestDeleteRange}, + }, + err: nil, + }, + { + name: "Delete range request in range is authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestDeleteRange}, + Failure: []*pb.RequestOp{inRangeRequestDeleteRange}, + }, + err: nil, + }, + { + name: "Delete range request out of range success case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestDeleteRange}, + Failure: []*pb.RequestOp{inRangeRequestDeleteRange}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Delete range request out of range failure case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestDeleteRange}, + Failure: []*pb.RequestOp{outOfRangeRequestDeleteRange}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Delete range request out of range and PrevKv false success case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestDeleteRangeKvFalse}, + Failure: []*pb.RequestOp{inRangeRequestDeleteRange}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Delete range request out of range and PrevKv false failure case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestDeleteRange}, + Failure: []*pb.RequestOp{outOfRangeRequestDeleteRangeKvFalse}, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Nested txn request in range is authorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Success: []*pb.RequestOp{inRangeRequestRange, inRangeRequestPut}, + Failure: []*pb.RequestOp{inRangeRequestDeleteRange}, + }, + }, + }, + }, + }, + err: nil, + }, + { + name: "Nested txn request out of range success case is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestRange}, + }, + }, + }, + }, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Nested txn request out of range failure case is unauthorized", + txnRequest: &pb.TxnRequest{ + Failure: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Failure: []*pb.RequestOp{outOfRangeRequestPut}, + }, + }, + }, + }, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Nested txn request out of range delete is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestDeleteRange}, + }, + }, + }, + }, + }, + err: auth.ErrPermissionDenied, + }, + { + name: "Two level nested txn request out of range delete is unauthorized", + txnRequest: &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Failure: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestTxn{ + RequestTxn: &pb.TxnRequest{ + Success: []*pb.RequestOp{outOfRangeRequestDeleteRange}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + err: auth.ErrPermissionDenied, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := checkTxnAuth(as, &auth.AuthInfo{Username: "foo", Revision: 8}, &lease.FakeLessor{}, tt.txnRequest) + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/server/etcdserver/cluster_util.go b/server/etcdserver/cluster_util.go index eace8f69afc4..dde84e01e168 100644 --- a/server/etcdserver/cluster_util.go +++ b/server/etcdserver/cluster_util.go @@ -25,9 +25,11 @@ import ( "strings" "time" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" + "google.golang.org/grpc/metadata" "github.com/coreos/go-semver/semver" "go.uber.org/zap" @@ -340,6 +342,20 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R if err != nil { return nil, err } + + // add the auth token via HTTP header if present in gRPC metadata + if md, ok := metadata.FromIncomingContext(ctx); ok { + ts, ok := md[rpctypes.TokenFieldNameGRPC] + if !ok { + ts, ok = md[rpctypes.TokenFieldNameSwagger] + } + + if ok && len(ts) > 0 { + token := ts[0] + req.Header.Set("Authorization", token) + } + } + req = req.WithContext(ctx) resp, err := cc.Do(req) if err != nil { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 7306dad10081..636fed42db52 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1714,9 +1714,10 @@ func (s *EtcdServer) mayAddMember(memb membership.Member) error { return ErrNotEnoughStartedMembers } - if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) { + // Treat the new member as unavailable when checking quorum safety. + if !isConnectedToQuorumAfterAddingNewMemberSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) { lg.Warn( - "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum", + "rejecting member add request; local member has not been connected to majority peers, reconfigure breaks active quorum", zap.String("local-member-id", s.ID().String()), zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), zap.Error(ErrUnhealthy), @@ -1949,6 +1950,19 @@ func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ( return s.configure(ctx, cc) } +func (s *EtcdServer) MemberList(ctx context.Context, r *pb.MemberListRequest) ([]*membership.Member, error) { + if r.Linearizable { + if err := s.LinearizableReadNotify(ctx); err != nil { + return nil, err + } + } + + if err := s.requireAuthInfo(ctx); err != nil { + return nil, err + } + return s.cluster.Members(), nil +} + func (s *EtcdServer) setCommittedIndex(v uint64) { atomic.StoreUint64(&s.committedIndex, v) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 28878c177706..11c3811e2fca 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -16,6 +16,7 @@ package etcdserver import ( "context" + "encoding/binary" "encoding/json" "fmt" "io/ioutil" @@ -2232,3 +2233,216 @@ func TestIsActive(t *testing.T) { require.Equal(t, tc.expectActive, s.isActive()) } } + +func TestRequestCurrentIndex_LeaderChangedRace(t *testing.T) { + s, _ := setupTestRequestCurrentIndex(t) + + for i := 0; i < 100; i++ { + s.r.readStateC <- raft.ReadState{Index: 100} + + s.leaderChangedMu.Lock() + leaderChangedNotifier := s.leaderChanged + close(leaderChangedNotifier) + s.leaderChanged = make(chan struct{}) + s.leaderChangedMu.Unlock() + + index, err := s.requestCurrentIndex(leaderChangedNotifier) + require.ErrorIs(t, err, ErrLeaderChanged) + require.Equal(t, uint64(0), index) + + // Clear the readStateC channel for the next iteration, + select { + case <-s.r.readStateC: + default: + } + } +} + +func TestRequestCurrentIndex_UniqueRequestID(t *testing.T) { + s, mockRaft := setupTestRequestCurrentIndex(t) + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + s.leaderChangedMu.Lock() + leaderChanged := s.leaderChanged + s.leaderChangedMu.Unlock() + s.requestCurrentIndex(leaderChanged) + }() + + require.Eventually(t, func() bool { + return len(mockRaft.getRequests()) >= 2 + }, time.Second, 100*time.Millisecond) + + s.leaderChangedMu.Lock() + close(s.leaderChanged) + s.leaderChangedMu.Unlock() + wg.Wait() + + seen := make(map[uint64]bool) + for _, id := range mockRaft.getRequests() { + require.Falsef(t, seen[id], "Found duplicate request ID: %d", id) + seen[id] = true + } +} + +func TestRequestCurrentIndex_Success(t *testing.T) { + s, mockRaft := setupTestRequestCurrentIndex(t) + + wg := sync.WaitGroup{} + wg.Add(1) + var index uint64 + var err error + go func() { + defer wg.Done() + s.leaderChangedMu.Lock() + leaderChanged := s.leaderChanged + s.leaderChangedMu.Unlock() + index, err = s.requestCurrentIndex(leaderChanged) + }() + + require.Eventually(t, func() bool { + return len(mockRaft.getRequests()) == 1 + }, time.Second, 100*time.Millisecond) + + reqID := mockRaft.getRequests()[0] + reqIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(reqIDBytes, reqID) + + s.r.readStateC <- raft.ReadState{ + Index: 100, + RequestCtx: reqIDBytes, + } + + wg.Wait() + + require.NoError(t, err) + require.Equal(t, uint64(100), index) + require.Lenf(t, mockRaft.getRequests(), 1, "Expected exactly 1 ReadIndex request") +} + +func TestRequestCurrentIndex_WrongRequestID(t *testing.T) { + s, mockRaft := setupTestRequestCurrentIndex(t) + + wg := sync.WaitGroup{} + wg.Add(1) + var index uint64 + var err error + go func() { + defer wg.Done() + s.leaderChangedMu.Lock() + leaderChanged := s.leaderChanged + s.leaderChangedMu.Unlock() + index, err = s.requestCurrentIndex(leaderChanged) + }() + + require.Eventually(t, func() bool { + return len(mockRaft.getRequests()) == 1 + }, time.Second, 10*time.Millisecond) + + wrongReqIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(wrongReqIDBytes, 99999) + + s.r.readStateC <- raft.ReadState{ + Index: 100, + RequestCtx: wrongReqIDBytes, + } + + time.Sleep(100 * time.Millisecond) + requests := mockRaft.getRequests() + require.Lenf(t, requests, 1, "Expected exactly 1 ReadIndex request") + + reqID := requests[0] + reqIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(reqIDBytes, reqID) + + s.r.readStateC <- raft.ReadState{ + Index: 99, + RequestCtx: reqIDBytes, + } + wg.Wait() + + require.NoError(t, err) + require.Equal(t, uint64(99), index) + require.Lenf(t, mockRaft.getRequests(), 1, "Expected exactly 1 ReadIndex request") +} + +func TestRequestCurrentIndex_DelayedResponse(t *testing.T) { + s, mockRaft := setupTestRequestCurrentIndex(t) + + wg := sync.WaitGroup{} + wg.Add(1) + var index uint64 + var err error + go func() { + defer wg.Done() + s.leaderChangedMu.Lock() + leaderChanged := s.leaderChanged + s.leaderChangedMu.Unlock() + index, err = s.requestCurrentIndex(leaderChanged) + }() + + require.Eventually(t, func() bool { + return len(mockRaft.getRequests()) >= 3 + }, 2*time.Second, 100*time.Millisecond) + requests := mockRaft.getRequests() + + reqID := requests[1] + reqIDBytes := make([]byte, 8) + binary.BigEndian.PutUint64(reqIDBytes, reqID) + + select { + case s.r.readStateC <- raft.ReadState{ + Index: 100, + RequestCtx: reqIDBytes, + }: + case <-time.After(time.Second): + t.Fatal("timed out sending read state") + } + wg.Wait() + + require.NoError(t, err) + require.Equal(t, uint64(100), index) +} + +func setupTestRequestCurrentIndex(t *testing.T) (*EtcdServer, *testRaftNode) { + mockRaft := &testRaftNode{} + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zaptest.NewLogger(t), + reqIDGen: idutil.NewGenerator(0, time.Time{}), + firstCommitInTermC: make(chan struct{}), + leaderChanged: make(chan struct{}), + r: raftNode{ + raftNodeConfig: raftNodeConfig{ + Node: mockRaft, + }, + readStateC: make(chan raft.ReadState, 1), + }, + } + return s, mockRaft +} + +type testRaftNode struct { + raft.Node + mu sync.Mutex + readIndexRequests []uint64 +} + +func (m *testRaftNode) ReadIndex(ctx context.Context, rctx []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + if len(rctx) == 8 { + m.readIndexRequests = append(m.readIndexRequests, binary.BigEndian.Uint64(rctx)) + } + return nil +} + +func (m *testRaftNode) getRequests() []uint64 { + m.mu.Lock() + defer m.mu.Unlock() + res := make([]uint64, len(m.readIndexRequests)) + copy(res, m.readIndexRequests) + return res +} diff --git a/server/etcdserver/util.go b/server/etcdserver/util.go index b048b86416e2..7f115829e8f5 100644 --- a/server/etcdserver/util.go +++ b/server/etcdserver/util.go @@ -21,6 +21,7 @@ import ( "time" "github.com/golang/protobuf/proto" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" @@ -29,12 +30,30 @@ import ( "go.uber.org/zap" ) -// isConnectedToQuorumSince checks whether the local member is connected to the -// quorum of the cluster since the given time. +// isConnectedToQuorumSince reports whether the local member has been connected +// to a quorum of the current cluster continuously since the given time. func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1 } +// isConnectedToQuorumAfterAddingNewMemberSince reports whether the local member +// has been connected to a quorum continuously since the given time, assuming a +// new member is being added to the cluster. +// +// For a single-member cluster, it always returns true to allow membership +// expansion. +func isConnectedToQuorumAfterAddingNewMemberSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { + if len(members) == 1 { + // If it's a single member cluster, we should allow adding a new member + return true + } + return numConnectedSince(transport, since, self, members) >= quorum(len(members)+1) +} + +func quorum(num int) int { + return num/2 + 1 +} + // isConnectedSince checks whether the local member is connected to the // remote member since the given time. func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool { @@ -42,12 +61,6 @@ func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote ty return !t.IsZero() && t.Before(since) } -// isConnectedFullySince checks whether the local member is connected to all -// members in the cluster since the given time. -func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { - return numConnectedSince(transport, since, self, members) == len(members) -} - // numConnectedSince counts how many members are connected to the local member // since the given time. func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int { diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index a9e1f3586afe..01b32ea1a60e 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -15,7 +15,6 @@ package etcdserver import ( - "bytes" "context" "encoding/base64" "encoding/binary" @@ -170,7 +169,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse var resp *pb.TxnResponse var err error chk := func(ai *auth.AuthInfo) error { - return checkTxnAuth(s.authStore, ai, r) + return checkTxnAuth(s.authStore, ai, s.lessor, r) } defer func(start time.Time) { @@ -268,6 +267,11 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (* // only use positive int64 id's r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1)) } + + if err := s.requireAuthInfo(ctx); err != nil { + return nil, err + } + resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r}) if err != nil { return nil, err @@ -288,6 +292,10 @@ func (s *EtcdServer) waitAppliedIndex() error { } func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { + if err := s.requireAuthInfo(ctx); err != nil { + return nil, err + } + resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r}) if err != nil { return nil, err @@ -312,6 +320,10 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e return 0, err } + if err := s.checkLeaseRenew(ctx, id); err != nil { + return 0, err + } + ttl, err := s.lessor.Renew(id) if err == nil { // already requested to primary lessor(leader) return ttl, nil @@ -330,6 +342,11 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e if lerr != nil { return -1, lerr } + + if err := s.checkLeaseRenew(ctx, id); err != nil { + return 0, err + } + for _, url := range leader.PeerURLs { lurl := url + leasehttp.LeasePrefix ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt) @@ -347,6 +364,39 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e return -1, ErrCanceled } +func (s *EtcdServer) checkLeaseRenew(ctx context.Context, leaseID lease.LeaseID) error { + rev := s.AuthStore().Revision() + if !s.AuthStore().IsAuthEnabled() { + return nil + } + + authInfo, err := s.AuthInfoFromCtx(ctx) + if err != nil { + return err + } + if authInfo == nil { + return auth.ErrUserEmpty + } + + if s.AuthStore().IsAdminPermitted(authInfo) == nil { + return nil + } + + l := s.lessor.Lookup(leaseID) + if l != nil { + for _, key := range l.Keys() { + if err := s.AuthStore().IsPutPermitted(authInfo, []byte(key)); err != nil { + return err + } + } + } + + if rev != s.AuthStore().Revision() { + return auth.ErrAuthOldRevision + } + return nil +} + func (s *EtcdServer) checkLeaseTimeToLive(ctx context.Context, leaseID lease.LeaseID) (uint64, error) { rev := s.AuthStore().Revision() if !s.AuthStore().IsAuthEnabled() { @@ -360,6 +410,10 @@ func (s *EtcdServer) checkLeaseTimeToLive(ctx context.Context, leaseID lease.Lea return rev, auth.ErrUserEmpty } + if s.AuthStore().IsAdminPermitted(authInfo) == nil { + return rev, nil + } + l := s.lessor.Lookup(leaseID) if l != nil { for _, key := range l.Keys() { @@ -435,6 +489,10 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR } func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { + if err := s.requireAuthInfo(ctx); err != nil { + return nil, err + } + var rev uint64 var err error if r.Keys { @@ -461,6 +519,11 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR // LeaseLeases is really ListLeases !??? func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { ls := s.lessor.Leases() + + if err := s.checkLeaseLeases(ctx, ls); err != nil { + return nil, err + } + lss := make([]*pb.LeaseStatus, len(ls)) for i := range ls { lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)} @@ -468,6 +531,40 @@ func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil } +func (s *EtcdServer) checkLeaseLeases(ctx context.Context, leases []*lease.Lease) error { + rev := s.AuthStore().Revision() + + if !s.AuthStore().IsAuthEnabled() { + return nil + } + + authInfo, err := s.AuthInfoFromCtx(ctx) + if err != nil { + return err + } + + if authInfo == nil { + return auth.ErrUserEmpty + } + + if err := s.AuthStore().IsAdminPermitted(authInfo); err == nil { + return nil + } + + for _, l := range leases { + for _, key := range l.Keys() { + if err := s.AuthStore().IsRangePermitted(authInfo, []byte(key), []byte{}); err != nil { + return err + } + } + } + + if rev != s.AuthStore().Revision() { + return auth.ErrAuthOldRevision + } + return nil +} + func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) { leader := s.cluster.Member(s.Leader()) for leader == nil { @@ -813,7 +910,6 @@ func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } func (s *EtcdServer) linearizableReadLoop() { for { - requestId := s.reqIDGen.Next() leaderChangedNotifier := s.LeaderChangedNotify() select { case <-leaderChangedNotifier: @@ -833,7 +929,7 @@ func (s *EtcdServer) linearizableReadLoop() { s.readNotifier = nextnr s.readMu.Unlock() - confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId) + confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier) if isStopped(err) { return } @@ -868,8 +964,11 @@ func isStopped(err error) bool { return err == raft.ErrStopped || err == ErrStopped } -func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { - err := s.sendReadIndex(requestId) +func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}) (uint64, error) { + requestIDs := map[uint64]struct{}{} + requestID := s.reqIDGen.Next() + requestIDs[requestID] = struct{}{} + err := s.sendReadIndex(requestID) if err != nil { return 0, err } @@ -885,19 +984,23 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, for { select { case rs := <-s.r.readStateC: - requestIdBytes := uint64ToBigEndianBytes(requestId) - gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes) - if !gotOwnResponse { + // Check again if leader changed as when multiple channels are ready, select picks randomly. + select { + case <-leaderChangedNotifier: + readIndexFailed.Inc() + return 0, ErrLeaderChanged + default: + } + responseID := uint64(0) + if len(rs.RequestCtx) == 8 { + responseID = binary.BigEndian.Uint64(rs.RequestCtx) + } + if _, ok := requestIDs[responseID]; !ok { // a previous request might time out. now we should ignore the response of it and // continue waiting for the response of the current requests. - responseId := uint64(0) - if len(rs.RequestCtx) == 8 { - responseId = binary.BigEndian.Uint64(rs.RequestCtx) - } lg.Warn( "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", - zap.Uint64("sent-request-id", requestId), - zap.Uint64("received-request-id", responseId), + zap.Uint64("received-request-id", responseID), ) slowReadIndex.Inc() continue @@ -910,7 +1013,9 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, case <-firstCommitInTermNotifier: firstCommitInTermNotifier = s.FirstCommitInTermNotify() lg.Info("first commit in current term: resending ReadIndex request") - err := s.sendReadIndex(requestId) + requestID = s.reqIDGen.Next() + requestIDs[requestID] = struct{}{} + err := s.sendReadIndex(requestID) if err != nil { return 0, err } @@ -919,10 +1024,12 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, case <-retryTimer.C: lg.Warn( "waiting for ReadIndex response took too long, retrying", - zap.Uint64("sent-request-id", requestId), + zap.Uint64("sent-request-id", requestID), zap.Duration("retry-timeout", readIndexRetryTime), ) - err := s.sendReadIndex(requestId) + requestID = s.reqIDGen.Next() + requestIDs[requestID] = struct{}{} + err := s.sendReadIndex(requestID) if err != nil { return 0, err } @@ -1094,3 +1201,19 @@ func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()} return &resp, nil } + +func (s *EtcdServer) requireAuthInfo(ctx context.Context) error { + if !s.authStore.IsAuthEnabled() { + return nil + } + + authInfo, err := s.AuthInfoFromCtx(ctx) + if err != nil { + return err + } + + if authInfo == nil { + return auth.ErrUserEmpty + } + return nil +} diff --git a/tests/e2e/ctl_v3_auth_test.go b/tests/e2e/ctl_v3_auth_test.go index be9b44061ffc..3a6f490e89aa 100644 --- a/tests/e2e/ctl_v3_auth_test.go +++ b/tests/e2e/ctl_v3_auth_test.go @@ -20,12 +20,14 @@ import ( "net/url" "os" "path/filepath" + "strings" "syscall" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -1136,21 +1138,26 @@ func authTestEndpointHealth(cx ctlCtx) { cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) } - // health checking with an ordinary user "succeeds" since permission denial goes through consensus - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } - // succeed if permissions granted for ordinary user cx.user, cx.pass = "root", "root" if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil { cx.t.Fatal(err) } + cx.user, cx.pass = "test-user", "pass" if err := ctlV3EndpointHealth(cx); err != nil { cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) } + + cmdArgs := append(cx.PrefixArgs(), "endpoint", "health", "--user=root:root", "--cluster") + proc, err := e2e.SpawnCmd(cmdArgs, cx.envMap) + require.NoError(cx.t, err) + defer func() { + require.NoError(cx.t, proc.Close()) + }() + proc.Wait() + response := strings.Join(proc.Lines(), "\n") + require.Contains(cx.t, response, "is healthy: successfully") } func certCNAndUsername(cx ctlCtx, noPassword bool) { @@ -1455,7 +1462,7 @@ func authTestRecoverSnapshot(cx ctlCtx) { _, err = cliUser.Put(context.TODO(), "foo", "bar") require.NoError(cx.t, err) - //verify all nodes have the same revision and hash + // verify all nodes have the same revision and hash var endpoints []string for _, proc := range cx.epc.Procs { endpoints = append(endpoints, proc.Config().Acurl) diff --git a/tests/e2e/ctl_v3_member_test.go b/tests/e2e/ctl_v3_member_test.go index 76b33fbc6686..aeed229913b9 100644 --- a/tests/e2e/ctl_v3_member_test.go +++ b/tests/e2e/ctl_v3_member_test.go @@ -21,13 +21,16 @@ import ( "reflect" "strings" "testing" + "time" "github.com/stretchr/testify/require" "go.etcd.io/bbolt" + "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" "go.etcd.io/etcd/server/v3/datadir" + "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/tests/v3/framework/e2e" @@ -95,6 +98,14 @@ func TestCtlV3MemberUpdatePeerTLS(t *testing.T) { testCtl(t, memberUpdateTest, withCfg(*e2e.NewConfigPeerTLS())) } +func TestCtlV3MemberPromoteWithAuthFromLeader(t *testing.T) { + testCtl(t, memberPromoteWithAuth(false), withTestTimeout(30*time.Second)) +} + +func TestCtlV3MemberPromoteWithAuthFromFollower(t *testing.T) { + testCtl(t, memberPromoteWithAuth(true), withTestTimeout(30*time.Second)) +} + func memberListTest(cx ctlCtx) { if err := ctlV3MemberList(cx); err != nil { cx.t.Fatalf("memberListTest ctlV3MemberList error (%v)", err) @@ -417,3 +428,119 @@ func ensureAllMembersFromV3StoreAreVotingMembers(t *testing.T, dataDir string) { require.Falsef(t, m.IsLearner, "member is still learner: %+v", m) } } + +func memberPromoteWithAuth(fromFollower bool) func(cx ctlCtx) { + return func(cx ctlCtx) { + // Add a regular member + _, err := cx.epc.StartNewProc(nil, false, cx.t) + require.NoError(cx.t, err) + + var learnerID uint64 + var addErr error + for { + // Add a learner once the cluster is healthy + learnerID, addErr = cx.epc.StartNewProc(nil, true, cx.t) + if addErr != nil { + if strings.Contains(addErr.Error(), "etcdserver: unhealthy cluster") { + time.Sleep(1 * time.Second) + continue + } + } + break + } + require.NoError(cx.t, addErr) + + leaderIdx := cx.epc.WaitLeader(cx.t) + followerIdx := (leaderIdx + 1) % len(cx.epc.Procs) + + require.NoError(cx.t, authEnable(cx)) + cx.user, cx.pass = "root", "root" + + if fromFollower { + _, err = cx.epc.Procs[followerIdx]. + Etcdctl(e2e.ClientNonTLS, false, false). + MemberPromoteWithAuth(learnerID, cx.user, cx.pass) + } else { + _, err = cx.epc.Procs[leaderIdx]. + Etcdctl(e2e.ClientNonTLS, false, false). + MemberPromoteWithAuth(learnerID, cx.user, cx.pass) + } + + require.NoError(cx.t, err) + } +} + +// TestCtlV3MemberAddAsLearnerWithOneMemberDown verifies the case +// of adding new member when one or two existing members are down. +// Refer to https://github.com/etcd-io/etcd/issues/21640 +func TestCtlV3MemberAddAsLearnerWithOneMemberDown(t *testing.T) { + testCases := []struct { + name string + clusterSize int + downMembers int + expectErr bool + }{ + { + name: "0 out of 1 member is down, allow adding member", + clusterSize: 1, + downMembers: 0, + expectErr: false, + }, + { + name: "1 out of 3 members is down, reject adding member", + clusterSize: 3, + downMembers: 1, + expectErr: true, + }, + { + name: "1 out of 4 members is down, allow adding member", + clusterSize: 4, + downMembers: 1, + expectErr: false, + }, + { + name: "1 out of 5 members is down, allow adding member", + clusterSize: 5, + downMembers: 1, + expectErr: false, + }, + { + name: "2 out of 5 members are down, reject adding member", + clusterSize: 5, + downMembers: 2, + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + e2e.BeforeTest(t) + + t.Logf("Bootstrap a cluster with %d members", tc.clusterSize) + epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{ + ClusterSize: tc.clusterSize, + }) + require.NoError(t, err) + defer func() { + _ = epc.Close() + }() + + t.Logf("Killing %d member", tc.downMembers) + for i := 0; i < tc.downMembers; i++ { + t.Logf("Killing member, name: %s, peerURL: %s", epc.Procs[i].Config().Name, epc.Procs[i].Config().Purl.String()) + err = epc.Procs[i].Kill() + require.NoError(t, err) + } + + time.Sleep(etcdserver.HealthInterval + 2*time.Second) + + t.Log("Adding a new learner") + _, err = epc.Procs[len(epc.Procs)-1].Etcdctl(e2e.ClientNonTLS, false, false).MemberAddAsLearner("new-learner", []string{"http://10.0.0.12:2380"}) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/tests/e2e/etcd_grpcproxy_test.go b/tests/e2e/etcd_grpcproxy_test.go index 84fafba03bcf..6b3ce88fa601 100644 --- a/tests/e2e/etcd_grpcproxy_test.go +++ b/tests/e2e/etcd_grpcproxy_test.go @@ -137,7 +137,7 @@ func TestGrpcProxyTlsVersions(t *testing.T) { }() var ( - node1ClientURL = epc.Procs[0].Config().ClientHttpUrl + node1ClientURL = epc.Procs[0].Config().Acurl proxyClientURL = "127.0.0.1:42379" ) @@ -149,13 +149,30 @@ func TestGrpcProxyTlsVersions(t *testing.T) { "--endpoints-auto-sync-interval", "1s", "--cert-file", e2e.CertPath2, "--key-file", e2e.PrivateKeyPath2, + "--trusted-ca-file", e2e.CaPath, "--tls-min-version", "TLS1.2", "--tls-max-version", "TLS1.3", }, nil) require.NoError(t, err) defer proxyProc.Stop() - _, err = proxyProc.Expect("listening for gRPC proxy client requests") + _, err = proxyProc.Expect("started gRPC proxy") + require.NoError(t, err) + + curlProc, err := e2e.SpawnCmd([]string{ + "curl", + "--http1.1", + "--fail", + "--verbose", + "--cacert", e2e.CaPath, + "--cert", e2e.CertPath2, + "--key", e2e.PrivateKeyPath2, + "--max-time", "10", + "https://" + proxyClientURL + "/proxy/health", + }, nil) + require.NoError(t, err) + + _, err = curlProc.Expect(`"health":"true"`) require.NoError(t, err) } diff --git a/tests/framework/e2e/etcdctl.go b/tests/framework/e2e/etcdctl.go index f0bd53cf6fd8..27c0a1b089f5 100644 --- a/tests/framework/e2e/etcdctl.go +++ b/tests/framework/e2e/etcdctl.go @@ -183,6 +183,15 @@ func (ctl *Etcdctl) MemberPromote(id uint64) (*clientv3.MemberPromoteResponse, e return &resp, err } +func (ctl *Etcdctl) MemberPromoteWithAuth(id uint64, username, password string) (*clientv3.MemberPromoteResponse, error) { + if ctl.v2 { + panic("Unsupported method for v2") + } + var resp clientv3.MemberPromoteResponse + err := ctl.spawnJsonCmd(&resp, "member", "promote", fmt.Sprintf("%x", id), "--user", fmt.Sprintf("%s:%s", username, password)) + return &resp, err +} + func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) { if ctl.v2 { panic("Unsupported method for v2") diff --git a/tests/integration/clientv3/cluster_test.go b/tests/integration/clientv3/cluster_test.go index d624a56d7b2c..f606d4a7cb8c 100644 --- a/tests/integration/clientv3/cluster_test.go +++ b/tests/integration/clientv3/cluster_test.go @@ -23,7 +23,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/types" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/tests/v3/integration" ) @@ -426,3 +429,51 @@ func TestMaxLearnerInCluster(t *testing.T) { t.Errorf("failed to add member %v", err) } } + +func TestMemberUpdateLearner(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + capi := clus.Client(0) + + urls := []string{"http://127.0.0.1:1234"} + addResp, err := capi.MemberAddAsLearner(t.Context(), urls) + require.NoError(t, err) + learnerID := addResp.Member.ID + + learner, err := getMemberByID(t.Context(), capi, learnerID) + require.NoError(t, err) + require.Truef(t, learner.IsLearner, "added a member as learner, IsLearner is %t", learner.IsLearner) + + updatedURLs := []string{"http://127.0.0.1:5678"} + _, err = capi.MemberUpdate(t.Context(), learnerID, updatedURLs) + require.NoError(t, err) + + learner, err = getMemberByID(t.Context(), capi, learnerID) + require.NoError(t, err) + require.Equal(t, learner.PeerURLs, updatedURLs) + require.Truef(t, learner.IsLearner, "updated peer address of a learner member, IsLearner is %t", learner.IsLearner) +} + +func getMemberByID(ctx context.Context, cli *clientv3.Client, id uint64) (member *etcdserverpb.Member, err error) { + var resp *clientv3.MemberListResponse + resp, err = cli.MemberList(ctx) + if err != nil { + return member, err + } + + for _, m := range resp.Members { + if m.ID == id { + member = m + break + } + } + + if member == nil { + err = fmt.Errorf("failed to find member by id %d", id) + } + + return member, err +} diff --git a/tests/integration/v3_auth_test.go b/tests/integration/v3_auth_test.go index f030c4e164d9..b53265de76a4 100644 --- a/tests/integration/v3_auth_test.go +++ b/tests/integration/v3_auth_test.go @@ -17,15 +17,22 @@ package integration import ( "context" "fmt" + "strings" "sync" "testing" "time" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/api/v3/authpb" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/testutil" - "go.etcd.io/etcd/client/v3" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + PermissionDenied = "etcdserver: permission denied" ) // TestV3AuthEmptyUserGet ensures that a get with an empty user will return an empty user error. @@ -152,6 +159,16 @@ func testV3AuthWithLeaseRevokeWithRoot(t *testing.T, ccfg ClusterConfig) { defer clus.Terminate(t) api := toGRPC(clus.Client(0)) + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + key: "foo", + }, + } + authSetupUsers(t, api.Auth, users) authSetupRoot(t, api.Auth) rootc, cerr := NewClient(t, clientv3.Config{ @@ -164,6 +181,26 @@ func testV3AuthWithLeaseRevokeWithRoot(t *testing.T, ccfg ClusterConfig) { } defer rootc.Close() + testUserCli, terr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, terr) + defer testUserCli.Close() + + anonCli, aerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + }) + require.NoError(t, aerr) + defer anonCli.Close() + + _, aerr = anonCli.Grant(context.TODO(), 2) + require.ErrorContains(t, aerr, "etcdserver: user name is empty") + + _, terr = testUserCli.Grant(context.TODO(), 2) + require.NoError(t, terr) + leaseResp, err := rootc.Grant(context.TODO(), 2) if err != nil { t.Fatal(err) @@ -197,6 +234,7 @@ type user struct { name string password string role string + perm string key string end string } @@ -333,8 +371,15 @@ func authSetupUsers(t *testing.T, auth pb.AuthClient, users []user) { continue } + permType := authpb.READWRITE + if len(user.perm) > 0 { + val, ok := authpb.Permission_Type_value[strings.ToUpper(user.perm)] + if ok { + permType = authpb.Permission_Type(val) + } + } perm := &authpb.Permission{ - PermType: authpb.READWRITE, + PermType: permType, Key: []byte(user.key), RangeEnd: []byte(user.end), } @@ -589,6 +634,12 @@ func TestV3AuthWithLeaseTimeToLive(t *testing.T) { } defer user2c.Close() + anonCli, cerr := NewClient(t, clientv3.Config{Endpoints: clus.Client(0).Endpoints()}) + if cerr != nil { + t.Fatal(cerr) + } + defer anonCli.Close() + leaseResp, err := user1c.Grant(context.TODO(), 90) if err != nil { t.Fatal(err) @@ -614,6 +665,12 @@ func TestV3AuthWithLeaseTimeToLive(t *testing.T) { t.Fatal(err) } + _, err = anonCli.TimeToLive(context.TODO(), leaseID) + require.ErrorContains(t, err, "etcdserver: user name is empty") + + _, err = anonCli.TimeToLive(context.TODO(), leaseID, clientv3.WithAttachedKeys()) + require.ErrorContains(t, err, "etcdserver: user name is empty") + _, err = user2c.TimeToLive(context.TODO(), leaseID, clientv3.WithAttachedKeys()) if err == nil { t.Fatal("timetolive from user2 should be failed with permission denied") @@ -634,3 +691,456 @@ func TestV3AuthWithLeaseTimeToLive(t *testing.T) { t.Fatal("timetolive from user2 should be failed with permission denied") } } + +func TestV3AuthWithLeaseRenew(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + // test-user can only write keys in [k1, k3), i.e. k1 and k2. + key: "k1", + end: "k3", + }, + } + authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users) + authSetupRoot(t, toGRPC(clus.Client(0)).Auth) + + rootCli, cerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, cerr) + defer rootCli.Close() + + testUserClis := []*clientv3.Client{} + for i := 0; i < len(clus.Members); i++ { + testUserCli, err := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(i).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, err) + defer testUserCli.Close() + + testUserClis = append(testUserClis, testUserCli) + } + + anonCli, cerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + }) + require.NoError(t, cerr) + defer anonCli.Close() + + leaseResp, err := rootCli.Grant(t.Context(), 90) + require.NoError(t, err) + leaseID := leaseResp.ID + + _, err = rootCli.Put(t.Context(), "k1", "val", clientv3.WithLease(leaseID)) + require.NoError(t, err) + _, err = rootCli.Put(t.Context(), "k3", "val", clientv3.WithLease(leaseID)) + require.NoError(t, err) + + _, err = anonCli.KeepAliveOnce(t.Context(), leaseID) + require.ErrorContainsf(t, err, "etcdserver: user name is empty", "should reject renew") + + _, err = rootCli.KeepAliveOnce(t.Context(), leaseID) + require.NoError(t, err) + + for _, testUserCli := range testUserClis { + _, err = testUserCli.KeepAliveOnce(t.Context(), leaseID) + require.ErrorContainsf(t, err, "etcdserver: permission denied", "[%v] should reject renew", testUserCli.Endpoints()) + } + + leaseResp, err = rootCli.Grant(t.Context(), 90) + require.NoError(t, err) + leaseID = leaseResp.ID + + _, err = rootCli.Put(t.Context(), "k1", "val", clientv3.WithLease(leaseID)) + require.NoError(t, err) + _, err = rootCli.Put(t.Context(), "k2", "val", clientv3.WithLease(leaseID)) + require.NoError(t, err) + + for _, testUserCli := range testUserClis { + _, err = testUserCli.KeepAliveOnce(t.Context(), leaseID) + require.NoErrorf(t, err, "[%v] should accept renew", testUserCli.Endpoints()) + } +} + +func TestV3AuthAlarm(t *testing.T) { + BeforeTest(t) + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + clus := NewClusterV3(t, &ClusterConfig{ + Size: 1, + QuotaBackendBytes: 1024 * 5, + }) + defer clus.Terminate(t) + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + key: "k1", + end: "k3", + }, + } + authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users) + authSetupRoot(t, toGRPC(clus.Client(0)).Auth) + + rootCli, rerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, rerr) + defer rootCli.Close() + + testUserCli, terr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, terr) + defer testUserCli.Close() + + anonCli, aerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + }) + require.NoError(t, aerr) + defer anonCli.Close() + + for i := 0; ; i++ { + _, err := rootCli.Put(ctx, fmt.Sprintf("%v", int64(i)), strings.Repeat("A", 1024)) + if err == nil { + continue + } + + require.ErrorContains(t, err, "etcdserver: mvcc: database space exceeded") + break + } + + _, err := anonCli.AlarmList(ctx) + require.ErrorContains(t, err, "etcdserver: user name is empty") + + memberID := uint64(0) + + for i := 0; i < 10; i++ { + resp, rerr := rootCli.AlarmList(ctx) + require.NoError(t, rerr) + + if len(resp.Alarms) > 0 { + memberID = resp.Header.MemberId + break + } + time.Sleep(1 * time.Second) + } + require.NotEqualf(t, uint64(0), memberID, "expect to find alarm with non-zero member ID") + + resp, err := testUserCli.AlarmList(ctx) + require.NoError(t, err) + require.Len(t, resp.Alarms, 1) + + _, err = testUserCli.AlarmDisarm(ctx, &clientv3.AlarmMember{ + MemberID: memberID, + Alarm: pb.AlarmType_NOSPACE, + }) + require.ErrorContains(t, err, PermissionDenied) + + resp, err = rootCli.AlarmDisarm(ctx, &clientv3.AlarmMember{ + MemberID: memberID, + Alarm: pb.AlarmType_NOSPACE, + }) + require.NoError(t, err) + require.Lenf(t, resp.Alarms, 1, "expect 1 alarm from disarm but got %v", resp.Alarms) + + resp, err = rootCli.AlarmList(ctx) + require.NoError(t, err) + require.Emptyf(t, resp.Alarms, "expect no alarm after disarm but got %v", resp.Alarms) +} + +func TestV3AuthMemberListAndStatus(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + key: "k1", + end: "k3", + }, + } + authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users) + authSetupRoot(t, toGRPC(clus.Client(0)).Auth) + + rootCli, rerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, rerr) + defer rootCli.Close() + + testUserCli, terr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, terr) + defer testUserCli.Close() + + anonCli, aerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + }) + require.NoError(t, aerr) + defer anonCli.Close() + + _, err := anonCli.MemberList(ctx) + require.ErrorContains(t, err, "etcdserver: user name is empty") + + _, err = testUserCli.MemberList(ctx) + require.NoError(t, err) + + _, err = testUserCli.Status(ctx, clus.Client(0).Endpoints()[0]) + require.ErrorContains(t, err, PermissionDenied) + + _, err = rootCli.MemberList(ctx) + require.NoError(t, err) + + _, err = rootCli.Status(ctx, clus.Client(0).Endpoints()[0]) + require.NoError(t, err) +} + +func TestV3AuthLeaseLeases(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + key: "foo", + }, + } + authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users) + authSetupRoot(t, toGRPC(clus.Client(0)).Auth) + + rootCli, rerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, rerr) + defer rootCli.Close() + + testUserCli, terr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, terr) + defer testUserCli.Close() + + anonCli, aerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + }) + require.NoError(t, aerr) + defer anonCli.Close() + + lresp, err := rootCli.Grant(ctx, 90) + require.NoError(t, err) + firstLeaseID := lresp.ID + + _, err = rootCli.Put(ctx, "foo", "value", clientv3.WithLease(firstLeaseID)) + require.NoError(t, err) + + lresp, err = rootCli.Grant(ctx, 90) + require.NoError(t, err) + secondLeaseID := lresp.ID + + _, err = rootCli.Put(ctx, "foo1", "value", clientv3.WithLease(secondLeaseID)) + require.NoError(t, err) + + _, err = testUserCli.Leases(ctx) + require.ErrorContains(t, err, PermissionDenied) + + _, err = anonCli.Leases(ctx) + require.ErrorContains(t, err, "etcdserver: user name is empty") + + resp, err := rootCli.Leases(ctx) + require.NoError(t, err) + require.Lenf(t, resp.Leases, 2, "want 2 leases but got %v", resp.Leases) + + leaseIDs := []clientv3.LeaseID{firstLeaseID, secondLeaseID} + for _, lease := range resp.Leases { + require.Containsf(t, leaseIDs, lease.ID, "unexpected lease ID %v, want one of %v", lease.ID, leaseIDs) + } + + _, err = rootCli.Revoke(ctx, secondLeaseID) + require.NoError(t, err) + + _, err = testUserCli.Leases(ctx) + require.NoError(t, err) +} + +func TestV3AuthCompact(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + users := []user{ + { + name: "test-user", + password: "test-user-123", + role: "test-role", + key: "foo", + }, + } + authSetupUsers(t, toGRPC(clus.Client(0)).Auth, users) + authSetupRoot(t, toGRPC(clus.Client(0)).Auth) + + rootCli, rerr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, rerr) + defer rootCli.Close() + + testUserCli, terr := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "test-user", + Password: "test-user-123", + }) + require.NoError(t, terr) + defer testUserCli.Close() + + _, err := rootCli.Put(ctx, "key", "value") + require.NoError(t, err) + + _, err = rootCli.Put(ctx, "key", "value") + require.NoError(t, err) + + _, err = testUserCli.Compact(ctx, 1, clientv3.WithCompactPhysical()) + require.ErrorContains(t, err, PermissionDenied) + + _, err = rootCli.Compact(ctx, 1, clientv3.WithCompactPhysical()) + require.NoError(t, err) +} + +func TestReadWithPrevKvInTXN(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + users := []user{ + { + name: "user1", + password: "user1-123", + role: "role1", + perm: "write", + key: "foo", + end: "zoo", + }, + } + anonCli := toGRPC(clus.Client(0)) + authSetupUsers(t, anonCli.Auth, users) + authSetupRoot(t, anonCli.Auth) + + rootc, err := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, err) + defer rootc.Close() + + userc, err := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "user1", + Password: "user1-123", + }) + require.NoError(t, err) + defer userc.Close() + + _, err = rootc.Put(t.Context(), "foo", "bar") + require.NoError(t, err) + + _, err = userc.Txn(t.Context()). + Then(clientv3.OpPut("foo", "new", clientv3.WithPrevKV())). + Commit() + + require.Error(t, err) + require.Truef(t, eqErrGRPC(err, rpctypes.ErrGRPCPermissionDenied), "got %v, expected %v", err, rpctypes.ErrGRPCPermissionDenied) +} + +func TestPutWithLeaseInTXN(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + users := []user{ + { + name: "user1", + password: "user1-123", + role: "role1", + perm: "write", + key: "foo", + end: "fop", + }, + } + anonCli := toGRPC(clus.Client(0)) + authSetupUsers(t, anonCli.Auth, users) + authSetupRoot(t, anonCli.Auth) + + rootc, err := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "root", + Password: "123", + }) + require.NoError(t, err) + defer rootc.Close() + + userc, err := NewClient(t, clientv3.Config{ + Endpoints: clus.Client(0).Endpoints(), + Username: "user1", + Password: "user1-123", + }) + require.NoError(t, err) + defer userc.Close() + + t.Log("Create a lease and attach it to a key which the user1 doesn't have permission to write") + leaseResp, err := rootc.Grant(t.Context(), 90) + require.NoError(t, err) + leaseID := leaseResp.ID + _, err = rootc.Put(t.Context(), "eoo", "bar", clientv3.WithLease(leaseID)) + require.NoError(t, err) + + _, err = userc.Txn(t.Context()). + Then(clientv3.OpPut("foo", "new", clientv3.WithLease(leaseID))). + Commit() + + require.Error(t, err) + require.Truef(t, eqErrGRPC(err, rpctypes.ErrGRPCPermissionDenied), "got %v, expected %v", err, rpctypes.ErrGRPCPermissionDenied) +}