package main import ( "context" "fmt" "log" "os" "os/signal" "strings" "sync" "time" "github.com/IBM/sarama" ) func main() { brokers := strings.Split("192.168.6.10:9092", ",") topic := "test_topic" groupID := "group_one" // 创建生产者 producer, err := createProducer(brokers) if err != nil { log.Fatal("无法创建生产者:", err) } defer func() { if err := producer.Close(); err != nil { log.Fatal("无法关闭生产者:", err) } }() // 发送消息 produceMessage(producer, topic, "hello world") // 创建消费者 consumer, err := createConsumer(brokers, groupID) if err != nil { log.Fatal("无法创建消费者:", err) } defer func() { if err := consumer.Close(); err != nil { log.Fatal("无法关闭消费者:", err) } }() topics := []string{topic} wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() consumeMessages(consumer, topics) }() // 监听退出信号 sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, os.Interrupt) <-sigterm // 优雅关闭消费者 wg.Wait() } // createProducer 创建一个Sarama异步制作者。 // brokers: Kafka代理的地址列表,用于建立生产连接。 // 返回值: 成功时返回sarama.AsyncProducer和nil错误,失败时返回nil和错误信息。 func createProducer(brokers []string) (sarama.AsyncProducer, error) { // 初始化Sarama配置项 config := sarama.NewConfig() // 设置生产者在发送消息成功后返回确认 config.Producer.Return.Successes = true // 设置生产者发送消息的超时时间 config.Producer.Timeout = 5 * time.Second // 根据提供的代理列表和配置项创建异步生产者 return sarama.NewAsyncProducer(brokers, config) } // produceMessage 将消息生产到指定的主题(topic)中。 // // 参数: // producer - 实现了sarama.AsyncProducer接口的异步生产者,用于消息的异步发送。 // topic - 指定的消息主题,消息将被发送到这个主题上。 // value - 要发送的消息内容,以字符串形式提供。 func produceMessage(producer sarama.AsyncProducer, topic, value string) { // 创建一个ProducerMessage实例,设置主题和消息内容。 message := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(value), } // 将消息发送到生产者的消息输入通道中,进行异步处理。 producer.Input() <- message } func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) { config := sarama.NewConfig() config.Consumer.Offsets.Initial = sarama.OffsetOldest return sarama.NewConsumerGroup(brokers, groupID, config) } type KafkaConsumerGroupHandler struct { ready chan bool } func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { close(handler.ready) return nil } func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset) fmt.Printf("消息内容: %s\n", string(message.Value)) sess.MarkMessage(message, "") } return nil } func consumeMessages(consumer sarama.ConsumerGroup, topics []string) { handler := &KafkaConsumerGroupHandler{ ready: make(chan bool), } for { err := consumer.Consume(context.Background(), topics, handler) if err != nil { log.Printf("消费者错误: %v", err) } select { case <-handler.ready: default: return } } }