Skip to content

Commit

Permalink
improved the data sending API
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Mar 17, 2019
1 parent 27c28ed commit 0e4ac82
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 12 deletions.
8 changes: 6 additions & 2 deletions src/SuperSocket.Channel/ChannelBase.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
using System;
using System.Threading.Tasks;
using SuperSocket.ProtoBase;

namespace SuperSocket.Channel
{
public abstract class ChannelBase<TPackageInfo> : IChannel<TPackageInfo>, IChannel
where TPackageInfo : class
{
public abstract Task ProcessRequest();
public abstract ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer);
public abstract Task StartAsync();

public abstract ValueTask SendAsync(ReadOnlyMemory<byte> buffer);

public abstract ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);

private Action<IChannel, TPackageInfo> _packageReceived;

Expand Down
83 changes: 80 additions & 3 deletions src/SuperSocket.Channel/PipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
using System.Buffers;
using System.Threading.Tasks;
using System.IO.Pipelines;
using SuperSocket.ProtoBase;
using System.Runtime.InteropServices;
using Microsoft.Extensions.Logging;
using SuperSocket.ProtoBase;


namespace SuperSocket.Channel
{
Expand All @@ -12,9 +14,79 @@ public abstract class PipeChannel<TPackageInfo> : ChannelBase<TPackageInfo>, ICh
{
private IPipelineFilter<TPackageInfo> _pipelineFilter;

protected PipeChannel(IPipelineFilter<TPackageInfo> pipelineFilter)
private Pipe Output { get; }

protected ILogger Logger { get; }

protected PipeChannel(IPipelineFilter<TPackageInfo> pipelineFilter, ILogger logger)
{
_pipelineFilter = pipelineFilter;
Logger = logger;
Output = new Pipe();
}

public override async Task StartAsync()
{
try
{
var readsTask = ProcessReads();
var sendsTask = ProcessSends();

await Task.WhenAll(readsTask, sendsTask);
}
catch (Exception e)
{
Logger.LogError(e, "Unhandled exception in the method PipeChannel.StartAsync.");
}
}

protected abstract Task ProcessReads();

protected async Task ProcessSends()
{
var output = Output.Reader;

while (true)
{
var result = await output.ReadAsync();

if (result.IsCanceled)
break;

var completed = result.IsCompleted;

var buffer = result.Buffer;
var end = buffer.End;

if (!buffer.IsEmpty)
{
await SendAsync(buffer);
}

output.AdvanceTo(end);

if (completed)
{
break;
}
}
}

protected abstract ValueTask<int> SendAsync(ReadOnlySequence<byte> buffer);


public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer)
{
var writer = Output.Writer;
await writer.WriteAsync(buffer);
await writer.FlushAsync();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package)
{
var writer = Output.Writer;
packageEncoder.Encode(writer, package);
await writer.FlushAsync();
}

protected internal ArraySegment<T> GetArrayByMemory<T>(ReadOnlyMemory<T> memory)
Expand All @@ -40,9 +112,11 @@ protected async Task ReadPipeAsync(PipeReader reader)

try
{
if (result.IsCompleted)
if (result.IsCanceled)
break;

var completed = result.IsCompleted;

while (true)
{
ReaderBuffer(buffer, out consumed, out examined);
Expand All @@ -52,6 +126,9 @@ protected async Task ReadPipeAsync(PipeReader reader)

buffer = buffer.Slice(examined);
}

if (completed)
break;
}
finally
{
Expand Down
1 change: 1 addition & 0 deletions src/SuperSocket.Channel/SuperSocket.Channel.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageReference Include="System.Threading.Tasks" Version="$(CoreFxVersion)" />
<PackageReference Include="System.Net.Sockets" Version="$(CoreFxVersion)" />
<PackageReference Include="System.IO.Pipelines" Version="$(PipelinesVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="$(MSExtensionsVersion)" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../SuperSocket.ProtoBase/SuperSocket.ProtoBase.csproj" />
Expand Down
37 changes: 32 additions & 5 deletions src/SuperSocket.Channel/TcpPipeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
using System.IO.Pipelines;
using System.Net.Sockets;
using SuperSocket.ProtoBase;
using System.Buffers;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;

namespace SuperSocket.Channel
{
Expand All @@ -11,9 +14,11 @@ public class TcpPipeChannel<TPackageInfo> : PipeChannel<TPackageInfo>
{

private Socket _socket;

private List<ArraySegment<byte>> _segmentsForSend;

public TcpPipeChannel(Socket socket, IPipelineFilter<TPackageInfo> pipelineFilter)
: base(pipelineFilter)
public TcpPipeChannel(Socket socket, IPipelineFilter<TPackageInfo> pipelineFilter, ILogger logger)
: base(pipelineFilter, logger)
{
_socket = socket;
}
Expand Down Expand Up @@ -61,7 +66,7 @@ private async Task<int> ReceiveAsync(Socket socket, Memory<byte> memory, SocketF
return await socket.ReceiveAsync(GetArrayByMemory((ReadOnlyMemory<byte>)memory), socketFlags);
}

public override async Task ProcessRequest()
protected override async Task ProcessReads()
{
var pipe = new Pipe();

Expand All @@ -73,9 +78,31 @@ public override async Task ProcessRequest()
_socket = null;
OnClosed();
}
public override async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer)

protected override async ValueTask<int> SendAsync(ReadOnlySequence<byte> buffer)
{
return await _socket.SendAsync(GetArrayByMemory(buffer), SocketFlags.None);
if (buffer.IsSingleSegment)
{
return await _socket.SendAsync(GetArrayByMemory(buffer.First), SocketFlags.None);
}

if (_segmentsForSend != null)
{
_segmentsForSend = new List<ArraySegment<byte>>();
}
else
{
_segmentsForSend.Clear();
}

var segments = _segmentsForSend;

foreach (var piece in buffer)
{
_segmentsForSend.Add(GetArrayByMemory(piece));
}

return await _socket.SendAsync(_segmentsForSend, SocketFlags.None);
}
}
}
7 changes: 5 additions & 2 deletions src/SuperSocket.Primitives/IChannel.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
using System;
using System.Threading.Tasks;
using SuperSocket.ProtoBase;

namespace SuperSocket
{
public interface IChannel
{
Task ProcessRequest();
Task StartAsync();

ValueTask<int> SendAsync(ReadOnlyMemory<byte> data);
ValueTask SendAsync(ReadOnlyMemory<byte> data);

ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package);

event EventHandler Closed;
}
Expand Down

0 comments on commit 0e4ac82

Please sign in to comment.