master
Paulo Simão 2021-11-15 20:23:10 -03:00
parent 217325edf0
commit a722ce0db4
3 changed files with 160 additions and 23 deletions

143
Readme.md 100644
View File

@ -0,0 +1,143 @@
# Simple MQ
A simple, but embeddable message broker.
Actual implementation allows only websockets, but later, virtually any stream based conn would do it.
## Motivations
- CGO Free solution
- Embedable, multi platform
- Little to no dependencies
- Shareable with existing solutions (can be bound as route)
- Low to no boilerplate
- As little overhead as possible
- No limitations for message size and possibility of streaming
## Installation
`go get go.digitalcircle.com.br/open/simplemq`
(Did you seriously expect something different?)
## Simple Server Impl
```go
package main
import (
"net/http"
"simplemq/lib/server"
)
func main() {
http.HandleFunc("/ws", server.Serve) //Bind our server to /ws
err := http.ListenAndServe(":8080", nil)
if err != nil {
panic(err)
}
}
```
## Simple RPC Example (Emitter and Responder in the same code)
```go
package main
import (
"fmt"
"log"
"simplemq/lib/client"
"simplemq/lib/types"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
c.Sub("a", func(m *types.Msg) {
ret := fmt.Sprintf("==> Got %s at %s", string(m.Payload), time.Now().String())
log.Printf(ret)
c.RpcReply(m, []byte(ret))//Important - otherwise RPC call will wait until Timeout (15secs - hardcoded at this stage)
})
for {
bs, err := c.Rpc("a", []byte("A Request"))
if err != nil {
log.Printf(err.Error())
} else {
log.Printf("Recv: %s", string(bs))
}
time.Sleep(time.Second)
}
}
```
## Simple Pub Sub
### Sender
```go
package main
import (
"log"
"simplemq/lib/client"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
for {
err = c.Pub("a", []byte("A:"+time.Now().String()))
if err != nil {
log.Printf(err.Error())
}
time.Sleep(time.Second)
}
}
```
### Receiver
```go
package main
import (
"log"
"simplemq/lib/client"
"simplemq/lib/types"
"time"
)
func main() {
c, err := client.New("ws://localhost:8080/ws")
if err != nil {
panic(err.Error())
}
c.Sub("a", func(m *types.Msg) {
log.Printf(string(m.Payload))
})
for {
time.Sleep(time.Second)
}
}
```
## TODO
- 1 - Improve docs
- 2 - Improve parametrization
- 3 - Improve test coverage
- 4 - Plenty of space for performance improvements
- 5 - Add clustering capabilities
- 6 - You name it - send proposals to: paulo@digitalcircle.com.br

View File

@ -6,7 +6,7 @@ import (
)
func main() {
server.Serve()
http.HandleFunc("/ws", server.Serve)
err := http.ListenAndServe(":8080", nil)
if err != nil {
panic(err)

View File

@ -210,27 +210,21 @@ func (c *MQConn) Write(m *types.Msg) error {
return c.conn.WriteJSON(m)
}
func Serve() {
var upgrader = websocket.Upgrader{}
server := NewServer()
http.HandleFunc("/ws", func(writer http.ResponseWriter, request *http.Request) {
id := random.Str(32)
con, err := upgrader.Upgrade(writer, request, nil)
if err != nil {
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
}
mqconn := server.Put(id, con)
con.SetCloseHandler(func(code int, text string) error {
mqconn.Close()
return nil
})
mqconn.Loop()
})
http.HandleFunc("/stats", func(writer http.ResponseWriter, request *http.Request) {
//server.Range(func(m *MQConn) bool {
// m.
// return true
//})
var upgrader = websocket.Upgrader{}
var server *MQServer = NewServer()
func Serve(writer http.ResponseWriter, request *http.Request) {
id := random.Str(32)
con, err := upgrader.Upgrade(writer, request, nil)
if err != nil {
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
}
mqconn := server.Put(id, con)
con.SetCloseHandler(func(code int, text string) error {
mqconn.Close()
return nil
})
mqconn.Loop()
}