master
Paulo Simão 2021-11-15 20:06:05 -03:00
commit 217325edf0
12 changed files with 653 additions and 0 deletions

1
.gitignore vendored 100644
View File

@ -0,0 +1 @@
.idea

21
cmd/recv/main.go 100644
View File

@ -0,0 +1,21 @@
package main
import (
"log"
"simplemq/lib/client"
"simplemq/lib/types"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
c.Sub("a", func(m *types.Msg) {
log.Printf(string(m.Payload))
})
for {
time.Sleep(time.Second)
}
}

36
cmd/rpccli/main.go 100644
View File

@ -0,0 +1,36 @@
package main
import (
"fmt"
"log"
"simplemq/lib/client"
"simplemq/lib/types"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
c.Sub("a", func(m *types.Msg) {
ret := fmt.Sprintf("==> Got %s at %s", string(m.Payload), time.Now().String())
log.Printf(ret)
c.RpcReply(m, []byte(ret))
})
c.Sub("ab.*", func(m *types.Msg) {
log.Printf("Got wildcard")
})
for {
bs, err := c.Rpc("a", []byte("A Request"))
if err != nil {
log.Printf(err.Error())
} else {
log.Printf("Recv: %s", string(bs))
}
time.Sleep(time.Second)
}
}

View File

@ -0,0 +1,24 @@
package main
import (
"fmt"
"log"
"simplemq/lib/client"
"simplemq/lib/types"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
c.Sub("a", func(m *types.Msg) {
ret := fmt.Sprintf("Got %s at %s", string(m.Payload), time.Now().String())
log.Printf(ret)
c.RpcReply(m, []byte(ret))
})
for {
time.Sleep(time.Second)
}
}

21
cmd/send/main.go 100644
View File

@ -0,0 +1,21 @@
package main
import (
"log"
"simplemq/lib/client"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
for {
err = c.Pub("a", []byte("A:"+time.Now().String()))
if err != nil {
log.Printf(err.Error())
}
time.Sleep(time.Second)
}
}

14
cmd/server/main.go 100644
View File

@ -0,0 +1,14 @@
package main
import (
"net/http"
"simplemq/lib/server"
)
func main() {
server.Serve()
err := http.ListenAndServe(":8080", nil)
if err != nil {
panic(err)
}
}

5
go.mod 100644
View File

@ -0,0 +1,5 @@
module simplemq
go 1.17
require github.com/gorilla/websocket v1.4.2 // indirect

2
go.sum 100644
View File

@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

213
lib/client/lib.go 100644
View File

@ -0,0 +1,213 @@
package client
import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"log"
"regexp"
"simplemq/lib/random"
"simplemq/lib/types"
"sync"
"time"
)
func dbg(s string, p ...interface{}) {
log.Printf(s, p...)
}
type MQConn struct {
url string
retry bool
chRetry chan *struct{}
//chReady chan *struct{}
subs sync.Map
mtx sync.Mutex
conn *websocket.Conn
}
type MQSub struct {
Topic string
Rx *regexp.Regexp
H func(m *types.Msg)
}
func (c *MQConn) Sub(s string, h func(m *types.Msg)) error {
//dbg("MQConn.Sub:: TOPIC:%s", s)
m := &types.Msg{
Cmd: types.CMD_SUB,
Topic: s,
}
rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, s))
if err != nil {
return err
}
sub := &MQSub{
Topic: s,
Rx: rx,
H: h,
}
c.subs.Store(s, sub)
return c.Write(m)
}
func (c *MQConn) UnSub(s string) error {
//dbg("MQConn.UnSub:: TOPIC:%s", s)
m := &types.Msg{
Cmd: types.CMD_UNSUB,
Topic: s,
}
c.subs.Delete(s)
return c.Write(m)
}
func (c *MQConn) Write(m *types.Msg) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.conn != nil {
return c.conn.WriteJSON(m)
} else {
return errors.New("c.conn is nil")
}
}
func (c *MQConn) Pub(s string, pl []byte) error {
//dbg("MQConn.Pub:: TOPIC:%s", s)
m := &types.Msg{
Cmd: types.CMD_PUB,
Topic: s,
Payload: pl,
}
if c.conn == nil {
return errors.New("Not connected")
}
return c.Write(m)
}
func (c *MQConn) Rpc(s string, pli []byte) ([]byte, error) {
retid := "__RPCREPLY__" + random.Str(32)
lchan := make(chan *struct{})
m := &types.Msg{
Cmd: types.CMD_RPC,
Topic: s,
Payload: pli,
Id: retid,
ReplyTo: retid,
}
var ret []byte
c.Sub(retid, func(m *types.Msg) {
//dbg("MQConn.Rpc:: Got reply from TOPIC:%s", retid)
ret = m.Payload
if lchan != nil {
lchan <- &struct{}{}
}
})
defer c.UnSub(retid)
err := c.Write(m)
select {
case <-lchan:
case <-time.After(time.Second * 15):
}
close(lchan)
lchan = nil
return ret, err
}
func (c *MQConn) RpcReply(msg *types.Msg, pli []byte) error {
retid := random.Str(32)
m := &types.Msg{
Cmd: types.CMD_RPCREPLY,
Topic: msg.ReplyTo,
Payload: pli,
Id: retid,
ReplyTo: "",
}
err := c.Write(m)
return err
}
func (c *MQConn) Loop() {
nerr := 0
for c.conn != nil {
m := &types.Msg{}
if c.conn == nil {
return
}
err := c.conn.ReadJSON(m)
if err != nil {
nerr++
log.Printf("Error reading msg: %s", err.Error())
if nerr > 990 {
c.conn = nil
}
continue
}
//dbg("MQConn.Loop:: Recv Msg: ID:%s TOPIC:%s => %s", m.Id, m.Topic, string(m.Payload))
c.subs.Range(func(key, raw interface{}) bool {
sub, ok := raw.(*MQSub)
//dbg("MQConn.Loop:: Checking %s x %s", m.Topic, key.(string))
if ok && sub.Rx.MatchString(m.Topic) {
sub.H(m)
if m.Cmd == types.CMD_RPCREPLY {
return true
}
return false
}
return true
})
}
}
func (c *MQConn) Init() error {
c.retry = true
doConn := func() error {
con, _, err := websocket.DefaultDialer.Dial(c.url, nil)
if err != nil {
return err
}
c.conn = con
con.SetCloseHandler(func(code int, text string) error {
c.conn = nil
return nil
})
return nil
}
err := doConn()
if err != nil {
return err
}
c.subs.Range(func(key, value interface{}) bool {
m := &types.Msg{
Cmd: types.CMD_SUB,
Topic: key.(string),
}
err := c.Write(m)
if err != nil {
log.Printf("Error re-listening: %s", err.Error())
return false
}
return true
})
return nil
}
func New(u string) (*MQConn, error) {
ret := &MQConn{
url: u,
}
err := ret.Init()
go func() {
for ret.retry {
ret.Loop()
ret.Init()
}
}()
return ret, err
}

