Introduction
In this narrative, I will delve into the intricacies of processing real-time data streaming through Redis streams within a Spring Boot environment. The focus will be on elucidating the producer-consumer model.
In a previous attempt, I experimented with the Redis pub/sub model. However, a notable limitation arose: when a producer publishes a topic, all connected consumers processed the data indiscriminately. This stands in contrast to Kafka’s consumer group paradigm, where messages are intelligently distributed among group members. Pub/sub, on the other hand, disseminates messages to all connected and subscribed clients without discrimination.
Table of Contents
Stream Processing
Traditionally, real-time data processing involved batch processing, where activities such as payments were logged in a database and later processed in nightly or batch updates. However, the contemporary demand for immediate insights, particularly in applications like recommendation engines (e.g., Netflix, Prime) or real-time financial transactions (e.g., credit card transactions, UPI transactions), necessitates continuous and instantaneous data processing.
Enter stream processing—a paradigm for real-time, continuous data processing. This narrative will explore how we can achieve straightforward real-time stream processing using Redis Stream in conjunction with Spring Boot.
Consider a scenario involving a movie database. Users, whether registered or anonymous, interact with movies by expressing sentiments through actions like liking, disliking, and rating. Our objective is to capture and tally the number of views, likes, dislikes, and ratings in real-time. I will develop a producer/publisher service that generates randomized likes, dislikes, and ratings. These generated events will be published to Redis streams. Subsequently, consumers subscribed to these streams will update relevant information either in a database or another subsystem. For simplicity, in this case, I have opted to use Redis sorted sets for data storage.
Technology Stack
Java Version: Java 8
Spring Boot Version: 2.3.7.RELEASE
Build Tool: Maven
Containerization: Docker (Both Redis and applications are containerized)
Now, let’s delve into the codebase:
// Producer Service
@RestController
@RequestMapping("/movies")
public class MovieProducerController {
@Autowired
private RedisTemplate redisTemplate;
@PostMapping("/events")
public ResponseEntity generateMovieEvent() {
// Logic to generate random likes, dislikes, and ratings
// ...
// Publish events to Redis Stream
String movieEvent = // Construct your movie event as a string;
redisTemplate.opsForStream().add("movie-stream", Collections.singletonMap("event", movieEvent));
return ResponseEntity.ok("Movie event published successfully");
}
}
// Consumer Service
@Service
public class MovieConsumerService {
@StreamListener(target = "movie-stream")
public void processMovieEvent(@Payload String movieEvent) {
// Logic to process the movie event
// ...
// Update relevant information in the database or subsystem
// ...
// For simplicity, using Redis sorted sets
// redisTemplate.opsForZSet().add("likes", movieEvent, likeCount);
// redisTemplate.opsForZSet().add("dislikes", movieEvent, dislikeCount);
// redisTemplate.opsForZSet().add("ratings", movieEvent, rating);
}
}
Docker Compose File:
version: '3.8'
services:
redis:
image: "redis:latest"
ports:
- "6379:6379"
movie-producer:
image: "your-movie-producer-image:latest"
ports:
- "8080:8080"
depends_on:
- redis
movie-consumer:
image: "your-movie-consumer-image:latest"
depends_on:
- redis
This setup employs Java 8, Spring Boot 2.3.7.RELEASE, and Maven for building. Both the producer and consumer services are containerized using Docker, with Redis serving as the message broker in a Docker container as well. The code showcases a basic movie event generation and processing scenario using Redis streams.
Publisher Configuration
Let’s start by defining the DTO classes used for publishing and consuming movie events:
// MovieEventDTO for publishing to Redis stream
public class MovieEventDTO {
private String movieName;
private ActionType actionType; // Enum: LIKE, DISLIKE, RATING
private int userId; // ID of the user performing the action
// Constructors, getters, and setters
}
// MovieStatsDTO for storing aggregated movie statistics
public class MovieStatsDTO {
private String movieName;
private long likesCount;
private long dislikesCount;
private double averageRating;
// Constructors, getters, and setters
}
// ActionType Enum
public enum ActionType {
LIKE, DISLIKE, RATING
}
Now, let’s create the MoviePublishEvent class responsible for generating random movie events and publishing them to the Redis stream every 2 seconds:
@Service
public class MoviePublishEvent {
private final RedisTemplate redisTemplate;
private final Random randomGenerator = new Random();
@Autowired
public MoviePublishEvent(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Scheduled(fixedRate = 2000) // Run every 2 seconds
public void publishRandomMovieEvent() {
// Generate random movie event
MovieEventDTO movieEvent = generateRandomMovieEvent();
// Publish to Redis stream
redisTemplate.opsForStream().add("movie-stream", Collections.singletonMap("event", movieEvent));
System.out.println("Published movie event: " + movieEvent);
}
private MovieEventDTO generateRandomMovieEvent() {
MovieEventDTO movieEvent = new MovieEventDTO();
movieEvent.setMovieName("RandomMovie" + random.nextInt(100));
movieEvent.setActionType(ActionType.values()[random.nextInt(ActionType.values().length)]);
movieEvent.setUserId(random.nextInt(1000));
return movieEvent;
}
}
This class utilizes a Spring @Scheduled annotation to run the publishRandomMovieEvent method every 2 seconds, generating a random movie event and publishing it to the “movie-stream” Redis stream using the RedisTemplate. The generateRandomMovieEvent method creates a random movie event with a movie name, action type (like, dislike, or rating), and a user ID.
Redis Consumer
RedisConfig
class:
package com.spring.redis.streams.config;
import com.spring.redis.streams.dto.MovieDetails;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
@Configuration
public class RedisConfig {
@Value("${stream.key}")
private String streamKey;
@Autowired
private StreamListener streamListener;
@Bean
public Subscription createSubscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(MovieDetails.class)
.build();
StreamMessageListenerContainer listenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = listenerContainer.receive(
Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}
}
Movie Event Consumer:
package com.spring.redis.streams.config;
import com.spring.redis.streams.dto.MovieDetails;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
@Service
@Slf4j
public class MovieEventConsumer implements StreamListener {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
@Autowired
private ReactiveRedisTemplate redisTemplate;
@Override
@SneakyThrows
public void onMessage(ObjectRecord record) {
log.info(InetAddress.getLocalHost().getHostName() + " - consumed: " + record.getValue());
if (record.getValue().getLikes()) {
this.redisTemplate.opsForZSet().incrementScore(record.getValue().getMovie().getName(), "Likes", 1).subscribe();
}
if (record.getValue().getDisLike()) {
this.redisTemplate.opsForZSet().incrementScore(record.getValue().getMovie().getName(), "Dislikes", 1).subscribe();
}
this.redisTemplate.opsForZSet().incrementScore(record.getValue().getMovie().getName(), "Views", 1).subscribe();
this.redisTemplate.opsForZSet().incrementScore(record.getValue().getMovie().getName(), "Rating", record.getValue().getRating()).subscribe();
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar() {
log.info("Total Consumer :: " + atomicInteger.get());
}
}
Dockerizing the Application
To simplify the setup of Redis and the Spring Boot applications, Docker and Docker Compose have been employed. The consumer application has been scaled to three replicas to distribute the load effectively.
Docker-compose.yaml
version: '3.8'
services:
redis:
image: "redis:latest"
redis-commander:
image: "rediscommander/redis-commander:latest"
ports:
- "8081:8081"
environment:
- REDIS_HOSTS=local:redis:6379
redis-publisher:
build:
context: ./path-to-redis-publisher
environment:
- PUBLISH_RATE=2000
redis-consumer:
build:
context: ./path-to-redis-consumer
command: ["--scale", "3"]
Dockerfile for Redis-publisher
FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar
Dockerfile for Redis-consumer
FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar
Building and Starting the Application
- Navigate to both the consumer and publisher directories and run the following Maven command to create a JAR file:
mvn clean package -DskipTests
2. Bring up Redis and Redis Commander:
docker-compose up redis redis-commander
3.Access Redis Commander GUI at http://127.0.0.1:8081.
4. Create a Redis stream named “movie-events” using the following command:
XADD movie-events * any-key any-value
5. Create a consumer group to share the load between consumers:
XGROUP CREATE movie-events movie-events $
6. Bring up the Redis-producer:
docker-compose up producer
7. Bring up the consumer with 3 replicas:
docker-compose up --scale consumer=3
8. Observe the console to see that 3 consumers have come up and started consuming stream events.
9. Check Docker instances and Redis Commander to verify the data published in a sorted set with a movie name.
Conclusion
With these steps, the application is successfully Dockerized, allowing for efficient management and scalability. For the complete source code, refer to this link.
Happy Learning!