我是这个小组的新手,所以我相信有可能在这里获得帮助,因为我在谷歌上找不到有关我的问题的任何信息。
我正在尝试并行实现Java EvenOdd 转置排序。因此,当我算法时,我认为划分分区是一个好主意:
我在这里添加我当前的逻辑:从奇数阶段开始,分成两部分,分配两个线程进行这些比较,然后创建一个 Barrier 来等待线程,从而启动其他线程以类似方式处理偶数索引。感谢您能给我的任何帮助。目前,我不知道如何实现这个算法,所以任何文字都可能有帮助。
如何知道我应该将数组列表分为多少部分?例如,我现在使用 17 个元素,以便更容易理解。
您将数组划分为子数组的直觉是正确的,因为它通常是并发排序算法的基础。正如您已经了解的算法一样,我们只需要讨论实现即可:
thread
,为所有奇数索引创建一个 start()
用于比较和交换,然后将 join()
创建到主线程以等待结果。冲洗并重复此N
次。然而,这是非常低效的,因为创建和启动所有O(N^2)
线程的开销对于快速比较和交换来说太大了。这导致我们得到以下代码:
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class OddEvenSort {
public static void main(String[] args) {
int[] arr = {83, 71, 72, 26, 6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
sortArr(arr);
System.out.println(Arrays.toString(arr));
}
public static void sortArr(int[] arr) {
int threadNum = arr.length/2;
CyclicBarrier barr = new CyclicBarrier(threadNum);
Thread[] threads = new Thread[threadNum];
for (int i = 0; i < threadNum; i++) {
threads[i] = new Thread(new CompareSwapThread(arr, 2*i + 1, barr));
threads[i].start();
}
for (int i = 0; i < threadNum; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {e.printStackTrace();}
}
}
}
class CompareSwapThread implements Runnable {
private int[] arr;
private int index;
private CyclicBarrier barr;
public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
this.arr = arr;
this.index = index;
this.barr = barr;
}
@Override
public void run() {
for (int i = 0; i < arr.length; i++) {
if (arr[index - 1] > arr[index]) {
int t = arr[index - 1];
arr[index - 1] = arr[index];
arr[index] = t;
}
try {
barr.await();
} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
int t = arr[index];
arr[index] = arr[index + 1];
arr[index + 1] = t;
}
try {
barr.await();
} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
}
}
}
请注意,该算法的运行时间为
O(n)
,这对于此类并行算法来说并不是最佳的。您可以尝试并行实现的另一种算法是 MergeSort 算法。有很多事情可以与此并行化,但最重要的是合并,因为它是顺序算法的瓶颈。您可以查看 Batcher 奇偶合并排序 或查看 其他并行合并。
另外,我不知道是否应该使用称为 ExecutorService 的东西,或者只是正常创建线程。
Java 提供了许多不同的并行工具,它们在不同的抽象级别上运行。人们可以说
ExecutorService
比基本线程更“高级”,因为它简化了线程管理。它还将优化任务调度,以便更好地执行。
这是我们的实现,使用
ExecutorService
:
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class OddEvenSort {
public static void main(String[] args) {
int[] arr = {83, 71, 72, 26, 6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
sortArr(arr);
System.out.println(Arrays.toString(arr));
}
public static void sortArr(int[] arr) {
int threadNum = arr.length/2;
CyclicBarrier barr = new CyclicBarrier(threadNum);
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
Future<?>[] awaits = new Future<?>[threadNum];
for (int i = 0; i < threadNum; i++) {
awaits[i] = exec.submit(new CompareSwapThread(arr, 2*i + 1, barr));
}
for (int i = 0; i < threadNum; i++) {
try {
awaits[i].get();
} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}
}
}
class CompareSwapThread implements Runnable {
private int[] arr;
private int index;
private CyclicBarrier barr;
public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
this.arr = arr;
this.index = index;
this.barr = barr;
}
@Override
public void run() {
for (int i = 0; i < arr.length; i++) {
if (arr[index - 1] > arr[index]) {
int t = arr[index - 1];
arr[index - 1] = arr[index];
arr[index] = t;
}
try {
barr.await();
} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
int t = arr[index];
arr[index] = arr[index + 1];
arr[index + 1] = t;
}
try {
barr.await();
} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
}
}
}
newFixedThreadPool
静态方法来生成和实例化所有线程。然后我们将任务添加到线程池中,这将返回一个 Future
变量。当线程完成时,Future
将保存该值(在我们的例子中为 null)。调用 Future.get()
方法将等待结果(从而线程完成)。请注意,您想要实现某种嵌套线程并行性(例如,并行化 MergeSort 时)。您应该使用 ForkJoinPool
,因为它是专门为此而设计的。最后,这里是一个关于ExecutorService
的很好的教程。