import * as rabbimq from "./rabbitmq"; import { Channel, Message } from "amqplib"; import { ORDER, EX_TYPE } from './rbEnum'; import * as mock from 'mockjs';
/**
 * Starts the consumer that drains the order dead-letter queue.
 *
 * Asserts the durable exchange `ORDER.ELX_EXCHANGE` (type `EX_TYPE.TOPIC`) and
 * the exclusive queue `ORDER.ELK_QUEUE`, binds the queue with the pattern
 * `order.#`, and acknowledges every delivered message without processing it.
 *
 * @returns A promise that resolves once the consumer is registered. It rejects
 *          if creating the channel, asserting the topology or registering the
 *          consumer fails.
 */
export async function orderDlq(): Promise<void> {
  const connect = rabbimq.connection();
  const channel: Channel = await connect.createChannel();

  await channel.assertExchange(ORDER.ELX_EXCHANGE, EX_TYPE.TOPIC, { durable: true });
  const queueConsumer = await channel.assertQueue(ORDER.ELK_QUEUE, { exclusive: true });

  // Limit this consumer to one unacknowledged delivery at a time.
  await channel.prefetch(1, false);
  await channel.bindQueue(queueConsumer.queue, ORDER.ELX_EXCHANGE, 'order.#');

  // Await registration so a failure surfaces to the caller instead of
  // becoming an unhandled rejection.
  await channel.consume(queueConsumer.queue, async msg => {
    // amqplib calls the handler with null when the broker cancels the
    // consumer; there is nothing to acknowledge in that case.
    if (!msg) {
      return;
    }
    channel.ack(msg, false);
  });
}
/**
 * Starts the SMS-notification consumer for order-created messages.
 *
 * Consumes `ORDER.QUEUE_NOTIFICATION`, bound to `ORDER.EXCHANE` with routing
 * key `ORDER.ROUTERKEY_CREATE`. For each message a mock random boolean decides
 * the outcome (presumably a placeholder for a real payment check — confirm):
 *  - `true`: logs that no SMS is needed and acks the message;
 *  - `false`: logs that an SMS is being sent and nacks the message without
 *    requeueing it.
 *
 * @returns A promise that resolves once the shared `consumer` setup completes
 *          and rejects if that setup fails.
 */
export async function orderSms(): Promise<void> {
  const args = {
    exchange: ORDER.EXCHANE,
    exchangeType: EX_TYPE.DELAY,
    routerKey: ORDER.ROUTERKEY_CREATE,
    queueName: ORDER.QUEUE_NOTIFICATION,
    elx: ORDER.ELX_EXCHANGE,
    elk: ORDER.ELK_QUEUE,
  };

  // Await the setup so connection/channel errors reach the caller rather
  // than being dropped as a floating promise.
  await consumer(args, async (msg, channel) => {
    // Explicit Buffer -> string conversion; same text the template literal
    // produced implicitly before.
    const text = msg.content.toString();
    const success = mock.Random.boolean();

    if (success) {
      console.info(`【短信】无需发送短信 ${text}`);
      channel.ack(msg, false);
    } else {
      console.info(`【短信】该用户尚未支付,发送短信中----- ${text}`);
      // requeue = false: the message is rejected rather than redelivered.
      channel.nack(msg, false, false);
    }
  });
}
/**
 * Starts the order-summary consumer.
 *
 * Consumes `ORDER.QUEUE_SUMMARY`, bound to `ORDER.EXCHANE` with routing key
 * `ORDER.ROUTERKEY_SUMMARY`. A mock random boolean decides whether each
 * message is acked (logged as consumed successfully) or nacked without
 * requeue (logged as sent to the dead-letter queue).
 *
 * @returns A promise that resolves once the shared `consumer` setup completes
 *          and rejects if that setup fails.
 */
export async function orderSummery(): Promise<void> {
  const args = {
    exchange: ORDER.EXCHANE,
    exchangeType: EX_TYPE.DELAY,
    routerKey: ORDER.ROUTERKEY_SUMMARY,
    queueName: ORDER.QUEUE_SUMMARY,
    elx: ORDER.ELX_EXCHANGE,
    elk: ORDER.ELK_QUEUE,
  };

  await consumer(args, async (msg, channel) => {
    // Only the body is needed. The previous unused destructuring of
    // `properties.headers.retry` threw a TypeError for messages without
    // headers, before the message could be acked or nacked.
    const { content } = msg;
    const success = mock.Random.boolean();

    if (success) {
      console.info('【订单-消费】消费成功', content.toString());
      channel.ack(msg, false);
    } else {
      console.info(`【订单-消费】放入死信队列`);
      // requeue = false: the message is rejected rather than redelivered.
      channel.nack(msg, false, false);
    }
  });
}
/**
 * Shared consumer setup: declares the exchange and queue, binds them, and
 * registers a manual-ack consumer that hands each message to `callback`.
 *
 * @param args.exchange      Exchange to assert (durable) and bind to.
 * @param args.exchangeType  Exchange type passed to `assertExchange`.
 * @param args.routerKey     Binding key between the queue and the exchange.
 * @param args.queueName     Queue to assert (exclusive) and consume from.
 * @param args.elx           Dead-letter exchange configured on the queue.
 * @param args.elk           Dead-letter routing key configured on the queue.
 * @param callback           Invoked per delivered message with the message and
 *                           the channel; it is responsible for ack/nack. May
 *                           be sync or async.
 * @returns A promise that resolves once the consumer is registered. It rejects
 *          if creating the channel, asserting the topology or registering the
 *          consumer fails.
 */
async function consumer(
  args: { exchange: string, exchangeType: any, routerKey: string, queueName: string, elx: string, elk: string },
  callback: (msg: Message, channel: Channel) => unknown,
): Promise<void> {
  const { exchange, exchangeType, routerKey, queueName, elx, elk } = args;
  const connect = rabbimq.connection();
  const channel: Channel = await connect.createConfirmChannel();

  // 'x-delayed-type' is always sent, whatever exchangeType is.
  // NOTE(review): presumably meant for the delayed-message exchange plugin — confirm.
  await channel.assertExchange(exchange, exchangeType, {
    durable: true,
    arguments: { 'x-delayed-type': EX_TYPE.TOPIC },
  });

  const queueConsumer = await channel.assertQueue(queueName, {
    exclusive: true,
    deadLetterExchange: elx,
    deadLetterRoutingKey: elk,
  });

  // Limit this consumer to one unacknowledged delivery at a time.
  await channel.prefetch(1, false);
  await channel.bindQueue(queueConsumer.queue, exchange, routerKey);

  // Await registration so a failure surfaces to the caller instead of
  // becoming an unhandled rejection.
  await channel.consume(queueConsumer.queue, async msg => {
    // amqplib calls the handler with null when the broker cancels the
    // consumer; do not forward that to the callback.
    if (!msg) {
      return;
    }
    try {
      await callback(msg, channel);
    } catch (err) {
      // Report handler failures instead of leaking an unhandled rejection.
      // The message is deliberately not settled here: the callback may have
      // already acked/nacked it, and settling twice is a channel error.
      console.error(`consumer callback failed for queue ${queueName}`, err);
    }
  }, { noAck: false });
}
|