268 lines
5.6 KiB
Go
268 lines
5.6 KiB
Go
|
package smqsvr
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"github.com/gorilla/websocket"
|
||
|
"go.digitalcircle.com.br/open/simplemq/lib/random"
|
||
|
"go.digitalcircle.com.br/open/simplemq/lib/types"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"regexp"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
func dbg(s string, p ...interface{}) {
|
||
|
log.Printf(s, p...)
|
||
|
}
|
||
|
|
||
|
type MQStats struct {
|
||
|
ActMsgs int64
|
||
|
Nreads float64
|
||
|
AvgMsgs float64
|
||
|
}
|
||
|
|
||
|
func (m *MQStats) Init() {
|
||
|
m.Nreads = 1
|
||
|
go func() {
|
||
|
for {
|
||
|
m.AvgMsgs = float64(m.ActMsgs) / m.Nreads
|
||
|
m.Nreads++
|
||
|
time.Sleep(time.Second)
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
func (m *MQStats) Inc() {
|
||
|
m.ActMsgs++
|
||
|
}
|
||
|
|
||
|
type MQServer struct {
|
||
|
pool sync.Map
|
||
|
subs sync.Map
|
||
|
once sync.Map
|
||
|
|
||
|
stats MQStats
|
||
|
}
|
||
|
|
||
|
func (s *MQServer) Stats() MQStats {
|
||
|
return s.stats
|
||
|
}
|
||
|
func (s *MQServer) Get(id string) *MQConn {
|
||
|
raw, ok := s.pool.Load(id)
|
||
|
if !ok {
|
||
|
return nil
|
||
|
}
|
||
|
return raw.(*MQConn)
|
||
|
}
|
||
|
func (s *MQServer) Put(id string, conn *websocket.Conn) *MQConn {
|
||
|
mqconn := &MQConn{id: id, conn: conn, server: s}
|
||
|
s.pool.Store(id, mqconn)
|
||
|
//dbg("MQServer.Put:: Got new conn: %s", mqconn.id)
|
||
|
return mqconn
|
||
|
}
|
||
|
func (s *MQServer) Del(id string) {
|
||
|
s.pool.Delete(id)
|
||
|
//dbg("MQServer.Del:: Removing conn: %s", id)
|
||
|
}
|
||
|
func (s *MQServer) Process(msg *types.Msg) error {
|
||
|
if msg.Topic == "" {
|
||
|
err := "Cant proceed with empty topic!"
|
||
|
dbg("Src:%s Id:%s Err:%s", msg.Src, msg.Id, err)
|
||
|
return errors.New(err)
|
||
|
}
|
||
|
switch msg.Cmd {
|
||
|
case types.CMD_PUB:
|
||
|
//dbg("MQServer.Process:: Got new msg: %s.%s=>%s", msg.Cmd, msg.Src, msg.Topic)
|
||
|
s.Range(func(m *MQConn) bool {
|
||
|
m.Process(msg)
|
||
|
return true
|
||
|
})
|
||
|
case types.CMD_RPC:
|
||
|
|
||
|
//dbg("MQServer.Process:: Got new RPC: Src:%s Reply:%s Topic:%s", msg.Src, msg.ReplyTo, msg.Topic)
|
||
|
s.Range(func(m *MQConn) bool {
|
||
|
done, _ := m.ProcessFirst(msg)
|
||
|
return !done
|
||
|
})
|
||
|
|
||
|
case types.CMD_RPCREPLY:
|
||
|
//dbg("MQServer.Process:: Got new RPC Reply: Src:%s Topic:%s", msg.Src, msg.Topic)
|
||
|
|
||
|
s.Range(func(m *MQConn) bool {
|
||
|
done, _ := m.ProcessFirst(msg)
|
||
|
return !done
|
||
|
})
|
||
|
|
||
|
default:
|
||
|
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
func (s *MQServer) Range(h func(m *MQConn) bool) {
|
||
|
s.pool.Range(func(key, value interface{}) bool {
|
||
|
mq := s.Get(key.(string))
|
||
|
if mq != nil {
|
||
|
cont := h(mq)
|
||
|
return cont
|
||
|
}
|
||
|
return true
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func NewServer() *MQServer {
|
||
|
ret := &MQServer{}
|
||
|
ret.pool = sync.Map{}
|
||
|
ret.subs = sync.Map{}
|
||
|
ret.once = sync.Map{}
|
||
|
ret.stats.Init()
|
||
|
return ret
|
||
|
}
|
||
|
|
||
|
type MQConn struct {
|
||
|
id string
|
||
|
server *MQServer
|
||
|
subs sync.Map
|
||
|
|
||
|
mtx sync.Mutex
|
||
|
conn *websocket.Conn
|
||
|
}
|
||
|
|
||
|
func (m *MQConn) Recv() error {
|
||
|
ret := &types.Msg{}
|
||
|
//dbg("MQConn.Recv:: Getting new message: %s", m.id)
|
||
|
_, mbs, err := m.conn.ReadMessage()
|
||
|
if err != nil {
|
||
|
dbg("MQConn.Recv:: Error retrieving: %s: %s", m.id, err.Error())
|
||
|
m.Close()
|
||
|
return err
|
||
|
}
|
||
|
server.stats.ActMsgs++
|
||
|
//dbg("MQConn.Recv:: Got new message: %s", m.id)
|
||
|
err = json.Unmarshal(mbs, ret)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
} else {
|
||
|
switch ret.Cmd {
|
||
|
case types.CMD_SUB:
|
||
|
//dbg("MQConn.Recv:: Got new sub: %s=>%s", m.id, ret.Topic)
|
||
|
m.Sub(ret.Topic)
|
||
|
case types.CMD_UNSUB:
|
||
|
//dbg("MQConn.Recv:: Got new unsub: %s=>%s", m.id, ret.Topic)
|
||
|
m.Unsub(ret.Topic)
|
||
|
case types.CMD_CLOSE:
|
||
|
//dbg("MQConn.Recv:: Got close: %s=>%s", m.id)
|
||
|
m.Close()
|
||
|
default:
|
||
|
ret.Src = m.id
|
||
|
m.server.Process(ret)
|
||
|
}
|
||
|
|
||
|
}
|
||
|
return nil
|
||
|
|
||
|
}
|
||
|
func (m *MQConn) Loop() {
|
||
|
for {
|
||
|
err := m.Recv()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
func (m *MQConn) Sub(t string) error {
|
||
|
|
||
|
rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, t))
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
m.subs.Store(t, rx)
|
||
|
//dbg("MQConn.Sub: Subs: %s", strings.Join(m.Subs(), ","))
|
||
|
return nil
|
||
|
}
|
||
|
func (m *MQConn) Subs() []string {
|
||
|
ret := make([]string, 0)
|
||
|
m.subs.Range(func(key, value interface{}) bool {
|
||
|
ret = append(ret, key.(string))
|
||
|
return true
|
||
|
})
|
||
|
return ret
|
||
|
}
|
||
|
func (m *MQConn) Unsub(s string) {
|
||
|
m.subs.Delete(s)
|
||
|
//dbg("MQConn.Unsub: Subs: %s", strings.Join(m.Subs(), ","))
|
||
|
}
|
||
|
func (m *MQConn) Send(msg *types.Msg) error {
|
||
|
return m.server.Process(msg)
|
||
|
}
|
||
|
func (m *MQConn) Process(msg *types.Msg) error {
|
||
|
var err error = nil
|
||
|
m.subs.Range(func(key, value interface{}) bool {
|
||
|
rx := value.(*regexp.Regexp)
|
||
|
if rx.MatchString(msg.Topic) {
|
||
|
//dbg("MQConn.Process: RX:%s matches topic: %s", rx.String(), msg.Topic)
|
||
|
//msg.Topic = key.(string)
|
||
|
err = m.Write(msg)
|
||
|
}
|
||
|
return true
|
||
|
})
|
||
|
return err
|
||
|
}
|
||
|
func (m *MQConn) ProcessFirst(msg *types.Msg) (bool, error) {
|
||
|
var err error = nil
|
||
|
var ret bool = false
|
||
|
m.subs.Range(func(key, value interface{}) bool {
|
||
|
rx := value.(*regexp.Regexp)
|
||
|
if rx.MatchString(msg.Topic) {
|
||
|
//msg.Topic = key.(string)
|
||
|
//dbg("MQConn.ProcessFirst: RX:%s matches topic: %s", rx.String(), msg.Topic)
|
||
|
err = m.Write(msg)
|
||
|
ret = true
|
||
|
return false
|
||
|
} else {
|
||
|
//dbg("MQConn.ProcessFirst: RX:%s DOES NOT match topic: %s", rx.String(), msg.Topic)
|
||
|
}
|
||
|
return true
|
||
|
})
|
||
|
return ret, err
|
||
|
}
|
||
|
func (m *MQConn) Close() {
|
||
|
m.conn.Close()
|
||
|
m.server.Del(m.id)
|
||
|
}
|
||
|
func (c *MQConn) Write(m *types.Msg) error {
|
||
|
c.mtx.Lock()
|
||
|
defer c.mtx.Unlock()
|
||
|
server.stats.ActMsgs++
|
||
|
return c.conn.WriteJSON(m)
|
||
|
}
|
||
|
|
||
|
var upgrader = websocket.Upgrader{}
|
||
|
var server *MQServer = NewServer()
|
||
|
|
||
|
func Serve(writer http.ResponseWriter, request *http.Request) {
|
||
|
|
||
|
id := random.Str(32)
|
||
|
con, err := upgrader.Upgrade(writer, request, nil)
|
||
|
if err != nil {
|
||
|
http.Error(writer, err.Error(), http.StatusInternalServerError)
|
||
|
return
|
||
|
}
|
||
|
mqconn := server.Put(id, con)
|
||
|
con.SetCloseHandler(func(code int, text string) error {
|
||
|
mqconn.Close()
|
||
|
return nil
|
||
|
})
|
||
|
mqconn.Loop()
|
||
|
}
|
||
|
|
||
|
func Stats(writer http.ResponseWriter, request *http.Request) {
|
||
|
b := strings.Builder{}
|
||
|
s := server.stats
|
||
|
b.WriteString(fmt.Sprintf("Total Msgs:%v, Avg Msgs/s: %v, NReads: %v", s.ActMsgs, s.AvgMsgs, s.Nreads))
|
||
|
writer.Write([]byte(b.String()))
|
||
|
}
|