Jugando con Apache Kafka (continuación).

Después de haber visto una breve introducción a Apache Kafka, vamos a ver con un ejemplo cómo configurar Apache Kafka en nuestra aplicación, crearemos un docker-compose para levantar nuestro servicio Kafka (junto con Zookeeper) y enviar/leer flujos de datos que enviaremos a un topic que definiremos en nuestra aplicación.

En la entrada anterior, hablábamos de brokers, topics, particiones, eventos, etc. Pero, ¿quién gestiona toda esta arquitectura? ¿Cómo se sabe a qué broker enviar los distingos flujos o qué sucede cuando un broker se cae y es necesario utilizar uno de respaldo? Para todo esto está Apache Zookeeper, que es el servicio centralizado que se va a encargar de todas estas tareas de mantenimiento, gestión y distribución.

Veamos una configuración sencilla con un único Zookeeper y un solo Broker Kafka creando un docker-compose.yml para levantar nuestro servicio (basándonos en las imágenes de confluentic).

version: '3'

services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper-1
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: INFO
    ports:
      - "2181:2181"
    #volumes:
    #  - ./zookeeper-1/conf:/etc/kafka/
    #  - ./zookeeper-1/logs:/logs
    #  - ./zookeeper-1/data:/var/lib/zookeeper/data
    #  - ./zookeeper-1/datalog:/var/lib/zookeeper/log/
    networks:
      static-network:
        ipv4_address: 10.2.0.2

  kafka-1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

    ports:
      - "9092:9092"
      - "29092:29092"
    #volumes:
    #  - ./kafka-1/logs:/var/log/kafka
    #  - ./kafka-1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper-1
    networks:
      static-network:
        ipv4_address: 10.2.0.3

networks:
  static-network:
    driver: bridge
    ipam:
      config:
        - subnet: 10.2.0.0/16

