3.2.2.1 bootstrap.servers
同生产者bootstrap.servers参数。
3.2.2.2 group.id
该参数指定的是consumer group的名字,它能够唯一标识一个consumer group。通常设置一个有业务意义的名字就可以了。
3.2.2.3 key.deserializer
consumer代码从broker端获取的任何消息都是字节数组的格式,因此消息的每个组件都要执行相应的解序列化操作才能“还原”成原来的对象格式。这个参数就是为消息的key做解序列化的。该参数必须是实现org.apache.kafka.common.serialization.Deserializer接口的Java类的全限定名称。Kafka默认为绝大部分的初始类型(primitive type)提供现成的解序列化器。StringDeserializer会将接收到的字节数组转换成UTF-8编码的字符串。consumer支持自定义的deserializer。不论consumer消费的消息是否指定了key,consumer都必须要设置该参数,否则程序会抛出ConfigException。
3.2.2.4 value.deserializer
与value.deserializer类似,该参数用来对消息体(即消息value)进行解序列化,从而把消息“还原”会原来的对象类型。
3.2.2.5 session.timeout.ms
session.timeout.ms是consumer group检测组内成员发送崩溃的时间。这个参数还有另外一重含义:consumer消息处理逻辑的 最大时间,倘若consumer两次poll之间的间隔超过了该参数所设置的阀值,那么coordinator(消息组协调者)就会认为这个consumer已经追不上组内其他成员的消费进度了,因此会将该consumer踢出组,该consumer负责的分区也会被分配给其他consumer。在最好的情况下,这会导致不必要的rebalance,因为consumer需要重新加入group。更糟的是,对于那些在被踢出group后处理的消息,consumer都无法提交位移,这就意味着这些消息在rebalance之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么consumer甚至无法执行任何消费,除非用户重新调整参数。
0.10.1.0版本对该参数的含义进行了拆分。在该版本及以后的版本中,session.timeout.ms参数被明确为“coordinator检测失败的时间”。实际使用中可以为该参数设置一个较小的值使coordinator能够更快第检查consumer崩溃的情况,从而更快地开启rebalance,避免造成更大的消息滞后(consumer lag),目前该参数的默认值是10秒。
3.2.2.6 max.poll.interval.ms
这个参数就是用来设置消息处理逻辑的最大时间的。通过将该参数设置成稍大于实际的逻辑处理时间再结合较低的session.timeout.ms参数值,consumer group既实现了快速的consumer崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的rebalance。
3.2.2.7 auto.offset.reset
指定了无位移信息或位移越界(即consumer要消费的消息的位移不在当前消息日志的合理区间范围)时Kafka的应对策略。
目前该参数有如下3个可能的取值:
earliest: 指定从最早的位移开始消费。注意这里最早的位移不一定就是0。
latest: 指定从最新处位移开始消费。
none:指定如果未发现位移信息或位移越界,则抛出异常。该值在真实业务场景中使用甚少。
3.2.2.8 enable.auto.commit
该参数指定consumer是否自动提交位移。若设置为true,则consumer在后台自动提交位移。否则,用户需要手动提交位移。对于有较强“精确处理一次”语义需求的用户来说,最好将该参数设置为false,由用户自行处理位移提交问题。
3.2.2.9 fetch.max.bytes
它指定了consumer端单次获取数据的最大字节数。若实际业务消息很大,则必须要设置该参数为一个较大的值,否则consumer将无法消费这些消息。
3.2.2.10 max.poll.records
该参数控制单次poll调用返回的最大消息数。比较极端的做法是设置该参数为1,那么每次poll只会返回1条消息。如果用户发现consumer端的瓶颈在poll速度太慢,可以适当地增加该参数的值。如果用户的消息处理逻辑很清理,默认的500条消息通常不能满足实际的消息处理速度。
3.2.2.11 heartbeat.interval.ms
当coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式“塞进”consumer心跳请求的response中,这样其他成员拿到response后才能知道它需要重新加入group。显然这个过程越快越好,而heartbeat.interval.ms就是用来做这件事情的。
比较推荐的做法是设置一个比较低的值,让group下的其他consumer成员能够更快地感知新一轮rebalance开启了。注意,该值必须小于session.timeout.ms!毕竟如果consumer在session.timeout.ms这段时间内都不发送心跳,coordinator就会认为它已经dead,因此也就没有必要让它知晓coordinator的决定了。
3.2.2.12 connections.max.idle.ms
Kafka会定期地关闭空闲Socket连接导致下次consumer处理请求时需要重新创建连向broker的Socket连接。当前默认值是9分钟,如果用户实际环境中不在乎这些Socket资源开销,比较推荐设置该参数值为-1,既不要关闭这些空闲连接。
相关推荐
使用场景:生产环境海量数据,用kafka-console-consumer 消费kafka某时间段消息用于分析问题,生产环境海量数据,用kafka-console-consumer.sh只能消费全量,文件巨大,无法grep。 代码来源于博主:BillowX_ ,感谢...
主要介绍了详解Spring Kafka中关于Kafka的配置参数,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 单机连通性能测试 运行producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 运行...
kafka-消费者组工作版本参数如:zookeeper.lacolhost.com:2181 group1 结果1
Pepper-Box包含四个主要组件 PepperBoxKafkaSampler :这是jmeter Java采样器将消息发送到kafka。 Pepper-Box PlainText Config :此jmeter config元素基于设计的输入架构模板生成纯文本消息。 Pepper-Box序列化...
程序参数 -Dspring.application.name =消费者生产者服务 融合卡夫卡命令 融合本地服务开始 融合的本地服务站 融合局部破坏 生产者命令 kafka-topics.sh --zookeeper本地主机:2181-列表 kafka-topics.sh --create --...
14. consumer的重点参数解析 15. 手动位移提交消费者偏移量(调用kafka api) 16. 其他重要参数 17. 补充工具—版本管理—git—gitee 18. api方式进行集群管理 19. kafka和flume的整合—kafkasource—kafkasink 20. ...
文章目录Kafka架构名次解释Producer(生产者)命令使用脚本常用参数举例分区策略发送返回值幂等性Consumer(消费者)命令使用脚本常用参数举例分配策略Topic(主题)命令使用脚本常用参数举例Kafka高读写 ...
#Kafka 样本 ##设置: 使用默认端口(2181)在本地主机上启动 ZooKeeper,并在顶层创建“kafka”节点 ... ./bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka --topic page_visits --from-beginn
Kafka NodeJS客户端测试... KAFKACLUSTER=localhost node consumer.js [topic] [groupId] [partition] 所有参数都有默认值,因此您可以简单地运行: node consumer.js 它将连接到名为kafka-node-group的组中的本地主机
最近在做一个物联网paas项目,实时设备预警结合kafka流式实时计算的特定,采用springBoot结合kafka来完成 kafka环境搭建——kafka基本命令及环境搭建 ...spring.kafka.consumer.key-deserializer=org.apache.kafka
kafka-spring kafa spring插件 ...com.gewara.kafka.consumer.KafkaConsumerListenerContainer:消息监听器容器,用来存放消息监听器,管理监听器,包括监听器注册和初始化等方法 com.gewara.kafka.co
您需要将消费者指向源集群的 ZooKeeper,生产者指向镜像集群的 ZooKeeper(或使用 broker.list 参数)。 容器需要传入以下环境变量: CONSUMER_ZK_CONNECT - 源的 Zookeeper 连接字符串,包括端口和 chroot。 ...
Kafdrop – Kafka Web UI Kafdrop是一个Web UI,用于查看Kafka主题和浏览消费者组。 该工具显示诸如代理,主题,分区,使用者之类的信息,并允许您查看消息。 该项目是Kafdrop 2.x的重新启动,它被踢到JDK 11 +,...
蓝天卡夫卡 Kafka与bluesky集成。 免费软件:3条款BSD许可证特征蓝天消费者消费者发行人远程分派器发行历史v0.3.0(2020-09-03) 添加了BlueskyConsumer 添加了MongoConsumer 为mongo_normalized_consumer.py添加了...
**g)** 将数据发送至kafka并使用kafka console-consumer进行检测 ### **编写消费者** **思路:** **a**)新建子工程:tf_consumer **b)** 配置maven依赖 **c**) 配置redis并测试 **d**) 将刚才kafka....
kafka rocketmq memcached es lamda fork/join ThreadPool 微信支付 Elasticsearch 1:微信支付,暂时只支持公众号网页(调用微信客户端)支付,不支持二维码扫码支付 2:Lamda 函数式编程基础,优化美丽的高性能代码 ...