Skip to content

Commit

Permalink
Merge pull request fanliang11#9 from dotnetcore/master
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
brucehu123 authored Oct 24, 2019
2 parents 9e9146e + 7ac5611 commit 7412a58
Show file tree
Hide file tree
Showing 18 changed files with 461 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ public class ProtocolPortOptions

public int WSPort { get; set; }

public int GrpcPort { get; set; }

public int UdpPort { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public ConcurrentDictionary<String, Object> GetContextParameters()
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void SetAttachment(string key, object value)
public void SetAttachment(string key,object value)
{
contextParameters.AddOrUpdate(key, value, (k, v) => value);
contextParameters.AddOrUpdate(key, value,(k,v)=>value);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand All @@ -34,21 +34,25 @@ public void SetContextParameters(ConcurrentDictionary<String, Object> contextPar
this.contextParameters = contextParameters;
}

private static ThreadLocal<RpcContext> rpcContextThreadLocal = new ThreadLocal<RpcContext>(() =>
{
RpcContext context = new RpcContext();
context.SetContextParameters(new ConcurrentDictionary<string, object>());
return context;
});
private static AsyncLocal<RpcContext> rpcContextThreadLocal=new AsyncLocal<RpcContext>();

public static RpcContext GetContext()
{
var context = rpcContextThreadLocal.Value;

if (context == null)
{
context = new RpcContext();
context.SetContextParameters(new ConcurrentDictionary<string, object>());
rpcContextThreadLocal.Value = context;
}

return rpcContextThreadLocal.Value;
}

public static void RemoveContext()
{
rpcContextThreadLocal.Dispose();
rpcContextThreadLocal.Value = null;
}

private RpcContext()
Expand Down
82 changes: 82 additions & 0 deletions src/Surging.Core/Surging.Core.Grpc/GrpcModule.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using Autofac;
using Microsoft.Extensions.Logging;
using Surging.Core.CPlatform;
using Surging.Core.CPlatform.Module;
using Surging.Core.CPlatform.Runtime.Server;
using Surging.Core.CPlatform.Runtime.Server.Implementation;
using Surging.Core.Grpc.Runtime;
using Surging.Core.Grpc.Runtime.Implementation;

namespace Surging.Core.Grpc
{
public class GrpcModule : EnginePartModule
{
public override void Initialize(AppModuleContext context)
{
base.Initialize(context);
}

protected override void RegisterBuilder(ContainerBuilderWrapper builder)
{
builder.Register(provider =>
{
return new DefaultGrpcServiceEntryProvider(
provider.Resolve<IServiceEntryProvider>(),
provider.Resolve<ILogger<DefaultGrpcServiceEntryProvider>>(),
provider.Resolve<CPlatformContainer>()
);
}).As(typeof(IGrpcServiceEntryProvider)).SingleInstance();
if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.WS)
{
RegisterDefaultProtocol(builder);
}
else if (AppConfig.ServerOptions.Protocol == CommunicationProtocol.None)
{
RegisterGrpcProtocol(builder);
}
}

private static void RegisterDefaultProtocol(ContainerBuilderWrapper builder)
{

builder.Register(provider =>
{
return new GrpcServerMessageListener(
provider.Resolve<ILogger<GrpcServerMessageListener>>(),
provider.Resolve<IGrpcServiceEntryProvider>()
);
}).SingleInstance();
builder.Register(provider =>
{
var messageListener = provider.Resolve<GrpcServerMessageListener>();
return new DefaultServiceHost(async endPoint =>
{
await messageListener.StartAsync(endPoint);
return messageListener;
}, null);

}).As<IServiceHost>();
}

private static void RegisterGrpcProtocol(ContainerBuilderWrapper builder)
{
builder.Register(provider =>
{
return new GrpcServerMessageListener(provider.Resolve<ILogger<GrpcServerMessageListener>>(),
provider.Resolve<IGrpcServiceEntryProvider>()
);
}).SingleInstance();
builder.Register(provider =>
{
var messageListener = provider.Resolve<GrpcServerMessageListener>();
return new GrpcServiceHost(async endPoint =>
{
await messageListener.StartAsync(endPoint);
return messageListener;
});

}).As<IServiceHost>();
}
}
}

84 changes: 84 additions & 0 deletions src/Surging.Core/Surging.Core.Grpc/GrpcServerMessageListener.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Surging.Core.CPlatform.Messages;
using Surging.Core.CPlatform.Transport;
using Surging.Core.Grpc.Runtime;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace Surging.Core.Grpc
{
public class GrpcServerMessageListener: IMessageListener, IDisposable
{
private Server _server;
private readonly ILogger<GrpcServerMessageListener> _logger;
private readonly IGrpcServiceEntryProvider _grpcServiceEntryProvider;

public GrpcServerMessageListener(ILogger<GrpcServerMessageListener> logger,
IGrpcServiceEntryProvider grpcServiceEntryProvider)
{
_logger = logger;
_grpcServiceEntryProvider = grpcServiceEntryProvider;
}
public Task StartAsync(EndPoint endPoint)
{
var ipEndPoint = endPoint as IPEndPoint;
_server = new Server() { Ports = { new ServerPort(ipEndPoint.Address.ToString(), ipEndPoint.Port, ServerCredentials.Insecure) } };

try
{
var entries = _grpcServiceEntryProvider.GetEntries();

var serverServiceDefinitions = new List<ServerServiceDefinition>();
foreach (var entry in entries)
{

var baseType = entry.Type.BaseType.BaseType;
var definitionType = baseType?.DeclaringType;

var methodInfo = definitionType?.GetMethod("BindService", new Type[] { baseType });
if (methodInfo != null)
{
var serviceDescriptor = methodInfo.Invoke(null, new object[] { entry.Behavior }) as ServerServiceDefinition;
if (serviceDescriptor != null)
{
_server.Services.Add(serviceDescriptor);
continue;
}
}
}
_server.Start();
if (_logger.IsEnabled(LogLevel.Debug))
_logger.LogDebug($"Grpc服务主机启动成功,监听地址:{endPoint}。");
}
catch
{
_logger.LogError($"Grpc服务主机启动失败,监听地址:{endPoint}。 ");
}
return Task.CompletedTask;
}

public Server Server
{
get
{
return _server;
}
}

public event ReceivedDelegate Received;

public Task OnReceived(IMessageSender sender, TransportMessage message)
{
return Task.CompletedTask;
}

public void Dispose()
{
_server.ShutdownAsync();
}
}
}
56 changes: 56 additions & 0 deletions src/Surging.Core/Surging.Core.Grpc/GrpcServiceHost.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Surging.Core.CPlatform;
using Surging.Core.CPlatform.Runtime.Server.Implementation;
using Surging.Core.CPlatform.Transport;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace Surging.Core.Grpc
{
public class GrpcServiceHost : ServiceHostAbstract
{
#region Field

private readonly Func<EndPoint, Task<IMessageListener>> _messageListenerFactory;
private IMessageListener _serverMessageListener;

#endregion Field

public GrpcServiceHost(Func<EndPoint, Task<IMessageListener>> messageListenerFactory) : base(null)
{
_messageListenerFactory = messageListenerFactory;
}

#region Overrides of ServiceHostAbstract

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
public override void Dispose()
{
(_serverMessageListener as IDisposable)?.Dispose();
}

/// <summary>
/// 启动主机。
/// </summary>
/// <param name="endPoint">主机终结点。</param>
/// <returns>一个任务。</returns>
public override async Task StartAsync(EndPoint endPoint)
{
if (_serverMessageListener != null)
return;
_serverMessageListener = await _messageListenerFactory(endPoint);

}

public override async Task StartAsync(string ip, int port)
{
if (_serverMessageListener != null)
return;
_serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.GrpcPort));
}

#endregion Overrides of ServiceHostAbstract
}
}
15 changes: 15 additions & 0 deletions src/Surging.Core/Surging.Core.Grpc/Runtime/GrpcServiceEntry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Surging.Core.CPlatform.Ioc;
using System;
using System.Collections.Generic;
using System.Text;

