package services import ( "context" "database/sql" "log" "github.com/rede5/gohorsejobs/backend/internal/core/ports" "github.com/rede5/gohorsejobs/backend/internal/models" ) type NotificationService struct { DB *sql.DB FCM *FCMService Messaging ports.MessagingService } func NewNotificationService(db *sql.DB, fcm *FCMService, messaging ports.MessagingService) *NotificationService { return &NotificationService{DB: db, FCM: fcm, Messaging: messaging} } func (s *NotificationService) CreateNotification(ctx context.Context, userID string, nType, title, message string, link *string) error { query := ` INSERT INTO notifications (user_id, type, title, message, link, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) RETURNING id ` var id string err := s.DB.QueryRowContext(ctx, query, userID, nType, title, message, link).Scan(&id) if err != nil { return err } // Publish to Queue for asynchronous processing (Push, Email, etc.) if s.Messaging != nil { event := map[string]interface{}{ "id": id, "user_id": userID, "type": nType, "title": title, "message": message, "link": link, } if err := s.Messaging.Publish(ctx, "", "notifications", event); err != nil { log.Printf("Failed to publish notification event: %v", err) } } return nil } func (s *NotificationService) ListNotifications(ctx context.Context, userID string) ([]models.Notification, error) { query := ` SELECT id, user_id, type, title, message, link, read_at, created_at, updated_at FROM notifications WHERE user_id = $1 ORDER BY created_at DESC LIMIT 50 ` rows, err := s.DB.QueryContext(ctx, query, userID) if err != nil { return nil, err } defer rows.Close() notifications := []models.Notification{} for rows.Next() { var n models.Notification if err := rows.Scan( &n.ID, &n.UserID, &n.Type, &n.Title, &n.Message, &n.Link, &n.ReadAt, &n.CreatedAt, &n.UpdatedAt, ); err != nil { return nil, err } notifications = append(notifications, n) } return notifications, nil } func (s *NotificationService) MarkAsRead(ctx context.Context, id string, userID string) error { query := ` UPDATE notifications SET read_at = NOW(), updated_at = NOW() WHERE id = $1 AND user_id = $2 ` _, err := s.DB.ExecContext(ctx, query, id, userID) return err } func (s *NotificationService) MarkAllAsRead(ctx context.Context, userID string) error { query := ` UPDATE notifications SET read_at = NOW(), updated_at = NOW() WHERE user_id = $1 AND read_at IS NULL ` _, err := s.DB.ExecContext(ctx, query, userID) return err } func (s *NotificationService) SaveFCMToken(ctx context.Context, userID, token, platform string) error { query := ` INSERT INTO fcm_tokens (user_id, token, platform, last_seen_at) VALUES ($1, $2, $3, NOW()) ON CONFLICT (user_id, token) DO UPDATE SET last_seen_at = NOW(), platform = EXCLUDED.platform ` _, err := s.DB.ExecContext(ctx, query, userID, token, platform) return err } func (s *NotificationService) GetUserFCMTokens(ctx context.Context, userID string) ([]string, error) { query := `SELECT token FROM fcm_tokens WHERE user_id = $1` rows, err := s.DB.QueryContext(ctx, query, userID) if err != nil { return nil, err } defer rows.Close() var tokens []string for rows.Next() { var token string if err := rows.Scan(&token); err != nil { return nil, err } tokens = append(tokens, token) } return tokens, nil } // StartWorker initializes the background consumer for notifications func (s *NotificationService) StartWorker(ctx context.Context) error { if s.Messaging == nil { return nil } // Dynamic handler to process each notification in the queue handler := func(ctx context.Context, p ports.NotificationPayload) error { // 1. Get User Tokens tokens, err := s.GetUserFCMTokens(ctx, p.UserID) if err != nil { return err } if len(tokens) == 0 { log.Printf("No FCM tokens found for user %s, skipping push", p.UserID) return nil } // 2. Dispatch via FCM return s.FCM.SendMulticast(ctx, tokens, p.Title, p.Message, p.Data) } // Assuming we added StartWorker to MessagingService interface type startable interface { StartWorker(ctx context.Context, queue string, handler func(context.Context, ports.NotificationPayload) error) error } if worker, ok := s.Messaging.(startable); ok { return worker.StartWorker(ctx, "notifications", func(ctx context.Context, p ports.NotificationPayload) error { return handler(ctx, p) }) } return nil }