如何使用 TDengine Sink Connector?( 三 )


TDengineSinkConnector性能测试
·测试流程
这一部分 , 我们将上面示例步骤中的第四步到第七步封装成可重复运行的shell脚本 , 并做以下修改:将topic的分区数作为脚本的第1个参数 , 同时配置tasks.max , 使其等于分区数 。 这样我们可以控制每次测试使用的写入线程数 。 将生成测试数据的条数作为脚本的第2个参数 , 用来控制每次测试同步的数据量 。 启动测试前清空所有数据 , 测试结束后停止Connect、Kafka和ZooKeeper 。
每次测试都先写数据到Kafka , 然后再启动Connect同步数据到TDengine , 这样做可以把同步数据的压力全部集中到Sink插件这边 。 我们统计SinkConnector从接收到第一批数据到接收到最后一批数据之间的时间 , 作为同步数据的总耗时 。
完整脚本如下:#!/bin/bashif[$#-lt2];thenecho"Usage:./run-test.sh"exit0fiecho"---------------------------TESTSTARTED---------------------------------------"echocleandataandlogstaos-s"DROPDATABASEIFEXISTSpower"rm-rf/tmp/kafka-logs/tmp/zookeeperrm-f$KAFKA_HOME/logs/connect.lognp=$1#numberofpartitionstotal=$2#numberofrecordsechonumberofpartitionsis$np,numberofrecordesis$total.echostartzookeeperzookeeper-server-start.sh-daemon$KAFKA_HOME/config/zookeeper.propertiesechostartkafkasleep3kafka-server-start.sh-daemon$KAFKA_HOME/config/server.propertiessleep5echocreatetopickafka-topics.sh--create--topicmeters--partitions$np--bootstrap-serverlocalhost:9092kafka-topics.sh--describe--topicmeters--bootstrap-serverlocalhost:9092echogeneratetestdatapython3gen-data.pymeters$total|kafka-console-producer.sh--broker-listlocalhost:9092--topicmetersechoalterconnectorconfigurationsettingtasks.max=$npsed-i"s/tasks.max=.*/tasks.max=${np}/"sink-test.propertiesechostartkafkaconnectconnect-standalone.sh-daemon$KAFKA_HOME/config/connect-standalone.propertiessink-test.propertiesecho-e"e[1;31mopenanotherconsoletomonitorconnect.log.pressenterwhennomoredatareceived.e[0m"readechostopconnectjps|grepConnectStandalone|awk'{print$1}'|xargskillechostopkafkaserverkafka-server-stop.shechostopzookeeperzookeeper-server-stop.sh#extracttimestampsofreceivingthefirstbatchofdataandthelastbatchofdatagrep"records"$KAFKA_HOME/logs/connect.log|grepmeters->tmp.logstart_time=`cattmp.log|grep-Eo"[0-9]{4}-[0-9]{2}-[0-9]{2}[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}"|head-1`stop_time=`cattmp.log|grep-Eo"[0-9]{4}-[0-9]{2}-[0-9]{2}[0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}"|tail-1`echo"--------------------------TESTFINISHED------------------------------------"echo"|records|partitions|starttime|stoptime|"echo"|---------|------------|------------|-----------|"echo"|$total|$np|$start_time|$stop_time|"
如果要测试使用1个分区 , 共100万条数据的性能 , 可以这样执行:./run-test.sh11000000
执行过程的截图如下:
如何使用 TDengine Sink Connector?
文章图片
注意中间有一个交互过程 。 因为脚本无法确定数据是否同步完 , 需要用户监控connect.log来确定是否已经消费完了所有数据 , 例如:[bding@vm95~]$cdkafka_2.13-3.2.0/logs/[bding@vm95logs]$tail-fconnect.log[2022-06-2117:39:00,176]DEBUG[TDengineSinkConnector|task-0]Received500records.Firstrecordkafkacoordinates:(meters-0-314496).Writingthemtothedatabase...(com.taosdata.kafka.connect.sink.TDengineSinkTask:101)[2022-06-2117:39:00,180]DEBUG[TDengineSinkConnector|task-0]Received500records.Firstrecordkafkacoordinates:(meters-0-314996).Writingthemtothedatabase...(com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
当日志不再滚动 , 就说明已经消费完了
·测试结果
写入速度与数据量和线程数的关系表
如何使用 TDengine Sink Connector?
文章图片
上表第1列为总数据量 , 第1行为消费者线程数 , 也是写入线程数 。 中间为平均每秒写入记录数 。
写入速度与数据量和线程数的关系图