simplemq/smqcli/lib.go

214 lines
3.7 KiB
Go

package smqcli
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"go.digitalcircle.com.br/open/simplemq/lib/random"
"go.digitalcircle.com.br/open/simplemq/lib/types"
"log"
"regexp"
"sync"
"time"
)
// dbg emits a formatted debug line through the standard logger.
// format and args follow the fmt.Printf conventions.
func dbg(format string, args ...interface{}) {
	log.Printf(format, args...)
}
// MQConn is the client side of a message-queue connection carried over a
// websocket. It remembers every subscription made through Sub so that Init
// can re-send them after a (re)connect.
type MQConn struct {
// url is the websocket endpoint handed to New and dialed by Init.
url string
// retry is the loop condition of the background goroutine started by New;
// Init sets it to true.
retry bool
// chRetry is not referenced by any code visible in this part of the file.
// NOTE(review): confirm whether it is still needed.
chRetry chan *struct{}
//chReady chan *struct{}
// subs maps a topic pattern (string key) to its *MQSub.
subs sync.Map
// mtx is held by Write for the duration of each write on conn.
mtx sync.Mutex
// conn is the current websocket connection; nil means "not connected".
conn *websocket.Conn
}
// MQSub describes one subscription registered through MQConn.Sub.
type MQSub struct {
// Topic is the pattern string exactly as it was passed to Sub.
Topic string
// Rx is Topic compiled as a regular expression wrapped in ^ and $; Loop
// matches incoming message topics against it.
Rx *regexp.Regexp
// H is the handler Loop calls with each message whose topic matches Rx.
H func(m *types.Msg)
}
// Sub registers h as the handler for topic pattern s and sends a SUB command
// for it.
//
// s is compiled as a regular expression wrapped in ^ and $; Loop matches the
// topic of every incoming message against it. If s is not a valid pattern the
// compile error is returned and nothing is registered. Otherwise the handler
// is stored locally first and the result of writing the SUB command is
// returned, so the handler stays registered even when that write fails.
func (c *MQConn) Sub(s string, h func(m *types.Msg)) error {
	pattern, err := regexp.Compile(fmt.Sprintf(`^%s$`, s))
	if err != nil {
		return err
	}

	c.subs.Store(s, &MQSub{Topic: s, Rx: pattern, H: h})

	return c.Write(&types.Msg{
		Cmd:   types.CMD_SUB,
		Topic: s,
	})
}
// UnSub drops the local handler registered under topic pattern s and sends an
// UNSUB command for it. The local entry is removed before the write, so it is
// gone even when the write returns an error.
func (c *MQConn) UnSub(s string) error {
	c.subs.Delete(s)

	return c.Write(&types.Msg{
		Cmd:   types.CMD_UNSUB,
		Topic: s,
	})
}
// Write sends m as JSON on the current connection. The connection mutex is
// held for the whole call, so concurrent writers are serialized. It returns
// an error when there is no connection, otherwise whatever WriteJSON returns.
func (c *MQConn) Write(m *types.Msg) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	if c.conn == nil {
		return errors.New("c.conn is nil")
	}
	return c.conn.WriteJSON(m)
}
// Pub publishes payload pl on topic s. It fails fast with an error when there
// is currently no connection; otherwise it returns the result of writing the
// PUB command.
func (c *MQConn) Pub(s string, pl []byte) error {
	if c.conn == nil {
		return errors.New("Not connected")
	}

	return c.Write(&types.Msg{
		Cmd:     types.CMD_PUB,
		Topic:   s,
		Payload: pl,
	})
}
// Rpc sends pli as an RPC request on topic s and waits for the reply.
//
// A unique reply topic is generated for the call, subscribed to before the
// request is written, and unsubscribed again when Rpc returns. The request
// carries that topic as both its Id and its ReplyTo.
//
// It returns the payload of the first reply received. An error is returned
// when the reply subscription or the request cannot be written, or when no
// reply arrives within 15 seconds.
func (c *MQConn) Rpc(s string, pli []byte) ([]byte, error) {
	const rpcTimeout = 15 * time.Second

	retid := "__RPCREPLY__" + random.Str(32)

	// Capacity 1 plus a non-blocking send: the handler runs on the goroutine
	// that calls Loop, so it must never block and must stay safe to call even
	// after this function has timed out and returned. The channel is never
	// closed, so a late reply cannot trigger a send-on-closed-channel panic.
	reply := make(chan []byte, 1)
	err := c.Sub(retid, func(m *types.Msg) {
		select {
		case reply <- m.Payload:
		default:
			// A reply is already queued; drop the duplicate.
		}
	})
	if err != nil {
		// Sub stores the handler before writing, so remove it again; the
		// SUB command did not go out, so there is nothing to UNSUB.
		c.subs.Delete(retid)
		return nil, err
	}
	// Best-effort cleanup: a failing UNSUB write cannot be acted on here.
	defer c.UnSub(retid)

	req := &types.Msg{
		Cmd:     types.CMD_RPC,
		Topic:   s,
		Payload: pli,
		Id:      retid,
		ReplyTo: retid,
	}
	// No point waiting for an answer to a request that was never sent.
	if err := c.Write(req); err != nil {
		return nil, err
	}

	timer := time.NewTimer(rpcTimeout)
	defer timer.Stop()

	select {
	case payload := <-reply:
		return payload, nil
	case <-timer.C:
		return nil, fmt.Errorf("rpc on topic %q: no reply within %s", s, rpcTimeout)
	}
}
// RpcReply answers the RPC request msg with payload pli. The reply is sent as
// an RPCREPLY command on the topic named by msg.ReplyTo, with a freshly
// generated random Id and an empty ReplyTo. It returns the write error, if any.
func (c *MQConn) RpcReply(msg *types.Msg, pli []byte) error {
	reply := &types.Msg{
		Cmd:     types.CMD_RPCREPLY,
		Topic:   msg.ReplyTo,
		Payload: pli,
		Id:      random.Str(32),
	}
	return c.Write(reply)
}
// Loop reads JSON messages from the current connection and dispatches each
// one to the subscriptions whose pattern matches its topic. It returns once
// the connection field is nil, which happens when the close handler installed
// by Init fires or when Loop itself gives up on the connection.
//
// Read errors are logged and counted. The counter covers consecutive
// failures only: a successful read resets it, so occasional undecodable
// messages on a healthy connection no longer add up to a disconnect. After
// more than 990 failures in a row the connection is closed and dropped.
func (c *MQConn) Loop() {
	nerr := 0
	for {
		conn := c.conn
		if conn == nil {
			return
		}

		m := &types.Msg{}
		if err := conn.ReadJSON(m); err != nil {
			nerr++
			log.Printf("Error reading msg: %s", err.Error())
			if nerr > 990 {
				// Clear the field under the same mutex Write holds, so Write
				// never sees it change between its nil check and its use.
				c.mtx.Lock()
				c.conn = nil
				c.mtx.Unlock()
				// Release the socket instead of just forgetting it; the
				// connection is already unusable, so the error is ignored.
				_ = conn.Close()
			}
			continue
		}
		nerr = 0

		//dbg("MQConn.Loop:: Recv Msg: ID:%s TOPIC:%s => %s", m.Id, m.Topic, string(m.Payload))
		c.subs.Range(func(_, raw interface{}) bool {
			sub, ok := raw.(*MQSub)
			if !ok || !sub.Rx.MatchString(m.Topic) {
				return true
			}
			sub.H(m)
			// RPC replies are offered to every matching subscription; any
			// other message stops at the first match.
			return m.Cmd == types.CMD_RPCREPLY
		})
	}
}
// Init dials the configured URL, installs the new connection, and re-sends a
// SUB command for every subscription already registered.
//
// It also sets the retry flag to true. A dial failure is returned to the
// caller. A failure while re-subscribing is only logged and stops the
// re-subscription pass; Init still returns nil in that case.
func (c *MQConn) Init() error {
	c.retry = true

	conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
	if err != nil {
		return err
	}
	c.conn = conn
	// A close frame from the peer marks the client as disconnected.
	conn.SetCloseHandler(func(int, string) error {
		c.conn = nil
		return nil
	})

	c.subs.Range(func(topic, _ interface{}) bool {
		resub := &types.Msg{
			Cmd:   types.CMD_SUB,
			Topic: topic.(string),
		}
		if werr := c.Write(resub); werr != nil {
			log.Printf("Error re-listening: %s", werr.Error())
			return false
		}
		return true
	})
	return nil
}
// New creates a connection for websocket URL u, performs the first connect,
// and starts a background goroutine that alternates between Loop (read and
// dispatch until disconnected) and Init (reconnect and re-subscribe).
//
// The returned *MQConn is never nil. The error is the result of the first
// connect attempt only; when it is non-nil the background goroutine is still
// running and keeps trying to connect.
//
// NOTE(review): the goroutine runs while the retry flag is true, and no code
// visible in this part of the file sets it to false.
func New(u string) (*MQConn, error) {
	// Pause between failed reconnect attempts. Without it an unreachable
	// server makes Loop and Init both return immediately, so the goroutine
	// would spin and dial in a tight loop.
	const reconnectDelay = time.Second

	ret := &MQConn{
		url: u,
	}
	err := ret.Init()
	go func() {
		for ret.retry {
			ret.Loop()
			if ierr := ret.Init(); ierr != nil {
				log.Printf("Error reconnecting: %s", ierr.Error())
				time.Sleep(reconnectDelay)
			}
		}
	}()
	return ret, err
}