changeset 64:5f8092c3241c

Implement better reminder scheduler with lower latency and better retry behavior
author Lewin Bormann <lbo@spheniscida.de>
date Mon, 12 Dec 2016 20:20:19 +0100
parents a3edf8947e0e
children 5dd87615e3e0
files handler_remind.go http.go main.go remind.go sql/remind.go
diffstat 5 files changed, 152 insertions(+), 46 deletions(-) [+]
line wrap: on
line diff
--- a/handler_remind.go	Sun Dec 11 20:28:57 2016 +0100
+++ b/handler_remind.go	Mon Dec 12 20:20:19 2016 +0100
@@ -144,7 +144,7 @@
 		return replyContent{text: "_Ich konnte mich nicht mit der Datenbank verbinden :(_"}, nil
 	}
 
-	r := reminder{
+	r := sql.Reminder{
 		Text:    strings.Trim(msg.Text[restStart:], " "),
 		Owner:   msg.From.First_Name + " " + msg.From.Last_Name,
 		Due:     alertTime,
@@ -152,17 +152,19 @@
 		ReplyTo: msg.Message_ID,
 	}
 
-	id, err := db.InsertReminder(sql.Reminder(r))
+	r, err = db.InsertReminder(r)
 
 	if err != nil {
 		log.Println("Couldn't insert reminder:", err)
 		return replyContent{text: "_Ich konnte leider keine Erinnerung setzen._"}, nil
 	}
 
+	REMINDERS_SCHEDULER.scheduleReminder(r)
+
 	remaining := alertTime.Sub(now)
 	hours := uint64(remaining.Seconds()) / 3600
 	minutes := (uint64(remaining.Seconds()+1) % 3600) / 60
-	return replyContent{text: fmt.Sprintf("*⏰* Erinnerung #%d in %d:%02d", id, hours, minutes)}, nil
+	return replyContent{text: fmt.Sprintf("*⏰* Erinnerung #%d in %d:%02d", r.ReminderID, hours, minutes)}, nil
 }
 
 func listReminders(ctx context.Context, chatID int64) (replyContent, error) {
@@ -173,7 +175,7 @@
 		return replyContent{text: "_Konnte mich nicht mit der Datenbank verbinden..._"}, nil
 	}
 
-	rms, err := db.ListActives(chatID)
+	rms, err := db.ListFutureReminders(chatID)
 
 	if err != nil {
 		log.Println("Couldn't get active reminders:", err, chatID)
--- a/http.go	Sun Dec 11 20:28:57 2016 +0100
+++ b/http.go	Mon Dec 12 20:20:19 2016 +0100
@@ -4,6 +4,7 @@
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"log"
@@ -35,7 +36,7 @@
 func (lrt loggingRoundtripper) RoundTrip(rq *http.Request) (*http.Response, error) {
 	log.Println("Request to", rq.URL)
 	rp, err := lrt.inner.RoundTrip(rq)
-	log.Println("Response:", rp.StatusCode, "error", err)
+	log.Println("Response:", rp.StatusCode, "error:", err)
 	return rp, err
 }
 
@@ -80,7 +81,7 @@
 		return err
 	}
 
-	log.Println("Msg response:", string(body))
+	log.Println("sending response:", string(body))
 
 	url := buildURL(sendMessageMethod)
 
@@ -94,7 +95,8 @@
 		return err
 	} else if rp.StatusCode != http.StatusOK {
 		srvStatus.apiErrors++
-		log.Println("Error:", rp.StatusCode, rp.Status)
+		log.Println("Error:", rp.StatusCode, "/", rp.Status)
+		return errors.New(fmt.Sprintf("bad response code %d", rp.StatusCode))
 	}
 
 	return nil
--- a/main.go	Sun Dec 11 20:28:57 2016 +0100
+++ b/main.go	Mon Dec 12 20:20:19 2016 +0100
@@ -70,7 +70,7 @@
 	}
 
 	// Start background reminder thread
-	go (&reminders{}).reminderChecker()
+	StartReminderScheduler()
 
 	mux := http.NewServeMux()
 	mux.HandleFunc("/debug", debugHandler)
--- a/remind.go	Sun Dec 11 20:28:57 2016 +0100
+++ b/remind.go	Mon Dec 12 20:20:19 2016 +0100
@@ -4,25 +4,42 @@
 
 import (
 	"log"
+	"math"
+	"sync"
 	"time"
 
 	"bitbucket.org/dermesser/goe_bot/sql"
 )
 
