این پروژه، یک پروژه زیر ساختی برای تمام سرویس های توسعه داده شده بر اساس پروژه Template
است و شامل ابزارهای کاربردی برای توسعه بهتر و پیشرفته تر سرویس های موجود در معماری میکروسرویس می باشد .
🔥 توجه : تمامی این ابزارها به طور اختصاصی برای این پروژه طراحی و توسعه داده شده اند و دارای نمونه پیاده سازی مشابه نمی باشند .
پروژه | لینک |
---|---|
Domic-TemplateService |
https://github.com/HasanProgrammer/Domic-TemplateService |
برخی از قابلیت ها و امکانات پیاده سازی شده مطابق با استانداردهای روز مهندسی نرم افزار که در این سورس موجود است به شرح زیر می باشد :
قابلیت | توضیحات |
---|---|
CQRS | برخورداری از ابزار مناسب برای مدیریت اصل CQS و به طور دقیق تر الگوی CQRS با استفاده از ابزار Mediator که در اصل این ابزار پیاده کننده الگوی Mediator می باشد |
AsyncCommand | برخورداری از قابلیت Command های Async مبتنی بر زیرساخت Broker و با استفاده از ابزار RabbitMQ برای مدیریت درخواست های Fire&Forget |
DistributedCache | برخورداری از ابزار مناسب برای مدیریت Distributed Cache مبتنی بر ابزار زیرساختی Redis برای اضافه کردن لایه Cache در پروژه |
EDA | برخورداری از ابزار مناسب برای مدیریت بهتر و بهینه تر Event و یا Message در ساختار پروژه ها مبتنی بر ابزارهای RabbitMQ و Apache Kafka |
Logger | برخورداری از ابزار لاگ مرکزی یا همان سرویس ( StateTracker ) برای مدیریت لاگ خطاهای ایجاد شده در سرویس ها و لاگ رخدادها ( Event Snapshot ) و یا لاگ های ایجاد شده در سطح کدهای نوشته شده در سرویس ها با ابزار مربوطه ( Logger و StreamLogger ) |
RPC | برخورداری از زیرساخت مناسب برای مدیریت درخواست های مبتنی بر پروتکل RPC و براساس ابزار gRPC |
ServiceDiscovery | برخورداری از ابزارهای مناسب برای دریافت و واکشی اطلاعات سرویس ها مبتنی بر سرویس DiscoveryService و بررسی سلامت سرویس ها ( HealthCheck ) براساس چرخه زمان بندی مشخص |
AuthFilters | برخورداری از فیلتر های شخصی سازی شده برای مدیریت سطوح دسترسی ( Permission ) و مدیریت توکن های Revoke شده در ساختار JWT |
ExceptionHandler | برخورداری از لایه مدیریت کننده خطاهای مدیریت شده و مدیریت نشده در لایه های مختلف پروژه و بازگشت پاسخ مناسب به Client |
در این پروژه برای آنکه بتوانید منطق های بخش Command
و بخش Query
خود را مجزا کنید می توانید این ابزار که با نام Mediator
در این پروژه پیاده سازی شده است را مورد استفاده قرار دهید .
در ابتدا اجازه دهید نحوه پیاده سازی کلاس های مربوط به بخش Command
و بخش Query
را مورد ارزیابی قرار دهیم و در انتها به نحوه دسترسی به این منطق ها با استفاده از واسط Mediator
می پردازیم .
1 . نحوه تعریف کلاس های مربوط به بخش Command
مطابق زیر می باشد
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
public CreateCommandHandler(){}
public string Handle(CreateCommand command)
{
//logic
return default;
}
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
2 . برای تعریف کلاس های مربوط به لاجیک بخش Query
هم می توانید مطابق دستورات زیر عمل نمایید
public class ReadAllQuery : IQuery<Dto> //any result type
{
}
public class ReadAllQueryHandler : IQueryHandler<ReadAllQuery, Dto>
{
public ReadAllQueryHandler(){}
public Dto Handle(ReadAllQuery query)
{
//query
return default;
}
public Task<Dto> HandleAsync(ReadAllQuery query, CancellationToken cancellationToken)
{
//query
return Task.FromResult<Dto>(default);
}
}
3 . فعال سازی ابزار Mediator
در سرویس مربوطه
در نهایت برای فعال سازی این ابزار در سرویس خود ، می بایست در لایه Presentation
و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterCommandQueryUseCases();
🔥 توجه : دقت داشته باشید که Command
و Query
های مربوطه می بایست در لایه UseCase
سرویس های مربوطه تعریف شده و مورد استفاده قرار بگیرند
1 . استفاده از WithTransactionAttribute
از این Attribute
برای مواقعی که نیاز دارید تا عملیات Command
خود را در داخل یک Transaction
مدیریت کنید، استفاده می شود که دارای یک Property
تحت عنوان IsolationLevel
می باشد که سطح قفل گزاری منطق شما را در داخل دیتابیس مدیریت می کند ( Pessimistic Lock
) .
در ابتدا برای استفاده از این ابزار می بایست در سطح لایه Domain
سرویس مربوطه ، یک واسط پیاده سازی کرده که از واسط ICoreCommandUnitOfWork
ارث بری کرده است، مطابق کد زیر :
public interface ICommandUnitOfWork : ICoreCommandUnitOfWork;
سپس باید در لایه Infrastructure
سرویس مربوطه این واسط پیاده سازی شود ، مطابق کد زیر :
public class CommandUnitOfWork : ICommandUnitOfWork
{
private readonly SQLContext _context;
private IDbContextTransaction _transaction;
public CommandUnitOfWork(SQLContext context) => _context = context; //Resource
public void Transaction(IsolationLevel isolationLevel)
=> _transaction = _context.Database.BeginTransaction(isolationLevel); //Resource
public async Task TransactionAsync(IsolationLevel isolationLevel = IsolationLevel.ReadCommitted,
CancellationToken cancellationToken = new CancellationToken())
{
_transaction = await _context.Database.BeginTransactionAsync(isolationLevel, cancellationToken); //Resource
}
public void Commit()
{
_context.SaveChanges();
_transaction.Commit();
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
await _context.SaveChangesAsync(cancellationToken);
await _transaction.CommitAsync(cancellationToken);
}
public void Rollback() => _transaction?.Rollback();
public Task RollbackAsync(CancellationToken cancellationToken)
{
if (_transaction is not null)
return _transaction.RollbackAsync(cancellationToken);
return Task.CompletedTask;
}
public void Dispose() => _transaction?.Dispose();
public ValueTask DisposeAsync()
{
if (_transaction is not null)
return _transaction.DisposeAsync();
return ValueTask.CompletedTask;
}
}
🔥 توجه : در نظر داشته باشید که این موارد به طور پیشفرض در سرویس Template
پیاده سازی شده اند
🔥 توجه : دقت نمایید که برای پیاده سازی منطق تراکنش مربوطه ، از ابزار EF Core
استفاده شده است که شما می توانید از هر ابزار و یا دیتابیس دیگری استفاده نمایید
در ادامه برای استفاده از Attribute
مربوطه می توانید مطابق کد زیر عمل نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
public CreateCommandHandler(){}
[WithTransaction]
public string Handle(CreateCommand command)
{
//logic
return default;
}
[WithTransaction]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
🔥 توجه : در صورتی که مقداری برای ویژگی IsolationLevel
در این Attribute
در نظر گرفته نشود، مقدار پیشفرض ReadCommitted
لحاظ می گردد
در ادامه برای استفاده از Attribute
مربوطه با مقدار IsolationLevel
مشخص می توانید مطابق کد زیر عمل نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
public CreateCommandHandler(){}
[WithTransaction(IsolationLevel = IsolationLevel.RepeatableRead)]
public string Handle(CreateCommand command)
{
//logic
return default;
}
[WithTransaction(IsolationLevel = IsolationLevel.RepeatableRead)]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
🔥 توجه : برای کارکرد صحیح WithTransaction
در سرویس خود ، می بایست این Attribute
را فعال سازی نمایید
برای فعال سازی WithTransaction
در سرویس خود می بایست در لایه Presentation
پروژه مربوطه و در فایل Program.cs
دستورات زیر را اعمال نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
//if using the [ EF Core ]
builder.RegisterEntityFrameworkCoreCommand< TCommandContext , TIdentity >(); //TCommandContext -> SqlContext or ... | TIdentity -> string or ...
builder.RegisterCommandRepositories();
برای تعیین رشته اتصال پایگاه داده SQL Server
می بایست به لایه Presentation
پروژه مربوطه رفته و سپس در پوشه Properties
به فایل launchSettings.json
مراجعه کرد و تنظیمات زیر را اعمال نمایید .
{
"environmentVariables": {
"C-SqlServerConnectionString": "", //for [ Command ] section -> CommandTransaction - CommandRepositories & ...
"Q-SqlServerConnectionString": "" //for [ Query ] section -> QueryTransaction - QueryRepositories & ...
}
}
2 . استفاده از WithValidationAttribute
از این Attribute
برای مواقعی که نیاز به اعتبارسنجی Command
یا Query
خود دارید استفاده می شود . برای شروع می بایست کلاس مربوط به Validator
را ایجاد نمایید و سپس اقدام به گذاشتن WithValidation
نمایید .
public class CreateCommandValidator : IValidator<CreateCommand>
{
public CreateCommandValidator(){}
public object Validate(CreateCommand input)
{
//validations
return default;
}
public Task<object> ValidateAsync(CreateCommand input, CancellationToken cancellationToken)
{
//validations
return Task.FromResult(default(object));
}
}
کدهای بالا برای موارد سمت Query
نیز صدق می کند و برای این بخش هم می توان از دستورات بالا استفاده کرد ، حال می توان از این Attribute
استفاده کرد .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
public CreateCommandHandler(){}
[WithValidation]
public string Handle(CreateCommand command)
{
//logic
return default;
}
[WithValidation]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
🔥 توجه : در کد بالا و در بخش مربوط به کلاس Validator
مربوطه ، شما می توانید نتیجه متد Validate
و یا ValidateAsync
را که یک object
می باشد در داخل CommandHandler
مربوطه مورد استفاده قرار دهید
برای این مهم ، کافی است که در قسمت CommandHandler
خود یک متغیر از نوع object
و با نام validationResult_
و به شکل readonly
ایجاد نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
private readonly object _validationResult;
public CreateCommandHandler(){}
[WithValidation]
public string Handle(CreateCommand command)
{
//logic
return default;
}
[WithValidation]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
3 . استفاده از WithCleanCacheAttribute
در قسمت Command
در مواقعی نیاز دارید که پس از اجرا شدن لاجیک بخش مربوطه ، Cache
مربوط به موجودیت مورد نظر را حذف نمایید تا مجدد در درخواست دیگری که برای قسمت Query
مربوطه ارسال می شود ، Cache
مربوطه ایجاد شود .
برای سهولت در این کار می توانید از این Attribute
مطابق کد های زیر استفاده نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
public CreateCommandHandler(){}
[WithCleanCache(Keies = "Key1|Key2|...")]
public string Handle(CreateCommand command)
{
//logic
return default;
}
[WithCleanCache(Keies = "Key1|Key2|...")]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
4 . استفاده از WithPessimisticConcurrencyAttribute
در مواقعی که نیاز دارید تا منطق بخش مربوط به Command
خود را که یک Critical Section
می باشد در داخل بلوک lock
قرار دهید که تنها یک یا تعداد مشخصی Thread
بتوانند به آن بخش Critical
دسترسی داشته باشند ، می توانید از این Attribute
استفاده نمایید .
🔥 توجه : برای متد Handle
باید در داخل CommandHandler
خود یک متغیر از نوع object
ایجاد نمایید و برای HandleAsync
باید یک متغیر از نوع SemaphoreSlim
ایجاد نمایید
برای استفاده از این Attribute
و در کنار متد Handle
بخش مربوط به Command
باید مطابق دستورات زیر عمل نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
private static object _lock = new();
public CreateCommandHandler(){}
[WithPessimisticConcurrency]
public string Handle(CreateCommand command)
{
//logic
return default;
}
}
🔥 توجه : در کد بالا ، حتما می بایست نام متغیر مربوط به کلید قفل گذاری ، lock_
باشد
برای استفاده از این Attribute
و در کنار متد HandleAsync
بخش مربوط به Command
باید مطابق دستورات زیر عمل نمایید .
public class CreateCommand : ICommand<string> //any result type
{
//some properties
}
public class CreateCommandHandler : ICommandHandler<CreateCommand, string>
{
private static SemaphoreSlim _asyncLock = new(1, 1); //custom count of thread
public CreateCommandHandler(){}
[WithPessimisticConcurrency]
public Task<string> HandleAsync(CreateCommand command, CancellationToken cancellationToken)
{
//logic
return Task.FromResult<string>(default);
}
}
🔥 توجه : در کد بالا ، حتما می بایست نام متغیر مربوط به کلید قفل گذاری ، asyncLock_
باشد
برای مدیریت پیشرفته تر و خوانا تر Cache
های نوشته شده در سطح پروژه که بر اساس دیتابیس Redis
پیاده سازی شده است می توانید ، مطابق دستور العمل های زیر اقدام نمایید .
🔥 توجه : از واسط IInternalDistributedCache
برای Redis
متعلق به سرویس جاری استفاده می شود و سرویس های دیگر به این Cache
دسترسی ندارند
🔥 توجه : از واسط IExternalDistributedCache
برای Redis
متعلق به همه سرویس ها استفاده می شود ، درواقع پیاده کننده این واسط از Redis
مشترک برای همه سرویس ها استفاده می کند
1 . تعریف کلاس مربوط به منطق دیتای مورد نیاز برای Cache
در ابتدا ، شما می بایست کلاس مربوط به منطق Cache
خود را مطابق دستورات زیر ایجاد نمایید .
//for current service distributed cahce
public class MemoryCache : IInternalDistributedCacheHandler<List<Dto>>
{
public MemoryCache(){}
[Config(Key = 'Key', Ttl = 60 /*time to live based on minute*/)]
public List<Dto> Set()
{
//query
return new();
}
[Config(Key = 'Key', Ttl = 60 /*time to live based on minute*/)]
public Task<List<Dto>> SetAsync(CancellationToken cancellationToken)
{
//query
return Task.FromResult(new());
}
}
//for all services distributed cahce ( global | share cahce )
public class MemoryCache : IExternalDistributedCacheHandler<List<Dto>>
{
public MemoryCache(){}
[Config(Key = 'Key', Ttl = 60 /*time to live based on minute*/)]
public List<Dto> Set()
{
//query
return new();
}
[Config(Key = 'Key', Ttl = 60 /*time to live based on minute*/)]
public Task<List<Dto>> SetAsync(CancellationToken cancellationToken)
{
//query
return Task.FromResult(new());
}
}
🔥 توجه : اگر در ConfigAttribute
کدهای فوق ، مقداری برای Ttl
تنظیم نکنید و یا این Property
را 0 مقداردهی نمایید ، Cache
مربوطه به شکل دائمی و بدون انقضا در Redis
باقی خواهد ماند
2 . فراخوانی Cache
مربوطه در قسمت مورد نیاز
حال برای استفاده از مقدار Cache
شده ( مطابق دستورات فوق ) می بایست ، از واسط متناسب با InternalCache
و یا ExternalCache
استفاده نمود . برای این مهم دو واسط IInternalDistributedCacheMediator
و IExternalDistributedCacheMediator
پیاده سازی شده اند که می توان از آنها مطابق دستورات زیر استفاده کرد .
public class Query : IQuery<List<Dto>>
{
}
public class QueryHandler : IQueryHandler<Query, List<Dto>>
{
private readonly IInternalDistributedCacheMediator _cacheMediator;
public QueryHandler(IInternalDistributedCacheMediator cacheMediator) => _cacheMediator = cacheMediator;
public List<Dto> Handle(Query query)
{
var result = _cacheMediator.Get<List<Dto>>(cancellationToken);
return result;
}
public async Task<List<Dto>> HandleAsync(Query query, CancellationToken cancellationToken)
{
var result = await _cacheMediator.GetAsync<List<Dto>>(cancellationToken);
return result;
}
}
🔥 توجه : برای فراخوانی Cache
مورد نیاز ، همانطور که در کدهای فوق مشخص می باشد ، نیاز به ارسال کلید مربوطه به متد <>Get
و <>GetAsync
نمی باشد ، بلکه این متد از نوع ارسالی در قسمت Generic
و تطابق آن با نوع در نظر گرفته شده در قسمت Setter
، داده ها را واکشی می کند
3 . فعال سازی ابزار DistributedCache
در سرویس مربوطه
در نهایت برای فعال سازی این قابلیت در سرویس خود ، می بایست در لایه Presentation
و در فایل Program.cs
از دستور زیر استفاده نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterDistributedCaching();
برای تنظیمات مربوط به رشته اتصال و اطلاعات مربوط به Cache
می بایست در سرویس مربوطه و در بخش Properties
و در فایل مربوط به launchSettings.json
و در قسمت environmentVariables
کلید های زیر را اضافه نمایید .
{
"environmentVariables": {
"E-RedisConnectionString": "", //external connection
"I-RedisConnectionString": "" //internal connection
}
}
🔥 توجه : دقت داشته باشید که منطق Cache
های نوشته شده می بایست در لایه UseCase
سرویس های مربوطه تعریف شده و مورد استفاده قرار بگیرند
در ابتدا ، می بایست از ابزار پر کاربرد MessageBroker
شروع نماییم که استفاده زیادی در سطح پروژه های مبتنی بر معماری میکروسرویس دارد .
برای استفاده از این ابزار و نیز برای استفاده از زیرساخت های پیاده سازی شده ( سرویس های تکمیلی آماده ) ، می بایست ابتدا به سرویس Domic-TriggerService
مراجعه کرده و سپس به پروژه Domic.Init.MessageBroker
رفته و این پروژه را اجرا گرفته تا تمام ساختارهای Queue
و Exchange
مربوط به سرویس های زیرساختی پروژه Domic
ایجاد شوند .
حال بیایید به بررسی دقیق ابزار MessageBroker
در پروژه Domic
بپردازیم .
1 . نحوه ایجاد Event
در سطح سرویس ها و مدیریت آنها برای ارسال به Broker
برای این مهم ابتدا باید به این نکته اشاره کرد که تمامی Event
ها در لایه Domain
سرویس ها ایجاد می شوند و از بیرون از این لایه تنها به استفاده و مدیریت این Event
های ایجاد شده پرداخته می شود .
//ExchangeType : Exchange.FanOut | Exchange.Direct | Exchange.Headers | Exchange.Topic
//FanOut-Exchange
//create event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange")]
public class Created : CreateDomainEvent<string> //any type of identity key
{
//payload
}
//update event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange")]
public class Updated : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//delete event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange")]
public class Deleted : DeleteDomainEvent<string> //any type of identity key
{
//payload
}
//ExchangeType : Exchange.FanOut | Exchange.Direct | Exchange.Headers | Exchange.Topic
//Direct-Exchange
//create event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route")]
public class Created : CreateDomainEvent<string> //any type of identity key
{
//payload
}
//update event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route")]
public class Updated : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//delete event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route")]
public class Deleted : DeleteDomainEvent<string> //any type of identity key
{
//payload
}
🔥 توجه : اگر چنانچه در کدهای بالا، سرویس مذکور علاوه بر تولید این رخداد ها ( Producer
) ، مصرف کننده این رخداد نیز باشد ( Consumer
) می بایست مطابق دستورات زیر عمل کرد
//FanOut-Exchange
//create event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange", Queue = "queue")]
public class Created : CreateDomainEvent<string> //any type of identity key
{
//payload
}
//update event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange", Queue = "queue")]
public class Updated : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//delete event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange", Queue = "queue")]
public class Deleted : DeleteDomainEvent<string> //any type of identity key
{
//payload
}
//Direct-Exchange
//create event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route", Queue = "queue")]
public class Created : CreateDomainEvent<string> //any type of identity key
{
//payload
}
//update event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route", Queue = "queue")]
public class Updated : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//delete event
[EventConfig(ExchangeType = Exchange.Direct, Exchange = "exchange", Route = "route", Queue = "queue")]
public class Deleted : DeleteDomainEvent<string> //any type of identity key
{
//payload
}
2 . استفاده از Event
های تعریف شده در لایه Domain
بعد از آنکه Event
های مورد نیاز در لایه Domain
ایجاد شدند ، می بایست از این رخداد ها در سطح کلاس های Entity
استفاده شود . موجودیت های تعریف شده در لایه Domain
بر پایه الگوی Rich Domain Model
توسعه پیدا کرده اند و می بایست به ازای هر Behavior
ای که صدا زده می شود ، در صورت نیاز یک Event
مناسب ایجاد گردد که برای این مهم می بایست مطابق دستورات زیر عمل نمود .
//update event
[EventConfig(ExchangeType = Exchange.FanOut, Exchange = "exchange")]
public class UpdatedEvent : UpdateDomainEvent<string> //any type of identity key
{
public string Email { get; init; }
public string Username { get; init; }
}
public class DomainEntity : Entity<string> //any type of identity key
{
public string Id { get; private set; }
public string Email { get; private set; }
public string Username { get; private set; }
//Behaviors
public void Change(string username, string email)
{
Email = email;
Username = username;
AddEvent(
new UpdatedEvent {
Id = Id ,
Username = username ,
Email = email ,
}
);
}
}
🔥 توجه : تمامی Entity
های بخش Command
می بایست از کلاس <>Entity
ارث بری کنند
🔥 توجه : برای قدم اول پردازش Event
های تولیدی در سطح کلاس های Entity
می بایست در داخل Behavior
مربوطه در کلاس Entity
از متد پایه ای AddEvent
استفاده نمود
3 . ارسال Event
های تولید شده در سطح لایه Domain
به MessageBroker
بعد از ایجاد و استفاده از Event
در سطح لایه Domain
، حال می بایست نحوه ارسال این رخداد ها به MessageBroker
و EventStreamBroker
مورد بررسی قرار گیرد . پردازش Event
ها در پروژه Domic
به شکل OutBox
بوده ، به این صورت که تمامی رخدادها به شکل Transactional
در پایگاه داده ذخیره می شوند . البته این نکته را باید در نظر گرفت که برای این موضوع حتما می بایست WithTransactionAttribute
در قسمت Command
منطق مربوطه ، مورد استفاده قرار بگیرد .
حال برای فعال کردن پردازش OutBox
تمامی رخدادهای تولید شده در سرویس مربوطه ، می بایست مطابق دستورات زیر عمل نمود .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterEventsPublisher(); //for [ MessageBroker ( RabbitMQ ) ]
builder.RegisterDistributedCaching(); //for [ DistributedLock ] handling
🔥 توجه : در نظر داشته باشید که پردازش OutBox
رخدادهای تولید شده در سرویس مورد نظر ، به جهت مدیریت Concurrency
در Instance
های مختلفی که از سرویس مورد نظر ایجاد می شود ، به ابزار InternalDistributedCache
نیاز دارد
🔥 توجه : بازه ی زمانی اجرای مجدد Job
مورد نیاز برای پردازش OutBox
رخدادهای ایجاد شده ، 5
ثانیه می باشد
4 . پردازش و مصرف کردن Event
های تولید شده
این بخش مهمترین قسمت پیاده سازی شده در پروژه Domic
می باشد ، زیرا پردازش رخدادهای تولیدی به واسطه سرویس های مختلف ، بسیار موضوع مهم و اصطلاحا Critical
می باشد که عدم رعایت نکات ریز فنی و دقت به جزئیات ، باعث بروز Inconsistancy
های مختلف مابین سرویس ها می گردد .
خوشبختانه در پروژه Domic
به تمامی این موارد و نکات توجه شده است و کاربر نهایی ، صرفا می بایست مطابق دستورات مطرح شده عمل کرده و به راحتی هر چه تمام تر به پردازش این Event
ها در بستر MessageBroker
بپردازد .
در ابتدا ، برای پردازش Event
های تولیدی توسط سرویس های Producer
، می بایست کلاس های مربوطه ( Consumer
) در لایه UseCase
ایجاد شوند . برای این مهم مطابق دستورات زیر عمل نمایید .
//define in [ Domain ] layer of consumer service
[EventConfig(Queue = "queue")]
public class UpdatedEvent : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//define in [ UseCase ] layer of consumer service
public class UpdatedConsumerEventBusHandler : IConsumerEventBusHandler<UpdatedEvent>
{
public UpdatedConsumerEventBusHandler(){}
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : پروژه Domic
بر پایه الگوی طراحی CQRS
که یک الگوی System Design
ایی می باشد ، توسعه پیدا کرده است . لذا در بخش Consume
کردن Event
های مربوطه ، حتما باید نوع تراکنش مورد نظر از نظر Command
و یا Query
بودن مشخص شود
🔥 توجه : برای مدیریت تراکنش بخش مربوط به Query
در مدیریت Event
و نیز Message
، پیش تر در قسمت Command
های مربوط به الگوی Mediator
گفته شد که این بخش نیز مشابه آن می باشد منتها با یک تفاوت و آن این است که باید به جای پیاده سازی ICoreCommandUnitOfWork
، واسط ICoreQueryUnitOfWork
پیاده سازی شود
🔥 توجه : در نظر داشته باشید که در بخش مربوط به مدیریت Event
ها و یا Message
ها ، تمام فرآیند به صورت پیشفرض و ثابت ، در یک Transaction Boundary
صورت می گیرد و صرفا شما به عنوان مدیریت کننده رخداد مربوطه ، باید نوع تراکنش را ( Command
و یا Query
) مشخص نمایید، به این معنی که این رخداد و یا Message
بر کدام بخش پروژه ( بهتر است بگوییم دیتابیس ) قرار است اثر بگذارد ، دیتابیس Command
و یا Query
برای مدیریت تراکنش بخش مربوط به Query
می بایست مطابق دستورات زیر عمل نمایید .
public class QueryUnitOfWork : IQueryUnitOfWork
{
private readonly SQLContext _context;
private IDbContextTransaction _transaction;
public QueryUnitOfWork(SQLContext context) => _context = context; //Resource
public void Transaction(IsolationLevel isolationLevel)
=> _transaction = _context.Database.BeginTransaction(isolationLevel); //Resource
public async Task TransactionAsync(IsolationLevel isolationLevel = IsolationLevel.ReadCommitted,
CancellationToken cancellationToken = new CancellationToken())
{
_transaction = await _context.Database.BeginTransactionAsync(isolationLevel, cancellationToken); //Resource
}
public void Commit()
{
_context.SaveChanges();
_transaction.Commit();
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
await _context.SaveChangesAsync(cancellationToken);
await _transaction.CommitAsync(cancellationToken);
}
public void Rollback() => _transaction?.Rollback();
public Task RollbackAsync(CancellationToken cancellationToken)
{
if (_transaction is not null)
return _transaction.RollbackAsync(cancellationToken);
return Task.CompletedTask;
}
public void Dispose() => _transaction?.Dispose();
public ValueTask DisposeAsync()
{
if (_transaction is not null)
return _transaction.DisposeAsync();
return ValueTask.CompletedTask;
}
}
🔥 توجه : برای فعال سازی منطق مربوط به بخش Query
، حتما می بایست این بخش فعال گردد
برای فعال کردن منطق های بخش مربوط به Query
باید به لایه Presentation
پروژه مربوطه رفته و در فایل Program.cs
مراجعه کرده و مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
//if using the [ EF Core ]
builder.RegisterEntityFrameworkCoreQuery< TQueryContext >(); //TQueryContext -> SqlContext or ...
builder.RegisterQueryRepositories();
برای تعیین رشته اتصال پایگاه داده SQL Server
می بایست به لایه Presentation
پروژه مربوطه رفته و سپس در پوشه Properties
به فایل launchSettings.json
مراجعه کرده و تنظیمات زیر را اعمال نمایید .
{
"environmentVariables": {
"C-SqlServerConnectionString": "", //for [ Command ] section -> CommandTransaction - CommandRepositories & ...
"Q-SqlServerConnectionString": "" //for [ Query ] section -> QueryTransaction - QueryRepositories & ...
}
}
🔥 توجه : دقت نمایید که برای پیاده سازی منطق تراکنش مربوطه ، از ابزار EF Core
استفاده شده است که شما می توانید از هر ابزار و یا دیتابیس دیگری استفاده نمایید
🔥 توجه : در نظر داشته باشید که توابع Handle
و HandleAsync
در مدیریت Event
و یا Message
، هر یک دارای معنای مشخصی است و برای استفاده از هر کدام از این توابع باید در بخش تنظیمات سرویس مربوطه ، Config
خاصی را اعمال نمایید
برای استفاده از توابع Handle
و HandleAsync
بسته به نیاز در مدیریت Event
و یا Message
های بخش MessageBroker
، می بایست ابتدا به لایه Presentation
پروژه مربوطه رفته و سپس در پوشه مربوط به Configs
، فایل مربوط به Config.json
را باز کرده و مطابق دستورات زیر عمل نمایید .
{
"IsExternalBrokerConsumingAsync": false, //false => using Handle() | true => using HandleAsync()
"IsInternalBrokerConsumingAsync": false //false => using Handle() | true => using HandleAsync()
}
🔥 توجه : استفاده از تابع Handle
برای پردازش تک به تک پیام های داخل MessageBroker
مورد استفاده قرار می گیرد
🔥 توجه : استفاده از تابع HandleAsync
به شما این امکان را می دهد که پیام های داخل MessageBroker
را به شکل Concurrent
پردازش نمایید . در واقع در این حالت به میزانی که در MessageBroker
پیام داشته باشید در پروژه شما Task
ایجاد می شود و این Task
ها به شکل Concurrent
به پردازش پیام های شما می پردازند
🔥 توجه : توجه کنید که اگر از تابع HandleAsync
برای مدیریت پیام های MessageBroker
استفاده می نمایید ، برای مدیریت بار وارد شده بر Consumer
و به طور دقیق تر ، برای جلوگیری از Crash
نکردن سرویس مربوطه در High Loading
، حتما از قابلیت Throttle
پروژه Domic
که جلوتر اشاره خواهد شد ، استفاده نمایید
در نهایت برای استفاده از EventConsumer
ها در سرویس مربوطه باید در لایه Presentation
و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterEventsSubscriber();
تا به اینجای کار ، ما به بررسی نحوه مدیریت Event
در بستر MessageBroker
پرداختیم ، حال برای پردازش Message
نیز تمام مراحل فوق صادق می باشند و تنها در رابط مدیریت کننده Message
تفاوت وجود دارد .
برای مدیریت Message
های دریافتی از MessageBroker
پروژه Domic
، شما می بایست از واسط IConsumerMessageBusHandler
استفاده و مطابق دستورات زیر عمل نمایید .
[Consumer(Queue = "Queue")]
public class ConsumerMessageBusHandler : IConsumerMessageBusHandler<MessageDto>
{
public ConsumerMessageBusHandler(){}
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : دقت نمایید که در بخش مربوط به Message
از هیچ الگوی OutBox
ای برای ارسال پیام ها استفاده نمی شود ( در قسمت Producer
) و این ساختار در مواقعی که نیاز است به طور مستقیم یک پیامی به MessageBroker
ارسال شود ، مورد استفاده قرار می گیرد
در نهایت برای استفاده از MessageConsumer
ها در سرویس مربوطه باید در لایه Presentation
و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterMessagesSubscriber();
برای فعال سازی MessageBroker
در سرویس مربوطه ، شما می بایست در لایه Presentation
سرویس مربوطه و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterMessageBroker();
در نهایت برای تنظیمات مربوط به رشته اتصال MessageBroker
باید در لایه Presentation
سرویس مربوطه و در پوشه Properties
و در فایل launchSettings.json
مانند دستورات زیر عمل نمایید .
{
"environmentVariables": {
//for external message broker
"E-RabbitMQ-Host": "",
"E-RabbitMQ-Port": "",
"E-RabbitMQ-Username": "",
"E-RabbitMQ-Password": "",
//for internal message broker ( using in [ AsyncCommands ] )
"I-RabbitMQ-Host": "",
"I-RabbitMQ-Port": "",
"I-RabbitMQ-Username": "",
"I-RabbitMQ-Password": ""
}
}
1 . استفاده از WithMaxRetryAttribute
این Attribute
به شما این امکان را می دهد که میزان تلاش Consumer
مربوطه برای پردازش Message
و یا Event
مربوطه را مدیریت نمایید .
برای استفاده از این Attribute
می توانید مطابق دستورات زیر عمل نمایید .
//for [ Message ] consuming
[Consumer(Queue = "Queue")]
public class ConsumerMessageBusHandler : IConsumerMessageBusHandler<MessageDto>
{
public ConsumerMessageBusHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
//for [ Event ] consuming
public class UpdatedConsumerEventBusHandler : IConsumerEventBusHandler<UpdatedEvent>
{
public UpdatedConsumerEventBusHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : در استفاده از WithMaxRetryAttribute
، یک ویژگی تحت عنوان HasAfterMaxRetryHandle
به چشم می خورد که بیان کننده آن است که آیا نیاز به جداگانه مدیریت کردن پیام مربوطه در صورتی که بیش از حد مجاز Retry
شده است وجود دارد یا خیر . اگر این ویژگی برابر با false
باشد که مقدار پیشفرض این متغییر می باشد ، پیام مربوطه بعد از حداکثر تلاش برای پردازش ، از Queue
مربوطه پاک خواهد شد
اگر پیام مورد نظر در Queue
مربوطه ، به حد مجاز پردازش مجدد برسد ( در صورت بروز خطاهای احتمالی ) ، برای مدیریت جداگانه پردازش پیام مربوطه ، می بایست مطابق دستورات زیر عمل نمایید .
//for [ Message ] consuming
[Consumer(Queue = "Queue")]
public class ConsumerMessageBusHandler : IConsumerMessageBusHandler<MessageDto>
{
public ConsumerMessageBusHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
//for handle max retry
public void AfterMaxRetryHandle(MessageDto message)
{
//logic
}
public Task AfterMaxRetryHandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
//for [ Event ] consuming
public class UpdatedConsumerEventBusHandler : IConsumerEventBusHandler<UpdatedEvent>
{
public UpdatedConsumerEventBusHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
//for handle max retry
public void AfterMaxRetryHandle(UpdatedEvent @event)
{
//logic
}
public Task AfterMaxRetryHandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
2 . استفاده از WithCleanCacheAttribute
مطابق موردی که در Mediator
توضیح داده شد ، در اینجا هم ما مطابق با ابزار Mediator
می توانیم از این Attribute
و طبق دستور العمل پیشتر گفته شده ، استفاده نماییم .
3 . استفاده از Config
های مربوط به Throttle
در بخش تنظیمات
برای مدیریت نرخ Event
و یا Message
های وارده به سرویس Consumer
مربوطه و مدیریت بهتر بار وارده به پروژه ، شما می بایست از قابلیت Throttle
استفاده نمایید .
برای این منظور می بایست به لایه Presentation
سرویس مربوطه رفته و در پوشه Configs
و در فایل Config.json
مطابق دستورات زیر عمل نمایید .
{
//for external [ MessageBroker ]
"ExternalQueueConfig": {
"Throttle": [
{
"Active": false, //active or inactive
"Queue": "Queue", //name of target [ Queue ]
"Size": 0, //size of payload
"Limitation": 1000, //count of async [ Event ] or [ Message ] processing per second . other world -> count of concurrent [ Task ] per second
"IsGlobally": false //should be always false
}
]
},
//for internal [ MessageBroker ]
"InternalQueueConfig": {
"Throttle": []
}
}
🔥 توجه : در تنظیمات بالا به این نکته توجه داشته باشید که در صورتی که ویژگی IsExternalBrokerConsumingAsync
و یا IsInternalBrokerConsumingAsync
فعال باشد ، قابلیت Throttle
با تنظیمات بالا اعمال می گردد
در گام اول برای استفاده از زیرساخت EventStreamBroker
می بایست به سرویس Domic-TriggerService
رفته و پروژه Domic.Init.EventStreamBroker
را اجرا گرفته تا تمام Topic
های پروژه Domic
ایجاد گردند .
حال بیایید به بررسی دقیق ابزار EventStreamBroker
در پروژه Domic
بپردازیم .
در نظر داشته باشید ، چنانچه در پروژه مد نظر خود نیاز به سرعت بالا و Scalability
سریع و مداوم نیاز دارید ، حتما باید از قابلیت EventStreamBroker
به جای استفاده از MessageBroker
استفاده نمایید .
1 . نحوه ایجاد Event
در سطح سرویس ها و مدیریت آنها برای ارسال به Broker
برای این مهم ابتدا باید به این نکته اشاره کرد که تمامی Event
ها در لایه Domain
سرویس ها ایجاد می شوند و از بیرون از این لایه تنها به استفاده و مدیریت این Event
های ایجاد شده پرداخته می شود .
//create event
[EventConfig(Topic = "Topic")]
public class Created : CreateDomainEvent<string> //any type of identity key
{
//payload
}
//update event
[EventConfig(Topic = "Topic")]
public class Updated : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//delete event
[EventConfig(Topic = "Topic")]
public class Deleted : DeleteDomainEvent<string> //any type of identity key
{
//payload
}
2 . استفاده از Event
های تعریف شده در لایه Domain
بعد از آنکه Event
های مورد نیاز در لایه Domain
ایجاد شدند ، می بایست از این رخداد ها در سطح کلاس های Entity
استفاده شود . موجودیت های تعریف شده در لایه Domain
بر پایه الگوی Rich Domain Model
توسعه پیدا کرده اند و می بایست به ازای هر Behavior
ای که صدا زده می شود ، در صورت نیاز یک Event
مناسب ایجاد گردد که برای این مهم می بایست مطابق دستورات زیر عمل نمود .
//update event
[EventConfig(Topic = "Topic")]
public class UpdatedEvent : UpdateDomainEvent<string> //any type of identity key
{
public string Email { get; init; }
public string Username { get; init; }
}
public class DomainEntity : Entity<string> //any type of identity key
{
public string Id { get; private set; }
public string Email { get; private set; }
public string Username { get; private set; }
//Behaviors
public void Change(string username, string email)
{
Email = email;
Username = username;
AddEvent(
new UpdatedEvent {
Id = Id ,
Username = username ,
Email = email ,
}
);
}
}
🔥 توجه : تمامی Entity
های بخش Command
می بایست از کلاس <>Entity
ارث بری کنند
🔥 توجه : برای قدم اول پردازش Event
های تولیدی در سطح کلاس های Entity
می بایست در داخل Behavior
مربوطه در کلاس Entity
از متد پایه ای AddEvent
استفاده نمود
3 . ارسال Event
های تولید شده در سطح لایه Domain
به EventStreamBroker
بعد از ایجاد و استفاده از Event
در سطح لایه Domain
، حال می بایست نحوه ارسال این رخداد ها به EventStreamBroker
مورد بررسی قرار گیرد . پردازش Event
ها در پروژه Domic
به شکل OutBox
بوده ، به این صورت که تمامی رخدادها به شکل Transactional
در پایگاه داده ذخیره می شوند . البته این نکته را باید در نظر گرفت که برای این موضوع حتما می بایست WithTransactionAttribute
در قسمت Command
منطق مربوطه ، مورد استفاده قرار بگیرد .
حال برای فعال کردن پردازش OutBox
تمامی رخدادهای تولید شده در سرویس مربوطه ، می بایست مطابق دستورات زیر عمل نمود .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterEventsStreamPublisher(); //for [ EventStreamBroker ( Apache Kafka ) ]
builder.RegisterDistributedCaching(); //for [ DistributedLock ] handling
🔥 توجه : در نظر داشته باشید که پردازش OutBox
رخدادهای تولید شده در سرویس مورد نظر ، به جهت مدیریت Concurrency
در Instance
های مختلفی که از سرویس مورد نظر ایجاد می شود ، به ابزار InternalDistributedCache
نیاز دارد
🔥 توجه : بازه ی زمانی اجرای مجدد Job
مورد نیاز برای پردازش OutBox
رخدادهای ایجاد شده ، 5
ثانیه می باشد
4 . پردازش و مصرف کردن Event
های تولید شده
این بخش مهمترین قسمت پیاده سازی شده در پروژه Domic
می باشد ، زیرا پردازش رخدادهای تولیدی به واسطه سرویس های مختلف ، بسیار موضوع مهم و اصطلاحا Critical
می باشد که عدم رعایت نکات ریز فنی و دقت به جزئیات ، باعث بروز Inconsistancy
های مختلف مابین سرویس ها می گردد .
خوشبختانه در پروژه Domic
به تمامی این موارد و نکات توجه شده است و کاربر نهایی ، صرفا می بایست مطابق دستورات مطرح شده عمل کرده و به راحتی هر چه تمام تر به پردازش این Event
ها در بستر EventStreamBroker
بپردازد .
در ابتدا ، برای پردازش Event
های تولیدی توسط سرویس های Producer
، می بایست کلاس های مربوطه ( Consumer
) در لایه UseCase
ایجاد شوند . برای این مهم مطابق دستورات زیر عمل نمایید .
//define in [ Domain ] layer of consumer service
[EventConfig(Topic = "Topic")]
public class UpdatedEvent : UpdateDomainEvent<string> //any type of identity key
{
//payload
}
//define in [ UseCase ] layer of consumer service
public class UpdatedConsumerEventStreamHandler : IConsumerEventStreamHandler<UpdatedEvent>
{
public UpdatedConsumerEventStreamHandler(){}
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : پروژه Domic
بر پایه الگوی طراحی CQRS
که یک الگوی System Design
ایی می باشد ، توسعه پیدا کرده است . لذا در بخش Consume
کردن Event
های مربوطه ، حتما باید نوع تراکنش مورد نظر از نظر Command
و یا Query
بودن مشخص شود
🔥 توجه : برای مدیریت تراکنش بخش مربوط به Query
در مدیریت Event
و نیز Message
، پیش تر در قسمت Command
های مربوط به الگوی Mediator
گفته شد که این بخش نیز مشابه آن می باشد منتها با یک تفاوت و آن این است که باید به جای پیاده سازی ICoreCommandUnitOfWork
، واسط ICoreQueryUnitOfWork
پیاده سازی شود
🔥 توجه : در نظر داشته باشید که در بخش مربوط به مدیریت Event
ها و یا Message
ها ، تمام فرآیند به صورت پیشفرض و ثابت ، در یک Transaction Boundary
صورت می گیرد و صرفا شما به عنوان مدیریت کننده رخداد مربوطه ، باید نوع تراکنش را ( Command
و یا Query
) مشخص نمایید، به این معنی که این رخداد و یا Message
بر کدام بخش پروژه ( بهتر است بگوییم دیتابیس ) قرار است اثر بگذارد ، دیتابیس Command
و یا Query
برای مدیریت تراکنش بخش مربوط به Query
می بایست مطابق دستورات زیر عمل نمایید .
public class QueryUnitOfWork : IQueryUnitOfWork
{
private readonly SQLContext _context;
private IDbContextTransaction _transaction;
public QueryUnitOfWork(SQLContext context) => _context = context; //Resource
public void Transaction(IsolationLevel isolationLevel)
=> _transaction = _context.Database.BeginTransaction(isolationLevel); //Resource
public async Task TransactionAsync(IsolationLevel isolationLevel = IsolationLevel.ReadCommitted,
CancellationToken cancellationToken = new CancellationToken())
{
_transaction = await _context.Database.BeginTransactionAsync(isolationLevel, cancellationToken); //Resource
}
public void Commit()
{
_context.SaveChanges();
_transaction.Commit();
}
public async Task CommitAsync(CancellationToken cancellationToken)
{
await _context.SaveChangesAsync(cancellationToken);
await _transaction.CommitAsync(cancellationToken);
}
public void Rollback() => _transaction?.Rollback();
public Task RollbackAsync(CancellationToken cancellationToken)
{
if (_transaction is not null)
return _transaction.RollbackAsync(cancellationToken);
return Task.CompletedTask;
}
public void Dispose() => _transaction?.Dispose();
public ValueTask DisposeAsync()
{
if (_transaction is not null)
return _transaction.DisposeAsync();
return ValueTask.CompletedTask;
}
}
🔥 توجه : برای فعال سازی منطق مربوط به بخش Query
، حتما می بایست این بخش فعال گردد
برای فعال کردن منطق های بخش مربوط به Query
باید به لایه Presentation
پروژه مربوطه رفته و در فایل Program.cs
مراجعه کرده و مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
//if using the [ EF Core ]
builder.RegisterEntityFrameworkCoreQuery< TQueryContext >(); //TQueryContext -> SqlContext or ...
builder.RegisterQueryRepositories();
برای تعیین رشته اتصال پایگاه داده SQL Server
می بایست به لایه Presentation
پروژه مربوطه رفته و سپس در پوشه Properties
به فایل launchSettings.json
مراجعه کرده و تنظیمات زیر را اعمال نمایید .
{
"environmentVariables": {
"C-SqlServerConnectionString": "", //for [ Command ] section -> CommandTransaction - CommandRepositories & ...
"Q-SqlServerConnectionString": "" //for [ Query ] section -> QueryTransaction - QueryRepositories & ...
}
}
🔥 توجه : دقت نمایید که برای پیاده سازی منطق تراکنش مربوطه ، از ابزار EF Core
استفاده شده است که شما می توانید از هر ابزار و یا دیتابیس دیگری استفاده نمایید
🔥 توجه : در نظر داشته باشید که توابع Handle
و HandleAsync
در مدیریت Event
و یا Message
، هر یک دارای معنای مشخصی است و برای استفاده از هر کدام از این توابع باید در بخش تنظیمات سرویس مربوطه ، Config
خاصی را اعمال نمایید
برای استفاده از توابع Handle
و HandleAsync
بسته به نیاز در مدیریت Event
و یا Message
های بخش MessageBroker
، می بایست ابتدا به لایه Presentation
پروژه مربوطه رفته و سپس در پوشه مربوط به Configs
، فایل مربوط به Config.json
را باز کرده و مطابق دستورات زیر عمل نمایید .
{
"IsExternalBrokerConsumingAsync": false, //false => using Handle() | true => using HandleAsync()
"IsInternalBrokerConsumingAsync": false //false => using Handle() | true => using HandleAsync()
}
🔥 توجه : استفاده از تابع Handle
برای پردازش تک به تک پیام های داخل EventStreamBroker
مورد استفاده قرار می گیرد
🔥 توجه : استفاده از تابع HandleAsync
به شما این امکان را می دهد که پیام های داخل EventStreamBroker
را به شکل Concurrent
پردازش نمایید . در واقع در این حالت به میزانی که در EventStreamBroker
پیام داشته باشید در پروژه شما Task
ایجاد می شود و این Task
ها به شکل Concurrent
به پردازش پیام های شما می پردازند
🔥 توجه : توجه کنید که اگر از تابع HandleAsync
برای مدیریت پیام های EventStreamBroker
استفاده می نمایید ، برای مدیریت بار وارد شده بر Consumer
و به طور دقیق تر ، برای جلوگیری از Crash
نکردن سرویس مربوطه در High Loading
، حتما از قابلیت Throttle
پروژه Domic
که جلوتر اشاره خواهد شد ، استفاده نمایید
در نهایت برای استفاده از EventConsumer
ها در سرویس مربوطه باید در لایه Presentation
و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterEventsStreamSubscriber();
تا به اینجای کار ، ما به بررسی نحوه مدیریت Event
در بستر EventStreamBroker
پرداختیم ، حال برای پردازش Message
نیز تمام مراحل فوق صادق می باشند و تنها در رابط مدیریت کننده Message
تفاوت وجود دارد .
برای مدیریت Message
های دریافتی از EventStreamBroker
پروژه Domic
، شما می بایست از واسط IConsumerMessageStreamHandler
استفاده و مطابق دستورات زیر عمل نمایید .
[StreamConsumer(Topic = "Topic")]
public class ConsumerMessageStreamHandler : IConsumerMessageStreamHandler<MessageDto>
{
public ConsumerMessageStreamHandler(){}
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : دقت نمایید که در بخش مربوط به Message
از هیچ الگوی OutBox
ای برای ارسال پیام ها استفاده نمی شود ( در قسمت Producer
) و این ساختار در مواقعی که نیاز است به طور مستقیم یک پیامی به EventStreamBroker
ارسال شود ، مورد استفاده قرار می گیرد
در نهایت برای استفاده از MessageConsumer
ها در سرویس مربوطه باید در لایه Presentation
و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterMessagesStreamSubscriber();
برای فعال سازی EventStreamBroker
در سرویس مربوطه ، شما می بایست در لایه Presentation
سرویس مربوطه و در فایل Program.cs
مطابق دستورات زیر عمل نمایید .
WebApplicationBuilder builder = WebApplication.CreateBuilder();
builder.RegisterEventStreamBroker();
در نهایت برای تنظیمات مربوط به رشته اتصال EventStreamBroker
باید در لایه Presentation
سرویس مربوطه و در پوشه Properties
و در فایل launchSettings.json
مانند دستورات زیر عمل نمایید .
{
"environmentVariables": {
//for external event stream broker
"E-Kafka-Host": "",
"E-Kafka-Username": "",
"E-Kafka-Password": ""
}
}
1 . استفاده از WithMaxRetryAttribute
این Attribute
به شما این امکان را می دهد که میزان تلاش Consumer
مربوطه برای پردازش Message
و یا Event
مربوطه را مدیریت نمایید .
برای استفاده از این Attribute
می توانید مطابق دستورات زیر عمل نمایید .
//for [ Message ] consuming
[StreamConsumer(Topic = "Topic")]
public class ConsumerMessageStreamHandler : IConsumerMessageStreamHandler<MessageDto>
{
public ConsumerMessageStreamHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
//for [ Event ] consuming
public class UpdatedConsumerEventStreamHandler : IConsumerEventStreamHandler<UpdatedEvent>
{
public UpdatedConsumerEventStreamHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
🔥 توجه : در استفاده از WithMaxRetryAttribute
، یک ویژگی تحت عنوان HasAfterMaxRetryHandle
به چشم می خورد که بیان کننده آن است که آیا نیاز به جداگانه مدیریت کردن پیام مربوطه در صورتی که بیش از حد مجاز Retry
شده است وجود دارد یا خیر . اگر این ویژگی برابر با false
باشد که مقدار پیشفرض این متغییر می باشد ، پیام مربوطه بعد از حداکثر تلاش برای پردازش ، از Topic
مربوطه پاک خواهد شد
اگر پیام مورد نظر در Topic
مربوطه ، به حد مجاز پردازش مجدد برسد ( در صورت بروز خطاهای احتمالی ) ، برای مدیریت جداگانه پردازش پیام مربوطه ، می بایست مطابق دستورات زیر عمل نمایید .
//for [ Message ] consuming
[StreamConsumer(Topic = "Topic")]
public class ConsumerMessageStreamHandler : IConsumerMessageStreamHandler<MessageDto>
{
public ConsumerMessageStreamHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public void Handle(MessageDto message)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Query)] //or -> Type = TransactionType.Command
public Task HandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
//for handle max retry
public void AfterMaxRetryHandle(MessageDto message)
{
//logic
}
public Task AfterMaxRetryHandleAsync(MessageDto message, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
//for [ Event ] consuming
public class UpdatedConsumerEventStreamHandler : IConsumerEventStreamHandler<UpdatedEvent>
{
public UpdatedConsumerEventStreamHandler(){}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public void Handle(UpdatedEvent @event)
{
//logic
}
[WithMaxRetry(Count = 100, HasAfterMaxRetryHandle = true)] //Count = 100 -> this message will be reprocessed a maximum of 100 times in case of an error
[TransactionConfig(Type = TransactionType.Command)] //or => Type = TransactionType.Query
public Task HandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
//for handle max retry
public void AfterMaxRetryHandle(UpdatedEvent @event)
{
//logic
}
public Task AfterMaxRetryHandleAsync(UpdatedEvent @event, CancellationToken cancellationToken)
{
//logic
return Task.CompleteTask;
}
}
2 . استفاده از WithCleanCacheAttribute
مطابق موردی که در Mediator
توضیح داده شد ، در اینجا هم ما مطابق با ابزار Mediator
می توانیم از این Attribute
و طبق دستور العمل پیشتر گفته شده ، استفاده نماییم .
3 . استفاده از Config
های مربوط به Throttle
در بخش تنظیمات
برای مدیریت نرخ Event
و یا Message
های وارده به سرویس Consumer
مربوطه و مدیریت بهتر بار وارده به پروژه ، شما می بایست از قابلیت Throttle
استفاده نمایید .
برای این منظور می بایست به لایه Presentation
سرویس مربوطه رفته و در پوشه Configs
و در فایل Config.json
مطابق دستورات زیر عمل نمایید .
{
//for external [ EventStreamBroker ]
"ExternalTopicConfig": {
"Throttle": [
{
"Active": false, //active or inactive
"Topic": "Topic", //name of target [ Topic ]
"Limitation": 1000, //count of async [ Event ] or [ Message ] processing per second . other world -> count of concurrent [ Task ] per second
},
//for retriable topic | main topic name -> topic . retry topic name -> serviceName-Retry-topicName
//by default this throttle for retry topic is active
{
"Topic": "Topic", //name of target [ Topic ]
"Limitation": 1000 //count of async [ Event ] or [ Message ] processing per second . other world -> count of concurrent [ Task ] per second
}
]
},
//for internal [ EventStreamBroker ]
"InternalTopicConfig": {
"Throttle": []
}
}
🔥 توجه : در تنظیمات بالا به این نکته توجه داشته باشید که در صورتی که ویژگی IsExternalBrokerConsumingAsync
و یا IsInternalBrokerConsumingAsync
فعال باشد ، قابلیت Throttle
با تنظیمات بالا اعمال می گردد