概述
kafka的应用场景
- 异步处理,把非关键流程异步化,提高系统的响应速度和健壮性。
- 应用解耦,通过消息队列解除模块之间的直接依赖。
- 流量削峰
golang调用kafka
生产者producer.go
package main
import ("fmt"
"github.com/Shopify/sarama"
)
// main runs an interactive Kafka producer.
//
// Each token read from standard input is published to the "kafka_go_test"
// topic through an asynchronous sarama producer, and the delivery result
// (offset and timestamp, or the delivery error) is printed before the next
// token is read. The producer stops when nothing can be read any more,
// i.e. on an empty line or at end of input.
func main() {
	fmt.Printf("producer_test\n")

	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Both result channels are enabled, so every message put on Input()
	// must be answered by reading one value from Successes() or Errors();
	// the select at the bottom of the loop does exactly that.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	producer, err := sarama.NewAsyncProducer([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("producer_test create producer error :%s\n", err.Error())
		return
	}
	// Close is used instead of AsyncClose so that shutdown is awaited and
	// any error it reports is surfaced before the process exits.
	defer func() {
		if closeErr := producer.Close(); closeErr != nil {
			fmt.Printf("producer_test close producer error :%s\n", closeErr.Error())
		}
	}()

	value := "this is message"
	for {
		// Stop when Scanln could not read anything (empty line or EOF).
		// Without this check the loop would spin forever at EOF and keep
		// re-sending the previous value. A partial read (n > 0 with an
		// error, e.g. several words on one line) still sends the token.
		if n, scanErr := fmt.Scanln(&value); n == 0 && scanErr != nil {
			return
		}
		fmt.Printf("input [%s]\n", value)

		// Build a fresh message per send: the producer owns the message
		// while it is in flight, so a shared pointer must not be reused.
		msg := &sarama.ProducerMessage{
			Topic: "kafka_go_test",
			Key:   sarama.StringEncoder("go_test"),
			Value: sarama.ByteEncoder(value),
		}

		// send to channel
		producer.Input() <- msg

		// Wait for the outcome of this message before reading more input.
		select {
		case suc := <-producer.Successes():
			fmt.Printf("offset: %d, timestamp: %s\n", suc.Offset, suc.Timestamp.String())
		case fail := <-producer.Errors():
			fmt.Printf("err: %s\n", fail.Err.Error())
		}
	}
}
消费者consumer.go
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// main runs a Kafka consumer that reads partition 0 of the "kafka_go_test"
// topic starting from the oldest available offset and prints every message
// (offset, partition, timestamp, value) as well as every consumer error.
// The loop ends only if one of the partition consumer's channels is closed.
func main() {
	// Newline added so the first consumed message is not glued to the banner.
	fmt.Printf("consumer_test\n")

	config := sarama.NewConfig()
	// Errors are delivered on the Errors() channel and must be read there.
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_11_0_2

	// consumer
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("consumer_test create consumer error %s\n", err.Error())
		return
	}
	defer func() {
		if closeErr := consumer.Close(); closeErr != nil {
			fmt.Printf("consumer_test close consumer error %s\n", closeErr.Error())
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
	if err != nil {
		fmt.Printf("try create partition_consumer error %s\n", err.Error())
		return
	}
	// Deferred after consumer.Close, so it runs first: the partition
	// consumer is shut down before its parent consumer.
	defer func() {
		if closeErr := partitionConsumer.Close(); closeErr != nil {
			fmt.Printf("close partition_consumer error %s\n", closeErr.Error())
		}
	}()

	for {
		select {
		case msg, ok := <-partitionConsumer.Messages():
			if !ok {
				// Channel closed: nothing more will arrive.
				return
			}
			fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s\n",
				msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
		case consumeErr, ok := <-partitionConsumer.Errors():
			if !ok {
				// A closed channel yields a nil error value; do not
				// dereference it.
				return
			}
			fmt.Printf("err :%s\n", consumeErr.Error())
		}
	}
}
元数据metadata.go
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
// main connects to the Kafka cluster with a plain sarama client and prints
// the cluster metadata it exposes: the list of topic names followed by the
// address of every known broker.
func main() {
	fmt.Printf("metadata test\n")

	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_2

	client, err := sarama.NewClient([]string{"127.0.0.1:32769"}, config)
	if err != nil {
		fmt.Printf("metadata_test try create client err :%s\n", err.Error())
		return
	}
	defer func() {
		if closeErr := client.Close(); closeErr != nil {
			fmt.Printf("metadata_test close client err :%s\n", closeErr.Error())
		}
	}()

	// get topic set
	topics, err := client.Topics()
	if err != nil {
		fmt.Printf("try get topics err %s\n", err.Error())
		return
	}
	fmt.Printf("topics(%d):\n", len(topics))
	for _, topic := range topics {
		fmt.Println(topic)
	}

	// get broker set
	brokers := client.Brokers()
	fmt.Printf("broker set(%d):\n", len(brokers))
	for _, broker := range brokers {
		fmt.Println(broker.Addr())
	}
}
最后
以上就是漂亮红牛为你收集整理的golang操作kafka的全部内容,希望文章能够帮你解决golang操作kafka所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复