创建 X 个项目时直接 1 个批量插入,而不是(1 个插入/要创建的项目)* X

问题描述 投票:0回答:1

我正在尝试找到一种方法,让 Directus 在处理子表中的 items.create 场景时在 PostgreSQL 中执行 1 个 BULK INSERT,而不是为每个要创建的项目执行 1 个 INSERT(我在数据库日志中看到)= 优化性能,尤其是在处理数百甚至数千件物品时。

谢谢您的帮助。

上下文:我们的移动应用程序正在调用我们的 API 并发布数百个值 > 这正在触发流程。然后,在 SESSIONS 的子表中一次插入一个值(= 1 个 INSERT 语句/项)= 性能不佳。

bulkinsert directus
1个回答
0
投票

Directus 的 ItemsService 并没有实现基于 Knex batchInsert 的 createMany(),因此没有办法简单地实现这一点。

仅仅尝试禁用 Activity 日志记录或 Revisions 也不够,因为这并不会禁用过滤器(filter)或操作(action)触发器。

我个人需要向数千个用户发送通知,因此我把 ItemsService.createOne() 的内容改写成了一个自定义的 NotificationsService.createManyAtOnce():

import { Action } from '@directus/constants'
import {
  InvalidPayloadError,
}from '@directus/api/errors/index'
import { 
  translateDatabaseError,
} from '@directus/api/database/errors/translate'
import { SQLError } from '@directus/api/database/errors/dialects/types'
import getDatabase from '@directus/api/database/index'
import { getHelpers } from '@directus/api/database/helpers/index'
import emitter from '@directus/api/emitter'
import { shouldClearCache } from '@directus/api/utils/should-clear-cache'


import { 
  NotificationsService,
  PayloadService,
  AuthorizationService,
} from '@directus/api/services/index'
import { 
  type AbstractServiceOptions,
  type MutationOptions,
  type PrimaryKey,
  type Item as AnyItem,
  type ActionEventParams,
} from '@directus/api/dist/types'
import type { Notification } from '@directus/types'

import { cloneDeep, pick, without } from 'lodash-es'


/**
 * Notification service that adds a bulk-creation path.
 *
 * `createManyAtOnce()` reproduces the per-item pipeline of
 * `ItemsService.createOne()` (filter hooks, permission presets, M2O/A2O/O2M
 * payload processing, activity + revision tracking, action events) but
 * performs a single Knex `batchInsert` for the whole batch instead of one
 * INSERT per item.
 *
 * NOTE(review): written against Directus 10.4.3 internals — the imported
 * private APIs (`PayloadService`, `AuthorizationService`, emitter, helpers)
 * may differ in other versions; verify before upgrading.
 */
export class CustomNotificationService extends NotificationsService {
  // Handle on the parent prototype so overridden methods can delegate to the
  // original implementations explicitly.
  _parent = NotificationsService.prototype

  constructor(options: AbstractServiceOptions) {
    super(options)
  }

  /**
   * Forwards to the parent `sendEmail` only when the payload explicitly opts
   * in via `sendEmail === true`; otherwise it is a no-op.
   *
   * NOTE(review): the parent call is not awaited, so delivery is
   * fire-and-forget and rejections are unhandled — confirm this is intended.
   */
  async sendEmail(data: Partial<Notification & { sendEmail: boolean }>) {
    if (data.sendEmail === true) {
      this._parent.sendEmail.apply(this, [data])
    }
  }


