腾讯云海外购

聊聊rocketmq-client-go的remoteBrokerOffsetStore

本文主要研究一下rocketmq-client-go的remoteBrokerOffsetStore

remoteBrokerOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

type remoteBrokerOffsetStore struct {     group       string     OffsetTable map[primitive.MessageQueue]int64 `json:"OffsetTable"`     client      internal.RMQClient     namesrv     internal.Namesrvs     mutex       sync.RWMutex }
  • remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性

NewRemoteOffsetStore

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func NewRemoteOffsetStore(group string, client internal.RMQClient, namesrv internal.Namesrvs) OffsetStore {     return &remoteBrokerOffsetStore{         group:       group,         client:      client,         namesrv:     namesrv,         OffsetTable: make(map[primitive.MessageQueue]int64),     } }
  • NewRemoteOffsetStore方法实例化remoteBrokerOffsetStore

persist

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {     r.mutex.Lock()     defer r.mutex.Unlock()     if len(mqs) == 0 {         return     } ​     used := make(map[primitive.MessageQueue]struct{}, 0)     for _, mq := range mqs {         used[*mq] = struct{}{}     } ​     for mq, off := range r.OffsetTable {         if _, ok := used[mq]; !ok {             delete(r.OffsetTable, mq)             continue         }         err := r.updateConsumeOffsetToBroker(r.group, mq, off)         if err != nil {             rlog.Warning("update offset to broker error", map[string]interface{}{                 rlog.LogKeyConsumerGroup: r.group,                 rlog.LogKeyMessageQueue:  mq.String(),                 rlog.LogKeyUnderlayError: err.Error(),                 "offset":                 off,             })         } else {             rlog.Info("update offset to broker success", map[string]interface{}{                 rlog.LogKeyConsumerGroup: r.group,                 rlog.LogKeyMessageQueue:  mq.String(),                 "offset":                 off,             })         }     } }
  • persist方法遍历OffsetTable,执行r.updateConsumeOffsetToBroker

remove

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {     r.mutex.Lock()     defer r.mutex.Unlock() ​     delete(r.OffsetTable, *mq)     rlog.Info("delete mq from offset table", map[string]interface{}{         rlog.LogKeyMessageQueue: mq,     }) }
  • remove方法执行delete(r.OffsetTable, *mq)

read

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {     r.mutex.RLock()     switch t {     case _ReadFromMemory, _ReadMemoryThenStore:         off, exist := r.OffsetTable[*mq]         if exist {             r.mutex.RUnlock()             return off         }         if t == _ReadFromMemory {             r.mutex.RUnlock()             return -1         }         fallthrough     case _ReadFromStore:         off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)         if err != nil {             rlog.Error("fecth offset of mq error", map[string]interface{}{                 rlog.LogKeyMessageQueue:  mq.String(),                 rlog.LogKeyUnderlayError: err,             })             r.mutex.RUnlock()             return -1         }         r.mutex.RUnlock()         r.update(mq, off, true)         return off     default:     } ​     return -1 }
  • read方法针对_ReadFromStore会执行r.fetchConsumeOffsetFromBroker

update

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {     r.mutex.Lock()     defer r.mutex.Unlock()     localOffset, exist := r.OffsetTable[*mq]     if !exist {         r.OffsetTable[*mq] = offset         return     }     if increaseOnly {         if localOffset < offset {             r.OffsetTable[*mq] = offset         }     } else {         r.OffsetTable[*mq] = offset     } }
  • update方法更新的是r.OffsetTable[*mq]

fetchConsumeOffsetFromBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {     broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)     if broker == "" {         r.namesrv.UpdateTopicRouteInfo(mq.Topic)         broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)     }     if broker == "" {         return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)     }     queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{         ConsumerGroup: group,         Topic:         mq.Topic,         QueueId:       mq.QueueId,     }     cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)     res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)     if err != nil {         return -1, err     }     if res.Code != internal.ResSuccess {         return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)     } ​     off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64) ​     if err != nil {         return -1, err     } ​     return off, nil }
  • fetchConsumeOffsetFromBroker方法构建QueryConsumerOffsetRequestHeader请求,然后通过r.client.InvokeSync发起请求

updateConsumeOffsetToBroker

rocketmq-client-go-v2.0.0/consumer/offset_store.go

func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq primitive.MessageQueue, off int64) error {     broker := r.namesrv.FindBrokerAddrByName(mq.BrokerName)     if broker == "" {         r.namesrv.UpdateTopicRouteInfo(mq.Topic)         broker = r.namesrv.FindBrokerAddrByName(mq.BrokerName)     }     if broker == "" {         return fmt.Errorf("broker: %s address not found", mq.BrokerName)     } ​     updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{         ConsumerGroup: group,         Topic:         mq.Topic,         QueueId:       mq.QueueId,         CommitOffset:  off,     }     cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)     return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second) }
  • updateConsumeOffsetToBroker方法构建UpdateConsumerOffsetRequestHeader请求,然后通过r.client.InvokeOneWay发起请求

小结

remoteBrokerOffsetStore定义了group、OffsetTable、client、namesrv、mutex属性;它提供了NewRemoteOffsetStore、persist、remove、read、update、fetchConsumeOffsetFromBroker、updateConsumeOffsetToBroker方法

doc