package smqcli import ( "encoding/json" "errors" "fmt" "github.com/gorilla/websocket" "go.digitalcircle.com.br/open/simplemq/lib/random" "go.digitalcircle.com.br/open/simplemq/lib/types" "log" "regexp" "sync" "time" ) func dbg(s string, p ...interface{}) { log.Printf(s, p...) } type MQConn struct { url string retry bool chRetry chan *struct{} //chReady chan *struct{} subs sync.Map mtx sync.Mutex conn *websocket.Conn } type MQSub struct { Topic string Rx *regexp.Regexp H func(m *types.Msg) } func (c *MQConn) Write(m *types.Msg) error { c.mtx.Lock() defer c.mtx.Unlock() if c.conn != nil { return c.conn.WriteJSON(m) } else { return errors.New("c.conn is nil") } } func (c *MQConn) Sub(s string, h func(m *types.Msg)) error { //dbg("MQConn.Sub:: TOPIC:%s", s) m := &types.Msg{ Cmd: types.CMD_SUB, Topic: s, } rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, s)) if err != nil { return err } sub := &MQSub{ Topic: s, Rx: rx, H: h, } c.subs.Store(s, sub) return c.Write(m) } func (c *MQConn) UnSub(s string) error { //dbg("MQConn.UnSub:: TOPIC:%s", s) m := &types.Msg{ Cmd: types.CMD_UNSUB, Topic: s, } c.subs.Delete(s) return c.Write(m) } func (c *MQConn) Pub(s string, pl []byte) error { //dbg("MQConn.Pub:: TOPIC:%s", s) m := &types.Msg{ Cmd: types.CMD_PUB, Topic: s, Payload: pl, } if c.conn == nil { return errors.New("Not connected") } return c.Write(m) } func (c *MQConn) Rpc(s string, pli []byte) ([]byte, error) { retid := "__RPCREPLY__" + random.Str(32) lchan := make(chan *struct{}) m := &types.Msg{ Cmd: types.CMD_RPC, Topic: s, Payload: pli, Id: retid, ReplyTo: retid, } var ret []byte c.Sub(retid, func(m *types.Msg) { //dbg("MQConn.Rpc:: Got reply from TOPIC:%s", retid) ret = m.Payload if lchan != nil { lchan <- &struct{}{} } }) defer c.UnSub(retid) err := c.Write(m) select { case <-lchan: case <-time.After(time.Second * 15): } close(lchan) lchan = nil return ret, err } func (c *MQConn) RpcReply(msg *types.Msg, pli []byte) error { retid := random.Str(32) m := &types.Msg{ Cmd: types.CMD_RPCREPLY, Topic: msg.ReplyTo, Payload: pli, Id: retid, ReplyTo: "", } err := c.Write(m) return err } func (c *MQConn) PubO(s string, i interface{}) error { bs, err := json.Marshal(i) if err != nil { return err } return c.Pub(s, bs) } func (c *MQConn) RpcO(s string, oi interface{}, oo interface{}) error { bs, err := json.Marshal(oi) if err != nil { return err } bs, err = c.Rpc(s, bs) if err != nil { return err } err = json.Unmarshal(bs, oo) return err } func (c *MQConn) RpcReplyO(msg *types.Msg, i interface{}) error { bs, err := json.Marshal(i) if err != nil { return err } return c.RpcReply(msg, bs) } func (c *MQConn) Loop() { nerr := 0 for c.conn != nil { m := &types.Msg{} if c.conn == nil { return } err := c.conn.ReadJSON(m) if err != nil { nerr++ log.Printf("Error reading msg: %s", err.Error()) if nerr > 990 { c.conn = nil } continue } //dbg("MQConn.Loop:: Recv Msg: ID:%s TOPIC:%s => %s", m.Id, m.Topic, string(m.Payload)) c.subs.Range(func(key, raw interface{}) bool { sub, ok := raw.(*MQSub) //dbg("MQConn.Loop:: Checking %s x %s", m.Topic, key.(string)) if ok && sub.Rx.MatchString(m.Topic) { sub.H(m) if m.Cmd == types.CMD_RPCREPLY { return true } return false } return true }) } } func (c *MQConn) Init() error { c.retry = true doConn := func() error { con, _, err := websocket.DefaultDialer.Dial(c.url, nil) if err != nil { return err } c.conn = con con.SetCloseHandler(func(code int, text string) error { c.conn = nil return nil }) return nil } err := doConn() if err != nil { return err } c.subs.Range(func(key, value interface{}) bool { m := &types.Msg{ Cmd: types.CMD_SUB, Topic: key.(string), } err := c.Write(m) if err != nil { log.Printf("Error re-listening: %s", err.Error()) return false } return true }) return nil } func New(u string) (*MQConn, error) { ret := &MQConn{ url: u, } err := ret.Init() go func() { for ret.retry { ret.Loop() ret.Init() } }() return ret, err }