Introduction:
In this article, we’ll explore how to build an event-driven architecture with Spring Boot using the Reactive Redis Publisher/Subscriber (Pub/Sub) model.
Reactive Redis Overview:
Reactive Redis refers to accessing Redis through a non-blocking, reactive API, which Spring Data Redis provides on top of the Lettuce driver. This article focuses on its Publish/Subscribe support, which lets components exchange events without calling each other directly. Combined with Spring WebFlux, it allows the whole request path, from the HTTP endpoint to the Redis call, to stay non-blocking. For a deeper dive into its integration specifics, refer to the detailed discussion in the associated article.
Redis Publisher/Subscriber Model:
Pub/Sub is a messaging paradigm for communication among the components of a distributed system. Publishers send messages to a named channel (often called a topic), and every subscriber that has registered interest in that channel receives them. Publishers and subscribers stay decoupled: a publisher does not know who, if anyone, is listening.
In Redis Pub/Sub, Redis actively pushes each published message to all clients currently subscribed to the channel, and a subscriber receives messages on its connection in the order Redis processed them. Delivery is at-most-once, however: messages are not stored, so a subscriber that is disconnected, or that subscribes after a message was published, never receives it.
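To make these delivery semantics concrete, here is a minimal in-memory sketch in plain Java. It is not a Redis client: the InMemoryPubSub class and its methods are hypothetical names used only to illustrate fan-out to current subscribers, in-order delivery, and the fact that late subscribers miss earlier messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a Pub/Sub broker; it does not talk to Redis.
class InMemoryPubSub {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Register a callback for one channel.
    void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, key -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Push the message to every current subscriber and return how many received it.
    // Nothing is stored, so a client that subscribes later never sees this message.
    int publish(String channel, String message) {
        List<Consumer<String>> listeners = subscribers.getOrDefault(channel, List.of());
        listeners.forEach(listener -> listener.accept(message));
        return listeners.size();
    }

    public static void main(String[] args) {
        InMemoryPubSub broker = new InMemoryPubSub();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        broker.subscribe("PRODUCT_CHANNEL", early::add);
        broker.publish("PRODUCT_CHANNEL", "laptop");
        broker.subscribe("PRODUCT_CHANNEL", late::add);
        broker.publish("PRODUCT_CHANNEL", "phone");

        System.out.println(early); // [laptop, phone]
        System.out.println(late);  // [phone]
    }
}
```

The return value of publish mirrors what the Redis PUBLISH command reports: the number of clients that received the message, which is zero when nobody is subscribed.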
Application Configuration Using Spring Boot and Reactive Redis:
Technical Stack for the Project:
- Spring Boot version 3.2.0
- Java version 21
- Redis
- Docker
Starting Up Redis Server and Redis Insight:
Start the Redis server and Redis Insight with Docker. The command below exposes the Redis server on port 6379 and Redis Insight on port 8001:
docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
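Before starting the application, it can help to confirm that the container is actually accepting connections. The sketch below is one way to do that from plain Java, assuming the server listens on localhost:6379 without authentication. RedisPing is a hypothetical helper, not part of the article's project; it sends an inline PING command over a raw socket and checks for the +PONG reply.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for a quick connectivity check; not a full Redis client.
class RedisPing {
    // A healthy server answers PING with the simple string "+PONG".
    static boolean isPong(String reply) {
        return reply != null && reply.trim().equals("+PONG");
    }

    // Sends an inline PING and returns the first line of the reply.
    static String ping(String host, int port) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 2000); // 2 s connect timeout
            socket.setSoTimeout(2000);                               // 2 s read timeout
            OutputStream out = socket.getOutputStream();
            out.write("PING\r\n".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            return in.readLine();
        }
    }
}
```

Calling RedisPing.isPong(RedisPing.ping("localhost", 6379)) should return true once the container is up. If the server requires a password, the reply is an error line instead and the check returns false.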
To set up the application, generate a project with Spring Initializr and include the following dependencies in the pom.xml file:
<dependencies>
<!-- Spring Boot Starter for Reactive Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<!-- Spring Boot Starter for WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Project Lombok for simplified Java code -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Reactor Test for testing reactive components -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<!-- SpringDoc OpenAPI Starter for WebFlux UI -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webflux-ui</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>
Setting Up Configurations:
To configure the Spring Boot application, modify the application.yml file with the Redis-related settings as follows:
redis:
  host: localhost
  port: 6379
This configuration tells the application to expect the Redis server on the local machine (localhost) at port 6379. These are custom properties that the configuration class reads as redis.host and redis.port. Adjust them if your Redis server runs on a different host or port.
Reactive Redis Pub/Sub Configuration:
The ReactiveRedisMessageListenerContainer is the message-listener container that subscribes to a Redis channel and exposes the incoming messages as a reactive stream. The following configuration class defines the connection factory, the Redis operations bean, and the listener container:
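import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import reactor.core.publisher.Mono;

@Configuration
@Slf4j
public class RedisConfig {

    @Value("${redis.host}")
    private String host;

    @Value("${redis.port}")
    private int port;

    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        return new LettuceConnectionFactory(host, port);
    }

    @Bean
    public ReactiveRedisOperations<String, Product> reactiveRedisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        // Keys are plain strings; values are Product objects serialized as JSON.
        Jackson2JsonRedisSerializer<Product> serializer = new Jackson2JsonRedisSerializer<>(Product.class);
        RedisSerializationContext<String, Product> context = RedisSerializationContext
                .<String, Product>newSerializationContext(new StringRedisSerializer())
                .value(serializer)
                .build();
        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, context);
    }

    @Bean
    public ReactiveRedisMessageListenerContainer messageListenerContainer(
            final ProductService productService,
            final ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        final ReactiveRedisMessageListenerContainer container =
                new ReactiveRedisMessageListenerContainer(reactiveRedisConnectionFactory);
        final ObjectMapper objectMapper = new ObjectMapper();

        // Listen on the same channel that ProductPublisher publishes to.
        container.receive(ChannelTopic.of("PRODUCT_CHANNEL"))
                .map(ReactiveSubscription.Message::getMessage)
                .flatMap(message -> {
                    try {
                        return Mono.just(objectMapper.readValue(message, Product.class));
                    } catch (IOException e) {
                        // Reactor forbids null elements, so skip the bad payload instead of returning null.
                        log.warn("Skipping message that is not a valid Product: {}", message, e);
                        return Mono.<Product>empty();
                    }
                })
                .flatMap(productService::save)
                .subscribe(
                        count -> log.info("Product Saved Successfully."),
                        error -> log.error("Product listener terminated.", error));
        return container;
    }
}
This configuration sets up Reactive Redis Pub/Sub within a Spring Boot application. It uses ReactiveRedisMessageListenerContainer to receive messages from the PRODUCT_CHANNEL channel, deserializes each one into a Product, and saves it through ProductService. The listener must subscribe to the same channel name the publisher uses; otherwise published messages are silently dropped. Adjust the channel name and payload type to suit your project.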
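The article does not show the Product class that the serializer and listener refer to. The following is a minimal sketch of what it could look like, written without Lombok so it stands alone. Only the id property is implied by the code in this article (ProductService calls setId); the name and price fields are assumptions added for illustration.

```java
// Hypothetical shape of the Product payload; only "id" is implied by the article.
class Product {
    private String id;     // assigned by ProductService.save
    private String name;   // assumed field
    private double price;  // assumed field

    // Jackson uses the no-argument constructor and the setters when deserializing JSON.
    public Product() {
    }

    public Product(String name, double price) {
        this.name = name;
        this.price = price;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }
}
```

In the real project this would be a public class in its own file, and Lombok's @Data together with @NoArgsConstructor could generate the accessors and the no-argument constructor instead.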
Product Publisher Class:
The ProductPublisher class functions as the Redis Publisher, responsible for publishing messages to the specified channel.
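import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
@RequiredArgsConstructor
@Slf4j
public class ProductPublisher {
    private static final String REDIS_CHANNEL = "PRODUCT_CHANNEL";
    private final ReactiveRedisOperations<String, Product> reactiveRedisOperations;

    public Mono<String> publishProductMessage(final Product product) {
        // Return the chain instead of subscribing here, so WebFlux subscribes to it
        // and the response is sent only after Redis has accepted the message.
        return this.reactiveRedisOperations.convertAndSend(REDIS_CHANNEL, product)
                .doOnNext(count -> log.info("Product Published Successfully."))
                .thenReturn("Published Message to Redis Channel");
    }
}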
This class is annotated with @Component to indicate its role as a Spring component. It utilizes the ReactiveRedisOperations to publish a Product message to the designated Redis channel (“PRODUCT_CHANNEL”). The publishProductMessage method logs a success message upon successful publication and returns a Mono with a message indicating the successful message publication to the Redis channel.
Product Service Class:
The ProductService class, annotated with @Service, encapsulates operations related to the product domain.
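import java.util.UUID;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
public class ProductService {
    private static final String REDIS_KEY = "PRODUCT_REDIS";
    private final ReactiveRedisOperations<String, Product> reactiveRedisOperations;

    // Read the whole list: index 0 through -1 (the last element).
    public Flux<Product> findAll() {
        return this.reactiveRedisOperations.opsForList().range(REDIS_KEY, 0, -1);
    }

    // Assign a short random ID, append the product, and emit the new list length.
    public Mono<Long> save(final Product product) {
        final String id = UUID.randomUUID().toString().substring(0, 8);
        product.setId(id);
        return this.reactiveRedisOperations.opsForList().rightPush(REDIS_KEY, product);
    }
}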
This service uses ReactiveRedisOperations to read and write a Redis list stored under the key PRODUCT_REDIS. The findAll method returns every product in the list. The save method assigns the product a short random ID (the first eight characters of a UUID, which makes collisions unlikely but not impossible) and appends the product to the end of the list. Both methods return reactive types, so no thread blocks while waiting for Redis.
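The ID scheme used by save can be tried in isolation. The sketch below wraps the same expression in a hypothetical ShortIds helper (the class and method names are not from the project). The first eight characters of a random UUID are hexadecimal digits, which gives 32 bits of randomness, so duplicates become plausible once tens of thousands of IDs have been generated.

```java
import java.util.UUID;

// Hypothetical helper that isolates the ID expression used in ProductService.save.
class ShortIds {
    // Returns the first 8 characters of a random UUID: 8 hex digits, i.e. 32 random bits.
    static String newShortId() {
        return UUID.randomUUID().toString().substring(0, 8);
    }
}
```

If IDs must never collide, keeping the full UUID string is the simplest alternative.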
Product Controller Class:
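import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/product")
@RequiredArgsConstructor
public class ProductController {
    private final ProductService productService;
    private final ProductPublisher productPublisher;

    // GET /product: stream every product stored in Redis.
    @GetMapping
    public Flux<Product> findAll() {
        return this.productService.findAll();
    }

    // POST /product/publish: publish the request body to the Redis channel.
    @PostMapping("/publish")
    public Mono<String> publishProduct(@RequestBody final Product product) {
        return this.productPublisher.publishProductMessage(product);
    }
}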
In this class, the @GetMapping method exposes GET /product, which retrieves all products through the injected ProductService. The @PostMapping("/publish") method exposes POST /product/publish, which publishes a product message to Redis through the ProductPublisher. The @RequestBody annotation binds the JSON request body to a Product object. Together, these two endpoints form the REST API for publishing and retrieving products.
Testing the Application:
To test the application, we’ll use the Swagger UI provided by springdoc-openapi to publish product messages to the Redis channel. Follow these steps:
1. Publish Product Message to Redis Channel:
Use the Swagger UI to call the publish endpoint and send product messages to the Redis channel.
2. Redis Insight Dashboard with Published Products:
Open the Redis Insight dashboard to view the products published during testing.
3. Published Products in Redis Database:
Verify that the published products are present in the Redis database.
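The same endpoints can also be exercised from code. The sketch below builds the two requests with the JDK's built-in java.net.http API. It assumes the application runs on Spring Boot's default port 8080 and that Product has a name and a price field; both are assumptions, and PublishRequests is a hypothetical helper name.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds requests for the article's two endpoints.
class PublishRequests {
    // POST /product/publish with a JSON body describing the product.
    static HttpRequest publishProduct(String baseUrl, String productJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/product/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(productJson))
                .build();
    }

    // GET /product to list everything the listener has saved so far.
    static HttpRequest listProducts(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/product"))
                .GET()
                .build();
    }
}
```

To send a request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), for example with the base URL http://localhost:8080 and the body {"name":"Laptop","price":999.0}. Because the listener saves asynchronously, a product may appear in the GET response a moment after the POST returns.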
Summary:
In this article, we’ve delved into the following key areas:
- Overview of Reactive Redis
- Redis Publisher-Subscriber model
- Docker-based startup of Redis Server and Redis Insight
- Integration of Reactive Redis Pub/Sub with Spring Boot
- Testing REST endpoints through the springdoc-openapi Swagger UI
Feel free to share your valuable feedback. Thank you for your readership.