Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serialize outbound once #738

Merged
merged 3 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 5 additions & 36 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,6 @@ func (bgs *BGS) EventsHandler(c echo.Context) error {

logger.Infow("new consumer", "cursor", since)

header := events.EventHeader{Op: events.EvtKindMessage}
for {
select {
case evt, ok := <-evts:
Expand All @@ -655,42 +654,12 @@ func (bgs *BGS) EventsHandler(c echo.Context) error {
return err
}

var obj lexutil.CBOR

switch {
case evt.Error != nil:
header.Op = events.EvtKindErrorFrame
obj = evt.Error
case evt.RepoCommit != nil:
header.MsgType = "#commit"
obj = evt.RepoCommit
case evt.RepoHandle != nil:
header.MsgType = "#handle"
obj = evt.RepoHandle
case evt.RepoIdentity != nil:
header.MsgType = "#identity"
obj = evt.RepoIdentity
case evt.RepoAccount != nil:
header.MsgType = "#account"
obj = evt.RepoAccount
case evt.RepoInfo != nil:
header.MsgType = "#info"
obj = evt.RepoInfo
case evt.RepoMigrate != nil:
header.MsgType = "#migrate"
obj = evt.RepoMigrate
case evt.RepoTombstone != nil:
header.MsgType = "#tombstone"
obj = evt.RepoTombstone
default:
return fmt.Errorf("unrecognized event kind")
if evt.Preserialized != nil {
_, err = wc.Write(evt.Preserialized)
} else {
err = evt.Serialize(wc)
}

if err := header.MarshalCBOR(wc); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}

if err := obj.MarshalCBOR(wc); err != nil {
if err != nil {
return fmt.Errorf("failed to write event: %w", err)
}

Expand Down
3 changes: 3 additions & 0 deletions events/diskpersist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ func runEventManagerTest(t *testing.T, cs *carstore.CarStore, db *gorm.DB, p eve
outEvtCount := 0
p.Playback(ctx, 0, func(evt *events.XRPCStreamEvent) error {
// Check that the contents of the output events match the input events
// Clear cache, don't care if one has it and not the other
inEvts[outEvtCount].Preserialized = nil
evt.Preserialized = nil
if !reflect.DeepEqual(inEvts[outEvtCount], evt) {
t.Logf("%v", inEvts[outEvtCount].RepoCommit)
t.Logf("%v", evt.RepoCommit)
Expand Down
66 changes: 66 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package events

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"sync"
"time"

comatproto "github.com/bluesky-social/indigo/api/atproto"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/models"
"github.com/prometheus/client_golang/prometheus"

logging "github.com/ipfs/go-log"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opentelemetry.io/otel"
)

Expand Down Expand Up @@ -61,6 +65,13 @@ func (em *EventManager) Shutdown(ctx context.Context) error {
}

func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
// the main thing we do is send it out, so MarshalCBOR once
if err := evt.Preserialize(); err != nil {
log.Errorf("broadcast serialize failed, %s", err)
// serialize isn't going to go better later, this event is cursed
return
}

em.subsLk.Lock()
defer em.subsLk.Unlock()

Expand Down Expand Up @@ -165,6 +176,61 @@ type XRPCStreamEvent struct {
PrivUid models.Uid `json:"-" cborgen:"-"`
PrivPdsId uint `json:"-" cborgen:"-"`
PrivRelevantPds []uint `json:"-" cborgen:"-"`
Preserialized []byte `json:"-" cborgen:"-"`
}

func (evt *XRPCStreamEvent) Serialize(wc io.Writer) error {
header := EventHeader{Op: EvtKindMessage}
var obj lexutil.CBOR

switch {
case evt.Error != nil:
header.Op = EvtKindErrorFrame
obj = evt.Error
case evt.RepoCommit != nil:
header.MsgType = "#commit"
obj = evt.RepoCommit
case evt.RepoHandle != nil:
header.MsgType = "#handle"
obj = evt.RepoHandle
case evt.RepoIdentity != nil:
header.MsgType = "#identity"
obj = evt.RepoIdentity
case evt.RepoAccount != nil:
header.MsgType = "#account"
obj = evt.RepoAccount
case evt.RepoInfo != nil:
header.MsgType = "#info"
obj = evt.RepoInfo
case evt.RepoMigrate != nil:
header.MsgType = "#migrate"
obj = evt.RepoMigrate
case evt.RepoTombstone != nil:
header.MsgType = "#tombstone"
obj = evt.RepoTombstone
default:
return fmt.Errorf("unrecognized event kind")
}

cborWriter := cbg.NewCborWriter(wc)
if err := header.MarshalCBOR(cborWriter); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
return obj.MarshalCBOR(cborWriter)
}

// serialize content into Preserialized cache
func (evt *XRPCStreamEvent) Preserialize() error {
if evt.Preserialized != nil {
return nil
}
var buf bytes.Buffer
err := evt.Serialize(&buf)
if err != nil {
return err
}
evt.Preserialized = buf.Bytes()
return nil
}

type ErrorFrame struct {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/PuerkitoBio/purell v1.2.1
github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b
github.com/adrg/xdg v0.5.0
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de
github.com/brianvoe/gofakeit/v6 v6.25.0
github.com/carlmjohnson/versioninfo v0.22.5
Expand Down Expand Up @@ -52,7 +53,7 @@ require (
github.com/samber/slog-echo v1.8.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.25.7
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240731173018-74d74643234c
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c
github.com/whyrusleeping/go-did v0.0.0-20230824162731-404d1707d5d6
gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1
Expand All @@ -76,7 +77,6 @@ require (
)

require (
github.com/adrg/xdg v0.5.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-redis/redis v6.15.9+incompatible // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,8 @@ github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSD
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0=
github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ=
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240731173018-74d74643234c h1:Jmc9fHbd0LKFmS5CkLgczNUyW36UbiyvbHCG9xCTyiw=
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240731173018-74d74643234c/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so=
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c h1:UsxJNcLPfyLyVaA4iusIrsLAqJn/xh36Qgb8emqtXzk=
github.com/whyrusleeping/cbor-gen v0.1.3-0.20240904181319-8dc02b38228c/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/whyrusleeping/go-did v0.0.0-20230824162731-404d1707d5d6 h1:yJ9/LwIGIk/c0CdoavpC9RNSGSruIspSZtxG3Nnldic=
Expand Down Expand Up @@ -873,8 +873,6 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
Loading