如何使用 TDengine Sink Connector?( 二 )


如果想覆盖Consumer的其它默认配置 , 可以直接在SinkConnector的配置文件中编写 , 但是要加前缀“consumer.override.” , 比如想把每次poll的最大消息数改为3000 , 可以这样配置:consumer.override.max.poll.records=3000
·如何控制写入线程数?
如何使用 TDengine Sink Connector?】对于KafkaConnectSink , task本质上就是消费者线程 , 接收从topic的分区读出来的数据 。 用配置参数tasks.max来控制最大任务数 , 一个任务一个线程 。 实际启动的任务数还与topic的分区数有关 。 如果你有10个分区 , 并且tasks.max设置为5 , 那么每个task会收到2个分区的数据 , 并跟踪2个分区的offsets 。 如果你配置的tasks.max比partition数大 , Connect会启动的task数与topic的partition数相同 。 如果你订阅了5个topic , 每个topic都是1个分区 , 并且设置tasks.max=5,那么实际会启动多少个任务呢?答案是1个 , 任务数与topic数量没有关系 。
TDengineSinkConnector使用示例
这一部分我们在一台Linux服务器上搭建测试环境 , 并运行简单的示例程序 。 示例中将Kafka部署到了个人的home目录 。 操作时请注意把路径中的用户名(bding)替换为自己的用户名 。
·环境准备Java1.8Maven安装并启动了TDengine相关服务进程:taosd和taosAdapter 。
第一步:安装Kafkawgethttps://dlcdn.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgztar-xzfkafka_2.13-3.2.0.tgz
编辑.bash_profile , 加入:exportKAFKA_HOME=/home/bding/kafka_2.13-3.2.0exportPATH=$PATH:$KAFKA_HOME/binsource.bash_profile
第二步:配置Kafka
配置KafkaConnect加载插件的路径 。 cdkafka_2.13-3.2.0/config/viconnect-standalone.properties
追加plugin.path=/home/bding/connectors
修改Connector插件的日志级别 。 这一步非常重要 , 我们将通过插件的日志统计同步数据花费的时间 。 viconnect-log4j.properties
追加log4j.logger.com.taosdata.kafka.connect.sink=DEBUG
第三步:编译并安装插件gitclonegit@github.com:taosdata/kafka-connect-tdengine.gitcdkafka-connect-tdenginemvncleanpackageunzip-d~/connectorstarget/components/packages/taosdata-kafka-connect-tdengine-*.zip
第四步:启动ZooKeeperServer和KafkaServerzookeeper-server-start.sh-daemon$KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh-daemon$KAFKA_HOME/config/server.properties
第五步:创建topickafka-topics.sh--create--topicmeters--partitions1--bootstrap-serverlocalhost:9092
第六步:生成测试数据
将下列脚本保存为gen-data.py:#!/usr/bin/python3importrandomimportsystopic=sys.argv[1]count=int(sys.argv[2])start_ts=1648432611249000000location=["SanFrancisco","LosAngeles","SanDiego"]foriinrange(count):ts=start_ts+irow=f"{topic},location={location[i%3]},groupid=2current={random.random()*10},voltage={random.randint(100,300)},phase={random.random()}{ts}"print(row)
然后执行:python3gen-data.pymeters10000|kafka-console-producer.sh--broker-listlocalhost:9092--topicmeters
生成10000条InfluxDB行协议格式的数据到topicmeters 。 每条数据又包含2个标签字段和3个数据字段 。
第七步:启动KafkaConnect
将下列配置保存为sink-test.properties 。 name=TDengineSinkConnectorconnector.class=com.taosdata.kafka.connect.sink.TDengineSinkConnectortasks.max=1topics=metersconnection.url=jdbc:TAOS://127.0.0.1:6030connection.user=rootconnection.password=taosdataconnection.database=powerdb.schemaless=linekey.converter=org.apache.kafka.connect.storage.StringConvertervalue.converter=org.apache.kafka.connect.storage.StringConverter
然后执行:connect-standalone.sh-daemon$KAFKA_HOME/config/connect-standalone.propertiessink-test.properties
第八步:检查TDengine中的数据
使用TDengineCLI查询power数据库meters表 , 检查是否正好包含10000条数据 。 [bding@vm95test]$taosWelcometotheTDengineshellfromLinux,ClientVersion:2.6.0.4Copyright(c)2022byTAOSData,Inc.Allrightsreserved.taos>selectcount(*)frompower.meters;count(*)|========================10000|