kafka消息发送几种方式

news/2024/9/19 13:59:11 标签: kafka, 分布式

同步发送 or 异步发送

       消息发送根据是否需要处理发送的结果分为同步发送、异步发送。

同步发送:等待发送结果返回,这种方式是可靠的,因为异常能及时处理,但同步发送需要阻塞等待一条消息发送完才处理下一条,吞吐量差。


异步发送:发送是异步的,不关心发送的结果,吞吐量最高,但可能存在发送失败的情况。

    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送结果。异步发送则忽略send 返回值。

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
  try {
      SendResult sendResult = future.get();
  } catch (InterruptedException e) {
      e.printStackTrace();
  } catch (ExecutionException e) {
      e.printStackTrace();
  }

发送完成回调

有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且保证发送失败能被监控到。

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if(e != null){
    System.out.println("send error ");
}else {
    System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
}

}
});

发送异常

       有些发送异常可以通过重试几次后解决,比如网络异常,对于有些异常比如消息太大超出kafka配置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以配置重试次数

spring.kafka.producer.retries=10

SpringBoot 集成简单介绍

     参考上篇文章SpringBoot 集成配置(pom依赖、application配置),简单讲解SpringBoot 几个重要自动装配类。

KafkaAutoConfiguration

KafkaAutoConfiguration给我们自动配置了几个类

KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是使用的KafkaProducer发送消息的。

ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer

使用方式也很简单,由于直接KafkaAutoConfiguration已经定义了相关Bean, 使用时注入Bean即可

图片

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private ProducerFactory producerFactory;

具体代码

同步发送、异步发送的方式直接使用 kafkaTemplate即可完成,同步发送结果处理:这里简单的打印出消息的topic partition offset 等信息如下图

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
SendResult sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
 

                                   

图片

发送回调kafkaTemplate没有对应api , 需要通过Producer发送,我们通过producerFactory获取。

ProducerRecord record = new ProducerRecord(topic,content);
    Producer producer = producerFactory.createProducer();
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {

            if(e != null){
                System.out.println("send error ");
            }else {
                System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
            }
        }
    });


http://www.niftyadmin.cn/n/5665650.html

相关文章

人类行为识别系统源码分享

人类行为识别检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer Vis…

mysql学习教程,从入门到精通,SQL 删除数据(DELETE 语句)(18)

1、SQL 删除数据&#xff08;DELETE 语句&#xff09; 在编写SQL中的DELETE语句时&#xff0c;需要非常小心&#xff0c;因为一旦执行&#xff0c;被删除的数据就无法恢复了&#xff08;除非你有备份&#xff09;。DELETE语句用于从数据库表中移除一条或多条记录。这里&#x…

十七,Spring Boot 整合 MyBatis 的详细步骤(两种方式)

十七&#xff0c;Spring Boot 整合 MyBatis 的详细步骤(两种方式) 文章目录 十七&#xff0c;Spring Boot 整合 MyBatis 的详细步骤(两种方式)1. Spring Boot 配置 MyBatis 的详细步骤2. 最后&#xff1a; MyBatis 的官方文档&#xff1a;https://mybatis.p2hp.com/ 关于 MyBa…

LeetCode41. 缺失的第一个正数(2024秋季每日一题 20)

给你一个未排序的整数数组 nums &#xff0c;请你找出其中没有出现的最小的正整数。 请你实现时间复杂度为 O(n) 并且只使用常数级别额外空间的解决方案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,0] 输出&#xff1a;3 解释&#xff1a;范围 [1,2] 中的数字都在数组…

通往AGI的皇冠:逻辑推理能力

文章来自新浪微博机器学习团队 AI Lab 负责人张俊林&#xff0c;OpenAI发布新模型o1之后的一些观点&#xff0c;很有启发&#xff1a; GPT 4o本质上是要探索不同模态相互融合的大一统模型应该怎么做的问题&#xff0c;对于提升大模型的智力水平估计帮助不大&#xff1b;而o1本…

Vue3.0组合式API:使用defineEmits()实现子组件向父组件传递数据

1、使用 defineEmits() 函数 父组件通过使用 Prop 为子组件传递数据&#xff0c;但如果子组件要把数据传递回去&#xff0c;就需要使用自定义事件来实现。父组件可以通过 v-on 指令&#xff08;简写形式“”&#xff09;监听子组件实例的自定义事件&#xff0c;而子组件可以通…

线程池的状态

线程池的状态分为&#xff1a;Running(运行状态)、Shutdown(关闭状态)、Stop(停止状态)、Tidying(整理状态)、Terminated(终止状态)。 Running(运行状态)&#xff1a;线程池被创建时&#xff0c;就是Running状态&#xff0c;线程池中的任务数位0。 该状态会接受新任务&#xf…

Docker指令学习1

docker指令 查看镜像列表&#xff1a; docker images | docker image ls 删除单个镜像: docker rmi <image_name>: 强制删除镜像&#xff1a; docker rmi -f <image_id> 如果镜像正在被某些容器使用&#xff0c;普通删除命令会失败。使用 -f 选项强制删除镜像 注意…