+var (
+	remindersSchedulerOnce sync.Once
+	REMINDERS_SCHEDULER    *reminders
+)
+
 const (
-	pollTime    = 30 * time.Second
-	skipError   = 10 * time.Second
-	maxAttempts = 3
+	expBackoffBase      = 4 // 1st retry after 4 seconds, 2nd retry after 16, 3rd retry after 64 seconds
+	inMemPollTime       = 1 * time.Second
+	maxAttempts         = 4
+	pollTime            = 30 * time.Second
+	reminderScheduleCap = 50
+	skipError           = 10 * time.Second
 )
 
-type reminder sql.Reminder
+type reminder struct {
+	sql.Reminder
+
+	// Used by the reminder loop
+	Attempts int
+}
 
-func (rm reminder) fire() error {
-	// Fired, but not yet removed
-	if rm.Fired {
-		return rm.removeFromDB()
-	}
+func exponentialBackoff(nth int) time.Duration {
+	return time.Duration(math.Pow(expBackoffBase, float64(nth))) * time.Second
+}
 
+// On error, this function re-sends the reminder to the given channel. That channel is the newReminders channel
+// causing the failed reminder to be scheduled again.
+// This should be run in a goroutine, as it doesn't return a result and may block for a while
+func (rm reminder) fire() {
 	msg := sendMessage{
 		Chat_ID:             rm.ChatID,
 		Parse_Mode:          "Markdown",
@@ -31,20 +48,20 @@
 		Reply_Markup:        inlineKeyboardMarkup{Inline_Keyboard: [][]inlineKeyboardButton{}},
 	}
 
-	rm.Attempts++
 	err := sendChatMessage(msg)
 
 	if err != nil {
 		log.Println("Couldn't send reminder", msg, ":", err)
+		log.Println("Re-scheduling reminder...")
+		REMINDERS_SCHEDULER.rescheduleReminder(rm)
 	} else {
-		rm.Fired = true
-		return rm.removeFromDB()
+		rm.markAsDone()
 	}
 
-	return err
+	return
 }
 
-func (rm reminder) removeFromDB() error {
+func (rm reminder) markAsDone() error {
 	reminders, err := backend.Reminders()
 
 	if err != nil {
@@ -57,10 +74,37 @@
 }
 
 type reminders struct {
+	newReminders chan reminder
+}
+
+func StartReminderScheduler() {
+	remindersSchedulerOnce.Do(func() {
+		rm := &reminders{newReminders: make(chan reminder, reminderScheduleCap)}
+		go rm.inMemReminderChecker()
+		REMINDERS_SCHEDULER = rm
+	})
+}
+
+// Schedule a new reminder
+func (r *reminders) scheduleReminder(rm sql.Reminder) {
+	REMINDERS_SCHEDULER.newReminders <- reminder{Reminder: rm}
 }
 
-// This runs in a goroutine and checks for expired reminders (every 2 seconds, for now)
-func (r *reminders) reminderChecker() {
+// Retry an already attempted reminder. This can block for a longer period when doing exponential backoff
+func (r *reminders) rescheduleReminder(rm reminder) {
+	rm.Attempts++
+
+	if rm.Attempts < maxAttempts {
+		time.Sleep(exponentialBackoff(rm.Attempts))
+		REMINDERS_SCHEDULER.newReminders <- rm
+	} else {
+		log.Println("Dropping reminder", rm.Reminder.ReminderID, "after", rm.Attempts, "attempts")
+		rm.markAsDone()
+	}
+}
+
+// Running in a background goroutine, this polls an internal map for expired reminders, firing them.
+func (r *reminders) inMemReminderChecker() {
 	for {
 		db, err := backend.Reminders()
 
@@ -70,23 +114,47 @@
 			continue
 		}
 
-	inner:
-		for {
-			time.Sleep(pollTime)
+		// Load reminders on start-up
+		futureReminders, err := db.ListAllFutureReminders()
+
+		if err != nil {
+			log.Println("Couldn't load existing reminders:", err)
+			time.Sleep(skipError)
+			continue
+		}
+
+		log.Println("Loaded", len(futureReminders), "from DB")
+
+		activeReminders := make(map[time.Time][]*reminder)
 
-			events, err := db.SelectDueReminders()
+		// Load previously existing reminders into map
+		for _, rm := range futureReminders {
+			activeReminders[rm.Due] = append(activeReminders[rm.Due], &reminder{Reminder: rm})
+		}
+
+		ticker := time.NewTicker(inMemPollTime)
+		defer ticker.Stop()
 
-			if err != nil {
-				log.Println("Couldn't fetch reminders:", err)
-				time.Sleep(skipError)
-				continue inner
+		var now time.Time
+
+		// MAIN LOOP
+		for {
+			// Wait for either a new event or a tick
+			select {
+			case now = <-ticker.C:
+				// normal check
+			case newRm := <-r.newReminders:
+				activeReminders[newRm.Reminder.Due] = append(activeReminders[newRm.Reminder.Due], &newRm)
 			}
 
-			for _, rm := range events {
-				err := reminder(rm).fire()
+			for due, rm := range activeReminders {
+				if now.Sub(due) > 0 {
+					for _, rm := range rm {
+						// If firing fails (e.g. because the API responds with an error), the reminder is simply rescheduled on the channel.
+						go rm.fire()
+					}
 
-				if err != nil {
-					// We keep trying to fire an event if it hasn't fired successfully
+					delete(activeReminders, due)
 				}
 			}
 		}
--- a/sql/remind.go	Sun Dec 11 20:28:57 2016 +0100
+++ b/sql/remind.go	Mon Dec 12 20:20:19 2016 +0100
@@ -21,6 +21,9 @@
 	// Parameter: 1 = chat ID
 	// Returns: id INTEGER, due TIMESTAMP, owner TEXT, description TEXT
 	listFutureReminders = `SELECT id, due, owner, description FROM reminders WHERE NOT done AND due > now() AND chat_id = $1 ORDER BY due ASC`
+	// Parameter: n/a
+	// Returns: id INTEGER, due TIMESTAMP, owner TEXT, description TEXT, chatID INTEGER, replyTo INTEGER
+	listAllFutureReminders = `SELECT id, due, owner, description, chat_id, orig_msg_id FROM reminders WHERE NOT done ORDER BY due ASC`
 	// Parameters: 1 = reminder ID
 	// Returns: n/a
 	markReminderDone = `UPDATE reminders SET done = true WHERE id = $1 AND NOT done`
@@ -33,10 +36,6 @@
 	Text       string
 	ChatID     int64
 	ReplyTo    int64
-
-	// Used by the reminder loop
-	Attempts int
-	Fired    bool
 }
 
 type Reminders struct {
@@ -53,6 +52,7 @@
 		insertReminder,
 		selectDueReminders,
 		markReminderDone,
+		listAllFutureReminders,
 		listFutureReminders})
 }
 
@@ -71,7 +71,7 @@
 	}
 }
 
-func (r Reminders) ListActives(chatID int64) ([]Reminder, error) {
+func (r Reminders) ListFutureReminders(chatID int64) ([]Reminder, error) {
 	if stmt, ok := r.db.prepared[listFutureReminders]; ok {
 		rows, err := stmt.Query(chatID)
 
@@ -98,12 +98,45 @@
 
 		return rm, nil
 	} else {
-		log.Println("Couldn't find prepared statement for", selectDueReminders)
+		log.Println("Couldn't find prepared statement for", listFutureReminders)
 		return nil, errors.New("couldn't find prepared statement")
 	}
 }
 
-func (r Reminders) InsertReminder(rm Reminder) (uint, error) {
+func (r Reminders) ListAllFutureReminders() ([]Reminder, error) {
+	if stmt, ok := r.db.prepared[listAllFutureReminders]; ok {
+		rows, err := stmt.Query()
+
+		if err != nil {
+			log.Println("Couldn't query due reminders:", err)
+			return nil, err
+		}
+
+		defer rows.Close()
+
+		rm := make([]Reminder, 0, 16)
+
+		for rows.Next() {
+			reminder := Reminder{}
+			err := rows.Scan(&reminder.ReminderID, &reminder.Due, &reminder.Owner, &reminder.Text, &reminder.ChatID, &reminder.ReplyTo)
+
+			if err != nil {
+				log.Println("Couldn't scan reminder row:", err)
+				return nil, err
+			}
+
+			rm = append(rm, reminder)
+		}
+
+		return rm, nil
+	} else {
+		log.Println("Couldn't find prepared statement for", listAllFutureReminders)
+		return nil, errors.New("couldn't find prepared statement")
+	}
+}
+
+// Takes a reminder, inserts it, and returns a reminder with the fields of rm and additionally ReminderID set.
+func (r Reminders) InsertReminder(rm Reminder) (Reminder, error) {
 	if stmt, ok := r.db.prepared[insertReminder]; ok {
 		row := stmt.QueryRow(pq.FormatTimestamp(rm.Due), rm.Text, rm.Owner, rm.ChatID, rm.ReplyTo)
 
@@ -113,13 +146,14 @@
 
 		if err != nil {
 			log.Println("Couldn't insert reminder:", err)
-			return 0, err
+			return Reminder{}, err
 		}
 
-		return id, nil
+		rm.ReminderID = id
+		return rm, nil
 	} else {
 		log.Println("Couldn't find prepared statement for", insertReminder)
-		return 0, errors.New("couldn't find prepared statement")
+		return Reminder{}, errors.New("couldn't find prepared statement")
 	}
 }