| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package main
- import (
- "context"
- "fmt"
- "log"
- "os"
- "os/signal"
- "strings"
- "sync"
- "time"
- "github.com/IBM/sarama"
- )
- func main() {
- brokers := strings.Split("192.168.6.10:9092", ",")
- topic := "test_topic"
- groupID := "group_one"
- // 创建生产者
- producer, err := createProducer(brokers)
- if err != nil {
- log.Fatal("无法创建生产者:", err)
- }
- defer func() {
- if err := producer.Close(); err != nil {
- log.Fatal("无法关闭生产者:", err)
- }
- }()
- // 发送消息
- produceMessage(producer, topic, "hello world")
- // 创建消费者
- consumer, err := createConsumer(brokers, groupID)
- if err != nil {
- log.Fatal("无法创建消费者:", err)
- }
- defer func() {
- if err := consumer.Close(); err != nil {
- log.Fatal("无法关闭消费者:", err)
- }
- }()
- topics := []string{topic}
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- consumeMessages(consumer, topics)
- }()
- // 监听退出信号
- sigterm := make(chan os.Signal, 1)
- signal.Notify(sigterm, os.Interrupt)
- <-sigterm
- // 优雅关闭消费者
- wg.Wait()
- }
- // createProducer 创建一个Sarama异步制作者。
- // brokers: Kafka代理的地址列表,用于建立生产连接。
- // 返回值: 成功时返回sarama.AsyncProducer和nil错误,失败时返回nil和错误信息。
- func createProducer(brokers []string) (sarama.AsyncProducer, error) {
- // 初始化Sarama配置项
- config := sarama.NewConfig()
- // 设置生产者在发送消息成功后返回确认
- config.Producer.Return.Successes = true
- // 设置生产者发送消息的超时时间
- config.Producer.Timeout = 5 * time.Second
- // 根据提供的代理列表和配置项创建异步生产者
- return sarama.NewAsyncProducer(brokers, config)
- }
- // produceMessage 将消息生产到指定的主题(topic)中。
- //
- // 参数:
- // producer - 实现了sarama.AsyncProducer接口的异步生产者,用于消息的异步发送。
- // topic - 指定的消息主题,消息将被发送到这个主题上。
- // value - 要发送的消息内容,以字符串形式提供。
- func produceMessage(producer sarama.AsyncProducer, topic, value string) {
- // 创建一个ProducerMessage实例,设置主题和消息内容。
- message := &sarama.ProducerMessage{
- Topic: topic,
- Value: sarama.StringEncoder(value),
- }
- // 将消息发送到生产者的消息输入通道中,进行异步处理。
- producer.Input() <- message
- }
- func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
- config := sarama.NewConfig()
- config.Consumer.Offsets.Initial = sarama.OffsetOldest
- return sarama.NewConsumerGroup(brokers, groupID, config)
- }
- type KafkaConsumerGroupHandler struct {
- ready chan bool
- }
- func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
- close(handler.ready)
- return nil
- }
- func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for message := range claim.Messages() {
- fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset)
- fmt.Printf("消息内容: %s\n", string(message.Value))
- sess.MarkMessage(message, "")
- }
- return nil
- }
- func consumeMessages(consumer sarama.ConsumerGroup, topics []string) {
- handler := &KafkaConsumerGroupHandler{
- ready: make(chan bool),
- }
- for {
- err := consumer.Consume(context.Background(), topics, handler)
- if err != nil {
- log.Printf("消费者错误: %v", err)
- }
- select {
- case <-handler.ready:
- default:
- return
- }
- }
- }
|