Jag sitter i ett projekt där vi jobbar med molntjänsterna för självkörande/smarta bilar, något som ska ut på marknaden inom en snar framtid. Då vi hanterar stora mängder data och bilen skickar meddelanden så har vi valt att arbeta med eventdriven utveckling.
Hjärtat i vår microservice-arkitektur är Kafka. Kafka är ett högst skalbart messaging-system. När biltillverkaren började sin resa emot autonoma bilar, så var Kafka ungt (vilket det fortfarande är) och Kafka Streams hade ännu inte sett dagens ljus. Därför togs beslutet att arbeta med Apache Flink som ramverk för att strömma de events som kom till molnet.
Det är här min historia börjar.
Vi är just i fas med att migrera alla våra Flink-appar till Kafka Streams. Jag tänker inte gå in på detaljer om varför mer än att Kafka som broker och Kafka Streams som strömmande ramverk går hand i hand. Mitt arbete verkade enkelt till en början, flödet såg ut på följande vis:
- Konsumera meddelanden från fyra olika topics (för er som inte känner till Kafka, tänk topics á la JMS/RabbitMQ) och nyckla om dessa till ett gemensamt id i ett fönster på 30 sekunder.
- Aggregera ihop alla meddelanden och skicka ett meddelande till ett utgående topic när fönstret är stängt.
Låter simpelt, eller hur?
Flink-appen hade givetvis denna funktionen och jag trodde inte för en sekund att detta skulle bli så svårt att utföra, så jag började koda. Jag ville använda mig av Kafka Streams DSL (Domain Specific Language) för att hålla koden så ren och snygg som möjligt. Det finns möjlighet att använda lågnivå-API:et också, där man då har mer kontroll, men det ville jag undvika om det fanns möjlighet.
Min initiala kod såg ut ungefär så här:
StreamsBuilder builder = ...;
builder.stream(
getTopicList(),
Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
.flatMap(new ExceptionSafeKeyValueMapper<String, FooTriggerMessage>(
"", Serdes.String(), FooTriggerSerde))
.filter((key, value) -> value.getTriggerEventId() != null)
.groupBy((key, value) ->
value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), FooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30)))
.aggregate(
FooTriggerWrapper::new, /* initializer */
FooAggregator::getApproval, /* adder */
Materialized
.<String, FooTriggerWrapper, WindowStore<Bytes, byte[]>>as(
"Windowed-Store") /* state store name */
.withValueSerde(FooWrapperSerde))
.toStream()
.transform(() -> FooTransformer::new)
.filter((key, value) -> value != null)
.to(
appProperties.getFooEngineOutgoing(),
Produced.with(Serdes.String(), FooSerde));
För de som jobbat med strömmande ramverk innan, så känns nog en hel del mönster igen.
- Jag får in en ström från fyra olika topics.
- Flat-mappar ut strömmen till ett Java-objekt.
- Kollar så att värdet jag vill gruppera på inte är
null
med ett filter. - Grupperar/nycklar strömmen på det unika värdet och deklarerar ett fönster på 30 sekunder.
Precis som jag ville ha det!
Detta går ju bara för lätt tänkte jag. Nästa steg var att transformera Java-objektet till ett nytt Java-objekt. Kafka Streams DSL har fixat en metod för detta redan så jag använde mig av den. Till sist, använde jag mig av to
-metoden för att producera ett nytt meddelande på ett utgående topic. Underbara Kafka Streams, så lätt, så snyggt.
Då är det ju bara till att testa detta då. Jag skrev ett snabbt test för att se så att jag fick ett meddelande på min utgående topic. Men VA!? Fyra meddelanden? Vad i hela...?!
Jag förstod ingenting. Jag använde mig ju av Kafka Streams inbyggda metoder och det verkade ju stämma. Då jag är en googlare av rang satte jag till verket och började gräva lite mer i hur windowing-funktionen i Kafka-Streams faktiskt fungerade.
Den översta träffen verkade perfekt. Jag klickade mig in på StackOverflow och började läsa. Till min förvåning, så var det inget fel på min implementation. Det är bara som så att Kafka har tagit en annorlunda approach emot windowing än Flink. Det finns ingen slutgiltig aggregering när fönstret är stängt, eftersom Kafka möjliggör sena ankomster av events.
Men inte nog med det. Kafka skickar alla aggregeringar nedströms, vilket alltså bör resultera i fyra meddelanden på mitt utgående topic – om jag får in fyra meddelanden.
Detta satte mig, och tydligen många andra som haft liknande användarscenarion, lite i klistret. Det fanns ett par hackiga lösningar om man ville använda lågnivå-API:erna som Kafka Streams erbjuder, men inga hack kändes helt rätt för vårt specifika fall. Ju mer jag läste, desto mer fast kände jag mig.
Men till slut så hittade jag en blogg som pratade om en punkteringsmetod som erbjöds i ProcessorContexten
.
Det var början på lösningen gott folk. Det visade sig att man kunde få ut tiden då fönstret slutade på ett enkelt vis via transformatorn. Allt jag behövde göra nu, var att skapa min egna tranformatorklass som implemeneterade interfacet Transform
. I aggregate
-steget lägger man in någonting som kallas för Materialize
. Det betyder egentligen hur jag vill spara mitt fönster. Det vanligaste är att använda en WindowStore
. Nyckeln i meddelandet blir då en Windowed<typen din nyckel är>
-typ. Windowed
håller all information vi behöver om fönstret. Implementationen ser ut ungefär så här:
public class WindowdedTransformer implements
Transformer<Windowed<String>,
FooTriggerWrapper,
KeyValue<String, BarApprovalMessage>> {
}
Den första delen är ingående meddelande, och den andra delen är utgående, när transform
-metoden anropas.
Transform
-klassen behövde schemalägga ett jobb för varje unikt id som kom in. Istället för att transformera varje meddelande, så sparade jag undan det i en inmememory-databas som kallas KeyValueStore
(en glorifierad HashMap
). När 30 sekunder sedan hade gått, så skulle punkteringsmetoden anropas, iterera igenom min KeyValueStore
och bara då skicka mitt nya meddelande nedströms om fönstret hade stängts. Jag fick skita ner mina fingrar något i lågnivå-API:et, men inte för mycket. För att förenkla något, så har jag skalat bort diverse villkor som tillkommit, men det ni ser på bilden under är själva hjärtat för att få punkteringen att fungera.
public KeyValue<String, FooMessage> transform(
Windowed<String> key,
FooTriggerWrapper value) {
if (!schedulers.containsKey(key.key())) {
Cancellable newCs = context.schedule(
30000,
PunctuationType.WALL_CLOCK_TIME,
(time) -> checkStateStoreAndTransform(
key.window().end(),
key.key()));
schedulers.put(key.key(), newCs);
}
kvStore.put(key.key(), value);
return null;
}
private void checkStateStoreAndTransform(
long windowEndTime, String key) {
BarMessage barMessage = new BarMessage();
if (shouldCreateBarMessage(key, windowEndTime)) {
schedulers.get(key).cancel();
BarTriggerWrapper message = kvStore.get(keyStoreKey);
//Psuedokod
barMessage.setSomething(message.getSomething)
barMessage.setSomething(message.getSomething)
barMessage.setSomething(message.getSomething)
//Skicka meddelandet nerströms
context.forward(key, barMessage);
context.commit();
}
}
Så vad händer här då? Alla meddelanden som kommer in sparas i inmemory-databasen. För varje meddelande, med unikt grupp-id som kommer in skapas en cancelerbar schedulerare. Scheduleraren anropar sedan den metod man anger på det intervall man anger. I den metoden, så kollar man först om fönstret vi skapade innan transformator steget är stängt. Om det är det, och bara då, så skapar vi upp vårt meddelande utefter de vi fått in i vår inmemory-databas. Givetvis, så vill vi ju stänga av scheduleraren när meddelandet är skickat. Detta löste jag igenom att lägga varje schedulerare i en lokal Map, som jag hämtar upp och stänger när meddelandet är skapat.
Frågan var ju nu, skulle detta fungera? Svaret är ja! Fönstret och scheduleraren kan diffa på några millisekunder, men det är ingenting som stör. Med mycket möda och en aning besvär, så lyckades jag med någonting som många andra har haft problem med. Den slutgiltiga koden ser så här fantastiskt vacker ut.
StreamsBuilder builder = ...;
StoreBuilder<?> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore(storeName),
Serdes.String(),
FooWrapperSerde);
builder.addStateStore(store);
builder.stream(getTopicList(), Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))//
.flatMap(new ExceptionSafeKeyValueMapper<String, FooTriggerMessage>(
"",
Serdes.String(),
FooTriggerSerde))
.filter((key, value) ->
value.getTriggerEventId() != null)
.filter((key, value) ->
!sentIdsList.contains(value.getTriggerEventId().toString()))
.groupBy((key, value) ->
value.getTriggerEventId().toString(),
Serialized.with(Serdes.String(), FooTriggerSerde))
.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30)))
.aggregate(
FooTriggerWrapper::new, /* initializer */
FooAggregator::getApproval, /* adder */
Materialized
.<String, FooTriggerWrapper, WindowStore<Bytes, byte[]>>as(
"Windowed-Store") /* state store name */
.withValueSerde(FooWrapperSerde))
.toStream()
.transform(() -> {
return new WindowdedTransformer(
sentIdsList,
storeName,
vehicleStateURI,
approvalTtl,
restTemplate);
},
storeName).filter((key, value) -> value != null)
.to(
appProperties.getFooEngineOutgoing(),
Produced.with(Serdes.String(), FooSerde));
Enligt mig så är eventdriven utveckling fantastiskt roligt. Kafka och Kafka Streams går hand i hand och jag hoppas att ni som läser detta och aldrig jobbat med strömmande ramverk eller eventdriven utveckling innan, tar er an ett litet hemmaprojekt och provar.
Jag vill avsluta med ett litet Kafka-citat.
"Att rätt förstå en sak och samtidigt missuppfatta den utesluter inte helt vartannat"Franz Kafka
Vill ni veta mer om Kafka och Kafka Streams så rekomenderar jag er att gå in här.