
Question regards to the source code and new transport layer #405

VoVAllen opened this issue Aug 13, 2021 · 17 comments

Comments

@VoVAllen

Hi,

I'm interested in adding a new transport layer for AWS Elastic Fabric Adapter (a kind of RDMA, but based on the libfabric interface). However, I'm confused about the difference between Channel and Transport. Based on the documentation, data buffers are supposed to be transferred through channels, so I expected the ibv (RDMA) implementation to live in the channel folder, but it's actually in the transport folder. I'm wondering what the design distinction between Channel and Transport is.

I also found that there are multiple channels with abbreviations that are hard to understand. Based on the commit history I found that CMA means cross-memory attach and MPT means multiplexed transport, but I couldn't find an explanation of XTH. What does XTH mean?

Thanks for the wonderful project.

@lw
Contributor

lw commented Aug 16, 2021

Hi, thanks for your interest! We've been wanting to add EFA backends for a while but have always been a bit deterred by the requirement of learning a new library (libfabric). If you're already familiar with it and would like to contribute it, that would be awesome!

As for your questions, both transports and channels are pluggable and modular backends, but they have slightly different goals. To give an overall picture, channels stack on top of transports. Transports are thus the lowest-level components: they are primarily used to transfer control messages (and small payloads), hence they are CPU-only (no CUDA); they should be optimized for low latency (a single "logical" transfer could require several round trips over a transport); and lastly they need some kind of signalling ability (an endpoint must be able to detect and "wake up" when some data comes in and invoke a callback), which typically means that transports need an internal thread. Channels, on the other hand, are optimized for larger tensors, which can be both CPU and CUDA; they should target high bandwidth; and they can leverage transports for any signalling they need (by sending control messages), so channels might go without an internal thread. There are also some other slight differences (e.g., transports can handle "unsolicited" reads, by queuing a read before knowing whether anything will arrive or how big it will be).
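
To make that division of labour concrete, here is a deliberately simplified, hypothetical sketch of the two roles (these are not TensorPipe's actual classes or signatures): the transport connection moves opaque CPU byte buffers and relies on callbacks for signalling, while the channel moves CPU or CUDA buffers and can rely on a transport for its control messages.

// Hypothetical, simplified interfaces for illustration only -- not the real
// TensorPipe API. They only capture the division of labour described above.
#include <cstddef>
#include <functional>
#include <string>

struct Error {
  std::string message; // placeholder error type
};

// Transport link: CPU-only, low-latency, callback ("wake up") driven.
class ConnectionSketch {
 public:
  // Can queue a read before knowing what/how much will arrive ("unsolicited").
  virtual void read(
      std::function<void(const Error&, const void* ptr, size_t len)> cb) = 0;
  virtual void write(
      const void* ptr, size_t len, std::function<void(const Error&)> cb) = 0;
  virtual ~ConnectionSketch() = default;
};

// Channel: optimized for large CPU or CUDA tensors and high bandwidth; it can
// send its own control messages over a transport instead of owning a thread.
class ChannelSketch {
 public:
  virtual void send(
      const void* buffer, size_t len, std::function<void(const Error&)> cb) = 0;
  virtual void recv(
      void* buffer, size_t len, std::function<void(const Error&)> cb) = 0;
  virtual ~ChannelSketch() = default;
};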

In fact, if you look at InfiniBand, we have both a transport and a channel for it: IBV is the transport, whereas CUDA_GDR is the channel (which for now only handles CUDA data, because that's where we've found InfiniBand to be the most useful, but @beauby is working on extending CUDA_GDR to also handle CPU).

To add a couple more things to the mix, we also have some "fallback" channels, such as CUDA_BASIC and (just) BASIC, which respectively allow wrapping a CPU-only channel to add CUDA support (by adding D2H and H2D transfers), and implementing a CPU-only channel by just internally deferring to a transport. Thus, for the sake of example, you could say that BASIC (on top of the IBV transport) is a CPU channel that uses InfiniBand, and CUDA_BASIC<BASIC> is a CUDA channel that uses InfiniBand, although they would both be very inefficient.

To clarify what the other channels do, here is what the acronyms stand for:

  • XTH and CUDA_XTH = cross-thread (i.e., for use within a single process)
  • SHM = shared-memory (for use across processes within a machine)
  • CMA = cross-memory attach (same)
  • CUDA_IPC = inter-process communication (again same-machine, leveraging NVLink)
  • IBV = InfiniBand verbs
  • CUDA_GDR = GPUDirect (aka InfiniBand again)
  • BASIC and CUDA_BASIC see above

I'd love to discuss adding EFA support to TensorPipe further. If you want a "home" to chat in, we could set up a channel in the PyTorch Slack? LMK!

@VoVAllen
Author

VoVAllen commented Aug 16, 2021

Thanks. I've applied to join the PyTorch Slack through the form linked in PyTorch's README. My email address is zhoujinjing09 (gmail). Could you invite me if possible?

I have experience implementing a communication layer with EFA, but only with CPU buffers, not GPUDirect. I believe EFA also supports GPUDirect now, but it may take me some time to study that.

Several other questions:

  1. There are three more abstractions: connection, pipe, and listener. As I understand it, a connection is the same thing as a transport, and a pipe is the overall abstraction above the channels. A listener is something like a passive socket that helps construct the pipe and isn't used anymore once the pipe is constructed. Overall, Pipe is the highest-level abstraction for the user, and it may contain many channels, while each channel uses one or more transports underneath. Do I understand this correctly?
  2. I'm not very familiar with ibv, but the overall structure of EFA should be the same. It needs to send the device address over some other connection (probably a socket first) to build a connection, then sends/writes by pushing events to an event queue, and gets the results by busy-polling the completion queue. Could you point me to some minimal code I could start reading to understand this? (For example, is it a good idea to start from the listener part of ibv?)
  3. Are non-XTH methods thread-safe? What exactly does cross-thread mean?
  4. Does the MPT channel use multiple sockets between a pair of endpoints to increase throughput? How does the pipe decide whether to use this channel (or other channels)?

Sorry for the long list of questions, and I appreciate all the optimizations done here so far!

@VoVAllen
Author

Some differences from ibv I've found so far:

  • EFA doesn't require an explicit memory registration step. Bounce buffers / cached memory registrations are managed by libfabric, which can simplify the whole process.
  • The completion event order is not consistent with the receive event order. Is this the same for ibv?

@lw
Contributor

lw commented Aug 17, 2021

I requested that you be added to the Slack, hopefully it will be accepted soon.

As for connection/pipe/listener/..., those are the names of the individual classes within each channel/transport/core. ("Core" is what we call the orchestration layer that combines all the backends in a single object.) These three layers (channels, transports, and core) all look similar in their internal structure, each having three classes:

  • A context, which is basically an object that holds all the structures/threads/... that are shared across multiple objects. In practice we could just use global variables, but we prefer to put this data inside an object so that it is more object-oriented and modular (e.g., it allows you to create two independent contexts within the same process without them interfering).
  • A listener, which you can think of as a bound socket in listening mode (although each backend can implement this as it prefers). This is only needed to bootstrap the connection/pipes/... and becomes useless afterwards. While transports and the core have listeners, channels do not, because channels can just rely on transports for bootstrap.
  • A "point-to-point" link, which is called pipe (in the core), channel (in channels) and connection (in transports). Sorry if we use different names for basically "equivalent" things, it just happened organically. These are the objects that are used the most in TensorPipe, as they are the ones that have the read/write/send/recv methods on them. These objects can either be obtained by actively initiating an outgoing connection to another node, or by passively waiting for an incoming connection to be received by a listener. Once they are created, however, they are indistinguishable and symmetric.

So in a sense the internal structure of TensorPipe consists of three layers (core, channels, and transports) each with three sublayers (context, listener, pipe/channel/connection), except for the channel-listener gap. Also note that the "corresponding" objects at different layers are interconnected: a core context contains a channel context and a transport context; a core listener contains a transport listener; a pipe contains channels and connections.
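
As a rough illustration of that repeating context/listener/link pattern (hypothetical names only, not the actual TensorPipe classes), each layer looks roughly like this:

// Illustrative pattern only: each layer repeats the same
// context / listener / link triplet, except that channels have no listener
// of their own.
#include <functional>
#include <memory>
#include <string>

class LinkSketch; // pipe / channel / connection, depending on the layer

class ListenerSketch {
 public:
  // Passive side: wait for an incoming connection and hand back a link.
  virtual void accept(std::function<void(std::shared_ptr<LinkSketch>)> cb) = 0;
  virtual ~ListenerSketch() = default;
};

class ContextSketch {
 public:
  // Shared state (threads, fds, ...) lives here instead of in globals, so two
  // independent contexts can coexist in one process without interfering.
  virtual std::shared_ptr<ListenerSketch> listen(const std::string& addr) = 0;
  // Active side: initiate an outgoing link to another node.
  virtual std::shared_ptr<LinkSketch> connect(const std::string& addr) = 0;
  virtual ~ContextSketch() = default;
};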

I tried to draw a diagram to recap all I said above. The diagram is perhaps a bit "overwhelming" but hopefully after the above explanation it should be clear.
[attached photo of the hand-drawn diagram: PXL_20210817_113529321]

@lw
Contributor

lw commented Aug 17, 2021

Are non-XTH methods thread-safe? What exactly does cross-thread mean?

Perhaps cross-thread isn't the best name. It's simply a channel that will only work if both endpoints are within the same process (i.e., only if a process is communicating "to itself"). This could happen in a case of loopback or, as I mentioned above, if a single process creates two channel contexts and tries to communicate between the two. In this latter case, typically, each of these contexts would be owned by a separate thread, which is why we called it cross-thread. But I admit that perhaps "intra-process" would have been a better name.

As for thread-safety, all objects in TensorPipe are fully thread-safe and reentrant. (If you ever find anything that breaks due to concurrency, please flag it, as we'd want to fix it.) It has actually been tricky to get all this right, and in the end we resorted to some perhaps unusual abstractions and tools to do so. To explain how we got here, and why other solutions didn't work, you can take a look at this doc: https://github.com/pytorch/tensorpipe/blob/master/docs/thread_model.md

Getting this threading-model right when writing a new transport/channel isn't very straightforward, and requires quite a bit of boilerplate. Hence to deduplicate the code and simplify our life we've extracted this boilerplate into a base class that can simply be subclassed and extended by each concrete transport/channel.
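
For flavour, here is a generic sketch of the "defer everything to a single loop" idiom that the thread-model doc describes; it is not TensorPipe's actual base class, just an illustration of why the pattern makes objects thread-safe and reentrant without fine-grained locking.

// Generic sketch, not TensorPipe's code: public methods never touch state
// directly, they post a task to a single loop thread, which is the only
// thread that touches the object's state.
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

class LoopSketch {
 public:
  LoopSketch() : thread_([this] { run(); }) {}
  ~LoopSketch() {
    deferToLoop([this] { done_ = true; });
    thread_.join();
  }

  // Safe to call from any thread, including from inside a callback.
  void deferToLoop(std::function<void()> fn) {
    std::unique_lock<std::mutex> lock(mutex_);
    tasks_.push_back(std::move(fn));
    cv_.notify_one();
  }

 private:
  void run() {
    while (true) {
      std::function<void()> fn;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !tasks_.empty(); });
        fn = std::move(tasks_.front());
        tasks_.pop_front();
      }
      fn(); // all state is accessed only from this thread
      if (done_) {
        return;
      }
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool done_{false};
  std::thread thread_;
};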

@lw
Contributor

lw commented Aug 17, 2021

If you want to look at how we handle InfiniBand, I suggest you start from the IBV transport, so that you can focus on CPU-only transfers (since GPUDirect adds some more complexity). The listener of IBV is actually the only part that doesn't use InfiniBand at all, since it uses a plain TCP socket for bootstrapping, and then the connection is the one that initiates the InfiniBand link.

To give you a brief overview of how IBV works:

  • The context contains a libibverbs context (it picks the "first" device if more than one are available)
  • The listener sets up a bound TCP socket
  • Some other node connects to the TCP socket
  • Each endpoint sends their setup information over the TCP socket (their "address" on the InfiniBand network, e.g., subnet number, port number, queue pair number, ...)
  • Once both endpoints have each other's info, they set up the queue pair. (They do keep the TCP socket around as it's used to "detect" when the other side disconnects).
  • Each side also sets up two regions of memory which we call outbox and inbox, and registers them with InfiniBand. Each side also sends the "key" for its inbox to the other endpoint.
  • These two regions are used as ring buffers. When a side wants to send something to the other one, it copies the data from the source user buffer into its outbox, then performs an RDMA copy over InfiniBand into the remote's inbox; this operation "notifies" the remote (which is continuously polling for events), which then copies the data from its inbox into the target buffer (and notifies the sender that the inbox is empty again and can be overwritten). A rough sketch of one such hop follows below.
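
For concreteness, here is a very rough sketch of what one outbox-to-remote-inbox hop could look like in raw libibverbs terms. It assumes the queue pair and the remote inbox address/rkey were exchanged during the TCP bootstrap described above; it is not TensorPipe's actual code, just an illustration of the mechanism.

// Rough sketch of one outbox -> remote inbox hop using raw libibverbs.
// Assumes qp, outbox, outboxMr, remoteInboxAddr and remoteInboxRkey were
// set up/exchanged during bootstrap. Not TensorPipe's actual code.
#include <infiniband/verbs.h>
#include <arpa/inet.h>
#include <cstdint>
#include <cstring>

bool postOutboxToInbox(
    ibv_qp* qp,
    char* outbox,             // local staging ring buffer (registered)
    ibv_mr* outboxMr,         // memory region covering the outbox
    uint64_t remoteInboxAddr, // peer's inbox address (from bootstrap)
    uint32_t remoteInboxRkey, // peer's inbox rkey (from bootstrap)
    const void* src,
    size_t len) {
  // 1. Stage the user data into the outbox.
  std::memcpy(outbox, src, len);

  // 2. RDMA-write it into the peer's inbox; the immediate value lets the
  //    peer's completion-queue poller notice that new data has arrived.
  ibv_sge sge = {};
  sge.addr = reinterpret_cast<uint64_t>(outbox);
  sge.length = static_cast<uint32_t>(len);
  sge.lkey = outboxMr->lkey;

  ibv_send_wr wr = {};
  wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
  wr.send_flags = IBV_SEND_SIGNALED;
  wr.sg_list = &sge;
  wr.num_sge = 1;
  wr.imm_data = htonl(static_cast<uint32_t>(len));
  wr.wr.rdma.remote_addr = remoteInboxAddr;
  wr.wr.rdma.rkey = remoteInboxRkey;

  // The receiver, which is polling its completion queue, sees the incoming
  // completion, copies the data from its inbox into the target buffer, and
  // then notifies the sender that the inbox slot is free again.
  ibv_send_wr* badWr = nullptr;
  return ibv_post_send(qp, &wr, &badWr) == 0;
}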

@lw
Contributor

lw commented Aug 17, 2021

Does the MPT channel use multiple sockets between a pair of endpoints to increase throughput? How does the pipe decide whether to use this channel (or other channels)?

Indeed MPT uses multiple pairs of sockets between two nodes, and also multiple threads to send and recv on these sockets. It is very straightforward to do so, thanks to the modularity I mentioned earlier: MPT just creates N independent instances of a transport's context, which thus each contain their own thread and sockets, and when MPT is asked to perform a transfer it simply breaks it up into chunks and sends each chunk to one of its underlying contexts. This logic can probably be refined further but it has worked fine for now.
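
The chunking itself can be as simple as computing offsets into the user-provided buffer. Here is an illustrative sketch (a hypothetical helper, not the actual MPT code):

// Illustration of the chunking idea described above (hypothetical helper,
// not the actual MPT code): the buffer isn't copied, each lane just gets a
// pointer+offset and an adjusted length into the user-provided buffer.
#include <cstddef>
#include <cstdint>
#include <vector>

struct Chunk {
  uint8_t* ptr;
  size_t len;
};

std::vector<Chunk> chunkForLanes(uint8_t* ptr, size_t len, size_t numLanes) {
  std::vector<Chunk> chunks;
  size_t base = len / numLanes;
  size_t rem = len % numLanes;
  size_t offset = 0;
  for (size_t i = 0; i < numLanes; ++i) {
    size_t thisLen = base + (i < rem ? 1 : 0); // spread the remainder
    chunks.push_back({ptr + offset, thisLen});
    offset += thisLen;
  }
  // Each chunk is then written on its own underlying connection, each backed
  // by its own context (and thread); the receiver computes the same offsets,
  // so chunk i lands at the right place in the destination buffer.
  return chunks;
}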

@lw
Contributor

lw commented Aug 17, 2021

EFA doesn't require an explicit memory registration step. Bounce buffers / cached memory registrations are managed by libfabric, which can simplify the whole process.

I'm wondering how this works: does it mean that libfabric will register the memory on-demand on-the-fly? Doesn't this cause a performance hit, since registering the memory is expensive (it involves a syscall)? Also does this mean that the memory is cached and kept around forever, potentially resulting in too much memory becoming pinned? Or is it regularly cleaned, and if so how?

The completion event order is not consistent with the receive event order. Is this the same for ibv?

I think InfiniBand preserves the order of the completions, but I don't think we currently really rely on this property?

@VoVAllen
Author

VoVAllen commented Aug 17, 2021

@lw Thanks a lot. Actually I'm not so familiar with the underlying implementation of EFA.
Here's something I found at
https://ofiwg.github.io/libfabric/v1.13.0/man/fi_efa.7.html about the configuration, which partially shows how the underlying memory is managed. It seems registered memory is cached.

FI_EFA_MR_CACHE_ENABLE
    Enables using the mr cache and in-line registration instead of a bounce buffer for iov’s larger than max_memcpy_size. Defaults to true. When disabled, only uses a bounce buffer
FI_EFA_MR_MAX_CACHED_COUNT
    Sets the maximum number of memory registrations that can be cached at any time.
FI_EFA_MR_MAX_CACHED_SIZE
    Sets the maximum amount of memory that cached memory registrations can hold onto at any time.
FI_EFA_MAX_MEMCPY_SIZE
    Threshold size switch between using memory copy into a pre-registered bounce buffer and memory registration on the user buffer.

@VoVAllen
Author

One more question: is the resemble bytes logic handled by the framework already? For example, resembling the chunks from the multiplexed sockets back into the complete chunk/message, as if it had been sent directly? Is there anything like a protocol spec I can look at? Some communication systems depend on message order, but it seems order is not relevant here?

@lw
Contributor

lw commented Aug 17, 2021

Is the resemble bytes logic handled by the framework already?

Do you mean "reassembling"? If I understand the question correctly, then this is in fact a non-problem, because TensorPipe doesn't manage nor allocate any memory on its own. So, for example, when receiving some data, it's up to the user to allocate a buffer (or to reuse an existing one: they can do it however they want) and then pass a pointer to that buffer back to TensorPipe, which will thus just read into that given buffer. Therefore, in the case of MPT, when we do the "chunking" we're in fact just calculating some offsets within the user-given buffer pointer, and passing those pointer+offsets (and the adjusted lengths) to the underlying TCP connections.

@VoVAllen
Author

VoVAllen commented Aug 18, 2021

@lw Thanks. I just realized that what I actually wanted to ask is how TensorPipe reconstructs the Message. Basically a message is broken into multiple parts to send. How does TensorPipe reconstruct it from those parts? How does the framework know which buffer from the network is associated with which part of the message?

Apart from that, I've read the connection part of ibv. Here are some things about EFA I'd like to consult you on.

The general model of EFA is similar to ibv: it has event queues and completion queues for send/recv.
One difference is that the user has to poll the completion queue to make the send/recv events actually happen; otherwise the events just stay in the event queue. Is this the same for ibv?

The code below shows the basic process of sending messages, taken mainly from aws-ofi-nccl, which uses libfabric to implement NCCL's interface.

Send process:

  1. Push the send event to the event queue
do {
    rc = fi_send(...);  /* return code */
    if (rc == 0) {
        break;  /* send succeeded */
    } else if (rc == -FI_EAGAIN) {
        /*
         * This is a retryable error: the provider is temporarily out of
         * resources. We can progress the completion queue to let pending send
         * events complete and make the NIC available again, or progress the
         * completion queue in another thread and simply retry later.
         * aws-ofi-nccl progresses the CQ right here, to "process completions
         * so that you have enough resources for sending the connect message".
         */
        ret = nccl_ofi_progress(nccl_ofi_component[dev]);
        if (OFI_UNLIKELY(ret != 0))
            goto error;
    } else {
        /* Some fatal error */
        NCCL_OFI_WARN("Unable to send connect message for dev %d. RC: %zd, ERROR: %s",
                      dev, rc, fi_strerror(-rc));
        ret = ncclSystemError;
        goto error;
    }
} while (true);

This part is a bit different from ibv, in that pushing a send event to the event queue may fail and need a retry.

  2. Progress the completion queue
do {
    ret = nccl_ofi_progress(nccl_ofi_component[dev]);
    if (OFI_UNLIKELY(ret != 0))
        goto error;
} while (true);

The receive process is the same as the send process.

Some design questions I'd like to ask for suggestions on:

  1. Since the memory doesn't need to be pinned/registered explicitly in EFA, do I still need the RingBuffer-related classes for EFA?
  2. How should I arrange the event loop? Previously I used a busy-polling thread to progress the completion queue, and retried pushing the send event onto the queue directly in the main thread. Is this good practice in TensorPipe?

@lw
Contributor

lw commented Aug 18, 2021

How does TensorPipe reconstruct it from those parts? How does the framework know which buffer from the network is associated with which part of the message?

In fact it all relies on the order of payloads/tensors/messages being preserved. (I realized you had asked about order earlier and I forgot to answer, sorry). TensorPipe guarantees that if two send operations are scheduled one after the other on one side, and two recv operations are scheduled one after the other on the other side, then we will match the first send with the first recv, and so on. For tensors within a message the same thing happens: the pipe will delegate the tensors to its underlying channels in a consistent order on both the sender and receiver, thus the tensors will "match up". BTW the sending end of a pipe also sends a "descriptor" of the message to the receiving end, and this descriptor contains information about the number of tensors, their sizes, the devices they reside on, which channel they will travel along, ... This ensures that sender and receiver never get "out of sync" (e.g., the sender sending N tensors but the receiver expecting N-1). If you want to find out more about this check the pipe_impl.cc file, and its advanceReadOperation and advanceWriteOperation methods which contain the state machine for the "protocol" adopted to transfer messages.
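
As a toy illustration (not the actual wire format, which you'd find by reading pipe_impl.cc and its state machine), the descriptor conceptually carries something like:

// Toy illustration of what the message "descriptor" conceptually carries --
// not the actual wire format used by pipe_impl.cc.
#include <cstddef>
#include <string>
#include <vector>

struct TensorDescriptorSketch {
  size_t length;        // how many bytes the receiver must allocate/expect
  std::string device;   // e.g. "cpu" or "cuda:0"
  std::string channel;  // which channel the payload will travel over
};

struct MessageDescriptorSketch {
  std::vector<size_t> payloadLengths;          // ordered, so payload i matches recv i
  std::vector<TensorDescriptorSketch> tensors; // same order on sender and receiver
};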

@lw
Contributor

lw commented Aug 18, 2021

As for the automatic registration and caching of EFA, it seems like it's trying to automate something that TensorPipe is instead doing manually. That is, it seems EFA also has its own internal staging buffer (which the doc calls "bounce buffer"), which is what we call outbox/inbox in TensorPipe (in the IBV transport). It also has the option for in-place on-the-fly pinning, which TensorPipe does in the CUDA_GDR. I think there could be reasons to manage this logic manually when implementing EFA in TensorPipe, as this allows us more fine-grained control, but perhaps EFA is doing a good enough job in handling this automatically? I'll let you decide.

@lw
Contributor

lw commented Aug 18, 2021

Is this the same for ibv?

If I understand the question properly then, no, I think InfiniBand works differently: once some work request is pushed to the send queue it will be actively carried out by the device, even if we're not actively polling from the completion queue. Although in practice I don't believe this matters, since we are always actively polling from the completion queue.

This part is a bit different from ibv, in that pushing a send event to the event queue may fail and need a retry.

This also happens in InfiniBand, but InfiniBand allows us to know in advance how big the send queue is, so that we can determine on our own whether a new work request would fit or not, and if it doesn't fit we first stash it aside, and only add it to the send queue when some earlier operations complete.
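
Conceptually, that stashing logic looks something like the following sketch (a generic illustration with a placeholder work-request type, not the actual IBV transport code):

// Sketch of the "stash until a slot frees up" idea described above, using a
// generic WorkRequest placeholder instead of real ibverbs structures.
#include <cstddef>
#include <deque>
#include <functional>

struct WorkRequest { /* whatever needs to be posted to the device */ };

class SendQueueThrottler {
 public:
  SendQueueThrottler(size_t queueDepth, std::function<void(const WorkRequest&)> post)
      : available_(queueDepth), post_(std::move(post)) {}

  void trySend(WorkRequest wr) {
    if (available_ > 0) {
      --available_;
      post_(wr);                       // it fits: post it right away
    } else {
      stash_.push_back(std::move(wr)); // queue is full: set it aside
    }
  }

  // Called whenever a completion for an earlier operation is polled.
  void onCompletion() {
    ++available_;
    if (!stash_.empty()) {
      WorkRequest wr = std::move(stash_.front());
      stash_.pop_front();
      --available_;
      post_(wr);
    }
  }

 private:
  size_t available_;
  std::deque<WorkRequest> stash_;
  std::function<void(const WorkRequest&)> post_;
};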

Some design questions I'd like to ask for suggestions on:

  1. Since the memory doesn't need to be pinned/registered explicitly in EFA, do I still need the RingBuffer-related classes for EFA?
  2. How should I arrange the event loop? Previously I used a busy-polling thread to progress the completion queue, and retried pushing the send event onto the queue directly in the main thread. Is this good practice in TensorPipe?

TBH I don't think I can answer that. I gave some context above but I'm not familiar with the workings of EFA and, as I said, I don't think I have time to learn them. Hence I'm kinda counting on you to know or figure out the most performant way to implement the transport interface of TensorPipe using EFA. :)

@VoVAllen
Author

Thanks a lot for your detailed explanations. I'll follow the OpenMPI libfabric implementation, since that's what AWS used for their benchmarks, and let the EFA people optimize their underlying logic if it's not optimal. :)

@eedalong

eedalong commented Feb 8, 2022

(quoting lw's explanation of the core / channel / transport structure and the diagram above)

After going through the source code, these descriptions are really clear to me.
