Skip to content

Commit

Permalink
login fix and little refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgeniy Ivakha committed Feb 26, 2017
1 parent 1f897da commit d408ec6
Showing 1 changed file with 76 additions and 42 deletions.
118 changes: 76 additions & 42 deletions ami.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -52,33 +53,50 @@ func newAMIAdapter(ip, port, username, password string, eventEmitter func(string

a.readErrChan = make(chan error)
a.writeErrChan = make(chan error)
chanStopActionWriter := make(chan struct{})
chanStop := make(chan struct{})

for {
time.Sleep(time.Second * 1)

err = a.openConnection()
if err != nil {
go a.emitEvent("error", "AMI Reconnect failed")
} else {
go a.streamReader()
go a.actionWriter(chanStopActionWriter)
if err == nil {
go a.streamReader(chanStop)
go a.actionWriter(chanStop)

greetings := make([]byte, 100)
n, err := a.conn.Read(greetings)
if err != nil {
go a.emitEvent("error", fmt.Sprintf("Asterisk connection error: %s", err.Error()))
continue
}

err = a.login()
if err != nil {
go a.emitEvent("error", fmt.Sprintf("Asterisk login error: %s", err.Error()))
continue
}

if n > 2 {
greetings = greetings[:n-2]
}
go a.emitEvent("connect", string(greetings))

break
}

a.emitEvent("error", "AMI Reconnect failed")
}

a.mutex.Lock()
a.connected = true
a.mutex.Unlock()

select {
case err = <-a.readErrChan:
case err = <-a.writeErrChan:
}

chanStopActionWriter <- struct{}{}
close(chanStop)
a.mutex.Lock()
a.connected = false
a.mutex.Unlock()
Expand All @@ -93,6 +111,12 @@ func newAMIAdapter(ip, port, username, password string, eventEmitter func(string

func (a *amiAdapter) actionWriter(stop chan struct{}) {
for {
select {
case <-stop:
return
default:
}

select {
case <-stop:
return
Expand All @@ -108,10 +132,7 @@ func (a *amiAdapter) actionWriter(stop chan struct{}) {
}

func (a *amiAdapter) distribute(event map[string]string) {
if _, ok := event["Event"]; ok {
a.EventsChan <- event
return
}
a.EventsChan <- event

if actionID := event["ActionID"]; actionID != "" {
a.mutex.RLock()
Expand Down Expand Up @@ -209,21 +230,6 @@ func (a *amiAdapter) openConnection() error {
return nil
}

func serialize(data map[string]string) []byte {
var outBuf bytes.Buffer

for key := range data {
value := data[key]

outBuf.WriteString(key)
outBuf.WriteString(": ")
outBuf.WriteString(value)
outBuf.WriteString("\n")
}
outBuf.WriteString("\n")
return outBuf.Bytes()
}

func readMessage(r *bufio.Reader) (m map[string]string, err error) {
m = make(map[string]string)
var responseFollows bool
Expand Down Expand Up @@ -280,26 +286,54 @@ func readMessage(r *bufio.Reader) (m map[string]string, err error) {
}
}

func (a *amiAdapter) streamReader() {
greetings := make([]byte, 100)
n, err := a.conn.Read(greetings)
if err != nil {
a.readErrChan <- err
return
}
if n > 2 {
greetings = greetings[:n-2]
func serialize(data map[string]string) []byte {
var outBuf bytes.Buffer

for key := range data {
outBuf.WriteString(key)
outBuf.WriteString(": ")
outBuf.WriteString(data[key])
outBuf.WriteString("\n")
}
go a.emitEvent("connect", fmt.Sprintf("Connected to %s %s:%s", greetings, a.ip, a.port))
outBuf.WriteString("\n")

return outBuf.Bytes()
}

func (a *amiAdapter) streamReader(stop chan struct{}) {
chanErr := make(chan error)
chanEvents := make(chan map[string]string)

go func() {
bufReader := bufio.NewReader(a.conn)
for i := 0; ; i++ {
var event map[string]string
var err error
event, err = readMessage(bufReader)
if err != nil {
chanErr <- err
return
}

event["#"] = strconv.Itoa(i)
chanEvents <- event
}
}()

bufReader := bufio.NewReader(a.conn)
var event map[string]string
for {
event, err = readMessage(bufReader)
if err != nil {
a.readErrChan <- err
select {
case <-stop:
return
default:
}

select {
case <-stop:
return
case err := <-chanErr:
a.readErrChan <- err
case event := <-chanEvents:
a.distribute(event)
}
a.distribute(event)
}
}

0 comments on commit d408ec6

Please sign in to comment.