Jako každá technologická firma i Pilulka používá pro výměnu/notifikace systémy postavené na principu posílání zpráv. Nejedná se tedy o přímý dotaz na API, ale za pomoci eventů které jsou uložené ve frontách se posílají předem definované zprávy které jsou postupně zpracovány koncovými systémy. V současné době používáme RabbitMQ, který patrně nemusím představovat. Ale jak to u nás v Pilulce už bývá, rádi si sáhneme na něco nového a tak si pojďme představit základní vhled do Apache Kafka.
Pokud navštívíte odkaz https://www.confluent.io/what-is-apache-kafka/ dozvíte se, že Kafka je platforma, která dokáže zpracovat biliony událostí každý den. Je to tedy robustní systém který dokáže zprostředkovat výměnu událostí a dat mezi systémy jak ukazuje i obrázek Obr. 1.
Projít všechny možnosti není předmětem tohoto článku a ani v možnosti znalostí autora. Pojďme se podívat na triviální problém a zkusme jej vyřešit za pomoci Apache Kafka.
Pro potřeby tohoto článku jsem zvolil jednoduchý problém, kdy chceme za pomoci Kafky načíst soubor ve formátu CSV a ten dále zpracovat v PHP. Jasně že můžeme použít fopen, fgetcsv a tak podobně, ale proč si nevyzkoušet něco nového.
Rozebrání nové technologie se neobejde bez trochy teorie. Pojďme tedy na to. Pro potřeby našeho úkolu budeme pracovat s Kafka Connect framework (https://kafka.apache.org/documentation.html#connect).
Tento framework slouží k tomu, abychom mohli data přesouvat do a z Kafky za pomoci programů, které tento framework označuje jako konektory. Můžete si je představit jako prostředníky mezi systémem odkud data beru (nebo kam je dávám) a Kafkou. Například chceme-li vyměnit data mezi Kafkou a MongoDB můžeme použít jeden z https://www.confluent.io/hub/#MongoDB konektorů. Na této stránce najdete všechny možné konektory, takže se tam neváhejte porozhlédnout. Vyhledáte-li na této stránce “File” najdete FilePulse konektor (https://www.confluent.io/hub/streamthoughts/kafka-connect-file-pulse).
FilePulse Source Connector nám umožňuje pravidelně sledovat zadaný adresář a pokud v něm objeví nový soubor začne jej zpracovávat. FilePulse je schopen pracovat s následujícími formáty souborů za pomoci čtecí konfigurace: * RowFileInputReader - čte zadaný soubor po řádcích a pro každý řádek vytvoří záznam * BytesArrayInputReader * AvroFileInputReader - umožňuje číst soubory ve formátu Avro * XMLFileInputReader - čte jednoduché XML soubory
V našem příkladu jsme si řekli, že budeme zpracovávat CSV soubory. Dále tedy budeme pracovat s RowFileInputReader.
Nyní si tedy představme, že máme možnost načíst zadaný soubor po řádcích. Data která načteme můžeme dále zpracovávat za pomocí filtrů. Samozřejmě i Kafka má možnost pracovat s daty a to za pomoci Single Message Transforms (SMT) funkcionality.
FilePulse filtry
Uvedmě si příklad konfigurace filtru:
"filters": "RenameArtist", "filters.RenameArtist.type": "io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter", "filters.RenameArtist.field": "artist", "filters.RenameArtist.target": "artists",
Jak vidíme, konfigurace obsahuje vždy alias a za dvojtečkou následuje konfigurace. Alias filters nám říká jak všechny filtry a v jakém pořadí je chceme aplikovat. Poté již následuje konfigurace k danému filtru, který jsme uvedli v aliasu filters. V naší ukázce definujeme filtr RenameArtist, který je typu io.streamthoughts.kafka.connect.filepulse.filter.RenameFilter a tento typ filtru vyžaduje specifikaci field a target. Je to tedy filtr který nám přejmenuje políčko artist na artists.
Po zpracování celho vstupního souboru nám umožňuje FilePulse tento soubor buď smazat, přesunout a nebo jen zalogovat zpracování a soubor ponechat. FilePulse toto označuje jako CleanUp policy:
Pojďme si tedy shrnout celý proces FilePulse načtení a zpracování souboru v následujícím obrázku:
Nyní máme tedy teorii zpracování CSV souboru za sebou a v dalším článku se podíváme na příklad.