Skip to content
This repository has been archived by the owner on Aug 21, 2021. It is now read-only.

Commit

Permalink
Merge pull request #155 from NoteGio/feature/ws
Browse files Browse the repository at this point in the history
Feature/ws
  • Loading branch information
AusIV authored Apr 9, 2019
2 parents ec9e729 + 2814e3d commit a5a42e5
Show file tree
Hide file tree
Showing 98 changed files with 9,783 additions and 90 deletions.
12 changes: 12 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,16 @@ jobs:
root: bin
paths:
- metadataindexer
golang_build_websockets:
docker:
- image: circleci/golang:1.8
steps:
- checkout
- run: make bin/websockets
- persist_to_workspace:
root: bin
paths:
- websockets
golang_test:
docker:
# CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/
Expand Down Expand Up @@ -387,6 +397,7 @@ workflows:
- golang_build_terms
- golang_build_poolfilter
- golang_build_metadataindexer
- golang_build_websockets
- golang_test
- notify_test_finished:
requires:
Expand Down Expand Up @@ -419,6 +430,7 @@ workflows:
- golang_build_terms
- golang_build_poolfilter
- golang_build_metadataindexer
- golang_build_websockets
- docker_build_deploy:
requires:
- notify_test_finished
Expand Down
9 changes: 9 additions & 0 deletions Dockerfile.websockets
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM corebuild

FROM scratch

COPY --from=corebuild /go/src/github.com/notegio/openrelay/bin/websockets /websockets

COPY --from=corebuild /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt

CMD ["/websockets", "redis:6379", "topic://released", "postgres://postgres@postgres", "/run/secrets/postgress_password"]
11 changes: 9 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ bin/poolfilter: $(BASE) cmd/poolfilter/main.go
bin/metadataindexer: $(BASE) cmd/metadataindexer/main.go
cd "$(BASE)" && $(GOSTATIC) -o bin/metadataindexer cmd/metadataindexer/main.go

bin: bin/delayrelay bin/fundcheckrelay bin/getbalance bin/ingest bin/initialize bin/simplerelay bin/validateorder bin/fillupdate bin/indexer bin/fillindexer bin/automigrate bin/searchapi bin/exchangesplitter bin/blockmonitor bin/allowancemonitor bin/spendmonitor bin/fillmonitor bin/multisigmonitor bin/spendrecorder bin/queuemonitor bin/canceluptomonitor bin/canceluptofilter bin/canceluptoindexer bin/erc721approvalmonitor bin/affiliatemonitor bin/terms bin/poolfilter bin/metadataindexer
bin/websockets: $(BASE) cmd/websockets/main.go
cd "$(BASE)" && $(GOSTATIC) -o bin/websockets cmd/websockets/main.go

bin: bin/delayrelay bin/fundcheckrelay bin/getbalance bin/ingest bin/initialize bin/simplerelay bin/validateorder bin/fillupdate bin/indexer bin/fillindexer bin/automigrate bin/searchapi bin/exchangesplitter bin/blockmonitor bin/allowancemonitor bin/spendmonitor bin/fillmonitor bin/multisigmonitor bin/spendrecorder bin/queuemonitor bin/canceluptomonitor bin/canceluptofilter bin/canceluptoindexer bin/erc721approvalmonitor bin/affiliatemonitor bin/terms bin/poolfilter bin/metadataindexer bin/websockets

truffleCompile:
cd js ; node_modules/.bin/truffle compile
Expand All @@ -124,7 +127,7 @@ $(BASE)/tmp/postgres.containerid:

dockerstart: $(BASE) $(BASE)/tmp/redis.containerid $(BASE)/tmp/postgres.containerid

gotest: dockerstart test-funds test-channels test-accounts test-affiliates test-types test-ingest test-blocksmonitor test-allowancemonitor test-fillmonitor test-spendmonitor test-splitter test-search test-db test-metadata
gotest: dockerstart test-funds test-channels test-accounts test-affiliates test-types test-ingest test-blocksmonitor test-allowancemonitor test-fillmonitor test-spendmonitor test-splitter test-search test-db test-metadata test-pool test-ws test-subscriptions

test-funds: $(BASE)
cd "$(BASE)/funds" && go test
Expand Down Expand Up @@ -162,6 +165,10 @@ test-metadata: $(BASE)
cd "$(BASE)/metadata" && POSTGRES_HOST=localhost POSTGRES_USER=postgres POSTGRES_PASSWORD=secret go test
test-pool: $(BASE)
cd "$(BASE)/pool" && POSTGRES_HOST=localhost POSTGRES_USER=postgres POSTGRES_PASSWORD=secret go test
test-ws: $(BASE)
cd "$(BASE)/channels/ws" && POSTGRES_HOST=localhost POSTGRES_USER=postgres POSTGRES_PASSWORD=secret go test
test-subscriptions: $(BASE)
cd "$(BASE)/subscriptions" && POSTGRES_HOST=localhost POSTGRES_USER=postgres POSTGRES_PASSWORD=secret go test

