simplemq/smqsvr/lib.go

268 lines
5.6 KiB
Go

package smqsvr
import (
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"go.digitalcircle.com.br/open/simplemq/lib/random"
"go.digitalcircle.com.br/open/simplemq/lib/types"
"log"
"net/http"
"regexp"
"strings"
"sync"
"time"
)
// dbg writes a formatted diagnostic line through the standard logger.
// s is a Printf-style format string and p its arguments.
func dbg(s string, p ...interface{}) {
	log.Printf(s, p...)
}
// MQStats holds simple traffic counters for a server.
type MQStats struct {
	// ActMsgs is the running count of messages, bumped by Inc.
	ActMsgs int64
	// Nreads is the number of averaging samples taken so far; Init starts it at 1.
	Nreads float64
	// AvgMsgs is ActMsgs divided by Nreads as of the most recent sample.
	AvgMsgs float64
}
// Init sets Nreads to 1 and starts a background goroutine that, once per
// second, stores ActMsgs/Nreads into AvgMsgs and then increments Nreads.
//
// The goroutine has no exit condition, and the fields are read and written
// without any synchronization in this function.
func (m *MQStats) Init() {
	m.Nreads = 1

	// sample takes one averaging measurement.
	sample := func() {
		m.AvgMsgs = float64(m.ActMsgs) / m.Nreads
		m.Nreads++
	}

	go func() {
		// The first sample is taken immediately; the pause happens between samples.
		for ; ; time.Sleep(time.Second) {
			sample()
		}
	}()
}
// Inc adds one to ActMsgs. It is a plain increment with no locking.
func (m *MQStats) Inc() {
	m.ActMsgs += 1
}
// MQServer is the broker: it tracks the open connections and fans messages
// out to them.
type MQServer struct {
	// pool maps a connection id (string) to its *MQConn; see Put, Get and Del.
	pool sync.Map
	// subs is initialised by NewServer but not otherwise used in the visible code.
	subs sync.Map
	// once is initialised by NewServer but not otherwise used in the visible code.
	once sync.Map
	// stats holds the traffic counters returned by Stats.
	stats MQStats
}
// Stats returns a copy of the server's current counters.
func (s *MQServer) Stats() MQStats {
	snapshot := s.stats
	return snapshot
}
// Get looks up the connection registered under id.
// It returns nil when no such connection is in the pool.
func (s *MQServer) Get(id string) *MQConn {
	if entry, found := s.pool.Load(id); found {
		return entry.(*MQConn)
	}
	return nil
}
// Put wraps conn in a new MQConn bound to this server, registers it in the
// pool under id (replacing any previous entry with that id) and returns it.
func (s *MQServer) Put(id string, conn *websocket.Conn) *MQConn {
	entry := new(MQConn)
	entry.id, entry.conn, entry.server = id, conn, s
	s.pool.Store(id, entry)
	return entry
}
// Del removes the connection registered under id from the pool.
// It only drops the pool entry; it does not close the connection.
func (s *MQServer) Del(id string) {
	s.pool.Delete(id)
}
// Process routes msg to the pooled connections according to msg.Cmd.
//
//   - CMD_PUB: every connection is offered the message via MQConn.Process.
//   - CMD_RPC and CMD_RPCREPLY: connections are tried in pool iteration order
//     until one reports a matching subscription via MQConn.ProcessFirst.
//   - any other command is ignored.
//
// An error is returned only when msg.Topic is empty; delivery errors from the
// individual connections are discarded.
func (s *MQServer) Process(msg *types.Msg) error {
	if msg.Topic == "" {
		const reason = "Cant proceed with empty topic!"
		dbg("Src:%s Id:%s Err:%s", msg.Src, msg.Id, reason)
		return errors.New(reason)
	}

	// broadcast offers msg to a connection and always continues the walk.
	broadcast := func(m *MQConn) bool {
		m.Process(msg)
		return true
	}
	// firstMatch stops the walk as soon as one connection took the message.
	firstMatch := func(m *MQConn) bool {
		taken, _ := m.ProcessFirst(msg)
		return !taken
	}

	switch msg.Cmd {
	case types.CMD_PUB:
		s.Range(broadcast)
	case types.CMD_RPC, types.CMD_RPCREPLY:
		s.Range(firstMatch)
	}
	return nil
}
// Range calls h for each connection in the pool until h returns false.
// Each key is looked up again through Get, so an entry that is no longer in
// the pool by the time it is visited is skipped.
func (s *MQServer) Range(h func(m *MQConn) bool) {
	visit := func(key, _ interface{}) bool {
		conn := s.Get(key.(string))
		if conn == nil {
			return true
		}
		return h(conn)
	}
	s.pool.Range(visit)
}
// NewServer allocates an MQServer and starts its statistics sampler.
func NewServer() *MQServer {
	// The sync.Map fields are usable as zero values, so no explicit
	// initialisation is required.
	srv := new(MQServer)
	srv.stats.Init()
	return srv
}
// MQConn is one client connection registered with an MQServer.
type MQConn struct {
	// id is the key this connection is stored under in the server's pool.
	id string
	// server is the broker that owns this connection.
	server *MQServer
	// subs maps a subscription pattern (string) to its compiled *regexp.Regexp.
	subs sync.Map
	// mtx serialises writes to conn; see Write.
	mtx sync.Mutex
	// conn is the underlying websocket.
	conn *websocket.Conn
}
// Recv reads one frame from the websocket, decodes it as a JSON types.Msg and
// acts on it.
//
//   - CMD_SUB / CMD_UNSUB add or remove a subscription on this connection.
//   - CMD_CLOSE closes this connection.
//   - any other command is stamped with this connection's id as Src and handed
//     to the owning server for routing.
//
// It returns a non-nil error when the read fails (the connection is closed
// before returning) or when the payload is not valid JSON for types.Msg (the
// connection is left as is). Subscription and routing failures are not
// returned, so a bad request does not end the caller's receive loop.
func (m *MQConn) Recv() error {
	_, payload, err := m.conn.ReadMessage()
	if err != nil {
		dbg("MQConn.Recv:: Error retrieving: %s: %s", m.id, err.Error())
		m.Close()
		return err
	}
	// Count the frame on the server that owns this connection rather than on
	// the package-level default server.
	m.server.stats.Inc()

	msg := &types.Msg{}
	if err := json.Unmarshal(payload, msg); err != nil {
		return err
	}

	switch msg.Cmd {
	case types.CMD_SUB:
		// An invalid pattern is reported but does not terminate the connection.
		if err := m.Sub(msg.Topic); err != nil {
			dbg("MQConn.Recv:: Invalid sub pattern: %s: %s", m.id, err.Error())
		}
	case types.CMD_UNSUB:
		m.Unsub(msg.Topic)
	case types.CMD_CLOSE:
		m.Close()
	default:
		msg.Src = m.id
		// The server logs the only error it returns (empty topic); nothing
		// more to do with it here.
		_ = m.server.Process(msg)
	}
	return nil
}
// Loop calls Recv repeatedly until it reports an error, then closes the
// connection and returns.
//
// The deferred Close matters for the case where Recv fails while decoding a
// payload: that path returns an error without closing the socket or removing
// the connection from the server's pool. Close may therefore run more than
// once for a connection.
func (m *MQConn) Loop() {
	defer m.Close()
	for {
		if err := m.Recv(); err != nil {
			return
		}
	}
}
// Sub registers a subscription for topic pattern t on this connection.
//
// t is a regular expression that must match a message topic in full. It is
// stored under the key t, so subscribing to the same pattern twice replaces
// the previous entry. An error is returned when t is not a valid regular
// expression; nothing is stored in that case.
func (m *MQConn) Sub(t string) error {
	// Validate t on its own first, so that a pattern with unbalanced
	// parentheses cannot escape the group added below.
	if _, err := regexp.Compile(t); err != nil {
		return err
	}
	// Anchor the whole pattern. Without the group, alternation binds looser
	// than the anchors: "a|b" would become ^a|b$, which matches anything
	// starting with "a" or ending with "b".
	rx, err := regexp.Compile(fmt.Sprintf(`^(?:%s)$`, t))
	if err != nil {
		return err
	}
	m.subs.Store(t, rx)
	return nil
}
// Subs returns the subscription patterns currently registered on this
// connection, in map iteration order. The result is never nil.
func (m *MQConn) Subs() []string {
	patterns := []string{}
	collect := func(k, _ interface{}) bool {
		patterns = append(patterns, k.(string))
		return true
	}
	m.subs.Range(collect)
	return patterns
}
// Unsub removes the subscription stored under pattern s, if any.
// s must be the exact pattern string that was passed to Sub.
func (m *MQConn) Unsub(s string) {
	m.subs.Delete(s)
}
// Send hands msg to the owning server for routing and returns the server's
// result. msg is passed through unchanged; Src is not set here.
func (m *MQConn) Send(msg *types.Msg) error {
	err := m.server.Process(msg)
	return err
}
// Process writes msg to this connection once for every subscription whose
// pattern matches msg.Topic.
//
// The returned error is the result of the last write performed, or nil when
// no subscription matched.
func (m *MQConn) Process(msg *types.Msg) error {
	var lastErr error
	m.subs.Range(func(_, compiled interface{}) bool {
		if pattern := compiled.(*regexp.Regexp); pattern.MatchString(msg.Topic) {
			lastErr = m.Write(msg)
		}
		// Always continue, so every matching subscription triggers a write.
		return true
	})
	return lastErr
}
// ProcessFirst writes msg to this connection at most once: on the first
// subscription found whose pattern matches msg.Topic.
//
// The bool reports whether a subscription matched, independent of whether the
// write succeeded. The error is the result of that single write.
func (m *MQConn) ProcessFirst(msg *types.Msg) (bool, error) {
	var (
		matched  bool
		writeErr error
	)
	m.subs.Range(func(_, compiled interface{}) bool {
		if !compiled.(*regexp.Regexp).MatchString(msg.Topic) {
			return true // keep looking
		}
		writeErr = m.Write(msg)
		matched = true
		return false // one delivery is enough
	})
	return matched, writeErr
}
// Close closes the underlying websocket and then removes this connection
// from the owning server's pool. The error from closing the socket is
// discarded.
func (m *MQConn) Close() {
	_ = m.conn.Close()
	m.server.Del(m.id)
}
// Write sends m to the client as a JSON websocket message and returns the
// result of that write.
//
// c.mtx is held for the duration of the call, so writes to this connection
// are serialised. The message is counted in the statistics of the server
// that owns this connection, before the write is attempted.
func (c *MQConn) Write(m *types.Msg) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Use the owning server's counters, not the package-level default
	// server's: connections may belong to a server made with NewServer.
	c.server.stats.Inc()
	return c.conn.WriteJSON(m)
}
var (
	// upgrader turns incoming HTTP requests into websocket connections,
	// using the library defaults for every option.
	upgrader = websocket.Upgrader{}

	// server is the package-level broker used by the Serve and Stats handlers.
	// It is created, and its stats sampler started, at package initialisation.
	server = NewServer()
)
func Serve(writer http.ResponseWriter, request *http.Request) {
id := random.Str(32)
con, err := upgrader.Upgrade(writer, request, nil)
if err != nil {
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
}
mqconn := server.Put(id, con)
con.SetCloseHandler(func(code int, text string) error {
mqconn.Close()
return nil
})
mqconn.Loop()
}
// Stats is the HTTP handler that reports the package-level server's counters
// as a single line of plain text. The error from writing the response is
// not checked.
func Stats(writer http.ResponseWriter, request *http.Request) {
	snap := server.stats

	var out strings.Builder
	fmt.Fprintf(&out, "Total Msgs:%v, Avg Msgs/s: %v, NReads: %v", snap.ActMsgs, snap.AvgMsgs, snap.Nreads)
	writer.Write([]byte(out.String()))
}