Optimizing Event-Driven Redis Architectures: A Comprehensive Guide to Leveraging Redis Streams with Spring Boot 2208

Event-Driven Architecture With Redis Streams Using Spring Boot

Introduction

This article walks through processing real-time data with Redis Streams in a Spring Boot application, focusing on the producer-consumer model.

In an earlier experiment I used the Redis pub/sub model and ran into a notable limitation: when a producer publishes to a channel, every subscribed consumer receives and processes the same message. Kafka’s consumer groups work differently: each message is delivered to only one member of a group, so the work is shared among the members. Redis pub/sub has no such grouping; it delivers every message to all connected, subscribed clients.
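To make the difference concrete, here is a minimal in-memory sketch of the two delivery models. It does not talk to Redis, the class and method names are hypothetical, and the round-robin assignment is an assumption made for illustration (a real consumer group hands each entry to whichever member reads it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; no Redis calls are made.
final class DeliveryModes {

    // Pub/sub style: every subscriber receives its own copy of every message.
    static Map<String, List<String>> fanOut(List<String> subscribers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String subscriber : subscribers) {
            received.put(subscriber, new ArrayList<>(messages));
        }
        return received;
    }

    // Consumer-group style: each message is handed to exactly one member.
    // Round-robin is a simplification chosen for this sketch.
    static Map<String, List<String>> consumerGroup(List<String> members, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String member : members) {
            received.put(member, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(members.get(i % members.size())).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> events = Arrays.asList("like", "dislike", "rating");
        System.out.println("pub/sub: " + fanOut(consumers, events));
        System.out.println("group:   " + consumerGroup(consumers, events));
    }
}
```

With pub/sub, adding consumers multiplies the work; with a group, adding consumers divides it.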

Stream Processing

Traditionally, data was processed in batches: activities such as payments were logged in a database and processed later in nightly or periodic batch jobs. Applications such as recommendation engines (e.g., Netflix, Prime) or real-time financial transactions (e.g., credit card or UPI transactions) need insights immediately, which calls for processing the data continuously as it arrives.

Stream processing is a paradigm for real-time, continuous data processing. This article explores how to achieve straightforward real-time stream processing using Redis Streams together with Spring Boot.

Consider a scenario involving a movie database. Users, whether registered or anonymous, interact with movies by expressing sentiments through actions like liking, disliking, and rating. Our objective is to capture and tally the number of views, likes, dislikes, and ratings in real-time. I will develop a producer/publisher service that generates randomized likes, dislikes, and ratings. These generated events will be published to Redis streams. Subsequently, consumers subscribed to these streams will update relevant information either in a database or another subsystem. For simplicity, in this case, I have opted to use Redis sorted sets for data storage.
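As a rough picture of the storage model, the sketch below keeps one "sorted set" per movie, with members such as Likes, Dislikes, Views, and Rating whose scores are incremented per event (these member names follow the consumer code later in this article). It is an in-memory stand-in with hypothetical class and method names, not a Redis client; `incrementScore` only imitates the contract of the Redis `ZINCRBY` command.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the per-movie sorted sets; not a Redis client.
final class MovieTally {

    // movie name -> (member -> score), mirroring one sorted set per movie
    private final Map<String, Map<String, Double>> sets = new HashMap<>();

    // Same contract as ZINCRBY: a missing member starts at 0, then delta is added.
    double incrementScore(String movie, String member, double delta) {
        return sets.computeIfAbsent(movie, key -> new HashMap<>())
                .merge(member, delta, Double::sum);
    }

    // Current score of a member, or 0 if the movie or member is unknown.
    double score(String movie, String member) {
        return sets.getOrDefault(movie, Collections.<String, Double>emptyMap())
                .getOrDefault(member, 0.0);
    }

    // Applies one event: every event counts as a view; ratings accumulate as a sum.
    void record(String movie, boolean liked, boolean disliked, double rating) {
        if (liked) {
            incrementScore(movie, "Likes", 1);
        }
        if (disliked) {
            incrementScore(movie, "Dislikes", 1);
        }
        incrementScore(movie, "Views", 1);
        incrementScore(movie, "Rating", rating);
    }

    public static void main(String[] args) {
        MovieTally tally = new MovieTally();
        tally.record("Inception", true, false, 4.5);
        tally.record("Inception", false, true, 3.0);
        System.out.println("Views: " + tally.score("Inception", "Views"));
    }
}
```

Because the Rating member holds a running sum, an average would have to be derived separately, for example by dividing by the number of rated events.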

Technology Stack

Java Version: Java 8

Spring Boot Version: 2.3.7.RELEASE

Build Tool: Maven

Containerization: Docker (Both Redis and applications are containerized)

Now, let’s look at the code, starting with a simplified outline of the producer and consumer services:

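// Producer Service
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/movies")
public class MovieProducerController {

    // StringRedisTemplate writes keys, fields, and values as plain strings
    @Autowired
    private StringRedisTemplate redisTemplate;

    @PostMapping("/events")
    public ResponseEntity<String> generateMovieEvent() {
        // Placeholder event: a random movie name with a LIKE action.
        // Replace with the real logic for random likes, dislikes, and ratings.
        String movieEvent = "RandomMovie" + ThreadLocalRandom.current().nextInt(100) + ":LIKE";

        // Publish the event to the Redis stream as a single field-value pair
        redisTemplate.opsForStream().add("movie-stream", Collections.singletonMap("event", movieEvent));

        return ResponseEntity.ok("Movie event published successfully");
    }
}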

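// Consumer Service
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class MovieConsumerService implements StreamListener<String, MapRecord<String, String, String>> {

    // Called for each stream entry once this listener is registered with a
    // StreamMessageListenerContainer subscribed to "movie-stream".
    @Override
    public void onMessage(MapRecord<String, String, String> record) {
        // The producer stores the event under the "event" field
        String movieEvent = record.getValue().get("event");

        // Logic to process the movie event
        // ...

        // Update relevant information in the database or subsystem
        // ...

        // For simplicity, using Redis sorted sets
        // redisTemplate.opsForZSet().add("likes", movieEvent, likeCount);
        // redisTemplate.opsForZSet().add("dislikes", movieEvent, dislikeCount);
        // redisTemplate.opsForZSet().add("ratings", movieEvent, rating);
    }
}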

Docker Compose File:

version: '3.8'

services:
  redis:
    image: "redis:latest"
    ports:
      - "6379:6379"

  movie-producer:
    image: "your-movie-producer-image:latest"
    ports:
      - "8080:8080"
    depends_on:
      - redis

  movie-consumer:
    image: "your-movie-consumer-image:latest"
    depends_on:
      - redis

This setup employs Java 8, Spring Boot 2.3.7.RELEASE, and Maven for building. Both the producer and consumer services are containerized using Docker, with Redis serving as the message broker in a Docker container as well. The code showcases a basic movie event generation and processing scenario using Redis streams.

Publisher Configuration

Let’s start by defining the DTO classes used for publishing and consuming movie events:

// MovieEventDTO for publishing to Redis stream
public class MovieEventDTO {
    private String movieName;
    private ActionType actionType; // Enum: LIKE, DISLIKE, RATING
    private int userId; // ID of the user performing the action

    // Constructors, getters, and setters
}

// MovieStatsDTO for storing aggregated movie statistics
public class MovieStatsDTO {
    private String movieName;
    private long likesCount;
    private long dislikesCount;
    private double averageRating;

    // Constructors, getters, and setters
}

// ActionType Enum
public enum ActionType {
    LIKE, DISLIKE, RATING
}

Now, let’s create the MoviePublishEvent class responsible for generating random movie events and publishing them to the Redis stream every 2 seconds:

import java.util.Collections;
import java.util.Random;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class MoviePublishEvent {

    private final RedisTemplate redisTemplate;
    private final Random random = new Random();

    @Autowired
    public MoviePublishEvent(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Scheduled(fixedRate = 2000) // Run every 2 seconds
    public void publishRandomMovieEvent() {
        // Generate random movie event
        MovieEventDTO movieEvent = generateRandomMovieEvent();

        // Publish to Redis stream. The DTO must be serializable by the template's
        // configured serializers (java.io.Serializable with the default JDK serializer).
        redisTemplate.opsForStream().add("movie-stream", Collections.singletonMap("event", movieEvent));

        System.out.println("Published movie event: " + movieEvent);
    }

    private MovieEventDTO generateRandomMovieEvent() {
        MovieEventDTO movieEvent = new MovieEventDTO();
        movieEvent.setMovieName("RandomMovie" + random.nextInt(100));
        // Pick one of LIKE, DISLIKE, RATING at random
        movieEvent.setActionType(ActionType.values()[random.nextInt(ActionType.values().length)]);
        movieEvent.setUserId(random.nextInt(1000));

        return movieEvent;
    }
}

This class utilizes a Spring @Scheduled annotation to run the publishRandomMovieEvent method every 2 seconds, generating a random movie event and publishing it to the “movie-stream” Redis stream using the RedisTemplate. The generateRandomMovieEvent method creates a random movie event with a movie name, action type (like, dislike, or rating), and a user ID.
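A Redis stream entry is a set of field-value pairs, so an alternative to wrapping the whole DTO in a single "event" field is to flatten it into one string field per property. The sketch below shows that idea in plain Java; `MovieEventFields`, its nested enum, and the field names are hypothetical helpers for illustration, not part of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original project.
final class MovieEventFields {

    enum ActionType { LIKE, DISLIKE, RATING }

    // Flattens an event into the field-value pairs of one stream entry.
    static Map<String, String> toFields(String movieName, ActionType actionType, int userId) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("movieName", movieName);
        fields.put("actionType", actionType.name());
        fields.put("userId", Integer.toString(userId));
        return fields;
    }

    // Reads the typed values back on the consumer side.
    static ActionType actionTypeOf(Map<String, String> fields) {
        return ActionType.valueOf(fields.get("actionType"));
    }

    static int userIdOf(Map<String, String> fields) {
        return Integer.parseInt(fields.get("userId"));
    }

    public static void main(String[] args) {
        Map<String, String> fields = toFields("RandomMovie7", ActionType.RATING, 42);
        System.out.println(fields + " -> " + actionTypeOf(fields) + " by user " + userIdOf(fields));
    }
}
```

A map like this could be passed to a string-based template's stream `add` call in place of the single-entry map, which keeps the stored entries readable in tools such as Redis Commander.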

Redis Consumer

RedisConfig class:

package com.spring.redis.streams.config;

import com.spring.redis.streams.dto.MovieDetails;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

@Configuration
public class RedisConfig {

    @Value("${stream.key}")
    private String streamKey;

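    // The listener bean is the MovieEventConsumer shown in the next listing
    @Autowired
    private StreamListener<String, ObjectRecord<String, MovieDetails>> streamListener;

    @Bean
    public Subscription createSubscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        // Poll every second and map each entry to a MovieDetails object
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, MovieDetails>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .targetType(MovieDetails.class)
                        .build();

        StreamMessageListenerContainer<String, ObjectRecord<String, MovieDetails>> listenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);

        // The consumer group is named after the stream key; the host name is the
        // consumer name, so every replica joins the same group as a distinct consumer.
        // ReadOffset.lastConsumed() requests entries not yet delivered to the group.
        Subscription subscription = listenerContainer.receive(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);

        listenerContainer.start();
        return subscription;
    }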
}
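The subscription above is registered with `receive(...)` and a named consumer. In Spring Data Redis this variant leaves acknowledgement to the application, while the container's `receiveAutoAck(...)` acknowledges entries on receipt. To picture what a consumer group tracks in between, here is a minimal in-memory sketch of delivery and acknowledgement. The class is hypothetical and only imitates the behaviour of reading new entries (`XREADGROUP` with `>`) and acknowledging them (`XACK`).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory imitation of a consumer group's bookkeeping; not a Redis client.
final class PendingEntriesSketch {

    // Entries that no group member has received yet
    private final Deque<String> undelivered = new ArrayDeque<>();

    // Delivered but unacknowledged entries: entry id -> consumer name
    private final Map<String, String> pending = new LinkedHashMap<>();

    void add(String entryId) {
        undelivered.addLast(entryId);
    }

    // Like XREADGROUP with ">": hands out the next never-delivered entry
    // and records it as pending for that consumer. Returns null if none is left.
    String readNew(String consumer) {
        String entryId = undelivered.pollFirst();
        if (entryId != null) {
            pending.put(entryId, consumer);
        }
        return entryId;
    }

    // Like XACK: removes the entry from the pending list; false if it was not pending.
    boolean acknowledge(String entryId) {
        return pending.remove(entryId) != null;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingEntriesSketch group = new PendingEntriesSketch();
        group.add("1-0");
        group.add("2-0");
        group.readNew("consumer-a");
        group.readNew("consumer-b");
        group.acknowledge("1-0");
        System.out.println("Still pending: " + group.pendingCount());
    }
}
```

If entries are never acknowledged, the pending list keeps growing, so a listener registered this way would typically acknowledge each record after processing it.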

Movie Event Consumer:

package com.spring.redis.streams.config;

import com.spring.redis.streams.dto.MovieDetails;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
public class MovieEventConsumer implements StreamListener<String, ObjectRecord<String, MovieDetails>> {

    // Number of events consumed by this instance
    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    @SneakyThrows
    public void onMessage(ObjectRecord<String, MovieDetails> record) {
        MovieDetails details = record.getValue();
        // Each movie gets its own sorted set, keyed by the movie name
        String movieName = details.getMovie().getName();

        log.info(InetAddress.getLocalHost().getHostName() + " - consumed: " + details);

        if (details.getLikes()) {
            this.redisTemplate.opsForZSet().incrementScore(movieName, "Likes", 1).subscribe();
        }

        if (details.getDisLike()) {
            this.redisTemplate.opsForZSet().incrementScore(movieName, "Dislikes", 1).subscribe();
        }

        // Every event counts as a view; ratings accumulate as a running sum
        this.redisTemplate.opsForZSet().incrementScore(movieName, "Views", 1).subscribe();
        this.redisTemplate.opsForZSet().incrementScore(movieName, "Rating", details.getRating()).subscribe();

        atomicInteger.incrementAndGet();
    }

    // Log the running total every 10 seconds
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar() {
        log.info("Total consumed :: " + atomicInteger.get());
    }
}

Dockerizing the Application

Docker and Docker Compose simplify the setup of Redis and the Spring Boot applications. The consumer application runs as three replicas so that the load is shared among them.

Docker-compose.yaml

version: '3.8'

services:
  redis:
    image: "redis:latest"

  redis-commander:
    image: "rediscommander/redis-commander:latest"
    ports:
      - "8081:8081"
    environment:
      - REDIS_HOSTS=local:redis:6379

  redis-publisher:
    build:
      context: ./path-to-redis-publisher
    environment:
      - PUBLISH_RATE=2000

  redis-consumer:
    build:
      context: ./path-to-redis-consumer
    # Replicas are set at startup: docker-compose up --scale redis-consumer=3

Dockerfile for Redis-publisher

FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar

Dockerfile for Redis-consumer

FROM openjdk:8-jdk-alpine
ADD target/*.jar redis-stream.jar
ENTRYPOINT java -jar redis-stream.jar

Building and Starting the Application

1. Navigate to both the consumer and publisher directories and run the following Maven command to create a JAR file:

mvn clean package -DskipTests

2. Bring up Redis and Redis Commander:

docker-compose up redis redis-commander

3. Access Redis Commander GUI at http://127.0.0.1:8081.

4. Create a Redis stream named “movie-events” by adding a placeholder entry (XADD creates the stream if it does not exist):

XADD movie-events * any-key any-value

5. Create a consumer group to share the load between consumers:

XGROUP CREATE movie-events movie-events $

6. Bring up the publisher, using its service name from the Compose file:

docker-compose up redis-publisher

7. Bring up the consumer with 3 replicas:

docker-compose up --scale redis-consumer=3 redis-consumer

8. Observe the console to see that 3 consumers have come up and started consuming stream events.

9. Check the running containers and Redis Commander to verify that each movie name has a sorted set holding its published data.
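One detail in the steps above is the `$` passed to XGROUP CREATE: it makes the group start after the current last entry, so the placeholder entry added with XADD is not delivered to the consumers, whereas `0` would replay the stream from the beginning. The in-memory sketch below illustrates that difference; the class and method names are hypothetical and no Redis connection is involved.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory illustration of where a new consumer group starts reading.
final class GroupStartSketch {

    private final List<String> entries = new ArrayList<>();

    void add(String entry) {
        entries.add(entry);
    }

    // "$": the group starts after the current last entry.
    int positionAtEnd() {
        return entries.size();
    }

    // "0": the group starts before the first entry.
    int positionAtBeginning() {
        return 0;
    }

    // Entries that a group created at the given position would be offered.
    List<String> visibleFrom(int position) {
        return new ArrayList<>(entries.subList(position, entries.size()));
    }

    public static void main(String[] args) {
        GroupStartSketch stream = new GroupStartSketch();
        stream.add("placeholder");                     // the entry added while creating the stream
        int groupPosition = stream.positionAtEnd();    // group created with "$"
        stream.add("like");
        stream.add("rating");
        System.out.println("Group sees: " + stream.visibleFrom(groupPosition));
    }
}
```

This matters here because the placeholder entry does not have the shape of a movie event, so skipping it avoids handing the consumers a record they cannot map.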

Conclusion

With these steps, the application is successfully Dockerized, allowing for efficient management and scalability. For the complete source code, refer to this link.

Happy Learning!