Hemos creado una subred, donde hemos añadido nuestros dos servicios. Por un lado tenemos el servicio de zookeeper, donde hemos añadido cierta configuración básica indicando el server_id, el puerto de escucha (2181), el tickTime y el log level (config ref: https://docs.confluent.io/platform/current/installation/docker/config-reference.html#required-confluent-ak-settings). Por otro lado tenemos el único broker de apache kafka que hemos configurado, indicando el broker id, la dirección/puerto donde escucha el servicio de zookeeper, las advertised_listener (permite que kafka escuche fuera del contenedor docker), y el KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR que cuando solo tenemos un único broker, debemos ponerlo por defecto a 1 (config ref. https://docs.confluent.io/platform/current/installation/docker/config-reference.html#confluent-ak-configuration). (Nota: En caso de querer tener persistencia de datos con docker, podemos descomentar la parte de volumes)

Ahora, haciendo un docker-compose up podemos levantar nuestro servicios y empezar a enviar flujos de datos.

Vamos ahora a ver una sencilla aplicación para ver cómo comunicarnos con nuestro broker. Se trata de un sencillo ejercicio donde debemos, utilizando la API de twitter, leer los twees que se vayan publicando en base a una palabra clave. Para hacerlo más interesante, le añadiremos las restricciones de que debemos leer tweets durante un periodo máximo de tiempo o hasta alcanzar un máximo de tweets leidos.

Como vemos, esta aplicación podría ser un claro ejemplo de Big Data. En primer lugar, se publican millones de tweets, con lo que si no queremos perder ninguno, debemos leer los tweets lo más rápido posible, sin preocuparnos por qué hacer con ellos. Por tanto, necesitamos desacoplar el procesamiento de estos tweets con el proceso de leerlos. Además, nuestra aplicación podría crecer, tener varios procesos que recuperen tweets con distintas palabras clave, o tener distintas fuentes de datos, no solo Twitter. Y para estas tareas, kafka nos viene que ni pintado ya que, no solo desacoplamos estos procesos con un productor/consumidor, sino que externalizamos el servicio, pudiendo tener distintas aplicaciones en distintos servidores y que cada una se encague independientemente de su trabajo.

Veamos cómo sería el productor:

@Component
@Slf4j
public class TwitterProducer implements Runnable{

Lo más importante aquí es el KafkaTemplate, el cual nos va a permitir enviar a un topic específico el tweet que hemos leido. Vemos que es tan sencillo como inyectar con SpringBoot una instancia de KafkaTemplate con un par clave/valor (que serán el topic y el stream a enviar) y enviar (send) al topic que hayamos especificado nuestro tweet.

El resto del productor utiliza un bufferReader (que lo hemos declarado como AtomicReference para que el productor sea Thread-safe, ya que podríamos tener más de un hilo leyendo tweets) el cual se va a encargar de ir leyendo los tweets publicados, un ObjectMapper para mapear el json publicado a nuestra clase Tweet (esta parte se podría hacer directamente en el consumidor y en lugar de enviar a kafka nuestra instancia tweet, enviar el json entero) y un countDownLatch. Este último objeto es el que se va a encargar de ejecutar el hilo del productor hasta que se cumplan las condiciones que habíamos definido. Para esto, podemos declarar el CountDownLatch especificando el número máximo de iteraciones que queremos ejecutar (en cada coundDown se irá decrementando el valor del coundDownLatch)

CountDownLatch producerCountDownLatch = new CountDownLatch(maxTweets);

Y después lanzar nuestro productor y esperar hasta que se el countDownLatch llege a 0 o bien pase un tiempo máximo:

producerCountDownLatch.await(maxTimeInMilliseconds, TimeUnit.MILLISECONDS);

Así, cuando hayamos cumplido estas dos condiciones, podremos terminar nuestro productor.

Veamos nuestro consumidor:

@Component
@Slf4j
public class TwitterConsumer{

    @Value(value = "${app.max_tweets}")
    private int maxTweets;

    @Value(value = "${app.max_time_in_milliseconds}")
    private int maxTimeInMilliseconds;

    @Autowired
    private TwitterQueue twitterQueue;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    private CountDownLatch countDownLatch = new CountDownLatch(maxTweets);


    public void init(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
        start();
    }

    @KafkaListener(id="listener_id", autoStartup = "false", topics = "${app.kafka.topic.name}")
    public void listener(Tweet tweet, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) throws InterruptedException {
        log.info("Tweet = {}", tweet);
        synchronized (this) {
            if(twitterQueue.getTweetList().get().size()<maxTweets) {
                twitterQueue.getTweetList().get().add(tweet);
            }
        }
        this.countDownLatch.countDown();
    }

    public void start() {
        kafkaListenerEndpointRegistry.getListenerContainer("listener_id").start();
    }

    public void stop() {
        kafkaListenerEndpointRegistry.getListenerContainer("listener_id").stop();
    }
}

Aquí la parte más importante es la declaración del listener. Podría valer con declarar nuestro @KafkaListener e indicando tan solo los topics de los cuales leer los streams cada vez que kafka emita un evento, e indicando qué tipo de dato vamos a leer. En nuestro caso hemos añadido también el id y el autoStartup, ya que no queríamos que se iniciara con la aplicación, sino cuando hubieramos arrancado el productor y hayamos definido nuestro CountDownLatch para cumplir con los requisitos establecidos, pero como hemos dicho, esto es algo complementario.

Nuestro listener, además de imprimir por el log el tweet leído, guarda en una lista (declarada para que sea Thread-safe puesto que podemos tener varios hilos) la cual al finalizar el proceso servirá para ordenar todos los tweets (como otro requisito añadido) y decrementará el countDownLatch.

A la hora de ver el uso de kafka, lo más importante es fijarnos sobre todo en la forma de enviar utilizando el KafkaTemplate en el productor y estar a la escucha con el KafkaListener en el consumidor.

Un servicio se podría encargar de ejecutar cada uno de estos procesos, aunque no vamos a entrar en ese detalle. Lo que sí vamos a ver es otra parte importante para conectarnos con kafka.

Hasta ahora, hemos visto que nuestro productor/consummidor publican y leen de un topic, pero ese topic debe haber sido creado previamente en kafka. Podríamos crearlo haciendo llamadas al servicio de kafka, pero vamos a ver cómo hacerlo desde nuestra aplicación.

Desde la clase main de nuestra aplicación SpringBoot, vamos a crear el topic que vayamos a utilizar para publicar/leer tweets.

   @Value(value = "${app.kafka.topic.name}")
    private String topic;

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(topic)
                .partitions(1)
                .replicas(1)
                .build();
    }

En este caso, he hemos dicho que cree un nuevo topic (si no existe) y que se cree en una única partición y con una única réplica. Si quisieramos que kafka dividiera el topic en varias particiones repartidas entre los diferentes brokers y que hubiera réplicas de cada partición entre los distintos brokers, podríamos indicarlo aquí al crear nuestro topic. Pero en nuestro ejemplo, como tan solo hemos definido un único broker, no tiene sentido crear más particiones/réplicas.

Por último, vamos a ver la configuración en nuestro application.yml para poder realmente comunicarnos con el servicio que hemos levantado de kafka.

spring:
  kafka:
    bootstrap-servers: localhost:29092
    consumer:
      group-id: myGroup
      value-deserializer: com.secdevoops.serializer.TweetDataDeserializer
    producer:
      value-serializer: com.secdevoops.serializer.TweetDataSerializer

Lo más importante aquí es inciar dónde se encuentran levantados los servidores kafka con la propiedad bootstrap-servers e indicar el group-id al que pertenecerá el consumidor. Además, puesto que hemos utilizado una clase propia para enviar datos a kafka, debemos indicar el serializar/deserializer del productor/consumidor.

El código completo de la aplicación se puede encontrar en https://github.com/secdevoops/java_exercise_kafka

Categorías