docker-cfg/ca-certificates.crt:
cp /etc/ssl/certs/ca-certificates.crt docker-cfg/ca-certificates.crt
Expand Down
126 changes: 126 additions & 0 deletions channels/ws/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package ws

import (
"context"
"fmt"
"github.com/notegio/openrelay/channels"
"github.com/notegio/openrelay/types"
"github.com/notegio/openrelay/pool"
"github.com/gorilla/websocket"
"github.com/jinzhu/gorm"
"net/http"
"log"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

type websocketDelivery struct {
payload string
}

func (delivery *websocketDelivery) Payload() string {
return delivery.payload
}

func (delivery *websocketDelivery) Ack() bool {
// websocketDeliveris have no ack, reject, or return, so these are no-ops
return true
}
func (delivery *websocketDelivery) Reject() bool {
return true
}
func (delivery *websocketDelivery) Return() bool {
return true
}

type WebsocketChannel struct {
open bool
conn *websocket.Conn
payloads chan []byte
consumers []channels.Consumer
Filter string
quit chan struct{}
cleanup func(channels.Publisher)
}

func (pub *WebsocketChannel) Publish(payload string) bool {
select {
case pub.payloads <- []byte(payload):
return true
default:
return false
}
}

func (consumerChannel *WebsocketChannel) AddConsumer(consumer channels.Consumer) bool {
consumerChannel.consumers = append(consumerChannel.consumers, consumer)
return true
}
func (consumerChannel *WebsocketChannel) StartConsuming() bool {
go func () {
defer consumerChannel.cleanup(consumerChannel)
for {
select {
case _ = <-consumerChannel.quit:
return
default:
}
_, p, err := consumerChannel.conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
for _, consumer := range consumerChannel.consumers {
consumer.Consume(&websocketDelivery{string(p)})
}
}
}()
return true
}

func (consumerChannel *WebsocketChannel) StopConsuming() bool {
consumerChannel.quit <- struct{}{}
return true
}
func (consumerChannel *WebsocketChannel) ReturnAllUnacked() int {
return 0
}
func (consumerChannel *WebsocketChannel) PurgeRejected() int {
return 0
}
func (consumerChannel *WebsocketChannel) Publisher() channels.Publisher {
return consumerChannel
}

func GetChannels(port uint, db *gorm.DB, cleanup func(channels.Publisher)) (<-chan *WebsocketChannel, func() (error)) {
outChan := make(chan *WebsocketChannel)
handler := pool.PoolDecorator(db, func (w http.ResponseWriter, r *http.Request, p types.Pool) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
wsChannel := &WebsocketChannel{true, conn, make(chan []byte), []channels.Consumer{}, p.QueryString(), make(chan struct{}), cleanup}
outChan <- wsChannel
for payload := range wsChannel.payloads {
if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
log.Println(err)
return
}
}
})

mux := http.NewServeMux()
mux.HandleFunc("/", handler)
srv := &http.Server{
Addr: fmt.Sprintf(":%v", port),
Handler: mux,
}
go func() {
log.Printf("%v", srv.ListenAndServe())
}()
return outChan, func() (error) { return srv.Shutdown(context.Background()) }
}
64 changes: 64 additions & 0 deletions channels/ws/ws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ws_test

import (
"context"
"github.com/notegio/openrelay/channels"
"github.com/notegio/openrelay/channels/ws"
"github.com/gorilla/websocket"
"testing"
"time"
// "log"
)

type TestConsumer struct {
channel *ws.WebsocketChannel
}

func (consumer *TestConsumer) Consume(delivery channels.Delivery) {
if delivery.Payload() == "quit" {
consumer.channel.StopConsuming()
delivery.Ack()
}
consumer.channel.Publish(delivery.Payload())
delivery.Ack()
}

