From b6ed26aedfd1fbf74b000b5f590be8ceda6fdb48 Mon Sep 17 00:00:00 2001 From: Darien Raymond Date: Fri, 11 Dec 2015 14:56:10 +0000 Subject: [PATCH] pubsub --- app/controller/controller.go | 7 +++- app/internal/pubsub.go | 23 ++++++++++++ app/internal/space.go | 17 ++++++++- app/pubsub.go | 9 +++++ app/pubsub/pubsub.go | 64 +++++++++++++++++++++++++++++++++ app/pubsub/pubsub_test.go | 38 ++++++++++++++++++++ app/space.go | 3 ++ proxy/vmess/command/response.go | 3 ++ 8 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 app/internal/pubsub.go create mode 100644 app/pubsub.go create mode 100644 app/pubsub/pubsub.go create mode 100644 app/pubsub/pubsub_test.go create mode 100644 proxy/vmess/command/response.go diff --git a/app/controller/controller.go b/app/controller/controller.go index c1a589818c..8dbdd9df9a 100644 --- a/app/controller/controller.go +++ b/app/controller/controller.go @@ -10,6 +10,7 @@ import ( type SpaceController struct { packetDispatcher internal.PacketDispatcherWithContext dnsCache internal.DnsCacheWithContext + pubsub internal.PubsubWithContext } func New() *SpaceController { @@ -24,8 +25,12 @@ func (this *SpaceController) Bind(object interface{}) { if dnsCache, ok := object.(internal.DnsCacheWithContext); ok { this.dnsCache = dnsCache } + + if pubsub, ok := object.(internal.PubsubWithContext); ok { + this.pubsub = pubsub + } } func (this *SpaceController) ForContext(tag string) app.Space { - return internal.NewSpace(tag, this.packetDispatcher, this.dnsCache) + return internal.NewSpace(tag, this.packetDispatcher, this.dnsCache, this.pubsub) } diff --git a/app/internal/pubsub.go b/app/internal/pubsub.go new file mode 100644 index 0000000000..a5fde1dd01 --- /dev/null +++ b/app/internal/pubsub.go @@ -0,0 +1,23 @@ +package internal + +import ( + "github.com/v2ray/v2ray-core/app" +) + +type PubsubWithContext interface { + Publish(context app.Context, topic string, message app.PubsubMessage) + Subscribe(context app.Context, topic string, handler app.TopicHandler) +} + +type contextedPubsub struct { + context app.Context + pubsub PubsubWithContext +} + +func (this *contextedPubsub) Publish(topic string, message app.PubsubMessage) { + this.pubsub.Publish(this.context, topic, message) +} + +func (this *contextedPubsub) Subscribe(topic string, handler app.TopicHandler) { + this.pubsub.Subscribe(this.context, topic, handler) +} diff --git a/app/internal/space.go b/app/internal/space.go index 1e96ab3e11..184a5c08a3 100644 --- a/app/internal/space.go +++ b/app/internal/space.go @@ -7,14 +7,16 @@ import ( type Space struct { packetDispatcher PacketDispatcherWithContext dnsCache DnsCacheWithContext + pubsub PubsubWithContext tag string } -func NewSpace(tag string, packetDispatcher PacketDispatcherWithContext, dnsCache DnsCacheWithContext) *Space { +func NewSpace(tag string, packetDispatcher PacketDispatcherWithContext, dnsCache DnsCacheWithContext, pubsub PubsubWithContext) *Space { return &Space{ tag: tag, packetDispatcher: packetDispatcher, dnsCache: dnsCache, + pubsub: pubsub, } } @@ -43,3 +45,16 @@ func (this *Space) DnsCache() app.DnsCache { }, } } + +func (this *Space) HasPubsub() bool { + return this.pubsub != nil +} + +func (this *Space) Pubsub() app.Pubsub { + return &contextedPubsub{ + pubsub: this.pubsub, + context: &contextImpl{ + callerTag: this.tag, + }, + } +} diff --git a/app/pubsub.go b/app/pubsub.go new file mode 100644 index 0000000000..697180f609 --- /dev/null +++ b/app/pubsub.go @@ -0,0 +1,9 @@ +package app + +type PubsubMessage []byte +type TopicHandler func(PubsubMessage) + +type Pubsub interface { + Publish(topic string, message PubsubMessage) + Subscribe(topic string, handler TopicHandler) +} diff --git a/app/pubsub/pubsub.go b/app/pubsub/pubsub.go new file mode 100644 index 0000000000..37cefba02b --- /dev/null +++ b/app/pubsub/pubsub.go @@ -0,0 +1,64 @@ +package pubsub + +import ( + "sync" + + "github.com/v2ray/v2ray-core/app" + "github.com/v2ray/v2ray-core/app/internal" +) + +type TopicHandlerList struct { + sync.RWMutex + handlers []app.TopicHandler +} + +func NewTopicHandlerList(handlers ...app.TopicHandler) *TopicHandlerList { + return &TopicHandlerList{ + handlers: handlers, + } +} + +func (this *TopicHandlerList) Add(handler app.TopicHandler) { + this.Lock() + this.handlers = append(this.handlers, handler) + this.Unlock() +} + +func (this *TopicHandlerList) Dispatch(message app.PubsubMessage) { + this.RLock() + for _, handler := range this.handlers { + go handler(message) + } + this.RUnlock() +} + +type Pubsub struct { + topics map[string]*TopicHandlerList + sync.RWMutex +} + +func New() internal.PubsubWithContext { + return &Pubsub{ + topics: make(map[string]*TopicHandlerList), + } +} + +func (this *Pubsub) Publish(context app.Context, topic string, message app.PubsubMessage) { + this.RLock() + list, found := this.topics[topic] + this.RUnlock() + + if found { + list.Dispatch(message) + } +} + +func (this *Pubsub) Subscribe(context app.Context, topic string, handler app.TopicHandler) { + this.Lock() + defer this.Unlock() + if list, found := this.topics[topic]; found { + list.Add(handler) + } else { + this.topics[topic] = NewTopicHandlerList(handler) + } +} diff --git a/app/pubsub/pubsub_test.go b/app/pubsub/pubsub_test.go new file mode 100644 index 0000000000..20633b3186 --- /dev/null +++ b/app/pubsub/pubsub_test.go @@ -0,0 +1,38 @@ +package pubsub_test + +import ( + "testing" + "time" + + "github.com/v2ray/v2ray-core/app" + . "github.com/v2ray/v2ray-core/app/pubsub" + apptesting "github.com/v2ray/v2ray-core/app/testing" + v2testing "github.com/v2ray/v2ray-core/testing" + "github.com/v2ray/v2ray-core/testing/assert" +) + +func TestPubsub(t *testing.T) { + v2testing.Current(t) + + messages := make(map[string]app.PubsubMessage) + + pubsub := New() + pubsub.Subscribe(&apptesting.Context{}, "t1", func(message app.PubsubMessage) { + messages["t1"] = message + }) + + pubsub.Subscribe(&apptesting.Context{}, "t2", func(message app.PubsubMessage) { + messages["t2"] = message + }) + + message := app.PubsubMessage([]byte("This is a pubsub message.")) + pubsub.Publish(&apptesting.Context{}, "t2", message) + <-time.Tick(time.Second) + + _, found := messages["t1"] + assert.Bool(found).IsFalse() + + actualMessage, found := messages["t2"] + assert.Bool(found).IsTrue() + assert.StringLiteral(string(actualMessage)).Equals(string(message)) +} diff --git a/app/space.go b/app/space.go index 9d9e5f0263..f01354a854 100644 --- a/app/space.go +++ b/app/space.go @@ -13,4 +13,7 @@ type Space interface { HasDnsCache() bool DnsCache() DnsCache + + HasPubsub() bool + Pubsub() Pubsub } diff --git a/proxy/vmess/command/response.go b/proxy/vmess/command/response.go new file mode 100644 index 0000000000..594cffd377 --- /dev/null +++ b/proxy/vmess/command/response.go @@ -0,0 +1,3 @@ +package command + +type ResponseCmd byte