Mercurial > lbo > hg > goe_bot
changeset 64:5f8092c3241c
Implement better reminder scheduler with lower latency and better retry behavior
author | Lewin Bormann <lbo@spheniscida.de> |
---|---|
date | Mon, 12 Dec 2016 20:20:19 +0100 |
parents | a3edf8947e0e |
children | 5dd87615e3e0 |
files | handler_remind.go http.go main.go remind.go sql/remind.go |
diffstat | 5 files changed, 152 insertions(+), 46 deletions(-) [+] |
line wrap: on
line diff
--- a/handler_remind.go Sun Dec 11 20:28:57 2016 +0100 +++ b/handler_remind.go Mon Dec 12 20:20:19 2016 +0100 @@ -144,7 +144,7 @@ return replyContent{text: "_Ich konnte mich nicht mit der Datenbank verbinden :(_"}, nil } - r := reminder{ + r := sql.Reminder{ Text: strings.Trim(msg.Text[restStart:], " "), Owner: msg.From.First_Name + " " + msg.From.Last_Name, Due: alertTime, @@ -152,17 +152,19 @@ ReplyTo: msg.Message_ID, } - id, err := db.InsertReminder(sql.Reminder(r)) + r, err = db.InsertReminder(r) if err != nil { log.Println("Couldn't insert reminder:", err) return replyContent{text: "_Ich konnte leider keine Erinnerung setzen._"}, nil } + REMINDERS_SCHEDULER.scheduleReminder(r) + remaining := alertTime.Sub(now) hours := uint64(remaining.Seconds()) / 3600 minutes := (uint64(remaining.Seconds()+1) % 3600) / 60 - return replyContent{text: fmt.Sprintf("*⏰* Erinnerung #%d in %d:%02d", id, hours, minutes)}, nil + return replyContent{text: fmt.Sprintf("*⏰* Erinnerung #%d in %d:%02d", r.ReminderID, hours, minutes)}, nil } func listReminders(ctx context.Context, chatID int64) (replyContent, error) { @@ -173,7 +175,7 @@ return replyContent{text: "_Konnte mich nicht mit der Datenbank verbinden..._"}, nil } - rms, err := db.ListActives(chatID) + rms, err := db.ListFutureReminders(chatID) if err != nil { log.Println("Couldn't get active reminders:", err, chatID)
--- a/http.go Sun Dec 11 20:28:57 2016 +0100 +++ b/http.go Mon Dec 12 20:20:19 2016 +0100 @@ -4,6 +4,7 @@ "bytes" "context" "encoding/json" + "errors" "fmt" "io/ioutil" "log" @@ -35,7 +36,7 @@ func (lrt loggingRoundtripper) RoundTrip(rq *http.Request) (*http.Response, error) { log.Println("Request to", rq.URL) rp, err := lrt.inner.RoundTrip(rq) - log.Println("Response:", rp.StatusCode, "error", err) + log.Println("Response:", rp.StatusCode, "error:", err) return rp, err } @@ -80,7 +81,7 @@ return err } - log.Println("Msg response:", string(body)) + log.Println("sending response:", string(body)) url := buildURL(sendMessageMethod) @@ -94,7 +95,8 @@ return err } else if rp.StatusCode != http.StatusOK { srvStatus.apiErrors++ - log.Println("Error:", rp.StatusCode, rp.Status) + log.Println("Error:", rp.StatusCode, "/", rp.Status) + return errors.New(fmt.Sprintf("bad response code %d", rp.StatusCode)) } return nil
--- a/main.go Sun Dec 11 20:28:57 2016 +0100 +++ b/main.go Mon Dec 12 20:20:19 2016 +0100 @@ -70,7 +70,7 @@ } // Start background reminder thread - go (&reminders{}).reminderChecker() + StartReminderScheduler() mux := http.NewServeMux() mux.HandleFunc("/debug", debugHandler)
--- a/remind.go Sun Dec 11 20:28:57 2016 +0100 +++ b/remind.go Mon Dec 12 20:20:19 2016 +0100 @@ -4,25 +4,42 @@ import ( "log" + "math" + "sync" "time" "bitbucket.org/dermesser/goe_bot/sql" ) +var ( + remindersSchedulerOnce sync.Once + REMINDERS_SCHEDULER *reminders +) + const ( - pollTime = 30 * time.Second - skipError = 10 * time.Second - maxAttempts = 3 + expBackoffBase = 4 // 1st retry after 4 seconds, 2nd retry after 16, 3rd retry after 64 seconds + inMemPollTime = 1 * time.Second + maxAttempts = 4 + pollTime = 30 * time.Second + reminderScheduleCap = 50 + skipError = 10 * time.Second ) -type reminder sql.Reminder +type reminder struct { + sql.Reminder + + // Used by the reminder loop + Attempts int +} -func (rm reminder) fire() error { - // Fired, but not yet removed - if rm.Fired { - return rm.removeFromDB() - } +func exponentialBackoff(nth int) time.Duration { + return time.Duration(math.Pow(expBackoffBase, float64(nth))) * time.Second +} +// On error, this function re-sends the reminder to the given channel. That channel is the newReminders channel +// causing the failed reminder to be scheduled again. +// This should be run in a goroutine, as it doesn't return a result and may block for a while +func (rm reminder) fire() { msg := sendMessage{ Chat_ID: rm.ChatID, Parse_Mode: "Markdown", @@ -31,20 +48,20 @@ Reply_Markup: inlineKeyboardMarkup{Inline_Keyboard: [][]inlineKeyboardButton{}}, } - rm.Attempts++ err := sendChatMessage(msg) if err != nil { log.Println("Couldn't send reminder", msg, ":", err) + log.Println("Re-scheduling reminder...") + REMINDERS_SCHEDULER.rescheduleReminder(rm) } else { - rm.Fired = true - return rm.removeFromDB() + rm.markAsDone() } - return err + return } -func (rm reminder) removeFromDB() error { +func (rm reminder) markAsDone() error { reminders, err := backend.Reminders() if err != nil { @@ -57,10 +74,37 @@ } type reminders struct { + newReminders chan reminder +} + +func StartReminderScheduler() { + remindersSchedulerOnce.Do(func() { + rm := &reminders{newReminders: make(chan reminder, reminderScheduleCap)} + go rm.inMemReminderChecker() + REMINDERS_SCHEDULER = rm + }) +} + +// Schedule a new reminder +func (r *reminders) scheduleReminder(rm sql.Reminder) { + REMINDERS_SCHEDULER.newReminders <- reminder{Reminder: rm} } -// This runs in a goroutine and checks for expired reminders (every 2 seconds, for now) -func (r *reminders) reminderChecker() { +// Retry an already attempted reminder. This can block for a longer period when doing exponential backoff +func (r *reminders) rescheduleReminder(rm reminder) { + rm.Attempts++ + + if rm.Attempts < maxAttempts { + time.Sleep(exponentialBackoff(rm.Attempts)) + REMINDERS_SCHEDULER.newReminders <- rm + } else { + log.Println("Dropping reminder", rm.Reminder.ReminderID, "after", rm.Attempts, "attempts") + rm.markAsDone() + } +} + +// Running in a background goroutine, this polls an internal map for expired reminders, firing them. +func (r *reminders) inMemReminderChecker() { for { db, err := backend.Reminders() @@ -70,23 +114,47 @@ continue } - inner: - for { - time.Sleep(pollTime) + // Load reminders on start-up + futureReminders, err := db.ListAllFutureReminders() + + if err != nil { + log.Println("Couldn't load existing reminders:", err) + time.Sleep(skipError) + continue + } + + log.Println("Loaded", len(futureReminders), "from DB") + + activeReminders := make(map[time.Time][]*reminder) - events, err := db.SelectDueReminders() + // Load previously existing reminders into map + for _, rm := range futureReminders { + activeReminders[rm.Due] = append(activeReminders[rm.Due], &reminder{Reminder: rm}) + } + + ticker := time.NewTicker(inMemPollTime) + defer ticker.Stop() - if err != nil { - log.Println("Couldn't fetch reminders:", err) - time.Sleep(skipError) - continue inner + var now time.Time + + // MAIN LOOP + for { + // Wait for either a new event or a tick + select { + case now = <-ticker.C: + // normal check + case newRm := <-r.newReminders: + activeReminders[newRm.Reminder.Due] = append(activeReminders[newRm.Reminder.Due], &newRm) } - for _, rm := range events { - err := reminder(rm).fire() + for due, rm := range activeReminders { + if now.Sub(due) > 0 { + for _, rm := range rm { + // If firing fails (e.g. because the API responds with an error), the reminder is simply rescheduled on the channel. + go rm.fire() + } - if err != nil { - // We keep trying to fire an event if it hasn't fired successfully + delete(activeReminders, due) } } }
--- a/sql/remind.go Sun Dec 11 20:28:57 2016 +0100 +++ b/sql/remind.go Mon Dec 12 20:20:19 2016 +0100 @@ -21,6 +21,9 @@ // Parameter: 1 = chat ID // Returns: id INTEGER, due TIMESTAMP, owner TEXT, description TEXT listFutureReminders = `SELECT id, due, owner, description FROM reminders WHERE NOT done AND due > now() AND chat_id = $1 ORDER BY due ASC` + // Parameter: n/a + // Returns: id INTEGER, due TIMESTAMP, owner TEXT, description TEXT, chatID INTEGER, replyTo INTEGER + listAllFutureReminders = `SELECT id, due, owner, description, chat_id, orig_msg_id FROM reminders WHERE NOT done ORDER BY due ASC` // Parameters: 1 = reminder ID // Returns: n/a markReminderDone = `UPDATE reminders SET done = true WHERE id = $1 AND NOT done` @@ -33,10 +36,6 @@ Text string ChatID int64 ReplyTo int64 - - // Used by the reminder loop - Attempts int - Fired bool } type Reminders struct { @@ -53,6 +52,7 @@ insertReminder, selectDueReminders, markReminderDone, + listAllFutureReminders, listFutureReminders}) } @@ -71,7 +71,7 @@ } } -func (r Reminders) ListActives(chatID int64) ([]Reminder, error) { +func (r Reminders) ListFutureReminders(chatID int64) ([]Reminder, error) { if stmt, ok := r.db.prepared[listFutureReminders]; ok { rows, err := stmt.Query(chatID) @@ -98,12 +98,45 @@ return rm, nil } else { - log.Println("Couldn't find prepared statement for", selectDueReminders) + log.Println("Couldn't find prepared statement for", listFutureReminders) return nil, errors.New("couldn't find prepared statement") } } -func (r Reminders) InsertReminder(rm Reminder) (uint, error) { +func (r Reminders) ListAllFutureReminders() ([]Reminder, error) { + if stmt, ok := r.db.prepared[listAllFutureReminders]; ok { + rows, err := stmt.Query() + + if err != nil { + log.Println("Couldn't query due reminders:", err) + return nil, err + } + + defer rows.Close() + + rm := make([]Reminder, 0, 16) + + for rows.Next() { + reminder := Reminder{} + err := rows.Scan(&reminder.ReminderID, &reminder.Due, &reminder.Owner, &reminder.Text, &reminder.ChatID, &reminder.ReplyTo) + + if err != nil { + log.Println("Couldn't scan reminder row:", err) + return nil, err + } + + rm = append(rm, reminder) + } + + return rm, nil + } else { + log.Println("Couldn't find prepared statement for", listAllFutureReminders) + return nil, errors.New("couldn't find prepared statement") + } +} + +// Takes a reminder, inserts it, and returns a reminder with the fields of rm and additionally ReminderID set. +func (r Reminders) InsertReminder(rm Reminder) (Reminder, error) { if stmt, ok := r.db.prepared[insertReminder]; ok { row := stmt.QueryRow(pq.FormatTimestamp(rm.Due), rm.Text, rm.Owner, rm.ChatID, rm.ReplyTo) @@ -113,13 +146,14 @@ if err != nil { log.Println("Couldn't insert reminder:", err) - return 0, err + return Reminder{}, err } - return id, nil + rm.ReminderID = id + return rm, nil } else { log.Println("Couldn't find prepared statement for", insertReminder) - return 0, errors.New("couldn't find prepared statement") + return Reminder{}, errors.New("couldn't find prepared statement") } }