func TestGetChannels(t *testing.T) {
clean := false
channels, quit := ws.GetChannels(4321, nil, func(channels.Publisher) { clean = true })
go func() {
for channel := range channels {
channel.AddConsumer(&TestConsumer{channel})
channel.StartConsuming()
}
}()
ctx, cancel := context.WithCancel(context.Background())
c, resp, err := websocket.DefaultDialer.DialContext(ctx, "ws://localhost:4321/v2/", nil)
defer cancel()
if err != nil {
content := []byte{}
resp.Body.Read(content[:])
t.Fatalf("%v - (%v) %v", err.Error(), resp.StatusCode, content)
}
if err := c.WriteMessage(websocket.BinaryMessage, []byte("ping")); err != nil {
t.Errorf(err.Error())
}
mtype, p, err := c.ReadMessage()
if err != nil {
t.Errorf(err.Error())
}
if mtype != websocket.BinaryMessage {
t.Errorf("Unexpected message type %v", mtype)
}
if string(p) != "ping" {
t.Errorf("Unexpected message: %v", string(p))
}
c.Close()
if err := quit(); err != nil {
t.Errorf(err.Error())
}
time.Sleep(50 * time.Millisecond)
if clean != true {
t.Errorf("Should have cleaned up")
}
}
7 changes: 6 additions & 1 deletion cmd/canceluptoindexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func main() {
redisURL := os.Args[1]
srcChannel := os.Args[2]
db, err := dbModule.GetDB(os.Args[3], os.Args[4])
destChannel := os.Args[5]
if err != nil {
log.Fatalf("Could not open database connection: %v", err.Error())
}
Expand All @@ -25,11 +26,15 @@ func main() {
if err != nil {
log.Fatalf("Error establishing consumer channel: %v", err.Error())
}
publisherChannel, err := channels.PublisherFromURI(destChannel, redisClient)
if err != nil {
log.Fatalf("Error establishing publisher channel: %v", err.Error())
}
concurrency, err := strconv.Atoi(os.Getenv("CONCURRENCY"))
if err != nil {
concurrency = 5
}
consumerChannel.AddConsumer(dbModule.NewRecordCancellationConsumer(db, concurrency))
consumerChannel.AddConsumer(dbModule.NewRecordCancellationConsumer(db, concurrency, publisherChannel))
consumerChannel.StartConsuming()
log.Printf("Starting db fill indexer consumer on '%v'", srcChannel)
c := make(chan os.Signal, 1)
Expand Down
7 changes: 6 additions & 1 deletion cmd/fillindexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func main() {
if err != nil {
log.Fatalf("Could not open database connection: %v", err.Error())
}
destChannel := os.Args[5]
redisClient := redis.NewClient(&redis.Options{
Addr: redisURL,
})
Expand All @@ -29,7 +30,11 @@ func main() {
if err != nil {
concurrency = 5
}
consumerChannel.AddConsumer(dbModule.NewRecordFillConsumer(db, concurrency))
publisher, err := channels.PublisherFromURI(destChannel, redisClient)
if err != nil {
log.Fatalf("Error establishing publisher channel: %v", err.Error())
}
consumerChannel.AddConsumer(dbModule.NewRecordFillConsumer(db, concurrency, publisher))
consumerChannel.StartConsuming()
log.Printf("Starting db fill indexer consumer on '%v'", srcChannel)
c := make(chan os.Signal, 1)
Expand Down
9 changes: 7 additions & 2 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func main() {
log.Fatalf("Could not open database connection: %v", err.Error())
}
status := dbModule.StatusOpen
for _, arg := range os.Args[5:] {
destChannel := os.Args[5]
for _, arg := range os.Args[6:] {
if arg == "--unfunded" {
status = dbModule.StatusUnfunded
}
Expand All @@ -31,11 +32,15 @@ func main() {
if err != nil {
log.Fatalf("Error establishing consumer channel: %v", err.Error())
}
publisher, err := channels.PublisherFromURI(destChannel, redisClient)
if err != nil {
log.Fatalf("Error establishing publisher channel: %v", err.Error())
}
concurrency, err := strconv.Atoi(os.Getenv("CONCURRENCY"))
if err != nil {
concurrency = 5
}
consumerChannel.AddConsumer(dbModule.NewIndexConsumer(db, status, concurrency))
consumerChannel.AddConsumer(dbModule.NewIndexConsumer(db, status, concurrency, publisher))
consumerChannel.StartConsuming()
log.Printf("Starting db indexer consumer on '%v'", srcChannel)
c := make(chan os.Signal, 1)
Expand Down
7 changes: 6 additions & 1 deletion cmd/spendrecorder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func main() {
redisClient := redis.NewClient(&redis.Options{
Addr: redisURL,
})
destChannel := os.Args[5]
consumerChannel, err := channels.ConsumerFromURI(srcChannel, redisClient)
if err != nil {
log.Fatalf("Error establishing consumer channel: %v", err.Error())
Expand All @@ -29,7 +30,11 @@ func main() {
if err != nil {
concurrency = 5
}
consumerChannel.AddConsumer(dbModule.NewRecordSpendConsumer(db, concurrency))
publisher, err := channels.PublisherFromURI(destChannel, redisClient)
if err != nil {
log.Fatalf("Error establishing publisher channel: %v", err.Error())
}
consumerChannel.AddConsumer(dbModule.NewRecordSpendConsumer(db, concurrency, publisher))
consumerChannel.StartConsuming()
log.Printf("Starting spend recorder consumer on '%v'", srcChannel)
c := make(chan os.Signal, 1)
Expand Down
Loading

0 comments on commit a5a42e5

Please sign in to comment.