Skip to content

Commit

Permalink
Merge tag 'neo-v0.6.1' into neo-v0.x.x
Browse files Browse the repository at this point in the history
neo-v0.6.1
  • Loading branch information
Gavin Norman committed Jul 18, 2018
2 parents 6331275 + 554e957 commit 35627c1
Show file tree
Hide file tree
Showing 17 changed files with 82 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/dhtproto/client/legacy/DhtConst.d
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ static:
***************************************************************************/

public const RecordSizeLimit = RecordBatch.DefaultMaxBatchSize;
public const RecordSizeLimit = RecordBatcher.DefaultMaxBatchSize;


/***************************************************************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import swarm.client.request.GetChannelSizeRequest;
import swarm.client.request.GetSizeRequest;
import swarm.client.request.RemoveChannelRequest;

import swarm.util.RecordBatcher;

import dhtproto.client.legacy.internal.request.model.IDhtRequestResources;
import dhtproto.client.legacy.internal.request.GetVersionRequest;
import dhtproto.client.legacy.internal.request.GetResponsibleRangeRequest;
Expand Down Expand Up @@ -229,7 +231,7 @@ public class DhtRequestConnection :

override protected mstring new_batch_buffer ( )
{
return new char[RecordBatch.DefaultMaxBatchSize];
return new char[RecordBatcher.DefaultMaxBatchSize];
}


Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/client/request/internal/Exists.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module dhtproto.client.request.internal.Exists;
*******************************************************************************/

import ocean.transition;
import ocean.core.VersionCheck;
import ocean.util.log.Logger;

/*******************************************************************************
Expand Down Expand Up @@ -179,7 +180,9 @@ public struct Exists
payload.add(context.user_params.args.key);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/client/request/internal/Get.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module dhtproto.client.request.internal.Get;
*******************************************************************************/

import ocean.transition;
import ocean.core.VersionCheck;
import ocean.util.log.Logger;

/*******************************************************************************
Expand Down Expand Up @@ -196,7 +197,9 @@ public struct Get
payload.add(context.user_params.args.key);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/client/request/internal/Put.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module dhtproto.client.request.internal.Put;
*******************************************************************************/

import ocean.transition;
import ocean.core.VersionCheck;
import ocean.util.log.Logger;

