Simple and Flexible Message Queue
Paulo Simão 6bd5a3a5ac improvements at NewConn - now w opts, more flexible 2021-11-17 06:14:14 -03:00

Simple MQ

A simple, embeddable message broker.

The current implementation supports only WebSockets, but virtually any stream-based connection could be supported later.

Motivations

  • CGO-free solution
  • Embeddable, multi-platform
  • Few to no dependencies
  • Coexists with existing HTTP servers (can be bound as a route)
  • Little to no boilerplate
  • As little overhead as possible
  • No limit on message size, with the possibility of streaming

Installation

go get go.digitalcircle.com.br/open/simplemq

(Did you seriously expect something different?)

Simple Server Implementation

package main

import (
	"net/http"
	"simplemq/lib/server"
)

func main() {
	http.HandleFunc("/ws", server.Serve) // Bind our server to /ws
	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		panic(err)
	}
}

Simple RPC Example (Emitter and Responder in the same code)

package main

import (
	"fmt"
	"log"
	"simplemq/lib/client"
	"simplemq/lib/types"
	"time"
)

func main() {
	c, err := client.New("ws://localhost:8080/ws")
	if err != nil {
		panic(err.Error())
	}

	c.Sub("a", func(m *types.Msg) {
		ret := fmt.Sprintf("==> Got %s at %s", string(m.Payload), time.Now().String())
		log.Print(ret) // Print, not Printf: ret is data, not a format string
		// Important: always reply, otherwise the RPC call waits until the
		// timeout (15 seconds, hardcoded at this stage).
		c.RpcReply(m, []byte(ret))
	})
	
	for {
		bs, err := c.Rpc("a", []byte("A Request"))
		if err != nil {
			log.Println(err)
		} else {
			log.Printf("Recv: %s", string(bs))
		}
		time.Sleep(time.Second)
	}
}

Simple Pub Sub

Sender

package main

import (
	"log"
	"simplemq/lib/client"
	"time"
)

func main() {
	c, err := client.New("ws://localhost:8080/ws")
	if err != nil {
		panic(err.Error())
	}
	for {
		err = c.Pub("a", []byte("A:"+time.Now().String()))
		if err != nil {
			log.Println(err)
		}
		time.Sleep(time.Second)
	}
}

Receiver

package main

import (
	"log"
	"simplemq/lib/client"
	"simplemq/lib/types"
	"time"
)

func main() {
	c, err := client.New("ws://localhost:8080/ws")
	if err != nil {
		panic(err.Error())
	}
	c.Sub("a", func(m *types.Msg) {
		log.Printf("%s", m.Payload) // payload is data, not a format string
	})
	for {
		time.Sleep(time.Second)
	}
}
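
The sender and receiver above only meet through the topic name `"a"`. As a rough mental model, the sketch below shows topic-based delivery with an in-memory stand-in for the broker. It assumes fan-out semantics (every subscriber of a topic receives each message), which this README does not state for simplemq. The `bus` type and its methods are hypothetical and unrelated to the library's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// bus is a hypothetical in-memory stand-in for a broker.
type bus struct {
	mu   sync.RWMutex
	subs map[string][]func(payload []byte)
}

func newBus() *bus {
	return &bus{subs: make(map[string][]func(payload []byte))}
}

// Sub registers handler to receive every message published on topic.
func (b *bus) Sub(topic string, handler func(payload []byte)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[topic] = append(b.subs[topic], handler)
}

// Pub delivers payload to each handler subscribed to topic and reports
// how many handlers were invoked.
func (b *bus) Pub(topic string, payload []byte) int {
	b.mu.RLock()
	// Copy the slice so handlers run without the lock held.
	handlers := append([]func(payload []byte){}, b.subs[topic]...)
	b.mu.RUnlock()

	for _, h := range handlers {
		h(payload)
	}
	return len(handlers)
}

func main() {
	b := newBus()
	b.Sub("a", func(p []byte) { fmt.Printf("first subscriber got %s\n", p) })
	b.Sub("a", func(p []byte) { fmt.Printf("second subscriber got %s\n", p) })

	fmt.Println("delivered to", b.Pub("a", []byte("hello")), "subscribers")
	fmt.Println("delivered to", b.Pub("b", []byte("ignored")), "subscribers")
}
```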

TODO

  • Improve docs
  • Improve parametrization
  • Improve test coverage
  • Plenty of room for performance improvements
  • Add clustering capabilities
  • You name it: send proposals to paulo@digitalcircle.com.br