From 64fe630ae9b48e23232097ae493ae82c89ce6657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sima=CC=83o?= Date: Tue, 16 Nov 2021 20:28:08 -0300 Subject: [PATCH] Added stats, set test to more cores to check stressing. --- Readme.md | 2 +- cmd/rpccli/main.go | 73 +++++++++++++++++++++++++++++++++++++++++----- cmd/server/main.go | 1 + lib/server/lib.go | 37 +++++++++++++++++++++++ 4 files changed, 104 insertions(+), 9 deletions(-) diff --git a/Readme.md b/Readme.md index 1a89511..b02a39e 100644 --- a/Readme.md +++ b/Readme.md @@ -7,7 +7,7 @@ Actual implementation allows only websockets, but later, virtually any stream ba ## Motivations - CGO Free solution -- Embedable, multi platform +- Embeddable, multi platform - Little to no dependencies - Shareable with existing solutions (can be bound as route) - Low to no boilerplate diff --git a/cmd/rpccli/main.go b/cmd/rpccli/main.go index 6bf75de..3a778a1 100644 --- a/cmd/rpccli/main.go +++ b/cmd/rpccli/main.go @@ -3,12 +3,14 @@ package main import ( "fmt" "log" + "runtime" "simplemq/lib/client" "simplemq/lib/types" "time" ) func main() { + runtime.GOMAXPROCS(runtime.NumCPU()) c, err := client.New("ws://localhost:8080/ws") if err != nil { panic(err.Error()) @@ -21,16 +23,71 @@ func main() { }) c.Sub("ab.*", func(m *types.Msg) { - log.Printf("Got wildcard") + log.Printf("Got msg at %s: %s", m.Topic, string(m.Payload)) + c.RpcReply(m, []byte(fmt.Sprintf("Got msg at %s: %s", m.Topic, string(m.Payload)))) + }) + c.Sub("c.*", func(m *types.Msg) { + //log.Printf("Got msg at %s: %s", m.Topic, string(m.Payload)) }) - for { - bs, err := c.Rpc("a", []byte("A Request")) - if err != nil { - log.Printf(err.Error()) - } else { - log.Printf("Recv: %s", string(bs)) + go func() { + for { + //bs, err := c.Rpc("ab01", []byte("A msg to AB01: "+time.Now().String())) + //if err != nil { + // log.Printf(err.Error()) + //} + //log.Printf(string(bs)) + //time.Sleep(time.Millisecond * 100) } - time.Sleep(time.Second) + }() + + go func() { + for { + c.Pub("ca", []byte("A msg to CA: "+time.Now().String())) + //time.Sleep(time.Millisecond * 100) + } + }() + + go func() { + for { + c.Pub("cb", []byte("A msg to CB: "+time.Now().String())) + //time.Sleep(time.Millisecond * 75) + } + }() + + go func() { + for { + c.Pub("cc", []byte("A msg to CC: "+time.Now().String())) + //time.Sleep(time.Millisecond * 15) + } + }() + + go func() { + for { + c.Pub("cd", []byte("A msg to CD: "+time.Now().String())) + //time.Sleep(time.Millisecond * 15) + } + }() + go func() { + for { + c.Pub("ce", []byte("A msg to CE: "+time.Now().String())) + //time.Sleep(time.Millisecond * 15) + } + }() + go func() { + for { + c.Pub("cf", []byte("A msg to CF: "+time.Now().String())) + //time.Sleep(time.Millisecond * 15) + } + }() + + for { + //bs, err := c.Rpc("a", []byte("A Request")) + //if err != nil { + // log.Printf(err.Error()) + //} else { + // log.Printf("Recv: %s", string(bs)) + //} + time.Sleep(time.Minute) } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 128f62e..7b02f44 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,6 +7,7 @@ import ( func main() { http.HandleFunc("/ws", server.Serve) + http.HandleFunc("/stats", server.Stats) err := http.ListenAndServe(":8080", nil) if err != nil { panic(err) diff --git a/lib/server/lib.go b/lib/server/lib.go index 3201f2d..86c8aef 100644 --- a/lib/server/lib.go +++ b/lib/server/lib.go @@ -10,19 +10,46 @@ import ( "regexp" "simplemq/lib/random" "simplemq/lib/types" + "strings" "sync" + "time" ) func dbg(s string, p ...interface{}) { log.Printf(s, p...) } +type MQStats struct { + ActMsgs int64 + Nreads float64 + AvgMsgs float64 +} + +func (m *MQStats) Init() { + m.Nreads = 1 + go func() { + for { + m.AvgMsgs = float64(m.ActMsgs) / m.Nreads + m.Nreads++ + time.Sleep(time.Second) + } + }() +} +func (m *MQStats) Inc() { + m.ActMsgs++ +} + type MQServer struct { pool sync.Map subs sync.Map once sync.Map + + stats MQStats } +func (s *MQServer) Stats() MQStats { + return s.stats +} func (s *MQServer) Get(id string) *MQConn { raw, ok := s.pool.Load(id) if !ok { @@ -91,6 +118,7 @@ func NewServer() *MQServer { ret.pool = sync.Map{} ret.subs = sync.Map{} ret.once = sync.Map{} + ret.stats.Init() return ret } @@ -112,6 +140,7 @@ func (m *MQConn) Recv() error { m.Close() return err } + server.stats.ActMsgs++ //dbg("MQConn.Recv:: Got new message: %s", m.id) err = json.Unmarshal(mbs, ret) if err != nil { @@ -207,6 +236,7 @@ func (m *MQConn) Close() { func (c *MQConn) Write(m *types.Msg) error { c.mtx.Lock() defer c.mtx.Unlock() + server.stats.ActMsgs++ return c.conn.WriteJSON(m) } @@ -228,3 +258,10 @@ func Serve(writer http.ResponseWriter, request *http.Request) { }) mqconn.Loop() } + +func Stats(writer http.ResponseWriter, request *http.Request) { + b := strings.Builder{} + s := server.stats + b.WriteString(fmt.Sprintf("Total Msgs:%v, Avg Msgs/s: %v, NReads: %v", s.ActMsgs, s.AvgMsgs, s.Nreads)) + writer.Write([]byte(b.String())) +}