| 12345678910111213141516171819202122232425262728293031323334353637 |
- package main
- import (
- "fmt"
- "github.com/Shopify/sarama"
- )
- // 基于sarama第三方库开发的kafka client
- func main() {
- config := sarama.NewConfig()
- // NoResponse 不确保消息发送成功
- // WaitForLocal 只确保leader发送成功
- // WaitForAll 确保leader发送成功和所有的副本都完成
- config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
- config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
- config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
- // 构造一个消息
- msg := &sarama.ProducerMessage{}
- msg.Topic = "test2"
- msg.Value = sarama.StringEncoder("hhhhhhhh啦啦啦")
- // 连接kafka
- client, err := sarama.NewSyncProducer([]string{"192.168.6.10:9092"}, config)
- if err != nil {
- fmt.Println("producer closed, err:", err)
- return
- }
- defer client.Close()
- // 发送消息
- pid, offset, err := client.SendMessage(msg)
- if err != nil {
- fmt.Println("send msg failed, err:", err)
- return
- }
- fmt.Printf("pid:%v offset:%v\n", pid, offset)
- }
|