问题描述:
给定一棵通用树,目标是识别并计算从根到具有最大节点值总和的叶子的路径。树中的每个节点都有一个关联值,路径被定义为从根到叶的节点序列。
第一输入行有一个唯一的整数N,即树中的节点数。接下来的 N 行描述了节点。每行以十进制值 V(节点值)开头,后跟整数 K(子级数)。然后,有 K 个整数代表子索引(从 1 开始)。
输入:
5 10.5 2 2 3 5.2 2 4 5 3.3 0 4.1 0 2.4 0
输出:
Max Sum: 19.80 Path: 1 2 4
串行解决方案:
#include <stdio.h>
#include <stdlib.h>
#define MAX_CHILDREN 2
#define MAX_VALUE 100
#define MAX_NODES 1000
typedef struct {
double value;
int num_children;
int children[MAX_CHILDREN];
} Node;
typedef struct {
double sum;
int path[MAX_NODES];
int pathLength;
} Result;
Result computePaths(Node* tree, int idx) {
Result res = {0, {0}, 0};
if (idx == -1) return res;
if (tree[idx].num_children == 0) {
res.sum = tree[idx].value;
res.path[res.pathLength++] = idx;
return res;
}
double maxSum = -1;
Result maxChildRes;
for (int i = 0; i < tree[idx].num_children; ++i) {
Result childRes = computePaths(tree, tree[idx].children[i]);
if (childRes.sum > maxSum) {
maxSum = childRes.sum;
maxChildRes = childRes;
}
}
res.sum = tree[idx].value + maxSum;
res.path[0] = idx;
for (int i = 0; i < maxChildRes.pathLength; ++i) {
res.path[i + 1] = maxChildRes.path[i];
}
res.pathLength = maxChildRes.pathLength + 1;
return res;
}
int main() {
freopen("input", "r", stdin);
int N;
scanf("%d", &N);
Node* tree = (Node*)malloc(N * sizeof(Node));
for (int i = 0; i < N; ++i) {
scanf("%lf %d", &tree[i].value, &tree[i].num_children);
for (int j = 0; j < tree[i].num_children; ++j) {
scanf("%d", &tree[i].children[j]);
tree[i].children[j]--;
}
}
Result finalRes = computePaths(tree, 0);
printf("Max Sum: %.2lf\n", finalRes.sum);
printf("Path: ");
for (int i = 0; i < finalRes.pathLength; ++i) {
printf("%d ", finalRes.path[i] + 1);
}
printf("\n");
free(tree);
return 0;
}
我不想更改我的串行解决方案中的任何内容...它工作正常。
我尝试使用 OpenMP 并行化上述串行代码,以提高算法的性能,但执行时间并没有改善。我可能做错了什么?我是否正确并行化纹理?
使用 OpenMP 的并行化代码:
Result computePaths(Node* tree, int idx) {
Result res = {0, {0}, 0};
if (idx == -1) return res;
if (tree[idx].num_children == 0) {
res.sum = tree[idx].value;
res.path[res.pathLength++] = idx;
return res;
}
double maxSum = -1;
Result maxChildRes;
#pragma omp parallel
{
#pragma omp single
{
for (int i = 0; i < tree[idx].num_children; ++i) {
#pragma omp task shared(maxSum, maxChildRes)
{
Result childRes = computePaths(tree, tree[idx].children[i]);
#pragma omp critical
{
if (childRes.sum > maxSum) {
maxSum = childRes.sum;
maxChildRes = childRes;
}
}
}
}
}
}
res.sum = tree[idx].value + maxSum;
res.path[0] = idx;
for (int i = 0; i < maxChildRes.pathLength; ++i) {
res.path[i + 1] = maxChildRes.path[i];
}
res.pathLength = maxChildRes.pathLength + 1;
return res;
}
考虑到输入有 10,000,000 个节点,即使我将
OMP_NUM_THREADS
更改为 2、4、8 等,时间也比串行代码差。所以,这不是一个好的并行解决方案,对吗?
这是一个基于任务的解决方案:
taskgroup
区域旨在等待在该区域创建的所有任务完成我还稍微修改了代码以摆脱关键区域
主要:
Result finalRes;
#pragma omp parallel num_threads(8)
#pragma omp master
finalRes = computePaths(tree, 0, 0);
在
computePaths
:
Result computePaths(Node* tree, int idx, int level) {
Result res = {0, {0}, 0};
if (idx == -1) return res;
if (tree[idx].num_children == 0) {
res.sum = tree[idx].value;
res.path[res.pathLength++] = idx;
return res;
}
Result childRes[tree[idx].num_children];
#pragma omp taskgroup
{
for (int i = 1; i < tree[idx].num_children; ++i)
#pragma omp task default(shared) if (level < MAX_LEVEL)
childRes[i] = computePaths(tree, tree[idx].children[i], level+1);
childRes[0] = computePaths(tree, tree[idx].children[0], level+1);
}
Result maxChildRes = childRes[0];
for (int i = 1; i < tree[idx].num_children; ++i) {
if (childRes[i].sum > maxChildRes.sum) {
maxChildRes = childRes[i];
}
}
res.sum = tree[idx].value + maxChildRes.sum;
res.path[0] = idx;
for (int i = 0; i < maxChildRes.pathLength; ++i) {
res.path[i + 1] = maxChildRes.path[i];
}
res.pathLength = maxChildRes.pathLength + 1;
return res;
}