diff --git a/rpc/lux_rpc_server.go b/rpc/lux_rpc_server.go index 53daeea..69a81f2 100644 --- a/rpc/lux_rpc_server.go +++ b/rpc/lux_rpc_server.go @@ -1,15 +1,21 @@ package rpc -import "sync" +import ( + "encoding/xml" + "net" + "sync" +) type LuxRpcServer struct { controllers map[string]LuxRpcController lock sync.Mutex + stopSignal chan bool } func NewLuxRpcServer() LuxRpcServer { return LuxRpcServer{ controllers: make(map[string]LuxRpcController, 0), + stopSignal: make(chan bool), } } @@ -33,3 +39,93 @@ func (rpc *LuxRpcServer) HandleRequest(request LuxRpcRequest, rpcType LuxRpcType return ctrl.Handle(request, rpcType) } + +func (rpc *LuxRpcServer) AddEndpoint(network string, listenOn string, rpcType LuxRpcType) error { + listener, err := net.Listen(network, listenOn) + if err != nil { + return err + } + + go func(rpc *LuxRpcServer, listener net.Listener, rpcType LuxRpcType) { + defer listener.Close() + + for { + select { + case <-rpc.stopSignal: + return + default: + conn, err := listener.Accept() + if err != nil { + log.Warningf("rpc failed to accept: %v", err) + return + } + + log.Debugf("accepting rpc connection from %s", conn.RemoteAddr().String()) + go func(rpc *LuxRpcServer, conn net.Conn, rpcType LuxRpcType) { + defer conn.Close() + + def := NewLuxRpcDefrag() + part := make([]byte, 1500) + + for { + select { + case <-rpc.stopSignal: + return + default: + n, err := conn.Read(part) + if err != nil { + log.Debugf("error reading from %s: %v", conn.RemoteAddr().String(), err) + return + } + + if def.Feed(part[:n]) { + // we got full xml + xmlBytes := def.GetAndForget() + + var cmd LuxRpcRequest + if err := xml.Unmarshal(xmlBytes, &cmd); err != nil { + log.Errorf("faield to parse rpc %v: %s", err, string(xmlBytes)) + continue + } + + // issue command to controller + rpcRes, rpcErr, ok := rpc.HandleRequest(cmd, rpcType) + if ok { + // marshal response + xmlBytes, err := xml.Marshal(&rpcRes) + if err != nil { + log.Errorf("failed to marshal response: %v", err) + continue + } + + // send + _, err = conn.Write(xmlBytes) + if err != nil { + log.Debugf("failed to send: %v", err) + return + } + } else { + // marshal error + xmlBytes, err := xml.Marshal(&rpcErr) + if err != nil { + log.Errorf("failed to marshal error: %v", err) + continue + } + + // send + _, err = conn.Write(xmlBytes) + if err != nil { + log.Debugf("failed to send: %v", err) + return + } + } + } + } + } + }(rpc, conn, rpcType) + } + } + }(rpc, listener, rpcType) + + return nil +}