diff --git a/backend/internal/core/ports/messaging.go b/backend/internal/core/ports/messaging.go new file mode 100644 index 0000000..ab13400 --- /dev/null +++ b/backend/internal/core/ports/messaging.go @@ -0,0 +1,18 @@ +package ports + +import "context" + +type NotificationPayload struct { + ID string `json:"id"` + UserID string `json:"user_id"` + Type string `json:"type"` + Title string `json:"title"` + Message string `json:"message"` + Link *string `json:"link"` + Data map[string]string `json:"data"` +} + +type MessagingService interface { + Publish(ctx context.Context, exchange, routingKey string, payload interface{}) error + StartWorker(ctx context.Context, queueName string, handler func(context.Context, NotificationPayload) error) error +} diff --git a/backend/internal/infrastructure/messaging/lavinmq.go b/backend/internal/infrastructure/messaging/lavinmq.go new file mode 100644 index 0000000..f624fbf --- /dev/null +++ b/backend/internal/infrastructure/messaging/lavinmq.go @@ -0,0 +1,175 @@ +package messaging + +import ( + "context" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/rede5/gohorsejobs/backend/internal/core/ports" +) + +type LavinMQService struct { + url string + conn *amqp.Connection + channel *amqp.Channel + mu sync.RWMutex + closed bool + reconnect chan bool +} + +func NewLavinMQService(url string) *LavinMQService { + s := &LavinMQService{ + url: url, + reconnect: make(chan bool), + } + go s.handleReconnect() + s.reconnect <- true + return s +} + +func (s *LavinMQService) handleReconnect() { + for range s.reconnect { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return + } + s.mu.Unlock() + + for { + log.Printf("Connecting to LavinMQ at %s...", s.url) + conn, err := amqp.Dial(s.url) + if err != nil { + log.Printf("Failed to connect to LavinMQ: %v. Retrying in 5s...", err) + time.Sleep(5 * time.Second) + continue + } + + ch, err := conn.Channel() + if err != nil { + log.Printf("Failed to open channel: %v. Retrying in 5s...", err) + conn.Close() + time.Sleep(5 * time.Second) + continue + } + + s.mu.Lock() + s.conn = conn + s.channel = ch + s.mu.Unlock() + + log.Println("Connected to LavinMQ successfully") + + // Listen for connection closure + closeChan := conn.NotifyClose(make(chan *amqp.Error)) + go func() { + err := <-closeChan + if err != nil { + log.Printf("LavinMQ connection closed: %v", err) + s.reconnect <- true + } + }() + break + } + } +} + +func (s *LavinMQService) Publish(ctx context.Context, exchange, routingKey string, payload interface{}) error { + s.mu.RLock() + ch := s.channel + s.mu.RUnlock() + + if ch == nil { + return fmt.Errorf("lavinmq channel not initialized") + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + return ch.PublishWithContext(ctx, + exchange, // exchange + routingKey, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: body, + Timestamp: time.Now(), + }) +} + +func (s *LavinMQService) StartWorker(ctx context.Context, queueName string, handler func(context.Context, ports.NotificationPayload) error) error { + s.mu.RLock() + ch := s.channel + s.mu.RUnlock() + + if ch == nil { + return fmt.Errorf("lavinmq channel not initialized") + } + + // Ensure queue exists + _, err := ch.QueueDeclare( + queueName, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare queue: %w", err) + } + + msgs, err := ch.Consume( + queueName, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return err + } + + go func() { + for d := range msgs { + var payload ports.NotificationPayload + if err := json.Unmarshal(d.Body, &payload); err != nil { + log.Printf("Failed to unmarshal worker message: %v", err) + d.Nack(false, false) + continue + } + + if err := handler(ctx, payload); err != nil { + log.Printf("Worker handler failed for queue %s: %v", queueName, err) + d.Nack(false, true) // Requeue + } else { + d.Ack(false) + } + } + }() + + log.Printf("Worker started for queue: %s", queueName) + return nil +} + +func (s *LavinMQService) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + s.closed = true + if s.channel != nil { + s.channel.Close() + } + if s.conn != nil { + return s.conn.Close() + } + return nil +} diff --git a/backend/internal/infrastructure/notifications/fcm.go b/backend/internal/infrastructure/notifications/fcm.go new file mode 100644 index 0000000..fd438ba --- /dev/null +++ b/backend/internal/infrastructure/notifications/fcm.go @@ -0,0 +1,78 @@ +package notifications + +import ( + "context" + "fmt" + "log" + + firebase "firebase.google.com/go/v4" + "firebase.google.com/go/v4/messaging" + "google.golang.org/api/option" +) + +type FCMService struct { + app *firebase.App + client *messaging.Client +} + +func NewFCMService(serviceAccountJSON []byte) (*FCMService, error) { + ctx := context.Background() + opt := option.WithCredentialsJSON(serviceAccountJSON) + + app, err := firebase.NewApp(ctx, nil, opt) + if err != nil { + return nil, fmt.Errorf("error initializing firebase app: %w", err) + } + + client, err := app.Messaging(ctx) + if err != nil { + return nil, fmt.Errorf("error getting messaging client: %w", err) + } + + return &FCMService{ + app: app, + client: client, + }, nil +} + +func (s *FCMService) SendPush(ctx context.Context, token, title, body string, data map[string]string) error { + message := &messaging.Message{ + Token: token, + Notification: &messaging.Notification{ + Title: title, + Body: body, + }, + Data: data, + } + + response, err := s.client.Send(ctx, message) + if err != nil { + return fmt.Errorf("error sending fcm message: %w", err) + } + + log.Printf("Successfully sent fcm message: %s", response) + return nil +} + +func (s *FCMService) SendMulticast(ctx context.Context, tokens []string, title, body string, data map[string]string) error { + if len(tokens) == 0 { + return nil + } + + message := &messaging.MulticastMessage{ + Tokens: tokens, + Notification: &messaging.Notification{ + Title: title, + Body: body, + }, + Data: data, + } + + br, err := s.client.SendEachForMulticast(ctx, message) + if err != nil { + return fmt.Errorf("error sending multicast message: %w", err) + } + + log.Printf("Multicast results: %d successes, %d failures", br.SuccessCount, br.FailureCount) + return nil +} diff --git a/backend/internal/router/router.go b/backend/internal/router/router.go index 003a2b5..2b89e60 100755 --- a/backend/internal/router/router.go +++ b/backend/internal/router/router.go @@ -41,6 +41,14 @@ func NewRouter() http.Handler { // Utils Services (Moved up for dependency injection) credentialsService := services.NewCredentialsService(database.DB) settingsService := services.NewSettingsService(database.DB) + + // Initialize Messaging + amqpURL := os.Getenv("AMQP_URL") + var messagingService ports.MessagingService + if amqpURL != "" { + messagingService = messaging.NewLavinMQService(amqpURL) + } + storageService := services.NewStorageService(credentialsService) fcmService := services.NewFCMService(credentialsService) cloudflareService := services.NewCloudflareService(credentialsService) @@ -53,8 +61,8 @@ func NewRouter() http.Handler { jwtSecret := os.Getenv("JWT_SECRET") if jwtSecret == "" { - // Fallback for dev, but really should be in env - jwtSecret = "default-dev-secret-do-not-use-in-prod" + // Fallback for dev, but really should be in env + jwtSecret = "default-dev-secret-do-not-use-in-prod" } authService := authInfra.NewJWTService(jwtSecret, "todai-jobs") @@ -65,12 +73,12 @@ func NewRouter() http.Handler { // Frontend URL for reset link frontendURL := os.Getenv("FRONTEND_URL") if frontendURL == "" { - frontendURL = "http://localhost:8963" + frontendURL = "http://localhost:8963" } // UseCases loginUC := authUC.NewLoginUseCase(userRepo, authService) - registerCandidateUC := authUC.NewRegisterCandidateUseCase(userRepo, companyRepo, authService, emailService) + registerCandidateUC := authUC.NewRegisterCandidateUseCase(userRepo, companyRepo, authService, emailService) createCompanyUC := tenantUC.NewCreateCompanyUseCase(companyRepo, userRepo, authService) listCompaniesUC := tenantUC.NewListCompaniesUseCase(companyRepo) createUserUC := userUC.NewCreateUserUseCase(userRepo, authService) @@ -83,9 +91,19 @@ func NewRouter() http.Handler { // Admin Logic Services auditService := services.NewAuditService(database.DB) - notificationService := services.NewNotificationService(database.DB, fcmService) - ticketService := services.NewTicketService(database.DB) + notificationService := services.NewNotificationService(database.DB, fcmService, messagingService) + // Start Background Workers + if messagingService != nil { + go func() { + log.Println("Starting background workers...") + if err := notificationService.StartWorker(context.Background()); err != nil { + log.Printf("Error starting notification worker: %v", err) + } + }() + } + + ticketService := services.NewTicketService(database.DB) // Handlers & Middleware coreHandlers := apiHandlers.NewCoreHandlers( loginUC, diff --git a/backend/internal/services/notification_service.go b/backend/internal/services/notification_service.go index 3da05e4..33dbd20 100644 --- a/backend/internal/services/notification_service.go +++ b/backend/internal/services/notification_service.go @@ -3,28 +3,51 @@ package services import ( "context" "database/sql" + "log" + "github.com/rede5/gohorsejobs/backend/internal/core/ports" "github.com/rede5/gohorsejobs/backend/internal/models" ) type NotificationService struct { - DB *sql.DB - FCM *FCMService + DB *sql.DB + FCM *FCMService + Messaging ports.MessagingService } -func NewNotificationService(db *sql.DB, fcm *FCMService) *NotificationService { - return &NotificationService{DB: db, FCM: fcm} +func NewNotificationService(db *sql.DB, fcm *FCMService, messaging ports.MessagingService) *NotificationService { + return &NotificationService{DB: db, FCM: fcm, Messaging: messaging} } func (s *NotificationService) CreateNotification(ctx context.Context, userID string, nType, title, message string, link *string) error { query := ` - INSERT INTO notifications (user_id, type, title, message, link, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) - ` - _, err := s.DB.ExecContext(ctx, query, userID, nType, title, message, link) - return err -} + INSERT INTO notifications (user_id, type, title, message, link, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, NOW(), NOW()) + RETURNING id + ` + var id string + err := s.DB.QueryRowContext(ctx, query, userID, nType, title, message, link).Scan(&id) + if err != nil { + return err + } + // Publish to Queue for asynchronous processing (Push, Email, etc.) + if s.Messaging != nil { + event := map[string]interface{}{ + "id": id, + "user_id": userID, + "type": nType, + "title": title, + "message": message, + "link": link, + } + if err := s.Messaging.Publish(ctx, "", "notifications", event); err != nil { + log.Printf("Failed to publish notification event: %v", err) + } + } + + return nil +} func (s *NotificationService) ListNotifications(ctx context.Context, userID string) ([]models.Notification, error) { query := ` SELECT id, user_id, type, title, message, link, read_at, created_at, updated_at @@ -91,3 +114,59 @@ func (s *NotificationService) SaveFCMToken(ctx context.Context, userID, token, p _, err := s.DB.ExecContext(ctx, query, userID, token, platform) return err } + +func (s *NotificationService) GetUserFCMTokens(ctx context.Context, userID string) ([]string, error) { + query := `SELECT token FROM fcm_tokens WHERE user_id = $1` + rows, err := s.DB.QueryContext(ctx, query, userID) + if err != nil { + return nil, err + } + defer rows.Close() + + var tokens []string + for rows.Next() { + var token string + if err := rows.Scan(&token); err != nil { + return nil, err + } + tokens = append(tokens, token) + } + return tokens, nil +} + +// StartWorker initializes the background consumer for notifications +func (s *NotificationService) StartWorker(ctx context.Context) error { + if s.Messaging == nil { + return nil + } + + // Dynamic handler to process each notification in the queue + handler := func(ctx context.Context, p ports.NotificationPayload) error { + // 1. Get User Tokens + tokens, err := s.GetUserFCMTokens(ctx, p.UserID) + if err != nil { + return err + } + + if len(tokens) == 0 { + log.Printf("No FCM tokens found for user %s, skipping push", p.UserID) + return nil + } + + // 2. Dispatch via FCM + return s.FCM.SendMulticast(ctx, tokens, p.Title, p.Message, p.Data) + } + + // Assuming we added StartWorker to MessagingService interface + type startable interface { + StartWorker(ctx context.Context, queue string, handler func(context.Context, ports.NotificationPayload) error) error + } + + if worker, ok := s.Messaging.(startable); ok { + return worker.StartWorker(ctx, "notifications", func(ctx context.Context, p ports.NotificationPayload) error { + return handler(ctx, p) + }) + } + + return nil +}