diff --git a/smqcli/lib.go b/smqcli/lib.go index 76db6e6..6ed0a23 100644 --- a/smqcli/lib.go +++ b/smqcli/lib.go @@ -20,11 +20,9 @@ func dbg(s string, p ...interface{}) { } type MQConn struct { - opts *MQConnOpts - retry bool - chRetry chan *struct{} - //chReady chan *struct{} - subs sync.Map + opts *MQConnOpts + retry bool + subs sync.Map mtx sync.Mutex conn *websocket.Conn @@ -189,64 +187,68 @@ func (c *MQConn) Loop() { } func (c *MQConn) Init() error { - c.retry = true - - doConn := func() error { - d := websocket.Dialer{ - NetDial: nil, - NetDialContext: nil, - Proxy: nil, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - HandshakeTimeout: 0, - ReadBufferSize: 0, - WriteBufferSize: 0, - WriteBufferPool: nil, - Subprotocols: nil, - EnableCompression: false, - Jar: nil, - } - h := http.Header{} - if c.opts.Headers != nil { - for k, v := range c.opts.Headers { - h.Set(k, v) + if c.retry { + doConn := func() error { + d := websocket.Dialer{ + NetDial: nil, + NetDialContext: nil, + Proxy: nil, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + HandshakeTimeout: 0, + ReadBufferSize: 0, + WriteBufferSize: 0, + WriteBufferPool: nil, + Subprotocols: nil, + EnableCompression: false, + Jar: nil, } + h := http.Header{} + if c.opts.Headers != nil { + for k, v := range c.opts.Headers { + h.Set(k, v) + } + } + con, _, err := d.Dial(c.opts.Url, h) + if err != nil { + return err + } + c.conn = con + con.SetCloseHandler(func(code int, text string) error { + c.conn = nil + return nil + }) + return nil } - con, _, err := d.Dial(c.opts.Url, h) + + err := doConn() + if err != nil { return err } - c.conn = con - con.SetCloseHandler(func(code int, text string) error { - c.conn = nil - return nil + + c.subs.Range(func(key, value interface{}) bool { + m := &types.Msg{ + Cmd: types.CMD_SUB, + Topic: key.(string), + } + + err := c.Write(m) + if err != nil { + log.Printf("Error re-listening: %s", err.Error()) + return false + } + return true }) - return nil } - err := doConn() - - if err != nil { - return err - } - - c.subs.Range(func(key, value interface{}) bool { - m := &types.Msg{ - Cmd: types.CMD_SUB, - Topic: key.(string), - } - - err := c.Write(m) - if err != nil { - log.Printf("Error re-listening: %s", err.Error()) - return false - } - return true - }) - return nil } +func (c *MQConn) Close() error { + c.retry = false + return c.conn.Close() +} type MQConnOpts struct { Url string @@ -256,9 +258,13 @@ type MQConnOpts struct { func New(o *MQConnOpts) (*MQConn, error) { ret := &MQConn{ - opts: o, + opts: o, + retry: true, } err := ret.Init() + if err != nil { + return nil, err + } go func() { for ret.retry { ret.Loop()