• Home

  • Custom Ecommerce
  • Application Development
  • Database Consulting
  • Cloud Hosting
  • Systems Integration
  • Legacy Business Systems
  • Security & Compliance
  • GIS

  • Expertise

  • About Us
  • Our Team
  • Clients
  • Blog
  • Careers

  • VisionPort

  • Contact
  • Our Blog

    Ongoing observations by End Point Dev people

    Creating a Messaging App Using Spring for Apache Kafka, Part 2

    Kürşat Kutlu Aydemir

    By Kürşat Kutlu Aydemir
    April 29, 2020

    Spring pasture

    This article is part of a series. The GitHub repository with code examples can be found here.

    In this part I’ll walk through Kafka’s servers and processes, the basics of spring-kafka producers and consumers, persistence, and caching configurations.

    Kafka Servers

    Kafka uses Apache ZooKeeper as the distributed coordination server. You can download the Apache Kafka with ZooKeeper bundle here.

    When you download and untar the Kafka bundle Kafka’s console scripts can be found in the bin directory. To enable Kafka connectivity and prepare the Kafka configuration let’s start the Kafka servers and see how to create Kafka topics and test console producers and consumers.

    ZooKeeper

    To start ZooKeeper with the default properties run the following command:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    

    Kafka Server

    A single Kafka server with the default properties can be started with following command:

    bin/kafka-server-start.sh config/server.properties
    

    Kafka Topics

    Creating Kafka Topics

    Let’s create a test Kafka topic:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic myTestTopic
    

    List Topics

    To list all previously created Kafka topics:

    bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    

    Start a Producer

    To start a console producer run the following command and send some messages from console:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTestTopic
    > This is a message
    > This is another message
    

    Start a Consumer

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTestTopic --from-beginning
    

    When you run the consumer on the console with the from-beginning parameter you’ll see all the messages sent previously shown in the console.

    Here we ran Kafka as a single server. You’ll need to optimize and scale the Kafka clusters for production and large-scale distributed systems. So far, we’ve become familiar with some Kafka components but for further Kafka configuration you can refer to the corresponding tutorials.

    spring-kafka Configuration

    Consumer Configuration

    In the Spring Boot project let’s put the lines below in the application.properties to configure the Spring Kafka consumer:

    #Consumer
    spring.kafka.consumer.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=foo
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    

    A simple Kafka consumer is defined as a Spring @KafkaListener annotated method like this:

    @Configuration
    public class MyKafkaConsumer {
    
        @KafkaListener(topics = "myTestTopic")
        public void listenTopic(ConsumerRecord<String, String> kafkaMessage) {
            System.out.print(String.format("Received a message: %s", kafkaMessage.value()));
        }
    
    }
    

    We are going to define different Kafka consumer methods listening to different topics for different purposes in our messaging app.

    Producer Configuration

    For producer configuration let’s add the following lines in the application.properties in our Spring Kafka project.

    #Producer
    spring.kafka.producer.bootstrap-servers=localhost:9092
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    

    A very simple Kafka producer could be configured like below. Spring KafkaTemplate provides a producer model and methods for sending messages to specified Kafka topics.

    @Configuration
    public class MyKafkaProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendMessage(String topic, String message) {
            System.out.println(String.format("Message is being sent to topic %s", topic));
            kafkaTemplate.send(topic, message);
        }
    
    }
    

    So far we have configured Kafka in a Spring Boot project and seen simple consumer and producer examples. Before going further with Kafka configuration, let’s configure the persistence and cache repositories.

    Persistence Configuration

    As I mentioned in the first part of this blog series, I’m going to use PostgreSQL as a persistence environment and Spring data configuration will be like below in the application.properties:

    spring.datasource.url=jdbc:postgresql://localhost:5432/epmessagingdb
    spring.datasource.username=epmessaging
    spring.datasource.password=epmessagingdb_password
    spring.datasource.driver-class-name=org.postgresql.Driver
    spring.datasource.configuration.maximum-pool-size=30
    spring.jpa.database-platform=PostgreSQL
    # The SQL dialect makes Hibernate generate better SQL for the chosen database
    spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.PostgreSQLDialect
    # Hibernate ddl auto (create, create-drop, validate, update)
    spring.jpa.hibernate.ddl-auto=none
    

    In the properties we set the spring.jpa.hibernate.ddl-auto Spring JPA property to none to avoid Hibernate populating the schema automatically. In some cases it can be useful to allow auto-population. We leave the base configuration here for now as it is, in the next part we’ll create our Spring Data models in the project.

    Caching Configuration

    I also mentioned that we’re going to use Redis as the cache environment. Redis is developed using C and a very fast in-memory cache.

    Let’s put the following lines in application.properties to enable Redis configuration in our Spring Kafka project.

    cache.redis.host=localhost
    cache.redis.port=6379
    cache.redis.timeout=5000
    cache.redis.password=
    

    Redis Pooling Factory

    We’re going to use Jedis as the Redis client in our project. So let’s create a Jedis pooling factory class in our project called JedisFactory like below:

    package com.endpoint.SpringKafkaMessaging.cache;
    
    import org.springframework.beans.factory.annotation.Value;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class JedisFactory {
    
        @Value("${cache.redis.host}")
        private static String host;
    
        @Value("${cache.redis.port}")
        private static Integer port;
    
        @Value("${cache.redis.timeout}")
        private static Integer timeout;
    
        @Value("${cache.redis.password}")
        private static String password;
    
        // hide the constructor
        private JedisFactory() {
    
        }
    
        private static JedisPool jedisPool;
    
        static {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(128);
    
            jedisPool = new JedisPool(
                poolConfig,
                host,
                port,
                timeout,
                password
            );
        }
    
        public static Jedis getConnection() {
            return jedisPool.getResource();
        }
    }
    

    We’ll create a persistence model, repository, controllers, and a cache repository in the next part of this blog series.

    java spring frameworks kafka spring-kafka-series


    Comments