Skip to content

Commit

Permalink
client: add support for connect-once background resync interface
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jan 16, 2025
1 parent 0083384 commit aca0ee2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 4 deletions.
42 changes: 42 additions & 0 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,23 @@ import (
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
"maunium.net/go/mautrix/bridge/status"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/networkid"

"go.mau.fi/mautrix-signal/pkg/signalid"
"go.mau.fi/mautrix-signal/pkg/signalmeow"
"go.mau.fi/mautrix-signal/pkg/signalmeow/web"
)

type SignalClient struct {
Main *SignalConnector
UserLogin *bridgev2.UserLogin
Client *signalmeow.Client
Ghost *bridgev2.Ghost

queueEmptyWaiter *exsync.Event
}

var (
Expand All @@ -51,6 +55,7 @@ var (
_ bridgev2.RoomNameHandlingNetworkAPI = (*SignalClient)(nil)
_ bridgev2.RoomAvatarHandlingNetworkAPI = (*SignalClient)(nil)
_ bridgev2.RoomTopicHandlingNetworkAPI = (*SignalClient)(nil)
_ bridgev2.BackgroundSyncingNetworkAPI = (*SignalClient)(nil)
)

var pushCfg = &bridgev2.PushConfig{
Expand Down Expand Up @@ -210,6 +215,43 @@ func (s *SignalClient) Connect(ctx context.Context) {
s.tryConnect(ctx, 0)
}

func (s *SignalClient) ConnectBackground(ctx context.Context) error {
s.queueEmptyWaiter.Clear()
ch, err := s.Client.StartAuthedWS(ctx)
if err != nil {
return err
}
defer s.Disconnect()
log := zerolog.Ctx(ctx)
queueEmpty := s.queueEmptyWaiter.GetChan()
for {
select {
case status := <-ch:
switch status.Event {
case web.SignalWebsocketConnectionEventConnected:
log.Info().Msg("Authed websocket connected")
case web.SignalWebsocketConnectionEventDisconnected:
log.Err(status.Err).Msg("Authed websocket disconnected")
return fmt.Errorf("authed websocket disconnected: %w", status.Err)
case web.SignalWebsocketConnectionEventLoggedOut:
log.Err(status.Err).Msg("Authed websocket logged out")
return fmt.Errorf("authed websocket logged out: %w", status.Err)
case web.SignalWebsocketConnectionEventError:
log.Err(status.Err).Msg("Authed websocket error")
return fmt.Errorf("authed websocket errored: %w", status.Err)
case web.SignalWebsocketConnectionEventCleanShutdown:
log.Info().Msg("Authed websocket clean shutdown")
}
case <-ctx.Done():
log.Warn().Msg("Context finished before queue empty event")
return ctx.Err()
case <-queueEmpty:
log.Info().Msg("Received queue empty event")
return nil
}
}
}

func (s *SignalClient) Disconnect() {
if s.Client == nil {
return
Expand Down
3 changes: 3 additions & 0 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"go.mau.fi/util/dbutil"
"go.mau.fi/util/exsync"
"maunium.net/go/mautrix/bridgev2"

"go.mau.fi/mautrix-signal/pkg/msgconv"
Expand Down Expand Up @@ -89,6 +90,8 @@ func (s *SignalConnector) LoadUserLogin(ctx context.Context, login *bridgev2.Use
sc := &SignalClient{
Main: s,
UserLogin: login,

queueEmptyWaiter: exsync.NewEvent(),
}
if device != nil {
sc.Client = &signalmeow.Client{
Expand Down
4 changes: 4 additions & 0 deletions pkg/connector/handlesignal.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (s *SignalClient) handleSignalEvent(rawEvt events.SignalEvent) {
s.handleSignalContactList(evt)
case *events.ACIFound:
s.handleSignalACIFound(evt)
case *events.QueueEmpty:
s.queueEmptyWaiter.Set()
default:
s.UserLogin.Log.Warn().Type("event_type", evt).Msg("Unrecognized signalmeow event type")
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/signalmeow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (cli *Client) IsConnected() bool {
return cli.AuthedWS.IsConnected() && cli.UnauthedWS.IsConnected()
}

func (cli *Client) ConnectAuthedWS(ctx context.Context, requestHandler web.RequestHandlerFunc) (chan web.SignalWebsocketConnectionStatus, error) {
func (cli *Client) connectAuthedWS(ctx context.Context, requestHandler web.RequestHandlerFunc) (chan web.SignalWebsocketConnectionStatus, error) {
if cli.AuthedWS != nil {
return nil, errors.New("authed websocket already connected")
}
Expand All @@ -90,7 +90,7 @@ func (cli *Client) ConnectAuthedWS(ctx context.Context, requestHandler web.Reque
return statusChan, nil
}

func (cli *Client) ConnectUnauthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) {
func (cli *Client) connectUnauthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) {
if cli.UnauthedWS != nil {
return nil, errors.New("unauthed websocket already connected")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/signalmeow/events/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (*ReadSelf) isSignalEvent() {}
func (*Call) isSignalEvent() {}
func (*ContactList) isSignalEvent() {}
func (*ACIFound) isSignalEvent() {}
func (*QueueEmpty) isSignalEvent() {}

type MessageInfo struct {
Sender uuid.UUID
Expand Down Expand Up @@ -78,3 +79,5 @@ type ACIFound struct {
PNI libsignalgo.ServiceID
ACI libsignalgo.ServiceID
}

type QueueEmpty struct{}
17 changes: 15 additions & 2 deletions pkg/signalmeow/receiving.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,29 @@ type SignalConnectionStatus struct {
Err error
}

func (cli *Client) StartAuthedWS(ctx context.Context) (chan web.SignalWebsocketConnectionStatus, error) {
ctx, cancel := context.WithCancel(ctx)
cli.WSCancel = cancel
authChan, err := cli.connectAuthedWS(ctx, cli.incomingRequestHandler)
if err != nil {
cancel()
return nil, err
}
zerolog.Ctx(ctx).Info().Msg("Authed websocket connecting")
return authChan, nil
}

func (cli *Client) StartReceiveLoops(ctx context.Context) (chan SignalConnectionStatus, error) {
log := zerolog.Ctx(ctx).With().Str("action", "start receive loops").Logger()
ctx, cancel := context.WithCancel(log.WithContext(ctx))
cli.WSCancel = cancel
authChan, err := cli.ConnectAuthedWS(ctx, cli.incomingRequestHandler)
authChan, err := cli.connectAuthedWS(ctx, cli.incomingRequestHandler)
if err != nil {
cancel()
return nil, err
}
log.Info().Msg("Authed websocket connecting")
unauthChan, err := cli.ConnectUnauthedWS(ctx)
unauthChan, err := cli.connectUnauthedWS(ctx)
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -257,6 +269,7 @@ func (cli *Client) incomingRequestHandler(ctx context.Context, req *signalpb.Web
return cli.incomingAPIMessageHandler(ctx, req)
} else if *req.Verb == http.MethodPut && *req.Path == "/api/v1/queue/empty" {
log.Trace().Msg("Received queue empty")
cli.handleEvent(&events.QueueEmpty{})
} else {
log.Warn().Any("req", req).Msg("Unknown websocket request message")
}
Expand Down

0 comments on commit aca0ee2

Please sign in to comment.