main.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/Shopify/sarama"
  5. )
  6. // 基于sarama第三方库开发的kafka client
  7. func main() {
  8. config := sarama.NewConfig()
  9. // NoResponse 不确保消息发送成功
  10. // WaitForLocal 只确保leader发送成功
  11. // WaitForAll 确保leader发送成功和所有的副本都完成
  12. config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
  13. config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
  14. config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
  15. // 构造一个消息
  16. msg := &sarama.ProducerMessage{}
  17. msg.Topic = "test2"
  18. msg.Value = sarama.StringEncoder("hhhhhhhh啦啦啦")
  19. // 连接kafka
  20. client, err := sarama.NewSyncProducer([]string{"192.168.6.10:9092"}, config)
  21. if err != nil {
  22. fmt.Println("producer closed, err:", err)
  23. return
  24. }
  25. defer client.Close()
  26. // 发送消息
  27. pid, offset, err := client.SendMessage(msg)
  28. if err != nil {
  29. fmt.Println("send msg failed, err:", err)
  30. return
  31. }
  32. fmt.Printf("pid:%v offset:%v\n", pid, offset)
  33. }