// Package messaging contains the LavinMQ (AMQP 0-9-1) adapter used to publish
// messages and run queue workers.
package messaging

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/url"
	"sync"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/rede5/gohorsejobs/backend/internal/core/ports"
)

// lavinMQRetryDelay is the pause between failed connection attempts. The
// retry log messages mention "5s" literally, so keep them in sync.
const lavinMQRetryDelay = 5 * time.Second

// LavinMQService owns one AMQP connection and one channel to a LavinMQ broker
// and re-establishes them in the background when the connection drops.
//
// Instances must be created with NewLavinMQService.
type LavinMQService struct {
	url     string
	conn    *amqp.Connection
	channel *amqp.Channel
	mu      sync.RWMutex // guards conn, channel and closed
	closed  bool

	// reconnect carries "please (re)connect" signals to handleReconnect.
	reconnect chan bool
	// done is closed by Close so every background goroutine can stop.
	done chan struct{}
}

// NewLavinMQService creates a service for the broker at url and starts
// connecting in the background. It returns immediately; until the first
// connection succeeds, Publish and StartWorker return an error.
func NewLavinMQService(url string) *LavinMQService {
	s := &LavinMQService{
		url: url,
		// Buffered so that signalling never blocks on goroutine scheduling.
		reconnect: make(chan bool, 1),
		done:      make(chan struct{}),
	}

	go s.handleReconnect()
	s.reconnect <- true

	return s
}

// handleReconnect is the supervisor goroutine: it waits for a reconnect
// signal, connects (retrying until it succeeds), and goes back to waiting.
// It exits once Close has been called.
func (s *LavinMQService) handleReconnect() {
	for {
		select {
		case <-s.done:
			return
		case <-s.reconnect:
		}

		if !s.connect() {
			return
		}
	}
}

// connect dials the broker and opens a channel, retrying every
// lavinMQRetryDelay until it succeeds. It reports false when the service was
// closed before a connection could be installed.
func (s *LavinMQService) connect() bool {
	for {
		if s.isClosed() {
			return false
		}

		// The URL normally embeds credentials, so never log it verbatim.
		log.Printf("Connecting to LavinMQ at %s...", redactAMQPURL(s.url))

		conn, err := amqp.Dial(s.url)
		if err != nil {
			log.Printf("Failed to connect to LavinMQ: %v. Retrying in 5s...", err)
			if !s.waitRetry() {
				return false
			}
			continue
		}

		ch, err := conn.Channel()
		if err != nil {
			log.Printf("Failed to open channel: %v. Retrying in 5s...", err)
			// Best effort: the connection is being discarded anyway.
			_ = conn.Close()
			if !s.waitRetry() {
				return false
			}
			continue
		}

		s.mu.Lock()
		if s.closed {
			// Close ran while we were dialing; do not resurrect the service.
			s.mu.Unlock()
			_ = conn.Close()
			return false
		}
		s.conn = conn
		s.channel = ch
		s.mu.Unlock()

		log.Println("Connected to LavinMQ successfully")
		s.watchConnection(conn)
		return true
	}
}

// watchConnection requests a reconnect when conn is closed with an error.
// A graceful close delivers no error and is ignored.
//
// NOTE(review): only connection-level closes are watched; a channel closed on
// its own (connection still alive) does not trigger a reconnect - confirm
// whether that case needs handling.
func (s *LavinMQService) watchConnection(conn *amqp.Connection) {
	// Buffered so the library's close notification never blocks on us.
	closeChan := conn.NotifyClose(make(chan *amqp.Error, 1))

	go func() {
		err := <-closeChan
		if err == nil {
			return
		}
		log.Printf("LavinMQ connection closed: %v", err)

		select {
		case s.reconnect <- true:
		case <-s.done:
			// Service shut down; nobody is listening for reconnects.
		}
	}()
}

// waitRetry sleeps for lavinMQRetryDelay and reports true, or reports false
// early if the service is closed in the meantime.
func (s *LavinMQService) waitRetry() bool {
	timer := time.NewTimer(lavinMQRetryDelay)
	defer timer.Stop()

	select {
	case <-s.done:
		return false
	case <-timer.C:
		return true
	}
}

