为了提供一些背景故事,我有一个可以进行实时数据跟踪/趋势的程序。它从多个源异步接收按升序或降序预先排序的数据包。当接收到数据时,我的程序知道它的排序顺序和大小。它使用此信息将数据放入存储桶中,工作人员使用这些存储桶非常有效地将数据合并在一起。这让我思考,这是否适用于一般排序?本质上是修改后的合并排序。
由此产生的算法,即数据包排序,通过扫描数组、搜索升序或降序值(本文称为数据包)来工作。找到第一个数据包后,如果顺序是降序,则会翻转该数据包,使顺序升序。然后,它继续遍历阵列,搜索下一个数据包,同时复制到辅助阵列。一旦找到下一个数据包,它将其合并回主数组,然后重复搜索下一个数据包,直到找到数组末尾。我要指出的是,该算法的一个好处是它是非递归的。
如果我的数学正确的话,这个算法的空间复杂度是O(n),时间复杂度是:
我一直在尝试找出是否有一种方法可以降低算法最坏情况的时间复杂度。
看看我原来的程序是如何工作的,它根据数据包的大小将数据包分类到桶中;数据包被放置在编号最低的桶 n 中,其数据包大小 ≤ 2n。 “工作人员”通过合并来自同一存储桶(或最接近的存储桶,如果只有一个数据包)的两个数据包,异步组合数据包,并根据上述规则将结果放入另一个存储桶中。为了进一步优化,如果两个源数据包都按降序排列,则结果按降序排列,否则结果按升序排列 - 这消除了仅仅“翻转”数据包而浪费时间的需要。虽然这种方法在数据包预先确定时效果很好,但我不太确定它是否适用于我的排序算法,因为它会显着增加空间复杂度(从 O(n) 到 O(n2) ).
作为参考,我用 C# 编写了算法的两个通用版本。第一个如上所述,第二个是稳定排序版本,通过将相等的值视为升序来保持等值键的顺序不变。
/// <summary>
/// Sorts array by searching for packets of contiguous ascending/descending values, then merging.
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="array">Array to sort.</param>
public static void PacketSort<T>(T[] array) where T : IComparable<T>
{
if ((array == null) || array.Length < 2) return;
int end = array.Length - 1;
int head_length, i;
// search for end of head packet
for (head_length = 1; ; head_length++)
{
int test = array[head_length - 1].CompareTo(array[head_length]);
if (test < 0)
{
// ascending order
for (; ; )
{
if (head_length == end) return;
head_length++;
if (array[head_length - 1].CompareTo(array[head_length]) > 0)
{
// end of head found!
break;
}
}
break;
}
else if (test > 0)
{
// descending order
if (head_length == end)
{
// need to swap first and last value
(array[0], array[end]) = (array[end], array[0]);
return;
}
for (head_length++; ; head_length++)
{
if (array[head_length - 1].CompareTo(array[head_length]) < 0)
{
// end of head found! flip head
for (i = head_length / 2; i > 0;)
{
i--;
(array[i], array[head_length - 1 - i]) = (array[head_length - 1 - i], array[i]);
}
break;
}
else if (head_length == end)
{
// at end of array; flip array
for (i = (head_length + 1) / 2; i > 0;)
{
i--;
(array[i], array[head_length - i]) = (array[head_length - i], array[i]);
}
return;
}
}
break;
}
else
{
// head of array is constant value
if (head_length == end) return;
}
}
T[] packet = new T[array.Length - head_length];
// search for next packet
for (; ; )
{
packet[0] = array[head_length];
if (head_length == end)
{
// last packet has only one element
MergeAscending(array, packet, end - 1, end);
return;
}
for (i = head_length + 1; ; i++)
{
packet[i - head_length] = array[i];
int test = array[i - 1].CompareTo(array[i]);
if (test < 0)
{
// ascending order
if (i == end)
{
MergeAscending(array, packet, head_length - 1, end);
return;
}
for (i++; ; i++)
{
if (array[i - 1].CompareTo(array[i]) > 0)
{
// end of packet found!
MergeAscending(array, packet, head_length - 1, i - 1);
head_length = i;
break;
}
else
{
packet[i - head_length] = array[i];
if (i == end)
{
MergeAscending(array, packet, head_length - 1, end);
return;
}
}
}
break;
}
else if (test > 0)
{
// descending order
if (i == end)
{
MergeDescending(array, packet, head_length - 1, end);
return;
}
for (i++; ; i++)
{
if (array[i - 1].CompareTo(array[i]) < 0)
{
// end of packet found!
MergeDescending(array, packet, head_length - 1, i - 1);
head_length = i;
break;
}
else
{
packet[i - head_length] = array[i];
if (i == end)
{
MergeDescending(array, packet, head_length - 1, end);
return;
}
}
}
break;
}
else
{
// head of packet is constant value
if (i == end)
{
MergeAscending(array, packet, head_length - 1, end);
return;
}
}
}
}
}
/// <summary>
/// Sorts array by searching for packets of contiguous ascending/descending values, then merging, leaving equal values in initial order.
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="array">Array to sort.</param>
public static void StablePacketSort<T>(T[] array) where T : IComparable<T>
{
if ((array == null) || array.Length < 2) return;
int end = array.Length - 1;
int head_length, i;
// search for end of head packet
if (array[0].CompareTo(array[1]) <= 0)
{
// ascending order
for (head_length = 2; ; head_length++)
{
if (head_length > end)
{
// end of array
return;
}
else if (array[head_length - 1].CompareTo(array[head_length]) > 0)
{
// end of head found!
break;
}
}
}
else
{
// descending order
for (head_length = 2; ; head_length++)
{
if (head_length > end)
{
// end of array; flip array
for (i = head_length / 2; i > 0;)
{
i--;
(array[i], array[end - i]) = (array[end - i], array[i]);
}
return;
}
else if (array[head_length - 1].CompareTo(array[head_length]) <= 0)
{
// end of head found! flip head
for (i = head_length / 2; i > 0;)
{
i--;
(array[i], array[head_length - 1 - i]) = (array[head_length - 1 - i], array[i]);
}
break;
}
}
}
T[] packet = new T[array.Length - head_length];
// search for next packet
for (; ; )
{
packet[0] = array[head_length];
if (head_length == end)
{
// last packet has only one element
MergeAscending(array, packet, end - 1, end);
return;
}
i = head_length + 1;
packet[1] = array[i];
if (array[i - 1].CompareTo(array[i]) <= 0)
{
// ascending order
if (i == end)
{
MergeAscending(array, packet, head_length - 1, end);
return;
}
for (i++; ; i++)
{
if (array[i - 1].CompareTo(array[i]) > 0)
{
// end of packet found!
MergeAscending(array, packet, head_length - 1, i - 1);
head_length = i;
break;
}
else
{
packet[i - head_length] = array[i];
if (i == end)
{
MergeAscending(array, packet, head_length - 1, end);
return;
}
}
}
}
else
{
// descending order
if (i == end)
{
MergeDescending(array, packet, head_length - 1, end);
return;
}
for (i++; ; i++)
{
if (array[i - 1].CompareTo(array[i]) <= 0)
{
// end of packet found!
MergeDescending(array, packet, head_length - 1, i - 1);
head_length = i;
break;
}
else
{
packet[i - head_length] = array[i];
if (i == end)
{
MergeDescending(array, packet, head_length - 1, end);
return;
}
}
}
}
}
}
/// <summary>
/// Merges ascending-ordered packet into array
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="array">Array to sort.</param>
/// <param name="packet">Packet to merge into array.</param>
/// <param name="head_last">Index of last element in head of array.</param>
/// <param name="packet_last">Index of last element copied to packet from array.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void MergeAscending<T>(T[] array, T[] packet, int head_last, int packet_last) where T : IComparable<T>
{
for (int count = packet_last - head_last - 1; ; packet_last--)
{
if (packet[count].CompareTo(array[head_last]) >= 0)
{
// add packet value to array
array[packet_last] = packet[count];
if (count == 0) return;
count--;
}
else
{
// shift array value
array[packet_last] = array[head_last];
if (head_last == 0)
{
// at end of array, add remaining packet values
for (; ; count--)
{
packet_last--;
array[packet_last] = packet[count];
if (packet_last == 0) return;
}
}
else
{
head_last--;
}
}
}
}
/// <summary>
/// Merges descending-ordered packet into array
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="array">Array to sort.</param>
/// <param name="packet">Packet to merge into array.</param>
/// <param name="head_last">Index of last element in head of array.</param>
/// <param name="packet_last">Index of last element copied to packet from array.</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void MergeDescending<T>(T[] array, T[] packet, int head_last, int packet_last) where T : IComparable<T>
{
int count = packet_last - head_last;
for (int i = 0; ; packet_last--)
{
if (packet[i].CompareTo(array[head_last]) >= 0)
{
// add packet value to array
array[packet_last] = packet[i];
i++;
if (count == i) return;
}
else
{
// shift array value
array[packet_last] = array[head_last];
if (head_last == 0)
{
// at end of array, add remaining packet values
for (; ; i++)
{
packet_last--;
array[packet_last] = packet[i];
if (packet_last == 0) return;
}
}
else
{
head_last--;
}
}
}
}
做了一些研究,我发现不需要递归或大内存复杂性的最佳优化是使用带有数组副本的堆,如下所示:
找到所有数据包后,根据修改的堆排序将元素从副本数组移回到源数组。修改:不是像就地排序时那样交换头和堆的最后一个元素,而是将头的值复制到源数组,然后递减计数并重新排序堆。如果 head 的计数 == 0,则在重新排序堆之前将最后一个元素移动到 head。这导致空间复杂度为 O(n) ,时间复杂度为 O(n log2 2k),其中 k 是数据包数量 ( 1 ≤ k ≤上限(n/2) )。
作为参考,这里是我的 C# 优化算法的通用版本。对于大多数通用用例,我使用了稳定排序版本。
/// <summary>
/// Sorts array by searching for packets of contiguous ascending/descending values, then merging.
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="array">Array to sort.</param>
public static void PacketSort<T>(T[] array) where T : IComparable<T>
{
if ((array == null) || array.Length < 2) return;
int i = (array.Length + 1) / 2;
int tail = array.Length;
int head = 0;
int heap_size = 1;
T[] packet = new T[array.Length];
int[] heap = new int[i];
int[] index = new int[i];
int[] count = new int[i];
// search for end of first packet
heap[0] = 0;
if (array[0].CompareTo(array[1]) <= 0)
{
// ascending order; add to head of packet
packet[0] = array[0];
packet[1] = array[1];
for (i = 2; ; i++)
{
if (i >= array.Length)
{
// array already sorted
return;
}
else if (array[i - 1].CompareTo(array[i]) > 0)
{
// i == start of next packet
index[0] = 0;
count[0] = i;
head = i;
break;
}
else
{
packet[i] = array[i];
}
}
}
else
{
// descending order; add to tail of packet
packet[--tail] = array[0];
packet[--tail] = array[1];
for (i = 2; ; i++)
{
if (i >= array.Length)
{
// array already sorted in descending order; flip array
for (i = 0; i < array.Length; i++)
{
array[i] = packet[i];
}
return;
}
else if (array[i - 1].CompareTo(array[i]) <= 0)
{
// i == start of next packet
index[0] = tail;
count[0] = i;
break;
}
else
{
packet[--tail] = array[i];
}
}
}
// search for other packets
for (; i < array.Length;)
{
i++;
if (i == array.Length)
{
// next packet has only 1 element
packet[head] = array[i - 1];
index[heap_size] = head;
count[heap_size] = 1;
AddHeap(ref heap_size, heap, index, packet);
break;
}
// search for end of next packet
if (array[i - 1].CompareTo(array[i]) <= 0)
{
// ascending order; add to head of packet
index[heap_size] = head;
count[heap_size] = 2;
packet[head++] = array[i - 1];
packet[head++] = array[i];
for (i++; (i < array.Length) && (array[i - 1].CompareTo(array[i]) <= 0); i++)
{
packet[head++] = array[i];
count[heap_size]++;
}
}
else
{
// descending order; add to tail of packet
count[heap_size] = 2;
packet[--tail] = array[i - 1];
packet[--tail] = array[i];
for (i++; (i < array.Length) && (array[i - 1].CompareTo(array[i]) > 0); i++)
{
packet[--tail] = array[i];
count[heap_size]++;
}
index[heap_size] = tail;
}
AddHeap(ref heap_size, heap, index, packet);
}
// merge packets using modified heap sort
for (head = 0; ;)
{
array[head++] = packet[index[heap[0]]];
if (--count[heap[0]] > 0)
{
index[heap[0]]++;
}
else
{
if (--heap_size > 1)
{
heap[0] = heap[heap_size];
}
else
{
// only one packet left
for (i = index[heap[1]]; head < array.Length; i++)
{
array[head++] = packet[i];
}
return;
}
}
Reheap(heap_size, heap, index, packet);
}
}
/// <summary>
/// Adds next index to sorted heap
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="heap_size">current heap size</param>
/// <param name="heap">sorted heap</param>
/// <param name="index">index list</param>
/// <param name="packet">packets</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void AddHeap<T>(ref int heap_size, int[] heap, int[] index, T[] packet) where T : IComparable<T>
{
int head;
heap[heap_size] = heap_size;
for (int i = heap_size; i > 0; i = head)
{
// compare branch value to head, if smaller, replace head, otherwise done
head = (i - 1) / 2;
if (CompareIndex(heap[head], heap[i], index, packet) <= 0) break;
(heap[head], heap[i]) = (heap[i], heap[head]);
}
heap_size++;
}
/// <summary>
/// Re-sorts heap
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="heap_size">current heap size</param>
/// <param name="heap">sorted heap</param>
/// <param name="index">index list</param>
/// <param name="packet">packets</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static void Reheap<T>(int heap_size, int[] heap, int[] index, T[] packet) where T : IComparable<T>
{
int head = 0;
for (int i = head + head + 1; i < heap_size; i += i + 1)
{
// compare nodes to find smallest
int j = i + 1;
if ((j < heap_size) && (CompareIndex(heap[j], heap[i], index, packet) < 0))
{
i = j;
}
if (CompareIndex(heap[head], heap[i], index, packet) <= 0) break;
// head larger than node, swap and continue down tree
(heap[head], heap[i]) = (heap[i], heap[head]);
head = i;
}
}
/// <summary>
/// Compares two indexes based on location
/// </summary>
/// <typeparam name="T">IComparable type</typeparam>
/// <param name="index1">first index</param>
/// <param name="index2">second index</param>
/// <param name="index">index list</param>
/// <param name="packet">packets</param>
/// <returns></returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int CompareIndex<T>(int index1, int index2, int[] index, T[] packet) where T : IComparable<T>
{
int result = packet[index[index1]].CompareTo(packet[index[index2]]);
if (result == 0) return index1.CompareTo(index2);
return result;
}
感谢那些评论引导我找到这个答案的人。