`
拓子轩
  • 浏览: 204461 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Kafka的Consumer主要参数

    博客分类:
  • java
阅读更多

3.2.2.1 bootstrap.servers

同生产者bootstrap.servers参数。

 

3.2.2.2 group.id

该参数指定的是consumer group的名字,它能够唯一标识一个consumer group。通常设置一个有业务意义的名字就可以了。

 

3.2.2.3 key.deserializer

consumer代码从broker端获取的任何消息都是字节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式。这个参数就是为消息的key做解序列化的。该参数必须是实现org.apache.kafka.common.serialization.Deserializer接口的Java类的全限定名称。Kafka默认为绝大部分的初始类型(primitive type)提供现成的解序列化器。StringDeserializer会将接收到的字节数组转换成UTF-8编码的字符串。consumer支持自定义的deserializer。不论consumer消费的消息是否指定了keyconsumer都必须要设置该参数,否则程序会抛出ConfigException

 

3.2.2.4 value.deserializer

value.deserializer类似,该参数用来对消息体(即消息value)进行解序列化,从而把消息“还原”会原来的对象类型。

 

3.2.2.5 session.timeout.ms

session.timeout.msconsumer group检测组内成员发送崩溃的时间。这个参数还有另外一重含义:consumer消息处理逻辑的 最大时间,倘若consumer两次poll之间的间隔超过了该参数所设置的阀值,那么coordinator(消息组协调者)就会认为这个consumer已经追不上组内其他成员的消费进度了,因此会将该consumer踢出组,该consumer负责的分区也会被分配给其他consumer。在最好的情况下,这会导致不必要的rebalance,因为consumer需要重新加入group。更糟的是,对于那些在被踢出group后处理的消息,consumer都无法提交位移,这就意味着这些消息在rebalance之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer甚至无法执行任何消费,除非用户重新调整参数。

0.10.1.0版本对该参数的含义进行了拆分。在该版本及以后的版本中,session.timeout.ms参数被明确为“coordinator检测失败的时间”。实际使用中可以为该参数设置一个较小的值使coordinator能够更快第检查consumer崩溃的情况,从而更快地开启rebalance,避免造成更大的消息滞后(consumer lag,目前该参数的默认值是10秒。

 

3.2.2.6 max.poll.interval.ms

这个参数就是用来设置消息处理逻辑的最大时间的。通过将该参数设置成稍大于实际的逻辑处理时间再结合较低的session.timeout.ms参数值,consumer group既实现了快速的consumer崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的rebalance

 

3.2.2.7 auto.offset.reset

指定了无位移信息或位移越界(即consumer要消费的消息的位移不在当前消息日志的合理区间范围)时Kafka的应对策略。

目前该参数有如下3个可能的取值:

earliest: 指定从最早的位移开始消费。注意这里最早的位移不一定就是0

latest: 指定从最新处位移开始消费。

none:指定如果未发现位移信息或位移越界,则抛出异常。该值在真实业务场景中使用甚少。

 

3.2.2.8 enable.auto.commit

    该参数指定consumer是否自动提交位移。若设置为true,则consumer在后台自动提交位移。否则,用户需要手动提交位移。对于有较强“精确处理一次”语义需求的用户来说,最好将该参数设置为false,由用户自行处理位移提交问题。

 

3.2.2.9 fetch.max.bytes

它指定了consumer端单次获取数据的最大字节数。若实际业务消息很大,则必须要设置该参数为一个较大的值,否则consumer将无法消费这些消息。

 

3.2.2.10 max.poll.records

该参数控制单次poll调用返回的最大消息数。比较极端的做法是设置该参数为1,那么每次poll只会返回1条消息。如果用户发现consumer端的瓶颈在poll速度太慢,可以适当地增加该参数的值。如果用户的消息处理逻辑很清理,默认的500条消息通常不能满足实际的消息处理速度。

 

3.2.2.11 heartbeat.interval.ms

coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式“塞进”consumer心跳请求的response中,这样其他成员拿到response后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat.interval.ms就是用来做这件事情的。

比较推荐的做法是设置一个比较低的值,让group下的其他consumer成员能够更快地感知新一轮rebalance开启了。注意,该值必须小于session.timeout.ms!毕竟如果consumersession.timeout.ms这段时间内都不发送心跳,coordinator就会认为它已经dead,因此也就没有必要让它知晓coordinator的决定了。

 

3.2.2.12 connections.max.idle.ms

Kafka会定期地关闭空闲Socket连接导致下次consumer处理请求时需要重新创建连向brokerSocket连接。当前默认值是9分钟,如果用户实际环境中不在乎这些Socket资源开销,比较推荐设置该参数值为-1,既不要关闭这些空闲连接。

分享到:
评论

相关推荐

    指定时间段消费Kafka工具

    使用场景:生产环境海量数据,用kafka-console-consumer 消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢...

    详解Spring Kafka中关于Kafka的配置参数

    主要介绍了详解Spring Kafka中关于Kafka的配置参数,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

    kafka_2.9.2-0.8.2.1.tgz

    produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 单机连通性能测试 运行producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 运行...

    kafka-consumer-group:https的工作版本

    kafka-消费者组工作版本参数如:zookeeper.lacolhost.com:2181 group1 结果1

    pepper-box:Pepper-Box是jmeter的kafka负载生成器插件。 它允许发送纯文本(JSON,XML,CSV或任何其他自定义格式)类型的kafka消息以及Java序列化的对象

    Pepper-Box包含四个主要组件 PepperBoxKafkaSampler :这是jmeter Java采样器将消息发送到kafka。 Pepper-Box PlainText Config :此jmeter config元素基于设计的输入架构模板生成纯文本消息。 Pepper-Box序列化...

    kafka-consumer-producer:这是一个Kafka Spring Boot应用程序,可监听一个主题并将其写入另一个主题

    程序参数 -Dspring.application.name =消费者生产者服务 融合卡夫卡命令 融合本地服务开始 融合的本地服务站 融合局部破坏 生产者命令 kafka-topics.sh --zookeeper本地主机:2181-列表 kafka-topics.sh --create --...

    学习kafa的笔记,可以看看目录选择下载

    14. consumer的重点参数解析 15. 手动位移提交消费者偏移量(调用kafka api) 16. 其他重要参数 17. 补充工具—版本管理—git—gitee 18. api方式进行集群管理 19. kafka和flume的整合—kafkasource—kafkasink 20. ...

    Kafka 笔记

    文章目录Kafka架构名次解释Producer(生产者)命令使用脚本常用参数举例分区策略发送返回值幂等性Consumer(消费者)命令使用脚本常用参数举例分配策略Topic(主题)命令使用脚本常用参数举例Kafka高读写 ...

    kafka-samples

    #Kafka 样本 ##设置: 使用默认端口(2181)在本地主机上启动 ZooKeeper,并在顶层创建“kafka”节点 ... ./bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic page_visits --from-beginn

    Kafka-NodeJS-Client-Test:Kafka NodeJS客户端测试

    Kafka NodeJS客户端测试... KAFKACLUSTER=localhost node consumer.js [topic] [groupId] [partition] 所有参数都有默认值,因此您可以简单地运行: node consumer.js 它将连接到名为kafka-node-group的组中的本地主机

    springBoot2.x集成kafka

    最近在做一个物联网paas项目,实时设备预警结合kafka流式实时计算的特定,采用springBoot结合kafka来完成 kafka环境搭建——kafka基本命令及环境搭建 ...spring.kafka.consumer.key-deserializer=org.apache.kafka

    kafka-spring:kafa spring插件

    kafka-spring kafa spring插件 ...com.gewara.kafka.consumer.KafkaConsumerListenerContainer:消息监听器容器,用来存放消息监听器,管理监听器,包括监听器注册和初始化等方法 com.gewara.kafka.co

    docker-kafka-mirrormaker:运行 Kafka 的 MirrorMaker 的 Docker 容器

    您需要将消费者指向源集群的 ZooKeeper,生产者指向镜像集群的 ZooKeeper(或使用 broker.list 参数)。 容器需要传入以下环境变量: CONSUMER_ZK_CONNECT - 源的 Zookeeper 连接字符串,包括端口和 chroot。 ...

    kafdrop:Kafka Web UI

    Kafdrop – Kafka Web UI Kafdrop是一个Web UI,用于查看Kafka主题和浏览消费者组。 该工具显示诸如代理,主题,分区,使用者之类的信息,并允许您查看消息。 该项目是Kafdrop 2.x的重新启动,它被踢到JDK 11 +,...

    bluesky-kafka

    蓝天卡夫卡 Kafka与bluesky集成。 免费软件:3条款BSD许可证特征蓝天消费者消费者发行人远程分派器发行历史v0.3.0(2020-09-03) 添加了BlueskyConsumer 添加了MongoConsumer 为mongo_normalized_consumer.py添加了...

    大数据非关系型数据库课程设计基于Scala的交通拥堵预测源码+项目说明.zip

    **g)** 将数据发送至kafka并使用kafka console-consumer进行检测 ### **编写消费者** **思路:** **a**)新建子工程:tf_consumer **b)** 配置maven依赖 **c**) 配置redis并测试 **d**) 将刚才kafka....

    java8源码-SpringTree:互联网通用技术

    kafka rocketmq memcached es lamda fork/join ThreadPool 微信支付 Elasticsearch 1:微信支付,暂时只支持公众号网页(调用微信客户端)支付,不支持二维码扫码支付 2:Lamda 函数式编程基础,优化美丽的高性能代码 ...

Global site tag (gtag.js) - Google Analytics