Skip to content

Commit

Permalink
Finished transition of the core components
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Lobunets committed Jan 15, 2015
1 parent d897681 commit 617943a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 158 deletions.
44 changes: 35 additions & 9 deletions components/debug/crasher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,45 @@ func main() {
ch := utils.HandleInterruption()
inCh = make(chan bool)
outCh = make(chan bool)

openPorts()
defer closePorts()

waitCh := make(chan bool)
go func() {
select {
case <-inCh:
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
total := 0
for {
select {
case v := <-inCh:
if !v {
log.Println("IN port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-outCh:
if !v {
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
}
if total >= 2 && waitCh != nil {
waitCh <- true
}
}
}()

openPorts()
defer closePorts()
log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

go func() {
log.Println("Waiting for a crash")
Expand Down
26 changes: 0 additions & 26 deletions components/debug/oneshot/doc.go

This file was deleted.

94 changes: 0 additions & 94 deletions components/debug/oneshot/main.go

This file was deleted.

76 changes: 63 additions & 13 deletions components/fs/walk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"path/filepath"
"syscall"
"time"

"github.com/cascades-fbp/cascades/components/utils"
"github.com/cascades-fbp/cascades/runtime"
Expand Down Expand Up @@ -41,7 +42,7 @@ func validateArgs() {
}

func openPorts() {
inPort, err = utils.CreateInputPort("fs/walk.in", *inputEndpoint, inCh)
inPort, err = utils.CreateInputPort("fs/walk.dir", *inputEndpoint, inCh)
utils.AssertError(err)

outPort, err = utils.CreateOutputPort("fs/walk.file", *outputEndpoint, outCh)
Expand Down Expand Up @@ -84,25 +85,74 @@ func main() {
inCh = make(chan bool)
outCh = make(chan bool)
errCh = make(chan bool)
go func() {
select {
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-errCh:
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}()

openPorts()
defer closePorts()

ports := 1
if outPort != nil {
ports++
}
if errPort != nil {
ports++
}

waitCh := make(chan bool)
inExitCh := make(chan bool, 1)
go func(num int) {
total := 0
for {
select {
case v := <-inCh:
if v {
total++
} else {
inExitCh <- true
}
case v := <-outCh:
if !v {
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
case v := <-errCh:
if !v {
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
} else {
total++
}
}
if total >= num && waitCh != nil {
waitCh <- true
}
}
}(ports)

log.Println("Waiting for port connections to establish... ")
select {
case <-waitCh:
log.Println("Ports connected")
waitCh = nil
case <-time.Tick(30 * time.Second):
log.Println("Timeout: port connections were not established within provided interval")
os.Exit(1)
}

log.Println("Started...")
for {
ip, err = inPort.RecvMessageBytes(0)
ip, err := inPort.RecvMessageBytes(zmq.DONTWAIT)
if err != nil {
log.Println("Error receiving message:", err.Error())
select {
case <-inExitCh:
log.Println("DIR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
default:
// IN port is still open
}
time.Sleep(2 * time.Second)
continue
}
if !runtime.IsValidIP(ip) {
Expand Down
41 changes: 25 additions & 16 deletions components/fs/watchdog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,28 +88,18 @@ func main() {

validateArgs()

// Setup watcher
watcher, err := fsnotify.NewWatcher()
utils.AssertError(err)
defer watcher.Close()

ch := utils.HandleInterruption()
outCh = make(chan bool)
errCh = make(chan bool)
go func() {
select {
case <-outCh:
log.Println("OUT port is closed. Interrupting execution")
ch <- syscall.SIGTERM
case <-errCh:
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
}
}()

openPorts()
defer closePorts()

// Setup watcher
watcher, err := fsnotify.NewWatcher()
utils.AssertError(err)
defer watcher.Close()

// Process events
go func() {
// Socket to send messages to task sink
Expand Down Expand Up @@ -151,6 +141,25 @@ func main() {
}
}()

go func() {
for {
select {
case v := <-outCh:
if !v {
log.Println("CREATED port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
}
case v := <-errCh:
if !v {
log.Println("ERR port is closed. Interrupting execution")
ch <- syscall.SIGTERM
break
}
}
}
}()

// Main loop
log.Println("Started")
for {
Expand All @@ -175,7 +184,7 @@ func main() {
return nil
})
if err != nil {
log.Printf("ERROR openning file %s: %s", dir, err.Error())
log.Printf("ERROR opening file %s: %s", dir, err.Error())
if errPort != nil {
errPort.SendMessage(runtime.NewPacket([]byte(err.Error())))
}
Expand Down

0 comments on commit 617943a

Please sign in to comment.