Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Commit

Permalink
Web view: Implement show page
Browse files Browse the repository at this point in the history
  • Loading branch information
apostergiou committed Apr 17, 2018
1 parent 625a1cc commit 5d499fd
Show file tree
Hide file tree
Showing 3 changed files with 348 additions and 0 deletions.
93 changes: 93 additions & 0 deletions public/templates/show.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<!DOCTYPE html>
<html lang="en">

<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Job Logs</title>
<link rel="stylesheet" href="../../css/foundation.min.css">
</head>

<body>

<div class="top-bar">
<div class="top-bar-left">
<h1><a href="/">Mistry</a></h1>
</div>
</div>

<div class="grid-container">
<div class="grid">
<div class="cell">
<h4>Job: {{.ID}}</h4>
</div>
<div class="cell">
<div class="card">
<div class="card-section">
<div class="card-divider">
<h4> Details </h4>
</div>
<p id="js-job-info"></p>
</div>
</div>
</div>
<div class="cell">
<div class="card">
<div class="card-section">
<div class="card-divider">
<h4> Logs </h4>
</div>
<p id="js-job-log">{{.Log}}</p>
</div>
</div>
</div>
</div>

<script type="text/javascript">
const jobInfo = document.getElementById('js-job-info');
const state = {{.State}}

jobInfo.innerHTML += "Project: ".big() + {{.Project}} + "<br>";
jobInfo.innerHTML += "State: ".big() + {{.State}} + "<br>";
jobInfo.innerHTML += "Start Time: ".big() + {{.StartedAt}} + "<br>";

if (state == "ready") {
const output = {{.Output}}
const jobJSON = JSON.parse(output);
jobInfo.innerHTML += "Path: ".big() + jobJSON["Path"] + "<br>";
jobInfo.innerHTML += "Cached: ".big() + jobJSON["Cached"] + "<br>";
jobInfo.innerHTML += "Coaleshed: ".big() + jobJSON["Coaleshed:"] + "<br>";
jobInfo.innerHTML += "Error: ".big() + jobJSON["Err"] + "<br>";
jobInfo.innerHTML += "ExitCode: ".big() + jobJSON["ExitCode"] + "<br>";
jobInfo.innerHTML += "Transport method: ".big() + jobJSON["TransportMethod"] + "<br>";
}

if (state == "pending") {
setInterval(checkState, 3000);

function checkState() {
let jHeaders = new Headers();
jHeaders.append('Content-Type', 'application/json');
const jobRequest = new Request('/job/{{.Project}}/{{.ID}}', {headers: jHeaders});
fetch(jobRequest)
.then(function(response) { return response.json(); })
.then(function(data) {
if (data["state"] == "ready"){
location.reload();
}
})
}

const source = new EventSource('/log/{{.Project}}/{{.ID}}');
const jobLog = document.getElementById('js-job-log')
source.onmessage = function(e) {
jobLog.innerHTML += e.data;
};

source.onerror = function(e) {
document.body.innerHTML += "Web tail failed.";
};
}
</script>
</body>
</html>
202 changes: 202 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package main

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"html/template"
"io/ioutil"
"log"
"net/http"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/skroutz/mistry/broker"
"github.com/skroutz/mistry/tailer"
"github.com/skroutz/mistry/types"
)

Expand All @@ -24,6 +30,13 @@ type Server struct {
jq *JobQueue
pq *ProjectQueue
cfg *Config

br *broker.Broker

// Queue used to track all open tailers by their id. Every tailer id
// matches a job id.
// The stored map type is [string]bool.
tq *sync.Map
}

// NewServer accepts a non-nil configuration and an optional logger, and
Expand All @@ -43,12 +56,17 @@ func NewServer(cfg *Config, logger *log.Logger) (*Server, error) {
mux.Handle("/", http.StripPrefix("/", http.FileServer(http.Dir("public"))))
mux.HandleFunc("/jobs", s.HandleNewJob)
mux.HandleFunc("/index/", s.HandleIndex)
mux.HandleFunc("/job/", s.HandleShowJob)
mux.HandleFunc("/log/", s.HandleServerPush)

s.srv = &http.Server{Handler: mux, Addr: cfg.Addr}
s.cfg = cfg
s.Log = logger
s.jq = NewJobQueue()
s.pq = NewProjectQueue()
s.br = broker.NewBroker(s.Log)
s.tq = new(sync.Map)
go s.br.ListenForClients()
return s, nil
}

