Skip to content

Commit

Permalink
allow configuration of multiple compaction workers
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Sep 4, 2024
1 parent 6ac3da7 commit aad5e6f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
5 changes: 5 additions & 0 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
promclient "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"gorm.io/gorm"
Expand Down Expand Up @@ -641,6 +642,8 @@ func (bgs *BGS) EventsHandler(c echo.Context) error {
logger.Infow("new consumer", "cursor", since)

header := events.EventHeader{Op: events.EvtKindMessage}
cw := cbg.NewCborWriter(nil)

for {
select {
case evt, ok := <-evts:
Expand Down Expand Up @@ -686,6 +689,8 @@ func (bgs *BGS) EventsHandler(c echo.Context) error {
return fmt.Errorf("unrecognized event kind")
}

cw.SetWriter(wc)

if err := header.MarshalCBOR(wc); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
Expand Down
55 changes: 32 additions & 23 deletions bgs/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,24 @@ type CompactorState struct {
// Compactor is a compactor daemon that compacts repos in the background.
// It runs numWorkers worker goroutines, each of which reports its progress
// into its own slot of state.
type Compactor struct {
	// q holds the repos waiting to be compacted.
	q *uniQueue

	// state holds one entry per worker, indexed by worker ID.
	// All access must hold stateLk.
	state []*CompactorState

	// stateLk guards state.
	stateLk sync.RWMutex

	// exit is watched by the worker loop to know when to stop.
	exit chan struct{}

	// exited is created alongside exit.
	// NOTE(review): presumably signalled once the worker loop has returned;
	// with several workers, confirm how it is signalled.
	exited chan struct{}

	// requeueInterval enables the requeue routine when greater than zero.
	requeueInterval time.Duration

	// requeueLimit, requeueShardCount and requeueFast are copied verbatim
	// from CompactorOptions and tune the requeue routine.
	requeueLimit int
	requeueShardCount int
	requeueFast bool

	// numWorkers is the number of worker goroutines launched by Start.
	numWorkers int
}

// CompactorOptions configures a Compactor; each field is copied into the
// matching unexported Compactor field by NewCompactor.
type CompactorOptions struct {
// RequeueInterval enables the background requeue routine when it is
// greater than zero.
RequeueInterval time.Duration
// RequeueLimit tunes the requeue routine.
// NOTE(review): exact semantics are not visible here; the default is 0.
RequeueLimit int
// RequeueShardCount tunes the requeue routine.
// NOTE(review): presumably a shard-count threshold; the default is 50.
RequeueShardCount int
// RequeueFast tunes the requeue routine.
// NOTE(review): presumably selects fast compaction for requeued repos;
// the default is true.
RequeueFast bool
// NumWorkers is the number of compaction worker goroutines that Start
// launches, and the number of per-worker state slots allocated.
NumWorkers int
}

func DefaultCompactorOptions() *CompactorOptions {
Expand All @@ -131,6 +134,7 @@ func DefaultCompactorOptions() *CompactorOptions {
RequeueLimit: 0,
RequeueShardCount: 50,
RequeueFast: true,
NumWorkers: 2,
}
}

Expand All @@ -143,13 +147,14 @@ func NewCompactor(opts *CompactorOptions) *Compactor {
q: &uniQueue{
members: make(map[models.Uid]struct{}),
},
state: &CompactorState{},
state: make([]*CompactorState, opts.NumWorkers),
exit: make(chan struct{}),
exited: make(chan struct{}),
requeueInterval: opts.RequeueInterval,
requeueLimit: opts.RequeueLimit,
requeueFast: opts.RequeueFast,
requeueShardCount: opts.RequeueShardCount,
numWorkers: opts.NumWorkers,
}
}

Expand All @@ -158,25 +163,27 @@ type compactionStats struct {
Targets []carstore.CompactionTarget
}

func (c *Compactor) SetState(uid models.Uid, did, status string, stats *carstore.CompactionStats) {
func (c *Compactor) SetState(worker int, uid models.Uid, did, status string, stats *carstore.CompactionStats) {
c.stateLk.Lock()
defer c.stateLk.Unlock()

c.state.latestUID = uid
c.state.latestDID = did
c.state.status = status
c.state.stats = stats
cs := c.state[worker]

cs.latestUID = uid
cs.latestDID = did
cs.status = status
cs.stats = stats
}

func (c *Compactor) GetState() *CompactorState {
func (c *Compactor) GetState(workerID int) *CompactorState {
c.stateLk.RLock()
defer c.stateLk.RUnlock()

return &CompactorState{
latestUID: c.state.latestUID,
latestDID: c.state.latestDID,
status: c.state.status,
stats: c.state.stats,
latestUID: c.state[workerID].latestUID,
latestDID: c.state[workerID].latestDID,
status: c.state[workerID].status,
stats: c.state[workerID].stats,
}
}

Expand All @@ -185,7 +192,9 @@ var errNoReposToCompact = fmt.Errorf("no repos to compact")
// Start starts the compactor
func (c *Compactor) Start(bgs *BGS) {
log.Info("starting compactor")
go c.doWork(bgs)
for i := 0; i < c.numWorkers; i++ {
go c.doWork(bgs, i)
}
if c.requeueInterval > 0 {
go func() {
log.Infow("starting compactor requeue routine",
Expand Down Expand Up @@ -221,7 +230,7 @@ func (c *Compactor) Shutdown() {
log.Info("compactor stopped")
}

func (c *Compactor) doWork(bgs *BGS) {
func (c *Compactor) doWork(bgs *BGS, workerID int) {
for {
select {
case <-c.exit:
Expand All @@ -233,14 +242,14 @@ func (c *Compactor) doWork(bgs *BGS) {

ctx := context.Background()
start := time.Now()
state, err := c.compactNext(ctx, bgs)
state, err := c.compactNext(ctx, bgs, workerID)
if err != nil {
if err == errNoReposToCompact {
log.Debug("no repos to compact, waiting and retrying")
time.Sleep(time.Second * 5)
continue
}
state = c.GetState()
state = c.GetState(workerID)
log.Errorw("failed to compact repo",
"err", err,
"uid", state.latestUID,
Expand All @@ -263,7 +272,7 @@ func (c *Compactor) doWork(bgs *BGS) {
}
}

func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) {
func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, workerID int) (*CompactorState, error) {
ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext")
defer span.End()

Expand All @@ -272,24 +281,24 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState,
return nil, errNoReposToCompact
}

c.SetState(item.uid, "unknown", "getting_user", nil)
c.SetState(workerID, item.uid, "unknown", "getting_user", nil)

user, err := bgs.lookupUserByUID(ctx, item.uid)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, "unknown", "failed_getting_user", nil)
c.SetState(workerID, item.uid, "unknown", "failed_getting_user", nil)
return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err)
}

span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid)))

c.SetState(item.uid, user.Did, "compacting", nil)
c.SetState(workerID, item.uid, user.Did, "compacting", nil)

start := time.Now()
st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast)
if err != nil {
span.RecordError(err)
c.SetState(item.uid, user.Did, "failed_compacting", nil)
c.SetState(workerID, item.uid, user.Did, "failed_compacting", nil)
return nil, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err)
}
compactionDuration.Observe(time.Since(start).Seconds())
Expand All @@ -302,9 +311,9 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState,
attribute.Int("refs", st.TotalRefs),
)

c.SetState(item.uid, user.Did, "done", st)
c.SetState(workerID, item.uid, user.Did, "done", st)

return c.GetState(), nil
return c.GetState(workerID), nil
}

func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {
Expand Down

0 comments on commit aad5e6f

Please sign in to comment.