Skip to content

Commit

Permalink
Update write.go
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-webdev authored Jan 24, 2021
1 parent 13b9b5c commit e4ea368
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,24 @@ import (
"github.com/pkg/errors"
)

func (wac *Conn) addListener(ch chan string, messageTag string) {
wac.listener.Lock()
wac.listener.m[messageTag] = ch
wac.listener.Unlock()
}

//writeJson enqueues a json message into the writeChan
func (wac *Conn) writeJson(data []interface{}) (<-chan string, error) {

ch := make(chan string, 1)

wac.writerLock.Lock()
defer wac.writerLock.Unlock()

d, err := json.Marshal(data)
if err != nil {
return nil, err
close(ch)
return ch, err
}

ts := time.Now().Unix()
Expand All @@ -35,48 +44,65 @@ func (wac *Conn) writeJson(data []interface{}) (<-chan string, error) {
wac.timeTag = tss[len(tss)-3:]
}

ch, err := wac.write(websocket.TextMessage, messageTag, bytes)
err = wac.write(websocket.TextMessage, bytes)
if err != nil {
return nil, err
close(ch)
return ch, err
}

wac.addListener(ch, messageTag)

wac.msgCount++
return ch, nil
}

func (wac *Conn) writeBinary(node binary.Node, metric metric, flag flag, messageTag string) (<-chan string, error) {

ch := make(chan string, 1)

if len(messageTag) < 2 {
return nil, ErrMissingMessageTag
close(ch)
return ch, ErrMissingMessageTag
}

wac.writerLock.Lock()
defer wac.writerLock.Unlock()

data, err := wac.encryptBinaryMessage(node)
if err != nil {
return nil, errors.Wrap(err, "encryptBinaryMessage(node) failed")
close(ch)
return ch, errors.Wrap(err, "encryptBinaryMessage(node) failed")
}

bytes := []byte(messageTag + ",")
bytes = append(bytes, byte(metric), byte(flag))
bytes = append(bytes, data...)

ch, err := wac.write(websocket.BinaryMessage, messageTag, bytes)
err = wac.write(websocket.BinaryMessage, bytes)
if err != nil {
return nil, errors.Wrap(err, "failed to write message")
close(ch)
return ch, errors.Wrap(err, "failed to write message")
}

// if not error add chan in listener
wac.addListener(ch, messageTag)

wac.msgCount++

return ch, nil
}

func (wac *Conn) sendKeepAlive() error {

bytes := []byte("?,,")
respChan, err := wac.write(websocket.TextMessage, "!", bytes)
err := wac.write(websocket.TextMessage, bytes)
if err != nil {
return errors.Wrap(err, "error sending keepAlive")
}

respChan := make(chan string, 1)
wac.addListener(respChan, "!")

select {
case resp := <-respChan:
msecs, err := strconv.ParseInt(resp, 10, 64)
Expand Down Expand Up @@ -122,32 +148,21 @@ func (wac *Conn) sendAdminTest() (bool, error) {
}
}

func (wac *Conn) write(messageType int, answerMessageTag string, data []byte) (<-chan string, error) {

ch := make(chan string, 1)
func (wac *Conn) write(messageType int, data []byte) error {

if wac == nil || wac.ws == nil {
close(ch)
return ch, ErrInvalidWebsocket
return ErrInvalidWebsocket
}

wac.ws.Lock()
err := wac.ws.conn.WriteMessage(messageType, data)
wac.ws.Unlock()

if err != nil {
close(ch)
return ch, errors.Wrap(err, "error writing to websocket")
}

// if not error add chan in listener
if answerMessageTag != "" {
wac.listener.Lock()
wac.listener.m[answerMessageTag] = ch
wac.listener.Unlock()
return errors.Wrap(err, "error writing to websocket")
}

return ch, nil
return nil
}

func (wac *Conn) encryptBinaryMessage(node binary.Node) (data []byte, err error) {
Expand Down

0 comments on commit e4ea368

Please sign in to comment.