Expand Down Expand Up @@ -177,6 +195,190 @@ func (s *Server) HandleIndex(w http.ResponseWriter, r *http.Request) {
}
}

// HandleShowJob receives requests for a job and produces the appropriate output
// based on the content type of the request.
func (s *Server) HandleShowJob(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed)
return
}

path := strings.Split(r.URL.Path, "/")
project := path[2]
id := path[3]

j := &Job{}
state, err := j.State(s.cfg.BuildPath, project, id)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
jPath := filepath.Join(s.cfg.BuildPath, project, state, id)
buildLogPath := filepath.Join(jPath, BuildLogFname)
buildResultFilePath := filepath.Join(jPath, BuildResultFname)
var rawLog []byte
var rawResult []byte

// Decide whether to tail the log file or print it immediately,
// based on the job state.
if state != "pending" {
rawResult, err = ioutil.ReadFile(buildResultFilePath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}
rawLog, err = ioutil.ReadFile(buildLogPath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

jDir, err := ioutil.ReadDir(jPath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
ji := types.JobInfo{
Output: string(rawResult),
Log: string(rawLog),
ID: id,
Project: project,
State: state,
StartedAt: jDir[0].ModTime(),
}

ct := r.Header.Get("Content-type")
if ct == "application/json" {
jiData, err := json.Marshal(ji)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(jiData)
return
}

if state == "pending" {
// For each job id there is only one tailer responsible for
// emitting the read bytes to the s.br.Notifier channel.
hasTail, ok := s.tq.Load(id)
if !ok || hasTail.(bool) == false {
// Mark the id to the tailers' queue to identify that a
// tail reader has been spawned.
s.tq.Store(id, true)
// Create a channel to communicate the closure of all connections
// for the job id to the spawned tailer goroutine.
if _, ok := s.br.CloseClientC[id]; !ok {
s.br.CloseClientC[id] = make(chan struct{}, 1)
}
// Spawns a tailer which tails the build log file and communicates
// the read results to the s.br.Notifier channel.
go func() {
s.Log.Printf("[Tailer] Starting for: %s", id)
tl, err := tailer.New(buildLogPath)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer tl.Close()
scanner := bufio.NewScanner(tl)
for scanner.Scan() {
select {
case <-s.br.CloseClientC[id]:
s.Log.Printf("[Tailer] Exiting for: %s", id)
s.tq.Store(id, false)
return
default:
s.br.Notifier <- &broker.Event{Msg: []byte(scanner.Text()), ID: id}
}
}
}()
}
}

t, err := template.ParseFiles("./public/templates/show.html")
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusBadRequest)
return
}
t.Execute(w, ji)
}

// HandleServerPush handles the server push logic.
func (s *Server) HandleServerPush(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
http.Error(w, "Expected GET, got "+r.Method, http.StatusMethodNotAllowed)
return
}

path := strings.Split(r.URL.Path, "/")
project := path[2]
id := path[3]

j := &Job{}
state, err := j.State(s.cfg.BuildPath, project, id)
if err != nil {
s.Log.Print(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

// Decide whether to tail the log file and keep the connection alive for
// sending server side events.
if state != "pending" {
w.WriteHeader(http.StatusNoContent)
return
}

flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}

// Set the headers for browsers that support server sent events.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")

// Each connection registers its own event channel with the
// broker's client connections registry s.br.Clients.
client := &broker.Client{ID: id, EventC: make(chan []byte)}

// Signal the broker that we have a new connection.
s.br.NewClients <- client

// Remove this client from the map of connected clients when the
// handler exits.
defer func() {
s.br.ClosingClients <- client
}()

// Listen to connection close and un-register the client.
notify := w.(http.CloseNotifier).CloseNotify()
go func() {
<-notify
s.br.ClosingClients <- client
}()

for {
// Emit the message from the server.
fmt.Fprintf(w, "data: %s\n\n", <-client.EventC)
// Send any buffered content to the client immediately.
flusher.Flush()
}
}

// ListenAndServe listens on the TCP network address s.srv.Addr and handle
// requests on incoming connections. ListenAndServe always returns a
// non-nil error.
Expand Down
Loading

0 comments on commit 5d499fd

Please sign in to comment.