Skip to content

Commit

Permalink
add support for self-service opt out
Browse files Browse the repository at this point in the history
in particular:

follow:
- (no record) -> PENDING
- OPTED_OUT -> APPROVED

unfollow:
- PENDING -> NONE
- APPROVED -> OPTED_OUT

this also tries to normalize NONE status with record not present
  • Loading branch information
itstolf committed Oct 21, 2023
1 parent afeda80 commit af7fd77
Show file tree
Hide file tree
Showing 27 changed files with 219 additions and 46 deletions.
47 changes: 46 additions & 1 deletion ingester/candidate_actor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package ingester
import (
"context"
"fmt"
"github.com/jonboulle/clockwork"
"sync"
"time"

"github.com/jonboulle/clockwork"

v1 "github.com/strideynet/bsky-furry-feed/proto/bff/v1"
"github.com/strideynet/bsky-furry-feed/store"
"go.uber.org/zap"
Expand Down Expand Up @@ -78,6 +79,9 @@ func (crc *ActorCache) Sync(ctx context.Context) error {

mapped := map[string]*v1.Actor{}
for _, cr := range data {
if cr.Status == v1.ActorStatus_ACTOR_STATUS_NONE {
continue
}
mapped[cr.Did] = cr
}

Expand Down Expand Up @@ -128,3 +132,44 @@ func (crc *ActorCache) CreatePendingCandidateActor(ctx context.Context, did stri
crc.cached[ca.Did] = ca
return nil
}

func (crc *ActorCache) OptIn(ctx context.Context, did string) (err error) {
ctx, span := tracer.Start(ctx, "actor_cache.opt_in")
defer func() {
endSpan(span, err)
}()

status, err := crc.store.OptInActor(ctx, did)
if err != nil {
return fmt.Errorf("opting in actor: %w", err)
}

crc.mu.Lock()
defer crc.mu.Unlock()
ca := crc.cached[did]
if ca != nil {
ca.Status = status
}

return nil
}

func (crc *ActorCache) OptOutOrForget(ctx context.Context, did string) (err error) {
ctx, span := tracer.Start(ctx, "actor_cache.opt_out")
defer func() {
endSpan(span, err)
}()

status, err := crc.store.OptOutOrForgetActor(ctx, did)
if err != nil {
return fmt.Errorf("opting out actor: %w", err)
}

crc.mu.Lock()
defer crc.mu.Unlock()
if status == v1.ActorStatus_ACTOR_STATUS_NONE {
delete(crc.cached, did)
}

return nil
}
19 changes: 17 additions & 2 deletions ingester/handle_graph_follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,38 @@ func (fi *FirehoseIngester) handleGraphFollowCreate(
return fmt.Errorf("creating follow: %w", err)
}

if fi.IsFurryFeedDID(data.Subject) {
if err := fi.actorCache.OptIn(ctx, repoDID); err != nil {
return fmt.Errorf("opting in: %w", err)
}
}

return nil
}

func (fi *FirehoseIngester) handleGraphFollowDelete(
ctx context.Context,
repoDID string,
recordUri string,
) (err error) {
ctx, span := tracer.Start(ctx, "firehose_ingester.handle_feed_follow_delete")
defer func() {
endSpan(span, err)
}()

if err := fi.store.DeleteFollow(
subjectDID, err := fi.store.DeleteFollow(
ctx, store.DeleteFollowOpts{URI: recordUri},
); err != nil {
)

if err != nil {
return fmt.Errorf("deleting follow: %w", err)
}

if fi.IsFurryFeedDID(subjectDID) {
if err := fi.actorCache.OptOutOrForget(ctx, repoDID); err != nil {
return fmt.Errorf("opting out: %w", err)
}
}

return nil
}
18 changes: 12 additions & 6 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"context"
"errors"
"fmt"
"github.com/bluesky-social/indigo/events/schedulers/sequential"
"strconv"

"github.com/bluesky-social/indigo/events/schedulers/sequential"

"github.com/bluesky-social/indigo/util"
"github.com/ipfs/go-cid"

Expand Down Expand Up @@ -49,6 +50,8 @@ var workItemsProcessed = promauto.NewSummaryVec(prometheus.SummaryOpts{
type actorCacher interface {
GetByDID(did string) *v1.Actor
CreatePendingCandidateActor(ctx context.Context, did string) (err error)
OptIn(ctx context.Context, did string) (err error)
OptOutOrForget(ctx context.Context, did string) (err error)
}

var workerCursors = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand Down Expand Up @@ -342,15 +345,18 @@ func (fi *FirehoseIngester) handleCommit(ctx context.Context, evt *atproto.SyncS
return nil
}

func (fi *FirehoseIngester) IsFurryFeedDID(did string) bool {
// TODO: Make this not hard coded
// https://bsky.app/profile/furryli.st
return did == "did:plc:jdkvwye2lf4mingzk7qdebzc"
}

func (fi *FirehoseIngester) isFurryFeedFollow(record typegen.CBORMarshaler) bool {
follow, ok := record.(*bsky.GraphFollow)
if !ok {
return false
}

// TODO: Make this not hard coded
// https://bsky.app/profile/furryli.st
return follow.Subject == "did:plc:jdkvwye2lf4mingzk7qdebzc"
return fi.IsFurryFeedDID(follow.Subject)
}

func endSpan(span trace.Span, err error) {
Expand Down Expand Up @@ -461,7 +467,7 @@ func (fi *FirehoseIngester) handleRecordDelete(
case "app.bsky.feed.like":
err = fi.handleFeedLikeDelete(ctx, recordUri)
case "app.bsky.graph.follow":
err = fi.handleGraphFollowDelete(ctx, recordUri)
err = fi.handleGraphFollowDelete(ctx, repoDID, recordUri)
default:
span.AddEvent("ignoring record due to unrecognized type")
}
Expand Down
17 changes: 11 additions & 6 deletions proto/bff/v1/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion proto/bff/v1/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ enum ActorStatus {
ACTOR_STATUS_APPROVED = 2;
ACTOR_STATUS_BANNED = 3;
ACTOR_STATUS_NONE = 4;
ACTOR_STATUS_OPTED_OUT = 5;
}

message Actor {
Expand Down Expand Up @@ -40,4 +41,4 @@ message Actor {
// is ignored in the queue to be processed later, e.g. when the actor doesn’t
// have an avatar
google.protobuf.Timestamp held_until = 8;
}
}
44 changes: 41 additions & 3 deletions store/gen/candidate_actors.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions store/gen/candidate_follows.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions store/gen/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Sorry, can't actually undo this!
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TYPE actor_status ADD VALUE IF NOT EXISTS 'opted_out';
36 changes: 32 additions & 4 deletions store/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,18 +538,18 @@ type DeleteFollowOpts struct {
URI string
}

func (s *PGXStore) DeleteFollow(ctx context.Context, opts DeleteFollowOpts) (err error) {
func (s *PGXStore) DeleteFollow(ctx context.Context, opts DeleteFollowOpts) (subjectDID string, err error) {
ctx, span := tracer.Start(ctx, "pgx_store.delete_follow")
defer func() {
endSpan(span, err)
}()

err = s.queries.SoftDeleteCandidateFollow(ctx, opts.URI)
subjectDID, err = s.queries.SoftDeleteCandidateFollow(ctx, opts.URI)
if err != nil {
return fmt.Errorf("executing SoftDeleteCandidateFollow query: %w", convertPGXError(err))
return "", fmt.Errorf("executing SoftDeleteCandidateFollow query: %w", convertPGXError(err))
}

return nil
return subjectDID, nil
}

type ListPostsForNewFeedOpts struct {
Expand Down Expand Up @@ -819,3 +819,31 @@ func (s *PGXStore) HoldBackPendingActor(ctx context.Context, did string, duratio
HeldUntil: pgtype.Timestamptz{Time: duration, Valid: true},
})
}

func (s *PGXStore) OptOutOrForgetActor(ctx context.Context, did string) (v1.ActorStatus, error) {
status, err := s.queries.OptOutOrForgetActor(ctx, did)
if err != nil {
return v1.ActorStatus_ACTOR_STATUS_UNSPECIFIED, nil
}

protoStatus, err := actorStatusToProto(status)
if err != nil {
return v1.ActorStatus_ACTOR_STATUS_UNSPECIFIED, nil
}

return protoStatus, err
}

func (s *PGXStore) OptInActor(ctx context.Context, did string) (v1.ActorStatus, error) {
status, err := s.queries.OptInActor(ctx, did)
if err != nil {
return v1.ActorStatus_ACTOR_STATUS_UNSPECIFIED, nil
}

protoStatus, err := actorStatusToProto(status)
if err != nil {
return v1.ActorStatus_ACTOR_STATUS_UNSPECIFIED, nil
}

return protoStatus, err
}
Loading

0 comments on commit af7fd77

Please sign in to comment.