V první části jsem se seznámili s teorií ohledně FilePulse Source Connector. Pro potřeby příkladu budeme pracovat s Kafka cluster za pomoci Dockeru. Docker-compose.yml (https://github.com/cambit/kafka-demo/blob/main/docker-compose.yml se skládá z docker image Kafky od Confluent.Inc. Budeme pracovat s jednou broker instancí, s jednou instancí Zookeper a jednou instancí schéma registru. Co se týče Kafka workeru, zde budeme mít předinstalován FilePulse Source Connector.
Stáhnete-li si repozitář, je potřeba spustit Kafka cluster
docker-compose up
Kontrolu správného spuštění můžeme provést kontrolou za pomoci GET metody na endpoint /connector-plugins. Tedy v našem případě:
GET localhost:8083/connector-plugins
V seznamu dostupných konektorů bychom měli vidět FilePulse connector.
Nyní je kafka cluster připraven a můžeme jej nakonfigurovat. To se provádí metodou *PUT na endpoint /connectors/source-csv-filepulse-00/config *, tedy:
PUT http://localhost:8083/connectors/source-csv-filepulse-00/config
A do těla dáme první konfiguraci - /connectors/source-csv-filepulse-00/config-1-parseLine.json
{ "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector", "fs.scan.directory.path":"/tmp/kafka-connect/examples/", "fs.scan.interval.ms":"10000", "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter", "file.filter.regex.pattern":".*\\.csv$", "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader", "offset.strategy":"name", "skip.headers": "1", "topic":"topic-filepulse-csv-00", "internal.kafka.reporter.bootstrap.servers": "broker:29092", "internal.kafka.reporter.topic":"connect-file-pulse-status", "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy", "tasks.max": 1, "filters":"ParseLine", "filters.ParseLine.extractColumnName": "headers", "filters.ParseLine.trimColumn": "true", "filters.ParseLine.separator": ";", "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter" }
Pojďme se blíže podívat co soubor obsahuje. Na prvním řádku říkáme, že chceme použít FilePulseConnector. Řádek 3 - 6 obsahuje konfiguraci která nám říká jaký adresář budeme jak často kontrolovat na přítomnost soubvorů končící na příponu .csv. Budeme číst po řádcích (řádek 7) a přeskočíme první řádek který je hlavičkou (řádek 9). Řádky 10 - 12 obsahují informace do jaké fronty Kafky se budou informace nahrávat a kde zjistit aktuální průběh zpracování (řádek 12). Po dokončení chceme jen zalogovat a proto použijeme LogCleanupPolicy (řádek 13).
Konfiguraci jak přesně zpracovat soubor najdeme na řádcích 16 - 20.
Pokud tuto konfiguraci ve formátu json nahrajeme příkazem PUT (například za pomoci programu Postman) je vše připraveno. Nyní je potřeba nahrát soubor do adresáře, kde se očekává vstupní soubor ve formátu .csv.
Jako vstupní soubor použijeme /connectors/source-csv-filepulse-00/example.csv
Překopírování provedeme těmito příkazy:
docker exec -it connect mkdir -p /tmp/kafka-connect/examples docker cp ./connectors/source-csv-filepulse-00/example.csv connect://tmp/kafka-connect/examples/example.csv
Nyní do 10 sekund se začne soubor zpracovávat. Status zpracovávání lze získat na GET localhost:8083/connectors/source-csv-filepulse-00/status
Po dokončení úlohy máme v topicu zprávy odpovídající jednotlivým řádkům.
Nyní si můžeme hrát s konfigurací konektoru a postupně za pomocí konfigurací filtrů otestovat jak se změní formát zpráv v topiku:
Filtrace výrobců obsahující řetězec Pilulka - /connectors/source-csv-filepulse-00/config-2-keepPilulkaManu.json (řádky 21 - 23).
Přidání názvu zpracovaného souboru do zprávy - /connectors/source-csv-filepulse-00/config-3-insertFileName.json (řádky 24 - 26)