RabbitMq شبیه به یک صف FIFO عمل میکند؛ یعنی دادهها به ترتیب وارد queue میشوند و به ترتیب نیز به Consumerها ارسال میشوند. برای شروع، یک سولوشن جدید را به نام RabbitMqExample ایجاد میکنیم و پروژههای زیر را به آن اضافه میکنیم.
- یک پروژه از نوع Asp.Net Core Web Application ایجاد میکنیم به نام RabbiMqExample.Producer که همان ارسال کننده (Producer) میباشد.
- یک پروژه از نوع Asp.Net Core Web Application به نام RabbitMqExample.Consumer برای دریافت کننده (Consumer).
- یک پروژه از نوع Class library .Net Core به نام RabbitMqExample.Common که شامل سرویسها و مدلهای مشترک بین Producer و Consumer میباشد.
ابتدا در لایه Common یک کلاس برای دریافت اطلاعات RabbitMq از appsettings.json ایجاد میکنیم.
public class RabbitMqConfiguration
{
public string HostName { get; set; }
public string Username { get; set; }
public string Password { get; set; }
}
سپس یک سرویس را برای برقراری ارتباط با RabbitMq ایجاد میکنیم
public interface IRabbitMqService
{
IConnection CreateChannel();
}
public class RabbitMqService : IRabbitMqService
{
private readonly RabbitMqConfiguration _configuration;
public RabbitMqService(IOptions<RabbitMqConfiguration> options)
{
_configuration = options.Value;
}
public IConnection CreateChannel()
{
ConnectionFactory connection = new ConnectionFactory()
{
UserName = _configuration.Username,
Password = _configuration.Password,
HostName = _configuration.HostName
};
connection.DispatchConsumersAsync = true;
var channel = connection.CreateConnection();
return channel;
}
}
در متد CreateChannel، اطلاعات موردنیاز برای ارتباط با RabbitMq را مانند هاست، نام کاربری و کلمه عبور، وارد میکنیم که از appsettings.json خوانده شدهاند. مقدار پیشفرض نام کاربری و کلمه عبور، guest میباشد. اگر بخواهید Consumer شما دادههای queueها را به صورت async دریافت کند، باید مقدار پراپرتی DispatchConsumersAsync مربوط به ConnectionFactory را برابر true کنید. مقدار پیشفرض آن false است.
در ادامه یک کلاس را برای رجیستر کردن سرویسها ایجاد میکنیم؛ در لایه Common.
public static class StartupExtension
{
public static void AddCommonService(this IServiceCollection services, IConfiguration configuration)
{
services.Configure<RabbitMqConfiguration>(a => configuration.GetSection(nameof(RabbitMqConfiguration)).Bind(a));
services.AddSingleton<IRabbitMqService, RabbitMqService>();
}
}
پکیجهای مورد نیاز این لایه :
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
</ItemGroup>
تا به اینجا موارد مربوط به لایه Common تمام شده؛ در ادامه باید یک Consumer و یک Producer را ایجاد کنیم.در لایه Consumer برای دریافت دادهها از RabbitMq؛ یک سرویس را به نام ConsumerService ایجاد میکنیم:
ابتدا connection را ایجاد کردهایم؛ توسط متد CreateChannel که آنرا در سرویس قبلی پیاده سازی کردیم.بعد از ایجاد IModel، باید queue مربوطه را معرفی کنیم که با استفاده از متد QueueDeclare این کار را انجام دادهایم. پارامترهای متد QueueDeclare:
- پارامتر اول، اسم queue میباشد
- پارامتر durable مشخص میکند که دادهها به صورت مانا باشند یا نه. اگر برابر true باشد، دیتاهای مربوط به queueها، در دیسک ذخیره میشوند؛ اما اگر برابر false باشد، بر روی حافظه ذخیره میشوند. در محیطهایی که مانایی اطلاعات مهم میباشد، باید مقدار این پارامتر را true کنید.
- پارامتر سوم: اطلاعات بیشتر
- پارامتر autoDelete اگر برابر true باشد، زمانی که تمامی Consumerها ارتباطشان با RabbitMq قطع شود، queue هم پاک میشود. اما اگر برابر true باشد، queue باقی میماند؛ حتی اگر هیچ Consumer ای به آن وصل نباشد.
در ادامه باید Exchange مربوط به queue را مشخص کنیم. متد ExchangeDeclare یک Exchange را ایجاد میکند. پارامترهای متد ExchangeDeclare:
- نام Exchange
- نوع Exchange که میتواند Headers , Topic , Fanout یا Direct باشد. اگر برابر Fanout باشد و اگر دادهای وارد Exchange شود، آنرا به تمامی queue هایی که به آن بایند شدهاست، ارسال میکند. اما اگر نوع آن Direct باشد، داده را به یک queue مشخص ارسال میکند؛ با استفاده از پارامتر routeKey.
- پارامترهای بعدی، durable و autoDelete هستند که همانند پارامترهای QueueDeclare عمل میکنند.
سپس در ادامه با استفاده از متد QueueBind میتوانیم queue ایجاد شده را به exchange ایجاد شده، بایند کنیم. پارامتر اول، اسم queue و پارامتر دوم، اسم exchange میباشد و پارامتر سوم، routeKey میباشد و چون نوع Exchange ایجاد شده از نوع Fanout است، آنرا خالی میگذاریم.
چون هنگام تعریف queue مقدار پارامتر DispatchConsumersAsync مربوط به ConnectionFactory را برابر true کردیم، در اینجا نیز باید بجای EventingBasicConsumer، از AsyncEventingBasicConsumer استفاده کنیم. اگر مقدار DispatchConsumersAsync برابر false باشد، باید از EventingBasicConsumer برای ایجاد Consumer استفاده کنید.
سپس باید EventHandler مربوط به دریافت دادهها از queue را پیاده سازی کنیم. event مربوط به Received، زمانی اجرا میشود که دادهای به queue ارسال شود. زمانیکه دادهای ارسال میشود، وارد event مربوطه میشود و ابتدا آنرا به صورت byte دریافت میکنیم. سپس رشتهی ارسالی آنرا توسط متد GetString، بدست میآوریم و دادهی ارسال شده را در صفحهی کنسول نمایش میدهیم.
در ادامه به RabbitMq اطلاع میدهیم که دادهای که ارسال شده برای queue، توسط Consumer دریافت شده؛ با استفاده از متد BasicAck. این کار یک delivery به RabbitMq ارسال میکند تا دیتای ارسال شده را را پاک کند. اگر این متد را فراخوانی نکنیم، هربار که برنامه اجرا میشود، تمامی دیتاهای قبلی را مجددا دریافت میکنیم و تا زمانیکه delivery را به RabbitMq نفرستیم، دادهها را پاک نمیکند.
نکته آخر در Consumer، متد BasicConsume است که عملا Consumer ایجاد شده را به RabbitMq معرفی میکند. برای دریافت دادهها و ثبت Consumer، نیازمند آن هستیم تا یکبار متد ReadMessage فراخوانی شود. برای همین یک HostedService ایجاد میکنیم تا یکبار این متد را فراخوانی کند:
public class ConsumerHostedService : BackgroundService
{
private readonly IConsumerService _consumerService;
public ConsumerHostedService(IConsumerService consumerService)
{
_consumerService = consumerService;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _consumerService.ReadMessgaes();
}
}
در نهایت سرویسهای ایجاد شده را رجیستر میکنیم؛ در Startup لایه Consumer
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; set; }
public void ConfigureServices(IServiceCollection services)
{
services.AddCommonService(Configuration);
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddHostedService<ConsumerHostedService>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
}
}
تا به اینجا کارهای مربوط به Consumer تمام شده و باید قسمت Producer آنرا پیاده سازی کنیم.در لایه Producer یک کنترلر به نام RabbitController را ایجاد میکنیم که شامل یک متد میباشد که دادهها را به Queue ارسال میکند:
[Route("api/[controller]/[action]")]
[ApiController]
public class RabbitController : ControllerBase
{
private readonly IRabbitMqService _rabbitMqService;
public HomeController(IRabbitMqService rabbitMqService)
{
_rabbitMqService = rabbitMqService;
}
[HttpPost]
public IActionResult SendMessage()
{
using var connection = _rabbitMqService.CreateChannel();
using var model = connection.CreateModel();
var body = Encoding.UTF8.GetBytes("Hi");
model.BasicPublish("UserExchange",
string.Empty,
basicProperties: null,
body: body);
return Ok();
}
}
در متد SendMessage، ابتدا ارتباط خود را با RabbitMq برقرار میکنیم و سپس دیتای “Hi” را به صورت byte، به RabbitMq ارسال میکنیم؛ توسط متد BasicPublish.پارامتر اول، اسم Exchange است و پارامتر دوم، routeKey و body هم دیتای ارسالی میباشد. در نهایت سرویسهای مربوط به لایه Producer را رجیستر میکنیم؛ در Startup لایه Consumer:
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; set; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddCommonService(Configuration);
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapDefaultControllerRoute();
});
}
}
اکنون اگر هر دو پروژه را اجرا کنید و متد SendMessage مربوط به کنترلر Rabbit را فراخوانی کنید، بعد از آنکه پیام شما ارسال شد، در صفحه کنسول مربوط به Consumer، رشته ارسال شده را مشاهده میکنید.فایل appsetting.json مربوط به پروژههای Consumer و Producer:
{
"RabbitMqConfiguration": {
"HostName": "localhost",
"Username": "guest",
"Password": "guest"
}
}
فایل docker-compose.yml برای اجرای RabbitMq بر روی داکر:
version: "3.2"
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: 'rabbitmq'
ports:
- 5672:5672
- 15672:15672
کدهای این مقاله را میتوانید از گیتهاب دانلود کنید.
Views: 0
طراح و توسعه دهنده نرم افزار، با سوابق مختلف در زمینه زیر ساخت و SQL Server