diff --git a/main.go b/main.go index b7569f8..92c86d7 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "strconv" + "sync" "text/template" "time" "wargh/db" @@ -35,6 +36,7 @@ const ( ERROR_INVALID_INPUT = 1 ERROR_INVALID_LOGIN_PASSWORD = 2 ERROR_UNAUTHORIZED = 3 + ERROR_NOT_FOUND = 4 ) var ERROR_TEXT = []string{ @@ -42,21 +44,24 @@ var ERROR_TEXT = []string{ "Invalid input", "Invalid login or password", "Unathorized", + "Not found", } +const PIPE_BUFFER_SIZE = 1024 + var wsUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, + ReadBufferSize: PIPE_BUFFER_SIZE, + WriteBufferSize: PIPE_BUFFER_SIZE, } type Job struct { - Command *exec.Cmd - StdoutPipe io.ReadCloser - StderrPipe io.ReadCloser - Socket []*websocket.Conn + Command *exec.Cmd + Output io.ReadCloser + Sockets []*websocket.Conn } -var jobs map[string]Job = make(map[string]Job) +var jobLock = &sync.RWMutex{} +var jobs map[string]*Job = make(map[string]*Job) func errorHandler(w http.ResponseWriter, r *http.Request) { errorParam := r.URL.Query().Get("error") @@ -243,28 +248,55 @@ func loginHandler(w http.ResponseWriter, r *http.Request) { } } +func handleJob(jobId string) { + jobLock.RLock() + job := jobs[jobId] + jobLock.RUnlock() + + var buffer []byte = make([]byte, PIPE_BUFFER_SIZE) + for read, err := job.Output.Read(buffer); err == nil; { + jobLock.Lock() + for idx, conn := range job.Sockets { + if conn.WriteMessage(websocket.BinaryMessage, buffer[:read]) != nil { + // got an error, should close connection + conn.Close() + job.Sockets = append(job.Sockets[:idx], job.Sockets[idx+1:]...) + } + } + jobLock.Unlock() + } + + // we're done here, remove job + log.Printf("job %s done\n", jobId) + + jobLock.Lock() + delete(jobs, jobId) + jobLock.Unlock() +} + func createJob() (string, error) { - cmd := exec.Command("/usr/bin/sh", "-c", "while true; do echo $(date) \"test\"; sleep 1; done") + cmd := exec.Command("/usr/bin/sh", "-c", "'while true; do echo $(date) \"test\"; sleep 1; done'") stdout, err := cmd.StdoutPipe() if err != nil { return "", err } - stderr, err := cmd.StderrPipe() - if err != nil { - return "", err - } + cmd.Stderr = cmd.Stdout err = cmd.Start() jobId := randString(64) - jobs[jobId] = Job{ - Command: cmd, - StdoutPipe: stdout, - StderrPipe: stderr, - Socket: make([]*websocket.Conn, 0), + + jobLock.Lock() + jobs[jobId] = &Job{ + Command: cmd, + Output: stdout, + Sockets: make([]*websocket.Conn, 0), } + jobLock.Unlock() + + go handleJob(jobId) return jobId, err } @@ -273,7 +305,20 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { return } - // get job id and attach + jobId := r.URL.Query().Get("id") + if jobId == "" { + redirectError(w, r, ERROR_INVALID_INPUT) + return + } + + jobLock.RLock() + job, ok := jobs[jobId] + jobLock.RUnlock() + + if !ok { + redirectError(w, r, ERROR_NOT_FOUND) + return + } ws, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { @@ -283,8 +328,10 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { return } - ws.WriteMessage(websocket.TextMessage, ([]byte)("test")) - ws.Close() + // add websocket to job + jobLock.Lock() + job.Sockets = append(job.Sockets, ws) + jobLock.Unlock() } func main() { @@ -315,5 +362,11 @@ func main() { http.HandleFunc("/login", loginHandler) http.HandleFunc("/ws", wsHandler) + jobId, err := createJob() + if err != nil { + log.Fatal(err) + } + log.Printf("created job %s\n", jobId) + log.Fatal(http.ListenAndServe(":8080", nil)) }