From aad5e6f9280e562604ab6642410457a7253926b1 Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Wed, 4 Sep 2024 12:06:24 -0700 Subject: [PATCH] allow configuration of multiple compaction workers --- Review note: per-worker state is held by value ([]CompactorState, one slot per worker). A slice built with make([]*CompactorState, n) starts out as n nil pointers, so SetState would dereference nil on its first call; value slots are usable immediately and GetState still returns a copy. bgs/bgs.go | 5 +++++ bgs/compactor.go | 55 ++++++++++++++++++++++++++++-------------------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index d28481b45..fa4a86544 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -39,6 +39,7 @@ import ( promclient "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" dto "github.com/prometheus/client_model/go" + cbg "github.com/whyrusleeping/cbor-gen" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "gorm.io/gorm" @@ -641,6 +642,8 @@ func (bgs *BGS) EventsHandler(c echo.Context) error { logger.Infow("new consumer", "cursor", since) header := events.EventHeader{Op: events.EvtKindMessage} + cw := cbg.NewCborWriter(nil) + for { select { case evt, ok := <-evts: @@ -686,6 +689,8 @@ func (bgs *BGS) EventsHandler(c echo.Context) error { return fmt.Errorf("unrecognized event kind") } + cw.SetWriter(wc) + if err := header.MarshalCBOR(wc); err != nil { return fmt.Errorf("failed to write header: %w", err) } diff --git a/bgs/compactor.go b/bgs/compactor.go index 113973a53..e71787511 100644 --- a/bgs/compactor.go +++ b/bgs/compactor.go @@ -108,7 +108,7 @@ type CompactorState struct { // Compactor is a compactor daemon that compacts repos in the background type Compactor struct { q *uniQueue - state *CompactorState + state []CompactorState stateLk sync.RWMutex exit chan struct{} exited chan struct{} @@ -116,6 +116,8 @@ type Compactor struct { requeueLimit int requeueShardCount int requeueFast bool + + numWorkers int } type CompactorOptions struct { @@ -123,6 +125,7 @@ type CompactorOptions struct { RequeueLimit int RequeueShardCount int RequeueFast bool + NumWorkers int } func DefaultCompactorOptions() *CompactorOptions { @@ -131,6 +134,7 @@ 
func DefaultCompactorOptions() *CompactorOptions { RequeueLimit: 0, RequeueShardCount: 50, RequeueFast: true, + NumWorkers: 2, } } @@ -143,13 +147,14 @@ func NewCompactor(opts *CompactorOptions) *Compactor { q: &uniQueue{ members: make(map[models.Uid]struct{}), }, - state: &CompactorState{}, + state: make([]CompactorState, opts.NumWorkers), exit: make(chan struct{}), exited: make(chan struct{}), requeueInterval: opts.RequeueInterval, requeueLimit: opts.RequeueLimit, requeueFast: opts.RequeueFast, requeueShardCount: opts.RequeueShardCount, + numWorkers: opts.NumWorkers, } } @@ -158,25 +163,27 @@ type compactionStats struct { Targets []carstore.CompactionTarget } -func (c *Compactor) SetState(uid models.Uid, did, status string, stats *carstore.CompactionStats) { +func (c *Compactor) SetState(worker int, uid models.Uid, did, status string, stats *carstore.CompactionStats) { c.stateLk.Lock() defer c.stateLk.Unlock() - c.state.latestUID = uid - c.state.latestDID = did - c.state.status = status - c.state.stats = stats + cs := &c.state[worker] + + cs.latestUID = uid + cs.latestDID = did + cs.status = status + cs.stats = stats } -func (c *Compactor) GetState() *CompactorState { +func (c *Compactor) GetState(workerID int) *CompactorState { c.stateLk.RLock() defer c.stateLk.RUnlock() return &CompactorState{ - latestUID: c.state.latestUID, - latestDID: c.state.latestDID, - status: c.state.status, - stats: c.state.stats, + latestUID: c.state[workerID].latestUID, + latestDID: c.state[workerID].latestDID, + status: c.state[workerID].status, + stats: c.state[workerID].stats, } } @@ -185,7 +192,9 @@ var errNoReposToCompact = fmt.Errorf("no repos to compact") // Start starts the compactor func (c *Compactor) Start(bgs *BGS) { log.Info("starting compactor") - go c.doWork(bgs) + for i := 0; i < c.numWorkers; i++ { + go c.doWork(bgs, i) + } if c.requeueInterval > 0 { go func() { log.Infow("starting compactor requeue routine", @@ -221,7 +230,7 @@ func (c *Compactor) Shutdown() { 
log.Info("compactor stopped") } -func (c *Compactor) doWork(bgs *BGS) { +func (c *Compactor) doWork(bgs *BGS, workerID int) { for { select { case <-c.exit: @@ -233,14 +242,14 @@ func (c *Compactor) doWork(bgs *BGS) { ctx := context.Background() start := time.Now() - state, err := c.compactNext(ctx, bgs) + state, err := c.compactNext(ctx, bgs, workerID) if err != nil { if err == errNoReposToCompact { log.Debug("no repos to compact, waiting and retrying") time.Sleep(time.Second * 5) continue } - state = c.GetState() + state = c.GetState(workerID) log.Errorw("failed to compact repo", "err", err, "uid", state.latestUID, @@ -263,7 +272,7 @@ func (c *Compactor) doWork(bgs *BGS) { } } -func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, error) { +func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, workerID int) (*CompactorState, error) { ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext") defer span.End() @@ -272,24 +281,24 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, return nil, errNoReposToCompact } - c.SetState(item.uid, "unknown", "getting_user", nil) + c.SetState(workerID, item.uid, "unknown", "getting_user", nil) user, err := bgs.lookupUserByUID(ctx, item.uid) if err != nil { span.RecordError(err) - c.SetState(item.uid, "unknown", "failed_getting_user", nil) + c.SetState(workerID, item.uid, "unknown", "failed_getting_user", nil) return nil, fmt.Errorf("failed to get user %d: %w", item.uid, err) } span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid))) - c.SetState(item.uid, user.Did, "compacting", nil) + c.SetState(workerID, item.uid, user.Did, "compacting", nil) start := time.Now() st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast) if err != nil { span.RecordError(err) - c.SetState(item.uid, user.Did, "failed_compacting", nil) + c.SetState(workerID, item.uid, user.Did, "failed_compacting", nil) return nil, 
fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) } compactionDuration.Observe(time.Since(start).Seconds()) @@ -302,9 +311,9 @@ func (c *Compactor) compactNext(ctx context.Context, bgs *BGS) (*CompactorState, attribute.Int("refs", st.TotalRefs), ) - c.SetState(item.uid, user.Did, "done", st) + c.SetState(workerID, item.uid, user.Did, "done", st) - return c.GetState(), nil + return c.GetState(workerID), nil } func (c *Compactor) EnqueueRepo(ctx context.Context, user User, fast bool) {