Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-threaded compactor with randomness #744

Merged
merged 5 commits into from
Sep 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 89 additions & 47 deletions bgs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bgs
import (
"context"
"fmt"
"math/rand/v2"
"sync"
"time"

Expand Down Expand Up @@ -82,20 +83,47 @@ func (q *uniQueue) Remove(uid models.Uid) {
}

// Pop pops the first item off the front of the queue
func (q *uniQueue) Pop() (*queueItem, bool) {
func (q *uniQueue) Pop() (queueItem, bool) {
q.lk.Lock()
defer q.lk.Unlock()

if len(q.q) == 0 {
return nil, false
return queueItem{}, false
}

item := q.q[0]
q.q = q.q[1:]
delete(q.members, item.uid)

compactionQueueDepth.Dec()
return &item, true
return item, true
}

// PopRandom removes one item from a random position in the queue and returns it.
// The boolean result is false (with a zero queueItem) when the queue is empty.
//
// Note: this disturbs the ordering of the queue, so in-order is no longer
// quite in-order: the slot of the removed item is filled with the item that
// was last in the queue.
func (q *uniQueue) PopRandom() (queueItem, bool) {
	q.lk.Lock()
	defer q.lk.Unlock()

	n := len(q.q)
	if n == 0 {
		return queueItem{}, false
	}

	var picked queueItem
	switch n {
	case 1:
		// Only one item: take it and drop the backing slice entirely.
		picked = q.q[0]
		q.q = nil
	default:
		// Swap-remove: overwrite the chosen slot with the tail item, then
		// shrink the slice by one.
		idx := rand.IntN(n)
		tail := n - 1
		picked = q.q[idx]
		q.q[idx] = q.q[tail]
		q.q = q.q[:tail]
	}

	delete(q.members, picked.uid)
	compactionQueueDepth.Dec()

	return picked, true
}

type CompactorState struct {
Expand All @@ -105,24 +133,33 @@ type CompactorState struct {
stats *carstore.CompactionStats
}

// set overwrites the latestUID, latestDID, status and stats fields of the
// state with the given values. It performs no locking of its own.
func (cstate *CompactorState) set(uid models.Uid, did, status string, stats *carstore.CompactionStats) {
	cstate.latestUID, cstate.latestDID = uid, did
	cstate.status, cstate.stats = status, stats
}

// Compactor is a compactor daemon that compacts repos in the background
type Compactor struct {
// q is the queue of pending items; compactNext takes work from it via Pop or PopRandom.
q *uniQueue
// state is the most recent compaction state, written by SetState and copied out by GetState.
state *CompactorState
// stateLk guards state.
stateLk sync.RWMutex
// exit is closed by Shutdown; doWork returns once it observes the close.
exit chan struct{}
// exited is closed by doWork when it returns, and Shutdown receives from it.
exited chan struct{}
// requeueInterval comes from CompactorOptions; Start only launches the requeue routine when it is > 0.
requeueInterval time.Duration
// requeueLimit, requeueShardCount and requeueFast are copied from CompactorOptions by NewCompactor.
requeueLimit int
requeueShardCount int
requeueFast bool

// numWorkers is the number of doWork goroutines that Start launches.
numWorkers int
// wg tracks the doWork goroutines: Start adds numWorkers, each doWork calls Done, and Shutdown waits on it.
wg sync.WaitGroup
}

// CompactorOptions holds the settings that NewCompactor copies into a Compactor.
type CompactorOptions struct {
// RequeueInterval is copied to Compactor.requeueInterval; Start only launches the requeue routine when it is > 0.
RequeueInterval time.Duration
// RequeueLimit is copied to Compactor.requeueLimit; DefaultCompactorOptions sets it to 0.
RequeueLimit int
// RequeueShardCount is copied to Compactor.requeueShardCount; DefaultCompactorOptions sets it to 50.
RequeueShardCount int
// RequeueFast is copied to Compactor.requeueFast; DefaultCompactorOptions sets it to true.
RequeueFast bool
// NumWorkers is the number of worker goroutines Start launches; DefaultCompactorOptions sets it to 2.
NumWorkers int
}

func DefaultCompactorOptions() *CompactorOptions {
Expand All @@ -131,6 +168,7 @@ func DefaultCompactorOptions() *CompactorOptions {
RequeueLimit: 0,
RequeueShardCount: 50,
RequeueFast: true,
NumWorkers: 2,
}
}

Expand All @@ -143,13 +181,12 @@ func NewCompactor(opts *CompactorOptions) *Compactor {
q: &uniQueue{
members: make(map[models.Uid]struct{}),
},
state: &CompactorState{},
exit: make(chan struct{}),
exited: make(chan struct{}),
requeueInterval: opts.RequeueInterval,
requeueLimit: opts.RequeueLimit,
requeueFast: opts.RequeueFast,
requeueShardCount: opts.RequeueShardCount,
numWorkers: opts.NumWorkers,
}
}

Expand All @@ -158,34 +195,19 @@ type compactionStats struct {
Targets []carstore.CompactionTarget
}

// SetState records the given uid, did, status and stats as the compactor's
// current state. The update is made while holding the write side of stateLk.
func (c *Compactor) SetState(uid models.Uid, did, status string, stats *carstore.CompactionStats) {
	c.stateLk.Lock()
	defer c.stateLk.Unlock()

	st := c.state
	st.latestUID, st.latestDID = uid, did
	st.status, st.stats = status, stats
}

// GetState returns a newly allocated copy of the compactor's current state,
// taken while holding the read side of stateLk. The stats pointer is copied
// as-is, so the returned state shares the underlying stats value.
func (c *Compactor) GetState() *CompactorState {
	c.stateLk.RLock()
	defer c.stateLk.RUnlock()

	cur := c.state
	snapshot := new(CompactorState)
	snapshot.latestUID = cur.latestUID
	snapshot.latestDID = cur.latestDID
	snapshot.status = cur.status
	snapshot.stats = cur.stats
	return snapshot
}

// errNoReposToCompact is returned by compactNext when nothing could be popped
// from the queue; doWork reacts to it by sleeping five seconds and retrying.
var errNoReposToCompact = fmt.Errorf("no repos to compact")

// Start starts the compactor
func (c *Compactor) Start(bgs *BGS) {
log.Info("starting compactor")
go c.doWork(bgs)
c.wg.Add(c.numWorkers)
for i := range c.numWorkers {
strategy := NextInOrder
if i%2 != 0 {
strategy = NextRandom
}
go c.doWork(bgs, strategy)
}
if c.requeueInterval > 0 {
go func() {
log.Infow("starting compactor requeue routine",
Expand Down Expand Up @@ -217,30 +239,29 @@ func (c *Compactor) Start(bgs *BGS) {
// Shutdown signals the compactor workers to stop by closing the exit channel
// and then blocks until they have finished.
// It closes c.exit unconditionally, so a second call would panic on the
// already-closed channel.
func (c *Compactor) Shutdown() {
log.Info("stopping compactor")
close(c.exit)
<-c.exited
c.wg.Wait()
log.Info("compactor stopped")
}

func (c *Compactor) doWork(bgs *BGS) {
func (c *Compactor) doWork(bgs *BGS, strategy NextStrategy) {
defer c.wg.Done()
for {
select {
case <-c.exit:
log.Info("compactor worker exiting, no more active compactions running")
close(c.exited)
return
default:
}

ctx := context.Background()
start := time.Now()
state, err := c.compactNext(ctx, bgs)
state, err := c.compactNext(ctx, bgs, strategy)
if err != nil {
if err == errNoReposToCompact {
log.Debug("no repos to compact, waiting and retrying")
time.Sleep(time.Second * 5)
continue
}
state = c.GetState()
log.Errorw("failed to compact repo",
"err", err,
"uid", state.latestUID,
Expand All @@ -263,34 +284,54 @@ func (c *Compactor) doWork(bgs *BGS) {
}
}

func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) {
// NextStrategy selects how compactNext takes the next item from the queue.
type NextStrategy int

const (
// NextInOrder takes the item at the front of the queue (uniQueue.Pop).
NextInOrder NextStrategy = iota
// NextRandom takes an item from a random position in the queue (uniQueue.PopRandom).
NextRandom
)

func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStrategy) (CompactorState, error) {
ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext")
defer span.End()

item, ok := c.q.Pop()
if !ok || item == nil {
return nil, errNoReposToCompact
var item queueItem
var ok bool
switch strategy {
case NextRandom:
item, ok = c.q.PopRandom()
default:
item, ok = c.q.Pop()
}
if !ok {
return CompactorState{}, errNoReposToCompact
}

c.SetState(item.uid, "unknown", "getting_user", nil)
state := CompactorState{
latestUID: item.uid,
latestDID: "unknown",
status: "getting_user",
}

user, err := bgs.lookupUserByUID(ctx, item.uid)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, "unknown", "failed_getting_user", nil)
return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err)
state.status = "failed_getting_user"
err := fmt.Errorf("failed to get user %d: %w", item.uid, err)
return state, err
}

span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid)))

c.SetState(item.uid, user.Did, "compacting", nil)
state.latestDID = user.Did

start := time.Now()
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, user.Did, "failed_compacting", nil)
return nil, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err)
state.status = "failed_compacting"
err := fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err)
return state, err
}
compactionDuration.Observe(time.Since(start).Seconds())

Expand All @@ -302,9 +343,10 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState,
attribute.Int("refs", st.TotalRefs),
)

c.SetState(item.uid, user.Did, "done", st)
state.status = "done"
state.stats = st

return c.GetState(), nil
return state, nil
}

func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
Expand Down
Loading