Skip to content

Commit

Permalink
pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
DarienRaymond committed Dec 11, 2015
1 parent a540d7d commit b6ed26a
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 2 deletions.
7 changes: 6 additions & 1 deletion app/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
type SpaceController struct {
packetDispatcher internal.PacketDispatcherWithContext
dnsCache internal.DnsCacheWithContext
pubsub internal.PubsubWithContext
}

func New() *SpaceController {
Expand All @@ -24,8 +25,12 @@ func (this *SpaceController) Bind(object interface{}) {
if dnsCache, ok := object.(internal.DnsCacheWithContext); ok {
this.dnsCache = dnsCache
}

if pubsub, ok := object.(internal.PubsubWithContext); ok {
this.pubsub = pubsub
}
}

func (this *SpaceController) ForContext(tag string) app.Space {
return internal.NewSpace(tag, this.packetDispatcher, this.dnsCache)
return internal.NewSpace(tag, this.packetDispatcher, this.dnsCache, this.pubsub)
}
23 changes: 23 additions & 0 deletions app/internal/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package internal

import (
"github.com/v2ray/v2ray-core/app"
)

type PubsubWithContext interface {
Publish(context app.Context, topic string, message app.PubsubMessage)
Subscribe(context app.Context, topic string, handler app.TopicHandler)
}

type contextedPubsub struct {
context app.Context
pubsub PubsubWithContext
}

func (this *contextedPubsub) Publish(topic string, message app.PubsubMessage) {
this.pubsub.Publish(this.context, topic, message)
}

func (this *contextedPubsub) Subscribe(topic string, handler app.TopicHandler) {
this.pubsub.Subscribe(this.context, topic, handler)
}
17 changes: 16 additions & 1 deletion app/internal/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
type Space struct {
packetDispatcher PacketDispatcherWithContext
dnsCache DnsCacheWithContext
pubsub PubsubWithContext
tag string
}

func NewSpace(tag string, packetDispatcher PacketDispatcherWithContext, dnsCache DnsCacheWithContext) *Space {
func NewSpace(tag string, packetDispatcher PacketDispatcherWithContext, dnsCache DnsCacheWithContext, pubsub PubsubWithContext) *Space {
return &Space{
tag: tag,
packetDispatcher: packetDispatcher,
dnsCache: dnsCache,
pubsub: pubsub,
}
}

Expand Down Expand Up @@ -43,3 +45,16 @@ func (this *Space) DnsCache() app.DnsCache {
},
}
}

func (this *Space) HasPubsub() bool {
return this.pubsub != nil
}

func (this *Space) Pubsub() app.Pubsub {
return &contextedPubsub{
pubsub: this.pubsub,
context: &contextImpl{
callerTag: this.tag,
},
}
}
9 changes: 9 additions & 0 deletions app/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package app

type PubsubMessage []byte
type TopicHandler func(PubsubMessage)

type Pubsub interface {
Publish(topic string, message PubsubMessage)
Subscribe(topic string, handler TopicHandler)
}
64 changes: 64 additions & 0 deletions app/pubsub/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package pubsub

import (
"sync"

"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/app/internal"
)

type TopicHandlerList struct {
sync.RWMutex
handlers []app.TopicHandler
}

func NewTopicHandlerList(handlers ...app.TopicHandler) *TopicHandlerList {
return &TopicHandlerList{
handlers: handlers,
}
}

func (this *TopicHandlerList) Add(handler app.TopicHandler) {
this.Lock()
this.handlers = append(this.handlers, handler)
this.Unlock()
}

func (this *TopicHandlerList) Dispatch(message app.PubsubMessage) {
this.RLock()
for _, handler := range this.handlers {
go handler(message)
}
this.RUnlock()
}

type Pubsub struct {
topics map[string]*TopicHandlerList
sync.RWMutex
}

func New() internal.PubsubWithContext {
return &Pubsub{
topics: make(map[string]*TopicHandlerList),
}
}

func (this *Pubsub) Publish(context app.Context, topic string, message app.PubsubMessage) {
this.RLock()
list, found := this.topics[topic]
this.RUnlock()

if found {
list.Dispatch(message)
}
}

func (this *Pubsub) Subscribe(context app.Context, topic string, handler app.TopicHandler) {
this.Lock()
defer this.Unlock()
if list, found := this.topics[topic]; found {
list.Add(handler)
} else {
this.topics[topic] = NewTopicHandlerList(handler)
}
}
38 changes: 38 additions & 0 deletions app/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package pubsub_test

import (
"testing"
"time"

"github.com/v2ray/v2ray-core/app"
. "github.com/v2ray/v2ray-core/app/pubsub"
apptesting "github.com/v2ray/v2ray-core/app/testing"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)

func TestPubsub(t *testing.T) {
v2testing.Current(t)

messages := make(map[string]app.PubsubMessage)

pubsub := New()
pubsub.Subscribe(&apptesting.Context{}, "t1", func(message app.PubsubMessage) {
messages["t1"] = message
})

pubsub.Subscribe(&apptesting.Context{}, "t2", func(message app.PubsubMessage) {
messages["t2"] = message
})

message := app.PubsubMessage([]byte("This is a pubsub message."))
pubsub.Publish(&apptesting.Context{}, "t2", message)
<-time.Tick(time.Second)

_, found := messages["t1"]
assert.Bool(found).IsFalse()

actualMessage, found := messages["t2"]
assert.Bool(found).IsTrue()
assert.StringLiteral(string(actualMessage)).Equals(string(message))
}
3 changes: 3 additions & 0 deletions app/space.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ type Space interface {

HasDnsCache() bool
DnsCache() DnsCache

HasPubsub() bool
Pubsub() Pubsub
}
3 changes: 3 additions & 0 deletions proxy/vmess/command/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package command

type ResponseCmd byte

0 comments on commit b6ed26a

Please sign in to comment.