Spring for Apache Kafka
                
                
                    
                        - Développé chez LinkedIn, et maintenu au sein de la fondation Apache depuis 2012.
 
                        - Poussé par Confluent, plateforme créée autour de Kafka.
 
                    
                
                
                    Mais c’est quoi Apache Kafka ?
                    
                        - Système de stockage de flux de  messages (streams of records)
 
                        - Asynchrone
 
                        - Permet de gérer des gros volumes de données, faible latence
 
                        - Tout en assurant une sécurité des données
 
                        - Aucun type de message, que des streams
 
                    
                
                
                    Messages
                    
                        - Composés d’une valeur, d’une clé, et d’un timestamp
 
                        - Sont organisés en catégories appelées  topics
 
                    
                
                
                    
                        Topics / Consumer / Producer
                        
                    
                    
                        
                            - Les topics ne sont pas modifiables à l’exception de l’ajout de messages à la fin
 
                            - Le  producer ajoute des messages à la fin des topics de son choix
 
                            - Le  consumer lit des topics toujours dans l’ordre, c’est-à-dire du plus ancien message au plus récent
 
                        
                    
                
                
                    Plateforme de streaming distribuée
                    
                
                
                    RIP opt-kakfa
                    
                        - Nouvelle stratégie SMO : ne plus maintenir de lib "maison"
 
                        - Abandon de la lib opt-kafka (utilisation de la lib Apache Kafka) au profit de Spring for Apache Kafka
 
                    
                
                
                    Spring Apache for Kafka
                    MAVEN
   org.springframework.kafka
   spring-kafka
                    GRADLE
                    compile 'org.springframework.kafka:spring-kafka'
                    PROPRIÉTÉS
                    spring.kafka.bootstrap-servers=${KAFKA_BROKERS_HOST}
                
                
                    développement en local
                    version: "3"
services:
    zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: zookeeper
    expose:
        - "2181"
    kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    container_name: kafka
    depends_on:
        - zookeeper
    ports:
        - "9092:9092"
    environment:
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
        KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
                    
                
                
                    Producer
                    
                        - Produit des données sur les topics
 
                        - Peut attendre un ACK (accusé réception) pour savoir si la donnée a bien été transmise
 
                    
                
                
                    Configuration
                    @Configuration
public class KafkaConfiguration {
    @Autowired
    private KafkaProperties kafkaProperties;
    public static final String ENTREPRISE_KAFKA_TOPIC = "isee.entreprise" ;
    public static final String ETABLISSEMENT_KAFKA_TOPIC = "isee.etablissement" ;
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
        kafkaProps.put(ProducerConfig.RETRIES_CONFIG, "1");
        kafkaProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        kafkaProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "22020096");
        return kafkaProps;
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return new KafkaAdmin(configs);
    }
}
                
                
                    Producer avec ACK
                    @Service
public class ProducerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ProducerService.class);
    private KafkaTemplate<String, String> kafkaTemplate;
    public ProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    public SendResult<String, String> push(String topic, String key, Object data, ObjectMapper objectMapper) {
        try {
            String json = objectMapper.writeValueAsString(data);
            ProducerRecord<String, String> record = new ProducerRecord>(topic, key, json);
            LOGGER.debug("Envoi Kafka : topic=[{}], key=[{}], message=[{}]", record.topic(), record.key(), record.value());
            return kafkaTemplate.send(record).get();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(String.format("Erreur de serialization de l'objet %s destiné au topic %s", key, topic), e);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(String.format("Erreur lors de l'envoi de l'objet %s destiné au topic %s", key, topic), e);
        }
    }
}
                
                
                    Producer mode asynchrone
                    @Service
public class ProducerService {
  private KafkaTemplate<String, String> kafkaTemplate;
    public KafkaService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
 // Envoi asynchrone du message
 public void pushAsync(MessageDTO messageDTO) {
    ObjectMapper objectMapper = new ObjectMapper();
    String json = null;
    try {
         json = objectMapper.writeValueAsString(messageDTO);
         String uuid = UUID.randomUUID().toString();
         ListenableFuture<SendResult<String, String>> future =
              kafkaTemplate.send(messageTopic, json);
         String finalJson = json;
         future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
           @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Sent message=[" + finalJson +
                   "] with offset=[" + result.getRecordMetadata().offset() + "]");
             }
           @Override
           public void onFailure(Throwable ex) {
              System.out.println("Unable to send message=["
                    + finalJson + "] due to : " + ex.getMessage());
           }
         });
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
 }
}
                
                
                    Consumer
                    
                        - Appartient à un consumer group name
 
                        - Chaque message publié dans un topic est consommé par un seul consumer d'un group name
 
                    
                
                
                @Component
