Apache Kafka® - část II.

(19. 8. 2021) V druhé části naší dvoudílné sérii o Apache Kafka® se podíváme na příklad zpracování csv souboru a modifikaci dat.

V první části jsem se seznámili s teorií ohledně FilePulse Source Connector. Pro potřeby příkladu budeme pracovat s Kafka cluster za pomoci Dockeru. Docker-compose.yml (https://github.com/cambit/kafka-demo/blob/main/docker-compose.yml se skládá z docker image Kafky od Confluent.Inc. Budeme pracovat s jednou broker instancí, s jednou instancí Zookeper a jednou instancí schéma registru. Co se týče Kafka workeru, zde budeme mít předinstalován FilePulse Source Connector.

Stáhnete-li si repozitář, je potřeba spustit Kafka cluster

docker-compose up

Kontrolu správného spuštění můžeme provést kontrolou za pomoci GET metody na endpoint /connector-plugins. Tedy v našem případě:

GET localhost:8083/connector-plugins

V seznamu dostupných konektorů bychom měli vidět FilePulse connector.

Nyní je kafka cluster připraven a můžeme jej nakonfigurovat. To se provádí metodou *PUT na endpoint /connectors/source-csv-filepulse-00/config *, tedy:

PUT http://localhost:8083/connectors/source-csv-filepulse-00/config

A do těla dáme první konfiguraci - /connectors/source-csv-filepulse-00/config-1-parseLine.json

{
  "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
  "fs.scan.interval.ms":"10000",
  "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
  "file.filter.regex.pattern":".*\\.csv$",
  "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
  "offset.strategy":"name",
  "skip.headers": "1",
  "topic":"topic-filepulse-csv-00",
  "internal.kafka.reporter.bootstrap.servers": "broker:29092",
  "internal.kafka.reporter.topic":"connect-file-pulse-status",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
  "tasks.max": 1,

  "filters":"ParseLine",
  "filters.ParseLine.extractColumnName": "headers",
  "filters.ParseLine.trimColumn": "true",
  "filters.ParseLine.separator": ";",
  "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
}

Pojďme se blíže podívat co soubor obsahuje. Na prvním řádku říkáme, že chceme použít FilePulseConnector. Řádek 3 - 6 obsahuje konfiguraci která nám říká jaký adresář budeme jak často kontrolovat na přítomnost soubvorů končící na příponu .csv. Budeme číst po řádcích (řádek 7) a přeskočíme první řádek který je hlavičkou (řádek 9). Řádky 10 - 12 obsahují informace do jaké fronty Kafky se budou informace nahrávat a kde zjistit aktuální průběh zpracování (řádek 12). Po dokončení chceme jen zalogovat a proto použijeme LogCleanupPolicy (řádek 13).

Konfiguraci jak přesně zpracovat soubor najdeme na řádcích 16 - 20.

Pokud tuto konfiguraci ve formátu json nahrajeme příkazem PUT (například za pomoci programu Postman) je vše připraveno. Nyní je potřeba nahrát soubor do adresáře, kde se očekává vstupní soubor ve formátu .csv.

Jako vstupní soubor použijeme /connectors/source-csv-filepulse-00/example.csv

Překopírování provedeme těmito příkazy:

docker exec -it connect mkdir -p /tmp/kafka-connect/examples

docker cp ./connectors/source-csv-filepulse-00/example.csv connect://tmp/kafka-connect/examples/example.csv

Nyní do 10 sekund se začne soubor zpracovávat. Status zpracovávání lze získat na GET localhost:8083/connectors/source-csv-filepulse-00/status

Po dokončení úlohy máme v topicu zprávy odpovídající jednotlivým řádkům.

Nyní si můžeme hrát s konfigurací konektoru a postupně za pomocí konfigurací filtrů otestovat jak se změní formát zpráv v topiku:

Filtrace výrobců obsahující řetězec Pilulka - /connectors/source-csv-filepulse-00/config-2-keepPilulkaManu.json (řádky 21 - 23).

Přidání názvu zpracovaného souboru do zprávy - /connectors/source-csv-filepulse-00/config-3-insertFileName.json (řádky 24 - 26)

Volné pozice

O autorovi

Michal Čambor PHP - Team leader VMS

V Pilulce se se svým týmem věnuji vývoji interního systému pro komunikaci s dodavateli - VMS (Vendor Management System).