Skip to content

Commit

Permalink
Combine flushes in DnsNameResolver to allow usage of sendmmsg to redu…
Browse files Browse the repository at this point in the history
…ce syscall costs (netty#8470)

Motivation:

Some of transports support gathering writes when using datagrams. For example this is the case for EpollDatagramChannel. We should minimize the calls to flush() to allow making efficient usage of sendmmsg in this case.

Modifications:

- minimize flush() operations when we query for multiple address types.
- reduce GC by always directly schedule doResolveAll0(...) on the EventLoop.

Result:

Be able to use sendmmsg internally in the DnsNameResolver.
  • Loading branch information
normanmaurer authored Nov 21, 2018
1 parent 3d2fdc4 commit d728a72
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.netty.resolver.ResolvedAddressTypes;
import io.netty.util.NetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
Expand Down Expand Up @@ -962,10 +963,32 @@ static boolean doResolveAllCached(String hostname,
}
}

private void doResolveAllUncached(String hostname,
private void doResolveAllUncached(final String hostname,
final DnsRecord[] additionals,
final Promise<List<InetAddress>> promise,
final DnsCache resolveCache) {
// Call doResolveUncached0(...) in the EventLoop as we may need to submit multiple queries which would need
// to submit multiple Runnable at the end if we are not already on the EventLoop.
EventExecutor executor = executor();
if (executor.inEventLoop()) {
doResolveAllUncached0(hostname, additionals, promise, resolveCache);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
doResolveAllUncached0(hostname, additionals, promise, resolveCache);
}
});
}
}

