Skip to content

Commit

Permalink
Merge pull request v2ray#219 from Vigilans/vigilans/stats-channel-config
Browse files Browse the repository at this point in the history
Stats: Add ChannelConfig & Return error on subscription
  • Loading branch information
Loyalsoldier authored Sep 23, 2020
2 parents 2cc8c1a + fa37f82 commit 788dd1e
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 114 deletions.
81 changes: 49 additions & 32 deletions app/stats/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,33 @@ package stats
import (
"sync"
"time"

"v2ray.com/core/common"
)

// Channel is an implementation of stats.Channel.
type Channel struct {
channel chan interface{}
subscribers []chan interface{}

// Synchronization components
access sync.RWMutex
closed chan struct{}

channel chan interface{}
subscribers []chan interface{}
// Channel options
subscriberLimit int // Set to 0 as no subscriber limit
channelBufferSize int // Set to 0 as no buffering
broadcastTimeout time.Duration // Set to 0 as non-blocking immediate timeout
}

// NewChannel creates an instance of Statistics Channel.
func NewChannel(config *ChannelConfig) *Channel {
return &Channel{
channel: make(chan interface{}, config.BufferSize),
subscriberLimit: int(config.SubscriberLimit),
channelBufferSize: int(config.BufferSize),
broadcastTimeout: time.Duration(config.BroadcastTimeout+1) * time.Millisecond,
}
}

// Channel returns the underlying go channel.
Expand All @@ -31,16 +49,19 @@ func (c *Channel) Subscribers() []chan interface{} {
}

