Skip to content

Commit

Permalink
improvement(header/p2p): unify sendMessage method to request headers (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Nov 29, 2022
1 parent c05065e commit 3444fdf
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 116 deletions.
66 changes: 8 additions & 58 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
)
Expand Down Expand Up @@ -50,10 +48,6 @@ type Exchange struct {
peerTracker *peerTracker
}

func protocolID(protocolSuffix string) protocol.ID {
return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix))
}

func NewExchange(host host.Host, peers peer.IDSlice, protocolSuffix string) *Exchange {
return &Exchange{
host: host,
Expand Down Expand Up @@ -194,53 +188,23 @@ func (ex *Exchange) request(
to peer.ID,
req *p2p_pb.ExtendedHeaderRequest,
) ([]*header.ExtendedHeader, error) {
stream, err := ex.host.NewStream(ctx, to, ex.protocolID)
responses, _, _, err := sendMessage(ctx, ex.host, to, ex.protocolID, req)
if err != nil {
return nil, err
}
if err = stream.SetWriteDeadline(time.Now().Add(writeDeadline)); err != nil {
log.Debugf("error setting deadline: %s", err)
}
// send request
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, err
}
err = stream.CloseWrite()
if err != nil {
log.Error(err)
if len(responses) == 0 {
return nil, header.ErrNotFound
}
// read responses
headers := make([]*header.ExtendedHeader, req.Amount)
for i := 0; i < int(req.Amount); i++ {
resp := new(p2p_pb.ExtendedHeaderResponse)
if err = stream.SetReadDeadline(time.Now().Add(readDeadline)); err != nil {
log.Debugf("error setting deadline: %s", err)
}
_, err := serde.Read(stream, resp)
if err != nil {
stream.Reset() //nolint:errcheck
headers := make([]*header.ExtendedHeader, 0, len(responses))
for _, response := range responses {
if err = convertStatusCodeToError(response.StatusCode); err != nil {
return nil, err
}

if err = convertStatusCodeToError(resp.StatusCode); err != nil {
stream.Reset() //nolint:errcheck
return nil, err
}
header, err := header.UnmarshalExtendedHeader(resp.Body)
header, err := header.UnmarshalExtendedHeader(response.Body)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, err
}

headers[i] = header
}
if err = stream.Close(); err != nil {
log.Errorw("closing stream", "err", err)
}
if len(headers) == 0 {
return nil, header.ErrNotFound
headers = append(headers, header)
}
return headers, nil
}
Expand Down Expand Up @@ -274,17 +238,3 @@ func bestHead(result []*header.ExtendedHeader) (*header.ExtendedHeader, error) {
// otherwise return header with the max height
return result[0], nil
}

// convertStatusCodeToError converts passed status code into an error.
func convertStatusCodeToError(code p2p_pb.StatusCode) error {
switch code {
case p2p_pb.StatusCode_OK:
return nil
case p2p_pb.StatusCode_NOT_FOUND:
return header.ErrNotFound
case p2p_pb.StatusCode_LIMIT_EXCEEDED:
return header.ErrHeadersLimitExceeded
default:
return fmt.Errorf("unknown status code %d", code)
}
}
97 changes: 97 additions & 0 deletions header/p2p/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package p2p

import (
"context"
"fmt"
"io"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
)

func protocolID(protocolSuffix string) protocol.ID {
return protocol.ID(fmt.Sprintf("/header-ex/v0.0.3/%s", protocolSuffix))
}

// sendMessage opens the stream to the given peers and sends ExtendedHeaderRequest to fetch ExtendedHeaders.
// As a result sendMessage returns ExtendedHeaderResponse, the size of fetched data,
// the duration of the request and an error.
func sendMessage(
ctx context.Context,
host host.Host,
to peer.ID,
protocol protocol.ID,
req *p2p_pb.ExtendedHeaderRequest,
) ([]*p2p_pb.ExtendedHeaderResponse, uint64, uint64, error) {
startTime := time.Now()
stream, err := host.NewStream(ctx, to, protocol)
if err != nil {
return nil, 0, 0, err
}

// set stream deadline from the context deadline.
// if it is empty, then we assume that it will
// hang until the server will close the stream by the timeout.
if dl, ok := ctx.Deadline(); ok {
if err = stream.SetDeadline(dl); err != nil {
log.Debugw("error setting deadline: %s", err)
}
}

// send request
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, 0, 0, err
}

err = stream.CloseWrite()
if err != nil {
return nil, 0, 0, nil
}

headers := make([]*p2p_pb.ExtendedHeaderResponse, 0)
totalRequestSize := uint64(0)
for i := 0; i < int(req.Amount); i++ {
resp := new(p2p_pb.ExtendedHeaderResponse)
msgSize, err := serde.Read(stream, resp)
if err != nil {
if err == io.EOF {
break
}
stream.Reset() //nolint:errcheck
return nil, 0, 0, err
}

totalRequestSize += uint64(msgSize)
headers = append(headers, resp)
}

duration := time.Since(startTime).Milliseconds()
if err = stream.Close(); err != nil {
log.Errorw("closing stream", "err", err)
}

return headers, totalRequestSize, uint64(duration), nil
}

// convertStatusCodeToError converts passed status code into an error.
func convertStatusCodeToError(code p2p_pb.StatusCode) error {
switch code {
case p2p_pb.StatusCode_OK:
return nil
case p2p_pb.StatusCode_NOT_FOUND:
return header.ErrNotFound
case p2p_pb.StatusCode_LIMIT_EXCEEDED:
return header.ErrHeadersLimitExceeded
default:
return fmt.Errorf("unknown status code %d", code)
}
}
59 changes: 1 addition & 58 deletions header/p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ package p2p
import (
"context"
"errors"
"io"
"sort"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

"github.com/celestiaorg/go-libp2p-messenger/serde"

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
)
Expand Down Expand Up @@ -116,7 +111,7 @@ func (s *session) doRequest(
req *p2p_pb.ExtendedHeaderRequest,
headers chan []*header.ExtendedHeader,
) {
r, size, duration, err := s.requestHeaders(ctx, stat.peerID, req)
r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
return
Expand Down Expand Up @@ -144,58 +139,6 @@ func (s *session) doRequest(
s.queue.push(stat)
}

// requestHeaders sends the ExtendedHeaderRequest to a remote peer.
func (s *session) requestHeaders(
ctx context.Context,
to peer.ID,
req *p2p_pb.ExtendedHeaderRequest,
) ([]*p2p_pb.ExtendedHeaderResponse, uint64, uint64, error) {
startTime := time.Now()
stream, err := s.host.NewStream(ctx, to, s.protocolID)
if err != nil {
return nil, 0, 0, err
}

// set stream deadline from the context deadline.
// if it is empty, then we assume that it will
// hang until the server will close the stream by the timeout.
if dl, ok := ctx.Deadline(); ok {
if err = stream.SetDeadline(dl); err != nil {
log.Debugw("error setting deadline: %s", err)
}
}
// send request
_, err = serde.Write(stream, req)
if err != nil {
stream.Reset() //nolint:errcheck
return nil, 0, 0, err
}
err = stream.CloseWrite()
if err != nil {
log.Error(err)
}
headers := make([]*p2p_pb.ExtendedHeaderResponse, 0)
totalRequestSize := uint64(0)
for i := 0; i < int(req.Amount); i++ {
resp := new(p2p_pb.ExtendedHeaderResponse)
msgSize, err := serde.Read(stream, resp)
if err != nil {
if err == io.EOF {
break
}
stream.Reset() //nolint:errcheck
return nil, 0, 0, err
}
totalRequestSize += uint64(msgSize)
headers = append(headers, resp)
}
duration := time.Since(startTime).Milliseconds()
if err = stream.Close(); err != nil {
log.Errorw("closing stream", "err", err)
}
return headers, totalRequestSize, uint64(duration), nil
}

// processResponse converts ExtendedHeaderResponse to ExtendedHeader.
func (s *session) processResponse(responses []*p2p_pb.ExtendedHeaderResponse) ([]*header.ExtendedHeader, error) {
headers := make([]*header.ExtendedHeader, 0)
Expand Down

0 comments on commit 3444fdf

Please sign in to comment.