Question regarding the source code and new transport layer #405
Hi, thanks for your interest! We've been wanting to add an EFA backend for a while but have always been a bit deterred by the requirement of learning a new library (libfabric). If you're already familiar with it and would like to contribute it, that would be awesome!

As for your questions, both transports and channels are pluggable and modular backends, but they have somewhat different goals. To give an overall picture, channels stack on top of transports. Transports are thus the lowest-level components: they are primarily used to transfer control messages (and small payloads), hence they are CPU-only (no CUDA); they should be optimized for low latency (as a single "logical" transfer could require several round trips over a transport); and they need to have some kind of signalling ability (an endpoint must be able to detect and "wake up" when some data comes in and invoke a callback), which typically means that transports need to have an internal thread.

Channels, on the other hand, are optimized for larger tensors, which can be both CPU and CUDA; they should target high bandwidth; and they are able to leverage transports for any signalling they need (by sending control messages), so channels can go without an internal thread. There are also some other slight differences (e.g., transports can handle "unsolicited" reads, by queuing a read before knowing whether anything will arrive or how big it will be).

In fact, if you look at InfiniBand, we have both a transport and a channel for it: IBV is the transport, whereas CUDA_GDR is the channel (which for now only handles CUDA data, because that's where we've found InfiniBand to be the most useful, but @beauby is working on extending CUDA_GDR to also handle CPU).

To add a couple more things to the mix, we also have some "fallback" channels, such as CUDA_BASIC and (just) BASIC, which respectively allow wrapping a CPU-only channel to add CUDA support (by adding D2H and H2D transfers), and implementing a CPU-only channel by simply deferring internally to a transport. Thus, for the sake of example, you could say that BASIC (on top of the IBV transport) is a CPU channel that uses InfiniBand, and CUDA_BASIC<BASIC> is a CUDA channel that uses InfiniBand, although they would both be very inefficient. To clarify what the other channels do, here is what the acronyms stand for:
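To make the stacking concrete, here is a setup sketch written from memory of the README (the exact factory names may differ from the current headers, so treat it as illustrative rather than authoritative):

```cpp
#include <tensorpipe/tensorpipe.h>

// A core context is given one or more transports (low-level control plane)
// and channels (bulk data plane), each registered under a priority and name.
auto context = std::make_shared<tensorpipe::Context>();

// Transport: lowest level, CPU-only, low-latency control messages.
context->registerTransport(0, "ibv", tensorpipe::transport::ibv::create());

// Channels: BASIC defers internally to whatever transport the pipe uses;
// CUDA_BASIC wraps a CPU-only channel, adding D2H/H2D staging copies.
auto cpuChannel = tensorpipe::channel::basic::create();
context->registerChannel(0, "basic", cpuChannel);
context->registerChannel(1, "cuda_basic",
                         tensorpipe::channel::cuda_basic::create(cpuChannel));
```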
I'd love to discuss adding EFA support to TensorPipe further. If you want a "home" to chat in, we can set up a channel in the PyTorch Slack? LMK!
Thanks. I've applied to join the PyTorch Slack through the form in PyTorch's README. My email address is zhoujinjing09 (gmail). Could you invite me if possible? I have experience implementing a communication layer with EFA, but only with CPU buffers, not GPUDirect. I believe EFA also supports GPUDirect now, but it may take some time for me to study it. Several other questions:
Sorry for the long list of questions, and I appreciate all the optimizations here so far!
Some differences from ibv found so far:
Perhaps cross-thread isn't the best name. It's simply a channel that will only work if both endpoints are within the same process (i.e., only if a process is communicating "to itself"). This could happen in a case of loopback or, as I mentioned above, if a single process creates two channel contexts and tries to communicate between the two. In this latter case, typically, each of these contexts would be owned by a separate thread, which is why we called it cross-thread. But I admit that perhaps "intra-process" would have been a better name.

As for thread safety, all objects in TensorPipe are fully thread-safe and reentrant. (If you ever find anything that breaks due to concurrency, please flag it, as we'd want to fix it.) It has actually been tricky to get all this right, and in the end we resorted to some perhaps unusual abstractions and tools to do so. To explain how we got here, and why other solutions didn't work, you can take a look at this doc: https://github.com/pytorch/tensorpipe/blob/master/docs/thread_model.md

Getting this threading model right when writing a new transport/channel isn't very straightforward, and requires quite a bit of boilerplate. Hence, to deduplicate the code and simplify our life, we've extracted this boilerplate into a base class that can simply be subclassed and extended by each concrete transport/channel.
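To make the idiom from that doc concrete, here is a minimal, illustrative sketch (not TensorPipe's actual classes) of the "defer everything to one internal event-loop thread" pattern that makes public methods safe to call from any thread:

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Minimal event loop: public methods never touch state directly; they
// defer a task to the loop thread, which runs all tasks sequentially.
// This gives thread safety and reentrancy without fine-grained locking.
class Loop {
 public:
  Loop() : thread_([this] { run(); }) {}

  void defer(std::function<void()> fn) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(fn));
    }
    cv_.notify_one();
  }

  void join() {
    defer([this] { done_ = true; });
    thread_.join();
  }

 private:
  void run() {
    while (true) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !tasks_.empty(); });
        task = std::move(tasks_.front());
        tasks_.pop_front();
      }
      task();
      if (done_) {
        return;
      }
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> tasks_;
  bool done_{false}; // only touched from the loop thread
  std::thread thread_;
};
```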
If you want to look at how we handle InfiniBand, I suggest you start from the IBV transport, so that you can focus on CPU-only transfers (since GPUDirect adds some more complexity). The listener of IBV is actually the only part that doesn't use InfiniBand at all, since it uses a plain TCP socket for bootstrapping; the connection is then the one that initiates the InfiniBand link. To give you a brief overview of how IBV works:
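As a rough illustration of the bootstrap step, here is the generic ibverbs idiom (a sketch of the common pattern, not TensorPipe's actual code; `exchangeOverTcp` is a hypothetical helper): the peers swap queue-pair identifiers over the plain TCP socket, then transition their queue pairs into a connected state, after which all traffic flows over InfiniBand.

```cpp
#include <cstdint>
#include <infiniband/verbs.h>

// The identifiers a peer needs to address our queue pair.
struct QpInfo {
  uint16_t lid;  // local identifier of the port
  uint32_t qpn;  // queue pair number
  ibv_gid gid;   // global identifier (needed e.g. for RoCE)
};

// Hypothetical helper: sends `local` over the bootstrap TCP socket and
// returns the peer's QpInfo.
QpInfo exchangeOverTcp(int tcpSocket, const QpInfo& local);

void bootstrap(int tcpSocket, ibv_context* ctx, ibv_qp* qp, uint8_t port) {
  ibv_port_attr portAttr;
  ibv_query_port(ctx, port, &portAttr);

  QpInfo local;
  local.lid = portAttr.lid;
  local.qpn = qp->qp_num;
  ibv_query_gid(ctx, port, /*index=*/0, &local.gid);

  // Swap identifiers with the peer over the plain TCP socket.
  QpInfo remote = exchangeOverTcp(tcpSocket, local);

  // Transition the queue pair INIT -> RTR -> RTS using the peer's info;
  // from here on, traffic goes over InfiniBand, not TCP.
  ibv_qp_attr attr{};
  attr.qp_state = IBV_QPS_RTR;
  attr.dest_qp_num = remote.qpn;
  attr.ah_attr.dlid = remote.lid;
  // ... (path MTU, PSNs, timeouts, and the RTS transition omitted)
  ibv_modify_qp(qp, &attr, IBV_QP_STATE | IBV_QP_DEST_QPN | IBV_QP_AV /* ... */);
}
```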
Indeed, MPT uses multiple pairs of sockets between two nodes, and also multiple threads to send and recv on these sockets. It is very straightforward to do so, thanks to the modularity I mentioned earlier: MPT just creates N independent instances of a transport's context, which thus each contain their own thread and sockets, and when MPT is asked to perform a transfer it simply breaks it up into chunks and sends each chunk to one of its underlying contexts. This logic could probably be refined further, but it has worked fine so far.
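A minimal sketch of that chunking idea (illustrative names, not MPT's actual code): split the caller's buffer into per-lane chunks and hand each chunk, as a pointer plus adjusted length, to one of the N underlying connections.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical handle to one underlying transport connection, each backed
// by its own context (and therefore its own thread and socket).
struct Connection {
  void send(const uint8_t* ptr, size_t length) { /* post on this lane */ }
  void recv(uint8_t* ptr, size_t length) { /* read into caller's memory */ }
};

// Split [ptr, ptr + length) into one chunk per lane. No copies are made:
// each lane just receives an offset into the caller's buffer.
void multiplexSend(std::vector<Connection>& lanes,
                   const uint8_t* ptr, size_t length) {
  const size_t numLanes = lanes.size();
  const size_t chunkSize = (length + numLanes - 1) / numLanes; // ceiling division
  for (size_t i = 0; i < numLanes; ++i) {
    const size_t offset = i * chunkSize;
    if (offset >= length) {
      break; // small transfers may use fewer chunks than lanes
    }
    lanes[i].send(ptr + offset, std::min(chunkSize, length - offset));
  }
}
```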
I'm wondering how this works: does it mean that libfabric will register the memory on demand, on the fly? Doesn't this cause a performance hit, since registering memory is expensive (it involves a syscall)? Also, does this mean that the memory is cached and kept around forever, potentially resulting in too much memory becoming pinned? Or is it regularly cleaned, and if so, how?
I think InfiniBand preserves the order of the completions, but I don't think we currently really rely on this property?
@lw Thanks a lot. Actually I'm not so familiar with the underlying implementation of EFA.
One more question: is the "resemble bytes" logic handled by the framework already? For example, resembling the multiplexed socket chunks back into the complete chunk/message as sent directly? Is there anything like a protocol I can find? Some communication systems may depend on message order; however, it seems the order is not relevant here?
Do you mean "reassembling"? If I understand the question correctly, then this is in fact a non-problem, because TensorPipe doesn't manage or allocate any memory on its own. So, for example, when receiving some data, it's up to the user to allocate a buffer (or to reuse an existing one: they can do it however they want) and then pass a pointer to that buffer back to TensorPipe, which will thus just read into that given buffer. Therefore, in the case of MPT, when we do the "chunking" we're in fact just calculating some offsets within the user-given buffer, and passing those pointers+offsets (and the adjusted lengths) to the underlying TCP connections.
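The receive side mirrors the send sketch above (again illustrative, reusing the hypothetical `Connection` type): because the offsets are computed into the buffer the user handed over, each chunk lands in the right place and no separate reassembly step is needed.

```cpp
// userPtr is the buffer the *user* allocated and passed to TensorPipe.
// Each lane reads its chunk directly into the right offset, so once all
// lanes complete, the message is already "reassembled" in place.
void multiplexRecv(std::vector<Connection>& lanes,
                   uint8_t* userPtr, size_t length) {
  const size_t numLanes = lanes.size();
  const size_t chunkSize = (length + numLanes - 1) / numLanes;
  for (size_t i = 0; i < numLanes; ++i) {
    const size_t offset = i * chunkSize;
    if (offset >= length) {
      break;
    }
    lanes[i].recv(userPtr + offset, std::min(chunkSize, length - offset));
  }
}
```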
@lw Thanks. I just realized what I actually wanted to ask is how TensorPipe reconstructs the Message. Basically, a message will be broken into multiple parts to send. How does TensorPipe reconstruct it from those parts? How does the framework know which buffer from the network is associated with which part of the message?

Apart from that, I've read the connection part of ibv. Here's something about EFA I'd like to consult on. The general model of EFA is similar to ibv: it has event queues and completion queues for send/recv. The code below shows the basic process of sending messages, taken mainly from aws-ofi-nccl, which uses libfabric to implement NCCL's interface. Send process:
```c
do {
    rc = fi_send(...); /* rc: return code */
    if (rc == 0) {
        break; /* send posted successfully */
    } else if (rc == -FI_EAGAIN) {
        /*
         * Retryable error: progress the completion queue so that send
         * events complete and the NIC becomes available again. One could
         * also progress the completion queue in another thread and simply
         * retry later; aws-ofi-nccl progresses it right here:
         * "Process completions so that you have enough
         *  resources for sending connect message"
         */
        ret = nccl_ofi_progress(nccl_ofi_component[dev]);
        if (OFI_UNLIKELY(ret != 0))
            goto error;
    } else {
        /* Some fatal error. */
        NCCL_OFI_WARN("Unable to send connect message for dev %d. RC: %zd, ERROR: %s",
                      dev, rc, fi_strerror(-rc));
        ret = ncclSystemError;
        goto error;
    }
} while (true);
```

This part is a bit different from ibv: pushing a send event onto the event queue may fail, so it might need to be retried.
```c
/* Then spin on progressing the completion queue (e.g., until the
 * posted operation completes). */
do {
    ret = nccl_ofi_progress(nccl_ofi_component[dev]);
    if (OFI_UNLIKELY(ret != 0))
        goto error;
} while (true);
```

The receive process is the same as the send process. Some design questions I'd like to ask for suggestions:
In fact it all relies on the order of payloads/tensors/messages being preserved. (I realize you had asked about order earlier and I forgot to answer, sorry.) TensorPipe guarantees that if two send operations are scheduled one after the other on one side, and two recv operations are scheduled one after the other on the other side, then we will match the first send with the first recv, and so on. The same thing happens for tensors within a message: the pipe delegates the tensors to its underlying channels in a consistent order on both the sender and the receiver, so the tensors "match up". BTW, the sending end of a pipe also sends a "descriptor" of the message to the receiving end, and this descriptor contains information about the number of tensors, their sizes, the devices they reside on, which channel they will travel along, ... This ensures that sender and receiver never get "out of sync" (e.g., the sender sending N tensors but the receiver expecting N-1). If you want to find out more about this, check the
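A sketch of what that FIFO guarantee looks like from the user's side (API shape from memory of TensorPipe's docs; the exact signatures have varied across versions, and `allocateBuffersFor` is a hypothetical helper):

```cpp
// Sender side: two writes scheduled back to back...
pipe->write(messageA, [](const tensorpipe::Error& error) { /* A sent */ });
pipe->write(messageB, [](const tensorpipe::Error& error) { /* B sent */ });

// ...match the first and second reads on the receiver side, in order.
pipe->readDescriptor([&](const tensorpipe::Error& error,
                         tensorpipe::Message message) {
  // The descriptor announces payload/tensor counts, sizes, and devices,
  // so the user can allocate target buffers before the actual read.
  allocateBuffersFor(message); // hypothetical: fills in the data pointers
  pipe->read(std::move(message), [](const tensorpipe::Error& error) {
    // This read matches the sender's first write (messageA).
  });
});
```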
As for the automatic registration and caching of EFA, it seems like it's trying to automate something that TensorPipe instead does manually. That is, it seems EFA also has its own internal staging buffer (which the docs call a "bounce buffer"), which is what we call the outbox/inbox in TensorPipe (in the IBV transport). It also has an option for in-place, on-the-fly pinning, which TensorPipe does in CUDA_GDR. I think there could be reasons to manage this logic manually when implementing EFA in TensorPipe, as it allows more fine-grained control, but perhaps EFA does a good enough job handling this automatically? I'll let you decide.
If I understand the question properly then, no, I think InfiniBand works differently: once some work request is pushed to the send queue it will be actively carried out by the device, even if we're not actively polling from the completion queue. Although in practice I don't believe this matters, since we are always actively polling from the completion queue.
This also happens in InfiniBand, but InfiniBand allows us to know in advance how big the send queue is, so we can determine on our own whether a new work request will fit; if it doesn't fit, we first stash it aside, and only add it to the send queue once some earlier operations complete.
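A minimal sketch of that stashing idea (illustrative, not TensorPipe's actual code), using ibverbs names:

```cpp
#include <cstdint>
#include <deque>
#include <infiniband/verbs.h>

// Track how many work requests are in flight; when the send queue is full,
// stash new requests and post them as completions free up slots.
// Note: copying ibv_send_wr assumes its sg_list outlives the stash.
class SendQueue {
 public:
  SendQueue(ibv_qp* qp, uint32_t maxWr) : qp_(qp), capacity_(maxWr) {}

  void post(ibv_send_wr wr) {
    if (inFlight_ == capacity_) {
      stash_.push_back(wr); // queue is full: set aside for later
      return;
    }
    postNow(wr);
  }

  // Call once for each completion polled from the completion queue.
  void onCompletion() {
    --inFlight_;
    if (!stash_.empty()) {
      ibv_send_wr wr = stash_.front();
      stash_.pop_front();
      postNow(wr);
    }
  }

 private:
  void postNow(ibv_send_wr& wr) {
    ibv_send_wr* bad = nullptr;
    ibv_post_send(qp_, &wr, &bad); // error handling omitted
    ++inFlight_;
  }

  ibv_qp* qp_;
  const uint32_t capacity_;
  uint32_t inFlight_{0};
  std::deque<ibv_send_wr> stash_;
};
```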
TBH I don't think I can answer that. I gave some context above but I'm not familiar with the workings of EFA and, as I said, I don't think I have time to learn them. Hence I'm kinda counting on you to know or figure out the most performant way to implement the transport interface of TensorPipe using EFA. :)
Thanks a lot for your detailed explanations. I'll follow the OpenMPI libfabric implementation, since that's what AWS used for benchmarking, and let the EFA people optimize their underlying logic if it's not optimal. :)
Hi,
I'm interested in adding a new transport layer for AWS Elastic Fabric Adapter (a kind of RDMA, but based on the libfabric interface). However, I got confused between Channel and Transport. Based on the documentation, data buffers are supposed to be transferred through channels, so I thought the ibv (RDMA) implementation would be in the channel folder, but it's actually in the transport folder. I'm wondering what the design distinction between Channel and Transport is.
Also, I found there are multiple channels with abbreviations that are hard to understand. Based on the commit history I found that CMA means cross memory access and MPT means multiplexed transport, but I didn't find an explanation of XTH. What does XTH mean?
Thanks for the wonderful project.