Al giorno d’oggi, l’architettura Event Driven viene utilizzata nello sviluppo di applicazioni software in diverse aree, come microservizi con pattern come CQRS, Saga Pattern, ecc e utilizzando sistemi di messaggistica di publish e subscribe come Apache Kafka, per la comunicazione asincrona tra microservizi. Tuttavia, potrebbe essere necessario stabilire una comunicazione sincrona di tipo request/response se i vostri requisiti lo richiedono. Visto che non e’ proprio immediato implementare una comunicazione request-response su Kafka ho deciso di scrivere questo articolo.
PS: mi raccomando leggete fino in fondo l articolo perché’ Kafka e’ un broker atipico.. occorre capire come funziona altrimenti son dolori 😛
- Kafka e Zookeper running in locale:
- il setup e’ molto semplice vedi gli step descritti in questo LINK (nel mio esempio faccio riferimento a due istanze di Kafka running)
- puoi altrimenti tirarti su Kafka utilizzando Docker (https://github.com/lensesio/fast-data-dev)
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 landoop/fast-data-dev:latest
- Conoscenza delle Basi di Kafka e della sua architettura
Per farvi capire come implementare una comunicazione sincrona request/response su Kafka ho deciso di proporre il seguente esercizio. Supponiamo di avere due microservizi
- micro servizio client kafka-request: espone un API REST /fibonacci la quale si aspetta nel payload di richiesta un oggetto di tipo FibonacciRequest e risponde con un payload di tipo FibonacciResponse con all’interno il risultato della nota funzione di Fibonacci. La logica di calcolo di Fibonacci e’ implementato da un secondo microservizio. I due microservizi comunicheranno tramite Kafka; pertanto una volta ricevuta la richiesta di calcolo via web service il micro servizio client scriverà’ la richiesta di calcolo sul topic kafka fib-request e attendera’ la risposta sul topic fib-response
- micro servizio fibonacci calculator: legge da un topic Kafka fib-request la richiesta di calcolo, calcola il valore di fibonacci per tale richiesta e risponde su un topic di risposta.
L architettura della soluzione e’ illustrata nella seguente immagine
Per implementare quanto descritto occorre fare due considerazioni:
- Tutte le risposte di fibonacci Calculator arriveranno sul medesimo topic fib-response. Pertanto per legare la risposta alla specifica richiesta occorre introdurre il concetto di correlation id. Richiesta e Risposta dovranno quindi avere all’interno un campo correlation id che dovra’ avere il medesimo valore
- Nel messaggio di richiesta, oltre a valore e correlation id, sara’ presente anche il nome del topic dove il client e’ in attesa della risposta
Fortunatamente dalla release di Spring Kafka 2.1.3 e’ stato aggiunto il support al pattern Request Response con la nuova classe ReplyingKafkaTemplate la quale ci permette di implementare facilmente i seguenti punti. Vi mostrerò’ adesso, step by step, le parti saliente del codice soluzione di questo esercizio. Il codice della soluzione completa lo trovate nel mio REPO GITHUB.
Come best practices, e’ sempre consigliato creare da cli i topic e non auto generarli da codice essendo tale configurazione un punto chiave per il tuning di kafka. Nel mio esempio ho due nodi running in locale alla porta 9092 e 9093. Da shell quindi provvedo a creare i due topic fib-request e fib-response configurati con 4 partizioni e replication factor 2 ( configurazione da ambiente di sviluppo, consiglio sempre per garantire HA almeno 3 nodi con replication factor 3)
kafka-topics.bat --create --topic fib-request -zookeeper localhost:2181 --replication-factor 2 --partitions 4 kafka-topics.bat --create --topic fib-response -zookeeper localhost:2181 --replication-factor 2 --partitions 4
Adesso siamo pronti ad analizzare il codice dei due microservizi.
I punti chiavi che dovranno essere implementati sono:
- controller per gestire le richieste http dell endpoint fibonacci
- producer kafka che scrive la richiesta nel topic fib-request e consumer kafka che legge nel topic fib-response la risposta
Di seguito le configuazione dell application.properties
server.port=8090 spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093 kafka.request.topic=fib-request kafka.reply.topic=fib-response kafka.group.id=fibonacci-result-group
1. controller per gestire le richieste http dell endpoint fibonacci
Abbiamo poco da commentare, trattasi di un semplice Rest Controller.
@RestController @Slf4j public class FibonacciController { @Autowired FibonacciService fibonacciService; @PostMapping("/fibonacci") public ResponseEntity<?> postFibonacci(@RequestBody FibonacciRequest request) throws ExecutionException, InterruptedException, JsonProcessingException { FibonacciResult result = fibonacciService.calculate(request); return ResponseEntity.ok(result); } }
2.producer kafka che scrive la richiesta nel topic fib-request e consumer kafka che legge nel topic fib-response la risposta
Il lavoro e’ totalmente delegato al servizio fibonacciService. Andiamo a vedere il codice di tale service.
@Slf4j @Service public class FibonacciServiceImpl implements FibonacciService { @Autowired ReplyingKafkaTemplate<String, String, String> kafkaTemplate; @Autowired ObjectMapper objectMapper; @Value("${kafka.request.topic}") String requestTopic; @Value("${kafka.reply.topic}") String requestReplyTopic; @Override public FibonacciResult calculate(FibonacciRequest request) throws InterruptedException, ExecutionException, JsonProcessingException { log.info("send request fib [{]]", request.getNumber()); String requestJSON = objectMapper.writeValueAsString(request); ProducerRecord<String, String> record = new ProducerRecord<String, String>(requestTopic, requestJSON); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); RequestReplyFuture<String, String, String> sendAndReceive = kafkaTemplate.sendAndReceive(record); // confirm if producer produced successfully SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get(); log.info("ProducerRecord request[{}]", sendResult.getProducerRecord()); sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString())); // get consumer record ConsumerRecord<String, String> consumerRecord = sendAndReceive.get(); String jsonResponse = consumerRecord.value(); // return consumer value return objectMapper.readValue(jsonResponse, FibonacciResult.class); } }
La parte hot del codice sopra mostrato e’ sicuramente l utlizzo di
ReplyingKafkaTemplate<String, String, String> kafkaTemplate;
Tale utility e’ una novità’ di Kafka che ci permette di realizzare un meccanismo request-response su Kafka. Guardando le righe di codice del ProducerRecord creato
String requestJSON = objectMapper.writeValueAsString(request); ProducerRecord<String, String> record = new ProducerRecord<String, String>(requestTopic, requestJSON); record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));
si puo’ notare che abbiamo valorizzato nel messaggio un header KafkaHeaders.REPLY_TOPIC in modo da comunicare al microservizio fibonacci calculator dove attendiamo la risposta. A questo punto possiamo passare il producer record all utility sendAndReceive di ReplyingKafkaTemplate in modo da inviare il messaggio su Kafka. ReplyingKafkaTemplate in modo del tutto trasparente andrà’ a valorizzare un altro header kafka_correlationId con identificativo univoco.
RequestReplyFuture<String, String, String> sendAndReceive = kafkaTemplate.sendAndReceive(record); SendResult<String, String> sendResult = sendAndReceive.getSendFuture().get(); log.info("ProducerRecord request[{}]", sendResult.getProducerRecord());
Ho inserito a scopo didattico un log che mostra il ProducedRecord con gli header sopra spiegati.
A questo punto possiamo metterci in attesa della risposta sul topic di risposta incluso nell header del messaggio appena inviato: fib-response topic.
ConsumerRecord<String, String> consumerRecord = sendAndReceive.get(); String jsonResponse = consumerRecord.value(); return objectMapper.readValue(jsonResponse, FibonacciResult.class);
Riassumendo velocemente con ReplyingKafkaTemplate riusciamo ad inviare un messaggio su kafka il quale contiene due header: correlation id ( auto valorizzato) e topic di risposta.
Il resto del codice lo trovate all URL
Il comportamente del microservizio Fibonacci Calculator e’ il seguente:
- Leggere messaggi dal topic fib-request
- Per ogni messaggio calcolare il numero di fibonacci
- Scrivere nella Risposta il risultato; il framework ci garantirà’ di includere nella risposta il correlation id della richiesta nell header kafka_correlationId in modo che l altro microservizio possa fare match tra request e response.
Di seguito le configuazione dell application.properties
server.port=8091 spring.kafka.producer.bootstrap-servers=localhost:9092, localhost:9093 kafka.request.topic=fib-request kafka.reply.topic=fib-response spring.kafka.consumer.group-id=fibonacci-calculator-group
I 3 punti sopra citati sono implementati nelle seguenti linee di codice
@Component public class KafkaConsumer { @Autowired private ObjectMapper objectMapper; private static long fibonacci(int i) { /* F(i) non e` definito per interi i negativi! */ if (i == 0) return 0; else if (i == 1) return 1; else return fibonacci(i-1) + fibonacci(i-2); } @KafkaListener(topics = {"${kafka.request.topic}"}) @SendTo public String onMessage(String req) throws JsonProcessingException { FibonacciRequest fibonacciRequest = objectMapper.readValue(req, FibonacciRequest.class); long result = fibonacci(fibonacciRequest.getNumber()); FibonacciResult calc = new FibonacciResult(); calc.setResult(result); return objectMapper.writeValueAsString(calc); } }
Analizziamo il codice appena mostrato
- Con l annotation KafkaListener andiamo a realizzare il punto 1) leggere dal topic fib-request.
- L annotation SendTo su un listener del topic A, permette di inviare ad un topic B il valore di ritorno di tale listener. In breve mette in relazione due topic permettendo facilmente la creazione di pipeline. Il topic di destinazione della risposta deve essere passato come argomento o altrimenti viene dedotto dal framework se nel messaggio e’ contenuto l header kafka_replyTopic come nel nostro caso. Pertanto in tale listener leggiamo messaggi provenienti da fib-request, calcoliamo il risultato di fibonacci per tali richieste e ripondiamo sul topic fib-response includendo in tale risposta il correlation id.
Trovate il codice completo del secondo microservizio all URL.
Nel nostro esempio abbiamo volutamente configurato in maniera concorrente i consumer ( 3 thread a consumer) e abbiamo volutamente configurato il topic con più partizioni. Questo perché’ il container in listening e’ in grado di sistemare il matching tra request e response grazie al correlation id.
In un architettura a microservice si scala orizzontalmente eseguendo più’ repliche di un microservizio se il carico lo necessita. Kafka e’ fantastico in questo. Se conoscete Kafka sapete che se eseguo più’ istanze di un consumer con il medesimo consumer-group, il carico viene distribuito tra le diverse istanze del consumer.
Supponiamo di avere il topic A con 4 partizioni. Posso eseguire 4 istanze di un consumer con medesimo consumer group. In questo modo
- ad ogni consumer viene assegnata una partizione del topic
- i messaggi vengono creati da kafka in modo da distribuire equamente i messaggi tra le varie partizioni
- ogni singola istanza del consumer riceverà’ i messaggi della propria partizione di competenza.
In questo modo si aumenta notevolmente throughput dei consumer.
Tutto molto bello… pero se scaliamo in questo modo la nostra soluzione di request-response non funziona più’. Questo perché l istanza che invia la richiesta di fibonacci potrebbe non ricevere la risposta se la risposta viene salvata su una partizione assegnata ad un altra instanza del consumer. Abbiamo due soluzione a questo problema.
- La documentazione di Spring Kafka consiglia di usare un topic ad hoc per istanza di consumer. Nel nostro esempio se decidiamo di eseguire 4 istanze del microservizio client requestor occorrerebbe creare 4 topic di risposta: fib-response-1 … fib-response-4. Questa soluzione funziona ma non e’ il massimo della flessibilità’.
- Un altra soluzione e’ quella di inviare nel messaggio, oltre al topic di risposta e correlation id, un altro header KafkaHeaders.REPLY_PARTITION contenente la rappresentazione BIG-ENDIAN dell intero della partizione assegnata a tale istanza di consumer. Il framework gestirà’ tale header come quanto fatto con il topic di risposta: la risposta verrà’ salvata nel topic di risposta e nella partizione specificata. In questo modo siamo sicuri che il consumer riceverà’ la risposta.
Abbiamo spiegato come realizzare il pattern request-response su Kafka in maniera sincrona. Come specificato ad inizio articolo ponderate se e’ veramente necessario applicare il pattern request-response sincrono nella vostra architettura. Se potete affidatevi ai pattern event driven asincronici e semplicemente gestire request e response in maniera totalmente asincrona
https://italiancoders.it/request-response-usando-apache-kafka/