Kafka Connect

Official Refernces

通过 kafka connect 导入/导出数据

当需要从其他数据源导入kafka或者将kafka中的数据导入其他数据源,除了通过代码实现,还可以使用kafka connect来导入导出数据,它是一个可扩展的工具,通过运行connector,实现与外部系统交互的自定义逻辑。

# 造数据
$ echo -e "foo\nbar" > test.txt

# 在standalone模式下,启动两个connectors,需要配置三个配置文件。
# connect-standalone.properties配置kafka connect process,例如要连接到哪个broker,数据的序列化格式等。
# 其他的每个配置文件,都代表了要创建的每个connector,例如指定每个connector唯一的name,要实例化哪个connector class,和一些connector需要的其他的配置项。

# 修改 connect-standalone.properties
$ vim config/connect-standalone.properties
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=ambari0:9093

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
# org.apache.kafka.connect.converters.ByteArrayConverter
# org.apache.kafka.connect.converters.DoubleConverter
# org.apache.kafka.connect.converters.FloatConverter
# org.apache.kafka.connect.converters.IntegerConverter
# org.apache.kafka.connect.converters.LongConverter
# org.apache.kafka.connect.converters.ShortConverter
# org.apache.kafka.connect.json.JsonConverter
# org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# 导出 kafka 数据到 file
$ vim config/connect-file-sink.properties
name=kafka_to_file
connector.class=FileStreamSink
tasks.max=5
file=invert_index.txt
topics=invert_index
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink.properties

# 导入 file 数据到 kafka
$ vim config/connect-file-source.properties
name=file_to_kafka
connector.class=FileStreamSource
tasks.max=5
file=invert_index.txt
topics=invert_index

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties

qin

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏

打开支付宝扫一扫,即可进行扫码打赏哦