Created
July 23, 2023 21:06
-
-
Save Madjarx/fa9e92394ea23c7752cc5bad77daef2e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * This gist preserves the Broker class that I wrote a while ago. | |
| * I will probably reuse and enhance it at some point, hence the gist. | |
| */ | |
| const amqp = require('amqplib'); | |
| const clc = require('cli-color'); | |
| /** | |
| * Broker class | |
| * | |
| * @typedef {Object} Broker - rabbitMQ broker wrapper | |
| */ | |
| module.exports = class Broker { | |
| /** | |
| * @property {amqp.Connection} connection - RabbitMQ connection | |
| * @property {amqp.Channel} channel - RabbitMQ channel | |
| */ | |
| constructor() { | |
| this.connection = null; | |
| this.channel = null; | |
| } | |
| /** | |
| * | |
| * @param {String} url - RabbitMQ server url | |
| * | |
| * @returns {Promise<Broker>} - Promise that resolves when the connection is established | |
| */ | |
| async connect(url) { | |
| try { | |
| this.connection = await amqp.connect(url); | |
| this.channel = await this.connection.createChannel(); | |
| } catch (error) { | |
| console.error(`BROKER: Connect Method: Failed to connect to RabbitMQ server at ${url}`, error); | |
| throw error; | |
| } | |
| } | |
| /** | |
| * | |
| * @param {String} queue | |
| * @param {Object} message | |
| */ | |
| async sendMessage(queue, message) { | |
| try { | |
| await this.channel.assertQueue(queue, { durable: false }); | |
| this.channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); | |
| console.log(`Sent message to queue ${clc.yellow(queue)}`); | |
| // console.log(`Sent message to queue ${queue}:`, message); | |
| } catch (error) { | |
| console.error(clc.red(`Broker: Failed to send message to queue ${queue}`), error); | |
| throw error; | |
| } | |
| } | |
| /** | |
| * | |
| * @param {String} queue | |
| * @param {Function} callback | |
| * | |
| * Current available queues: | |
| * - contracts | |
| * - transactions | |
| * - deployedContracts | |
| */ | |
| async listenTo(queue, callback) { | |
| try { | |
| await this.channel.assertQueue(queue, { durable: false }); | |
| console.log(`Listening for messages in queue ${clc.yellow(queue)}`); | |
| this.channel.consume(queue, (msg) => { | |
| const message = JSON.parse(msg.content.toString()); | |
| callback(message); | |
| }, { noAck: true }); | |
| } catch (error) { | |
| console.error(`Broker: Failed to listen to queue ${queue}`, error); | |
| throw error; | |
| } | |
| } | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A lot of functionality is still to be added, such as a broker init method and callback hooks for more events.
All in all, this is a work in progress.