lux/rpc/lux_rpc_server.go

137 lines
3.1 KiB
Go

package rpc
import (
"encoding/xml"
"net"
"os"
"sync"
)
type LuxRpcServer struct {
controllers map[string]LuxRpcController
lock sync.Mutex
stopSignal chan bool
}
func NewLuxRpcServer() LuxRpcServer {
return LuxRpcServer{
controllers: make(map[string]LuxRpcController, 0),
stopSignal: make(chan bool),
}
}
func (rpc *LuxRpcServer) RegisterController(ctrl LuxRpcController) {
rpc.controllers[ctrl.GetRpcName()] = ctrl
}
func (rpc *LuxRpcServer) HandleRequest(request LuxRpcRequest, rpcType LuxRpcType) (LuxRpcResponse, LuxRpcError, bool) {
// lock rpc
rpc.lock.Lock()
defer rpc.lock.Unlock()
// find controller
ctrl, ok := rpc.controllers[request.Controller]
if !ok {
return LuxRpcResponse{}, LuxRpcError{
RequestID: request.RequestID,
ErrorCode: 1,
Message: "unknown controller",
}, false
}
return ctrl.Handle(request, rpcType)
}
func (rpc *LuxRpcServer) AddEndpoint(network string, listenOn string, rpcType LuxRpcType) error {
// cleanup old socket files
if network == "unix" {
os.Remove(listenOn)
}
listener, err := net.Listen(network, listenOn)
if err != nil {
return err
}
go func(rpc *LuxRpcServer, listener net.Listener, rpcType LuxRpcType) {
defer listener.Close()
for {
select {
case <-rpc.stopSignal:
return
default:
conn, err := listener.Accept()
if err != nil {
log.Warningf("rpc failed to accept: %v", err)
return
}
log.Debugf("accepting rpc connection from %s", conn.RemoteAddr().String())
go func(rpc *LuxRpcServer, conn net.Conn, rpcType LuxRpcType) {
defer conn.Close()
def := NewLuxRpcDefrag()
part := make([]byte, 1500)
for {
select {
case <-rpc.stopSignal:
return
default:
n, err := conn.Read(part)
if err != nil {
log.Debugf("error reading from %s: %v", conn.RemoteAddr().String(), err)
return
}
if def.Feed(part[:n]) {
// we got full xml
xmlBytes := def.GetAndForget()
var cmd LuxRpcRequest
if err := xml.Unmarshal(xmlBytes, &cmd); err != nil {
log.Errorf("faield to parse rpc %v: %s", err, string(xmlBytes))
continue
}
// issue command to controller
rpcRes, rpcErr, ok := rpc.HandleRequest(cmd, rpcType)
if ok {
// marshal response
xmlBytes, err := xml.Marshal(&rpcRes)
if err != nil {
log.Errorf("failed to marshal response: %v", err)
continue
}
// send
_, err = conn.Write(xmlBytes)
if err != nil {
log.Debugf("failed to send: %v", err)
return
}
} else {
// marshal error
xmlBytes, err := xml.Marshal(&rpcErr)
if err != nil {
log.Errorf("failed to marshal error: %v", err)
continue
}
// send
_, err = conn.Write(xmlBytes)
if err != nil {
log.Debugf("failed to send: %v", err)
return
}
}
}
}
}
}(rpc, conn, rpcType)
}
}
}(rpc, listener, rpcType)
return nil
}