namespace Surging.Core.Grpc.Runtime
{
public class GrpcServiceEntry
{

public Type Type { get; set; }

public IServiceBehavior Behavior { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Surging.Core.Grpc.Runtime
{
public interface IGrpcServiceEntryProvider
{
List<GrpcServiceEntry> GetEntries();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Microsoft.Extensions.Logging;
using Surging.Core.CPlatform;
using Surging.Core.CPlatform.Ioc;
using Surging.Core.CPlatform.Runtime.Server;
using Surging.Core.CPlatform.Runtime.Server.Implementation.ServiceDiscovery.Attributes;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;

namespace Surging.Core.Grpc.Runtime.Implementation
{
public class DefaultGrpcServiceEntryProvider: IGrpcServiceEntryProvider
{
#region Field

private readonly IEnumerable<Type> _types;
private readonly ILogger<DefaultGrpcServiceEntryProvider> _logger;
private readonly CPlatformContainer _serviceProvider;
private List<GrpcServiceEntry> _grpcServiceEntries;

#endregion Field

#region Constructor

public DefaultGrpcServiceEntryProvider(IServiceEntryProvider serviceEntryProvider,
ILogger<DefaultGrpcServiceEntryProvider> logger,
CPlatformContainer serviceProvider)
{
_types = serviceEntryProvider.GetTypes();
_logger = logger;
_serviceProvider = serviceProvider;
}

#endregion Constructor

#region Implementation of IUdpServiceEntryProvider

/// <summary>
/// 获取服务条目集合。
/// </summary>
/// <returns>服务条目集合。</returns>
public List<GrpcServiceEntry> GetEntries()
{
var services = _types.ToArray();
if (_grpcServiceEntries == null)
{
_grpcServiceEntries = new List<GrpcServiceEntry>();
foreach (var service in services)
{
var entry = CreateServiceEntry(service);
if (entry != null)
{
_grpcServiceEntries.Add(entry);
}
}
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"发现了以下grpc服务:{string.Join(",", _grpcServiceEntries.Select(i => i.Type.FullName))}。"); ;
}
}
return _grpcServiceEntries;
}

public GrpcServiceEntry CreateServiceEntry(Type service)
{
GrpcServiceEntry result = null;
var objInstance = _serviceProvider.GetInstances(service);
var behavior = objInstance as IServiceBehavior;
if (behavior != null)
result = new GrpcServiceEntry
{
Behavior = behavior,
Type = behavior.GetType()
};
return result;
}
#endregion
}
}
Loading

0 comments on commit 7412a58

Please sign in to comment.