This commit is contained in:
2023-06-08 20:43:48 +00:00
parent 9e97f9842b
commit 9acafceb8f
13 changed files with 592 additions and 157 deletions

View File

@@ -1,11 +1,12 @@
package core
import (
"fmt"
"strings"
"sync"
"time"
pb "git.wntrmute.dev/kyle/overpush/proto"
"git.wntrmute.dev/kyle/goutils/backoff"
"git.wntrmute.dev/kyle/goutils/log"
"github.com/gregdel/pushover"
)
@@ -17,15 +18,21 @@ const MessageBuffer = 16
type Account struct {
p *pushover.Pushover
nextTransmit time.Time
q chan pb.Message
q chan *Message
lock *sync.Mutex
bo *backoff.Backoff
}
func (a *Account) Queue() chan<- *Message {
return a.q
}
func NewAccount(token string) *Account {
return &Account{
p: pushover.New(token),
q: make(chan pb.Message, MessageBuffer),
q: make(chan *Message, MessageBuffer),
lock: &sync.Mutex{},
bo: backoff.New(4*time.Hour, 5*time.Second),
}
}
@@ -40,19 +47,59 @@ func (a *Account) UpdateLimit(limit *pushover.Limit) {
func (a *Account) Receive() {
for {
for {
if a.nextTransmit.After(time.Now()) {
sleep := time.Until(a.nextTransmit)
time.Sleep(sleep)
} else {
break
}
}
m, closed := <-a.q
if closed {
if !closed {
panic("channel closed")
}
fmt.Println(m.String())
a.Push(m)
}
}
func (a *Account) push(m *pushover.Message, r *pushover.Recipient) (*pushover.Response, error) {
for time.Now().Before(a.nextTransmit) {
log.Infof("delaying until next transmit: %s", a.nextTransmit.Format(time.DateTime))
wait := time.Until(a.nextTransmit)
time.Sleep(wait)
}
resp, err := a.p.SendMessage(m, r)
if err != nil {
return resp, err
}
if resp.Limit != nil {
log.Debugf("API limits: %d / %d remaining", resp.Limit.Remaining, resp.Limit.Total)
log.Debugf("API next reset: %s", resp.Limit.NextReset.Format(time.DateTime))
a.UpdateLimit(resp.Limit)
}
return resp, nil
}
func (a *Account) Push(m *Message) {
for i, r := range m.r {
recipient := pushover.NewRecipient(r)
resp, err := a.push(m.m, recipient)
if err != nil {
// if there's an error, log it, clean up the recipient list.
log.Errf("core: failed to send message: %s", err)
if resp.Status/100 == 4 {
log.Errf("core: the Pushover API reported the following errors: %s", strings.Join(resp.Errors, ", "))
log.Infoln("core: 4XX error from Pushover API; not retrying - dropping the message")
return
}
log.Infoln("core: requeueing message that failed to send")
log.Infoln("core: backing off")
m.r = m.r[i:]
a.q <- m
time.Sleep(a.bo.Duration())
return
}
a.bo.Reset()
}
return
}

72
core/message.go Normal file
View File

@@ -0,0 +1,72 @@
package core
import (
"bytes"
"time"
"git.wntrmute.dev/kyle/goutils/log"
pb "git.wntrmute.dev/kyle/overpush/proto"
"github.com/gregdel/pushover"
)
var priorityMap = map[pb.Priority]int{
pb.Priority_PriorityInvalid: pushover.PriorityNormal,
pb.Priority_PriorityLowest: pushover.PriorityLowest,
pb.Priority_PriorityLow: pushover.PriorityLow,
pb.Priority_PriorityNormal: pushover.PriorityNormal,
pb.Priority_PriorityHigh: pushover.PriorityHigh,
pb.Priority_PriorityEmergency: pushover.PriorityEmergency,
}
type Message struct {
m *pushover.Message
r []string
buf *bytes.Buffer
}
func (m *Message) Close() error {
if m.buf != nil {
m.buf.Reset()
}
return nil
}
func NewMessage(pbmsg *pb.Message) *Message {
msg := &Message{
m: &pushover.Message{
Message: pbmsg.Text,
},
}
if t := pbmsg.GetTimestamp(); t != 0 {
msg.m.Timestamp = t
} else {
msg.m.Timestamp = time.Now().Unix()
}
priority, ok := priorityMap[pbmsg.Priority]
if !ok {
log.Warningln("core: message has an unknown priority; marking it as normal priority")
priority = pushover.PriorityNormal
}
msg.m.Priority = priority
if url := pbmsg.GetUrl(); url != nil {
msg.m.URL = url.GetLink()
msg.m.URLTitle = url.GetTitle()
}
if attachment := pbmsg.GetAttachment(); attachment != nil {
msg.buf = bytes.NewBuffer(attachment)
if err := msg.m.AddAttachment(msg.buf); err != nil {
log.Warningf("core: adding attachment to message failed: %s", err)
if pbmsg.Priority == pb.Priority_PriorityLowest {
return nil
}
log.Warningln("core: continuing without attachment")
}
}
msg.r = pbmsg.GetRecipients()
return msg
}