taskcafe/internal/jobs/jobs.go
2021-11-17 17:11:28 -06:00

173 lines
5.0 KiB
Go

package jobs
import (
"context"
"encoding/json"
"strconv"
"time"
"github.com/RichardKnop/machinery/v1"
mTasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/go-redis/redis/v8"
"github.com/jinzhu/now"
log "github.com/sirupsen/logrus"
"github.com/google/uuid"
"github.com/jordanknott/taskcafe/internal/config"
"github.com/jordanknott/taskcafe/internal/db"
"github.com/jordanknott/taskcafe/internal/utils"
)
type NotifiedData struct {
Data map[string]string
}
func RegisterTasks(server *machinery.Server, repo db.Repository, appConfig config.AppConfig, messageQueue *redis.Client) {
tasks := JobTasks{Repository: repo, Server: server, AppConfig: appConfig, MessageQueue: messageQueue}
server.RegisterTasks(map[string]interface{}{
"dueDateNotification": tasks.DueDateNotification,
"scheduleDueDateNotifications": tasks.ScheduleDueDateNotifications,
})
}
type JobTasks struct {
AppConfig config.AppConfig
Repository db.Repository
Server *machinery.Server
MessageQueue *redis.Client
}
func (t *JobTasks) ScheduleDueDateNotifications() (bool, error) {
ctx := context.Background()
// tomorrow := now.With(time.Now().UTC().AddDate(0, 0, 1))
today := now.With(time.Now().UTC())
start := today.BeginningOfDay()
log.WithFields(log.Fields{
"start": start,
}).Info("fetching duration")
reminders, err := t.Repository.GetDueDateRemindersForDuration(ctx, start)
if err != nil {
log.WithError(err).Error("error while getting due date reminder")
}
for _, rem := range reminders {
log.WithField("id", rem.DueDateReminderID).Info("found reminder")
signature := &mTasks.Signature{
UUID: "due_date_reminder_" + rem.DueDateReminderID.String(),
Name: "dueDateNotification",
ETA: &rem.RemindAt,
Args: []mTasks.Arg{{
Type: "string",
Value: rem.DueDateReminderID.String(),
}, {
Type: "string",
Value: rem.TaskID.String(),
}},
}
log.WithField("nanoTime", signature.ETA.UnixNano()).Info("rem time")
etaNano := strconv.FormatInt(signature.ETA.UnixNano(), 10)
result, err := t.MessageQueue.ZRangeByScore(ctx, "delayed_tasks", &redis.ZRangeBy{Max: etaNano, Min: etaNano}).Result()
if err != nil {
log.WithError(err).Error("error while getting due date reminder")
}
log.WithField("result", result).Info("result raw")
if len(result) == 0 {
log.Info("task not found, sending task")
t.Server.SendTask(signature)
}
}
return true, nil
}
func (t *JobTasks) DueDateNotification(dueDateIDEncoded string, taskIDEncoded string) (bool, error) {
ctx := context.Background()
dueDateID, err := uuid.Parse(dueDateIDEncoded)
if err != nil {
log.WithError(err).Error("while parsing task ID")
return false, err
}
taskID, err := uuid.Parse(taskIDEncoded)
if err != nil {
log.WithError(err).Error("while parsing task ID")
return false, err
}
dueAt, err := t.Repository.GetDueDateReminderByID(ctx, dueDateID)
if err != nil {
log.WithError(err).Error("while getting task by id")
return false, err
}
task, err := t.Repository.GetTaskByID(ctx, taskID)
if err != nil {
log.WithError(err).Error("while getting task by id")
return false, err
}
projectInfo, err := t.Repository.GetProjectInfoForTask(ctx, taskID)
if err != nil {
log.WithError(err).Error("error while getting project info for task")
return false, err
}
data := map[string]string{
"TaskID": task.ShortID,
"TaskName": task.Name,
"ProjectID": projectInfo.ProjectShortID,
"ProjectName": projectInfo.Name,
"DueAt": dueAt.RemindAt.String(),
}
now := time.Now().UTC()
raw, err := json.Marshal(NotifiedData{Data: data})
if err != nil {
log.WithError(err).Error("error while marshal json data for notification")
return false, err
}
n, err := t.Repository.CreateNotification(ctx, db.CreateNotificationParams{
CausedBy: uuid.UUID{},
ActionType: "DUE_DATE_REMINDER",
CreatedOn: now,
Data: json.RawMessage(raw),
})
if err != nil {
log.WithError(err).Error("error while creating notification")
return false, err
}
watchers, err := t.Repository.GetTaskWatchersForTask(ctx, taskID)
if err != nil {
log.WithError(err).Error("while getting watchers")
return false, err
}
for _, watcher := range watchers {
notified, err := t.Repository.CreateNotificationNotifed(ctx, db.CreateNotificationNotifedParams{
UserID: watcher.UserID,
NotificationID: n.NotificationID,
})
if err != nil {
log.WithError(err).Error("error while creating notification notified object")
return false, err
}
payload, err := json.Marshal(utils.NotificationCreatedMessage{
NotifiedID: notified.NotifiedID.String(),
NotificationID: n.NotificationID.String(),
})
if err != nil {
panic(err)
}
if err := t.MessageQueue.Publish(context.Background(), "notification-created", payload).Err(); err != nil {
panic(err)
}
}
return true, nil
}
type JobQueue struct {
AppConfig config.AppConfig
Repository db.Repository
Server *machinery.Server
}
func (q *JobQueue) DueDateNotification(notificationId uuid.UUID) error {
return nil
}