我是靠谱客的博主 机灵黄蜂,最近开发中收集的这篇文章主要介绍kafka生产消息几种方式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

kafka生产消息几种方式

    • 一、发送并忘记
    • 二、同步发送
    • 三、异步发送
    • 四、多线程方式 (由此想到java线程池的几种方式)
    • 五、Kafka Producer 常用配置(kafka-1.1.0)

一、发送并忘记

把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

package com.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TestKafka {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.21.12.152:9092,172.21.12.228:9092,172.21.12.229:9092");
        props.put("acks", "0");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000");

        Producer producer = new KafkaProducer<>(props);

        int i = 0;
        while(i<100000) {
            // 创建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可选的
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", i+"");

            // 只管发送消息,不管是否发送成功
            producer.send(record);

            //同步
            /*try {
                RecordMetadata recordMetadata = (RecordMetadata) producer.send(record).get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }*/
            i++;
        }
        long endTime = System.currentTimeMillis();

        System.out.println(endTime-startTime);
        producer.close();
    }
}

二、同步发送

使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待(会返回元数据或者抛出异常),
就可以知道消息是否发送成功。

package com.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TestKafka {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.21.12.152:9092,172.21.12.228:9092,172.21.12.229:9092");
        props.put("acks", "0");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000");

        Producer producer = new KafkaProducer<>(props);

        int i = 0;
        while(i<100000) {
            // 创建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可选的
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", i+"");

            // 只管发送消息,不管是否发送成功
            //producer.send(record);

            //同步
            try {
                RecordMetadata recordMetadata = (RecordMetadata) producer.send(record).get();
                StringBuilder sb = new StringBuilder();
                sb.append("record [").append("] has been sent successfully!").append("n")
                        .append("send to partition ").append(recordMetadata.partition())
                        .append(", offset = ").append(recordMetadata.offset());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            i++;
        }
        long endTime = System.currentTimeMillis();

        System.out.println(endTime-startTime);
        producer.close();
    }
}

三、异步发送

大多数时候,我们并不需要等待响应——尽管 Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。

不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志等,这样的情况下可以使用异步发送消息的方式,调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

package com.test;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class TestKafka2 {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.21.12.152:9092");
        props.put("acks", "0");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000");


        Producer<String, String> producer = new KafkaProducer<>(props);

        int i = 0;
        while(i<10) {
            final ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", i+"");
            producer.send(record, new Callback() {

                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // 如果发送消息成功,返回了 RecordMetadata
                    /*try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }*/
                    if(metadata != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("message has been sent successfully! ")
                                .append("send to partition ").append(metadata.partition())
                                .append(", offset = ").append(metadata.offset());
                        System.out.println(sb.toString());
                    }
                    // 如果消息发送失败,抛出异常
                    if(e != null) {
                        System.out.println("exception occurs when sending message: " + e +" "+record.value());
                    }

                }
            });
            //System.out.println(i);
            i++;
        }
        long endTime = System.currentTimeMillis();

        System.out.println(endTime-startTime);
        producer.close();
    }
}

四、多线程方式 (由此想到java线程池的几种方式)

在数据量比较大同时对发送消息的顺序没有严格要求时,可以使用多线程的方式发送数据,实现多线程生产者有两种方式:1. 实例化一个 KafkaProducer 对象运行多个线程共享该对象发送消息;2. 实例化多个 KafkaProducer 对象。
由于 Kafka Producer 是线程安全的,所以多个线程共享一个 Kafka Producer 对象在性能上要好很多。

package com.test;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiProducer {
    private static final int THREADS_NUMS = 2;

    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(5);

        Properties props = new Properties();
        props.put("bootstrap.servers", "172.21.12.152:9092,172.21.12.228:9092,172.21.12.229:9092");
        props.put("acks", "0");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "1000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record;
       //Thread.currentThread().setDaemon(true);
        try {
            for(int i = 0; i < 10000; i++) {
                record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello " + i);
                executor.execute(new KafkaProducerThread(producer, record));
            }
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getId()+Thread.currentThread().getName());
        }catch (Exception e) {
            System.out.println("exception occurs when sending message: " + e);
        }finally {
            producer.close();
            executor.shutdown();
        }
    }
}

package com.test;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import sun.awt.windows.ThemeReader;

