package smqsvr import ( "encoding/json" "errors" "fmt" "github.com/gorilla/websocket" "go.digitalcircle.com.br/open/simplemq/lib/random" "go.digitalcircle.com.br/open/simplemq/lib/types" "log" "net/http" "regexp" "strings" "sync" "time" ) func dbg(s string, p ...interface{}) { log.Printf(s, p...) } type MQStats struct { ActMsgs int64 Nreads float64 AvgMsgs float64 } func (m *MQStats) Init() { m.Nreads = 1 go func() { for { m.AvgMsgs = float64(m.ActMsgs) / m.Nreads m.Nreads++ time.Sleep(time.Second) } }() } func (m *MQStats) Inc() { m.ActMsgs++ } type MQServer struct { pool sync.Map subs sync.Map once sync.Map stats MQStats } func (s *MQServer) Stats() MQStats { return s.stats } func (s *MQServer) Get(id string) *MQConn { raw, ok := s.pool.Load(id) if !ok { return nil } return raw.(*MQConn) } func (s *MQServer) Put(id string, conn *websocket.Conn) *MQConn { mqconn := &MQConn{id: id, conn: conn, server: s} s.pool.Store(id, mqconn) //dbg("MQServer.Put:: Got new conn: %s", mqconn.id) return mqconn } func (s *MQServer) Del(id string) { s.pool.Delete(id) //dbg("MQServer.Del:: Removing conn: %s", id) } func (s *MQServer) Process(msg *types.Msg) error { if msg.Topic == "" { err := "Cant proceed with empty topic!" dbg("Src:%s Id:%s Err:%s", msg.Src, msg.Id, err) return errors.New(err) } switch msg.Cmd { case types.CMD_PUB: //dbg("MQServer.Process:: Got new msg: %s.%s=>%s", msg.Cmd, msg.Src, msg.Topic) s.Range(func(m *MQConn) bool { m.Process(msg) return true }) case types.CMD_RPC: //dbg("MQServer.Process:: Got new RPC: Src:%s Reply:%s Topic:%s", msg.Src, msg.ReplyTo, msg.Topic) s.Range(func(m *MQConn) bool { done, _ := m.ProcessFirst(msg) return !done }) case types.CMD_RPCREPLY: //dbg("MQServer.Process:: Got new RPC Reply: Src:%s Topic:%s", msg.Src, msg.Topic) s.Range(func(m *MQConn) bool { done, _ := m.ProcessFirst(msg) return !done }) default: } return nil } func (s *MQServer) Range(h func(m *MQConn) bool) { s.pool.Range(func(key, value interface{}) bool { mq := s.Get(key.(string)) if mq != nil { cont := h(mq) return cont } return true }) } func NewServer() *MQServer { ret := &MQServer{} ret.pool = sync.Map{} ret.subs = sync.Map{} ret.once = sync.Map{} ret.stats.Init() return ret } type MQConn struct { id string server *MQServer subs sync.Map mtx sync.Mutex conn *websocket.Conn } func (m *MQConn) Recv() error { ret := &types.Msg{} //dbg("MQConn.Recv:: Getting new message: %s", m.id) _, mbs, err := m.conn.ReadMessage() if err != nil { dbg("MQConn.Recv:: Error retrieving: %s: %s", m.id, err.Error()) m.Close() return err } server.stats.ActMsgs++ //dbg("MQConn.Recv:: Got new message: %s", m.id) err = json.Unmarshal(mbs, ret) if err != nil { return err } else { switch ret.Cmd { case types.CMD_SUB: //dbg("MQConn.Recv:: Got new sub: %s=>%s", m.id, ret.Topic) m.Sub(ret.Topic) case types.CMD_UNSUB: //dbg("MQConn.Recv:: Got new unsub: %s=>%s", m.id, ret.Topic) m.Unsub(ret.Topic) case types.CMD_CLOSE: //dbg("MQConn.Recv:: Got close: %s=>%s", m.id) m.Close() default: ret.Src = m.id m.server.Process(ret) } } return nil } func (m *MQConn) Loop() { for { err := m.Recv() if err != nil { return } } } func (m *MQConn) Sub(t string) error { rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, t)) if err != nil { return err } m.subs.Store(t, rx) //dbg("MQConn.Sub: Subs: %s", strings.Join(m.Subs(), ",")) return nil } func (m *MQConn) Subs() []string { ret := make([]string, 0) m.subs.Range(func(key, value interface{}) bool { ret = append(ret, key.(string)) return true }) return ret } func (m *MQConn) Unsub(s string) { m.subs.Delete(s) //dbg("MQConn.Unsub: Subs: %s", strings.Join(m.Subs(), ",")) } func (m *MQConn) Send(msg *types.Msg) error { return m.server.Process(msg) } func (m *MQConn) Process(msg *types.Msg) error { var err error = nil m.subs.Range(func(key, value interface{}) bool { rx := value.(*regexp.Regexp) if rx.MatchString(msg.Topic) { //dbg("MQConn.Process: RX:%s matches topic: %s", rx.String(), msg.Topic) //msg.Topic = key.(string) err = m.Write(msg) } return true }) return err } func (m *MQConn) ProcessFirst(msg *types.Msg) (bool, error) { var err error = nil var ret bool = false m.subs.Range(func(key, value interface{}) bool { rx := value.(*regexp.Regexp) if rx.MatchString(msg.Topic) { //msg.Topic = key.(string) //dbg("MQConn.ProcessFirst: RX:%s matches topic: %s", rx.String(), msg.Topic) err = m.Write(msg) ret = true return false } else { //dbg("MQConn.ProcessFirst: RX:%s DOES NOT match topic: %s", rx.String(), msg.Topic) } return true }) return ret, err } func (m *MQConn) Close() { m.conn.Close() m.server.Del(m.id) } func (c *MQConn) Write(m *types.Msg) error { c.mtx.Lock() defer c.mtx.Unlock() server.stats.ActMsgs++ return c.conn.WriteJSON(m) } var upgrader = websocket.Upgrader{} var server *MQServer = NewServer() func Serve(writer http.ResponseWriter, request *http.Request) { id := random.Str(32) con, err := upgrader.Upgrade(writer, request, nil) if err != nil { http.Error(writer, err.Error(), http.StatusInternalServerError) return } mqconn := server.Put(id, con) con.SetCloseHandler(func(code int, text string) error { mqconn.Close() return nil }) mqconn.Loop() } func Stats(writer http.ResponseWriter, request *http.Request) { b := strings.Builder{} s := server.stats b.WriteString(fmt.Sprintf("Total Msgs:%v, Avg Msgs/s: %v, NReads: %v", s.ActMsgs, s.AvgMsgs, s.Nreads)) writer.Write([]byte(b.String())) }