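I'm implementing a priority queue for asynchronous tasks in TypeScript (with React for the UI). The queue is continuously filled with tasks, and a button in the UI can trigger a priority change or add a new task to the queue. Given that JavaScript runs on a single thread (concurrency, but no parallelism), I'd like to know whether I need to protect the queue against potential race conditions. Specifically:
I understand that task execution is single-threaded. I also considered using the p-queue package, but I'm not sure whether it offers any protection against race conditions. Any insight would be appreciated, as I'm still learning the JavaScript/TypeScript concurrency model.
My implementation: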
// Import from the public entry points; "rxjs/internal/*" paths are private and may break between releases.
import { Subject, Subscription, from, of, Observer, Observable } from "rxjs";
import { catchError, map, mergeMap, tap } from "rxjs/operators";

export type PriorityQueueItem<T> = {
  readonly id: string;
  priority: number;
  task: () => Promise<T>;
};

export type PriorityQueue<T> = {
  addTask: (item: PriorityQueueItem<T>) => void;
  addTasksGroup: (items: PriorityQueueItem<T>[]) => void;
  removeTask: (id: string) => boolean;
  terminate: () => boolean;
  subscribe: (subscriber: Subscriber<T>) => () => void;
  updateTaskPriority: (id: string, newPriority: number) => boolean;
  start: () => void;
};

export type Subscriber<T> = {
  onSuccessfulTask: (result: T) => void;
  onFailedTask: (error: Error) => void;
  onAllTasksCompleted?: () => void;
};

type TaskResult<T> = {
  error?: Error;
  isError?: boolean;
  data?: T;
};

export const createPriorityQueue = <T>(
  initialTasks: PriorityQueueItem<T>[] = [],
): PriorityQueue<T> => {
  let queue: PriorityQueueItem<T>[] = [...initialTasks];
  let isProcessing = false;
  let currentTask: PriorityQueueItem<T> | null = null;
  const subscribers: Set<Observer<T>> = new Set();
  const taskSubject: Subject<PriorityQueueItem<T>> = new Subject();
  let subscription: Subscription | null = null;

  // Highest priority first.
  const sortQueue = () => {
    console.debug("Sorting queue");
    queue.sort((a, b) => b.priority - a.priority);
  };

  const processNextTask = () => {
    console.debug("Processing next task");
    if (currentTask) {
      // A task is already in flight; the next one is dispatched when it completes.
      // Without this guard, addTask would move tasks out of the queue early, where
      // they can no longer be re-prioritised or removed.
      return;
    }
    if (!isProcessing || queue.length === 0) {
      console.debug("No tasks to process or processing is paused");
      if (queue.length === 0) {
        console.debug("Queue is empty, completing subscribers");
        subscribers.forEach(subscriber => subscriber.complete()); // do we want this?
      }
      return;
    }
    sortQueue();
    const nextTask = queue.shift();
    if (nextTask) {
      console.debug("Next task found:", nextTask.id);
      currentTask = nextTask;
      taskSubject.next(nextTask);
    }
  };

  // Wrap the task's promise so that a rejection becomes a value instead of ending the stream.
  const processTask = (task: PriorityQueueItem<T>): Observable<TaskResult<T>> => {
    console.debug("Processing task:", task.id);
    return from(task.task()).pipe(
      map(data => ({ data })),
      catchError(error => of({ error, isError: true }))
    );
  };

  const initializeSubscription = (): void => {
    if (subscription) {
      console.debug("Subscription already initialized");
      return;
    }
    console.debug("Initializing subscription");
    subscription = taskSubject.pipe(
      mergeMap(
        task => processTask(task),
        1, // Concurrency = 1
      ),
      tap(() => {
        console.debug("Task completed, clearing current task");
        currentTask = null; // Clear current task reference
        processNextTask();
      }),
    ).subscribe(result => {
      if (result.isError) {
        console.debug("Task failed with error:", result.error);
        subscribers.forEach(subscriber => subscriber.error(result.error));
      } else {
        console.debug("Task succeeded with result:", result.data);
        // Pass on the unwrapped value, not the { data } wrapper.
        subscribers.forEach(subscriber => subscriber.next(result.data as T));
      }
    });
  };

  return {
    addTask: (task: PriorityQueueItem<T>) => {
      console.debug("Adding task:", task.id);
      queue.push({ ...task }); // Create copies
      if (isProcessing) {
        processNextTask();
      }
    },
    addTasksGroup: (tasks: PriorityQueueItem<T>[]) => {
      console.debug("Adding tasks group");
      queue.push(...tasks.map(task => ({ ...task }))); // Create copies
      if (isProcessing) {
        processNextTask();
      }
    },
    removeTask: (id: string) => {
      console.debug("Removing task:", id);
      const index = queue.findIndex(task => task.id === id);
      if (index === -1) {
        console.debug("Task not found:", id);
        return false;
      }
      queue.splice(index, 1);
      return true;
    },
    terminate: () => {
      console.debug("Terminating queue");
      isProcessing = false;
      if (subscription) {
        subscription.unsubscribe();
        subscription = null;
      }
      queue = [];
      // After unsubscribing, the completion handler never runs, so reset this here;
      // otherwise a later start() would see a stale task and never dispatch.
      currentTask = null;
      return true;
    },
    subscribe: (subscriber: Subscriber<T>) => {
      console.debug("Subscribing");
      const subscriberObserver: Observer<T> = {
        next: value => subscriber.onSuccessfulTask(value),
        error: error => subscriber.onFailedTask(error),
        // Optional callback; the original fallback referenced an undefined emptyFunction.
        complete: () => subscriber.onAllTasksCompleted?.(),
      };
      subscribers.add(subscriberObserver);
      initializeSubscription();
      return () => {
        console.debug("Unsubscribing");
        subscribers.delete(subscriberObserver);
      };
    },
    updateTaskPriority: (id: string, newPriority: number) => {
      console.debug("Updating task priority:", id, newPriority);
      const index = queue.findIndex(task => task.id === id);
      if (index === -1) {
        console.debug("Task not found:", id);
        return false;
      }
      queue[index].priority = newPriority;
      return true;
    },
    start: () => {
      console.debug("Starting queue processing");
      isProcessing = true;
      if (!currentTask) {
        processNextTask();
      }
    },
  };
};
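As you already mentioned, JavaScript on the UI side is single-threaded. You can spawn additional worker threads, but that is a different story entirely, and even then you would not need a mutex or any other synchronization mechanism, because workers normally exchange messages rather than share objects such as your queue.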
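To make the single-threaded point concrete, here is a minimal sketch (the `worker` and `runToCompletionDemo` names are made up for illustration). Each synchronous stretch of code runs to completion before any other callback starts, so two "concurrent" async functions can only interleave at an `await`:

```typescript
// Two async workers share one array. Everything before the `await` is a
// synchronous section and cannot be interrupted by the other worker.
async function worker(name: string, events: string[]): Promise<void> {
  events.push(`${name}:start`);
  events.push(`${name}:end`);
  await Promise.resolve(); // the only point where another task may run
  events.push(`${name}:after-await`);
}

async function runToCompletionDemo(): Promise<string[]> {
  const events: string[] = [];
  await Promise.all([worker("A", events), worker("B", events)]);
  return events;
}

runToCompletionDemo().then(events => console.log(events));
// logs: A:start, A:end, B:start, B:end, A:after-await, B:after-await
```

So a synchronous `push`, `sort` or `splice` on the queue needs no lock. The one thing to keep in mind is that anything you checked before an `await` may have changed by the time the function resumes.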
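The p-queue library you mention serves a different purpose. It provides concurrency and rate limiting, for example to cap the number of HTTP requests running at the same time, or to avoid oversaturating the event loop on the UI thread, which could otherwise cause blocking.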
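To show what "limiting concurrency" means in practice, here is a hand-rolled sketch of the idea. This is not p-queue's implementation or API; `createLimiter`, `sleep` and `limiterDemo` are hypothetical names used only for this example:

```typescript
type Job<T> = () => Promise<T>;

// Hypothetical helper: returns a `run` function that lets at most
// `concurrency` jobs be in flight at once.
function createLimiter(concurrency: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  const release = (): void => {
    const next = waiting.shift();
    if (next) {
      next(); // hand the freed slot straight to the next waiter
    } else {
      active--;
    }
  };

  return async function run<T>(job: Job<T>): Promise<T> {
    if (active >= concurrency) {
      // Park until a finishing job hands its slot over.
      await new Promise<void>(resolve => waiting.push(resolve));
    } else {
      active++;
    }
    try {
      return await job();
    } finally {
      release();
    }
  };
}

const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

// Runs five jobs through a limiter of 2 and reports the peak number in flight.
async function limiterDemo(): Promise<number> {
  const run = createLimiter(2);
  let inFlight = 0;
  let peak = 0;
  const job = async (): Promise<void> => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await sleep(5);
    inFlight--;
  };
  await Promise.all([1, 2, 3, 4, 5].map(() => run(job)));
  return peak;
}
```

Note that there is no lock anywhere in it. The counter and the waiting list are only touched in synchronous sections, which is all the protection they need.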
In my opinion your design is already somewhat over-engineered; you shouldn't introduce new entities without a good reason.
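For comparison, here is one possible simplification without RxJS, as a rough sketch only. The names `createSimpleQueue`, `setPriority`, `onIdle` and `simpleQueueDemo` are my own assumptions and not part of your code. A single `async` loop picks the highest-priority task each time it dequeues, so priority changes made while a task is running are honoured:

```typescript
type Task<T> = { readonly id: string; priority: number; task: () => Promise<T> };

function createSimpleQueue<T>(
  onResult: (id: string, result: T) => void,
  onError: (id: string, error: unknown) => void,
) {
  const pending: Task<T>[] = [];
  let running = false;
  let idle: Promise<void> = Promise.resolve();

  // Processes tasks one at a time until the queue is empty.
  const drain = async (): Promise<void> => {
    running = true;
    try {
      while (pending.length > 0) {
        // Re-sort at dequeue time so late priority updates take effect.
        pending.sort((a, b) => b.priority - a.priority);
        const next = pending.shift()!;
        try {
          onResult(next.id, await next.task());
        } catch (error) {
          onError(next.id, error);
        }
      }
    } finally {
      running = false;
    }
  };

  return {
    add(task: Task<T>): void {
      pending.push({ ...task });
      if (!running) idle = drain();
    },
    setPriority(id: string, priority: number): boolean {
      const found = pending.find(t => t.id === id);
      if (!found) return false;
      found.priority = priority;
      return true;
    },
    onIdle: (): Promise<void> => idle,
  };
}

// Usage: "a" starts immediately; "c" is bumped while "a" runs, so it overtakes "b".
async function simpleQueueDemo(): Promise<string[]> {
  const order: string[] = [];
  const q = createSimpleQueue<string>(id => { order.push(id); }, () => {});
  const work = (value: string) => () =>
    new Promise<string>(resolve => setTimeout(() => resolve(value), 5));
  q.add({ id: "a", priority: 1, task: work("a") });
  q.add({ id: "b", priority: 1, task: work("b") });
  q.add({ id: "c", priority: 1, task: work("c") });
  q.setPriority("c", 10);
  await q.onIdle();
  return order; // ["a", "c", "b"]
}
```

Whether this covers everything you need (pausing, multiple subscribers, completion callbacks) depends on your requirements, but it shows that the core fits in a few dozen lines without any extra synchronization.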