public class KafkaProducerThread implements Runnable {
    private KafkaProducer<String, String> producer;
    private ProducerRecord<String, String> record;

    public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
               /* System.out.println(record.value());
                try {
                    Thread.sleep((long) (1000*Math.random()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }*/
                if(exception != null) {
                    System.out.println("exception occurs when sending message: " + exception);
                }
                if(metadata != null) {
                    StringBuilder result = new StringBuilder();
                    result.append("message[" + record.value() + "] has been sent successfully! ")
                            .append("send to partition ").append(metadata.partition())
                            .append(", offset = ").append(metadata.offset());
                    System.out.println(Thread.currentThread().getId()+" "+ Thread.currentThread().getName() +" "+ result.toString());
                }
            }
        });
    }
}

五、Kafka Producer 常用配置(kafka-1.1.0)

(1) acks
类型:string
默认值:1
可设置值:[all, -1, 0, 1]
重要性:高
说明:
0:生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。
-1:作用与"all"是一样的。
(2) buffer.memory
类型:long
默认值:33554432(32M)
可设置值:[0,…]
重要性:高
说明:该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。
这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置 max.block.ms (类型:long,默认值:60000(1分钟),可设置值:[0,…],重要性:中等)参数。表示在抛出异常之前可以阻塞的时间。
(3) compression.type
类型:string
默认值:none
可设置值:[none, gzip, snappy, lz4]
重要性:高
说明:该参数可以指定消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
(4) retries
类型:int
默认值:0
可设置值:[0,…,2147483647]
重要性:高
说明:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms(类型:long,默认值:100, 可设置值:[0,…],重要性:低) 参数来改变这个时间间隔。
建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如"消息太大"错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。
(5) batch.size
类型:int
默认值:16384(16K)
可设置值:[0,…]
重要性:中等
说明:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。
(6) linger.ms
类型:long
默认值:0
可设置值:[0,…]
重要性:中等
说明:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。
(7) max.in.flight.requests.per.connection
类型:int
默认值:5
可设置值:[1,…]
重要性:低
说明:该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。
把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
(8) max.request.size
类型:int
默认值:1048576
可设置值:[0,…]
重要性:中等
说明:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes(类型:int,默认值:1000012,大约0.95M,可设置值:[0,…],重要性:高)),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。
(9) receive.buffer.bytes 和 send.buffer.bytes
receive.buffer.bytes

类型:int
默认值:32768(32K)
可设置值:[-1,…]
重要性:中等
send.buffer.bytes

类型:int
默认值:131072(128K)
可设置值:[-1,…]
重要性:中等
说明:这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

(10) client.id
类型:string
默认值:""
可设置值:任意字符串
重要性:中等
说明:该参数可以是任意的字符串,服务器会用它来识别消息的来源。
(11) request.timeout.ms
类型:int
默认值:30000
可设置值:[0,…]
重要性:中等
说明:该参数指定了生产者在发送数据时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。[metadata.fetch.timeout.ms] and [timeout.ms] have been removed. They were initially deprecated in Kafka 0.9.0.0.
(12) max.block.ms
类型:long
默认值:60000
可设置值:[0,…]
重要性:中等
说明:该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。
(13) connections.max.idle.ms
类型:long
默认值:540000
可设置值:[0,…]
重要性:中等
说明:关闭空闲连接的等待时间,检测到空闲的连接后,默认等待9分钟才会关闭这个连接。
(14) metadata.max.age.ms
类型:long
默认值:300000
可设置值:[0,…]
重要性:低
说明:更新元数据的时间间隔,在等待该参数配置的时间后,即使 producer 没有发现任何 partition 或 leader 的变化,也会强制刷新元数据。
(15) reconnect.backoff.ms
类型:long
默认值:50
可设置值:[0,…]
重要性:低
说明:尝试重新连接 broker 的时间间隔。
(16) reconnect.backoff.max.ms
类型:long
默认值:1000
可设置值:[0,…]
重要性:低
说明:如果重新连接的时间累积到达该参数的配置时间还没有连接到 broker,那么宣告连接失败。

这里是引用
作者:CoderJed
链接:https://www.jianshu.com/p/6e6c8ea297ca
來源:简书

最后

以上就是机灵黄蜂为你收集整理的kafka生产消息几种方式的全部内容,希望文章能够帮你解决kafka生产消息几种方式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部