// isClosed reports whether Close has been called.
func (s *LavinMQService) isClosed() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.closed
}

// redactAMQPURL returns raw with any password replaced, for safe logging.
func redactAMQPURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		return "<unparseable url>"
	}
	return u.Redacted()
}

// Publish JSON-encodes payload and publishes it to exchange with routingKey
// on the current channel. It returns an error if no channel has been
// established yet, if payload cannot be marshalled, or if the publish fails.
func (s *LavinMQService) Publish(ctx context.Context, exchange, routingKey string, payload interface{}) error {
	s.mu.RLock()
	ch := s.channel
	s.mu.RUnlock()

	if ch == nil {
		return fmt.Errorf("lavinmq channel not initialized")
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("failed to marshal payload: %w", err)
	}

	return ch.PublishWithContext(ctx,
		exchange,   // exchange
		routingKey, // routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        body,
			Timestamp:   time.Now(),
		})
}

// StartWorker declares the durable queue queueName, starts consuming from it
// with manual acknowledgements, and dispatches every message to handler in a
// background goroutine. ctx is passed through to handler.
//
// It returns an error if no channel has been established yet or if declaring
// or consuming from the queue fails.
//
// NOTE(review): the consumer is bound to the channel that was current when
// StartWorker was called; it is not re-registered after a reconnect, so the
// worker stops when that channel closes.
func (s *LavinMQService) StartWorker(ctx context.Context, queueName string, handler func(context.Context, ports.NotificationPayload) error) error {
	s.mu.RLock()
	ch := s.channel
	s.mu.RUnlock()

	if ch == nil {
		return fmt.Errorf("lavinmq channel not initialized")
	}

	// Ensure queue exists
	_, err := ch.QueueDeclare(
		queueName, // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	if err != nil {
		return fmt.Errorf("failed to declare queue: %w", err)
	}

	msgs, err := ch.Consume(
		queueName, // queue
		"",        // consumer
		false,     // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		return fmt.Errorf("failed to consume from queue %s: %w", queueName, err)
	}

	go func() {
		for d := range msgs {
			s.handleDelivery(ctx, queueName, d, handler)
		}
		// The loop only ends when the delivery channel is closed; make that
		// visible instead of letting the worker vanish silently.
		log.Printf("Worker for queue %s stopped: delivery channel closed", queueName)
	}()

	log.Printf("Worker started for queue: %s", queueName)
	return nil
}

// handleDelivery decodes one message, runs handler, and acknowledges it:
// undecodable messages are rejected without requeue, handler failures are
// rejected with requeue, and successes are acked.
func (s *LavinMQService) handleDelivery(ctx context.Context, queueName string, d amqp.Delivery, handler func(context.Context, ports.NotificationPayload) error) {
	var payload ports.NotificationPayload
	if err := json.Unmarshal(d.Body, &payload); err != nil {
		log.Printf("Failed to unmarshal worker message: %v", err)
		// Requeueing a message that cannot be decoded would loop forever.
		if nackErr := d.Nack(false, false); nackErr != nil {
			log.Printf("Failed to nack message on queue %s: %v", queueName, nackErr)
		}
		return
	}

	if err := handler(ctx, payload); err != nil {
		log.Printf("Worker handler failed for queue %s: %v", queueName, err)
		if nackErr := d.Nack(false, true); nackErr != nil { // Requeue
			log.Printf("Failed to nack message on queue %s: %v", queueName, nackErr)
		}
		return
	}

	if err := d.Ack(false); err != nil {
		log.Printf("Failed to ack message on queue %s: %v", queueName, err)
	}
}

// Close shuts the service down: it stops the background reconnect logic and
// closes the channel and the connection. It returns the error from closing
// the connection, if any. Calling Close more than once is safe; later calls
// return nil.
func (s *LavinMQService) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.closed {
		return nil
	}
	s.closed = true

	// done is nil only for a zero-value struct not built by the constructor.
	if s.done != nil {
		close(s.done)
	}

	if s.channel != nil {
		if err := s.channel.Close(); err != nil {
			log.Printf("Failed to close LavinMQ channel: %v", err)
		}
	}
	if s.conn != nil {
		return s.conn.Close()
	}
	return nil
}