| 123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- package main
- import (
- "fmt"
- "github.com/IBM/sarama"
- "sync"
- )
- func consumer() {
- var wg sync.WaitGroup
- consumer, err := sarama.NewConsumer([]string{"192.168.6.10:9092"}, nil)
- if err != nil {
- fmt.Println("Failed to start consumer: %s", err)
- return
- }
- partitionList, err := consumer.Partitions("test_topic") //获得该topic所有的分区
- if err != nil {
- fmt.Println("Failed to get the list of partition:, ", err)
- return
- }
- for partition := range partitionList {
- pc, err := consumer.ConsumePartition("test_topic", int32(partition), sarama.OffsetOldest) // OffsetNewest从最新的地方开始 ,OffsetOldest是从头开始,一般选择OffsetNewest
- if err != nil {
- fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
- return
- }
- wg.Add(1)
- go func(sarama.PartitionConsumer) { //为每个分区开一个go协程去取值
- for msg := range pc.Messages() { //阻塞直到有值发送过来,然后再继续等待
- fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
- }
- defer pc.AsyncClose()
- wg.Done()
- }(pc)
- }
- wg.Wait()
- }
- func main() {
- consumer()
- }
- func consumeGrop() {
- }
|