Skip to content

Commit

Permalink
Merge branch 'main' of github.com:bluesky-social/indigo into ozone-ac…
Browse files Browse the repository at this point in the history
…count-events
  • Loading branch information
foysalit committed Sep 20, 2024
2 parents d3c8e89 + d8556af commit d41c7cd
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 17 deletions.
2 changes: 1 addition & 1 deletion automod/rules/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewAccountRule(c *automod.AccountContext) error {

// new PDS host
if existingAccounts == 0 {
c.Logger.Info("new PDS instance", "host", pdsHost)
c.Logger.Info("new PDS instance", "pdsHost", pdsHost)
c.Increment("host", "new")
c.AddAccountFlag("host-first-account")
c.Notify("slack")
Expand Down
10 changes: 5 additions & 5 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
case env.RepoCommit != nil:
repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
evt := env.RepoCommit
log.Debugw("bgs got repo append event", "seq", evt.Seq, "host", host.Host, "repo", evt.Repo)
log.Debugw("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)
u, err := bgs.lookupUserByDid(ctx, evt.Repo)
if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
Expand All @@ -822,17 +822,17 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event

if u.TakenDown || u.UpstreamStatus == events.AccountStatusTakendown {
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.TakenDown))
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host)
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusSuspended {
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host)
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

if u.UpstreamStatus == events.AccountStatusDeactivated {
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "host", host.Host)
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
return nil
}

Expand Down Expand Up @@ -891,7 +891,7 @@ func (bgs *BGS) handleFedEvent(ctx context.Context, host *models.PDS, env *event
}

if err := bgs.repoman.HandleExternalUserEvent(ctx, host.ID, u.ID, u.Did, evt.Since, evt.Rev, evt.Blocks, evt.Ops); err != nil {
log.Warnw("failed handling event", "err", err, "host", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
log.Warnw("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())

if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
Expand Down
14 changes: 7 additions & 7 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ func (s *Slurper) subscribeWithRedialer(ctx context.Context, host *models.PDS, s
url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host.Host, cursor)
con, res, err := d.DialContext(ctx, url, nil)
if err != nil {
log.Warnw("dialing failed", "host", host.Host, "err", err, "backoff", backoff)
log.Warnw("dialing failed", "pdsHost", host.Host, "err", err, "backoff", backoff)
time.Sleep(sleepForBackoff(backoff))
backoff++

if backoff > 15 {
log.Warnw("pds does not appear to be online, disabling for now", "host", host.Host)
log.Warnw("pds does not appear to be online, disabling for now", "pdsHost", host.Host)
if err := s.db.Model(&models.PDS{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil {
log.Errorf("failed to unregister failing pds: %w", err)
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w

rsc := &events.RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
log.Debugw("got remote repo event", "host", host.Host, "repo", evt.Repo, "seq", evt.Seq)
log.Debugw("got remote repo event", "pdsHost", host.Host, "repo", evt.Repo, "seq", evt.Seq)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoCommit: evt,
}); err != nil {
Expand All @@ -551,7 +551,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
log.Infow("got remote handle update event", "host", host.Host, "did", evt.Did, "handle", evt.Handle)
log.Infow("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoHandle: evt,
}); err != nil {
Expand All @@ -566,7 +566,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
log.Infow("got remote repo migrate event", "host", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
log.Infow("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoMigrate: evt,
}); err != nil {
Expand All @@ -581,7 +581,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
log.Infow("got remote repo tombstone event", "host", host.Host, "did", evt.Did)
log.Infow("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
RepoTombstone: evt,
}); err != nil {
Expand All @@ -596,7 +596,7 @@ func (s *Slurper) handleConnection(ctx context.Context, host *models.PDS, con *w
return nil
},
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
log.Infow("info event", "name", info.Name, "message", info.Message, "host", host.Host)
log.Infow("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
return nil
},
RepoIdentity: func(ident *comatproto.SyncSubscribeRepos_Identity) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/athome/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (srv *Server) reqHandle(c echo.Context) syntax.Handle {
host = strings.SplitN(host, ":", 2)[0]
handle, err := syntax.ParseHandle(host)
if err != nil {
slog.Warn("host is not a valid handle, fallback to default", "host", host)
slog.Warn("host is not a valid handle, fallback to default", "hostname", host)
handle = srv.defaultHandle
}
return handle
Expand Down
6 changes: 6 additions & 0 deletions cmd/beemo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func run(args []string) error {
Required: true,
EnvVars: []string{"BEEMO_MENTION_DIDS"},
},
&cli.IntFlag{
Name: "minimum-words",
Usage: "minimum length of post text (word count; zero for no minimum)",
Value: 0,
EnvVars: []string{"BEEMO_MINIMUM_WORDS"},
},
},
},
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/beemo/notify_mentions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@ type MentionChecker struct {
mentionDIDs []syntax.DID
logger *slog.Logger
directory identity.Directory
minimumWords int
}

func (mc *MentionChecker) ProcessPost(ctx context.Context, did syntax.DID, rkey syntax.RecordKey, post appbsky.FeedPost) error {
mc.logger.Debug("processing post record", "did", did, "rkey", rkey)

if mc.minimumWords > 0 {
words := strings.Split(post.Text, " ")
if len(words) < mc.minimumWords {
return nil
}
}

for _, facet := range post.Facets {
for _, feature := range facet.Features {
mention := feature.RichtextFacet_Mention
Expand Down Expand Up @@ -57,6 +65,7 @@ func notifyMentions(cctx *cli.Context) error {
ctx := context.Background()
logger := configLogger(cctx, os.Stdout)
relayHost := cctx.String("relay-host")
minimumWords := cctx.Int("minimum-words")

mentionDIDs := []syntax.DID{}
for _, raw := range strings.Split(cctx.String("mention-dids"), ",") {
Expand All @@ -72,6 +81,7 @@ func notifyMentions(cctx *cli.Context) error {
mentionDIDs: mentionDIDs,
logger: logger,
directory: identity.DefaultDirectory(),
minimumWords: minimumWords,
}

logger.Info("beemo mention checker starting up...", "relayHost", relayHost, "mentionDIDs", mentionDIDs)
Expand Down
2 changes: 1 addition & 1 deletion cmd/bigsky/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Wipe all local data:
# careful! double-check this destructive command
rm -rf ./data/bigsky/*

There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/admin/>. Paste in the admin key, eg `localdev`.
There is a basic web dashboard, though it will not be included unless built and copied to a local directory `./public/`. Run `make build-relay-ui`, and then when running the daemon the dashboard will be available at: <http://localhost:2470/dash/>. Paste in the admin key, eg `localdev`.

The local admin routes can also be accessed by passing the admin key as a bearer token, for example:

Expand Down
4 changes: 2 additions & 2 deletions cmd/hepa/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err)
}
ozoneClient.Auth.Did = od.String()
logger.Info("configured ozone admin client", "did", od.String(), "host", config.OzoneHost)
logger.Info("configured ozone admin client", "did", od.String(), "ozoneHost", config.OzoneHost)
} else {
logger.Info("did not configure ozone client")
}
Expand All @@ -113,7 +113,7 @@ func NewServer(dir identity.Directory, config Config) (*Server, error) {
adminClient.Headers = make(map[string]string)
adminClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
}
logger.Info("configured PDS admin client", "host", config.PDSHost)
logger.Info("configured PDS admin client", "pdsHost", config.PDSHost)
} else {
logger.Info("did not configure PDS admin client")
}
Expand Down

0 comments on commit d41c7cd

Please sign in to comment.