overpush/core/core.go

107 lines
2.4 KiB
Go

package core
import (
"strings"
"sync"
"time"
"git.wntrmute.dev/kyle/goutils/backoff"
"git.wntrmute.dev/kyle/goutils/log"
"github.com/gregdel/pushover"
)
// MessageBuffer is the capacity of each account's message queue, i.e.
// how many messages may back up before senders block. This represents a
// balance between expected usage and anticipated spikes. It is
// entirely empirical as I don't have data to drive this decision.
const MessageBuffer = 16
// Account holds the state needed to deliver queued messages through a
// single Pushover client.
type Account struct {
// p is the Pushover client used to send messages.
p *pushover.Pushover
// nextTransmit is the earliest time at which the next message may be
// sent; push sleeps until it has passed.
nextTransmit time.Time
// q buffers messages that are waiting to be pushed.
q chan *Message
// lock is taken by UpdateLimit when it updates nextTransmit.
lock *sync.Mutex
// bo supplies the delay used after a send fails and is requeued.
bo *backoff.Backoff
}
// Queue exposes the send-only side of the account's message channel so
// that callers can enqueue messages but cannot receive from it.
func (a *Account) Queue() chan<- *Message {
	var sendOnly chan<- *Message = a.q
	return sendOnly
}
// NewAccount builds an Account around a Pushover client created from
// token. The account gets an empty queue with room for MessageBuffer
// messages, its own mutex, and a fresh backoff.
func NewAccount(token string) *Account {
	acct := new(Account)

	acct.p = pushover.New(token)
	acct.q = make(chan *Message, MessageBuffer)
	acct.lock = new(sync.Mutex)
	// NOTE(review): presumably a 4h maximum delay with a 5s base
	// interval; confirm against the backoff package's New signature.
	acct.bo = backoff.New(4*time.Hour, 5*time.Second)

	return acct
}
// UpdateLimit records the rate-limit state reported by the Pushover API.
// While there are still messages remaining nothing changes; once the
// remaining count reaches zero, nextTransmit is set to one second after
// the reported reset time so that push holds off until then.
//
// A nil limit is ignored.
func (a *Account) UpdateLimit(limit *pushover.Limit) {
	if limit == nil {
		return
	}

	a.lock.Lock()
	// Release the lock on every return path. It was previously never
	// unlocked, so the second call to UpdateLimit blocked forever.
	defer a.lock.Unlock()

	if limit.Remaining > 0 {
		return
	}

	a.nextTransmit = limit.NextReset.Add(time.Second)
}
// Receive drains the account's queue, pushing each message as it
// arrives. It never returns normally: it blocks while the queue is empty
// and panics if the queue channel is closed.
func (a *Account) Receive() {
	for msg := range a.q {
		log.Info("pushing message")
		a.Push(msg)
	}

	// The range loop only terminates once the channel has been closed.
	panic("channel closed")
}
// push sends a single message to a single recipient. It first waits
// until nextTransmit has passed, then sends the message. If the response
// carries limit information, that information is logged and handed to
// UpdateLimit.
//
// On a send error the response and error from the client are returned
// unchanged.
func (a *Account) push(m *pushover.Message, r *pushover.Recipient) (*pushover.Response, error) {
	// Re-check after every sleep in case the wake-up came early.
	for {
		if !time.Now().Before(a.nextTransmit) {
			break
		}

		log.Infof("delaying until next transmit: %s", a.nextTransmit.Format(time.DateTime))
		time.Sleep(time.Until(a.nextTransmit))
	}

	resp, err := a.p.SendMessage(m, r)
	if err != nil {
		return resp, err
	}

	if limit := resp.Limit; limit != nil {
		log.Debugf("API limits: %d / %d remaining", limit.Remaining, limit.Total)
		log.Debugf("API next reset: %s", limit.NextReset.Format(time.DateTime))
		a.UpdateLimit(limit)
	}

	return resp, nil
}
// Push delivers m to each of its recipients in order.
//
// On the first failed send the rest of this call is abandoned:
//   - if a response is available and its status is in the 4XX range, the
//     reported errors are logged and the message is dropped;
//   - otherwise the message, trimmed to the recipients not yet delivered
//     to (starting with the one that failed), is put back on the queue and
//     the caller sleeps for the current backoff duration.
//
// Every successful send resets the backoff.
func (a *Account) Push(m *Message) {
	for i, r := range m.r {
		resp, err := a.push(m.m, pushover.NewRecipient(r))
		if err == nil {
			a.bo.Reset()
			continue
		}

		// If there's an error, log it and clean up the recipient list.
		log.Errf("core: failed to send message: %s", err)

		// The response may be nil when the send fails (e.g. on a
		// transport error), so guard before dereferencing it. A missing
		// response is treated as retryable.
		if resp != nil && resp.Status/100 == 4 {
			log.Errf("core: the Pushover API reported the following errors: %s", strings.Join(resp.Errors, ", "))
			log.Infoln("core: 4XX error from Pushover API; not retrying - dropping the message")
			return
		}

		log.Infoln("core: requeueing message that failed to send")
		log.Infoln("core: backing off")

		// Keep only the recipients that still need the message.
		m.r = m.r[i:]

		// NOTE(review): this send blocks if the queue is full; when Push
		// is called from the only goroutine draining the queue, that
		// would stall delivery. Confirm how many receivers run.
		a.q <- m
		time.Sleep(a.bo.Duration())
		return
	}
}