53
lib/random/lib.go 100644
View File

@ -0,0 +1,53 @@
package random
import (
"math/rand"
"time"
"unsafe"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const upperBytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
const letterNumsBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
const letterNumsUpperBytes = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
const (
letterIdxBits = 6 // 6 bits to represent a letter index
letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
)
var src = rand.NewSource(time.Now().UnixNano())
func strFromSrc(n int, srcChars string) string {
b := make([]byte, n)
// A src.Int63() generates 63 random bits, enough for letterIdxMax characters!
for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
if remain == 0 {
cache, remain = src.Int63(), letterIdxMax
}
if idx := int(cache & letterIdxMask); idx < len(srcChars) {
b[i] = srcChars[idx]
i--
}
cache >>= letterIdxBits
remain--
}
return *(*string)(unsafe.Pointer(&b))
}
func Str(n int) string {
return strFromSrc(n, letterBytes)
}
func StrUpper(n int) string {
return strFromSrc(n, upperBytes)
}
func StrLetterNum(n int) string {
return strFromSrc(n, letterNumsBytes)
}
func StrLetterNumUpper(n int) string {
return strFromSrc(n, letterNumsUpperBytes)
}

236
lib/server/lib.go 100644
View File

@ -0,0 +1,236 @@
package server
import (
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"log"
"net/http"
"regexp"
"simplemq/lib/random"
"simplemq/lib/types"
"sync"
)
func dbg(s string, p ...interface{}) {
log.Printf(s, p...)
}
type MQServer struct {
pool sync.Map
subs sync.Map
once sync.Map
}
func (s *MQServer) Get(id string) *MQConn {
raw, ok := s.pool.Load(id)
if !ok {
return nil
}
return raw.(*MQConn)
}
func (s *MQServer) Put(id string, conn *websocket.Conn) *MQConn {
mqconn := &MQConn{id: id, conn: conn, server: s}
s.pool.Store(id, mqconn)
//dbg("MQServer.Put:: Got new conn: %s", mqconn.id)
return mqconn
}
func (s *MQServer) Del(id string) {
s.pool.Delete(id)
//dbg("MQServer.Del:: Removing conn: %s", id)
}
func (s *MQServer) Process(msg *types.Msg) error {
if msg.Topic == "" {
err := "Cant proceed with empty topic!"
dbg("Src:%s Id:%s Err:%s", msg.Src, msg.Id, err)
return errors.New(err)
}
switch msg.Cmd {
case types.CMD_PUB:
//dbg("MQServer.Process:: Got new msg: %s.%s=>%s", msg.Cmd, msg.Src, msg.Topic)
s.Range(func(m *MQConn) bool {
m.Process(msg)
return true
})
case types.CMD_RPC:
//dbg("MQServer.Process:: Got new RPC: Src:%s Reply:%s Topic:%s", msg.Src, msg.ReplyTo, msg.Topic)
s.Range(func(m *MQConn) bool {
done, _ := m.ProcessFirst(msg)
return !done
})
case types.CMD_RPCREPLY:
//dbg("MQServer.Process:: Got new RPC Reply: Src:%s Topic:%s", msg.Src, msg.Topic)
s.Range(func(m *MQConn) bool {
done, _ := m.ProcessFirst(msg)
return !done
})
default:
}
return nil
}
func (s *MQServer) Range(h func(m *MQConn) bool) {
s.pool.Range(func(key, value interface{}) bool {
mq := s.Get(key.(string))
if mq != nil {
cont := h(mq)
return cont
}
return true
})
}
func NewServer() *MQServer {
ret := &MQServer{}
ret.pool = sync.Map{}
ret.subs = sync.Map{}
ret.once = sync.Map{}
return ret
}
type MQConn struct {
id string
server *MQServer
subs sync.Map
mtx sync.Mutex
conn *websocket.Conn
}
func (m *MQConn) Recv() error {
ret := &types.Msg{}
//dbg("MQConn.Recv:: Getting new message: %s", m.id)
_, mbs, err := m.conn.ReadMessage()
if err != nil {
dbg("MQConn.Recv:: Error retrieving: %s: %s", m.id, err.Error())
m.Close()
return err
}
//dbg("MQConn.Recv:: Got new message: %s", m.id)
err = json.Unmarshal(mbs, ret)
if err != nil {
return err
} else {
switch ret.Cmd {
case types.CMD_SUB:
//dbg("MQConn.Recv:: Got new sub: %s=>%s", m.id, ret.Topic)
m.Sub(ret.Topic)
case types.CMD_UNSUB:
//dbg("MQConn.Recv:: Got new unsub: %s=>%s", m.id, ret.Topic)
m.Unsub(ret.Topic)
case types.CMD_CLOSE:
//dbg("MQConn.Recv:: Got close: %s=>%s", m.id)
m.Close()
default:
ret.Src = m.id
m.server.Process(ret)
}
}
return nil
}
func (m *MQConn) Loop() {
for {
err := m.Recv()
if err != nil {
return
}
}
}
func (m *MQConn) Sub(t string) error {
rx, err := regexp.Compile(fmt.Sprintf(`^%s$`, t))
if err != nil {
return err
}
m.subs.Store(t, rx)
//dbg("MQConn.Sub: Subs: %s", strings.Join(m.Subs(), ","))
return nil
}
func (m *MQConn) Subs() []string {
ret := make([]string, 0)
m.subs.Range(func(key, value interface{}) bool {
ret = append(ret, key.(string))
return true
})
return ret
}
func (m *MQConn) Unsub(s string) {
m.subs.Delete(s)
//dbg("MQConn.Unsub: Subs: %s", strings.Join(m.Subs(), ","))
}
func (m *MQConn) Send(msg *types.Msg) error {
return m.server.Process(msg)
}
func (m *MQConn) Process(msg *types.Msg) error {
var err error = nil
m.subs.Range(func(key, value interface{}) bool {
rx := value.(*regexp.Regexp)
if rx.MatchString(msg.Topic) {
//dbg("MQConn.Process: RX:%s matches topic: %s", rx.String(), msg.Topic)
//msg.Topic = key.(string)
err = m.Write(msg)
}
return true
})
return err
}
func (m *MQConn) ProcessFirst(msg *types.Msg) (bool, error) {
var err error = nil
var ret bool = false
m.subs.Range(func(key, value interface{}) bool {
rx := value.(*regexp.Regexp)
if rx.MatchString(msg.Topic) {
//msg.Topic = key.(string)
//dbg("MQConn.ProcessFirst: RX:%s matches topic: %s", rx.String(), msg.Topic)
err = m.Write(msg)
ret = true
return false
} else {
//dbg("MQConn.ProcessFirst: RX:%s DOES NOT match topic: %s", rx.String(), msg.Topic)
}
return true
})
return ret, err
}
func (m *MQConn) Close() {
m.conn.Close()
m.server.Del(m.id)
}
func (c *MQConn) Write(m *types.Msg) error {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.conn.WriteJSON(m)
}
func Serve() {
var upgrader = websocket.Upgrader{}
server := NewServer()
http.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) {
id := random.Str(32)
con, err := upgrader.Upgrade(writer, request, nil)
if err != nil {
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
}
mqconn := server.Put(id, con)
con.SetCloseHandler(func(code int, text string) error {
mqconn.Close()
return nil
})
mqconn.Loop()
})
http.HandleFunc("/stats", func(writer http.ResponseWriter, request *http.Request) {
//server.Range(func(m *MQConn) bool {
// m.
// return true
//})
})
}

27
lib/types/lib.go 100644
View File

@ -0,0 +1,27 @@
package types
import "encoding/json"
const (
CMD_SUB = "sub"
CMD_UNSUB = "unsub"
CMD_PUB = "pub"
CMD_RPC = "rpc"
CMD_RPCREPLY = "rpcreply"
CMD_CLOSE = "close"
)
type Msg struct {
Cmd string `json:"cmd,omitempty"`
Id string `json:"id,omitempty"`
ReplyTo string `json:"reply_to,omitempty"`
Topic string `json:"topic,omitempty"`
Src string `json:"src,omitempty"`
Header map[string]string `json:"header,omitempty"`
Payload []byte `json:"payload,omitempty"`
}
func (m *Msg) Bytes() []byte {
bs, _ := json.Marshal(m)
return bs
}