VAULT-31749: Interceptors to reject requests from removed Raft nodes #28875

Open · wants to merge 8 commits into main
3 changes: 3 additions & 0 deletions changelog/28875.txt
@@ -0,0 +1,3 @@
```release-note:change
storage/raft: Do not allow nodes that have been removed from the raft cluster configuration to respond to requests. Shut down and seal raft nodes when they are removed.
```
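Roughly, the behavior this note describes can be exercised from an operator's point of view with the Go API client (`github.com/hashicorp/vault/api`). A minimal sketch, where the addresses, token, and server ID are placeholders rather than values from this PR:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// Talk to the active node (placeholder address and token).
	active, err := api.NewClient(&api.Config{Address: "https://active.vault.example:8200"})
	if err != nil {
		log.Fatal(err)
	}
	active.SetToken("placeholder-token")

	// Remove a follower from the raft cluster configuration.
	_, err = active.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
		"server_id": "node3",
	})
	if err != nil {
		log.Fatal(err)
	}

	// After this change, requests sent to the removed node are rejected
	// instead of being forwarded, and the node seals itself.
	removed, err := api.NewClient(&api.Config{Address: "https://node3.vault.example:8200"})
	if err != nil {
		log.Fatal(err)
	}
	removed.SetToken("placeholder-token")
	if _, err := removed.Logical().Write("secret/foo", map[string]interface{}{"test": "data"}); err != nil {
		fmt.Println("write to removed node failed as expected:", err)
	}
}
```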
9 changes: 7 additions & 2 deletions http/handler.go
@@ -988,9 +988,14 @@ func forwardRequest(core *vault.Core, w http.ResponseWriter, r *http.Request) {
	// ErrCannotForward and we simply fall back
	statusCode, header, retBytes, err := core.ForwardRequest(r)
	if err != nil {
-		if err == vault.ErrCannotForward {
+		switch {
+		case errors.Is(err, vault.ErrCannotForward):
			core.Logger().Trace("cannot forward request (possibly disabled on active node), falling back to redirection to standby")
-		} else {
+		case errors.Is(err, vault.StatusNotHAMember):
+			core.Logger().Trace("this node is not a member of the HA cluster", "error", err)
+			respondError(w, http.StatusInternalServerError, err)
+			return
+		default:
			core.Logger().Error("forward request error", "error", err)
		}

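The handler matches the forwarded error with `errors.Is` rather than `==` because `ForwardRequest` (see the `request_forwarding.go` changes below) wraps `StatusNotHAMember` with `%w`; as far as the grpc-go `status` package is concerned, its errors also implement `Is` by comparing code and message, which is what lets the sentinel match an error reconstructed from the wire. A small standard-library-only sketch of the wrapping part:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for vault.StatusNotHAMember; the real sentinel is a gRPC status error.
var errNotHAMember = errors.New("node is not in HA cluster membership")

// forward mimics ForwardRequest wrapping the sentinel with %w.
func forward() error {
	return fmt.Errorf("error during forwarding RPC request: %w", errNotHAMember)
}

func main() {
	err := forward()
	fmt.Println(err == errNotHAMember)          // false: the sentinel is wrapped
	fmt.Println(errors.Is(err, errNotHAMember)) // true: errors.Is unwraps the chain
}
```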
12 changes: 2 additions & 10 deletions vault/core.go
@@ -4579,16 +4579,8 @@ func (c *Core) setupAuditedHeadersConfig(ctx context.Context) error {
// RemovableNodeHABackend interface. The value of the `ok` result will be false
// if the HA and underlyingPhysical backends are nil or do not support this operation.
func (c *Core) IsRemovedFromCluster() (removed, ok bool) {
-	var haBackend any
-	if c.ha != nil {
-		haBackend = c.ha
-	} else if c.underlyingPhysical != nil {
-		haBackend = c.underlyingPhysical
-	} else {
-		return false, false
-	}
-	removableNodeHA, ok := haBackend.(physical.RemovableNodeHABackend)
-	if !ok {
+	removableNodeHA := c.getRemovableHABackend()
+	if removableNodeHA == nil {
		return false, false
	}

3 changes: 2 additions & 1 deletion vault/core_test.go
@@ -3724,7 +3724,7 @@ func TestCore_IsRemovedFromCluster(t *testing.T) {
	core.underlyingPhysical = mockHA
	removed, ok = core.IsRemovedFromCluster()
	if removed || !ok {
-		t.Fatalf("expected removed and ok to be false, got removed: %v, ok: %v", removed, ok)
+		t.Fatalf("expected removed to be false and ok to be true, got removed: %v, ok: %v", removed, ok)
	}

	// Test case where HA backend is nil, but the underlying physical is there, supports RemovableNodeHABackend, and is removed
@@ -3735,6 +3735,7 @@
	}

	// Test case where HA backend does not support RemovableNodeHABackend
+	core.underlyingPhysical = &MockHABackend{}
	core.ha = &MockHABackend{}
	removed, ok = core.IsRemovedFromCluster()
	if removed || ok {
30 changes: 30 additions & 0 deletions vault/external_tests/raft/raft_test.go
@@ -1360,3 +1360,33 @@ func TestRaft_Join_InitStatus(t *testing.T) {
verifyInitStatus(i, true)
}
}

// TestRaftCluster_Removed creates a 3 node raft cluster and then removes one of
// the nodes. The test verifies that a write on the removed node errors, and that
// the removed node is sealed.
func TestRaftCluster_Removed(t *testing.T) {
t.Parallel()
cluster, _ := raftCluster(t, nil)
defer cluster.Cleanup()

follower := cluster.Cores[2]
followerClient := follower.Client
_, err := followerClient.Logical().Write("secret/foo", map[string]interface{}{
"test": "data",
})
require.NoError(t, err)

_, err = cluster.Cores[0].Client.Logical().Write("/sys/storage/raft/remove-peer", map[string]interface{}{
"server_id": follower.NodeID,
})
followerClient.SetCheckRedirect(func(request *http.Request, requests []*http.Request) error {
require.Fail(t, "request caused a redirect", request.URL.Path)
return fmt.Errorf("no redirects allowed")
})
require.NoError(t, err)
_, err = followerClient.Logical().Write("secret/foo", map[string]interface{}{
"test": "other_data",
})
require.Error(t, err)
require.True(t, follower.Sealed())
}
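As a follow-up to the test above: outside the test harness, the sealing of a removed node can be observed through the seal-status endpoint. A minimal sketch using the Go API client (the address is a placeholder; in the test, `follower.Sealed()` checks the same thing directly):

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/vault/api"
)

func main() {
	// Placeholder address for the node that was removed from the cluster.
	client, err := api.NewClient(&api.Config{Address: "https://node3.vault.example:8200"})
	if err != nil {
		log.Fatal(err)
	}
	status, err := client.Sys().SealStatus()
	if err != nil {
		log.Fatal(err)
	}
	// With this change, a removed node shuts down its HA participation and
	// seals, so Sealed is expected to become true shortly after removal.
	fmt.Println("sealed:", status.Sealed)
}
```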
13 changes: 13 additions & 0 deletions vault/ha.go
@@ -1223,3 +1223,16 @@ func (c *Core) SetNeverBecomeActive(on bool) {
atomic.StoreUint32(c.neverBecomeActive, 0)
}
}

// getRemovableHABackend returns the backend that supports removable nodes,
// preferring the underlying physical backend when both it and the HA backend
// qualify. It returns nil if neither implements physical.RemovableNodeHABackend.
func (c *Core) getRemovableHABackend() physical.RemovableNodeHABackend {
var haBackend physical.RemovableNodeHABackend
if removableHA, ok := c.ha.(physical.RemovableNodeHABackend); ok {
haBackend = removableHA
}

if removableHA, ok := c.underlyingPhysical.(physical.RemovableNodeHABackend); ok {
haBackend = removableHA
}

return haBackend
}
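Note that when both `c.ha` and `c.underlyingPhysical` implement `physical.RemovableNodeHABackend`, the second type assertion overwrites the first, so the underlying physical backend wins. A simplified, self-contained sketch of that precedence pattern (the `Removable` interface here is a stand-in, not the real backend interface):

```go
package main

import "fmt"

// Removable is a stand-in for physical.RemovableNodeHABackend.
type Removable interface {
	NodeID() string
}

type haBackend struct{}       // pretend this is c.ha
type physicalBackend struct{} // pretend this is c.underlyingPhysical

func (haBackend) NodeID() string       { return "ha" }
func (physicalBackend) NodeID() string { return "physical" }

// pick mirrors getRemovableHABackend: the later assertion wins when both match.
func pick(ha, underlying any) Removable {
	var out Removable
	if r, ok := ha.(Removable); ok {
		out = r
	}
	if r, ok := underlying.(Removable); ok {
		out = r
	}
	return out
}

func main() {
	fmt.Println(pick(haBackend{}, physicalBackend{}).NodeID()) // "physical"
	fmt.Println(pick(haBackend{}, nil).NodeID())               // "ha"
	fmt.Println(pick(nil, nil) == nil)                         // true: neither matches
}
```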
114 changes: 114 additions & 0 deletions vault/request_forwarding.go
@@ -22,13 +22,116 @@ import (
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/vault/cluster"
"github.com/hashicorp/vault/vault/replication"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

var (
NotHAMember = "node is not in HA cluster membership"
StatusNotHAMember = status.Errorf(codes.FailedPrecondition, NotHAMember)
)

const haNodeIDKey = "ha_node_id"

func haIDFromContext(ctx context.Context) (string, bool) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", false
}
res := md.Get(haNodeIDKey)
if len(res) == 0 {
return "", false
}
return res[0], true
}

// haMembershipServerCheck extracts the client's HA node ID from the request
// context and checks whether that node has been removed from the cluster. It
// returns StatusNotHAMember if the node has been removed.
func haMembershipServerCheck(ctx context.Context, c *Core, haBackend physical.RemovableNodeHABackend) error {
if haBackend == nil {
return nil
}
nodeID, ok := haIDFromContext(ctx)
if !ok {
return nil
}
removed, err := haBackend.IsNodeRemoved(ctx, nodeID)
if err != nil {
c.logger.Error("failed to check if node is removed", "error", err)
return err
}
if removed {
return StatusNotHAMember
}
return nil
}

func haMembershipUnaryServerInterceptor(c *Core, haBackend physical.RemovableNodeHABackend) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
err = haMembershipServerCheck(ctx, c, haBackend)
if err != nil {
return nil, err
}
return handler(ctx, req)
}
}

func haMembershipStreamServerInterceptor(c *Core, haBackend physical.RemovableNodeHABackend) grpc.StreamServerInterceptor {
return func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
err := haMembershipServerCheck(ss.Context(), c, haBackend)
if err != nil {
return err
}
return handler(srv, ss)
}
}

// haMembershipClientCheck checks if the given error from the server
// is StatusNotHAMember. If so, the client will mark itself as removed
// and shut down.
func haMembershipClientCheck(err error, c *Core, haBackend physical.RemovableNodeHABackend) {
if !errors.Is(err, StatusNotHAMember) {
return
}
removeErr := haBackend.RemoveSelf()
if removeErr != nil {
c.logger.Debug("failed to remove self", "error", removeErr)
}
go c.ShutdownCoreError(errors.New("node removed from HA configuration"))
}

func haMembershipUnaryClientInterceptor(c *Core, haBackend physical.RemovableNodeHABackend) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if haBackend == nil {
return invoker(ctx, method, req, reply, cc, opts...)
}
ctx = metadata.AppendToOutgoingContext(ctx, haNodeIDKey, haBackend.NodeID())
err := invoker(ctx, method, req, reply, cc, opts...)
haMembershipClientCheck(err, c, haBackend)
return err
}
}

func haMembershipStreamClientInterceptor(c *Core, haBackend physical.RemovableNodeHABackend) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
if haBackend == nil {
return streamer(ctx, desc, cc, method, opts...)
}
ctx = metadata.AppendToOutgoingContext(ctx, haNodeIDKey, haBackend.NodeID())
stream, err := streamer(ctx, desc, cc, method, opts...)
haMembershipClientCheck(err, c, haBackend)
return stream, err
}
}

type requestForwardingHandler struct {
fws *http2.Server
fwRPCServer *grpc.Server
@@ -47,13 +150,16 @@ type requestForwardingClusterClient struct {
func NewRequestForwardingHandler(c *Core, fws *http2.Server, perfStandbySlots chan struct{}, perfStandbyRepCluster *replication.Cluster) (*requestForwardingHandler, error) {
// Resolve locally to avoid races
ha := c.ha != nil
removableHABackend := c.getRemovableHABackend()

fwRPCServer := grpc.NewServer(
grpc.KeepaliveParams(keepalive.ServerParameters{
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.MaxRecvMsgSize(math.MaxInt32),
grpc.MaxSendMsgSize(math.MaxInt32),
grpc.StreamInterceptor(haMembershipStreamServerInterceptor(c, removableHABackend)),
grpc.UnaryInterceptor(haMembershipUnaryServerInterceptor(c, removableHABackend)),
)

if ha && c.clusterHandler != nil {
@@ -274,6 +380,8 @@ func (c *Core) refreshRequestForwardingConnection(ctx context.Context, clusterAd
core: c,
})

removableHABackend := c.getRemovableHABackend()

// Set up grpc forwarding handling
// It's not really insecure, but we have to dial manually to get the
// ALPN header right. It's just "insecure" because GRPC isn't managing
@@ -285,6 +393,8 @@
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 2 * c.clusterHeartbeatInterval,
}),
grpc.WithStreamInterceptor(haMembershipStreamClientInterceptor(c, removableHABackend)),
grpc.WithUnaryInterceptor(haMembershipUnaryClientInterceptor(c, removableHABackend)),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32),
@@ -374,6 +484,10 @@ func (c *Core) ForwardRequest(req *http.Request) (int, http.Header, []byte, erro
if err != nil {
metrics.IncrCounter([]string{"ha", "rpc", "client", "forward", "errors"}, 1)
c.logger.Error("error during forwarded RPC request", "error", err)

if errors.Is(err, StatusNotHAMember) {
return 0, nil, nil, fmt.Errorf("error during forwarding RPC request: %w", err)
}
return 0, nil, nil, fmt.Errorf("error during forwarding RPC request")
}

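How the node ID travels with each forwarded request: the client interceptors attach it as gRPC metadata under `ha_node_id`, and the server interceptors read it back via `haIDFromContext`. A self-contained sketch of that metadata round trip, with the server side simulated through `NewIncomingContext` rather than a real gRPC connection:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

const haNodeIDKey = "ha_node_id"

// attachNodeID mirrors what the client interceptors do before calling the
// invoker or streamer: append this node's ID to the outgoing metadata.
func attachNodeID(ctx context.Context, nodeID string) context.Context {
	return metadata.AppendToOutgoingContext(ctx, haNodeIDKey, nodeID)
}

// nodeIDFrom mirrors haIDFromContext: read the caller's node ID, if present.
func nodeIDFrom(md metadata.MD) (string, bool) {
	vals := md.Get(haNodeIDKey)
	if len(vals) == 0 {
		return "", false
	}
	return vals[0], true
}

func main() {
	ctx := attachNodeID(context.Background(), "node3")

	// In a real call, gRPC carries outgoing metadata onto the server's
	// incoming context; here we move it over by hand to simulate that.
	md, _ := metadata.FromOutgoingContext(ctx)
	serverCtx := metadata.NewIncomingContext(context.Background(), md)

	incoming, _ := metadata.FromIncomingContext(serverCtx)
	id, ok := nodeIDFrom(incoming)
	fmt.Println(id, ok) // node3 true
}
```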