feat!: due date reminder notifications
This commit is contained in:
@ -1,33 +1,172 @@
|
||||
package jobs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/RichardKnop/machinery/v1"
|
||||
mTasks "github.com/RichardKnop/machinery/v1/tasks"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/jinzhu/now"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jordanknott/taskcafe/internal/config"
|
||||
"github.com/jordanknott/taskcafe/internal/db"
|
||||
"github.com/jordanknott/taskcafe/internal/utils"
|
||||
)
|
||||
|
||||
func RegisterTasks(server *machinery.Server, repo db.Repository) {
|
||||
tasks := JobTasks{repo}
|
||||
type NotifiedData struct {
|
||||
Data map[string]string
|
||||
}
|
||||
|
||||
func RegisterTasks(server *machinery.Server, repo db.Repository, appConfig config.AppConfig, messageQueue *redis.Client) {
|
||||
tasks := JobTasks{Repository: repo, Server: server, AppConfig: appConfig, MessageQueue: messageQueue}
|
||||
server.RegisterTasks(map[string]interface{}{
|
||||
"taskMemberWasAdded": tasks.TaskMemberWasAdded,
|
||||
"dueDateNotification": tasks.DueDateNotification,
|
||||
"scheduleDueDateNotifications": tasks.ScheduleDueDateNotifications,
|
||||
})
|
||||
}
|
||||
|
||||
type JobTasks struct {
|
||||
Repository db.Repository
|
||||
AppConfig config.AppConfig
|
||||
Repository db.Repository
|
||||
Server *machinery.Server
|
||||
MessageQueue *redis.Client
|
||||
}
|
||||
|
||||
func (t *JobTasks) TaskMemberWasAdded(taskID, notifierID, notifiedID string) (bool, error) {
|
||||
func (t *JobTasks) ScheduleDueDateNotifications() (bool, error) {
|
||||
ctx := context.Background()
|
||||
// tomorrow := now.With(time.Now().UTC().AddDate(0, 0, 1))
|
||||
today := now.With(time.Now().UTC())
|
||||
start := today.BeginningOfDay()
|
||||
log.WithFields(log.Fields{
|
||||
"start": start,
|
||||
}).Info("fetching duration")
|
||||
reminders, err := t.Repository.GetDueDateRemindersForDuration(ctx, start)
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while getting due date reminder")
|
||||
}
|
||||
for _, rem := range reminders {
|
||||
log.WithField("id", rem.DueDateReminderID).Info("found reminder")
|
||||
signature := &mTasks.Signature{
|
||||
UUID: "due_date_reminder_" + rem.DueDateReminderID.String(),
|
||||
Name: "dueDateNotification",
|
||||
ETA: &rem.RemindAt,
|
||||
Args: []mTasks.Arg{{
|
||||
Type: "string",
|
||||
Value: rem.DueDateReminderID.String(),
|
||||
}, {
|
||||
Type: "string",
|
||||
Value: rem.TaskID.String(),
|
||||
}},
|
||||
}
|
||||
log.WithField("nanoTime", signature.ETA.UnixNano()).Info("rem time")
|
||||
etaNano := strconv.FormatInt(signature.ETA.UnixNano(), 10)
|
||||
result, err := t.MessageQueue.ZRangeByScore(ctx, "delayed_tasks", &redis.ZRangeBy{Max: etaNano, Min: etaNano}).Result()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while getting due date reminder")
|
||||
}
|
||||
log.WithField("result", result).Info("result raw")
|
||||
if len(result) == 0 {
|
||||
log.Info("task not found, sending task")
|
||||
t.Server.SendTask(signature)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (t *JobTasks) DueDateNotification(dueDateIDEncoded string, taskIDEncoded string) (bool, error) {
|
||||
ctx := context.Background()
|
||||
dueDateID, err := uuid.Parse(dueDateIDEncoded)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("while parsing task ID")
|
||||
return false, err
|
||||
}
|
||||
taskID, err := uuid.Parse(taskIDEncoded)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("while parsing task ID")
|
||||
return false, err
|
||||
}
|
||||
dueAt, err := t.Repository.GetDueDateReminderByID(ctx, dueDateID)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("while getting task by id")
|
||||
return false, err
|
||||
}
|
||||
task, err := t.Repository.GetTaskByID(ctx, taskID)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("while getting task by id")
|
||||
return false, err
|
||||
}
|
||||
projectInfo, err := t.Repository.GetProjectInfoForTask(ctx, taskID)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while getting project info for task")
|
||||
return false, err
|
||||
}
|
||||
data := map[string]string{
|
||||
"TaskID": task.ShortID,
|
||||
"TaskName": task.Name,
|
||||
"ProjectID": projectInfo.ProjectShortID,
|
||||
"ProjectName": projectInfo.Name,
|
||||
"DueAt": dueAt.RemindAt.String(),
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
raw, err := json.Marshal(NotifiedData{Data: data})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while marshal json data for notification")
|
||||
return false, err
|
||||
}
|
||||
n, err := t.Repository.CreateNotification(ctx, db.CreateNotificationParams{
|
||||
CausedBy: uuid.UUID{},
|
||||
ActionType: "DUE_DATE_REMINDER",
|
||||
CreatedOn: now,
|
||||
Data: json.RawMessage(raw),
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while creating notification")
|
||||
return false, err
|
||||
}
|
||||
watchers, err := t.Repository.GetTaskWatchersForTask(ctx, taskID)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("while getting watchers")
|
||||
return false, err
|
||||
}
|
||||
|
||||
for _, watcher := range watchers {
|
||||
notified, err := t.Repository.CreateNotificationNotifed(ctx, db.CreateNotificationNotifedParams{
|
||||
UserID: watcher.UserID,
|
||||
NotificationID: n.NotificationID,
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("error while creating notification notified object")
|
||||
return false, err
|
||||
}
|
||||
payload, err := json.Marshal(utils.NotificationCreatedMessage{
|
||||
NotifiedID: notified.NotifiedID.String(),
|
||||
NotificationID: n.NotificationID.String(),
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := t.MessageQueue.Publish(context.Background(), "notification-created", payload).Err(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
type JobQueue struct {
|
||||
AppConfig config.AppConfig
|
||||
Server *machinery.Server
|
||||
AppConfig config.AppConfig
|
||||
Repository db.Repository
|
||||
Server *machinery.Server
|
||||
}
|
||||
|
||||
func (q *JobQueue) TaskMemberWasAdded(taskID, notifier, notified uuid.UUID) error {
|
||||
func (q *JobQueue) DueDateNotification(notificationId uuid.UUID) error {
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user