replaycli-go/svc/lib.go

155 lines
2.9 KiB
Go
Raw Permalink Normal View History

2021-11-11 14:07:20 +00:00
package svc
import (
"encoding/json"
"errors"
"fmt"
"github.com/nats-io/nats.go"
"log"
"os"
"reflect"
"strings"
"time"
"unicode"
)
// Package-level state shared by the functions in this file.
var (
	// nc is the NATS connection. It is nil until Init connects successfully.
	nc *nats.Conn

	// compname is the component name passed to Init. Register uses its
	// normalized form as the prefix of every subject it subscribes to.
	compname string
)
// Init records s as the component name and, when no connection exists yet,
// connects to the NATS server at nats.DefaultURL using s as the client name.
//
// If the REPLAY_APIKEY environment variable is non-empty, its value is sent
// as the authentication token. The token is a credential and is deliberately
// kept out of the log output.
//
// Calling Init while a connection already exists only updates the component
// name and returns nil. A failed connect leaves the package unconnected and
// returns the error from nats.Connect unchanged.
func Init(s string) error {
	compname = s
	if nc != nil {
		return nil
	}

	opts := []nats.Option{nats.Name(compname)}
	if apikey := os.Getenv("REPLAY_APIKEY"); apikey == "" {
		log.Print("Connecting to NATS Server w/o TK.")
	} else {
		// Only report that a token is in use; never print its value.
		log.Print("Connecting to NATS Server w TK.")
		opts = append(opts, nats.Token(apikey))
	}

	conn, err := nats.Connect(nats.DefaultURL, opts...)
	if err != nil {
		return err
	}
	nc = conn
	return nil
}
// Close drains the NATS connection and signals Serve to return.
//
// It returns the error reported by Drain, or nil when no connection has been
// established. Close never blocks on the hand-off to Serve: the signal is
// buffered, so it is safe to call Close before Serve has started, after Serve
// has already returned, or more than once.
func Close() error {
	var err error
	if nc != nil { // nothing to drain before Init has connected
		err = nc.Drain()
	}

	select {
	case chEnd <- struct{}{}:
	default:
		// A stop signal is already pending; one is enough.
	}
	return err
}

// chEnd carries the stop signal from Close to Serve.
//
// It is created once at package initialisation, so Close never sends on a nil
// channel, and it has room for one pending signal so that a Close issued
// before Serve is running is not lost.
var chEnd = make(chan struct{}, 1)

// Serve blocks the calling goroutine until Close is called. If Close was
// already called and its signal has not been consumed, Serve returns
// immediately.
func Serve() {
	<-chEnd
}
// NormalizeName converts a mixed-case identifier into a dot-separated,
// lower-case name: every upper-case rune except one at the very start of s is
// preceded by a ".", and all runes are lower-cased.
//
// For example, "HelloWorld" becomes "hello.world" and "ABC" becomes "a.b.c".
func NormalizeName(s string) string {
	var out strings.Builder
	for pos, r := range s {
		if pos > 0 && unicode.IsUpper(r) {
			out.WriteByte('.')
		}
		out.WriteRune(unicode.ToLower(r))
	}
	return out.String()
}
// Register exposes the methods of h as NATS request handlers.
//
// For every method in h's method set that takes exactly one argument (besides
// the receiver) and returns at least two values, Register:
//
//   - builds the subject NormalizeName(compname) + "." + NormalizeName(method),
//   - publishes that subject name on "sub",
//   - subscribes to the subject with a handler that JSON-decodes the request
//     payload into the method's argument, calls the method, and replies with
//     the JSON encoding of the first result. If the second result is non-nil,
//     its "%#v" rendering is placed in the "ERR" reply header.
//
// Methods with any other shape are skipped with a log line instead of
// panicking. A nil h is logged and ignored. Failures are logged; a failed
// Subscribe is additionally published on the "err" subject.
func Register(h interface{}) {
	tp := reflect.TypeOf(h)
	if tp == nil {
		log.Print("svc: Register called with a nil handler")
		return
	}
	recv := reflect.ValueOf(h)
	prefix := NormalizeName(compname) + "."

	for i := 0; i < tp.NumMethod(); i++ {
		mtd := tp.Method(i)
		mtdt := mtd.Type

		// In(0) is the receiver, so a usable method has NumIn() == 2. The
		// handler reads results 0 and 1, so at least two are required.
		if mtdt.NumIn() != 2 || mtdt.NumOut() < 2 {
			log.Printf("svc: skipping method %s: want one argument and two results", mtd.Name)
			continue
		}

		// argType is the type to allocate for decoding; byPtr remembers
		// whether the method wants the pointer or the value it points to.
		argType := mtdt.In(1)
		byPtr := argType.Kind() == reflect.Ptr
		if byPtr {
			argType = argType.Elem()
		}

		subject := prefix + NormalizeName(mtd.Name)
		if err := nc.Publish("sub", []byte(subject)); err != nil {
			log.Print(err)
		}

		_, err := nc.Subscribe(subject, func(msg *nats.Msg) {
			// Decode straight into the freshly allocated *argType. Going
			// through a pointer to an interface instead would let a JSON
			// null payload replace the pointer with nil and make the
			// reflective call below panic.
			arg := reflect.New(argType)
			if err := json.Unmarshal(msg.Data, arg.Interface()); err != nil {
				// A malformed payload is logged but does not abort the call;
				// the method sees a zero or partially filled argument.
				log.Print(err)
			}
			if !byPtr {
				arg = arg.Elem()
			}

			results := mtd.Func.Call([]reflect.Value{recv, arg})

			reply := &nats.Msg{}
			if callErr := results[1].Interface(); callErr != nil {
				// NOTE(review): %#v yields the Go-syntax form of the value
				// rather than its message; kept as-is because remote callers
				// may depend on the current header content.
				reply.Header = nats.Header{}
				reply.Header.Set("ERR", fmt.Sprintf("%#v", callErr))
			}

			data, err := json.Marshal(results[0].Interface())
			if err != nil {
				log.Print(err)
			}
			reply.Data = data

			if err := msg.RespondMsg(reply); err != nil {
				log.Print(err)
			}
		})
		if err != nil {
			log.Print(err)
			if pubErr := nc.Publish("err", []byte(err.Error())); pubErr != nil {
				log.Print(pubErr)
			}
		}
	}
}
// Nc returns the package's NATS connection. The result is nil if Init has not
// yet connected successfully.
func Nc() *nats.Conn {
	return nc
}
// Call sends a request on subject s and decodes the reply.
//
// in is JSON-encoded as the request payload. The call waits up to one minute
// for a reply and JSON-decodes the reply body into out, which must be a
// non-nil pointer.
//
// Errors are returned in this order of precedence:
//   - the error from encoding in, or from the request itself, unchanged;
//   - a non-empty "ERR" reply header, wrapped with errors.New;
//   - the error from decoding the reply body.
//
// The reply body is still decoded into out when the "ERR" header is set, but
// a decode failure no longer hides the error reported by the remote side.
func Call(s string, in interface{}, out interface{}) error {
	payload, err := json.Marshal(in)
	if err != nil {
		return err
	}

	msg, err := nc.Request(s, payload, time.Minute)
	if err != nil {
		return err
	}

	decodeErr := json.Unmarshal(msg.Data, out)

	// The remote error is more informative than a local decode failure,
	// which is often just a consequence of it (for example an empty body).
	if remote := msg.Header.Get("ERR"); remote != "" {
		return errors.New(remote)
	}
	return decodeErr
}