// Subscribe implements stats.Channel.
func (c *Channel) Subscribe() chan interface{} {
func (c *Channel) Subscribe() (chan interface{}, error) {
c.access.Lock()
defer c.access.Unlock()
subscriber := make(chan interface{})
if c.subscriberLimit > 0 && len(c.subscribers) >= c.subscriberLimit {
return nil, newError("Number of subscribers has reached limit")
}
subscriber := make(chan interface{}, c.channelBufferSize)
c.subscribers = append(c.subscribers, subscriber)
return subscriber
return subscriber, nil
}

// Unsubscribe implements stats.Channel.
func (c *Channel) Unsubscribe(subscriber chan interface{}) {
func (c *Channel) Unsubscribe(subscriber chan interface{}) error {
c.access.Lock()
defer c.access.Unlock()
for i, s := range c.subscribers {
Expand All @@ -50,9 +71,9 @@ func (c *Channel) Unsubscribe(subscriber chan interface{}) {
copy(subscribers[:i], c.subscribers[:i])
copy(subscribers[i:], c.subscribers[i+1:])
c.subscribers = subscribers
return
}
}
return nil
}

// Publish implements stats.Channel.
Expand Down Expand Up @@ -85,34 +106,30 @@ func (c *Channel) Running() bool {
func (c *Channel) Start() error {
c.access.Lock()
defer c.access.Unlock()
if c.Running() {
return nil
}
if c.channel == nil { // Initialize publisher channel
c.channel = make(chan interface{}, 16)
}
c.closed = make(chan struct{}) // Reset close signal
go func() {
for {
select {
case message := <-c.channel: // Broadcast message
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
select {
case sub <- message: // Successfully sent message
case <-time.After(100 * time.Millisecond):
c.Unsubscribe(sub) // Remove timeout subscriber
close(sub) // Actively close subscriber as notification
if !c.Running() {
c.closed = make(chan struct{}) // Reset close signal
go func() {
for {
select {
case message := <-c.channel: // Broadcast message
for _, sub := range c.Subscribers() { // Concurrency-safe subscribers retreivement
select {
case sub <- message: // Successfully sent message
case <-time.After(c.broadcastTimeout): // Remove timeout subscriber
common.Must(c.Unsubscribe(sub))
close(sub) // Actively close subscriber as notification
}
}
case <-c.closed: // Channel closed
for _, sub := range c.Subscribers() { // Remove all subscribers
common.Must(c.Unsubscribe(sub))
close(sub)
}
return
}
case <-c.closed: // Channel closed
for _, sub := range c.Subscribers() { // Remove all subscribers
c.Unsubscribe(sub)
close(sub)
}
return
}
}
}()
}()
}
return nil
}

Expand Down
114 changes: 65 additions & 49 deletions app/stats/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package stats_test

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -12,25 +11,30 @@ import (
)

func TestStatsChannel(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
// At most 2 subscribers could be registered
c := NewChannel(&ChannelConfig{SubscriberLimit: 2})
source := c.Channel()

a, err := stats.SubscribeRunnableChannel(c)
common.Must(err)
if !c.Running() {
t.Fatal("unexpected failure in running channel after first subscription")
}

m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
b, err := c.Subscribe()
common.Must(err)
common.Must(m.Start())
defer m.Close()

source := c.(*Channel).Channel()
a := c.Subscribe()
b := c.Subscribe()
defer c.Unsubscribe(a)
defer c.Unsubscribe(b)
// Test that third subscriber is forbidden
_, err = c.Subscribe()
if err == nil {
t.Fatal("unexpected successful subscription")
}
t.Log("expected error: ", err)

stopCh := make(chan struct{})
errCh := make(chan string)

go func() {
go func() { // Blocking publish
source <- 1
source <- 2
source <- "3"
Expand Down Expand Up @@ -84,22 +88,31 @@ func TestStatsChannel(t *testing.T) {
t.Fatal(e)
case <-stopCh:
}

// Test the unsubscription of channel
common.Must(c.Unsubscribe(b))

// Test the last subscriber will close channel with `UnsubscribeClosableChannel`
common.Must(stats.UnsubscribeClosableChannel(c, a))
if c.Running() {
t.Fatal("unexpected running channel after unsubscribing the last subscriber")
}
}

func TestStatsChannelUnsubcribe(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
c := NewChannel(&ChannelConfig{})
common.Must(c.Start())
defer c.Close()

m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
source := c.Channel()

a := c.Subscribe()
b := c.Subscribe()
a, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(a)

b, err := c.Subscribe()
common.Must(err)

pauseCh := make(chan struct{})
stopCh := make(chan struct{})
errCh := make(chan string)
Expand All @@ -119,10 +132,10 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
}
}

go func() {
c.Publish(1)
go func() { // Blocking publish
source <- 1
<-pauseCh // Wait for `b` goroutine to resume sending message
c.Publish(2)
source <- 2
}()

go func() {
Expand Down Expand Up @@ -179,26 +192,27 @@ func TestStatsChannelUnsubcribe(t *testing.T) {
}

func TestStatsChannelTimeout(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
// Do not use buffer so as to create blocking scenario
c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 50})
common.Must(c.Start())
defer c.Close()

m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
source := c.Channel()

a := c.Subscribe()
b := c.Subscribe()
a, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(a)

b, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(b)

stopCh := make(chan struct{})
errCh := make(chan string)

go func() {
c.Publish(1)
c.Publish(2)
go func() { // Blocking publish
source <- 1
source <- 2
}()

go func() {
Expand Down Expand Up @@ -229,7 +243,7 @@ func TestStatsChannelTimeout(t *testing.T) {
errCh <- fmt.Sprint("unexpected receiving: ", v, ", wanted ", 1)
}
// Block `b` channel for a time longer than `source`'s timeout
<-time.After(150 * time.Millisecond)
<-time.After(200 * time.Millisecond)
{ // Test `b` has been unsubscribed by source
var aSet, bSet bool
for _, s := range c.Subscribers() {
Expand Down Expand Up @@ -264,25 +278,27 @@ func TestStatsChannelTimeout(t *testing.T) {
}

func TestStatsChannelConcurrency(t *testing.T) {
raw, err := common.CreateObject(context.Background(), &Config{})
common.Must(err)
// Do not use buffer so as to create blocking scenario
c := NewChannel(&ChannelConfig{BufferSize: 0, BroadcastTimeout: 100})
common.Must(c.Start())
defer c.Close()

m := raw.(stats.Manager)
c, err := m.RegisterChannel("test.channel")
common.Must(err)
common.Must(m.Start())
defer m.Close()
source := c.Channel()

a := c.Subscribe()
b := c.Subscribe()
a, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(a)

b, err := c.Subscribe()
common.Must(err)
defer c.Unsubscribe(b)

stopCh := make(chan struct{})
errCh := make(chan string)

go func() {
c.Publish(1)
c.Publish(2)
go func() { // Blocking publish
source <- 1
source <- 2
}()

go func() {
Expand Down
15 changes: 0 additions & 15 deletions app/stats/config.go

This file was deleted.

Loading

0 comments on commit 788dd1e

Please sign in to comment.