Skip to content

Commit

Permalink
Was working, but it broke while I was writing documentation. Damn you…
Browse files Browse the repository at this point in the history
…, bit rot.
  • Loading branch information
jeffallen committed Feb 4, 2016
1 parent 8ffc1e4 commit ac7b668
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ smqttsrv/server.key
smqttsrv/smqttsrv
mqttsrv/mqttsrv
vbridge/vbridge
ticktock/ticktock
67 changes: 67 additions & 0 deletions ticktock/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"flag"
"fmt"
"net"
"os"
"time"

proto "github.com/huin/mqtt"
"github.com/jeffallen/mqtt"
)

var host = flag.String("host", "localhost:1883", "hostname of broker")
var id = flag.String("id", "", "client id")
var user = flag.String("user", "", "username")
var pass = flag.String("pass", "", "password")
var dump = flag.Bool("dump", false, "dump messages?")
var delay = flag.Duration("delay", time.Second, "delay between messages")
var who = flag.String("who", "bonnie", "who is this? (to make two instance of ticktock distinct)")

func main() {
flag.Parse()

conn, err := net.Dial("tcp", *host)
if err != nil {
fmt.Fprint(os.Stderr, "dial: ", err)
return
}
cc := mqtt.NewClientConn(conn)
cc.Dump = *dump
cc.ClientId = *id

tq := []proto.TopicQos{
{Topic: "tick", Qos: proto.QosAtMostOnce},
}

if err := cc.Connect(*user, *pass); err != nil {
fmt.Fprintf(os.Stderr, "connect: %v\n", err)
os.Exit(1)
}

cc.Subscribe(tq)

// Sender
go func() {
for {
now := time.Now()
what := fmt.Sprintf("%v at %v", *who, now)

cc.Publish(&proto.Publish{
Header: proto.Header{Retain: false},
TopicName: "tick",
Payload: proto.BytesPayload([]byte(what)),
})

time.Sleep(*delay)
}
}()

// Receiver
for m := range cc.Incoming {
fmt.Print(m.TopicName, "\t")
m.Payload.WritePayload(os.Stdout)
fmt.Println("\tr: ", m.Header.Retain)
}
}
99 changes: 99 additions & 0 deletions vbridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
Vbridge
=======

Vbridge is a bridge for message brokers which uses [Vanadium] (https://v.io)
for RPC that can securely and reliably cross network borders.

Vbridge uses Vanadium's input and output streams to move MQTT messages
in both directions between the linked brokers. It is not currently
possible to limit replication to one direction or the other.

Vbridge operates in one of three modes: server, client, or pipe (dual client).

Server
------

As a server, vbridge implements the [Bridge service]
(https://github.com/jeffallen/mqtt/tree/master/vbridge/ifc) and
bridges messages to/from the caller into a the MQTT server it is
configured to attach to.

To run it in server mode, use "./vbridge -service-name XXX". XXX will
be registered the Vanadium namespace, so that callers can find the
service.

Client
------

vbridge can connect to an MQTT broker as a client and then bridge
messages to another instance of Vbridge.

To run it in client mode, use the "-to" argument to tell it which
Vanadium service name it should be connecting to.

Pipe
----

vbridge can act as a pipe, connecting two servers which both
provide the Bridge service.

To run it in pipe mode, use the "-to" and "-from" arguments to tell it
which two Vanadium service names it should be linking. There's no
practical difference between to and from, they are just two flags to
make it clearer what's happening.

(pipe mode is not yet implemented)

Echoing
-------

Vbridge uses one MQTT connection for both sending and receiving
messages. The MQTT specification, though it does not clearly indicate
it, implies that a message that is published on a given connection
should be echoed back to the same connection, if that connection is
subscribed to a matching topic.

If that were the case, it would cause vbridge to create an endless
message loop.

Therefore, the MQTT server in this Git repository has been patched to
not send these echo messages, because that makes vbridge work, and
because it just makes more sense that way.

If you need vbridge to work with a message broker that has this echo
"behavior" (or "problem", depending on how you look at it) then you'll
need to patch the broker to not do that.

Example
-------

The following set of commands shows the system working:

mqttsrv/mqttsrv -addr :1883 &
mqttsrv/mqttsrv -addr :1884 &
ticktock/ticktock -host :1883 -who bonnie &
ticktock/ticktock -host :1884 -who clyde &
vbridge/vbridge -service-name tmp/$USER/vbridge -host :1883 &
vbridge/vbridge -to tmp/$USER/vbridge -host :1884

(You might want to run the ticktocks in two windows, to see
separately what each of them is saying.)

Ticktock is a program that subscribes to topic "tick"
while publishing messages on that topic, once a second.
(see ticktock -help for options).

The first vbridge is running as a server. It registers
itself as service tmp/$USER/vbridge in your default
mount table (usually in dev.v.io).

The second vbridge is running as a client. It looks up
the first one in the mount table, and calls it.

Once the sixth command is run (the client vbridge) the ticktocks
will start showing messages from the other ticktock. But
because each ticktock is connected only to it's "own"
MQTT server, this demonstrates that the bridge is functioning.

If you stop and start the client vbridge, the messages stop
and start in the ticktocks.
16 changes: 11 additions & 5 deletions vbridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"log"
"strings"

"github.com/jeffallen/mqtt/vbridge/ifc"
Expand Down Expand Up @@ -44,12 +45,11 @@ func main() {
return
}

// Wait forever.
<-signals.ShutdownOnSignals(ctx)

endpoint := server.Status().Endpoints[0]
fmt.Printf("Listening at: %v\n", endpoint)

// Wait forever.
<-signals.ShutdownOnSignals(ctx)
} else {
cc, mu, err := mqttConnect()
if err != nil {
Expand All @@ -64,20 +64,26 @@ func main() {
}

bc := ifc.BridgeClient(*to)
// timeout := options.ChannelTimeout(2 * time.Second)
bcc, err := bc.Link(ctx, ifct)
if err != nil {
ctx.Error(err)
return
}

done := make(chan error)
done := make(chan error, 2)
go func() {
done <- transmitter(ifct, bcc.SendStream(), cc, mu)
println("send done")
}()
go func() {
done <- receiver(bcc.RecvStream(), cc, mu)
println("recv done")
}()
<-done
err = <-done
log.Print("Stopped with error ", err)

// Stop sender by closing cc.Incoming
cc.Disconnect()
}
}
3 changes: 1 addition & 2 deletions vbridge/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func mqttConnect() (*mqtt.ClientConn, *sync.Mutex, error) {
func (f *impl) Link(ctx *context.T, sc ifc.BridgeLinkServerCall, topics []ifc.Topic) error {
ctx.Info("Link for topics ", topics)

done := make(chan error)
done := make(chan error, 2)

cc, mu, err := mqttConnect()
if err != nil {
Expand All @@ -58,7 +58,6 @@ func (f *impl) Link(ctx *context.T, sc ifc.BridgeLinkServerCall, topics []ifc.To
// Stop sender by closing cc.Incoming
cc.Disconnect()

ctx.Info("done with err = ", err)
return err
}

Expand Down

0 comments on commit ac7b668

Please sign in to comment.