Part-4 Asynchronous Data Communication Between Microservices Using RabbitMQ Message Broker With MassTransit[.NET6 Microservice Series]
In this article, we are going to establish asynchronous data communication between microservices using the RabbitMQ message broker along with MassTransit.
Now let's take a look at the 'Orders' GET endpoint response. It doesn't contain any 'Product' information, so we need a way to save the required 'Product' information into the 'SalesBusiness.API' microservice application from the 'Manufacture.API' microservice application.
The steps we are going to accomplish in this demo are:
- Configure the RabbitMQ message broker with MassTransit for asynchronous data communication between the microservices.
- In the 'Manufacture' database we have a 'Products' table (like a master table). Now we will create a new 'Products' table (like a child table) in the 'SalesBusiness' database, but with only the required columns, so that the 'Orders' endpoint can also fetch the required product information.
- With the help of RabbitMQ, we keep the data in the parent and child tables in sync.
Part-3 Create A Microservice For The Orders Endpoint[.NET6 Microservice Series]
RabbitMQ:
RabbitMQ is a message broker that helps to share data asynchronously between the publishers and receivers.
RabbitMQ exchange flow:
- RabbitMQ supports different exchange types like 'Direct', 'Topic', 'Headers', and 'Fanout'. MassTransit uses 'Fanout' exchanges by default, so our demo uses a fanout exchange as well.
- With a fanout exchange, when a producer sends a message to RabbitMQ, the message goes to the fanout exchange, the exchange delivers a copy to every queue bound to it, and the respective receivers then read the messages from their queues.
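To make the fanout flow above more concrete, here is a minimal in-memory sketch. It is not RabbitMQ or MassTransit code; the dictionary of queues, the 'PublishToFanout' helper, and the 'audit-listener' queue name are all hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a fanout exchange; not RabbitMQ or MassTransit code.
// Each entry is a queue bound to the exchange; 'audit-listener' is made up for this sketch.
var boundQueues = new Dictionary<string, Queue<string>>
{
    ["event-listener"] = new Queue<string>(),
    ["audit-listener"] = new Queue<string>()
};

// A fanout exchange ignores routing keys and copies the message to every bound queue.
void PublishToFanout(string message)
{
    foreach (var boundQueue in boundQueues.Values)
    {
        boundQueue.Enqueue(message);
    }
}

PublishToFanout("ProductCreated { Id = 1, Name = Laptop }");

// Every bound queue now holds its own copy, ready for its receiver.
foreach (var (name, queue) in boundQueues)
{
    Console.WriteLine($"{name}: {queue.Count} message(s)");
}
```

The point of the sketch is that the publisher never names a queue. It only hands the message to the exchange, and every queue bound to that exchange gets its own copy.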
MassTransit:
MassTransit is a free, open-source library that can wrap around message brokers like 'RabbitMQ', 'Azure Service Bus', 'Amazon SQS', 'ActiveMQ', etc.
MassTransit provides an easy way to configure the message brokers in our .NET applications.
RabbitMQ Docker-Compose:
step1:
Install Docker Desktop on your local machine (https://www.docker.com/products/docker-desktop/).
step2:
Now create a folder with any name, add a 'docker-compose.yaml' file to that folder, and then add the following docker-compose configuration.
docker-compose.yaml:
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - 15672:15672
      - 4001:5672
    container_name: myrabbitcontainer
- (Line: 1) The 'version' is the docker-compose file format version.
- (Line: 3) The 'services' section lists the software or services we are going to run on Docker.
- (Line: 4) The 'rabbitmq' is the custom name of our service. It is recommended to give a meaningful name that represents the service.
- (Line: 5) The Docker image of 'RabbitMQ'.
- (Line: 7) The right-hand side port '15672' is the container port of the RabbitMQ admin tool, and the left-hand side port '15672' is the port exposed on our machine.
- (Line: 8) The right-hand side port '5672' is the container port of the RabbitMQ service, and the left-hand side port '4001' is the port exposed on our machine.
- (Line: 9) The name of the container that runs our 'RabbitMQ' image.
Command to run the docker container.
docker-compose up -d
Create A Shared Library Contains DTOs For RabbitMQ Messages:
One of our two microservices will act as the publisher and the other will act as the consumer through the RabbitMQ message broker. The 'Type' of the message will be a model (POCO class) created in a 'Shared Library' that is referenced by both of our microservice applications.
Now create a .NET Library.
CLI command
dotnet new classlib -o Your_Project_Name
Note: Now add the newly created class library reference into both microservice applications.
Now in the shared library let's add a new folder like 'RabbitMqModels', and then add a model (POCO class) like 'ProductCreated' (this is the type of the message we share through RabbitMQ).
{Shared_Project}/RabbitMqModels/ProductCreated.cs:
namespace Shared.Models.RabbitMqModel;

public class ProductCreated
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Install NuGet Packages:
To configure RabbitMQ and MassTransit we have to install the following NuGet packages in both of our microservice applications.
Install 'MassTransit' NuGet Package.
CLI command
dotnet add package MassTransit --version 8.0.0
Package Manager
Install-Package MassTransit -Version 8.0.0
Install 'MassTransit.AspNetCore' NuGet Package.
CLI command
dotnet add package MassTransit.AspNetCore --version 7.3.1
Package Manager
Install-Package MassTransit.AspNetCore -Version 7.3.1
Install 'MassTransit.RabbitMQ'
CLI command
dotnet add package MassTransit.RabbitMQ --version 8.0.0
Package Manager
Install-Package MassTransit.RabbitMQ -Version 8.0.0
Publisher:
Our 'Manufacture.API' microservice application will be the publisher.
Configure the 'MassTransit' and 'RabbitMQ' services in 'Program.cs'.
Manufacture.API/Program.cs:
builder.Services.AddMassTransit(options => {
    options.UsingRabbitMq((context,cfg) => {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });
    });
});
- (Line: 1) Registered the 'MassTransit' service.
- (Line: 2) The 'RabbitMQ' service is configured inside of the 'MassTransit' service.
- (Line: 3) Defined our 'RabbitMQ' host. Here port '4001' is my custom port exposed from the docker container.
- (Line: 4&5) The default username and password for 'RabbitMQ' are 'guest'.
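The link between the docker-compose port mapping and the host URI above can be checked with a small sketch. It only uses 'System.Uri' and string parsing; the variable names are hypothetical, and the two string values are copied from this article.

```csharp
using System;

// Values copied from the article: the compose port mapping and the MassTransit host URI.
var composePortMapping = "4001:5672"; // host port : container port
var rabbitMqHost = new Uri("rabbitmq://localhost:4001");

// Split the mapping into its host-side and container-side ports.
var parts = composePortMapping.Split(':');
var hostPort = int.Parse(parts[0]);
var containerPort = int.Parse(parts[1]);

// Our applications run outside the container, so the URI must target the host-side port.
var uriTargetsHostPort = rabbitMqHost.Port == hostPort;

Console.WriteLine($"Scheme: {rabbitMqHost.Scheme}, Host: {rabbitMqHost.Host}, Port: {rabbitMqHost.Port}");
Console.WriteLine($"Container listens on {containerPort}; URI targets host port {hostPort}: {uriTargetsHostPort}");
```

If the left-hand side of the port mapping in 'docker-compose.yaml' is changed, the port in the 'cfg.Host' URI has to change with it in both microservices.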
Manufacture.API/Controllers/ProductController.cs:
using MassTransit;

// existing code hidden for display purpose
[ApiController]
[Route("[controller]")]
public class ProductsController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public ProductsController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }
}
Now let's implement publish logic in our 'PostAsync' action method.
Manufacture.API/Controllers/ProductController.cs:
using Shared.Models.RabbitMqModel;

// existing code hidden for display purpose

[HttpPost]
public async Task<IActionResult> PostAsync(Products newProduct)
{
    _manufactureContext.Products.Add(newProduct);
    await _manufactureContext.SaveChangesAsync();
    await _publishEndpoint.Publish<ProductCreated>(new ProductCreated
    {
        Id = newProduct.Id,
        Name = newProduct.Name
    });
    return CreatedAtAction("Get", new { id = newProduct.Id }, newProduct);
}
- (Line: 10-14) The 'IPublishEndpoint.Publish<T>()' method pushes the message into the fanout exchange of RabbitMQ. The type of the message is 'Shared.Models.RabbitMqModel.ProductCreated'.
Create A Child Products Table In SalesBusiness Database:
Let's create a child 'Products' table in the 'SalesBusiness' database.
CREATE TABLE [dbo].[Products](
    [Id] [int] NOT NULL,
    [Name] [varchar](500) NULL,
    CONSTRAINT [PK_Products] PRIMARY KEY CLUSTERED
    (
        [Id] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
Configure Products Table Entity In SalesBusiness.API Project:
Let's add the 'Products' entity in our 'SalesBusiness.API' project.
SalesBusiness.API/Data/Entities/Products.cs:
namespace SalesBusiness.Api.Data.Entities;

public class Products
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now register the 'Products' entity inside of the 'SalesBusinessContext'.
SalesBusiness.API/Data/SalesBusinessContext.cs:
using Microsoft.EntityFrameworkCore;
using SalesBusiness.Api.Data.Entities;

namespace SalesBusiness.Api.Data;

public class SalesBusinessContext : DbContext
{
    public SalesBusinessContext(DbContextOptions<SalesBusinessContext> options) : base(options)
    {
    }

    public DbSet<Orders> Orders { get; set; }
    public DbSet<Products> Products { get; set; }
}
Consumer:
Our 'SalesBusiness.API' microservice application will be the consumer.
Using 'MassTransit.IConsumer<T>' we will make a RabbitMQ queue consumer. So let's create a folder like 'Consumer' and then add a class inside of it like 'ProductCreatedConsumer.cs'.
SalesBusiness.API/Consumer/ProductCreatedConsumer.cs:
using MassTransit;
using SalesBusiness.Api.Data;
using SalesBusiness.Api.Data.Entities;
using Shared.Models.RabbitMqModel;

namespace SalesBusiness.Api.Consumer;

public class ProductCreatedConsumer : IConsumer<ProductCreated>
{
    private readonly SalesBusinessContext _salesBusinessContext;
    public ProductCreatedConsumer(SalesBusinessContext salesBusinessContext)
    {
        _salesBusinessContext = salesBusinessContext;
    }
    public async Task Consume(ConsumeContext<ProductCreated> context)
    {
        var newProduct = new Products{
            Id = context.Message.Id,
            Name = context.Message.Name
        };
        _salesBusinessContext.Add(newProduct);
        await _salesBusinessContext.SaveChangesAsync();
    }
}
- To make 'ProductCreatedConsumer' a RabbitMQ queue consumer, it must implement 'MassTransit.IConsumer<T>'.
- (Line: 15-23) Here we implement the asynchronous 'Consume' method, which gets executed for every new message received by the queue. Inside this method, we implement our logic to store the message data.
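The 'Consume' method above inserts a row for every message it receives. If the same message were ever delivered twice (an assumption for this sketch, for example after a retry), the second insert would collide with the primary key on 'Id'. The following in-memory sketch shows one possible way to make the handler tolerate that. The dictionary stands in for the child 'Products' table, and 'HandleProductCreated' is a hypothetical helper, not part of MassTransit or of the demo project.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the child 'Products' table, keyed by product Id.
var productsTable = new Dictionary<int, string>();

// Sketch of an idempotent handler: inserting only when the Id is new means
// a redelivered message neither fails nor creates a second row.
bool HandleProductCreated(int id, string name)
{
    if (productsTable.ContainsKey(id))
    {
        return false; // already stored, so the duplicate is skipped
    }

    productsTable.Add(id, name);
    return true;
}

// The same message delivered twice, as could happen after a retry.
var firstDelivery = HandleProductCreated(1, "Laptop");
var secondDelivery = HandleProductCreated(1, "Laptop");

Console.WriteLine($"Stored on first delivery: {firstDelivery}");
Console.WriteLine($"Stored on second delivery: {secondDelivery}");
Console.WriteLine($"Rows in table: {productsTable.Count}");
```

In the real consumer, the equivalent check would be a lookup on the 'Products' DbSet before calling 'Add'. Whether that is needed depends on how the broker and retries are configured.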
SalesBusiness.API/Program.cs:
builder.Services.AddMassTransit(x => {
    x.AddConsumer<ProductCreatedConsumer>();
    x.UsingRabbitMq((context, cfg) => {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ReceiveEndpoint("event-listener", e =>
        {
            e.ConfigureConsumer<ProductCreatedConsumer>(context);
        });
    });
});
- (Line: 9-12) Here we define our queue name as 'event-listener'. The RabbitMQ fanout exchange pushes messages to all queues bound to it. We also register our consumer 'ProductCreatedConsumer' on this queue, so it listens for every new message that arrives in the queue.
Test Microservices Communication With RabbitMQ Message Broker:
Step1:
Make sure our microservice applications are running under different port numbers. To change the port number, go to 'Properties/launchsettings.json'.
Step2:
Make sure to run the Docker desktop application on our local machine and then run our docker-compose for 'RabbitMQ' as explained above.
Step3:
Let's try to create a new product from our Products endpoint ('Manufacture.API' microservice application). In the action method we publish our new product record as a message to RabbitMQ, and our 'SalesBusiness.API' project acts as a consumer that reads the message and saves it into our child 'Products' table in the 'SalesBusiness' database. This step represents asynchronous communication between our microservices.
Now we can observe that the '[Manufacture].[dbo].[Products]' and '[SalesBusiness].[dbo].[Products]' tables are in sync because of the RabbitMQ asynchronous communication between the microservices.
Update Orders GET Endpoint To Fetch Products Information:
First, let's create a DTO for our 'Orders' like 'OrdersDto' that contains 'ProductDto' as one of its properties.
SalesBusiness.API/DTOs/Orders.Dto.cs:
namespace SalesBusiness.Api.DTOs;

public class OrdersDto
{
    public int Id { get; set; }
    public string UserId { get; set; }
    public DateTime? OrderDate { get; set; }
    public ProductDto ProductInfo { get; set; }
}

public class ProductDto
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now let's update our Orders GET endpoint to fetch the 'Product' information along with the 'Orders'.
SalesBusiness.API/Controllers/OrdersController.cs:
[HttpGet]
public async Task<IActionResult> GetAsync()
{
    var orders = await _salesBusinessContext.Orders
        .Join(
            _salesBusinessContext.Products,
            order => order.ProductId,
            product => product.Id,
            (order, product) => new { Order = order, Product = product }
        )
        .Select(_ => new OrdersDto
        {
            Id = _.Order.Id,
            OrderDate = _.Order.OrderDate,
            UserId = _.Order.UserId,
            ProductInfo = new ProductDto
            {
                Id = _.Product.Id,
                Name = _.Product.Name
            }
        }).ToListAsync();
    return Ok(orders);
}
- Here the 'Orders' table is joined with the 'Products' table while fetching the collection of 'Orders'.
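The join used by the endpoint is an inner join, which has one consequence worth seeing in isolation. Here is a minimal LINQ-to-objects sketch with in-memory lists instead of the database. The sample rows and tuple shapes are hypothetical and only mirror the columns used in the query.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample rows; the values are made up for this sketch.
var orders = new List<(int Id, string UserId, int ProductId)>
{
    (1, "user-1", 10),
    (2, "user-2", 20) // product 20 has not been synced to the child table yet
};
var products = new List<(int Id, string Name)>
{
    (10, "Laptop")
};

// Same shape as the endpoint's query: inner join on ProductId == Id, then project.
var result = orders
    .Join(
        products,
        order => order.ProductId,
        product => product.Id,
        (order, product) => new { OrderId = order.Id, UserId = order.UserId, ProductName = product.Name })
    .ToList();

foreach (var row in result)
{
    Console.WriteLine($"Order {row.OrderId} by {row.UserId}: {row.ProductName}");
}

Console.WriteLine($"Orders returned: {result.Count} of {orders.Count}");
```

Because an inner join keeps only matching rows, an order whose product has not yet arrived in the child 'Products' table is left out of the response until the 'ProductCreated' message has been consumed.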
Wrapping Up:
Hopefully, this article delivered some useful information on the microservice flow in .NET6 applications. I would love to have your feedback, suggestions, and better techniques in the comment section below.