175 lines
3.5 KiB
Go
175 lines
3.5 KiB
Go
package messaging
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
amqp "github.com/rabbitmq/amqp091-go"
|
|
"github.com/rede5/gohorsejobs/backend/internal/core/ports"
|
|
)
|
|
|
|
type LavinMQService struct {
|
|
url string
|
|
conn *amqp.Connection
|
|
channel *amqp.Channel
|
|
mu sync.RWMutex
|
|
closed bool
|
|
reconnect chan bool
|
|
}
|
|
|
|
func NewLavinMQService(url string) *LavinMQService {
|
|
s := &LavinMQService{
|
|
url: url,
|
|
reconnect: make(chan bool),
|
|
}
|
|
go s.handleReconnect()
|
|
s.reconnect <- true
|
|
return s
|
|
}
|
|
|
|
func (s *LavinMQService) handleReconnect() {
|
|
for range s.reconnect {
|
|
s.mu.Lock()
|
|
if s.closed {
|
|
s.mu.Unlock()
|
|
return
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
for {
|
|
log.Printf("Connecting to LavinMQ at %s...", s.url)
|
|
conn, err := amqp.Dial(s.url)
|
|
if err != nil {
|
|
log.Printf("Failed to connect to LavinMQ: %v. Retrying in 5s...", err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
|
|
ch, err := conn.Channel()
|
|
if err != nil {
|
|
log.Printf("Failed to open channel: %v. Retrying in 5s...", err)
|
|
conn.Close()
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.conn = conn
|
|
s.channel = ch
|
|
s.mu.Unlock()
|
|
|
|
log.Println("Connected to LavinMQ successfully")
|
|
|
|
// Listen for connection closure
|
|
closeChan := conn.NotifyClose(make(chan *amqp.Error))
|
|
go func() {
|
|
err := <-closeChan
|
|
if err != nil {
|
|
log.Printf("LavinMQ connection closed: %v", err)
|
|
s.reconnect <- true
|
|
}
|
|
}()
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *LavinMQService) Publish(ctx context.Context, exchange, routingKey string, payload interface{}) error {
|
|
s.mu.RLock()
|
|
ch := s.channel
|
|
s.mu.RUnlock()
|
|
|
|
if ch == nil {
|
|
return fmt.Errorf("lavinmq channel not initialized")
|
|
}
|
|
|
|
body, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal payload: %w", err)
|
|
}
|
|
|
|
return ch.PublishWithContext(ctx,
|
|
exchange, // exchange
|
|
routingKey, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: "application/json",
|
|
Body: body,
|
|
Timestamp: time.Now(),
|
|
})
|
|
}
|
|
|
|
func (s *LavinMQService) StartWorker(ctx context.Context, queueName string, handler func(context.Context, ports.NotificationPayload) error) error {
|
|
s.mu.RLock()
|
|
ch := s.channel
|
|
s.mu.RUnlock()
|
|
|
|
if ch == nil {
|
|
return fmt.Errorf("lavinmq channel not initialized")
|
|
}
|
|
|
|
// Ensure queue exists
|
|
_, err := ch.QueueDeclare(
|
|
queueName, // name
|
|
true, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to declare queue: %w", err)
|
|
}
|
|
|
|
msgs, err := ch.Consume(
|
|
queueName, // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
for d := range msgs {
|
|
var payload ports.NotificationPayload
|
|
if err := json.Unmarshal(d.Body, &payload); err != nil {
|
|
log.Printf("Failed to unmarshal worker message: %v", err)
|
|
d.Nack(false, false)
|
|
continue
|
|
}
|
|
|
|
if err := handler(ctx, payload); err != nil {
|
|
log.Printf("Worker handler failed for queue %s: %v", queueName, err)
|
|
d.Nack(false, true) // Requeue
|
|
} else {
|
|
d.Ack(false)
|
|
}
|
|
}
|
|
}()
|
|
|
|
log.Printf("Worker started for queue: %s", queueName)
|
|
return nil
|
|
}
|
|
|
|
func (s *LavinMQService) Close() error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.closed = true
|
|
if s.channel != nil {
|
|
s.channel.Close()
|
|
}
|
|
if s.conn != nil {
|
|
return s.conn.Close()
|
|
}
|
|
return nil
|
|
}
|