main.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/IBM/sarama"
  12. )
  13. func main() {
  14. brokers := strings.Split("192.168.6.10:9092", ",")
  15. topic := "test_topic"
  16. groupID := "group_one"
  17. // 创建生产者
  18. producer, err := createProducer(brokers)
  19. if err != nil {
  20. log.Fatal("无法创建生产者:", err)
  21. }
  22. defer func() {
  23. if err := producer.Close(); err != nil {
  24. log.Fatal("无法关闭生产者:", err)
  25. }
  26. }()
  27. // 发送消息
  28. produceMessage(producer, topic, "hello world")
  29. // 创建消费者
  30. consumer, err := createConsumer(brokers, groupID)
  31. if err != nil {
  32. log.Fatal("无法创建消费者:", err)
  33. }
  34. defer func() {
  35. if err := consumer.Close(); err != nil {
  36. log.Fatal("无法关闭消费者:", err)
  37. }
  38. }()
  39. topics := []string{topic}
  40. wg := &sync.WaitGroup{}
  41. wg.Add(1)
  42. go func() {
  43. defer wg.Done()
  44. consumeMessages(consumer, topics)
  45. }()
  46. // 监听退出信号
  47. sigterm := make(chan os.Signal, 1)
  48. signal.Notify(sigterm, os.Interrupt)
  49. <-sigterm
  50. // 优雅关闭消费者
  51. wg.Wait()
  52. }
  53. // createProducer 创建一个Sarama异步制作者。
  54. // brokers: Kafka代理的地址列表,用于建立生产连接。
  55. // 返回值: 成功时返回sarama.AsyncProducer和nil错误,失败时返回nil和错误信息。
  56. func createProducer(brokers []string) (sarama.AsyncProducer, error) {
  57. // 初始化Sarama配置项
  58. config := sarama.NewConfig()
  59. // 设置生产者在发送消息成功后返回确认
  60. config.Producer.Return.Successes = true
  61. // 设置生产者发送消息的超时时间
  62. config.Producer.Timeout = 5 * time.Second
  63. // 根据提供的代理列表和配置项创建异步生产者
  64. return sarama.NewAsyncProducer(brokers, config)
  65. }
  66. // produceMessage 将消息生产到指定的主题(topic)中。
  67. //
  68. // 参数:
  69. // producer - 实现了sarama.AsyncProducer接口的异步生产者,用于消息的异步发送。
  70. // topic - 指定的消息主题,消息将被发送到这个主题上。
  71. // value - 要发送的消息内容,以字符串形式提供。
  72. func produceMessage(producer sarama.AsyncProducer, topic, value string) {
  73. // 创建一个ProducerMessage实例,设置主题和消息内容。
  74. message := &sarama.ProducerMessage{
  75. Topic: topic,
  76. Value: sarama.StringEncoder(value),
  77. }
  78. // 将消息发送到生产者的消息输入通道中,进行异步处理。
  79. producer.Input() <- message
  80. }
  81. func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
  82. config := sarama.NewConfig()
  83. config.Consumer.Offsets.Initial = sarama.OffsetOldest
  84. return sarama.NewConsumerGroup(brokers, groupID, config)
  85. }
  86. type KafkaConsumerGroupHandler struct {
  87. ready chan bool
  88. }
  89. func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
  90. close(handler.ready)
  91. return nil
  92. }
  93. func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
  94. return nil
  95. }
  96. func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  97. for message := range claim.Messages() {
  98. fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset)
  99. fmt.Printf("消息内容: %s\n", string(message.Value))
  100. sess.MarkMessage(message, "")
  101. }
  102. return nil
  103. }
  104. func consumeMessages(consumer sarama.ConsumerGroup, topics []string) {
  105. handler := &KafkaConsumerGroupHandler{
  106. ready: make(chan bool),
  107. }
  108. for {
  109. err := consumer.Consume(context.Background(), topics, handler)
  110. if err != nil {
  111. log.Printf("消费者错误: %v", err)
  112. }
  113. select {
  114. case <-handler.ready:
  115. default:
  116. return
  117. }
  118. }
  119. }