commit 217325edf0886ba494f7a7d490b3afdd41dcf3c7 Author: Paulo Simão Date: Mon Nov 15 20:06:05 2021 -0300 1st ver diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..485dee6 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea diff --git a/cmd/recv/main.go b/cmd/recv/main.go new file mode 100644 index 0000000..f7999ed --- /dev/null +++ b/cmd/recv/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "log" + "simplemq/lib/client" + "simplemq/lib/types" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + c.Sub("a", func(m *types.Msg) { + log.Printf(string(m.Payload)) + }) + for { + time.Sleep(time.Second) + } +} diff --git a/cmd/rpccli/main.go b/cmd/rpccli/main.go new file mode 100644 index 0000000..6bf75de --- /dev/null +++ b/cmd/rpccli/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "fmt" + "log" + "simplemq/lib/client" + "simplemq/lib/types" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + + c.Sub("a", func(m *types.Msg) { + ret := fmt.Sprintf("==> Got %s at %s", string(m.Payload), time.Now().String()) + log.Printf(ret) + c.RpcReply(m, []byte(ret)) + }) + + c.Sub("ab.*", func(m *types.Msg) { + log.Printf("Got wildcard") + }) + + for { + bs, err := c.Rpc("a", []byte("A Request")) + if err != nil { + log.Printf(err.Error()) + } else { + log.Printf("Recv: %s", string(bs)) + } + time.Sleep(time.Second) + } +} diff --git a/cmd/rpcserver/main.go b/cmd/rpcserver/main.go new file mode 100644 index 0000000..cec89ed --- /dev/null +++ b/cmd/rpcserver/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "fmt" + "log" + "simplemq/lib/client" + "simplemq/lib/types" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + c.Sub("a", func(m *types.Msg) { + ret := fmt.Sprintf("Got %s at %s", string(m.Payload), time.Now().String()) + log.Printf(ret) + c.RpcReply(m, []byte(ret)) + }) + for { + time.Sleep(time.Second) + } +} diff --git a/cmd/send/main.go b/cmd/send/main.go new file mode 100644 index 0000000..a73d171 --- /dev/null +++ b/cmd/send/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "log" + "simplemq/lib/client" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + for { + err = c.Pub("a", []byte("A:"+time.Now().String())) + if err != nil { + log.Printf(err.Error()) + } + time.Sleep(time.Second) + } +} diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..62c6861 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "net/http" + "simplemq/lib/server" +) + +func main() { + server.Serve() + err := http.ListenAndServe(":8080", nil) + if err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..53b05f8 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module simplemq + +go 1.17 + +require github.com/gorilla/websocket v1.4.2 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..85efffd --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/lib/client/lib.go b/lib/client/lib.go new file mode 100644 index 0000000..18275c5 --- /dev/null +++ b/lib/client/lib.go @@ -0,0 +1,213 @@ +package client + +import ( + "errors" + "fmt" + "github.com/gorilla/websocket" + "log" + "regexp" + "simplemq/lib/random" + "simplemq/lib/types" + "sync" + "time" +) + +func dbg(s string, p ...interface{}) { + log.Printf(s, p...) +} + +type MQConn struct { + url string + retry bool + chRetry chan *struct{} + //chReady chan *struct{} + subs sync.Map + + mtx sync.Mutex + conn *websocket.Conn +} + +type MQSub struct { + Topic string + Rx *regexp.Regexp + H func(m *types.Msg) +} + +func (c *MQConn) Sub(s string, h func(m *types.Msg)) error { + //dbg("MQConn.Sub:: TOPIC:%s", s) + m := &types.Msg{ + Cmd: types.CMD_SUB, + Topic: s, + } + rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, s)) + if err != nil { + return err + } + sub := &MQSub{ + Topic: s, + Rx: rx, + H: h, + } + c.subs.Store(s, sub) + return c.Write(m) +} +func (c *MQConn) UnSub(s string) error { + //dbg("MQConn.UnSub:: TOPIC:%s", s) + m := &types.Msg{ + Cmd: types.CMD_UNSUB, + Topic: s, + } + c.subs.Delete(s) + return c.Write(m) +} + +func (c *MQConn) Write(m *types.Msg) error { + c.mtx.Lock() + defer c.mtx.Unlock() + if c.conn != nil { + return c.conn.WriteJSON(m) + } else { + return errors.New("c.conn is nil") + } +} + +func (c *MQConn) Pub(s string, pl []byte) error { + //dbg("MQConn.Pub:: TOPIC:%s", s) + m := &types.Msg{ + Cmd: types.CMD_PUB, + Topic: s, + Payload: pl, + } + if c.conn == nil { + return errors.New("Not connected") + } + return c.Write(m) +} +func (c *MQConn) Rpc(s string, pli []byte) ([]byte, error) { + retid := "__RPCREPLY__" + random.Str(32) + lchan := make(chan *struct{}) + m := &types.Msg{ + Cmd: types.CMD_RPC, + Topic: s, + Payload: pli, + Id: retid, + ReplyTo: retid, + } + var ret []byte + c.Sub(retid, func(m *types.Msg) { + //dbg("MQConn.Rpc:: Got reply from TOPIC:%s", retid) + ret = m.Payload + if lchan != nil { + lchan <- &struct{}{} + } + }) + defer c.UnSub(retid) + err := c.Write(m) + select { + case <-lchan: + case <-time.After(time.Second * 15): + } + close(lchan) + lchan = nil + return ret, err + +} +func (c *MQConn) RpcReply(msg *types.Msg, pli []byte) error { + retid := random.Str(32) + m := &types.Msg{ + Cmd: types.CMD_RPCREPLY, + Topic: msg.ReplyTo, + Payload: pli, + Id: retid, + ReplyTo: "", + } + err := c.Write(m) + return err +} + +func (c *MQConn) Loop() { + nerr := 0 + for c.conn != nil { + m := &types.Msg{} + if c.conn == nil { + return + } + err := c.conn.ReadJSON(m) + if err != nil { + nerr++ + log.Printf("Error reading msg: %s", err.Error()) + if nerr > 990 { + c.conn = nil + } + continue + } + //dbg("MQConn.Loop:: Recv Msg: ID:%s TOPIC:%s => %s", m.Id, m.Topic, string(m.Payload)) + c.subs.Range(func(key, raw interface{}) bool { + + sub, ok := raw.(*MQSub) + + //dbg("MQConn.Loop:: Checking %s x %s", m.Topic, key.(string)) + if ok && sub.Rx.MatchString(m.Topic) { + sub.H(m) + if m.Cmd == types.CMD_RPCREPLY { + return true + } + return false + } + return true + }) + } + +} +func (c *MQConn) Init() error { + c.retry = true + + doConn := func() error { + con, _, err := websocket.DefaultDialer.Dial(c.url, nil) + if err != nil { + return err + } + c.conn = con + con.SetCloseHandler(func(code int, text string) error { + c.conn = nil + return nil + }) + return nil + } + + err := doConn() + + if err != nil { + return err + } + + c.subs.Range(func(key, value interface{}) bool { + m := &types.Msg{ + Cmd: types.CMD_SUB, + Topic: key.(string), + } + + err := c.Write(m) + if err != nil { + log.Printf("Error re-listening: %s", err.Error()) + return false + } + return true + }) + + return nil +} + +func New(u string) (*MQConn, error) { + ret := &MQConn{ + url: u, + } + err := ret.Init() + go func() { + for ret.retry { + ret.Loop() + ret.Init() + } + }() + return ret, err +} diff --git a/lib/random/lib.go b/lib/random/lib.go new file mode 100644 index 0000000..a773b99 --- /dev/null +++ b/lib/random/lib.go @@ -0,0 +1,53 @@ +package random + +import ( + "math/rand" + "time" + "unsafe" +) + +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" +const upperBytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" +const letterNumsBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +const letterNumsUpperBytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + +const ( + letterIdxBits = 6 // 6 bits to represent a letter index + letterIdxMask = 1<= 0; { + if remain == 0 { + cache, remain = src.Int63(), letterIdxMax + } + if idx := int(cache & letterIdxMask); idx < len(srcChars) { + b[i] = srcChars[idx] + i-- + } + cache >>= letterIdxBits + remain-- + } + + return *(*string)(unsafe.Pointer(&b)) +} + +func Str(n int) string { + return strFromSrc(n, letterBytes) +} + +func StrUpper(n int) string { + return strFromSrc(n, upperBytes) +} + +func StrLetterNum(n int) string { + return strFromSrc(n, letterNumsBytes) +} +func StrLetterNumUpper(n int) string { + return strFromSrc(n, letterNumsUpperBytes) +} diff --git a/lib/server/lib.go b/lib/server/lib.go new file mode 100644 index 0000000..7691052 --- /dev/null +++ b/lib/server/lib.go @@ -0,0 +1,236 @@ +package server + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/gorilla/websocket" + "log" + "net/http" + "regexp" + "simplemq/lib/random" + "simplemq/lib/types" + "sync" +) + +func dbg(s string, p ...interface{}) { + log.Printf(s, p...) +} + +type MQServer struct { + pool sync.Map + subs sync.Map + once sync.Map +} + +func (s *MQServer) Get(id string) *MQConn { + raw, ok := s.pool.Load(id) + if !ok { + return nil + } + return raw.(*MQConn) +} +func (s *MQServer) Put(id string, conn *websocket.Conn) *MQConn { + mqconn := &MQConn{id: id, conn: conn, server: s} + s.pool.Store(id, mqconn) + //dbg("MQServer.Put:: Got new conn: %s", mqconn.id) + return mqconn +} +func (s *MQServer) Del(id string) { + s.pool.Delete(id) + //dbg("MQServer.Del:: Removing conn: %s", id) +} +func (s *MQServer) Process(msg *types.Msg) error { + if msg.Topic == "" { + err := "Cant proceed with empty topic!" + dbg("Src:%s Id:%s Err:%s", msg.Src, msg.Id, err) + return errors.New(err) + } + switch msg.Cmd { + case types.CMD_PUB: + //dbg("MQServer.Process:: Got new msg: %s.%s=>%s", msg.Cmd, msg.Src, msg.Topic) + s.Range(func(m *MQConn) bool { + m.Process(msg) + return true + }) + case types.CMD_RPC: + + //dbg("MQServer.Process:: Got new RPC: Src:%s Reply:%s Topic:%s", msg.Src, msg.ReplyTo, msg.Topic) + s.Range(func(m *MQConn) bool { + done, _ := m.ProcessFirst(msg) + return !done + }) + + case types.CMD_RPCREPLY: + //dbg("MQServer.Process:: Got new RPC Reply: Src:%s Topic:%s", msg.Src, msg.Topic) + + s.Range(func(m *MQConn) bool { + done, _ := m.ProcessFirst(msg) + return !done + }) + + default: + + } + + return nil +} +func (s *MQServer) Range(h func(m *MQConn) bool) { + s.pool.Range(func(key, value interface{}) bool { + mq := s.Get(key.(string)) + if mq != nil { + cont := h(mq) + return cont + } + return true + }) +} + +func NewServer() *MQServer { + ret := &MQServer{} + ret.pool = sync.Map{} + ret.subs = sync.Map{} + ret.once = sync.Map{} + return ret +} + +type MQConn struct { + id string + server *MQServer + subs sync.Map + + mtx sync.Mutex + conn *websocket.Conn +} + +func (m *MQConn) Recv() error { + ret := &types.Msg{} + //dbg("MQConn.Recv:: Getting new message: %s", m.id) + _, mbs, err := m.conn.ReadMessage() + if err != nil { + dbg("MQConn.Recv:: Error retrieving: %s: %s", m.id, err.Error()) + m.Close() + return err + } + //dbg("MQConn.Recv:: Got new message: %s", m.id) + err = json.Unmarshal(mbs, ret) + if err != nil { + return err + } else { + switch ret.Cmd { + case types.CMD_SUB: + //dbg("MQConn.Recv:: Got new sub: %s=>%s", m.id, ret.Topic) + m.Sub(ret.Topic) + case types.CMD_UNSUB: + //dbg("MQConn.Recv:: Got new unsub: %s=>%s", m.id, ret.Topic) + m.Unsub(ret.Topic) + case types.CMD_CLOSE: + //dbg("MQConn.Recv:: Got close: %s=>%s", m.id) + m.Close() + default: + ret.Src = m.id + m.server.Process(ret) + } + + } + return nil + +} +func (m *MQConn) Loop() { + for { + err := m.Recv() + if err != nil { + return + } + } +} +func (m *MQConn) Sub(t string) error { + + rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, t)) + if err != nil { + return err + } + m.subs.Store(t, rx) + //dbg("MQConn.Sub: Subs: %s", strings.Join(m.Subs(), ",")) + return nil +} +func (m *MQConn) Subs() []string { + ret := make([]string, 0) + m.subs.Range(func(key, value interface{}) bool { + ret = append(ret, key.(string)) + return true + }) + return ret +} +func (m *MQConn) Unsub(s string) { + m.subs.Delete(s) + //dbg("MQConn.Unsub: Subs: %s", strings.Join(m.Subs(), ",")) +} +func (m *MQConn) Send(msg *types.Msg) error { + return m.server.Process(msg) +} +func (m *MQConn) Process(msg *types.Msg) error { + var err error = nil + m.subs.Range(func(key, value interface{}) bool { + rx := value.(*regexp.Regexp) + if rx.MatchString(msg.Topic) { + //dbg("MQConn.Process: RX:%s matches topic: %s", rx.String(), msg.Topic) + //msg.Topic = key.(string) + err = m.Write(msg) + } + return true + }) + return err +} +func (m *MQConn) ProcessFirst(msg *types.Msg) (bool, error) { + var err error = nil + var ret bool = false + m.subs.Range(func(key, value interface{}) bool { + rx := value.(*regexp.Regexp) + if rx.MatchString(msg.Topic) { + //msg.Topic = key.(string) + //dbg("MQConn.ProcessFirst: RX:%s matches topic: %s", rx.String(), msg.Topic) + err = m.Write(msg) + ret = true + return false + } else { + //dbg("MQConn.ProcessFirst: RX:%s DOES NOT match topic: %s", rx.String(), msg.Topic) + } + return true + }) + return ret, err +} +func (m *MQConn) Close() { + m.conn.Close() + m.server.Del(m.id) +} +func (c *MQConn) Write(m *types.Msg) error { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.conn.WriteJSON(m) +} + +func Serve() { + var upgrader = websocket.Upgrader{} + server := NewServer() + http.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) { + id := random.Str(32) + con, err := upgrader.Upgrade(writer, request, nil) + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + mqconn := server.Put(id, con) + con.SetCloseHandler(func(code int, text string) error { + mqconn.Close() + return nil + }) + mqconn.Loop() + }) + http.HandleFunc("/stats", func(writer http.ResponseWriter, request *http.Request) { + //server.Range(func(m *MQConn) bool { + // m. + // return true + //}) + }) +} diff --git a/lib/types/lib.go b/lib/types/lib.go new file mode 100644 index 0000000..91daac0 --- /dev/null +++ b/lib/types/lib.go @@ -0,0 +1,27 @@ +package types + +import "encoding/json" + +const ( + CMD_SUB = "sub" + CMD_UNSUB = "unsub" + CMD_PUB = "pub" + CMD_RPC = "rpc" + CMD_RPCREPLY = "rpcreply" + CMD_CLOSE = "close" +) + +type Msg struct { + Cmd string `json:"cmd,omitempty"` + Id string `json:"id,omitempty"` + ReplyTo string `json:"reply_to,omitempty"` + Topic string `json:"topic,omitempty"` + Src string `json:"src,omitempty"` + Header map[string]string `json:"header,omitempty"` + Payload []byte `json:"payload,omitempty"` +} + +func (m *Msg) Bytes() []byte { + bs, _ := json.Marshal(m) + return bs +}