Mercurial > lbo > hg > goe_bot
changeset 69:683193cb3194
Implement cancellation for the new in-memory scheduler
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 12 Dec 2016 21:28:53 +0100 |
parents | e6a411adf464 |
children | 43506af70483 |
files | handler_remind.go remind.go |
diffstat | 2 files changed, 29 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/handler_remind.go Mon Dec 12 21:01:25 2016 +0100 +++ b/handler_remind.go Mon Dec 12 21:28:53 2016 +0100 @@ -231,8 +231,6 @@ } func deleteReminderCallback(ctx context.Context, token string, cbq callbackQuery) (replyContent, error) { - log.Println("Callback for removing reminder", token) - db, err := backend.Reminders() if err != nil { @@ -247,6 +245,8 @@ return replyContent{text: "_Interner Fehler D:_"}, nil } + REMINDERS_SCHEDULER.cancelReminder(uint(id)) + n, err := db.MarkRemindersDone([]uint{uint(id)}) if err != nil {
--- a/remind.go Mon Dec 12 21:01:25 2016 +0100 +++ b/remind.go Mon Dec 12 21:28:53 2016 +0100 @@ -17,11 +17,11 @@ ) const ( - expBackoffBase = 4 // 1st retry after 4 seconds, 2nd retry after 16, 3rd retry after 64 seconds - inMemPollTime = 1 * time.Second // how frequently to poll for due reminders - maxAttempts = 4 // how often to retry delivering a reminder - reminderScheduleCap = 50 // Channel capacity of reminders channel - skipError = 10 * time.Second // how long to wait after a DB error + expBackoffBase = 4 // 1st retry after 4 seconds, 2nd retry after 16, 3rd retry after 64 seconds + inMemPollTime = 1 * time.Second // how frequently to poll for due reminders + maxAttempts = 4 // how often to retry delivering a reminder + reminderSchedulerCap = 50 // Channel capacity of reminders channel + skipError = 10 * time.Second // how long to wait after a DB error ) type reminder struct { @@ -73,20 +73,28 @@ } type reminders struct { - newReminders chan reminder + newReminders chan reminder + cancellations chan uint } func StartReminderScheduler() { remindersSchedulerOnce.Do(func() { - rm := &reminders{newReminders: make(chan reminder, reminderScheduleCap)} + rm := &reminders{newReminders: make(chan reminder, reminderSchedulerCap), + cancellations: make(chan uint, reminderSchedulerCap)} go rm.inMemReminderChecker() REMINDERS_SCHEDULER = rm }) } -// Schedule a new reminder +// Schedule a new reminder. Use a Reminder with ReminderID set. func (r *reminders) scheduleReminder(rm sql.Reminder) { - REMINDERS_SCHEDULER.newReminders <- reminder{Reminder: rm} + log.Println("Scheduled reminder", rm.ReminderID) + r.newReminders <- reminder{Reminder: rm} +} + +func (r *reminders) cancelReminder(id uint) { + log.Println("Cancelled reminder", id) + r.cancellations <- id } // Retry an already attempted reminder. This can block for a longer period when doing exponential backoff @@ -125,6 +133,7 @@ log.Println("Loaded", len(futureReminders), "from DB") activeReminders := make(map[time.Time][]*reminder) + cancelledReminders := make(map[uint]bool) // Load previously existing reminders into map for _, rm := range futureReminders { @@ -144,13 +153,20 @@ // normal check case newRm := <-r.newReminders: activeReminders[newRm.Reminder.Due] = append(activeReminders[newRm.Reminder.Due], &newRm) + case toCancel := <-r.cancellations: + cancelledReminders[toCancel] = true } for due, rm := range activeReminders { if now.Sub(due) > 0 { for _, rm := range rm { - // If firing fails (e.g. because the API responds with an error), the reminder is simply rescheduled on the channel. - go rm.fire() + // only fire if the reminder hasn't been cancelled yet + if !cancelledReminders[rm.Reminder.ReminderID] { + // If firing fails (e.g. because the API responds with an error), the reminder is simply rescheduled on the channel. + go rm.fire() + } else { + delete(cancelledReminders, rm.Reminder.ReminderID) + } } delete(activeReminders, due)