Added stats, set test to more cores to check stressing.

master
Paulo Simão 2021-11-16 20:28:08 -03:00
parent a722ce0db4
commit 64fe630ae9
4 changed files with 104 additions and 9 deletions

View File

@ -7,7 +7,7 @@ Actual implementation allows only websockets, but later, virtually any stream ba
## Motivations ## Motivations
- CGO Free solution - CGO Free solution
- Embedable, multi platform - Embeddable, multi platform
- Little to no dependencies - Little to no dependencies
- Shareable with existing solutions (can be bound as route) - Shareable with existing solutions (can be bound as route)
- Low to no boilerplate - Low to no boilerplate

View File

@ -3,12 +3,14 @@ package main
import ( import (
"fmt" "fmt"
"log" "log"
"runtime"
"simplemq/lib/client" "simplemq/lib/client"
"simplemq/lib/types" "simplemq/lib/types"
"time" "time"
) )
func main() { func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
c, err := client.New("ws://localhost:8080/ws") c, err := client.New("ws://localhost:8080/ws")
if err != nil { if err != nil {
panic(err.Error()) panic(err.Error())
@ -21,16 +23,71 @@ func main() {
}) })
c.Sub("ab.*", func(m *types.Msg) { c.Sub("ab.*", func(m *types.Msg) {
log.Printf("Got wildcard") log.Printf("Got msg at %s: %s", m.Topic, string(m.Payload))
c.RpcReply(m, []byte(fmt.Sprintf("Got msg at %s: %s", m.Topic, string(m.Payload))))
})
c.Sub("c.*", func(m *types.Msg) {
//log.Printf("Got msg at %s: %s", m.Topic, string(m.Payload))
}) })
for { go func() {
bs, err := c.Rpc("a", []byte("A Request")) for {
if err != nil { //bs, err := c.Rpc("ab01", []byte("A msg to AB01: "+time.Now().String()))
log.Printf(err.Error()) //if err != nil {
} else { // log.Printf(err.Error())
log.Printf("Recv: %s", string(bs)) //}
//log.Printf(string(bs))
//time.Sleep(time.Millisecond * 100)
} }
time.Sleep(time.Second) }()
go func() {
for {
c.Pub("ca", []byte("A msg to CA: "+time.Now().String()))
//time.Sleep(time.Millisecond * 100)
}
}()
go func() {
for {
c.Pub("cb", []byte("A msg to CB: "+time.Now().String()))
//time.Sleep(time.Millisecond * 75)
}
}()
go func() {
for {
c.Pub("cc", []byte("A msg to CC: "+time.Now().String()))
//time.Sleep(time.Millisecond * 15)
}
}()
go func() {
for {
c.Pub("cd", []byte("A msg to CD: "+time.Now().String()))
//time.Sleep(time.Millisecond * 15)
}
}()
go func() {
for {
c.Pub("ce", []byte("A msg to CE: "+time.Now().String()))
//time.Sleep(time.Millisecond * 15)
}
}()
go func() {
for {
c.Pub("cf", []byte("A msg to CF: "+time.Now().String()))
//time.Sleep(time.Millisecond * 15)
}
}()
for {
//bs, err := c.Rpc("a", []byte("A Request"))
//if err != nil {
// log.Printf(err.Error())
//} else {
// log.Printf("Recv: %s", string(bs))
//}
time.Sleep(time.Minute)
} }
} }

View File

@ -7,6 +7,7 @@ import (
func main() { func main() {
http.HandleFunc("/ws", server.Serve) http.HandleFunc("/ws", server.Serve)
http.HandleFunc("/stats", server.Stats)
err := http.ListenAndServe(":8080", nil) err := http.ListenAndServe(":8080", nil)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -10,19 +10,46 @@ import (
"regexp" "regexp"
"simplemq/lib/random" "simplemq/lib/random"
"simplemq/lib/types" "simplemq/lib/types"
"strings"
"sync" "sync"
"time"
) )
func dbg(s string, p ...interface{}) { func dbg(s string, p ...interface{}) {
log.Printf(s, p...) log.Printf(s, p...)
} }
type MQStats struct {
ActMsgs int64
Nreads float64
AvgMsgs float64
}
func (m *MQStats) Init() {
m.Nreads = 1
go func() {
for {
m.AvgMsgs = float64(m.ActMsgs) / m.Nreads
m.Nreads++
time.Sleep(time.Second)
}
}()
}
func (m *MQStats) Inc() {
m.ActMsgs++
}
type MQServer struct { type MQServer struct {
pool sync.Map pool sync.Map
subs sync.Map subs sync.Map
once sync.Map once sync.Map
stats MQStats
} }
func (s *MQServer) Stats() MQStats {
return s.stats
}
func (s *MQServer) Get(id string) *MQConn { func (s *MQServer) Get(id string) *MQConn {
raw, ok := s.pool.Load(id) raw, ok := s.pool.Load(id)
if !ok { if !ok {
@ -91,6 +118,7 @@ func NewServer() *MQServer {
ret.pool = sync.Map{} ret.pool = sync.Map{}
ret.subs = sync.Map{} ret.subs = sync.Map{}
ret.once = sync.Map{} ret.once = sync.Map{}
ret.stats.Init()
return ret return ret
} }
@ -112,6 +140,7 @@ func (m *MQConn) Recv() error {
m.Close() m.Close()
return err return err
} }
server.stats.ActMsgs++
//dbg("MQConn.Recv:: Got new message: %s", m.id) //dbg("MQConn.Recv:: Got new message: %s", m.id)
err = json.Unmarshal(mbs, ret) err = json.Unmarshal(mbs, ret)
if err != nil { if err != nil {
@ -207,6 +236,7 @@ func (m *MQConn) Close() {
func (c *MQConn) Write(m *types.Msg) error { func (c *MQConn) Write(m *types.Msg) error {
c.mtx.Lock() c.mtx.Lock()
defer c.mtx.Unlock() defer c.mtx.Unlock()
server.stats.ActMsgs++
return c.conn.WriteJSON(m) return c.conn.WriteJSON(m)
} }
@ -228,3 +258,10 @@ func Serve(writer http.ResponseWriter, request *http.Request) {
}) })
mqconn.Loop() mqconn.Loop()
} }
func Stats(writer http.ResponseWriter, request *http.Request) {
b := strings.Builder{}
s := server.stats
b.WriteString(fmt.Sprintf("Total Msgs:%v, Avg Msgs/s: %v, NReads: %v", s.ActMsgs, s.AvgMsgs, s.Nreads))
writer.Write([]byte(b.String()))
}