From 581b4def8a9fa57d06552fb016805651fa95de04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paulo=20Sima=CC=83o?= Date: Thu, 11 Nov 2021 11:07:20 -0300 Subject: [PATCH] nats migration --- api/cli.go | 50 ---------------- api/nats.go | 110 +++++++++++++++++++++++++++++++++++ api/ocr/cli.go | 4 +- go.mod | 1 + go.sum | 2 + svc/lib.go | 154 +++++++++++++++++++++++++++++++++++++++++++++++++ ver/lib.go | 26 +++++++++ 7 files changed, 295 insertions(+), 52 deletions(-) create mode 100644 api/nats.go create mode 100644 svc/lib.go create mode 100644 ver/lib.go diff --git a/api/cli.go b/api/cli.go index adcfed3..a39c90b 100644 --- a/api/cli.go +++ b/api/cli.go @@ -1,15 +1,10 @@ package api import ( - "encoding/json" - "errors" - "github.com/nats-io/nats.go" "go.digitalcircle.com.br/open/httpcli" "go.digitalcircle.com.br/open/replaycli-go/ipcmux" "go.digitalcircle.com.br/open/replaycli-go/util" - "log" "os" - "time" ) type ApiCli struct { @@ -35,48 +30,3 @@ func NewApiIPCCli() *ApiCli { ret.cli.AddHeader("X-API-KEY", apikey) return ret } - -var nc *nats.Conn - -func Call(s string, in interface{}, out interface{}) error { - if nc == nil { - var err error - apikey = os.Getenv("REPLAY_APIKEY") - if apikey == "" { - log.Printf("Connecting to NATS Server w/o TK.") - nc, err = nats.Connect(nats.DefaultURL) - } else { - log.Printf("Connecting to NATS Server w TK: " + apikey) - nc, err = nats.Connect(nats.DefaultURL, nats.Token(apikey)) - } - if err != nil { - return err - } - } - - bs, err := json.Marshal(in) - if err != nil { - return err - } - - msg, err := nc.Request(s, bs, time.Minute) - if err != nil { - return err - } - - err = json.Unmarshal(msg.Data, out) - if err != nil { - out = bs - return err - } - - errstr := msg.Header.Get("ERR") - if errstr != "" { - return errors.New(errstr) - } - - return nil -} -func Close() error { - return nc.Drain() -} diff --git a/api/nats.go b/api/nats.go new file mode 100644 index 0000000..2a44301 --- /dev/null +++ b/api/nats.go @@ -0,0 +1,110 @@ +package api + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/nats-io/nats.go" + "log" + "os" + "time" +) + +var nc *nats.Conn + +type CallOpts struct { + To int +} + +func Call(s string, in interface{}, out interface{}, opts ...*CallOpts) error { + var err error + opt := &CallOpts{} + if opts != nil && len(opts) > 1 { + opt = opts[0] + } + if opt.To == 0 { + opt.To = 60 + } + defer func() { + if nc != nil && err != nil { + err1 := nc.Publish("err", []byte(fmt.Sprintf("Error calling nats: %s: \nIN: %v\nOUT: %v", s, err.Error(), in, out))) + if err1 != nil { + log.Printf("Error calling nats: %s: \nIN: %v\nOUT: %v", s, err.Error(), in, out) + log.Printf("Inner error: %s", err1.Error()) + } + } else if nc == nil && err != nil { + log.Printf("Error calling nats: %s: \nIN: %v\nOUT: %v\n(nc is nil)", s, err.Error(), in, out) + } + + }() + if nc == nil { + + apikey = os.Getenv("REPLAY_APIKEY") + if apikey == "" { + log.Printf("Connecting to NATS Server w/o TK.") + nc, err = nats.Connect(nats.DefaultURL) + } else { + log.Printf("Connecting to NATS Server w TK: " + apikey) + nc, err = nats.Connect(nats.DefaultURL, nats.Token(apikey)) + } + if err != nil { + return err + } + } + var bs []byte + if in != nil { + bs, err = json.Marshal(in) + if err != nil { + return err + } + } else { + bs = []byte("{}") + } + + msg, err := nc.Request(s, bs, time.Second+time.Duration(opt.To)) + if err != nil { + return err + } + if out != nil { + err = json.Unmarshal(msg.Data, out) + if err != nil { + out = bs + return err + } + } + + errstr := msg.Header.Get("ERR") + if errstr != "" { + return errors.New(errstr) + } + + return nil +} +func Close() error { + return nc.Drain() +} +func Pub(s string, in interface{}) error { + var err error + defer func() { + if nc != nil && err != nil { + err1 := nc.Publish("err", []byte(fmt.Sprintf("Error calling nats: %s: \nIN: %v", s, err.Error(), in))) + if err1 != nil { + log.Printf("Error calling nats: %s: \nIN: %v", s, err.Error(), in) + log.Printf("Inner error: %s", err1.Error()) + } + } else if nc == nil && err != nil { + log.Printf("Error calling nats: %s: \nIN: %v\n(nc is nil)", s, err.Error(), in) + } + + }() + + bs, err := json.Marshal(in) + if err != nil { + return err + } + + err = nc.Publish(s, bs) + + return err + +} diff --git a/api/ocr/cli.go b/api/ocr/cli.go index c02652c..f87d4a6 100644 --- a/api/ocr/cli.go +++ b/api/ocr/cli.go @@ -172,10 +172,10 @@ type Cli struct { *api.ApiCli } -func (c *Cli) OCR(opts *Opts) (*Alto, error) { +func (c *Cli) OCR(opts *Opts, copts ...*api.CallOpts) (*Alto, error) { res := &Alto{} //err := c.HttpCli().JsonPost("/ipc/ocr/", opts, res) - err := api.Call("ocr.ocr", opts, &res) + err := api.Call("ocr.ocr", opts, &res, copts...) return res, err } diff --git a/go.mod b/go.mod index 8d494f9..27fc58d 100644 --- a/go.mod +++ b/go.mod @@ -20,5 +20,6 @@ require ( github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect + golang.org/x/mod v0.5.1 // indirect google.golang.org/protobuf v1.27.1 // indirect ) diff --git a/go.sum b/go.sum index 594b5df..7a32689 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,8 @@ go.digitalcircle.com.br/open/httpcli v0.0.0-20211031093505-ecf33aed8afb/go.mod h golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.5.1 h1:OJxoQ/rynoF0dcCdI7cLPktw/hR2cueqYfjm43oqK38= +golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/svc/lib.go b/svc/lib.go new file mode 100644 index 0000000..7649463 --- /dev/null +++ b/svc/lib.go @@ -0,0 +1,154 @@ +package svc + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/nats-io/nats.go" + "log" + "os" + "reflect" + "strings" + "time" + "unicode" +) + +var nc *nats.Conn + +var compname string + +func Init(s string) error { + var err error + compname = s + var apikey string + if nc == nil { + apikey = os.Getenv("REPLAY_APIKEY") + if apikey == "" { + log.Printf("Connecting to NATS Server w/o TK.") + nc, err = nats.Connect(nats.DefaultURL, nats.Name(compname)) + } else { + log.Printf("Connecting to NATS Server w TK: " + apikey) + nc, err = nats.Connect(nats.DefaultURL, nats.Name(compname), nats.Token(apikey)) + } + if err != nil { + return err + } + } + return err +} + +func Close() error { + err := nc.Drain() + chEnd <- struct{}{} + return err +} + +var chEnd chan struct{} + +func Serve() { + chEnd = make(chan struct{}) + <-chEnd +} + +func NormalizeName(s string) string { + sb := strings.Builder{} + for i, v := range s { + if unicode.IsUpper(v) && i != 0 { + sb.WriteString(".") + } + sb.WriteRune(unicode.ToLower(v)) + + } + return sb.String() +} + +func Register(h interface{}) { + var err error + tp := reflect.TypeOf(h) + for i := 0; i < tp.NumMethod(); i++ { + mtd := tp.Method(i) + mtdt := mtd.Type + intp := mtdt.In(1) + isPtr := false + if intp.Kind() == reflect.Ptr { + isPtr = true + intp = intp.Elem() + } + mn := NormalizeName(compname) + "." + NormalizeName(mtd.Name) + err = nc.Publish("sub", []byte(mn)) + if err != nil { + log.Printf(err.Error()) + } + _, err = nc.Subscribe(mn, func(msg *nats.Msg) { + ni := reflect.New(intp).Interface() + var err error + if intp.Kind() == reflect.Ptr { + err = json.Unmarshal(msg.Data, ni) + } else { + err = json.Unmarshal(msg.Data, &ni) + } + + if err != nil { + log.Printf(err.Error()) + } + var rawret []reflect.Value + if isPtr { + rawret = mtd.Func.Call([]reflect.Value{reflect.ValueOf(h), reflect.ValueOf(ni)}) + } else { + rawret = mtd.Func.Call([]reflect.Value{reflect.ValueOf(h), reflect.ValueOf(ni).Elem()}) + } + ret := rawret[0].Interface() + err1 := rawret[1].Interface() + //if err1 != nil { + // log.Printf(err1.(error).Error()) + //} + var bs []byte + rmsg := &nats.Msg{} + rmsg.Data = bs + bs, err = json.Marshal(ret) + if err1 != nil { + rmsg.Header = nats.Header{} + rmsg.Header.Set("ERR", fmt.Sprintf("%#v", err1)) + } + if err != nil { + log.Printf(err.Error()) + } + rmsg.Data = bs + err = msg.RespondMsg(rmsg) + if err != nil { + log.Printf(err.Error()) + } + }) + if err != nil { + log.Printf(err.Error()) + } + if err != nil { + nc.Publish("err", []byte(err.Error())) + } + } +} + +func Nc() *nats.Conn { + return nc +} + +func Call(s string, in interface{}, out interface{}) error { + bs, err := json.Marshal(in) + if err != nil { + return err + } + msg, err := nc.Request(s, bs, time.Minute) + if err != nil { + return err + } + + err = json.Unmarshal(msg.Data, out) + if err != nil { + return err + } + errstr := msg.Header.Get("ERR") + if errstr != "" { + return errors.New(errstr) + } + return nil +} diff --git a/ver/lib.go b/ver/lib.go new file mode 100644 index 0000000..d68059b --- /dev/null +++ b/ver/lib.go @@ -0,0 +1,26 @@ +package ver + +import ( + "golang.org/x/mod/semver" + "log" + "os" +) + +func EnsureVer(v string) { + build := os.Getenv("BUILD_VER") + if build == "" { + log.Printf("No BUILD_VER found - skipping verification") + return + } + if semver.Compare(build, v) < 0 { + log.Printf("Replay version is %s, lower than required. Should be minimum %s - aborting", build, v) + os.Exit(0) + } + if semver.Major(build) != semver.Major(v) { + log.Printf("Replay major version is %s, different from feature requirement: %s - aborting", build, v) + } +} +func Get() string { + build := os.Getenv("BUILD_VER") + return build +}