goroutine that reads stdout pipe and sends it to websockets, with mutex locks (incomplete)

This commit is contained in:
mykola2312 2024-11-25 10:23:36 +02:00
parent faf9946699
commit acb9b4230d

89
main.go
View file

@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"strconv"
"sync"
"text/template"
"time"
"wargh/db"
@ -35,6 +36,7 @@ const (
ERROR_INVALID_INPUT = 1
ERROR_INVALID_LOGIN_PASSWORD = 2
ERROR_UNAUTHORIZED = 3
ERROR_NOT_FOUND = 4
)
var ERROR_TEXT = []string{
@ -42,21 +44,24 @@ var ERROR_TEXT = []string{
"Invalid input",
"Invalid login or password",
"Unathorized",
"Not found",
}
const PIPE_BUFFER_SIZE = 1024
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
ReadBufferSize: PIPE_BUFFER_SIZE,
WriteBufferSize: PIPE_BUFFER_SIZE,
}
type Job struct {
Command *exec.Cmd
StdoutPipe io.ReadCloser
StderrPipe io.ReadCloser
Socket []*websocket.Conn
Output io.ReadCloser
Sockets []*websocket.Conn
}
var jobs map[string]Job = make(map[string]Job)
var jobLock = &sync.RWMutex{}
var jobs map[string]*Job = make(map[string]*Job)
func errorHandler(w http.ResponseWriter, r *http.Request) {
errorParam := r.URL.Query().Get("error")
@ -243,28 +248,55 @@ func loginHandler(w http.ResponseWriter, r *http.Request) {
}
}
func handleJob(jobId string) {
jobLock.RLock()
job := jobs[jobId]
jobLock.RUnlock()
var buffer []byte = make([]byte, PIPE_BUFFER_SIZE)
for read, err := job.Output.Read(buffer); err == nil; {
jobLock.Lock()
for idx, conn := range job.Sockets {
if conn.WriteMessage(websocket.BinaryMessage, buffer[:read]) != nil {
// got an error, should close connection
conn.Close()
job.Sockets = append(job.Sockets[:idx], job.Sockets[idx+1:]...)
}
}
jobLock.Unlock()
}
// we're done here, remove job
log.Printf("job %s done\n", jobId)
jobLock.Lock()
delete(jobs, jobId)
jobLock.Unlock()
}
func createJob() (string, error) {
cmd := exec.Command("/usr/bin/sh", "-c", "while true; do echo $(date) \"test\"; sleep 1; done")
cmd := exec.Command("/usr/bin/sh", "-c", "'while true; do echo $(date) \"test\"; sleep 1; done'")
stdout, err := cmd.StdoutPipe()
if err != nil {
return "", err
}
stderr, err := cmd.StderrPipe()
if err != nil {
return "", err
}
cmd.Stderr = cmd.Stdout
err = cmd.Start()
jobId := randString(64)
jobs[jobId] = Job{
jobLock.Lock()
jobs[jobId] = &Job{
Command: cmd,
StdoutPipe: stdout,
StderrPipe: stderr,
Socket: make([]*websocket.Conn, 0),
Output: stdout,
Sockets: make([]*websocket.Conn, 0),
}
jobLock.Unlock()
go handleJob(jobId)
return jobId, err
}
@ -273,7 +305,20 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
return
}
// get job id and attach
jobId := r.URL.Query().Get("id")
if jobId == "" {
redirectError(w, r, ERROR_INVALID_INPUT)
return
}
jobLock.RLock()
job, ok := jobs[jobId]
jobLock.RUnlock()
if !ok {
redirectError(w, r, ERROR_NOT_FOUND)
return
}
ws, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
@ -283,8 +328,10 @@ func wsHandler(w http.ResponseWriter, r *http.Request) {
return
}
ws.WriteMessage(websocket.TextMessage, ([]byte)("test"))
ws.Close()
// add websocket to job
jobLock.Lock()
job.Sockets = append(job.Sockets, ws)
jobLock.Unlock()
}
func main() {
@ -315,5 +362,11 @@ func main() {
http.HandleFunc("/login", loginHandler)
http.HandleFunc("/ws", wsHandler)
jobId, err := createJob()
if err != nil {
log.Fatal(err)
}
log.Printf("created job %s\n", jobId)
log.Fatal(http.ListenAndServe(":8080", nil))
}