public class KafkaConsumer {
   private final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
   @KafkaListener(id = "messageListener", topics = "${opt.kafka.topics.message}", groupId = "messageG1")
   public void messageListener(ConsumerRecord<String, String> record) throws JsonProcessingException {
      log.info("Reception enregistrement brut de messageListener : [{}]", record);
      ObjectMapper objectMapper = new ObjectMapper();
      MessageDTO messageDTO = objectMapper.readValue(record.value(), MessageDTO.class);
      log.info("Reception message de messageListener : [{}]", messageDTO);
   }
}
                
                
                    Consumer Stream
                    
                        - Permet de développer des unités de traitement de messages au fil de l’eau (streaming).
 
                        - Permet de faire des opérations entre topic en temps réel: aggregation / jointure ...
 
                    
                
                
                    Cas d'utilisation - Consumer-SIG
                    
                
                
                    Cas d'utilisation - Consumer-SIG
                    
                
				
                
                    Tests
                    MAVEN
                    
   org.springframework.kafka
   spring-kafka-test
   test
                    GRADLE
                    testCompile 'org.springframework.kafka:spring-kafka-test'
                
                
					
                    
					
						Exemple opt-kafka
                        public class BPWriterTest {
private BPWriter writer;
private KafkaService kafkaService = Mockito.mock(KafkaService.class) ;
    @Before
    public void init() {
        writer = new BPWriter(kafkaService);
    }
    @Test
    public void testBPWriter() {
        Mockito.when(kafkaService.push(any(), any())).thenReturn(Mockito.mock(KafkaResult.class));
        List<BoitePostaleKafkaDto> dto = new ArrayList<>();
        dto.add(new  BoitePostaleKafkaDto());
        writer.write(dto);
        Mockito.verify(kafkaService, Mockito.times(1)).push(any(), any());
    }
}
                    
					
						Exemple @EmbeddedKafka
                    	@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${opt.kafka.topics.message}"})
@SpringBootTest(
        properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        classes = {KafkaService.class, KafkaAutoConfiguration.class})
public class KafkaServiceTest {
    @Value(value = "${opt.kafka.topics.message}")
    private String messageTopic;
    @Autowired
    private KafkaService kafkaService;
    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;
    private static final String AUTHOR = "Great Leader";
    private static final String BODY = "Dear %{Recipient}, foo, Kind Regards";
    private static final Long ID = 12345L;
    private static final String RECIPIENT = "World";
    private static final String SUBJECT = "Hello world";
    @Test
    public void push() {
        final Consumer<String, String> consumer = buildConsumer();
        MessageDTO m = createMessageDTO();
        SendResult<String, String> result = kafkaService.push(m);
        Assert.assertNotNull(result);
        String value = result.getProducerRecord().value();
        String key = result.getProducerRecord().key();
        embeddedKafka.consumeFromEmbeddedTopics(consumer, messageTopic);
        final ConsumerRecord<String, String> record = getSingleRecord(consumer, messageTopic, 10_000);
        assertThat(record, hasValue(value));
        assertThat(record, hasKey(key));
    }
    private <K, V> Consumer<K, V> buildConsumer() {
        final Map<String, Object> props = KafkaTestUtils.consumerProps("g1", "true", embeddedKafka);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final DefaultKafkaConsumerFactory<K, V> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory.createConsumer();
    }
    private MessageDTO createMessageDTO() {
        MessageDTO m = new MessageDTO();
        m.setAuthor(AUTHOR);
        m.setBody(BODY);
        m.setId(ID);
        m.setRecipient(RECIPIENT);
        m.setSubject(SUBJECT);
        return m;
    }
}
                
                
					
					
						Exemple opt-kafka
						public class EsiriusTaskTest {
    private EsiriusTask task;
    private IPortailService portailService;
    @Before
    public void setUp() {
        portailService = mock(IPortailService.class);
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "APP_ID");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "BOOTSTRAP");
        ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        task = new EsiriusTask(new StreamsConfig(streamsConfiguration), new String[]{BorneKafkaDTO.TOPIC_ESIRIUS}, mapper, portailService);
    }
    @Test
    public void testIsForMe() {
        Assert.assertTrue(task.isForMe("id", "{}"));
    }
    @Test
    public void testProcessNonAP1AP2() {
        task.process("id", "{\"borne\": {\"siteCode\": \"site31\"}}");
        verify(portailService, times(1)).updateBorne(any());
    }
    @Test
    public void testProcessAP1() {
        task.process("id", "{\"borne\": {\"siteCode\": \"AP1\"}}");
        verify(portailService, times(0)).updateBorne(any());
    }
    @Test
    public void testProcessAP2() {
        task.process("id", "{\"borne\": {\"siteCode\": \"AP2\"}}");
        verify(portailService, times(0)).updateBorne(any());
    }
}
					
					
                        Exemple @EmbeddedKafka
                    @RunWith(SpringRunner.class)
