Skip to content

Commit

Permalink
initial baseline for p2p node (dominant-strategies#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
alejoacosta74 authored Oct 17, 2023
1 parent 0af6bea commit 3b0df2c
Show file tree
Hide file tree
Showing 18 changed files with 2,127 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
# or operating system, you probably want to add a global ignore instead:
# git config --global core.excludesfile ~/.gitignore_global

private.key
30 changes: 18 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ https://camo.githubusercontent.com/915b7be44ada53c290eb157634330494ebe3e30a/6874
[![Go Report Card](https://goreportcard.com/badge/github.com/dominant-strategies/go-quai)](https://goreportcard.com/report/github.com/dominant-strategies/go-quai)
[![Discord](https://img.shields.io/badge/discord-join%20chat-blue.svg)](https://discord.gg/s8y8asPwNC)

* [Prerequisites](#prerequisites)
* [Building The Source](#building-the-source)
* [Executables](#executables)
* [go-quai](#go-quai)
* [test](#test)
* [Running go-quai](#running-go-quai)
* [Configuration](#configuration)
* [Starting and stopping a node](#starting-and-stopping-a-node)
* [Viewing logs](#viewing-logs)
* [Garden test network](#garden-test-network)
* [Contribution](#contribution)
* [License](#license)
- [Go Quai](#go-quai)
- [Prerequisites](#prerequisites)
- [Building the source](#building-the-source)
- [Executables](#executables)
- [go-quai](#go-quai-1)
- [test](#test)
- [Running `go-quai`](#running-go-quai)
- [Configuration](#configuration)
- [Starting and stopping a node](#starting-and-stopping-a-node)
- [Viewing logs](#viewing-logs)
- [Garden test network](#garden-test-network)
- [Developer notes](#developer-notes)
- [Contribution](#contribution)
- [License](#license)

## Prerequisites

Expand Down Expand Up @@ -124,6 +126,10 @@ The Garden test network is based on the Blake3 proof-of-work consensus algorithm

To run on the Garden test network, modify the `network.env` configuration file to reflect `NETWORK=garden`. You should also set `ENABLE_ARCHIVE=true` to make sure to save the trie-nodes after you stop your node. Then [build](#building-the-source) and [run](#starting-and-stopping-a-node) with the same commands as mainnet.

## Developer notes

See [dev-notes.md](dev-notes.md) for more information on the development of go-quai.

## Contribution

Thank you for considering to help out with the source code! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes.
Expand Down
65 changes: 65 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package client

import (
"context"
"fmt"
"net/http"

"github.com/dominant-strategies/go-quai/log"

dht "github.com/libp2p/go-libp2p-kad-dht"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
)

type P2PClient struct {
node host.Host
dht *dht.IpfsDHT
httpServer *http.Server
ctx context.Context
}

func NewClient(ctx context.Context, node host.Host) *P2PClient {
client := &P2PClient{
node: node,
ctx: ctx,
}

client.node.SetStreamHandler(myProtocol, client.handleStream)
return client
}

// subscribes to the event bus to listen for specific events
func (c *P2PClient) ListenForEvents() {
subAddrUpdated, err := c.node.EventBus().Subscribe(new(event.EvtLocalAddressesUpdated))
if err != nil {
log.Fatalf("Failed to subscribe to address change events: %s", err)
}
defer subAddrUpdated.Close()

subPeerConnected, err := c.node.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
log.Fatalf("Failed to subscribe to peer connectedness events: %s", err)
}
defer subPeerConnected.Close()

for {
select {
case evt := <-subAddrUpdated.Out():
if e, ok := evt.(event.EvtLocalAddressesUpdated); ok {
for _, addr := range e.Current {
fullAddr := fmt.Sprintf("%+v/p2p/%s", addr, c.node.ID().Pretty())
log.Debugf("Advertised Address changed: %s", fullAddr)
}
}
case evt := <-subPeerConnected.Out():
if e, ok := evt.(event.EvtPeerConnectednessChanged); ok {
log.Tracef("Peer %s is now %s", e.Peer.String(), e.Connectedness)
}
case <-c.ctx.Done():
log.Warnf("Context cancel received. Stopping event listener")
return
}
}
}
50 changes: 50 additions & 0 deletions client/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package client

import (
"github.com/dominant-strategies/go-quai/log"

kadht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/peer"
)

// initDHT initializes the DHT using client's libp2p node
func (c *P2PClient) InitDHT() error {
dht, err := kadht.New(c.ctx, c.node)
if err != nil {
log.Fatalf("error creating DHT: %s", err)
}
c.dht = dht
return nil
}

// BootstrapDHT bootstraps the DHT with the given list of bootstrap peers
func (c *P2PClient) BootstrapDHT(bootstrapPeers ...string) error {
var bootStrapPeersAddrInfo []peer.AddrInfo
if len(bootstrapPeers) == 0 {
// if no bootstrap peers are given, bootstrap with the default bootstrap peers
log.Warnf("no bootstrap peers given, using default public bootstrap peers")
bootStrapPeersAddrInfo = kadht.GetDefaultBootstrapPeerAddrInfos()
} else {
for _, peerAddr := range bootstrapPeers {
peerInfo, err := peer.AddrInfoFromString(peerAddr)
if err != nil {
log.Errorf("error creating peer info from address: %s", err)
continue
}
bootStrapPeersAddrInfo = append(bootStrapPeersAddrInfo, *peerInfo)
}
}

for _, peerInfo := range bootStrapPeersAddrInfo {
log.Debugf("adding bootstraping node: %s", peerInfo.ID.Pretty())
err := c.node.Connect(c.ctx, peerInfo)
if err != nil {
log.Errorf("error connecting to bootstrap node: %s", err)
continue
}
log.Debugf("connected to bootstrap node: %s", peerInfo.ID.Pretty())
}

// Bootstrap the DHT
return c.dht.Bootstrap(c.ctx)
}
163 changes: 163 additions & 0 deletions client/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package client

import (
"context"
"io"
"net/http"
"strings"

"github.com/dominant-strategies/go-quai/log"

// log "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
multiaddr "github.com/multiformats/go-multiaddr"
)

func (c *P2PClient) StartServer(port string) error {
// start http server
mux := c.newServer()
c.httpServer = &http.Server{
Addr: ":" + port,
Handler: mux,
}
log.Infof("http server listening on port %s", port)
return c.httpServer.ListenAndServe()
}

func (c *P2PClient) StopServer() {
err := c.httpServer.Shutdown(context.Background())
if err != nil {
log.Fatalf("error stopping http server: %s", err)
}
}

// handler for the /dhtpeers endpoint
func (c *P2PClient) dhtPeersHandler(w http.ResponseWriter, r *http.Request) {
// get the list of peers from the dht
peers := c.dht.RoutingTable().ListPeers()
log.Debugf("peers on dht: %d", len(peers))
// write the list of peers to the response
for _, peer := range peers {
w.Write([]byte(peer.Pretty() + "\n"))
}
}

// handler for the /nodepeers endpoint
func (c *P2PClient) nodePeersHandler(w http.ResponseWriter, r *http.Request) {
// get the list of peers from the host
peers := c.node.Peerstore().Peers()
// write the list of peers to the response
log.Debugf("peers on peerStore: %d", len(peers))
for _, peer := range peers {
w.Write([]byte(peer.Pretty() + "\n"))
}
}

// handler for the /connect/{multiaddr} endpoint
func (c *P2PClient) connectHandler(w http.ResponseWriter, r *http.Request) {
addr := strings.TrimPrefix(r.URL.Path, "/connect")
maddr, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Errorf("Failed to parse multiaddress: %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
addrInfo, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
log.Errorf("Failed to parse multiaddress: %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// Discover the Mac node's addresses using DHT
discoveredAddrs, err := c.dht.FindPeer(c.ctx, addrInfo.ID)
if err != nil {
log.Errorf("Failed to discover peer: %v, error: %s", addrInfo.ID, err)
http.Error(w, "Failed to discover peer: "+err.Error(), http.StatusInternalServerError)
return
}
log.Info("Discovered addresses:", discoveredAddrs)

ctx := context.Background()
if err := c.node.Connect(ctx, *addrInfo); err != nil {
log.Errorf("Failed to connect to peer: %v, %s", addrInfo.ID.Pretty(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Write([]byte("Connected successfully to peer " + addrInfo.ID.Pretty()))
}

// handler for the /discover/{peerID} endpoint
func (c *P2PClient) discoverHandler(w http.ResponseWriter, r *http.Request) {
peerIDStr := strings.TrimPrefix(r.URL.Path, "/discover/")
peerID, err := peer.Decode(peerIDStr)
if err != nil {
log.Errorf("Failed to decode peer ID: %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
log.Debugf("Received request to discover Peer ID: %s", peerID.String())

// Discover the peer's addresses using DHT
discoveredAddrsInfo, err := c.dht.FindPeer(c.ctx, peerID)
if err != nil {
log.Errorf("Failed to discover peer: %v, error: %s", peerID, err)
http.Error(w, "Failed to discover peer: "+err.Error(), http.StatusInternalServerError)
return
}
log.Info("Discovered addresses:", discoveredAddrsInfo)

if err := c.node.Connect(c.ctx, discoveredAddrsInfo); err != nil {
log.Errorf("Failed to connect to peer: %v, %s", discoveredAddrsInfo.ID.Pretty(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Write([]byte("Discovered and connected successfully to peer " + discoveredAddrsInfo.ID.Pretty()))
}

// handler for the /send/{peerID} endpoint
func (c *P2PClient) sendMessageHandler(w http.ResponseWriter, r *http.Request) {
parts := strings.Split(r.URL.Path, "/")
if len(parts) != 3 {
log.Errorf("Invalid request. Parts: %v", parts)
http.Error(w, "Invalid request", http.StatusBadRequest)
return
}

peerIDStr := parts[2]
peerID, err := peer.Decode(peerIDStr)
if err != nil {
log.Errorf("Failed to decode peer ID: %s", err)
http.Error(w, "Invalid peer ID", http.StatusBadRequest)
return
}

// Read the message from the request body
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusInternalServerError)
return
}
message := string(bodyBytes)

err = c.sendMessage(peerID, message)
if err != nil {
http.Error(w, "Failed to send message", http.StatusInternalServerError)
return
}
log.Infof("Sent message: '%s' to peer %s", message, peerID.String())
w.Write([]byte("Message sent successfully"))

}

// NewServer creates a new http server with the available handlers
func (c *P2PClient) newServer() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/dhtpeers", c.dhtPeersHandler)
mux.HandleFunc("/nodepeers", c.nodePeersHandler)
mux.HandleFunc("/connect/", c.connectHandler)
mux.HandleFunc("/send/", c.sendMessageHandler)
mux.HandleFunc("/discover/", c.discoverHandler)
return mux
}
66 changes: 66 additions & 0 deletions client/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package client

import (
"github.com/dominant-strategies/go-quai/log"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

const myProtocol = "/quai/0.1.0"

// This function is used by the node to handle incoming streams.
// When a message is received, it prints the message and sends an acknowlegdment back to the sender.
func (c *P2PClient) handleStream(s network.Stream) {
defer s.Close()
// Create a buffer to read the message
buf := make([]byte, 1024)
n, err := s.Read(buf)
if err != nil {
log.Errorf("Error reading from stream: %s", err)
return
}

// Print the received message
log.Infof("Received message: '%s'", string(buf[:n]))

// send acknowlegdment back to the sender
_, err = s.Write([]byte("Message received!"))
if err != nil {
log.Errorf("Error sending acknowlegdment: %s", err)
}

// Read the acknowledgment from the sender
n, err = s.Read(buf)
if err != nil {
log.Errorf("Error reading acknowledgment from stream: %s", err)
return
}

// Print the received acknowledgment
log.Infof("Received acknowledgment: '%s'", string(buf[:n]))
}

// This function is used by the node to send a message to a peer using a stream
func (c *P2PClient) sendMessage(peerID peer.ID, message string) error {
// Open a stream to the peer
s, err := c.node.NewStream(c.ctx, peerID, myProtocol)
if err != nil {
log.Errorf("Error opening stream: %s", err)
return err
}
defer s.Close()
if err != nil {
log.Errorf("Error opening stream: %s", err)
return err
}

// Write the message to the stream
_, err = s.Write([]byte(message))
if err != nil {
log.Errorf("Error writing to stream: %s", err)
return err
}
log.Infof("Sent message: '%s' to peer %s", message, peerID.String())
return nil
}
Loading

0 comments on commit 3b0df2c

Please sign in to comment.