Skip to content
This repository has been archived by the owner on Feb 8, 2023. It is now read-only.

Commit

Permalink
Refactor functionality outside of cmd directory (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
stiangrindvoll authored Nov 21, 2017
1 parent 9c217a7 commit 6653231
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 241 deletions.
11 changes: 7 additions & 4 deletions cmd/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cmd

import (
"github.com/meltwater/rabbitio/file"
"github.com/meltwater/rabbitio/rmq"
"github.com/spf13/cobra"
)

Expand All @@ -29,14 +31,15 @@ var inCmd = &cobra.Command{
Long: `Specify a directory or file and tarballs will be published.`,
Run: func(cmd *cobra.Command, args []string) {

channel := make(chan Message, prefetch)
channel := make(chan rmq.Message, prefetch)

rabbit := NewRabbitMQ(uri, exchange, userQueue, routingKey, tag, prefetch, false, true)
path := NewFileInput(fileInput)
override := rmq.Override{RoutingKey: routingKey}
rabbit := rmq.NewPublisher(uri, exchange, queue, tag, prefetch)
path := file.NewInput(fileInput)

go path.Send(channel)

rabbit.Publish(channel)
rabbit.Publish(channel, override)
},
}

Expand Down
10 changes: 6 additions & 4 deletions cmd/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os/signal"
"syscall"

"github.com/meltwater/rabbitio/file"
"github.com/meltwater/rabbitio/rmq"
"github.com/spf13/cobra"
)

Expand All @@ -36,10 +38,10 @@ var outCmd = &cobra.Command{
When there are no more messages in the queue, press CTRL + c, to interrupt
the consumption and save the last message buffers.`,
Run: func(cmd *cobra.Command, args []string) {
channel := make(chan Message, prefetch*2)
channel := make(chan rmq.Message, prefetch*2)

rabbit := NewRabbitMQ(uri, exchange, userQueue, routingKey, tag, prefetch, true, false)
savePath := NewFileOutput(outputDirectory, batchSize)
rabbit := rmq.NewConsumer(uri, exchange, queue, routingKey, tag, prefetch)
path := file.NewOutput(outputDirectory, batchSize)

go rabbit.Consume(channel)

Expand All @@ -51,7 +53,7 @@ var outCmd = &cobra.Command{
close(channel)
}()

savePath.Receive(channel)
path.Receive(channel)
},
}

Expand Down
207 changes: 0 additions & 207 deletions cmd/rabbitmq.go

This file was deleted.

10 changes: 5 additions & 5 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
)

var (
version string
uri, exchange, userQueue, tag, routingKey string
prefetch int
version string
uri, exchange, queue, tag, routingKey string
prefetch int
)

// RootCmd represents the base command when called without any subcommands
Expand All @@ -49,8 +49,8 @@ func Execute(ver string) {
func init() {
RootCmd.PersistentFlags().StringVarP(&uri, "uri", "u", "amqp://guest:guest@localhost:5672/", "AMQP URI, uri to for instance RabbitMQ")
RootCmd.PersistentFlags().StringVarP(&exchange, "exchange", "e", "", "Exchange to connect to")
RootCmd.PersistentFlags().StringVarP(&userQueue, "queue", "q", "", "Queue to connect to")
RootCmd.PersistentFlags().StringVarP(&routingKey, "routingkey", "r", "", "Routing Key, if specified will override tarball routing key configuration")
RootCmd.PersistentFlags().StringVarP(&queue, "queue", "q", "", "Queue to connect to")
RootCmd.PersistentFlags().StringVarP(&routingKey, "routingkey", "r", "#", "Routing Key, if specified will override tarball routing key configuration")
RootCmd.PersistentFlags().StringVarP(&tag, "tag", "t", "Rabbit IO Connector "+version, "AMQP Client Tag")
RootCmd.PersistentFlags().IntVarP(&prefetch, "prefetch", "p", 100, "Prefetch for batches")
}
37 changes: 21 additions & 16 deletions cmd/file.go → file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd
package file

import (
"io/ioutil"
"log"
"os"
"path/filepath"
)

// FileInput is nice
type FileInput struct {
queue []string
}
"github.com/meltwater/rabbitio/rmq"
)

// Path is directory path for consumed RabbitMQ messages
// Path is a directory file path
type Path struct {
name string
batchSize int
queue []string
}

// NewFileInput creates a FileInput from the specified directory
func NewFileInput(path string) *FileInput {
// NewInput returns a *Path with a queue of files paths, all files in a directory
func NewInput(path string) *Path {
fi, err := os.Stat(path)
if err != nil {
log.Fatalln(err)
}

var f *FileInput
var f *Path
q := []string{}
switch mode := fi.Mode(); {
case mode.IsDir():
Expand All @@ -55,7 +53,7 @@ func NewFileInput(path string) *FileInput {
q = append(q, path)
}

f = &FileInput{
f = &Path{
queue: q,
}

Expand All @@ -72,11 +70,11 @@ func writeFile(b []byte, dir, file string) {
}

// Send delivers messages to the channel
func (f *FileInput) Send(messages chan Message) {
func (p *Path) Send(messages chan rmq.Message) {
var num int

// loop over the queued up files
for _, file := range f.queue {
for _, file := range p.queue {
// open file from the queue
fh, err := os.Open(file)
if err != nil {
Expand All @@ -98,16 +96,23 @@ func (f *FileInput) Send(messages chan Message) {

}

// NewFileOutput creates a Path to output files in from RabbitMQ
func NewFileOutput(path string, batchSize int) *Path {
// NewOutput creates a Path to output files in from RabbitMQ
func NewOutput(path string, batchSize int) *Path {
if _, err := os.Stat(path); os.IsNotExist(err) {
log.Println("Creating missing directory:", path)
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
log.Fatalln(err)
}
}
return &Path{
name: path,
batchSize: batchSize,
}
}

// Receive will handle messages and save to path
func (p *Path) Receive(messages chan Message) {
func (p *Path) Receive(messages chan rmq.Message) {

// create new TarballBuilder
builder := NewTarballBuilder(p.batchSize)
Expand Down
Loading

0 comments on commit 6653231

Please sign in to comment.