kafka发送事件的几种方式

10,741次阅读
没有评论

共计 3110 个字符,预计需要花费 8 分钟才能阅读完成。

java
复制代码

package com.wanfeng.producer;

import com.wanfeng.model.GirlFriend;
import jakarta.annotation.Resource;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;


/**
 * 作者:晚枫
 * 时间:2024/9/1 8:57
 */
@Component
public class EventProducer {

    @Resource
    private KafkaTemplate kafkaTemplate;

    public void sendEvent() {
        // 参数一:kafka 主题名字
        // 参数二:需要发送的事件
        kafkaTemplate.send("hello", "喜欢欣宝");
    }

    public void sendEvent2() {Message message = MessageBuilder.withPayload("超级喜欢欣宝")
            // 在 header 中放 topic 的名字
            .setHeader(KafkaHeaders.TOPIC, "hello")
            .build();
        kafkaTemplate.send(message);
    }

    public void sendEvent3() {
        // 可以在头部带一些自定义信息
        Headers headers = new RecordHeaders();
        headers.add("生日", "20010424".getBytes(StandardCharsets.UTF_8));
        // String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers ProducerRecord message = new ProducerRecord("hello", 0, System.currentTimeMillis(), "姓名", "爱欣宝", headers); kafkaTemplate.send(message); } public void sendEvent4() { // String topic, Integer partition, Long timestamp, K key, V data kafkaTemplate.send("hello", 0, System.currentTimeMillis(), "name", "爱欣宝"); } public void sendEvent5() { // Integer partition, Long timestamp, K key, V data kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "address", "广东"); } public void sendEvent6() {CompletableFuture> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝"); try { // 阻塞等待的方式拿结果 SendResult stringStringSendResult = sendResultCompletableFuture.get(); if (stringStringSendResult.getRecordMetadata() != null) {System.out.println("消息发送成功:" + stringStringSendResult.getRecordMetadata().toString()); } System.out.println("producerRecord:" + stringStringSendResult.getProducerRecord()); } catch (Exception e) {throw new RuntimeException(e); } } public void sendEvent7() {CompletableFuture> sendResultCompletableFuture = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "name", "欣宝宝"); // 非阻塞方式拿结果 sendResultCompletableFuture.thenAccept(sendResult -> {if (sendResult.getRecordMetadata() != null) {System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString()); } System.out.println("producerRecord:" + sendResult.getProducerRecord()); }); } public void sendEvent8() {GirlFriend myGirlFriend = GirlFriend.builder().name("欣宝宝").birthday("2001-04-24").build(); kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "girlFriend", myGirlFriend); } }

在发送对象类型数据的时候,需要更换序列化方式,因为生产者的值默认使用字符串序列化方式,当我们发送对象类型数据的时候就会报错,所以我们需要更换序列化方式,在 application.yml 配置文件中配置即可

yaml
复制代码

spring:
  application:
    # 应用名称
    name: kafka-01-base
  kafka:
    # kafka 连接地址
    bootstrap-servers: ip:port
      # consumer:
      # 让消费者从最早的事件开始读取
      # auto-offset-reset: earliest
    template:
      # 使用模版配置默认 topic
      default-topic: hello
    producer:
      # 生产者 value 的序列化方式
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

文章来源: kafka 发送事件的几种方式

    正文完
     0
    Yojack
    版权声明:本篇文章由 Yojack 于2024-09-19发表,共计3110字。
    转载说明:
    1 本网站名称:优杰开发笔记
    2 本站永久网址:https://yojack.cn
    3 本网站的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,请联系站长进行删除处理。
    4 本站一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
    5 本站所有内容均可转载及分享, 但请注明出处
    6 我们始终尊重原创作者的版权,所有文章在发布时,均尽可能注明出处与作者。
    7 站长邮箱:laylwenl@gmail.com
    评论(没有评论)