我是靠谱客的博主 漂亮红牛,最近开发中收集的这篇文章主要介绍golang操作kafka,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

kafka的应用场景

  1. 异步处理,把菲关系流程异步话,提高系统的响应时间和健壮性。
    在这里插入图片描述
    在这里插入图片描述
  2. 应用节藕,通过消息队列
    在这里插入图片描述
    在这里插入图片描述
  3. 流量消峰
    在这里插入图片描述

golang调用kafka

生产者producer.go
package main

import ("fmt"
	"github.com/Shopify/sarama"
)

func main() {
	fmt.Printf("producer_testn")

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_2
	producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("producer_test create producer error :%sn", err.Error())
		return
	}

	defer producer.AsyncClose()

	// send message
	msg := &sarama.ProducerMessage{
		Topic: "kafka_go_test",
		Key:   sarama.StringEncoder("go_test"),
	}
	value := "this is message"
	for {
		fmt.Scanln(&value)
		msg.Value = sarama.ByteEncoder(value)
		fmt.Printf("input [%s]n", value)

		// send to chain
		producer.Input() <- msg

		select {
		case suc := <-producer.Successes():
			fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())
		case fail := <-producer.Errors():
			fmt.Printf("err: %sn", fail.Err.Error())
		}
	}
}
消费者consumer.go
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	fmt.Printf("consumer_test")

	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	// consumer
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %sn", err.Error())
		return
	}

	defer consumer.Close()
	partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("try create partition_consumer error %sn", err.Error())
		return
	}
	defer partition_consumer.Close()

	for {
		select {
		case msg := <-partition_consumer.Messages():
			fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %sn",
				msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
		case err := <-partition_consumer.Errors():
			fmt.Printf("err :%sn", err.Error())
		}
	}
}
元数据metadata.go
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	fmt.Printf("metadata testn")

	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_2

	client, err := sarama.NewClient([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("metadata_test try create client err :%sn", err.Error())
		return
	}

	defer client.Close()

	// get topic set
	topics, err := client.Topics()
	if err != nil {
		fmt.Printf("try get topics err %sn", err.Error())
		return
	}
	fmt.Printf("topics(%d):n", len(topics))

	for _, topic := range topics {
		fmt.Println(topic)
	}

	// get broker set
	brokers := client.Brokers()
	fmt.Printf("broker set(%d):n", len(brokers))
	for _, broker := range brokers {
		fmt.Printf("%sn", broker.Addr())
	}
}

最后

以上就是漂亮红牛为你收集整理的golang操作kafka的全部内容,希望文章能够帮你解决golang操作kafka所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(52)

评论列表共有 0 条评论

立即
投稿
返回
顶部