
icpd/rabbitmq


rabbitmq

Wrap github.com/rabbitmq/amqp091-go to enable automatic reconnection in case of disconnection.

Features

  • Reconnects automatically when the connection drops
  • Retries publishing a message when sending fails
  • Hides the usage details of github.com/rabbitmq/amqp091-go to reduce users' cognitive load

Usage

Publish

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:PASSWORD@HOST:5672", // Replace PASSWORD and HOST with your own values
		nil,
	)

	// 2. Create a connection to the rabbitmq service
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Declare an exchange with the rabbitmq service
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether it is durable
	}
	if err := r.Exchange(exchange); err != nil {
		log.Fatal(err)
	}

	// 4. Send messages to the exchange; Publish does not block
	// Note: make sure the exchange has been declared before publishing a message
	for i := 0; i < 3; i++ { // Publish a few sample messages
		err := r.Publish(
			context.Background(),
			[]byte(fmt.Sprintf("hello %d", i)), // Message body to be sent
			rabbitmq.Exchange(exchange),        // Set the message exchange target
		)
		if err != nil {
			log.Fatal(err)
		}
	}

	select {} // Block forever so the process stays alive
}
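The Publish call above takes the message body as a `[]byte`, so structured data has to be serialized first. The following is a minimal sketch of one way to do that with `encoding/json`; the `OrderCreated` type and the `encodeBody` helper are hypothetical and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// OrderCreated is a hypothetical event type used only for illustration.
type OrderCreated struct {
	ID       int    `json:"id"`
	Customer string `json:"customer"`
}

// encodeBody is a hypothetical helper that serializes a value into the
// []byte message body that Publish takes as its second argument.
func encodeBody(v any) ([]byte, error) {
	body, err := json.Marshal(v)
	if err != nil {
		return nil, fmt.Errorf("encode message body: %w", err)
	}
	return body, nil
}

func main() {
	body, err := encodeBody(OrderCreated{ID: 42, Customer: "alice"})
	if err != nil {
		log.Fatal(err)
	}
	// In a real program, body would be passed to r.Publish in place of
	// the "hello %d" payload from the example above.
	fmt.Println(string(body))
}
```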

Subscribe

package main

import (
	"log"

	"github.com/icpd/rabbitmq"
)

func main() {
	// 1. Initialize a rabbitmq object
	r := rabbitmq.NewRabbitmq(
		"amqp://admin:PASSWORD@HOST:5672", // Replace PASSWORD and HOST with your own values
		nil,
	)

	// 2. Create a connection to the rabbitmq service
	if err := r.Connect(); err != nil {
		log.Fatal(err)
	}

	// 3. Exchange configuration
	exchange := rabbitmq.ExchangeOptions{
		Name:    "example_exchange",          // Exchange name
		Type:    rabbitmq.ExchangeTypeFanout, // Exchange type
		Durable: true,                        // Whether it is durable
	}

	// 4. Create a subscription
	// Subscribe starts a goroutine internally to consume messages, so it does not block the main goroutine.
	err := r.Subscribe(func(msg []byte) error {
		log.Println("receive:", string(msg))
		return nil
	},
		rabbitmq.Queue("example_queue"), // Set the name of the consumption queue
		rabbitmq.Exchange(exchange),     // Set the exchange that the queue needs to bind to
	)
	if err != nil {
		log.Fatal(err)
	}

	select {} // Block forever so the consumer goroutine keeps running
}
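The handler passed to Subscribe in the example above has the shape `func(msg []byte) error`. When messages carry JSON, a small adapter can decode the payload before calling a typed handler. The following is a sketch under assumptions: `jsonHandler` and `OrderCreated` are hypothetical names, generics require Go 1.18 or later, and how the library reacts to a returned error is not described here, so check the package source before relying on it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderCreated is a hypothetical event type used only for illustration.
type OrderCreated struct {
	ID       int    `json:"id"`
	Customer string `json:"customer"`
}

// jsonHandler is a hypothetical adapter. It wraps a typed handler in a
// func([]byte) error that decodes the raw message as JSON first.
func jsonHandler[T any](handle func(T) error) func([]byte) error {
	return func(msg []byte) error {
		var v T
		if err := json.Unmarshal(msg, &v); err != nil {
			return fmt.Errorf("decode message: %w", err)
		}
		return handle(v)
	}
}

func main() {
	h := jsonHandler(func(o OrderCreated) error {
		fmt.Printf("order %d for %s\n", o.ID, o.Customer)
		return nil
	})
	// In a real program, h would be passed as the first argument to r.Subscribe.
	// Here it is called directly with a sample payload.
	if err := h([]byte(`{"id":42,"customer":"alice"}`)); err != nil {
		fmt.Println("handler error:", err)
	}
}
```

Keeping the decoding in one adapter means each handler only deals with its own type, and malformed payloads surface as a single, consistent error.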

For more usage examples, please refer to the _example directory.

Similar projects

https://github.com/wagslane/go-rabbitmq
https://github.com/pmorelli92/bunnify