private void doResolveAllUncached0(String hostname,
DnsRecord[] additionals,
Promise<List<InetAddress>> promise,
DnsCache resolveCache) {

assert executor().inEventLoop();

final DnsServerAddressStream nameServerAddrs =
dnsServerAddressStreamProvider.nameServerAddressStream(hostname);
new DnsAddressResolveContext(this, hostname, additionals, nameServerAddrs,
Expand Down Expand Up @@ -1014,8 +1037,8 @@ private InetSocketAddress nextNameServerAddress() {
public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
InetSocketAddress nameServerAddr, DnsQuestion question) {

return query0(nameServerAddr, question, EMPTY_ADDITIONALS,
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(),
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
}

/**
Expand All @@ -1024,8 +1047,8 @@ public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
InetSocketAddress nameServerAddr, DnsQuestion question, Iterable<DnsRecord> additionals) {

return query0(nameServerAddr, question, toArray(additionals, false),
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(),
ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
}

/**
Expand All @@ -1035,7 +1058,7 @@ public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
InetSocketAddress nameServerAddr, DnsQuestion question,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {

return query0(nameServerAddr, question, EMPTY_ADDITIONALS, promise);
return query0(nameServerAddr, question, EMPTY_ADDITIONALS, true, ch.newPromise(), promise);
}

/**
Expand All @@ -1046,7 +1069,7 @@ public Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query(
Iterable<DnsRecord> additionals,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {

return query0(nameServerAddr, question, toArray(additionals, false), promise);
return query0(nameServerAddr, question, toArray(additionals, false), true, ch.newPromise(), promise);
}

/**
Expand All @@ -1067,24 +1090,23 @@ public static boolean isTimeoutError(Throwable cause) {
return cause != null && cause.getCause() instanceof DnsNameResolverTimeoutException;
}

final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query0(
InetSocketAddress nameServerAddr, DnsQuestion question,
DnsRecord[] additionals,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
return query0(nameServerAddr, question, additionals, ch.newPromise(), promise);
final void flushQueries() {
ch.flush();
}

final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> query0(
InetSocketAddress nameServerAddr, DnsQuestion question,
DnsRecord[] additionals,
boolean flush,
ChannelPromise writePromise,
Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> promise) {
assert !writePromise.isVoid();

final Promise<AddressedEnvelope<DnsResponse, InetSocketAddress>> castPromise = cast(
checkNotNull(promise, "promise"));
try {
new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise).query(writePromise);
new DnsQueryContext(this, nameServerAddr, question, additionals, castPromise)
.query(flush, writePromise);
return castPromise;
} catch (Exception e) {
return castPromise.setFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ DnsQuestion question() {
return question;
}

void query(ChannelPromise writePromise) {
void query(boolean flush, ChannelPromise writePromise) {
final DnsQuestion question = question();
final InetSocketAddress nameServerAddr = nameServerAddr();
final DatagramDnsQuery query = new DatagramDnsQuery(null, nameServerAddr, id);
Expand All @@ -110,18 +110,21 @@ void query(ChannelPromise writePromise) {
logger.debug("{} WRITE: [{}: {}], {}", parent.ch, id, nameServerAddr, question);
}

sendQuery(query, writePromise);
sendQuery(query, flush, writePromise);
}

private void sendQuery(final DnsQuery query, final ChannelPromise writePromise) {
private void sendQuery(final DnsQuery query, final boolean flush, final ChannelPromise writePromise) {
if (parent.channelFuture.isDone()) {
writeQuery(query, writePromise);
writeQuery(query, flush, writePromise);
} else {
parent.channelFuture.addListener(new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(Future<? super Channel> future) {
if (future.isSuccess()) {
writeQuery(query, writePromise);
// If the query is done in a late fashion (as the channel was not ready yet) we always flush
// to ensure we did not race with a previous flush() that was done when the Channel was not
// ready yet.
writeQuery(query, true, writePromise);
} else {
Throwable cause = future.cause();
promise.tryFailure(cause);
Expand All @@ -132,8 +135,9 @@ public void operationComplete(Future<? super Channel> future) {
}
}

private void writeQuery(final DnsQuery query, final ChannelPromise writePromise) {
final ChannelFuture writeFuture = parent.ch.writeAndFlush(query, writePromise);
private void writeQuery(final DnsQuery query, final boolean flush, final ChannelPromise writePromise) {
final ChannelFuture writeFuture = flush ? parent.ch.writeAndFlush(query, writePromise) :
parent.ch.write(query, writePromise);
if (writeFuture.isDone()) {
onQueryWriteCompletion(writeFuture);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,20 @@ private void internalResolve(String name, Promise<List<T>> promise) {
name = mapping;
}

DnsServerAddressStream nameServerAddressStream = getNameServers(name);
try {
DnsServerAddressStream nameServerAddressStream = getNameServers(name);

final int end = expectedTypes.length - 1;
for (int i = 0; i < end; ++i) {
if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), promise)) {
return;
final int end = expectedTypes.length - 1;
for (int i = 0; i < end; ++i) {
if (!query(name, expectedTypes[i], nameServerAddressStream.duplicate(), false, promise)) {
return;
}
}
query(name, expectedTypes[end], nameServerAddressStream, false, promise);
} finally {
// Now flush everything we submitted before.
parent.flushQueries();
}
query(name, expectedTypes[end], nameServerAddressStream, promise);
}

/**
Expand Down Expand Up @@ -316,17 +321,11 @@ private DnsServerAddressStream getNameServersFromCache(String hostname) {
}
}

private void query(final DnsServerAddressStream nameServerAddrStream, final int nameServerAddrStreamIndex,
final DnsQuestion question,
final Promise<List<T>> promise, Throwable cause) {
query(nameServerAddrStream, nameServerAddrStreamIndex, question,
parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise, cause);
}

private void query(final DnsServerAddressStream nameServerAddrStream,
final int nameServerAddrStreamIndex,
final DnsQuestion question,
final DnsQueryLifecycleObserver queryLifecycleObserver,
final boolean flush,
final Promise<List<T>> promise,
final Throwable cause) {
if (nameServerAddrStreamIndex >= nameServerAddrStream.size() || allowedQueries == 0 || promise.isCancelled()) {
Expand All @@ -344,9 +343,12 @@ private void query(final DnsServerAddressStream nameServerAddrStream,
return;
}
final ChannelPromise writePromise = parent.ch.newPromise();
final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> f = parent.query0(
nameServerAddr, question, additionals, writePromise,
parent.ch.eventLoop().<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>>newPromise());
final Promise<AddressedEnvelope<? extends DnsResponse, InetSocketAddress>> queryPromise =
parent.ch.eventLoop().newPromise();

final Future<AddressedEnvelope<DnsResponse, InetSocketAddress>> f =
parent.query0(nameServerAddr, question, additionals, flush, writePromise, queryPromise);

queriesInProgress.add(f);

queryLifecycleObserver.queryWritten(nameServerAddr, writePromise);
Expand Down Expand Up @@ -376,7 +378,8 @@ public void operationComplete(Future<AddressedEnvelope<DnsResponse, InetSocketAd
} else {
// Server did not respond or I/O error occurred; try again.
queryLifecycleObserver.queryFailed(queryCause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, queryCause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
newDnsQueryLifecycleObserver(question), true, promise, queryCause);
}
} finally {
tryToFinishResolve(nameServerAddrStream, nameServerAddrStreamIndex, question,
Expand Down Expand Up @@ -417,11 +420,11 @@ public void operationComplete(final Future<List<InetAddress>> future) {
DnsServerAddressStream addressStream = new CombinedDnsServerAddressStream(
nameServerAddr, resolvedAddresses, nameServerAddrStream);
query(addressStream, nameServerAddrStreamIndex, question,
queryLifecycleObserver, promise, cause);
queryLifecycleObserver, true, promise, cause);
} else {
// Ignore the server and try the next one...
query(nameServerAddrStream, nameServerAddrStreamIndex + 1,
question, queryLifecycleObserver, promise, cause);
question, queryLifecycleObserver, true, promise, cause);
}
}
});
Expand Down Expand Up @@ -490,7 +493,7 @@ private void onResponse(final DnsServerAddressStream nameServerAddrStream, final
// Retry with the next server if the server did not tell us that the domain does not exist.
if (code != DnsResponseCode.NXDOMAIN) {
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
queryLifecycleObserver.queryNoAnswer(code), promise, null);
queryLifecycleObserver.queryNoAnswer(code), true, promise, null);
} else {
queryLifecycleObserver.queryFailed(NXDOMAIN_QUERY_FAILED_EXCEPTION);
}
Expand Down Expand Up @@ -539,7 +542,7 @@ private boolean handleRedirect(
if (serverStream != null) {
query(serverStream, 0, question,
queryLifecycleObserver.queryRedirected(new DnsAddressStreamList(serverStream)),
promise, null);
true, promise, null);
return true;
}
}
Expand Down Expand Up @@ -687,8 +690,7 @@ private void onExpectedResponse(
} else {
queryLifecycleObserver.querySucceed();
// We also got a CNAME so we need to ensure we also query it.
onResponseCNAME(question, cnames,
parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question), promise);
onResponseCNAME(question, cnames, newDnsQueryLifecycleObserver(question), promise);
}
}

Expand Down Expand Up @@ -776,10 +778,11 @@ private void tryToFinishResolve(final DnsServerAddressStream nameServerAddrStrea
if (queryLifecycleObserver == NoopDnsQueryLifecycleObserver.INSTANCE) {
// If the queryLifecycleObserver has already been terminated we should create a new one for this
// fresh query.
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, promise, cause);
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question,
newDnsQueryLifecycleObserver(question), true, promise, cause);
} else {
query(nameServerAddrStream, nameServerAddrStreamIndex + 1, question, queryLifecycleObserver,
promise, cause);
true, promise, cause);
}
return;
}
Expand All @@ -795,7 +798,7 @@ private void tryToFinishResolve(final DnsServerAddressStream nameServerAddrStrea
// As the last resort, try to query CNAME, just in case the name server has it.
triedCNAME = true;

query(hostname, DnsRecordType.CNAME, getNameServers(hostname), promise);
query(hostname, DnsRecordType.CNAME, getNameServers(hostname), true, promise);
return;
}
} else {
Expand Down Expand Up @@ -891,11 +894,12 @@ private void followCname(DnsQuestion question, String cname, DnsQueryLifecycleOb
PlatformDependent.throwException(cause);
return;
}
query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion), promise, null);
query(stream, 0, cnameQuestion, queryLifecycleObserver.queryCNAMEd(cnameQuestion),
true, promise, null);
}

private boolean query(String hostname, DnsRecordType type, DnsServerAddressStream dnsServerAddressStream,
Promise<List<T>> promise) {
boolean flush, Promise<List<T>> promise) {
final DnsQuestion question;
try {
question = new DefaultDnsQuestion(hostname, type, dnsClass);
Expand All @@ -906,10 +910,14 @@ private boolean query(String hostname, DnsRecordType type, DnsServerAddressStrea
type + ']', cause));
return false;
}
query(dnsServerAddressStream, 0, question, promise, null);
query(dnsServerAddressStream, 0, question, newDnsQueryLifecycleObserver(question), flush, promise, null);
return true;
}

private DnsQueryLifecycleObserver newDnsQueryLifecycleObserver(DnsQuestion question) {
return parent.dnsQueryLifecycleObserverFactory().newDnsQueryLifecycleObserver(question);
}

private final class CombinedDnsServerAddressStream implements DnsServerAddressStream {
private final InetSocketAddress replaced;
private final DnsServerAddressStream originalStream;
Expand Down

0 comments on commit d728a72

Please sign in to comment.