Don't block appends when segments are highly contended
Under high write load, the segment write lock can cause
long delays for a goroutine trying to acquire it for a write and
append operation.  This causes the ingestor to report "slow request"
warnings at times, because multiple goroutines block on the lock
for a while and requests slow down.

A collector shipping segments generally has many segments
on disk that it needs to send.  So instead of blocking when a segment
is locked, we now return a locked error and let the collector move
on to the next segment.  It will retry the prior segment again a few
seconds later.  This improves throughput and reduces the slow-request
warnings quite a bit.
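As a rough illustration of the collector-side behavior this enables (a minimal sketch; `shipSegments`, the `send` callback, and `errSegmentLocked` are illustrative stand-ins, not the actual collector code):

```go
package main

import (
	"context"
	"errors"
	"log"
)

// errSegmentLocked stands in for cluster.ErrSegmentLocked.
var errSegmentLocked = errors.New("segment is locked")

// shipSegments makes one pass over the on-disk segments. A locked
// segment is skipped instead of waited on; the caller re-runs the
// pass a few seconds later, which retries anything still pending.
func shipSegments(ctx context.Context, segments []string,
	send func(context.Context, string) error) (retry []string) {
	for _, seg := range segments {
		err := send(ctx, seg)
		switch {
		case errors.Is(err, errSegmentLocked):
			// An appender holds the write lock; try again next pass.
			retry = append(retry, seg)
		case err != nil:
			log.Printf("send %s failed: %v", seg, err)
			retry = append(retry, seg)
		}
	}
	return retry
}
```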
jwilder committed Nov 13, 2024
1 parent fcd703b commit 93cb433
Showing 12 changed files with 116 additions and 18 deletions.
8 changes: 8 additions & 0 deletions cmd/ingestor/main.go
@@ -408,6 +408,14 @@ func realMain(ctx *cli.Context) error {
srv.ErrorLog = newLogger()

go func() {
// Under high connection load and when the server is doing a lot of IO, this
// can cause the server to be unresponsive. This pins the accept goroutine
// to a single CPU to reduce context switching and improve performance.
runtime.LockOSThread()
if err := pinToCPU(0); err != nil {
logger.Warnf("Failed to pin to CPU: %s", err)
}

if err := srv.ServeTLS(l, cacert, key); err != nil {
logger.Errorf(err.Error())
}
5 changes: 5 additions & 0 deletions cmd/ingestor/thread_affinity.go
@@ -0,0 +1,5 @@
//go:build !linux

package main

func pinToCPU(cpu int) error { return nil }
11 changes: 11 additions & 0 deletions cmd/ingestor/thread_affinity_linux.go
@@ -0,0 +1,11 @@
package main

import (
"golang.org/x/sys/unix"
)

func pinToCPU(cpu int) error {
var newMask unix.CPUSet
newMask.Set(cpu)
// pid 0 applies the affinity mask to the calling OS thread.
return unix.SchedSetaffinity(0, &newMask)
}
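A quick Linux-only sanity check, purely illustrative (`verifyPin` is not part of the commit), that reads the mask back with `unix.SchedGetaffinity` after pinning:

```go
//go:build linux

package main

import (
	"fmt"
	"runtime"

	"golang.org/x/sys/unix"
)

// verifyPin pins the calling OS thread to cpu and confirms the
// kernel recorded a single-CPU affinity mask.
func verifyPin(cpu int) error {
	runtime.LockOSThread() // the mask applies per OS thread
	defer runtime.UnlockOSThread()

	if err := pinToCPU(cpu); err != nil {
		return err
	}
	var mask unix.CPUSet
	// pid 0 queries the calling thread's affinity.
	if err := unix.SchedGetaffinity(0, &mask); err != nil {
		return err
	}
	if !mask.IsSet(cpu) || mask.Count() != 1 {
		return fmt.Errorf("thread not pinned to CPU %d", cpu)
	}
	return nil
}
```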
2 changes: 1 addition & 1 deletion go.mod
@@ -36,6 +36,7 @@ require (
github.com/valyala/fastjson v1.6.4
golang.org/x/net v0.30.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80
google.golang.org/protobuf v1.35.1
k8s.io/api v0.30.3
@@ -100,7 +101,6 @@ require (
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.5.0 // indirect
8 changes: 5 additions & 3 deletions ingestor/cluster/batcher.go
@@ -425,10 +425,12 @@ func prioritizeOldest(a []string) []string {
var b []string

// Find the index that is roughly 20% from the end of the list
idx := len(a) - int(math.Round(float64(len(a))*0.1))
// Move last 10% of batches to the front of the list
idx := len(a) - int(math.Round(float64(len(a))*0.2))
// Move last 20% of batches to the front of the list
b = append(b, a[idx:]...)
// Move first 90% of batches to the end of the list
// Reverse the list so the oldest batches are first
slices.Reverse(b)
// Move first 80% of batches to the end of the list
b = append(b, a[:idx]...)
return b
}
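Mechanically, with ten batches the reordering looks like this (a standalone sketch of the unexported helper, reimplemented here so it can be run on its own):

```go
package main

import (
	"fmt"
	"math"
	"slices"
)

// Same logic as the batcher's unexported prioritizeOldest.
func prioritizeOldest(a []string) []string {
	var b []string
	idx := len(a) - int(math.Round(float64(len(a))*0.2))
	b = append(b, a[idx:]...) // last 20% moves to the front...
	slices.Reverse(b)         // ...reversed within that slice
	b = append(b, a[:idx]...) // remaining 80% keeps its order
	return b
}

func main() {
	batches := []string{"b1", "b2", "b3", "b4", "b5", "b6", "b7", "b8", "b9", "b10"}
	// idx = 10 - round(10*0.2) = 8, so the output is
	// [b10 b9 b1 b2 b3 b4 b5 b6 b7 b8]
	fmt.Println(prioritizeOldest(batches))
}
```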
5 changes: 5 additions & 0 deletions ingestor/cluster/client.go
@@ -15,6 +15,7 @@ import (
var (
ErrPeerOverloaded = fmt.Errorf("peer overloaded")
ErrSegmentExists = fmt.Errorf("segment already exists")
ErrSegmentLocked = fmt.Errorf("segment is locked")
)

type ErrBadRequest struct {
@@ -164,6 +165,10 @@ func (c *Client) Write(ctx context.Context, endpoint string, filename string, bo
return ErrSegmentExists
}

if resp.StatusCode == http.StatusLocked {
return ErrSegmentLocked
}

if resp.StatusCode == http.StatusBadRequest {
return &ErrBadRequest{Msg: fmt.Sprintf("write failed: %s", strings.TrimSpace(string(body)))}
}
3 changes: 3 additions & 0 deletions ingestor/cluster/replicator.go
@@ -172,6 +172,9 @@ func (r *replicator) transfer(ctx context.Context) {
logger.Errorf("Failed to remove segment: %s", err)
}
return nil
} else if errors.Is(err, ErrSegmentLocked) {
// Segment is locked, retry later.
return nil
} else if errors.Is(err, ErrPeerOverloaded) {
// Ingestor is overloaded, mark the peer as unhealthy and retry later.
r.Health.SetPeerUnhealthy(owner)
3 changes: 3 additions & 0 deletions ingestor/service.go
@@ -375,6 +375,9 @@ func (s *Service) HandleTransfer(w http.ResponseWriter, r *http.Request) {
m.WithLabelValues(strconv.Itoa(http.StatusTooManyRequests)).Inc()
http.Error(w, "Overloaded. Retry later", http.StatusTooManyRequests)
return
} else if errors.Is(err, wal.ErrSegmentLocked) {
http.Error(w, err.Error(), http.StatusLocked)
return
} else if err != nil {
logger.Errorf("Failed to import %s: %s", filename, err.Error())
m.WithLabelValues(strconv.Itoa(http.StatusInternalServerError)).Inc()
54 changes: 54 additions & 0 deletions pkg/sync/rwmutex.go
@@ -0,0 +1,54 @@
package sync

import (
"sync"
"sync/atomic"
)

// CountingRWMutex is an RWMutex that tracks the number of goroutines holding or waiting for the lock.
type CountingRWMutex struct {
mu sync.RWMutex
waiting int64
limit int64
}

func NewCountingRWMutex(limit int) *CountingRWMutex {
return &CountingRWMutex{limit: int64(limit)}
}

// RLock locks rw for reading.
func (rw *CountingRWMutex) RLock() {
atomic.AddInt64(&rw.waiting, 1)
rw.mu.RLock()
}

// RUnlock undoes a single RLock call; it does not affect other simultaneous readers.
func (rw *CountingRWMutex) RUnlock() {
atomic.AddInt64(&rw.waiting, -1)
rw.mu.RUnlock()
}

// Lock locks rw for writing.
func (rw *CountingRWMutex) Lock() {
atomic.AddInt64(&rw.waiting, 1)
rw.mu.Lock()
}

// Unlock unlocks rw for writing.
func (rw *CountingRWMutex) Unlock() {
atomic.AddInt64(&rw.waiting, -1)
rw.mu.Unlock()
}

// TryLock tries to lock rw for writing if the number of goroutines
// holding or waiting for the lock does not exceed the limit.
func (rw *CountingRWMutex) TryLock() bool {
if atomic.LoadInt64(&rw.waiting) > rw.limit {
return false
}
if rw.mu.TryLock() {
// Count the held lock so Unlock's decrement stays balanced.
atomic.AddInt64(&rw.waiting, 1)
return true
}
return false
}

// Waiters returns the number of goroutines waiting for the lock.
func (rw *CountingRWMutex) Waiters() int64 {
return atomic.LoadInt64(&rw.waiting)
}
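A small usage sketch (`appendIfUncontended` and `errLocked` are illustrative stand-ins): writers that can defer their work use TryLock and bail out instead of queueing behind a contended lock, mirroring how segment.Append uses it below.

```go
package sync

import "errors"

// errLocked stands in for wal.ErrSegmentLocked.
var errLocked = errors.New("locked")

// appendIfUncontended runs do under the write lock, but fails fast
// when the mutex already has more than `limit` goroutines holding
// or waiting on it.
func appendIfUncontended(mu *CountingRWMutex, do func()) error {
	if !mu.TryLock() {
		return errLocked
	}
	defer mu.Unlock()
	do()
	return nil
}
```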
19 changes: 9 additions & 10 deletions pkg/wal/segment.go
@@ -19,6 +19,7 @@ import (

flakeutil "github.com/Azure/adx-mon/pkg/flake"
"github.com/Azure/adx-mon/pkg/logger"
adxsync "github.com/Azure/adx-mon/pkg/sync"
"github.com/klauspost/compress/s2"
gbp "github.com/libp2p/go-buffer-pool"
)
@@ -83,7 +84,7 @@ type segment struct {
filePos uint64

wg sync.WaitGroup
mu sync.RWMutex
mu *adxsync.CountingRWMutex

// w is the underlying segment file on disk
w *os.File
@@ -147,6 +148,7 @@ func NewSegment(dir, prefix string, opts ...SegmentOption) (Segment, error) {
closing: make(chan struct{}),
flushCh: make(chan chan error),
flushInterval: 100 * time.Millisecond,
mu: adxsync.NewCountingRWMutex(5),
}

for _, opt := range opts {
@@ -225,6 +227,7 @@ func Open(path string) (Segment, error) {
closing: make(chan struct{}),
flushCh: make(chan chan error),
flushInterval: 100 * time.Millisecond,
mu: adxsync.NewCountingRWMutex(5),
}

if err := f.Repair(); err != nil {
@@ -313,13 +316,6 @@ func (s *segment) Iterator() (Iterator, error) {
// Append appends raw blocks to the segment. This is used for appending blocks that have already been compressed.
// Misuse of this func could lead to data corruption. In general, you probably want to use Write instead.
func (s *segment) Append(ctx context.Context, buf []byte) error {
s.mu.RLock()
if s.closed {
s.mu.RUnlock()
return ErrSegmentClosed
}
s.mu.RUnlock()

iter, err := NewSegmentIterator(io.NopCloser(bytes.NewReader(buf)))
if err != nil {
return err
@@ -332,8 +328,11 @@ func (s *segment) Append(ctx context.Context, buf []byte) error {
return nil
}

s.mu.Lock()
defer s.mu.Unlock()
if s.mu.TryLock() {
defer s.mu.Unlock()
} else {
return ErrSegmentLocked
}

if s.closed {
return ErrSegmentClosed
13 changes: 10 additions & 3 deletions pkg/wal/segment_test.go
@@ -276,11 +276,18 @@ func TestSegment_Closed(t *testing.T) {
for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
dir := t.TempDir()
s, err := wal.NewSegment(dir, "Foo")
s, err := wal.NewSegment(dir, "Foo1")
require.NoError(t, s.Write(context.Background(), []byte("test")))
p := s.Path()
require.NoError(t, s.Close())
buf, err := os.ReadFile(p)
require.NoError(t, err)

s, err = wal.NewSegment(dir, "Foo")
require.NoError(t, err)
require.NoError(t, s.Close())
require.Equal(t, s.Write(context.Background(), []byte("test")), wal.ErrSegmentClosed)
require.Equal(t, s.Append(context.Background(), []byte("test")), wal.ErrSegmentClosed)
require.Equal(t, wal.ErrSegmentClosed, s.Write(context.Background(), []byte("test")))
require.Equal(t, wal.ErrSegmentClosed, s.Append(context.Background(), buf))

_, err = s.Iterator()
require.Equal(t, err, wal.ErrSegmentClosed)
3 changes: 2 additions & 1 deletion pkg/wal/wal.go
@@ -21,11 +21,12 @@ var (
ErrMaxDiskUsageExceeded = fmt.Errorf("max disk usage exceeded")
ErrMaxSegmentsExceeded = fmt.Errorf("max segments exceeded")
ErrSegmentClosed = fmt.Errorf("segment closed")
ErrSegmentLocked = fmt.Errorf("segment locked")

idgen *flake.Flake

bwPool = pool.NewGeneric(10000, func(sz int) interface{} {
return bufio.NewWriterSize(nil, 4*1024)
return bufio.NewWriterSize(nil, 8*1024)
})
)