/*******************************************************************************
Expand Down Expand Up @@ -170,7 +171,9 @@ public struct Put
payload.addArray(context.user_params.args.value);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/client/request/internal/Remove.d
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
module dhtproto.client.request.internal.Remove;

import ocean.transition;
import ocean.core.VersionCheck;
import ocean.util.log.Logger;

/*******************************************************************************
Expand Down Expand Up @@ -174,7 +175,9 @@ public struct Remove
payload.add(context.user_params.args.key);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down
21 changes: 16 additions & 5 deletions src/dhtproto/client/request/internal/Update.d
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
module dhtproto.client.request.internal.Update;

import ocean.transition;
import ocean.core.VersionCheck;
import ocean.util.log.Logger;
import ocean.core.Verify;

Expand Down Expand Up @@ -408,7 +409,9 @@ private struct FirstROCHandler
payload.add(this.context.user_params.args.key);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node.
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down Expand Up @@ -514,7 +517,9 @@ private struct FirstROCHandler
payload.addArray(*this.context.shared_working.updated_value);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Handle response message.
auto result = conn.receiveValue!(MessageType)();
Expand Down Expand Up @@ -597,7 +602,9 @@ private struct FirstROCHandler
payload.addCopy(MessageType.RemoveRecord);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Handle response message.
auto result = conn.receiveValue!(MessageType)();
Expand Down Expand Up @@ -647,7 +654,9 @@ private struct FirstROCHandler
payload.addCopy(MessageType.LeaveRecord);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

auto result = conn.receiveValue!(MessageType)();
with ( MessageType ) switch ( result )
Expand Down Expand Up @@ -736,7 +745,9 @@ private struct SecondROCHandler
payload.addArray(*this.context.shared_working.updated_value);
}
);
conn.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
conn.flush();

// Receive supported code from node.
auto supported = conn.receiveValue!(SupportedStatus)();
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/node/neo/request/Exists.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.Exists;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;

/*******************************************************************************
Expand Down Expand Up @@ -92,7 +93,9 @@ public abstract scope class ExistsProtocol_v0 : IRequestHandler
payload.add(this.response);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down
4 changes: 3 additions & 1 deletion src/dhtproto/node/neo/request/Get.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.Get;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;

/*******************************************************************************
Expand Down Expand Up @@ -128,7 +129,8 @@ public abstract class GetProtocol_v0 : IRequestHandler
else
sendResponse(MessageType.WrongNode);

ed.flush();
static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down
9 changes: 7 additions & 2 deletions src/dhtproto/node/neo/request/GetAll.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.GetAll;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;
import ocean.util.log.Logger;

Expand Down Expand Up @@ -180,7 +181,9 @@ public abstract class GetAllProtocol_v0 : IRequestHandler
? MessageType.Started : MessageType.Error);
}
);
this.connection.event_dispatcher.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
this.connection.event_dispatcher.flush();

if ( !this.initialised_ok )
return;
Expand Down Expand Up @@ -431,9 +434,11 @@ public abstract class GetAllProtocol_v0 : IRequestHandler
payload.addArray(*this.outer.compressed_batch);
}
);

// flush() does not suspend the fiber, so is safe to call in a
// RequestEventDispatcher-managed request.
this.outer.connection.event_dispatcher.flush();
static if (!hasFeaturesFrom!("swarm", 4, 7))
this.outer.connection.event_dispatcher.flush();
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/node/neo/request/GetHashRange.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.GetHashRange;

import ocean.core.VersionCheck;
import swarm.neo.AddrPort;
import swarm.neo.node.IRequestHandler;

Expand Down Expand Up @@ -128,7 +129,9 @@ public abstract scope class GetHashRangeProtocol_v0 : IRequestHandler
payload.add(max);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();

while ( true )
{
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/node/neo/request/Put.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.Put;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;

/*******************************************************************************
Expand Down Expand Up @@ -94,7 +95,9 @@ public abstract class PutProtocol_v0 : IRequestHandler
payload.add(response);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/node/neo/request/Remove.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.Remove;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;

/*******************************************************************************
Expand Down Expand Up @@ -93,7 +94,9 @@ public abstract class RemoveProtocol_v0 : IRequestHandler
payload.add(this.response);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down
5 changes: 4 additions & 1 deletion src/dhtproto/node/neo/request/RemoveChannel.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.RemoveChannel;

import ocean.core.VersionCheck;
import ocean.util.log.Logger;
import swarm.neo.node.IRequestHandler;

Expand Down Expand Up @@ -82,7 +83,9 @@ public abstract scope class RemoveChannelProtocol_v0 : IRequestHandler
payload.add(this.response);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down
9 changes: 7 additions & 2 deletions src/dhtproto/node/neo/request/Update.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

module dhtproto.node.neo.request.Update;

import ocean.core.VersionCheck;
import swarm.neo.node.IRequestHandler;

/*******************************************************************************
Expand Down Expand Up @@ -110,7 +111,9 @@ public abstract class UpdateProtocol_v0 : IRequestHandler
payload.add(response);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}

/***************************************************************************
Expand Down Expand Up @@ -144,7 +147,9 @@ public abstract class UpdateProtocol_v0 : IRequestHandler
payload.addArray(value);
}
);
ed.flush();

static if (!hasFeaturesFrom!("swarm", 4, 7))
ed.flush();
}
);

Expand Down
5 changes: 4 additions & 1 deletion src/dhttest/DhtTestCase.d
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module dhttest.DhtTestCase;
*******************************************************************************/

import ocean.transition;
import ocean.core.VersionCheck;

import turtle.TestCase;

Expand Down Expand Up @@ -194,7 +195,9 @@ abstract class NeoDhtTestCase : TestCase
const max_connections = 2;
this.dht = new DhtClient(theScheduler.epoll, auth_name,
auth_key.content, &this.neoConnectionNotifier, max_connections);
this.dht.neo.enableSocketNoDelay();

static if (!hasFeaturesFrom!("swarm", 4, 7))
this.dht.neo.enableSocketNoDelay();

this.connect(10000);
}
Expand Down
6 changes: 5 additions & 1 deletion src/fakedht/DhtNode.d
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class DhtNode
{
import core.stdc.stdlib : abort;

import ocean.core.VersionCheck;
import ocean.io.select.client.model.ISelectClient : IAdvancedSelectClient;
import ocean.net.server.connection.IConnectionHandlerInfo;
import ocean.io.select.protocol.generic.ErrnoIOException;
Expand Down Expand Up @@ -95,7 +96,10 @@ public class DhtNode
Options neo_options;
neo_options.requests = requests;
neo_options.epoll = epoll;
neo_options.no_delay = true; // favour network turn-around over packet efficiency

static if (!hasFeaturesFrom!("swarm", 4, 7))
neo_options.no_delay = true; // favour network turn-around over packet efficiency

neo_options.credentials_map["admin"] = Key.init;

ushort neo_port = node_item.Port;
Expand Down

0 comments on commit 35627c1

Please sign in to comment.