Skip to content

Commit

Permalink
Fixed #17
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Lobunets committed Jan 19, 2015
1 parent 617943a commit 29307e3
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 10 deletions.
6 changes: 1 addition & 5 deletions cmd/cascades/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ func run(c *cli.Context) {
scheduler.PrintGraph()
}

if c.Bool("dry") {
return
}

// Start the network
go scheduler.Start()
go scheduler.Start(c.Bool("dry"))

// Shutdown ZMQ upon shutdown
defer zmq.Term()
Expand Down
10 changes: 9 additions & 1 deletion components/core/joiner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func openPorts() {

for i, endpoint := range inports {
endpoint = strings.TrimSpace(endpoint)
log.Printf("Binding OUT[%v]=%s", i, endpoint)
log.Printf("Binding IN[%v]=%s", i, endpoint)
port, err = utils.CreateInputPort(fmt.Sprintf("joiner.in[%v]", i), endpoint, inCh)
utils.AssertError(err)
inPortArray = append(inPortArray, port)
Expand Down Expand Up @@ -163,6 +163,14 @@ func main() {
continue
}

if *debug {
for i, s := range inPortArray {
if s == r.Socket {
log.Printf("Data from port IN[%v]: %#v", i, string(ip[1]))
}
}
}

outPort.SendMessage(ip)
}
}
Expand Down
34 changes: 30 additions & 4 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runtime

import (
"fmt"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -204,6 +205,7 @@ func (r *Runtime) prepareProcesses() error {
sockets[endpoint] = s
}
r.iips = append(r.iips, iip)

} else {
srcIndex = 0
tgtIndex = 0
Expand Down Expand Up @@ -232,18 +234,37 @@ func (r *Runtime) prepareProcesses() error {
}
}
}

// Solves: https://github.com/cascades-fbp/cascades/issues/17
keys := make([]string, len(sockets))
i := 0
for k := range sockets {
keys[i] = k
i++
}
sort.Strings(keys)

// Compact sockets
arguments := map[string][]string{}
for n, s := range sockets {
for _, n := range keys {
parts := strings.SplitN(n, ".", 3)
k := parts[0] + "." + parts[1]
if _, ok := arguments[k]; ok {
arguments[k] = append(arguments[k], s)
arguments[k] = append(arguments[k], sockets[n])
} else {
arguments[k] = []string{s}
arguments[k] = []string{sockets[n]}
}

}

if r.Debug {
fmt.Println("------------ IIPs -------------")
for _, d := range r.iips {
fmt.Printf("'%v' -> %v\n", string(d.Payload), d.Socket)
}
fmt.Println("-------------------------------")
}

// Add sockets to component CLI arguments
for n, s := range arguments {
parts := strings.SplitN(n, ".", 2)
Expand All @@ -269,7 +290,7 @@ func (r *Runtime) prepareProcesses() error {
//
// Start the network based on the current graph
//
func (r *Runtime) Start() {
func (r *Runtime) Start(dry bool) {
err := r.prepareProcesses()
if err != nil {
log.ErrorOutput("Failed to create a process: " + err.Error())
Expand All @@ -283,6 +304,11 @@ func (r *Runtime) Start() {
return
}

if dry {
r.Done <- true
return
}

log.SystemOutput("Starting processes...")
idx := 0
for name, ps := range r.processes {
Expand Down

0 comments on commit 29307e3

Please sign in to comment.