Typescript Promise 优先级队列竞争条件

问题描述 投票:0回答:1

我正在 TypeScript 中实现异步任务的优先级队列(使用 React 作为 UI)。队列不断充满任务,用户界面中有一个按钮可以触发优先级更改或向队列添加新任务。 鉴于 JavaScript 在单线程上运行(具有并发性,但没有并行性),我想知道是否需要保护队列免受潜在竞争条件的影响。具体来说:

  1. 如果 UI 正在修改队列(添加任务或更改优先级),同时在找到要更改的索引之后但在排序之前发生上下文切换,是否会导致错误任务的优先级发生更改?
  2. 我应该使用互斥锁或其他同步方法来确保线程安全吗?

我知道任务执行是单线程的。我也考虑过使用 p-queue 包,但我不确定它是否提供针对竞争条件的保护。 任何见解都将不胜感激,因为我仍在学习 JavaScript/TypeScript 并发模型。

我的实现:

import { Subject } from "rxjs/internal/Subject";
import { Subscription } from "rxjs/internal/Subscription";
import { from, of, Observer, Observable } from "rxjs";
import { mergeMap } from "rxjs/internal/operators/mergeMap";
import { catchError, map, tap } from "rxjs/operators";

export type PriorityQueueItem<T> = {
    readonly id: string;
    priority: number;
    task: () => Promise<T>;
};

export type PriorityQueue<T> = {
    addTask: (item: PriorityQueueItem<T>) => void;
    addTasksGroup: (items: PriorityQueueItem<T>[]) => void;
    removeTask: (id: string) => boolean;
    terminate: () => boolean;
    subscribe: (subscriber: Subscriber<T>) => () => void;
    updateTaskPriority: (id: string, newPriority: number) => boolean;
    start: () => void;
};

export type Subscriber<T> = {
    onSuccessfulTask: (result: T) => void;
    onFailedTask: (error: Error) => void;
    onAllTasksCompleted?: () => void;
}

type TaskResult<T> = {
    error?: Error;
    isError?: boolean;
    data?: T;
}

export const createPriorityQueue = <T>(
    initialTasks: PriorityQueueItem<T>[] = [],
): PriorityQueue<T> => {

    let queue: PriorityQueueItem<T>[] = [...initialTasks];
    let isProcessing = false;
    let currentTask: PriorityQueueItem<T> | null = null;

    const subscribers: Set<Observer<T>> = new Set();
    const taskSubject: Subject<PriorityQueueItem<T>> = new Subject();
    let subscription: Subscription | null = null;

    const sortQueue = () => {
        console.debug("Sorting queue");
        queue.sort((a, b) => b.priority - a.priority);
    }

    const processNextTask = () => {
        console.debug("Processing next task");
        if (!isProcessing || queue.length === 0) {
            console.debug("No tasks to process or processing is paused");
            if (queue.length === 0) {
                console.debug("Queue is empty, completing subscribers");
                subscribers.forEach(subscriber => subscriber.complete()); // do we want this?
            }
            return;
        }
        sortQueue();
        const nextTask = queue.shift();
        if (nextTask) {
            console.debug("Next task found:", nextTask.id);
            currentTask = nextTask;
            taskSubject.next(nextTask);
        }
    }

    const processTask = (task: PriorityQueueItem<T>): Observable<TaskResult<T>> => {
        console.debug("Processing task:", task.id);
        return from(task.task()).pipe(
            map(data => ({ data })),
            catchError(error => of({ error, isError: true }))
        );
    };

    const initializeSubscription = (): void => {
        if (subscription) {
            console.debug("Subscription already initialized");
            return;
        }

        console.debug("Initializing subscription");
        subscription = taskSubject.pipe(
            mergeMap(task =>
                processTask(task),
                1, // Concurrency = 1
            ),
            tap(() => {
                console.debug("Task completed, clearing current task");
                currentTask = null;  // Clear current task reference
                processNextTask();
            }),
        ).subscribe(
            result => {
                if (result && typeof result === 'object' && 'isError' in result) {
                    console.debug("Task failed with error:", result.error);
                    subscribers.forEach(subscriber => subscriber.error(result.error));
                }
                else {
                    console.debug("Task succeeded with result:", result);
                    subscribers.forEach(subscriber => subscriber.next(result as T));
                }
            }
        );
    }

    return {
        addTask: (task: PriorityQueueItem<T>) => {
            console.debug("Adding task:", task.id);
            queue.push({ ...task }); // Create copies
            if (isProcessing) {
                processNextTask();
            }
        },
        addTasksGroup: (tasks: PriorityQueueItem<T>[]) => {
            console.debug("Adding tasks group");
            queue.push(...tasks.map(task => (
                { ...task }
            ))); // Create copies
            if (isProcessing) {
                processNextTask();
            }
        },
        removeTask: (id: string) => {
            console.debug("Removing task:", id);
            const index = queue.findIndex(task => task.id === id);
            if (index === -1) {
                console.debug("Task not found:", id);
                return false;
            }
            queue.splice(index, 1);
            return true;
        },
        terminate: () => {
            console.debug("Terminating queue");
            isProcessing = false;
            if (subscription) {
                subscription.unsubscribe();
                subscription = null;
            }
            queue = [];
            return true;
        },
        subscribe: (subscriber: Subscriber<T>) => {
            console.debug("Subscribing");
            const subscriberObserver: Observer<T> = {
                next: value => subscriber.onSuccessfulTask(value),
                error: error => subscriber.onFailedTask(error),
                complete: () => subscriber.onAllTasksCompleted?.() || emptyFunction,
            };

            subscribers.add(subscriberObserver);
            initializeSubscription();

            return () => {
                console.debug("Unsubscribing");
                subscribers.delete(subscriberObserver);
            }
        },
        updateTaskPriority: (id: string, newPriority: number) => {
            console.debug("Updating task priority:", id, newPriority);
            const index = queue.findIndex(task => task.id === id);
            if (index === -1) {
                console.debug("Task not found:", id);
                return false;
            }
            queue[index].priority = newPriority;
            return true;
        },
        start: () => {
            console.debug("Starting queue processing");
            isProcessing = true;
            if (!currentTask) {
                processNextTask();
            }
        },
    }
}
typescript concurrency rxjs race-condition
1个回答
0
投票

正如您已经提到的,JavaScript UI 是单线程的。我们可以分配额外的工作线程,但完全是另一个故事,即使在这种情况下,您也不需要任何类型的互斥体或其他同步机制。

您提出的

p-queue
库还有其他用途。它提供速率限制功能,例如用于限制某种并发运行的 http 请求,或防止 UI 线程上的事件循环过饱和,这可能会导致阻塞。

在我看来,你的设计已经有点过度设计了,你不应该无缘无故地引入新的实体。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.