Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mintoolkit/mint
Browse files Browse the repository at this point in the history
  • Loading branch information
kcq committed Nov 6, 2024
2 parents 3986d94 + b059444 commit bcc68db
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/app/master/command/debug/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Volume struct {
}

type CommandParams struct {
KubeComm *KubernetesHandlerComm
RuntimeCommunicator *RuntimeCommunicator
/// the runtime environment type
Runtime string
/// the running container which we want to attach to
Expand Down
22 changes: 18 additions & 4 deletions pkg/app/master/command/debug/handle_docker_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,18 @@ func HandleDockerRuntime(
}

//todo: need to validate that the session container exists and it's running

r, w := io.Pipe()
go io.Copy(w, os.Stdin)
var input io.Reader
if commandParams.TUI {
input = &TUIReader{inputChan: commandParams.RuntimeCommunicator.InputChan}
} else {
r, w := io.Pipe()
input = r
go io.Copy(w, os.Stdin)
}

options := dockerapi.AttachToContainerOptions{
Container: containerID,
InputStream: r,
InputStream: input,
OutputStream: os.Stdout,
ErrorStream: os.Stderr,
Stdin: true,
Expand Down Expand Up @@ -252,6 +257,15 @@ func HandleDockerRuntime(
Terminal: commandParams.DoTerminal,
}

if commandParams.TUI {
reader := &TUIReader{inputChan: commandParams.RuntimeCommunicator.InputChan}
options.IO = container.ExecutionIO{
Input: reader,
Output: os.Stdout,
Error: os.Stderr,
}
}

exe, err := container.NewExecution(
xc,
logger,
Expand Down
12 changes: 6 additions & 6 deletions pkg/app/master/command/debug/handle_kubernetes_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,8 @@ func HandleKubernetesRuntime(
fmt.Printf("\n")
//note: blocks until done streaming or failure...
if commandParams.TUI {
// TODO - move KubeComm off of command params
reader := &KubeReader{inputChan: commandParams.KubeComm.InputChan}
// TODO - move RuntimeCommunicator off of command params
reader := &TUIReader{inputChan: commandParams.RuntimeCommunicator.InputChan}
err = attach.StreamWithContext(
ctx,
remotecommand.StreamOptions{
Expand Down Expand Up @@ -862,16 +862,16 @@ func HandleKubernetesRuntime(
// as per the comment in `debug/tui.go`.
// An InputReader usable by Docker, Podman, Kubernetes, and Containerd
// will be added to this directory.
type KubeReader struct {
type TUIReader struct {
inputChan chan InputKey
}

func (kr *KubeReader) Read(p []byte) (n int, err error) {
inputKey, ok := <-kr.inputChan
func (tuiReader *TUIReader) Read(p []byte) (n int, err error) {
inputKey, ok := <-tuiReader.inputChan
if !ok {
return 0, io.EOF
}
log.Debugf("KubeReader received inputKey %v", inputKey)
log.Debugf("TUIReader received inputKey %v", inputKey)
switch inputKey.Special {
case NotSpecial:
p[0] = byte(inputKey.Rune)
Expand Down
45 changes: 29 additions & 16 deletions pkg/app/master/command/debug/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type TUI struct {
// Handle kubernetes session connections
subscriptionHandler subscriptionHandler
isListening bool
kubeComm *KubernetesHandlerComm
runtimeCommunicator *RuntimeCommunicator
exitedSession bool
}

Expand Down Expand Up @@ -94,15 +94,15 @@ type subscriptionHandler struct {
}

// newSubscription creates a new subscription handler with an async data channel
func newSubscription(gcvalues *command.GenericParams, kubeComm *KubernetesHandlerComm) subscriptionHandler {
func newSubscription(gcvalues *command.GenericParams, runtimeCommunicator *RuntimeCommunicator) subscriptionHandler {
dataChan := make(chan terminalStartMessage)
go launchSessionHandler(dataChan, gcvalues, kubeComm)
go launchSessionHandler(dataChan, gcvalues, runtimeCommunicator)
return subscriptionHandler{
dataChan: dataChan,
}
}

func launchSessionHandler(dataChan chan terminalStartMessage, gcvalues *command.GenericParams, kubeComm *KubernetesHandlerComm) {
func launchSessionHandler(dataChan chan terminalStartMessage, gcvalues *command.GenericParams, runtimeCommunicator *RuntimeCommunicator) {
// Create a subscription channel and define subscriptionChannels map for passing data
subscriptionChannel := make(chan interface{})
subscriptionChannels := map[string]chan interface{}{
Expand All @@ -117,21 +117,34 @@ func launchSessionHandler(dataChan chan terminalStartMessage, gcvalues *command.
subscriptionChannels,
)

// Define command parameters for k8s runtime
// Define command parameters for docker runtime
// + Hard coded values at the moment for this PoC
cparams := &CommandParams{
Runtime: "k8s",
TargetRef: "nginx",
Kubeconfig: crt.KubeconfigDefault,
TargetNamespace: "default",
Runtime: "docker",
TargetRef: "docker-amor",
DebugContainerImage: BusyboxImage,
DoFallbackToTargetUser: true,
DoRunAsTargetShell: true,
DoTerminal: true,
KubeComm: kubeComm,
RuntimeCommunicator: runtimeCommunicator,
TUI: true,
}

// Connect to active session | Kubernetes session
// cparams := &CommandParams{
// Runtime: "docker",
// TargetRef: "docker-amor",
// // Kubeconfig: crt.KubeconfigDefault,
// // TargetNamespace: "default",
// DebugContainerImage: BusyboxImage,
// DoFallbackToTargetUser: true,
// DoRunAsTargetShell: true,
// DoTerminal: true,
// RuntimeCommunicator: runtimeCommunicator,
// TUI: true,
// ActionConnectSession: true,
// }

// TODO - Pass runtime communicator
go OnCommand(xc, gcvalues, cparams)

Expand Down Expand Up @@ -160,7 +173,7 @@ func launchSessionHandler(dataChan chan terminalStartMessage, gcvalues *command.
if infoValue, exists := channelResponse["info"]; exists {
if infoValue == "terminal.start" {
dataChan <- terminalStartMessage("Session ready. Opening session below...\nPress esc to exit session.\n")
kubeComm.InputChan <- InputKey{Special: Enter}
runtimeCommunicator.InputChan <- InputKey{Special: Enter}
}
}
}
Expand Down Expand Up @@ -198,7 +211,7 @@ func InitialTUI(standalone bool, gcvalues *command.GenericParams) *TUI {
// This handler should not live on CommandParams,
// but be passed in to OnCommand, then to the respective
// runtime handler.
type KubernetesHandlerComm struct {
type RuntimeCommunicator struct {
InputChan chan InputKey
}

Expand Down Expand Up @@ -348,7 +361,7 @@ func (m TUI) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}

select {
case m.kubeComm.InputChan <- inputKey:
case m.runtimeCommunicator.InputChan <- inputKey:
// Key sent successfully
default:
// Channel is full or closed, handle accordingly
Expand Down Expand Up @@ -390,11 +403,11 @@ func (m TUI) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
// this can be done by rendering state output,
m.isListening = true
log.Debug("Start listening")
kubeComm := &KubernetesHandlerComm{
runtimeCommunicator := &RuntimeCommunicator{
InputChan: make(chan InputKey, 100),
}
m.kubeComm = kubeComm
m.subscriptionHandler = newSubscription(m.gcvalues, kubeComm)
m.runtimeCommunicator = runtimeCommunicator
m.subscriptionHandler = newSubscription(m.gcvalues, runtimeCommunicator)
return m, listenToAsyncData(m.subscriptionHandler.dataChan)

}
Expand Down
13 changes: 10 additions & 3 deletions pkg/app/master/container/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,18 @@ func (ref *Execution) monitorSysExitSync() {
}

func (ref *Execution) startTerminal() {
r, w := io.Pipe()
go io.Copy(w, os.Stdin)
var input io.Reader
if ref.options.IO.Input == nil {
r, w := io.Pipe()
input = r
go io.Copy(w, os.Stdin)
} else {
input = ref.options.IO.Input
}

options := dockerapi.AttachToContainerOptions{
Container: ref.ContainerID,
InputStream: r,
InputStream: input,
OutputStream: os.Stdout,
ErrorStream: os.Stderr,
Stdin: true,
Expand Down

0 comments on commit bcc68db

Please sign in to comment.