Skip to content

Commit

Permalink
Update more core components to respect port states
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Lobunets committed Jan 15, 2015
1 parent 08d86f7 commit d897681
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 71 deletions.
83 changes: 62 additions & 21 deletions components/core/readfile/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log"
"os"
"syscall"
"time"

"github.com/cascades-fbp/cascades/components/utils"
"github.com/cascades-fbp/cascades/runtime"
Expand Down Expand Up @@ -83,25 +84,74 @@ func main() {
fileCh = make(chan bool)
outCh = make(chan bool)
errCh = make(chan bool)
go func() {
select {
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-errCh:
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}()

openPorts()
defer closePorts()

ports := 1
if outPort != nil {
ports++
}
if errPort != nil {
ports++
}

waitCh := make(chan bool)
fileExitCh := make(chan bool, 1)
go func(num int) {
total := 0
for {
select {
case v := <-fileCh:
if v {
total++
} else {
fileExitCh <- true
}
case v := <-outCh:
if !v {
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-errCh:
if !v {
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
}
if total >= num && waitCh != nil {
waitCh <- true
}
}
}(ports)

log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

log.Println("Started...")
for {
ip, err := filePort.RecvMessageBytes(0)
ip, err := filePort.RecvMessageBytes(zmq.DONTWAIT)
if err != nil {
log.Println("Error receiving message:", err.Error())
select {
case <-fileExitCh:
log.Println("FILE port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
default:
// IN port is still open
}
time.Sleep(2 * time.Second)
continue
}
if !runtime.IsValidIP(ip) {
Expand Down Expand Up @@ -131,14 +181,5 @@ func main() {
f.Close()

outPort.SendMessage(runtime.NewCloseBracket())

select {
case <-fileCh:
log.Println("FILE port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
default:
// file port is still open
}
}
}
55 changes: 41 additions & 14 deletions components/core/splitter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"strings"
"syscall"
"time"

"github.com/cascades-fbp/cascades/components/utils"
"github.com/cascades-fbp/cascades/runtime"
Expand Down Expand Up @@ -87,24 +88,50 @@ func main() {
ch := utils.HandleInterruption()
inCh = make(chan bool)
outCh = make(chan bool)
closed := 0
go func() {
select {
case <-inCh:
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-outCh:
closed++
if closed == len(outPortArray) {
log.Println("OUT array port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}
}()

openPorts()
defer closePorts()

waitCh := make(chan bool)
go func(num int) {
total := 0
for {
select {
case v := <-outCh:
if v {
total++
} else {
total--
}
case v := <-inCh:
if !v {
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
} else {
total++
}
}
if total >= num && waitCh != nil {
waitCh <- true
} else if total <= 1 && waitCh == nil {
log.Println("All output ports closed. Interrupting execution")
ch <- syscall.SIGTERM
break
}
}
}(len(outPortArray) + 1)

log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

log.Println("Started...")
for {
ip, err := inPort.RecvMessageBytes(0)
Expand Down
47 changes: 37 additions & 10 deletions components/core/submatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"regexp"
"syscall"
"time"

"github.com/cascades-fbp/cascades/components/utils"
"github.com/cascades-fbp/cascades/runtime"
Expand All @@ -25,7 +26,7 @@ var (

// Internal
patternPort, inPort, mapPort *zmq.Socket
patCh, inCh, mapCh chan bool
inCh, mapCh chan bool
err error
)

Expand Down Expand Up @@ -103,19 +104,45 @@ func main() {
ch := utils.HandleInterruption()
inCh = make(chan bool)
mapCh = make(chan bool)

openPorts()
defer closePorts()

waitCh := make(chan bool)
go func() {
select {
case <-inCh:
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-mapCh:
log.Println("MAP port is closed. Interrupting execution")
ch <- syscall.SIGTERM
total := 0
for {
select {
case v := <-inCh:
if !v {
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-mapCh:
if !v {
log.Println("MAP port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
}
if total >= 2 && waitCh != nil {
waitCh <- true
}
}
}()

openPorts()
defer closePorts()
log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

log.Println("Waiting for pattern...")
var (
Expand Down
73 changes: 58 additions & 15 deletions components/core/switch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,6 @@ func main() {
gateCh = make(chan bool)
inCh = make(chan bool)
outCh = make(chan bool)
go func() {
select {
case <-gateCh:
log.Println("GATE port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-inCh:
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}()

defer closePorts()

// Start a separate goroutine to receive gate signals and avoid stocking them
// blocking the channel (use timeout to skip ticks if data sending is still in progress)
Expand Down Expand Up @@ -127,6 +112,64 @@ func main() {
}()

openPorts()
defer closePorts()

waitCh := make(chan bool)
go func() {
total := 0
for {
select {
case v := <-gateCh:
if !v {
log.Println("GATE port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-inCh:
if !v {
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-outCh:
if !v {
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
}
if total >= 3 && waitCh != nil {
waitCh <- true
}
}
}()

log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

go func() {
select {
case <-gateCh:
log.Println("GATE port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-inCh:
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}()

log.Println("Started...")
var (
Expand Down
Loading

0 comments on commit d897681

Please sign in to comment.