Kafka File Connector 简单使用
File Connector作用简单明了,从文件将数据导入kafka的topic中,以及,从kafka的topic中将数据导出到文件。
Notes: 如果只是为了测试和学习,可以直接参考confluent官网的教程,使用
confluent start
命令。生产环境下不推荐使用CLI启动confluent。
本文依赖于confluent 5.0.1,如果尚未下载,可以参照我之前写的安装和启动。
1 | jps |
需要先启动以上三个服务。
大多数配置都依赖于connector,因此这里不能都提到。但是,有一些公用的配置:
- name - connector的唯一名称。尝试再次使用相同名称注册将失败。
- connector.class - connecor的Java类
- tasks.max - 应为此connector创建的最大任务数。如果connector无法达到此级别的并行性,则可能会创建更少的任务。
- key.converter - (可选)覆盖worker设置的默认转换器。
- value.converter - (可选)覆盖worker设置的默认转换器。
以下示例未开启Schema Registry
,并在etc/kafka/connect-distributed.properties
中设置了如下配置项:
- key.converter=org.apache.kafka.connect.json.JsonConverter
- value.converter=org.apache.kafka.connect.json.JsonConverter
- key.converter.schemas.enable=false
- value.converter.schemas.enable=false
Source Connector
构造测试数据
1 | cd /root/bin/confluent |
file connector 配置文件source.json
1 | { |
Notes: 默认情况下topic是自动创建的,但是如果需要做详细配置最好手动创建
通过REST API上传配置文件
1 | curl -X POST -H "Content-Type: application/json" --data @source.json http://node1:8083/connectors |
此时我们的connector就会自动检测文件的变化并上传到file-source-1.0
的topic里了,现在我们来查看下结果
1 | 启动控制台的消费者查看topic内的消息 |
如果另外打开一个终端,并在test.txt
的末尾添加新的数据,也会被传入到file-source-1.0
中。
Sink Connector
配置文件sink.json
1 | { |
Notes:
source.json
中配置项为topic
,而这里是topics
通过REST API上传配置文件
1 | curl -X POST -H "Content-Type: application/json" --data @sink.json http://node1:8083/connectors |
当前目录下会有一个文件test.sink.txt
,内容和test.txt
一致。
1 | 如果配置错了,可以删除connector |
参考资料
1.Use Kafka Connect to import/export data
2.Tutorial: Moving Data In and Out of Kafka
3.Running Kafka Connect
版权声明:
除另有声明外,本博客文章均采用 知识共享(Creative Commons) 署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议 进行许可。
分享