@EmbeddedKafka(topics = {"${opt.kafka.topics.message}"})
@SpringBootTest(
        properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        classes = {KafkaConsumer.class, KafkaAutoConfiguration.class})
public class KafkaConsumerTest {
    @Autowired
    private KafkaConsumer kafkaConsumer;
    @Value(value = "${opt.kafka.topics.message}")
    private String messageTopic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;
    private static final String AUTHOR = "Great Leader";
    private static final String BODY = "Dear %{Recipient}, foo, Kind Regards";
    private static final Long ID = 12345L;
    private static final String RECIPIENT = "World";
    private static final String SUBJECT = "Hello world";
    private static final String UID = UUID.randomUUID().toString();
    @Test
    public void receive() throws JsonProcessingException, InterruptedException {
        ObjectMapper objectMapper = new ObjectMapper();
        kafkaTemplate = new KafkaTemplate<>(buildProducer());
        kafkaTemplate.setDefaultTopic(messageTopic);
        MessageDTO m = createMessageDTO();
        String json = objectMapper.writeValueAsString(m);
        ProducerRecord<String, String> record = new ProducerRecord<>(messageTopic, UID, json);
        kafkaTemplate.send(record);
        kafkaConsumer.getLatch().await(10000, TimeUnit.MILLISECONDS);
        assertThat(kafkaConsumer.getLatch().getCount()).isEqualTo(0);
    }
    private ProducerFactory<String, String> buildProducer() {
        final Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    private MessageDTO createMessageDTO() {
        MessageDTO m = new MessageDTO();
        m.setAuthor(AUTHOR);
        m.setBody(BODY);
        m.setId(ID);
        m.setRecipient(RECIPIENT);
        m.setSubject(SUBJECT);
        return m;
    }
}
					
					
                    
                
                
                    Maintenance de Kafka
                    
                        - Disponibilité
 
                        - Méthode d'upgrade
 
                        - Maintenance consumers
 
                    
                    Prochain Upgrade 2.4.1 > 2.5.0
                
                
                    Problèmatique de la création de nouveaux topics
                    
                        - Tel que configuré, Kafka laisse le producer créer automatiquement le topic s'il n'existe pas
 
                        - Risque de duplication de la donnée
 
                    
                
                
                    Stratégie
                    
                        - Consulter le GLIA avant toute création de topic
 
                        - En cours : mise en place par le GLIA d'un référenciel des topics dans Confluence
 
                        - Rédaction de la documentation d'interface avant toute MEP
 
                    
                
                
                    Design for failure
                    
                        - De plus en plus de clients Kafka vont etre développés
 
                        - Kakfa ne stocke que 48h de données
 
                        - Le consumer doit savoir quel est le dernier offset traité avec succès.
 
                    
                
                
                    Projet utilisant org.springframework.kafka
                    
                        - OPTISEE
 
                        - SYNCRO-TIARHE-SIRH (en cours)
 
                    
                
                
                    Clients Kafka
                        
                            - CLIK
 
                            - SIG (BPWEB)
 
                            - REFCOM
 
                            - REFTEL
 
                            - OPTISEE
 
                            - ...
 
                        
                
                
                
                Démo
                    
                        - Producer
 
                        - Consumer
 
                        - KStream
 
                    
                
                
                    
                        Producer
                        
                        {
    "id": 123465,
    "author": "MailBatch1",
    "subject": "1er message dans Kafka",
    "recipient": "DSI_GLOBAL",
    "body": "Incident !"
}
                    
                    
                
                
                    
                        Consumer
                        
                        log.info("Reception message de messageListener : [{}]", messageDTO);
                    
                    
                
                
                    
                        KStream
                        
                        # sms
{"phoneNumberEmitter":"112233", "message":"Coucou !", "phoneNumberReceiver":"907256"}
                        # user
{"phoneNumber":"112233", "firstName":"Hubert", "lastName":"Bonisseur de la Bath"}
                    
                    
                        KStream
                        
                        # sms enrichi
{
    "firstNameEmitter":"Hubert",
    "lastNameEmitter":"Bonisseur de la Bath",
    "phoneNumberEmitter":"112233",
    "message":"Coucou !",
    "phoneNumberReceiver":"907256"
}