package main

import (
	"strings"
	"sync"
)

// GenericMessage is a piece of text (msg) addressed to one chat (chatId).
type GenericMessage struct {
	chatId int64
	msg    string
}

// Sender delivers a single message and reports whether delivery failed.
type Sender interface {
	Send(msg GenericMessage) error
}

// Sleeper starts a wait on behalf of tgSenderWorker. The worker hands Sleep
// its timeout channel and flushes its buffers when a value arrives on it.
type Sleeper interface {
	Sleep(ch chan<- interface{})
}

// tgSenderWorker batches the messages read from tgEvents per chat and sends
// each batch through s as one message, with every original text followed by
// a newline.
//
// The first message that arrives while no wait is pending calls
// sleeper.Sleep with the worker's timeout channel. When a value arrives on
// that channel, every accumulated batch is sent and the buffers are emptied.
// When tgEvents is closed, whatever is still buffered is sent and the worker
// returns.
//
// The worker registers itself on wg: it calls wg.Add(1) on entry and
// wg.Done on return. Because Add happens inside the worker, a caller that
// starts it with `go` must make sure the worker has begun running before
// calling wg.Wait.
//
// It panics if s.Send returns an error.
func tgSenderWorker(tgEvents <-chan GenericMessage, s Sender, wg *sync.WaitGroup, sleeper Sleeper) {
	wg.Add(1)
	defer wg.Done()

	pending := make(map[int64]*strings.Builder)
	waiting := false

	// Capacity 1: the waiting flag allows at most one outstanding Sleep, so
	// (assuming the Sleeper signals once per Sleep call) its send always
	// completes without a receiver. With an unbuffered channel that send
	// would block forever if it happened synchronously inside Sleep, or
	// after this worker had already returned, leaking the sender goroutine.
	timeoutEvents := make(chan interface{}, 1)

	for {
		select {
		case ev, ok := <-tgEvents:
			if !ok {
				// Input closed: send whatever is left and stop.
				tgFlushPending(s, pending)
				return
			}

			// Collect messages per chat so they can be sent at once.
			b, found := pending[ev.chatId]
			if !found {
				b = &strings.Builder{}
				pending[ev.chatId] = b
			}
			b.WriteString(ev.msg)
			b.WriteRune('\n')

			if !waiting {
				waiting = true
				sleeper.Sleep(timeoutEvents)
			}

		case <-timeoutEvents:
			waiting = false
			tgFlushPending(s, pending)
		}
	}
}

// tgFlushPending sends every buffered batch through s and removes it from
// pending. It panics on the first send error.
func tgFlushPending(s Sender, pending map[int64]*strings.Builder) {
	for chatId, b := range pending {
		if err := s.Send(GenericMessage{chatId, b.String()}); err != nil {
			// TODO: handle it better
			panic(err)
		}
		delete(pending, chatId)
	}
}