172 lines
4.5 KiB
Go
172 lines
4.5 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"log"
|
|
|
|
"github.com/rede5/gohorsejobs/backend/internal/core/ports"
|
|
"github.com/rede5/gohorsejobs/backend/internal/models"
|
|
)
|
|
|
|
type NotificationService struct {
|
|
DB *sql.DB
|
|
FCM *FCMService
|
|
Messaging ports.MessagingService
|
|
}
|
|
|
|
func NewNotificationService(db *sql.DB, fcm *FCMService, messaging ports.MessagingService) *NotificationService {
|
|
return &NotificationService{DB: db, FCM: fcm, Messaging: messaging}
|
|
}
|
|
|
|
func (s *NotificationService) CreateNotification(ctx context.Context, userID string, nType, title, message string, link *string) error {
|
|
query := `
|
|
INSERT INTO notifications (user_id, type, title, message, link, created_at, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
|
|
RETURNING id
|
|
`
|
|
var id string
|
|
err := s.DB.QueryRowContext(ctx, query, userID, nType, title, message, link).Scan(&id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Publish to Queue for asynchronous processing (Push, Email, etc.)
|
|
if s.Messaging != nil {
|
|
event := map[string]interface{}{
|
|
"id": id,
|
|
"user_id": userID,
|
|
"type": nType,
|
|
"title": title,
|
|
"message": message,
|
|
"link": link,
|
|
}
|
|
if err := s.Messaging.Publish(ctx, "", "notifications", event); err != nil {
|
|
log.Printf("Failed to publish notification event: %v", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
func (s *NotificationService) ListNotifications(ctx context.Context, userID string) ([]models.Notification, error) {
|
|
query := `
|
|
SELECT id, user_id, type, title, message, link, read_at, created_at, updated_at
|
|
FROM notifications
|
|
WHERE user_id = $1
|
|
ORDER BY created_at DESC
|
|
LIMIT 50
|
|
`
|
|
rows, err := s.DB.QueryContext(ctx, query, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
notifications := []models.Notification{}
|
|
for rows.Next() {
|
|
var n models.Notification
|
|
if err := rows.Scan(
|
|
&n.ID,
|
|
&n.UserID,
|
|
&n.Type,
|
|
&n.Title,
|
|
&n.Message,
|
|
&n.Link,
|
|
&n.ReadAt,
|
|
&n.CreatedAt,
|
|
&n.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
notifications = append(notifications, n)
|
|
}
|
|
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *NotificationService) MarkAsRead(ctx context.Context, id string, userID string) error {
|
|
query := `
|
|
UPDATE notifications
|
|
SET read_at = NOW(), updated_at = NOW()
|
|
WHERE id = $1 AND user_id = $2
|
|
`
|
|
_, err := s.DB.ExecContext(ctx, query, id, userID)
|
|
return err
|
|
}
|
|
|
|
func (s *NotificationService) MarkAllAsRead(ctx context.Context, userID string) error {
|
|
query := `
|
|
UPDATE notifications
|
|
SET read_at = NOW(), updated_at = NOW()
|
|
WHERE user_id = $1 AND read_at IS NULL
|
|
`
|
|
_, err := s.DB.ExecContext(ctx, query, userID)
|
|
return err
|
|
}
|
|
|
|
func (s *NotificationService) SaveFCMToken(ctx context.Context, userID, token, platform string) error {
|
|
query := `
|
|
INSERT INTO fcm_tokens (user_id, token, platform, last_seen_at)
|
|
VALUES ($1, $2, $3, NOW())
|
|
ON CONFLICT (user_id, token)
|
|
DO UPDATE SET last_seen_at = NOW(), platform = EXCLUDED.platform
|
|
`
|
|
_, err := s.DB.ExecContext(ctx, query, userID, token, platform)
|
|
return err
|
|
}
|
|
|
|
func (s *NotificationService) GetUserFCMTokens(ctx context.Context, userID string) ([]string, error) {
|
|
query := `SELECT token FROM fcm_tokens WHERE user_id = $1`
|
|
rows, err := s.DB.QueryContext(ctx, query, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var tokens []string
|
|
for rows.Next() {
|
|
var token string
|
|
if err := rows.Scan(&token); err != nil {
|
|
return nil, err
|
|
}
|
|
tokens = append(tokens, token)
|
|
}
|
|
return tokens, nil
|
|
}
|
|
|
|
// StartWorker initializes the background consumer for notifications
|
|
func (s *NotificationService) StartWorker(ctx context.Context) error {
|
|
if s.Messaging == nil {
|
|
return nil
|
|
}
|
|
|
|
// Dynamic handler to process each notification in the queue
|
|
handler := func(ctx context.Context, p ports.NotificationPayload) error {
|
|
// 1. Get User Tokens
|
|
tokens, err := s.GetUserFCMTokens(ctx, p.UserID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(tokens) == 0 {
|
|
log.Printf("No FCM tokens found for user %s, skipping push", p.UserID)
|
|
return nil
|
|
}
|
|
|
|
// 2. Dispatch via FCM
|
|
return s.FCM.SendMulticast(ctx, tokens, p.Title, p.Message, p.Data)
|
|
}
|
|
|
|
// Assuming we added StartWorker to MessagingService interface
|
|
type startable interface {
|
|
StartWorker(ctx context.Context, queue string, handler func(context.Context, ports.NotificationPayload) error) error
|
|
}
|
|
|
|
if worker, ok := s.Messaging.(startable); ok {
|
|
return worker.StartWorker(ctx, "notifications", func(ctx context.Context, p ports.NotificationPayload) error {
|
|
return handler(ctx, p)
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|