Spring for Apache Kafka

  • Developed at LinkedIn, maintained within the Apache Foundation since 2012.
  • Backed by Confluent, a platform built around Kafka.

But what exactly is Apache Kafka?

  • A storage system for streams of messages (streams of records)
  • Asynchronous
  • Handles large volumes of data with low latency
  • While keeping the data safe
  • No message types, only streams

Messages

  • Made up of a value, a key, and a timestamp (see the sketch below)
  • Organized into categories called topics
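
A minimal sketch of such a record with the plain Kafka client (key and value are hypothetical; the topic name is borrowed from the configuration later in this deck):

import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, String> record = new ProducerRecord<>(
        "isee.entreprise",           // topic
        null,                        // partition: null lets Kafka choose
        System.currentTimeMillis(),  // timestamp
        "RID-123",                   // key (hypothetical)
        "{\"name\":\"ACME\"}");      // value (hypothetical JSON payload)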

Topics / Consumer / Producer

  • Topics are immutable, except for appending messages at the end
  • The producer appends messages to the end of the topics of its choice
  • The consumer always reads a topic in order, from the oldest message to the most recent (a sketch follows below)
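
A minimal sketch of that ordered read with the plain Kafka client (broker address and group id are assumptions); within a partition, records come back in offset order, oldest first:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("isee.entreprise"));
    // each poll returns records in offset order, oldest first, per partition
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
}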

A distributed streaming platform

RIP opt-kafka

  • New SMO strategy: no longer maintain "in-house" libraries
  • The opt-kafka lib (which used the Apache Kafka lib directly) is abandoned in favor of Spring for Apache Kafka

Spring for Apache Kafka

MAVEN

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
GRADLE
compile 'org.springframework.kafka:spring-kafka'
PROPERTIES
spring.kafka.bootstrap-servers=${KAFKA_BROKERS_HOST}
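
Other spring.kafka.* properties commonly sit alongside it; a sketch with illustrative values (not project settings):

spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.acks=all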

Local development

version: "3"
services:

    zookeeper:
        image: wurstmeister/zookeeper:3.4.6
        container_name: zookeeper
        expose:
            - "2181"

    kafka:
        image: wurstmeister/kafka:2.11-2.0.0
        container_name: kafka
        depends_on:
            - zookeeper
        ports:
            - "9092:9092"
        environment:
            KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
            KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
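
Once the stack is up (docker-compose up -d), the broker is reachable at localhost:9092, which matches the spring.kafka.bootstrap-servers property above.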

Producer

  • Produces data onto topics
  • Can wait for an ACK (acknowledgement) to know whether the data was actually delivered

Configuration

@Configuration
public class KafkaConfiguration {

    @Autowired
    private KafkaProperties kafkaProperties;

    public static final String ENTREPRISE_KAFKA_TOPIC = "isee.entreprise";
    public static final String ETABLISSEMENT_KAFKA_TOPIC = "isee.etablissement";

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for acknowledgement from all in-sync replicas
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "1"); // retry once on transient failures
        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // keep message ordering across retries
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "22020096"); // allow requests up to 21 MiB
        return kafkaProps;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return new KafkaAdmin(configs);
    }

}
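
With a KafkaAdmin bean in the context, Spring for Apache Kafka also creates any NewTopic beans it finds at startup; a minimal companion sketch (partition and replication counts are assumptions):

    @Bean
    public NewTopic entrepriseTopic() {
        // org.apache.kafka.clients.admin.NewTopic: name, partitions, replication factor
        return new NewTopic(ENTREPRISE_KAFKA_TOPIC, 1, (short) 1);
    }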

Producer with ACK

@Service
public class ProducerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);

    private KafkaTemplate<String, String> kafkaTemplate;

    public ProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public SendResult<String, String> push(String topic, String key, Object data, ObjectMapper objectMapper) {
        try {
            String json = objectMapper.writeValueAsString(data);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, json);
            LOGGER.debug("Kafka send: topic=[{}], key=[{}], message=[{}]", record.topic(), record.key(), record.value());
            return kafkaTemplate.send(record).get();

        } catch (JsonProcessingException e) {
            throw new RuntimeException(String.format("Serialization error for object %s destined for topic %s", key, topic), e);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(String.format("Error while sending object %s destined for topic %s", key, topic), e);
        }
        }
    }
}
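
A hypothetical call site (topic constant, key, and DTO are illustrative), blocking until the broker acknowledges:

    SendResult<String, String> result = producerService.push(
            KafkaConfiguration.ENTREPRISE_KAFKA_TOPIC, "RID-123", entrepriseDto, objectMapper);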

Producer in asynchronous mode

@Service
public class ProducerService {

    @Value("${opt.kafka.topics.message}")
    private String messageTopic;

    private KafkaTemplate<String, String> kafkaTemplate;

    public ProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Asynchronous send of the message
    public void pushAsync(MessageDTO messageDTO) {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = null;
        try {
            json = objectMapper.writeValueAsString(messageDTO);
            String uuid = UUID.randomUUID().toString();
            ListenableFuture<SendResult<String, String>> future =
                    kafkaTemplate.send(messageTopic, uuid, json); // the uuid serves as the record key

            String finalJson = json;
            future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    System.out.println("Sent message=[" + finalJson +
                            "] with offset=[" + result.getRecordMetadata().offset() + "]");
                }

                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("Unable to send message=["
                            + finalJson + "] due to : " + ex.getMessage());
                }
            });

        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

Consumer

  • Belongs to a consumer group (group name)
  • Each message published to a topic is consumed by only one consumer within a given group

@Component
public class KafkaConsumer {

   private final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

   @KafkaListener(id = "messageListener", topics = "${opt.kafka.topics.message}", groupId = "messageG1")
   public void messageListener(ConsumerRecord<String, String> record) throws JsonProcessingException {
      log.info("Reception enregistrement brut de messageListener : [{}]", record);
      ObjectMapper objectMapper = new ObjectMapper();
      MessageDTO messageDTO = objectMapper.readValue(record.value(), MessageDTO.class);
      log.info("Reception message de messageListener : [{}]", messageDTO);
   }
}
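
A sketch of the group semantics described above (listener ids and topic are hypothetical): both groups receive every message, but within "groupA" each record goes to only one of the two concurrent consumers:

    @KafkaListener(id = "a", topics = "demo", groupId = "groupA", concurrency = "2")
    public void fromGroupA(String value) {
        log.info("groupA received [{}]", value); // only one of the two groupA consumers per record
    }

    @KafkaListener(id = "b", topics = "demo", groupId = "groupB")
    public void fromGroupB(String value) {
        log.info("groupB received [{}]", value); // groupB independently receives every record
    }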

Consumer Stream

  • Lets you build units that process messages as they flow (streaming).
  • Enables real-time operations across topics: aggregation, joins, etc. (see the sketch below)
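
A minimal Kafka Streams sketch of such a real-time join (topic names follow the KStream demo at the end of this deck; the application id, broker address, and key layout are assumptions). It enriches each sms with the emitter's user record:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class SmsEnrichmentStream {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sms-enrichment");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // "user" read as a table: latest record per key (assumed key = phoneNumber)
        KTable<String, String> users = builder.table("user");
        // "sms" read as a stream (assumed key = phoneNumberEmitter)
        KStream<String, String> sms = builder.stream("sms");
        // join each sms with the matching user; here the enrichment is a simple concatenation
        sms.join(users, (smsJson, userJson) -> userJson + " -> " + smsJson)
           .to("sms-enriched");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}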

Use case - Consumer-SIG

Tests

MAVEN

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
GRADLE
testCompile 'org.springframework.kafka:spring-kafka-test'

Producer

opt-kafka example
public class BPWriterTest {

    private BPWriter writer;

    private KafkaService kafkaService = Mockito.mock(KafkaService.class);

    @Before
    public void init() {
        writer = new BPWriter(kafkaService);
    }

    @Test
    public void testBPWriter() {

        Mockito.when(kafkaService.push(any(), any())).thenReturn(Mockito.mock(KafkaResult.class));

        List<BoitePostaleKafkaDto> dto = new ArrayList<>();
        dto.add(new BoitePostaleKafkaDto());
        writer.write(dto);

        Mockito.verify(kafkaService, Mockito.times(1)).push(any(), any());
    }

}
@EmbeddedKafka example
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${opt.kafka.topics.message}"})
@SpringBootTest(
        properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        classes = {KafkaService.class, KafkaAutoConfiguration.class})
public class KafkaServiceTest {

    @Value(value = "${opt.kafka.topics.message}")
    private String messageTopic;

    @Autowired
    private KafkaService kafkaService;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    private static final String AUTHOR = "Great Leader";
    private static final String BODY = "Dear %{Recipient}, foo, Kind Regards";
    private static final Long ID = 12345L;
    private static final String RECIPIENT = "World";
    private static final String SUBJECT = "Hello world";

    @Test
    public void push() {
        final Consumer<String, String> consumer = buildConsumer();

        MessageDTO m = createMessageDTO();
        SendResult<String, String> result = kafkaService.push(m);

        Assert.assertNotNull(result);

        String value = result.getProducerRecord().value();
        String key = result.getProducerRecord().key();

        embeddedKafka.consumeFromEmbeddedTopics(consumer, messageTopic);
        final ConsumerRecord<String, String> record = getSingleRecord(consumer, messageTopic, 10_000);

        assertThat(record, hasValue(value));
        assertThat(record, hasKey(key));

    }

    private <K, V> Consumer<K, V> buildConsumer() {
        final Map<String, Object> props = KafkaTestUtils.consumerProps("g1", "true", embeddedKafka);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory.createConsumer();
    }

    private MessageDTO createMessageDTO() {
        MessageDTO m = new MessageDTO();
        m.setAuthor(AUTHOR);
        m.setBody(BODY);
        m.setId(ID);
        m.setRecipient(RECIPIENT);
        m.setSubject(SUBJECT);
        return m;
    }

}

Consumer

opt-kafka example
public class EsiriusTaskTest {

    private EsiriusTask task;

    private IPortailService portailService;

    @Before
    public void setUp() {
        portailService = mock(IPortailService.class);

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "APP_ID");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP");
        ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        task = new EsiriusTask(new StreamsConfig(streamsConfiguration), new String[]{BorneKafkaDTO.TOPIC_ESIRIUS}, mapper, portailService);
    }

    @Test
    public void testIsForMe() {
        Assert.assertTrue(task.isForMe("id", "{}"));
    }

    @Test
    public void testProcessNonAP1AP2() {

        task.process("id", "{\"borne\": {\"siteCode\": \"site31\"}}");

        verify(portailService, times(1)).updateBorne(any());
    }

    @Test
    public void testProcessAP1() {

        task.process("id", "{\"borne\": {\"siteCode\": \"AP1\"}}");

        verify(portailService, times(0)).updateBorne(any());
    }

    @Test
    public void testProcessAP2() {

        task.process("id", "{\"borne\": {\"siteCode\": \"AP2\"}}");

        verify(portailService, times(0)).updateBorne(any());
    }
}
@EmbeddedKafka example
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${opt.kafka.topics.message}"})
@SpringBootTest(
        properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        classes = {KafkaConsumer.class, KafkaAutoConfiguration.class})
public class KafkaConsumerTest {

    @Autowired
    private KafkaConsumer kafkaConsumer;

    @Value(value = "${opt.kafka.topics.message}")
    private String messageTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    private static final String AUTHOR = "Great Leader";
    private static final String BODY = "Dear %{Recipient}, foo, Kind Regards";
    private static final Long ID = 12345L;
    private static final String RECIPIENT = "World";
    private static final String SUBJECT = "Hello world";
    private static final String UID = UUID.randomUUID().toString();

    @Test
    public void receive() throws JsonProcessingException, InterruptedException {
        ObjectMapper objectMapper = new ObjectMapper();
        kafkaTemplate = new KafkaTemplate<>(buildProducer());
        kafkaTemplate.setDefaultTopic(messageTopic);
        MessageDTO m = createMessageDTO();
        String json = objectMapper.writeValueAsString(m);

        ProducerRecord<String, String> record = new ProducerRecord<>(messageTopic, UID, json);

        kafkaTemplate.send(record);

        kafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(kafkaConsumer.getLatch().getCount()).isEqualTo(0);

    }



    private ProducerFactory<String, String> buildProducer() {
        final Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    private MessageDTO createMessageDTO() {
        MessageDTO m = new MessageDTO();
        m.setAuthor(AUTHOR);
        m.setBody(BODY);
        m.setId(ID);
        m.setRecipient(RECIPIENT);
        m.setSubject(SUBJECT);
        return m;
    }

}


Kafka maintenance

  • Availability
  • Upgrade procedure
  • Consumer maintenance

Next upgrade: 2.4.1 > 2.5.0

The problem of creating new topics

  • As configured today, Kafka lets the producer auto-create a topic that does not yet exist (see the setting sketched below)
  • Risk of data duplication
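
This behaviour comes from the broker-side auto.create.topics.enable setting, which defaults to true; the relevant server.properties line:

    # server.properties: let clients auto-create missing topics (Kafka default)
    auto.create.topics.enable=true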

Strategy

  • Consult the GLIA before creating any topic
  • In progress: the GLIA is setting up a topic registry in Confluence
  • Write the interface documentation before any release to production

Design for failure

  • More and more Kafka clients will be developed
  • Kafka stores only 48 hours of data
  • The consumer must keep track of the last offset it processed successfully (see the sketch below).
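
One way to honour that requirement with Spring for Apache Kafka is manual acknowledgment; a minimal sketch, assuming the listener container factory is configured with AckMode.MANUAL and process() is a hypothetical business method:

    @KafkaListener(id = "safeListener", topics = "${opt.kafka.topics.message}", groupId = "messageG1")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        process(record.value());
        ack.acknowledge(); // commit the offset only after successful processing
    }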

Projects using org.springframework.kafka

  • OPTISEE
  • SYNCRO-TIARHE-SIRH (in progress)

Kafka clients

  • CLIK
  • SIG (BPWEB)
  • REFCOM
  • REFTEL
  • OPTISEE
  • ...

Demo

  • Producer
  • Consumer
  • KStream

Producer

{
    "id": 123465,
    "author": "MailBatch1",
    "subject": "First message in Kafka",
    "recipient": "DSI_GLOBAL",
    "body": "Incident!"
}

Consumer

log.info("Reception message de messageListener : [{}]", messageDTO);

KStream

# sms
{"phoneNumberEmitter":"112233", "message":"Coucou !", "phoneNumberReceiver":"907256"}
# user
{"phoneNumber":"112233", "firstName":"Hubert", "lastName":"Bonisseur de la Bath"}

KStream

# enriched sms
{
    "firstNameEmitter":"Hubert",
    "lastNameEmitter":"Bonisseur de la Bath",
    "phoneNumberEmitter":"112233",
    "message":"Coucou !",
    "phoneNumberReceiver":"907256"
}

Questions