main.go 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/IBM/sarama"
  5. "sync"
  6. )
  7. func consumer() {
  8. var wg sync.WaitGroup
  9. consumer, err := sarama.NewConsumer([]string{"192.168.6.10:9092"}, nil)
  10. if err != nil {
  11. fmt.Println("Failed to start consumer: %s", err)
  12. return
  13. }
  14. partitionList, err := consumer.Partitions("test_topic") //获得该topic所有的分区
  15. if err != nil {
  16. fmt.Println("Failed to get the list of partition:, ", err)
  17. return
  18. }
  19. for partition := range partitionList {
  20. pc, err := consumer.ConsumePartition("test_topic", int32(partition), sarama.OffsetOldest) // OffsetNewest从最新的地方开始 ,OffsetOldest是从头开始,一般选择OffsetNewest
  21. if err != nil {
  22. fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
  23. return
  24. }
  25. wg.Add(1)
  26. go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值
  27. for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待
  28. fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  29. }
  30. defer pc.AsyncClose()
  31. wg.Done()
  32. }(pc)
  33. }
  34. wg.Wait()
  35. }
  36. func main() {
  37. consumer()
  38. }
  39. func consumeGrop() {
  40. }