Skip to content
This repository has been archived by the owner on May 15, 2024. It is now read-only.

Commit

Permalink
FABG-940 Register for block/contract/commit events (hyperledger#63)
Browse files Browse the repository at this point in the history
Add support to gateway package to handle events.
Consistent with Gateway programming model, but using Go channels instead of callback functions.
Includes unit tests & integration tests.

Signed-off-by: andrew-coleman <[email protected]>
  • Loading branch information
andrew-coleman authored Apr 20, 2020
1 parent f7729f1 commit 2d73fad
Show file tree
Hide file tree
Showing 13 changed files with 638 additions and 142 deletions.
22 changes: 21 additions & 1 deletion pkg/gateway/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ SPDX-License-Identifier: Apache-2.0

package gateway

import "github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
import (
"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
)

// A Contract object represents a smart contract instance in a network.
// Applications should get a Contract instance from a Network using the GetContract method
Expand Down Expand Up @@ -61,3 +64,20 @@ func (c *Contract) SubmitTransaction(name string, args ...string) ([]byte, error
func (c *Contract) CreateTransaction(name string, args ...TransactionOption) (*Transaction, error) {
return newTransaction(name, c, args...)
}

// RegisterEvent registers for chaincode events. Unregister must be called when the registration is no longer needed.
// Parameters:
// eventFilter is the chaincode event filter (regular expression) for which events are to be received
//
// Returns:
// the registration and a channel that is used to receive events. The channel is closed when Unregister is called.
func (c *Contract) RegisterEvent(eventFilter string) (fab.Registration, <-chan *fab.CCEvent, error) {
return c.network.event.RegisterChaincodeEvent(c.chaincodeID, eventFilter)
}

// Unregister removes the given registration and closes the event channel.
// Parameters:
// registration is the registration handle that was returned from RegisterContractEvent method
func (c *Contract) Unregister(registration fab.Registration) {
c.network.event.Unregister(registration)
}
38 changes: 32 additions & 6 deletions pkg/gateway/contract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func TestSubmitTransaction(t *testing.T) {

gw := &Gateway{
options: &gatewayOptions{
CommitHandler: DefaultCommitHandlers.OrgAll,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

Expand Down Expand Up @@ -70,9 +69,8 @@ func TestEvaluateTransaction(t *testing.T) {

gw := &Gateway{
options: &gatewayOptions{
CommitHandler: DefaultCommitHandlers.OrgAll,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

Expand All @@ -94,3 +92,31 @@ func TestEvaluateTransaction(t *testing.T) {
t.Fatalf("Incorrect transaction result: %s", result)
}
}

func TestContractEvent(t *testing.T) {
c := mockChannelProvider("mychannel")

gw := &Gateway{
options: &gatewayOptions{
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

nw, err := newNetwork(gw, c)

if err != nil {
t.Fatalf("Failed to create network: %s", err)
}

contr := nw.GetContract("contract1")

eventID := "test([a-zA-Z]+)"

reg, _, err := contr.RegisterEvent(eventID)
if err != nil {
t.Fatalf("Failed to register contract event: %s", err)
}
defer contr.Unregister(reg)

}
53 changes: 0 additions & 53 deletions pkg/gateway/defaultcommithandlers.go

This file was deleted.

25 changes: 6 additions & 19 deletions pkg/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ type Gateway struct {
}

type gatewayOptions struct {
Identity mspProvider.SigningIdentity
User string
CommitHandler CommitHandlerFactory
Discovery bool
Timeout time.Duration
Identity mspProvider.SigningIdentity
User string
Discovery bool
Timeout time.Duration
}

// Option functional arguments can be supplied when connecting to the gateway.
Expand All @@ -54,9 +53,8 @@ func Connect(config ConfigOption, identity IdentityOption, options ...Option) (*

g := &Gateway{
options: &gatewayOptions{
CommitHandler: DefaultCommitHandlers.OrgAll,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

Expand Down Expand Up @@ -173,17 +171,6 @@ func WithUser(user string) IdentityOption {
}
}

// WithCommitHandler is an optional argument to the Connect method which
// allows an alternative commit handler to be specified. The commit handler defines how
// client code should wait to receive commit events from peers following submit of a transaction.
// Currently unimplemented.
func WithCommitHandler(handler CommitHandlerFactory) Option {
return func(gw *Gateway) error {
gw.options.CommitHandler = handler
return nil
}
}

// WithDiscovery is an optional argument to the Connect method which
// enables or disables service discovery for all transaction submissions for this gateway.
func WithDiscovery(discovery bool) Option {
Expand Down
47 changes: 0 additions & 47 deletions pkg/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ func TestConnectNoOptions(t *testing.T) {

options := gw.options

if options.CommitHandler != DefaultCommitHandlers.OrgAll {
t.Fatal("DefaultCommitHandler not correctly initialized")
}

if options.Discovery != true {
t.Fatal("Discovery not correctly initialized")
}
Expand Down Expand Up @@ -96,10 +92,6 @@ func TestConnectWithSDK(t *testing.T) {

options := gw.options

if options.CommitHandler != DefaultCommitHandlers.OrgAll {
t.Fatal("DefaultCommitHandler not correctly initialized")
}

if options.Discovery != true {
t.Fatal("Discovery not correctly initialized")
}
Expand Down Expand Up @@ -133,23 +125,6 @@ func TestConnectWithIdentity(t *testing.T) {
}
}

func TestConnectWithCommitHandler(t *testing.T) {
gw, err := Connect(
WithConfig(config.FromFile("testdata/connection-tls.json")),
WithUser("user1"),
WithCommitHandler(DefaultCommitHandlers.OrgAny),
)
if err != nil {
t.Fatalf("Failed to create gateway: %s", err)
}

options := gw.options

if options.CommitHandler != DefaultCommitHandlers.OrgAny {
t.Fatal("CommitHandler not set correctly")
}
}

func TestConnectWithDiscovery(t *testing.T) {
gw, err := Connect(
WithConfig(config.FromFile("testdata/connection-tls.json")),
Expand Down Expand Up @@ -184,28 +159,6 @@ func TestConnectWithTimout(t *testing.T) {
}
}

func TestConnectWithMultipleOptions(t *testing.T) {
gw, err := Connect(
WithConfig(config.FromFile("testdata/connection-tls.json")),
WithUser("user1"),
WithCommitHandler(DefaultCommitHandlers.OrgAny),
WithDiscovery(false),
)
if err != nil {
t.Fatalf("Failed to create gateway: %s", err)
}

options := gw.options

if options.Discovery != false {
t.Fatal("Discovery not set correctly")
}

if options.CommitHandler != DefaultCommitHandlers.OrgAny {
t.Fatal("CommitHandler not set correctly")
}
}

func TestGetSDK(t *testing.T) {
gw, err := Connect(
WithConfig(config.FromFile("testdata/connection-tls.json")),
Expand Down
28 changes: 28 additions & 0 deletions pkg/gateway/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package gateway

import (
"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
"github.com/hyperledger/fabric-sdk-go/pkg/client/event"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/context"
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
"github.com/pkg/errors"
Expand All @@ -20,6 +21,7 @@ type Network struct {
gateway *Gateway
client *channel.Client
peers []fab.Peer
event *event.Client
}

func newNetwork(gateway *Gateway, channelProvider context.ChannelProvider) (*Network, error) {
Expand Down Expand Up @@ -54,6 +56,11 @@ func newNetwork(gateway *Gateway, channelProvider context.ChannelProvider) (*Net

n.peers = peers

n.event, err = event.New(channelProvider, event.WithBlockEvents())
if err != nil {
return nil, errors.Wrap(err, "Failed to create new event client")
}

return &n, nil
}

Expand All @@ -66,3 +73,24 @@ func (n *Network) Name() string {
func (n *Network) GetContract(chaincodeID string) *Contract {
return newContract(n, chaincodeID, "")
}

// RegisterBlockEvent registers for block events. Unregister must be called when the registration is no longer needed.
// Returns:
// the registration and a channel that is used to receive events. The channel is closed when Unregister is called.
func (n *Network) RegisterBlockEvent() (fab.Registration, <-chan *fab.BlockEvent, error) {
return n.event.RegisterBlockEvent()
}

// RegisterFilteredBlockEvent registers for filtered block events. Unregister must be called when the registration is no longer needed.
// Returns:
// the registration and a channel that is used to receive events. The channel is closed when Unregister is called.
func (n *Network) RegisterFilteredBlockEvent() (fab.Registration, <-chan *fab.FilteredBlockEvent, error) {
return n.event.RegisterFilteredBlockEvent()
}

// Unregister removes the given registration and closes the event channel.
// Parameters:
// registration is the registration handle that was returned from RegisterBlockEvent method
func (n *Network) Unregister(registration fab.Registration) {
n.event.Unregister(registration)
}
50 changes: 50 additions & 0 deletions pkg/gateway/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,56 @@ func TestGetContract(t *testing.T) {
}
}

func TestBlockEvent(t *testing.T) {

gw := &Gateway{
options: &gatewayOptions{
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

c := mockChannelProvider("mychannel")

nw, err := newNetwork(gw, c)

if err != nil {
t.Fatalf("Failed to create network: %s", err)
}

reg, _, err := nw.RegisterBlockEvent()
if err != nil {
t.Fatalf("Failed to register block event: %s", err)
}

nw.Unregister(reg)
}

func TestFilteredBlocktEvent(t *testing.T) {

gw := &Gateway{
options: &gatewayOptions{
Discovery: defaultDiscovery,
Timeout: defaultTimeout,
},
}

c := mockChannelProvider("mychannel")

nw, err := newNetwork(gw, c)

if err != nil {
t.Fatalf("Failed to create network: %s", err)
}

reg, _, err := nw.RegisterFilteredBlockEvent()
if err != nil {
t.Fatalf("Failed to register filtered block event: %s", err)
}

nw.Unregister(reg)
}

func mockChannelProvider(channelID string) context.ChannelProvider {

channelProvider := func() (context.Channel, error) {
Expand Down
12 changes: 0 additions & 12 deletions pkg/gateway/spi.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,6 @@ package gateway
// for implementing alternative gateway strategies, wallets, etc.
// This is currently experimental and will be implemented in future user stories

// CommitHandlerFactory is currently unimplemented
type CommitHandlerFactory interface {
Create(string, Network) CommitHandler
}

// CommitHandler is currently unimplemented
type CommitHandler interface {
StartListening()
WaitForEvents(int64)
CancelListening()
}

// WalletStore is the interface for implementations that provide backing storage for identities in a wallet.
// To create create a new backing store, implement all the methods defined in this interface and provide
// a factory method that wraps an instance of this in a new Wallet object. E.g:
Expand Down
Loading

0 comments on commit 2d73fad

Please sign in to comment.