From a722ce0db457792b31cd1a15771802d8c8d27e1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sima=CC=83o?= Date: Mon, 15 Nov 2021 20:23:10 -0300 Subject: [PATCH] Docs --- Readme.md | 143 +++++++++++++++++++++++++++++++++++++++++++++ cmd/server/main.go | 2 +- lib/server/lib.go | 38 +++++------- 3 files changed, 160 insertions(+), 23 deletions(-) create mode 100644 Readme.md diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..1a89511 --- /dev/null +++ b/Readme.md @@ -0,0 +1,143 @@ +# Simple MQ + +A simple, but embeddable message broker. + +Actual implementation allows only websockets, but later, virtually any stream based conn would do it. + +## Motivations + +- CGO Free solution +- Embedable, multi platform +- Little to no dependencies +- Shareable with existing solutions (can be bound as route) +- Low to no boilerplate +- As little overhead as possible +- No limitations for message size and possibility of streaming + +## Installation + +`go get go.digitalcircle.com.br/open/simplemq` + +(Did you seriously expect something different?) + +## Simple Server Impl + +```go +package main + +import ( + "net/http" + "simplemq/lib/server" +) + +func main() { + http.HandleFunc("/ws", server.Serve) //Bind our server to /ws + err := http.ListenAndServe(":8080", nil) + if err != nil { + panic(err) + } +} +``` + +## Simple RPC Example (Emitter and Responder in the same code) + +```go +package main + +import ( + "fmt" + "log" + "simplemq/lib/client" + "simplemq/lib/types" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + + c.Sub("a", func(m *types.Msg) { + ret := fmt.Sprintf("==> Got %s at %s", string(m.Payload), time.Now().String()) + log.Printf(ret) + c.RpcReply(m, []byte(ret))//Important - otherwise RPC call will wait until Timeout (15secs - hardcoded at this stage) + }) + + for { + bs, err := c.Rpc("a", []byte("A Request")) + if err != nil { + log.Printf(err.Error()) + } else { + log.Printf("Recv: %s", string(bs)) + } + time.Sleep(time.Second) + } +} + +``` + +## Simple Pub Sub + +### Sender + +```go +package main + +import ( + "log" + "simplemq/lib/client" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + for { + err = c.Pub("a", []byte("A:"+time.Now().String())) + if err != nil { + log.Printf(err.Error()) + } + time.Sleep(time.Second) + } +} + +``` + +### Receiver + +```go +package main + +import ( + "log" + "simplemq/lib/client" + "simplemq/lib/types" + "time" +) + +func main() { + c, err := client.New("ws://localhost:8080/ws") + if err != nil { + panic(err.Error()) + } + c.Sub("a", func(m *types.Msg) { + log.Printf(string(m.Payload)) + }) + for { + time.Sleep(time.Second) + } +} + +``` + +## TODO + + - 1 - Improve docs + - 2 - Improve parametrization + - 3 - Improve test coverage + - 4 - Plenty of space for performance improvements + - 5 - Add clustering capabilities + - 6 - You name it - send proposals to: paulo@digitalcircle.com.br \ No newline at end of file diff --git a/cmd/server/main.go b/cmd/server/main.go index 62c6861..128f62e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,7 +6,7 @@ import ( ) func main() { - server.Serve() + http.HandleFunc("/ws", server.Serve) err := http.ListenAndServe(":8080", nil) if err != nil { panic(err) diff --git a/lib/server/lib.go b/lib/server/lib.go index 7691052..3201f2d 100644 --- a/lib/server/lib.go +++ b/lib/server/lib.go @@ -210,27 +210,21 @@ func (c *MQConn) Write(m *types.Msg) error { return c.conn.WriteJSON(m) } -func Serve() { - var upgrader = websocket.Upgrader{} - server := NewServer() - http.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) { - id := random.Str(32) - con, err := upgrader.Upgrade(writer, request, nil) - if err != nil { - http.Error(writer, err.Error(), http.StatusInternalServerError) - return - } - mqconn := server.Put(id, con) - con.SetCloseHandler(func(code int, text string) error { - mqconn.Close() - return nil - }) - mqconn.Loop() - }) - http.HandleFunc("/stats", func(writer http.ResponseWriter, request *http.Request) { - //server.Range(func(m *MQConn) bool { - // m. - // return true - //}) +var upgrader = websocket.Upgrader{} +var server *MQServer = NewServer() + +func Serve(writer http.ResponseWriter, request *http.Request) { + + id := random.Str(32) + con, err := upgrader.Upgrade(writer, request, nil) + if err != nil { + http.Error(writer, err.Error(), http.StatusInternalServerError) + return + } + mqconn := server.Put(id, con) + con.SetCloseHandler(func(code int, text string) error { + mqconn.Close() + return nil }) + mqconn.Loop() }