diff --git a/cmd/cascades/run.go b/cmd/cascades/run.go index 2769d0b..11fded8 100644 --- a/cmd/cascades/run.go +++ b/cmd/cascades/run.go @@ -46,12 +46,8 @@ func run(c *cli.Context) { scheduler.PrintGraph() } - if c.Bool("dry") { - return - } - // Start the network - go scheduler.Start() + go scheduler.Start(c.Bool("dry")) // Shutdown ZMQ upon shutdown defer zmq.Term() diff --git a/components/core/joiner/main.go b/components/core/joiner/main.go index e927241..de294d4 100644 --- a/components/core/joiner/main.go +++ b/components/core/joiner/main.go @@ -53,7 +53,7 @@ func openPorts() { for i, endpoint := range inports { endpoint = strings.TrimSpace(endpoint) - log.Printf("Binding OUT[%v]=%s", i, endpoint) + log.Printf("Binding IN[%v]=%s", i, endpoint) port, err = utils.CreateInputPort(fmt.Sprintf("joiner.in[%v]", i), endpoint, inCh) utils.AssertError(err) inPortArray = append(inPortArray, port) @@ -163,6 +163,14 @@ func main() { continue } + if *debug { + for i, s := range inPortArray { + if s == r.Socket { + log.Printf("Data from port IN[%v]: %#v", i, string(ip[1])) + } + } + } + outPort.SendMessage(ip) } } diff --git a/runtime/runtime.go b/runtime/runtime.go index b52b205..7e61379 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -2,6 +2,7 @@ package runtime import ( "fmt" + "sort" "strings" "sync" "time" @@ -204,6 +205,7 @@ func (r *Runtime) prepareProcesses() error { sockets[endpoint] = s } r.iips = append(r.iips, iip) + } else { srcIndex = 0 tgtIndex = 0 @@ -232,18 +234,37 @@ func (r *Runtime) prepareProcesses() error { } } } + + // Solves: https://github.com/cascades-fbp/cascades/issues/17 + keys := make([]string, len(sockets)) + i := 0 + for k := range sockets { + keys[i] = k + i++ + } + sort.Strings(keys) + // Compact sockets arguments := map[string][]string{} - for n, s := range sockets { + for _, n := range keys { parts := strings.SplitN(n, ".", 3) k := parts[0] + "." + parts[1] if _, ok := arguments[k]; ok { - arguments[k] = append(arguments[k], s) + arguments[k] = append(arguments[k], sockets[n]) } else { - arguments[k] = []string{s} + arguments[k] = []string{sockets[n]} } } + + if r.Debug { + fmt.Println("------------ IIPs -------------") + for _, d := range r.iips { + fmt.Printf("'%v' -> %v\n", string(d.Payload), d.Socket) + } + fmt.Println("-------------------------------") + } + // Add sockets to component CLI arguments for n, s := range arguments { parts := strings.SplitN(n, ".", 2) @@ -269,7 +290,7 @@ func (r *Runtime) prepareProcesses() error { // // Start the network based on the current graph // -func (r *Runtime) Start() { +func (r *Runtime) Start(dry bool) { err := r.prepareProcesses() if err != nil { log.ErrorOutput("Failed to create a process: " + err.Error()) @@ -283,6 +304,11 @@ func (r *Runtime) Start() { return } + if dry { + r.Done <- true + return + } + log.SystemOutput("Starting processes...") idx := 0 for name, ps := range r.processes {