  /**
   * Creates many notifications with ONE bulk INSERT.
   *
   * Pipeline (all inside a single transaction):
   *   1. per item: run `*.items.create` filter hooks, apply permission
   *      presets, process M2O / A2O relations;
   *   2. one `trx.batchInsert(...).returning(primaryKeyField)` for the batch;
   *   3. per item: process O2M relations, write activity and revision rows;
   *   4. after commit: emit action events (top-level + nested) and clear cache.
   *
   * @param data - one partial Notification per row to create
   * @param opts - standard Directus mutation options (event/limit bypasses,
   *               mutation tracker, pre-mutation error, revision callback)
   * @returns the primary keys of the created rows, in input order
   * @throws InvalidPayloadError if a create filter returns `undefined`
   * @throws the translated database error if the bulk insert fails
   */
  async createManyAtOnce(
    data: Partial<Notification>[], 
    opts: MutationOptions = {}
  ): Promise<PrimaryKey[]> {
    if (! opts.mutationTracker) {
      opts.mutationTracker = this.createMutationTracker()
    }
  
    // NOTE(review): only ONE mutation is tracked for the entire batch,
    // whereas the stock per-item path would track one per row — confirm this
    // is acceptable for MAX_BATCH_MUTATION-style limits.
    if (! opts.bypassLimits) {
      opts.mutationTracker.trackMutations(1)
    }
  
    // Dynamic imports mirror the stock ItemsService (avoids a circular
    // dependency between services).
    const { ActivityService } = await import(`@directus/api/services/activity`)
    const { RevisionsService } = await import(`@directus/api/services/revisions`)
  
    const primaryKeyField = this.schema.collections[this.collection]!.primary
    const fields = Object.keys(this.schema.collections[this.collection]!.fields)
  
    // Alias fields (relational/presentation-only) must not be sent to the
    // actual INSERT; they are stripped below via pick(fields - aliases).
    const aliases = Object.values(this.schema.collections[this.collection]!.fields)
    .filter((field) => field.alias === true)
    .map((field) => field.field)
  
    // By wrapping the logic in a transaction, we make sure we automatically roll back all the
    // changes in the DB if any of the parts contained within throws an error. This also means
    // that any errors thrown in any nested relational changes will bubble up and cancel the whole
    // update tree
    // Per-item state carried across the three phases (pre-insert, insert,
    // post-insert) so the batch insert can happen in the middle.
    type ItemValues = {
      primaryKey: PrimaryKey,
      payload: typeof data[number],
      payloadAfterHooks: Partial<typeof data[number]>,
      payloadWithPresets: Partial<typeof data[number]>,
      payloadWithoutAliases: Partial<typeof data[number]>,
      
      revisionsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`revisions`],
      revisionsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`revisions`],
      revisionsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`revisions`],

      nestedActionEventsM2O: Awaited<ReturnType<PayloadService[`processM2O`]>>[`nestedActionEvents`],
      nestedActionEventsA2O: Awaited<ReturnType<PayloadService[`processA2O`]>>[`nestedActionEvents`],
      nestedActionEventsO2M?: Awaited<ReturnType<PayloadService[`processO2M`]>>[`nestedActionEvents`],
    }


    const itemsValues: ItemValues[] = await this.knex.transaction(async (trx) => {
      // We're creating new services instances so they can use the transaction as their Knex interface
      const payloadService = new PayloadService(this.collection, {
        accountability: this.accountability,
        knex: trx,
        schema: this.schema,
      })
  
      const authorizationService = new AuthorizationService({
        accountability: this.accountability,
        knex: trx,
        schema: this.schema,
      })

      const itemsAnalyzingValues: ItemValues[] = []

      // Phase 1: run hooks / presets / M2O / A2O per item and accumulate the
      // flat payloads, WITHOUT inserting yet.
      for (const payloadToClone of data) {
        const payload = cloneDeep(payloadToClone)
  
        // Run all hooks that are attached to this event so the end user has the chance to augment the
        // item that is about to be saved
        const payloadAfterHooks =
          opts.emitEvents !== false
            ? await emitter.emitFilter(
              this.eventScope === `items`
                ? [`items.create`, `${this.collection}.items.create`]
                : `${this.eventScope}.create`,
              payload,
              {
                collection: this.collection,
              },
              {
                database: trx,
                schema: this.schema,
                accountability: this.accountability,
              }
            )
            : payload
  
        if (payloadAfterHooks === undefined) {
          throw new InvalidPayloadError({ reason: `Create filter returned undefined` })
        }
  
        // NOTE(review): carried over from createOne(), but here this returns a
        // NON-array from the transaction callback while the caller expects
        // ItemValues[] — if a filter ever returns a primitive, the forEach /
        // map after the transaction will crash. Consider throwing or skipping
        // the item instead.
        if (typeof payloadAfterHooks !== `object`) {
        // primary key(s) or null
          return payloadAfterHooks
        }
  
        const payloadWithPresets = this.accountability
          ? authorizationService.validatePayload(`create`, this.collection, payloadAfterHooks)
          : payloadAfterHooks
  
        if (opts.preMutationError) {
          throw opts.preMutationError
        }
  
        const {
          payload: payloadWithM2O,
          revisions: revisionsM2O,
          nestedActionEvents: nestedActionEventsM2O,
        } = await payloadService.processM2O(payloadWithPresets, opts)
  
        const {
          payload: payloadWithA2O,
          revisions: revisionsA2O,
          nestedActionEvents: nestedActionEventsA2O,
        } = await payloadService.processA2O(payloadWithM2O, opts)
  
        const payloadWithoutAliases = pick(payloadWithA2O, without(fields, ...aliases))
        const payloadWithTypeCasting = await payloadService.processValues(`create`, payloadWithoutAliases)
  
        // In case of manual string / UUID primary keys, the PK already exists in the object we're saving.
        const primaryKey = payloadWithTypeCasting[primaryKeyField]

        // batchInsertInput.push(payloadWithoutAliases)
        itemsAnalyzingValues.push({
          primaryKey,

          payload,
          payloadAfterHooks,
          payloadWithPresets, 
          payloadWithoutAliases,

          revisionsM2O,
          revisionsA2O,

          nestedActionEventsM2O,
          nestedActionEventsA2O,
        })
      }

      // Phase 2: the whole point — ONE bulk INSERT for the batch.
      // NOTE(review): `.returning()` is only honored by PostgreSQL (and a few
      // other dialects); on MySQL/SQLite the results array will not contain
      // per-row keys — see the TODO below.
      try {
        const results = await trx
        .batchInsert(
          this.collection, 
          itemsAnalyzingValues.map((v) => v.payloadWithoutAliases)
        )
        .returning(primaryKeyField)

        // Map DB-returned keys back onto the items, preferring any manually
        // supplied PK (string/UUID case) over the returned one.
        results.forEach((result, index) => {
          if (! itemsAnalyzingValues[index]) {
            throw new Error(`No batchInsert itemInput found for index ${index}`)
          }

          // Depending on dialect, `returning` yields objects ({ id: … }) or
          // bare scalar keys.
          const returnedKey = typeof result === `object` ? result[primaryKeyField] : result
  
          if (this.schema.collections[this.collection]!.fields[primaryKeyField]!.type === `uuid`) {
            itemsAnalyzingValues[index].primaryKey 
            = getHelpers(trx).schema.formatUUID(
                (itemsAnalyzingValues[index].primaryKey ?? returnedKey) as string
              )
          }
          else {
            itemsAnalyzingValues[index].primaryKey 
            = itemsAnalyzingValues[index].primaryKey ?? returnedKey
          }
        })

      }
      catch (err: unknown) {
        // Convert raw driver errors into Directus' typed database errors.
        throw await translateDatabaseError(err as SQLError)
      }
  
      // TODO
      // Most database support returning, those who don't tend to return the PK anyways
      // (MySQL/SQLite). In case the primary key isn't know yet, we'll do a best-attempt at
      // fetching it based on the last inserted row
      // if (! primaryKey) {
      //   // Fetching it with max should be safe, as we're in the context of the current transaction
      //   const result = await trx.max(primaryKeyField, { as: `id` }).from(this.collection)
      //   .first()
      //   primaryKey = result.id
      //   // Set the primary key on the input item, in order for the "after" event hook to be able
      //   // to read from it
      //   payload[primaryKeyField] = primaryKey
      // }
  
      // Phase 3: now that every item has a PK, process O2M relations and write
      // accountability (activity + revisions) rows per item. These are still
      // one-row-at-a-time inserts; only the main-table insert is batched.
      for (const itemsAnalyzingValue of itemsAnalyzingValues) {
        const {
          primaryKey,
          payloadAfterHooks,
          payloadWithPresets, 
          revisionsM2O,
          revisionsA2O,
        } = itemsAnalyzingValue

        const { 
          revisions: revisionsO2M, 
          nestedActionEvents: nestedActionEventsO2M, 
        } = await payloadService.processO2M(
          payloadWithPresets,
          primaryKey,
          opts
        )

        itemsAnalyzingValue.nestedActionEventsO2M = nestedActionEventsO2M
  
        // If this is an authenticated action, and accountability tracking is enabled, save activity row
        if (this.accountability 
          && this.schema.collections[this.collection]!.accountability !== null
        ) {
          const activityService = new ActivityService({
            knex: trx,
            schema: this.schema,
          })
  
          const activity = await activityService.createOne({
            action: Action.CREATE,
            user: this.accountability!.user,
            collection: this.collection,
            ip: this.accountability!.ip,
            user_agent: this.accountability!.userAgent,
            origin: this.accountability!.origin,
            item: primaryKey,
          })
  
          // If revisions are tracked, create revisions record
          if (this.schema.collections[this.collection]!.accountability === `all`) {
            const revisionsService = new RevisionsService({
              knex: trx,
              schema: this.schema,
            })
  
            const revisionDelta = await payloadService.prepareDelta(payloadAfterHooks)
  
            const revision = await revisionsService.createOne({
              activity: activity,
              collection: this.collection,
              item: primaryKey,
              data: revisionDelta,
              delta: revisionDelta,
            })
  
            // Make sure to set the parent field of the child-revision rows
            const childrenRevisions = [...revisionsM2O, ...revisionsA2O, ...revisionsO2M]
  
            if (childrenRevisions.length > 0) {
              await revisionsService.updateMany(childrenRevisions, { parent: revision })
            }
  
            if (opts.onRevisionCreate) {
              opts.onRevisionCreate(revision)
            }
          }
        }
      }
  
      return itemsAnalyzingValues
    })
  

    // Post-commit: emit `*.create` action events per item (outside the
    // transaction, against the root database connection), then the nested
    // events produced by relational processing.
    itemsValues.forEach(({ 
      primaryKey, 
      payload, 
      nestedActionEventsM2O,
      nestedActionEventsA2O,
      nestedActionEventsO2M,
    }) => {
      if (opts.emitEvents !== false) {
        const actionEvent = {
          event:
            this.eventScope === `items`
              ? [`items.create`, `${this.collection}.items.create`]
              : `${this.eventScope}.create`,
          meta: {
            payload,
            key: primaryKey,
            collection: this.collection,
          },
          context: {
            database: getDatabase(),
            schema: this.schema,
            accountability: this.accountability,
          },
        }
  
        // bypassEmitAction lets a caller collect events (e.g. to emit them
        // later, after a parent mutation finishes) instead of firing now.
        if (opts.bypassEmitAction) {
          opts.bypassEmitAction(actionEvent)
        }
        else {
          emitter.emitAction(actionEvent.event, actionEvent.meta, actionEvent.context)
        }

        // nestedActionEventsO2M is optional in ItemValues but is assigned for
        // every item in phase 3, so the spread is safe on the happy path.
        const nestedActionEvents = [
          ...nestedActionEventsO2M,
          ...nestedActionEventsA2O,
          ...nestedActionEventsM2O,
        ]
  
        for (const nestedActionEvent of nestedActionEvents) {
          if (opts.bypassEmitAction) {
            opts.bypassEmitAction(nestedActionEvent)
          }
          else {
            emitter.emitAction(
              nestedActionEvent.event, 
              nestedActionEvent.meta, 
              nestedActionEvent.context
            )
          }
        }
      }
    })

    // Invalidate the items cache unless the caller opted out (autoPurge etc.).
    if (shouldClearCache(this.cache, opts, this.collection)) {
      await this.cache.clear()
    }
  
    return itemsValues.map(({ primaryKey }) => primaryKey)
  }
}

插入 1000 行现在只需不到 1 秒,而不是 50 秒。

还要注意:

  • 它仅在通知情况下进行了测试,但它应该适用于许多其他情况。
  • 它是基于
    Directus 10.4.3
    所以它可能已经改变了很多
© www.soinside.com 2019 - 2024. All rights reserved.