3.2.1.1 消息消费的demo代码
消息消费的demo代码如下:
package com.tuozixuan.kafka.demo;
import java.util.Arrays; import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
publicclass ConsumerTest {
publicstaticvoid main(String[] args) {
String topicName = "test"; String groupId = "test-group";
Properties props = new Properties(); // 必须指定的属性 props.put("bootstrap.servers", "10.4.23.159:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("group.id", groupId);
// 可选属性 props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "earliest"); // 从最早的消息开始读取
// 创建consumer实例,订阅topic KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topicName));
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset:%d key:%s value:%s%n", record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }
|
构造consumer需要下面6个步骤:
- 构造一个java.util.Properties对象,至少指定bootstrap.servers、key.deserializer、value.deserializer和group.id的值。
- 使用上一步创建的Properties实例构造KafkaConsumer对象。
- 调用 KafkaConsumer.subscribe方法订阅consumer group感兴趣的topic列表。
- 循环调用KafkaConsumer.poll方法获取封装在ConsumerRecord的topic消息。
- 处理获取到的ConsumerRecord对象。
- 关闭KafkaConsumer。
3.2.1.2 构造Properties对象
在创建的Properties对象中,必须指定的参数有4个:bootstrap.servers、key.deserializer、value.deserializer和group.id的值。参数的具体含义见3.2.2 consumer主要参数
3.2.1.3 构造KafkaConsumer对象
创建KafkaConsumer实例代码如下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
创建KafkaConsumer也可同时指定key和value的deseralizer,若采用这种方式,则不需要在Properties中指定key.deserializer和value.deserializer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props,new StringDeserializer(),new StringDeserializer()); |
3.2.1.4 订阅topic列表
订阅topic的代码如下:
consumer.subscribe(Arrays.asList("topic1","topic2","topic3")); |
该方法还支持正则表达式。假设consumer group要消费所有以kafka开头的topic,则可以如此订阅:
consumer.subscribe(Pattern.compile("kafka.*"),new NoOpConsumerRebalanceListener()); |
注意:subscribe方法不是增量式的,后续的subscribe调用会完全覆盖之前的订阅语句。
3.2.1.5 获取消息
consumer使用KafkaConsumer.poll方法从订阅topic中并行地获取多个分区的消息。为了实现这一点,新版本的consumer的poll方法使用了类似linux的select I/O机制--所有相关的事件(包括rebalance、获取消息等)都发生在一个事件循环(event loop)中。这样consumer端只使用一个线程就能够完成所有类型的I/O操作。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); // 执行具体的消费逻辑 } } finally { consumer.close(); } |
上面代码中的1000代表超时设置(timeout),通常情况下如果consumer拿到了足够多的可用数据,那么它可以立即从该方法返回;但若当前没有足够多的数据可供返回,consumer会处于阻塞状态。这个超时参数即控制阻塞的最大时间。这里的1000表示即使没有那么多数据,consumer最多也不要等待超过1秒的时间。
若用户有定时方面的需求,那么根据需求设定timeout是一个不错的选择。否则,设定一个比较大的值甚至Integer.MAX_VALUE,是不错的建议。
3.2.1.6 处理ConsumerRecord对象
poll调用返回ConsumerRecord封装的Kafka消息,拿到这些消息后consumer可以处理自己的业务逻辑。
从Kafka consumer的角度而言,poll方法返回即认为consumer成功消费了消息。如果发现poll返回消息的速度过慢,那么可以调节相应的参数来提升poll方法的效率;若消息的业务级处理逻辑过慢,则应该考虑简化处理逻辑或者把处理逻辑放入单独的线程执行。
3.2.1.7 关闭consumer
consumer程序结束后一定要显式关闭consumer以释放KafkaConsumer运行过程中占用的各种系统资源(比如线程资源、内存、Socket连接等)。
KafkaConsumer.close():关闭consumer并最多等待30秒
KafkaConsumer.close(timeout):关闭consumer并最多等待给定的timeout秒。
相关推荐
Apache Kafka示例 ... 首先,Apache Kafka是一个分布式排队系统,这意味着所产生的消息将被发送出去,并进入称为主题的kafka的基本构建块中。 Apache Kafka的构建块: 主题:只是指定一个接收同类
该项目使用Makefile来模拟一个非常简单的构建管道,分为两个阶段-测试和部署。 请参阅此处的规范消费者示例: : 另请参见完整的 ,可以将其替换为“消费者”。 在下图中,我们将测试“产品API”,这是一个简单的...
本项目试图构建一个标准的kafka消费者,可以作为进一步扩展的基础。 该项目中使用 Dropwizard 框架使其作为微服务运行。 核心组件 该项目基于 SRP 设计原则构建了两个关键组件。 KafkaMessageConsumer - 处理 kafka...
vertxkafka模板 使用 Vert.x 编写的 Kafka 消费者和可选的 Kafka 生产者 使用 vertx 开发项目真的很有趣!...要启动项目,您可以在构建后使用 .bat 文件。 Workerverticle 的部署和卸载是通过 http-requests 实现的。
通过服务器发送事件实时发布Kafka主题此示例将向您展示如何轻松扩展 dropwizard-kafka-consumer 以构建一个微服务,该微服务可以通过 SSE 实时显示 kafka 主题的传入消息。扩展 dropwizard-kafka-consumer 为了扩展 ...
构建您自己的 Helm Chart 发布 先决条件 发布步骤 更改日志 介绍 Kafka Lag Exporter 可以轻松查看Apache Kafka消费者组的延迟(驻留时间)。 它可以在任何地方运行,但它提供了使用Prometheus和Grafana监控堆栈在...
Kafka是一个分布式流处理平台,被广泛用于构建实时数据管道,允许你流式地处理数据。 Kafka的主要特性包括: 1.分布式:Kafka是分布式的,可以跨多台机器同时存储和处理数据。 2.提供消息系统:Kafka可以作为消息中间件...
kafka-消费者 简单的 Scala Kafka 消费者 如何构建
使用AWS X-Ray跟踪基于Apache Kafka构建的消息传递应用程序的端到端性能。 许可证摘要 该示例代码在经过修改的MIT许可下可用。 请参阅许可文件。 使AWS X-Ray与Apache Kafka一起使用 在高层次上,您必须做三件事: ...
Consumer:消费者,即从Kafka topic订阅并消费消息的客户端。 Topic:主题,是特定类型的消息流。消息是字节流,由Topic来承载。Topic可以被分为若干个Partition,每个Partition都是一个有序的队列。 Partition:...
卡夫卡消费者Kafka-consumer是kafka的大数据消息阅读器。特征使用Kafka的高级/低级API连续接收Kafka数据。 在故障过程中,我们只需在每个批次间隔的开始确定要消耗的偏移量范围是多少。 从Kakfa读取与偏移范围相对应...
基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...
confluent-kafka-python提供了与所有兼容的高级Producer,Consumer和AdminClient 经纪人> = v0.8, 和。 客户是: 可靠-它是 (通过二进制车轮自动提供)的包装,已在各种生产场景中广泛部署。 它使用Java客户端,...
事件驱动的微服务,用于接收Twitter数据,这些服务使用Spring Cloud,Docker,Kafka,Prometheus,Grafana和Elasticsearch(ELK)构建! 去做: 检索特定主题的推文流。 将数据从Java流到Kafka主题 序列化Kafka主题...
li-apache-kafka-clients介绍li-apache-kafka-clients是在香草Apache Kafka客户端之上构建的包装Kafka客户端库。 Apache Kafka现在已成为非常流行的消息传递系统,并以其低延迟,高吞吐量和持久的消息传递而闻名。 ...
Trubka是内置的Kafka CLI工具,可为您提供所需的一切管理,查询和解决您的Kafka群集。 消耗来自Kafka的和纯文本消息。 将协议缓冲区和纯文本消息发布到Kafka。文献资料致谢特别感谢Joshua Humphries构建了引人入胜的...
kafka - Go 中 Kafka 的发布者和消费者 Kafka 是一个分布式发布订阅消息系统:( ) Go语言:( ) 分叉自: : 。 变化 4/13 - 从 apache 存储库合并... $GOPATH/bin/consumer -topic test -consumeforever Consu
使用Flask构建咖啡厅订单服务器和容器分发 使用烧瓶的咖啡订单,使用kafka的数据同步和更新,通过docker分发 ...docker run -d --network my-coffee-network --name kafka_consumer 1yangsh/kafka_consumer
Kafka_Spark_Integration 该存储库是一个有关如何使用Apache Kafka和Apache Spark流设置生产者和消费者的... Consumer.Java仅使用apache kafka构建。 sparkStreamingConsumer.Java使用Apache Spark的流功能构建。
它将事件发布到Kafka主题,然后由Kafka Consumer处理该事件。 概述 创建该代码库的目的是演示一个简单的后端应用程序,该应用程序使用Kafka进行异步任务,并使用Spring Boot构建。 入门 设置 先决条件 Java 8 运行...