下面是我的代码:
// Poll the live call list: timer(0, 1000) emits immediately and then once per
// second, and switchMap unsubscribes from the previous inner observable each
// time a new tick arrives.
// NOTE(review): if the observable returned by getLiveCallList() errors, the
// whole stream terminates and polling stops; if it never emits, this handler
// never runs. Confirm the service's behaviour and consider handling errors
// inside the switchMap so one failed poll does not end the subscription.
this.callActiveListSubscription = timer(0, 1000)
  .pipe(switchMap(() => this.realtimeSupervisorHeatMapService.getLiveCallList()))
  .subscribe((data: any) => {
    console.log('inside get live call list subscription',data);
    this.recent_data = data;

    // Tolerate a payload without a contact list instead of throwing inside the
    // handler; null/undefined entries are dropped as before.
    const rawContacts = data && data.list_Of_contacts;
    const list: any[] = Array.isArray(rawContacts)
      ? rawContacts.filter((el: any) => el != null)
      : [];

    if (list.length === 0) {
      // No live calls: clear every call-related field and the selection.
      this.allCallsData = [];
      this.agentCallList = [];
      this.filteredCallList = [];
      this.activeCallCount = 0;
      this.selectedCallData = null;
      return;
    }

    // Active call count honours the selected business unit ('All' = no filter).
    console.log(this.selectedBusinessUnit)
    this.activeCallCount = this.selectedBusinessUnit !== 'All'
      ? list.filter((item: any) => item.business_unit === this.selectedBusinessUnit).length
      : list.length;

    // One row per contact; a later entry for the same contact replaces the
    // earlier one while keeping the position of its first occurrence.
    const rowsByContact = new Map<any, any>();
    for (const contact of list) {
      if (!contact) {
        continue;
      }
      const rawSlackUrl = contact.agentSlackDMUrl;
      rowsByContact.set(contact.contact, {
        name: contact.contact,
        agent_name: contact.agent_name,
        agentId: contact.agentId,
        streamStartTime: contact.streamStartTime,
        // "NA" means no Slack DM link is available; fall back to an inert href.
        slackDMUrl: rawSlackUrl === "NA" ? "#" : rawSlackUrl,
        phrase_count: contact.phrases_count,
        business_unit: contact.business_unit,
      });
    }

    // Sort once, highest phrase count first, rather than after every insertion.
    const rows = Array.from(rowsByContact.values());
    rows.sort((a: any, b: any) => b.phrase_count - a.phrase_count);
    this.agentCallList = rows;

    // Run the business-unit dropdown handler once the full list is in place.
    this.changeBusinessUnitDropdown();
  })
它位于 ngOnInit() 内部并且仅执行一次。
我希望它像 timer 中指定的那样每秒执行一次。我也尝试过用 mergeMap 代替 switchMap,但没有成功;把 timer 换成 interval 也同样不起作用。如有任何帮助,将不胜感激。
不需要 `timer`,您可以直接使用 `fromEvent`,只需确保在 WebSocket 关闭时取消订阅即可!我认为您需要两个流:一个使用计时器定时刷新数据,另一个用于监听 WebSocket 事件。
我没有找到 `streamArray` 对应的事件,所以改用 `message` 事件,并分享一个使用 RxJS `fromEvent` 监听 WebSocket 的示例。
import axios from 'axios';
import { of, from, fromEvent, timer } from 'rxjs';
import { map, share, switchMap } from 'rxjs/operators';
// Demo: listen to a WebSocket through RxJS fromEvent and release the
// subscription when the socket closes.
const socket = new WebSocket('wss://socketsbay.com/wss/v2/1/demo/');

/** Wraps the named event of the shared socket in an observable. */
const createObservable = (eventName: string) => {
  return fromEvent(socket, eventName);
};

/** Stream of the socket's 'message' events. */
const getLiveCallList = () => {
  return createObservable('message');
};

// Keep the handle so the listener can be detached when the socket closes.
const liveCallListSubscription = getLiveCallList().subscribe((data: any) => {
  console.log(data);
});

socket.addEventListener('open', (event) => {
  console.log('open');
  socket.send('Hello Server!');
});

// Plain DOM listener kept alongside the observable for comparison.
socket.addEventListener('message', (event) => {
  console.log('Message from server ', event.data);
});

// fromEvent never completes on its own, so unsubscribe once the socket closes.
socket.addEventListener('close', () => {
  liveCallListSubscription.unsubscribe();
});