我正在 MERN 堆栈中构建一个微服务应用程序,并在运行该服务时使用 RabbitMQ 作为消息代理,一段时间后我收到错误 错误:读取 ECONNRESET 在 TLSWrap.onStreamRead (节点:内部/stream_base_commons:217:20){ 错误号:-4077, 代码:'经济重置', 系统调用:“读” } 这是我的代码
import * as amqp from 'amqplib';
import dotenv from 'dotenv'
dotenv.config();
const rabbitURL: any = process.env.RABBIT_MQ
class RabbitMQ {
private static connection: amqp.Connection | null = null;
static async getConnection(): Promise<amqp.Connection> {
try {
if (!RabbitMQ.connection) {
RabbitMQ.connection = await amqp.connect(rabbitURL);
}
return RabbitMQ.connection as amqp.Connection
} catch (error) {
console.error(error);
throw error;
}
}
static async createChannel(): Promise<amqp.Channel> {
try {
const connection = await RabbitMQ.getConnection();
return connection.createChannel();
} catch (error) {
console.error(error);
throw error;
}
}
}
export default RabbitMQ;
这是消费者文件
async gigCreatedConsumer(){
try{
console.log("starting rabbit mq channel ");
const channel = await RabbitMQ.createChannel();
const exchangeName = 'gig-exchange';
const queueName = 'gig-service-queue';
await channel.assertExchange(exchangeName, 'direct', {durable: false});
const {queue} = await channel.assertQueue(queueName, {durable: false});
const routingKey = 'gig-created';
await channel.bindQueue(queue ,exchangeName, routingKey);
return new Promise((resolve ,reject)=>{
channel.consume(queue, (message)=>{
if(message){
try {
const createdGig: any = JSON.parse(message.content.toString());
channel.ack(message);
resolve(createdGig)
} catch (error) {
console.error("error processing gig creation");
channel.ack(message);
reject(error)
}
}
})
})
await channel.close()
}catch(err){
console.error("error setting up consumer", err)
}
},
我已经在控制器中调用了创建的函数,并在index.ts文件中调用了该函数
索引.ts
const app = express();
app.use(cors())
dotenv.config()
app.use(express.urlencoded({ extended: true }))
app.use(express.json());
const PORT = process.env.PORT || 8001;
userController.setup()
userController.gigAccept()
userController.gigReject()
userController.gigDeleteEvent()
app.use(router)
app.use(userRouter)
控制器.ts
async gigDeleteEvent() {
try {
const data: any = await userGigConsumers.gigDeleteConsumer();
const gigId = data;
const objId = await GigUserModel.find({ refId: gigId })
const gig = await GigUserModel.findByIdAndDelete(objId[0]._id);
console.log("gig deleted from usergig database");
} catch (error) {
console.log(error);
}
},
我猜你必须启用心跳 你写“我在一段时间后收到错误”-->持续多长时间?