implement RPC server, needs testing
This commit is contained in:
parent
337930c0d4
commit
9d15dba9d5
1 changed files with 97 additions and 1 deletions
|
|
@ -1,15 +1,21 @@
|
||||||
package rpc
|
package rpc
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
type LuxRpcServer struct {
|
type LuxRpcServer struct {
|
||||||
controllers map[string]LuxRpcController
|
controllers map[string]LuxRpcController
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
stopSignal chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLuxRpcServer() LuxRpcServer {
|
func NewLuxRpcServer() LuxRpcServer {
|
||||||
return LuxRpcServer{
|
return LuxRpcServer{
|
||||||
controllers: make(map[string]LuxRpcController, 0),
|
controllers: make(map[string]LuxRpcController, 0),
|
||||||
|
stopSignal: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -33,3 +39,93 @@ func (rpc *LuxRpcServer) HandleRequest(request LuxRpcRequest, rpcType LuxRpcType
|
||||||
|
|
||||||
return ctrl.Handle(request, rpcType)
|
return ctrl.Handle(request, rpcType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (rpc *LuxRpcServer) AddEndpoint(network string, listenOn string, rpcType LuxRpcType) error {
|
||||||
|
listener, err := net.Listen(network, listenOn)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func(rpc *LuxRpcServer, listener net.Listener, rpcType LuxRpcType) {
|
||||||
|
defer listener.Close()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-rpc.stopSignal:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
conn, err := listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
log.Warningf("rpc failed to accept: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("accepting rpc connection from %s", conn.RemoteAddr().String())
|
||||||
|
go func(rpc *LuxRpcServer, conn net.Conn, rpcType LuxRpcType) {
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
def := NewLuxRpcDefrag()
|
||||||
|
part := make([]byte, 1500)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-rpc.stopSignal:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
n, err := conn.Read(part)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("error reading from %s: %v", conn.RemoteAddr().String(), err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if def.Feed(part[:n]) {
|
||||||
|
// we got full xml
|
||||||
|
xmlBytes := def.GetAndForget()
|
||||||
|
|
||||||
|
var cmd LuxRpcRequest
|
||||||
|
if err := xml.Unmarshal(xmlBytes, &cmd); err != nil {
|
||||||
|
log.Errorf("faield to parse rpc %v: %s", err, string(xmlBytes))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// issue command to controller
|
||||||
|
rpcRes, rpcErr, ok := rpc.HandleRequest(cmd, rpcType)
|
||||||
|
if ok {
|
||||||
|
// marshal response
|
||||||
|
xmlBytes, err := xml.Marshal(&rpcRes)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to marshal response: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// send
|
||||||
|
_, err = conn.Write(xmlBytes)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("failed to send: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// marshal error
|
||||||
|
xmlBytes, err := xml.Marshal(&rpcErr)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to marshal error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// send
|
||||||
|
_, err = conn.Write(xmlBytes)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("failed to send: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(rpc, conn, rpcType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(rpc, listener, rpcType)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue