Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Commit

Permalink
This ensures we close connection after sync up on documents sent (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
stiangrindvoll authored Nov 27, 2017
1 parent d348320 commit 53b68fb
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 8 deletions.
13 changes: 9 additions & 4 deletions cmd/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cmd

import (
"sync"

"github.com/meltwater/rabbitio/file"
"github.com/meltwater/rabbitio/rmq"
"github.com/spf13/cobra"
Expand All @@ -32,14 +34,17 @@ var inCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {

channel := make(chan rmq.Message, prefetch)
var wg sync.WaitGroup

override := rmq.Override{RoutingKey: routingKey}
rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch)
path := file.NewInput(fileInput)
path.Wg = &wg
rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch)
rabbit.Wg = &wg

go path.Send(channel)

rabbit.Publish(channel, override)
go rabbit.Publish(channel, override)
path.Send(channel)
rabbit.Close()
},
}

Expand Down
8 changes: 6 additions & 2 deletions file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"log"
"os"
"path/filepath"
"sync"

"github.com/meltwater/rabbitio/rmq"
)
Expand All @@ -28,6 +29,7 @@ type Path struct {
name string
batchSize int
queue []string
Wg *sync.WaitGroup
}

// NewInput returns a *Path with a queue of files paths, all files in a directory
Expand Down Expand Up @@ -83,15 +85,17 @@ func (p *Path) Send(messages chan rmq.Message) {
// and clean up afterwards
defer fh.Close()

tarNum, err := UnPack(fh, messages)
tarNum, err := UnPack(p.Wg, fh, messages)
if err != nil {
log.Fatalf("Failed to unpack: %s ", err)
}
log.Printf("Extracted %d Messages from tarball: %s", tarNum, file)
num = num + tarNum
}
// when all files are read, close

p.Wg.Wait()
close(messages)
// when all files are read, close
log.Printf("Total %d Messages from tarballs", num)

}
Expand Down
3 changes: 2 additions & 1 deletion file/tarball.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (t *TarballBuilder) addFile(tw *tar.Writer, name string, m *rmq.Message) er
}

// UnPack will decompress and send messages out on channel from file
func UnPack(file *os.File, messages chan rmq.Message) (n int, err error) {
func UnPack(wg *sync.WaitGroup, file *os.File, messages chan rmq.Message) (n int, err error) {

// wrap fh in a gzip reader
gr, err := gzip.NewReader(file)
Expand All @@ -109,6 +109,7 @@ func UnPack(file *os.File, messages chan rmq.Message) (n int, err error) {
if terr != nil {
return n, terr
}
wg.Add(1)

// create a Buffer to work on
// TODO: reuse if GC pressure is a problem
Expand Down
15 changes: 15 additions & 0 deletions rmq/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,20 @@ func (r *RabbitMQ) Publish(messages chan Message, o Override) {
); err != nil {
log.Fatalf("writer failed to write document to rabbit: %s", err)
}
r.Wg.Done()
}
}

// Close will close the RabbitMQ channel and connection
func (r *RabbitMQ) Close() error {
err := r.channel.Close()
if err != nil {
return err
}
err = r.conn.Close()
if err != nil {
return err
}
log.Println("RabbitMQ Connection closed with success")
return nil
}
7 changes: 6 additions & 1 deletion rmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package rmq

import "github.com/streadway/amqp"
import (
"sync"

"github.com/streadway/amqp"
)

// RabbitMQ type for talking to RabbitMQ
type RabbitMQ struct {
Expand All @@ -29,6 +33,7 @@ type RabbitMQ struct {
prefetch int
consume bool
publish bool
Wg *sync.WaitGroup
}

// Override will be used to override RabbitMQ settings on publishing messages
Expand Down

0 comments on commit 53b68fb

Please sign in to comment.