package main import ( "fmt" "github.com/IBM/sarama" "sync" ) func consumer() { var wg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"192.168.6.10:9092"}, nil) if err != nil { fmt.Println("Failed to start consumer: %s", err) return } partitionList, err := consumer.Partitions("test_topic") //获得该topic所有的分区 if err != nil { fmt.Println("Failed to get the list of partition:, ", err) return } for partition := range partitionList { pc, err := consumer.ConsumePartition("test_topic", int32(partition), sarama.OffsetOldest) // OffsetNewest从最新的地方开始 ,OffsetOldest是从头开始,一般选择OffsetNewest if err != nil { fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err) return } wg.Add(1) go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值 for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待 fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) } defer pc.AsyncClose() wg.Done() }(pc) } wg.Wait() } func main() { consumer() } func consumeGrop() { }