我有一个程序,可以从文本文件中读取单词,并计算唯一单词的数量及其频率,可能使用多个线程:
#include "vector.h"
#define MIN_STRING_LENGTH 6
#define MAX_STRING_LENGTH 32
char * delim = "\"\'.“”‘’?:;-,—*($%)! \t\n\x0A\r";
typedef struct job_info {
pthread_t id;
pthread_mutex_t mutex;
int thread_num;
vector* words; // vector of word_freq structs
char* buffer; // raw data to read from
int buf_size;
} job_info;
void* count_words(void* arg);
void trim_word(char* in);
// given input string, convert it to lower case and discard any non-alphabet (a-z and A-Z) characters
void trim_word(char* in)
{
int i = 0;
int j = 0;
char c;
while ((c = in[i++]) != '\0') {
if (isalpha(c)) {
in[j++] = tolower(c);
}
}
in[j] = '\0';
}
// count_words is a thread-safe function to count the frequency of words in a given chunk of memory
void* count_words(void* arg)
{
job_info* info = arg;
char* saveptr;
char* token = strtok_r(info->buffer, delim, &saveptr);
while (token != NULL) {
trim_word(token);
if (strlen(token) >= MIN_STRING_LENGTH && strlen(token) <= MAX_STRING_LENGTH) {
// because the size of the vector may change, must lock the thread before
// accessing it
if (pthread_mutex_lock(&info->mutex)) {
perror("mutex lock");
}
int index = vec_lsearch(info->words, token);
if (index != -1) {
info->words->data[index].freq++;
} else {
if (vec_push(info->words, token)) {
fprintf(stderr, "failed to add %s to vector\n", token);
}
}
//int h = vec_validate(info->words);
if (pthread_mutex_unlock(&info->mutex)) {
perror("mutex unlock");
}
}
token = strtok_r(NULL, delim, &saveptr);
}
}
int main(int argc, char *argv[])
{
if (argc < 3) {
perror("Not enough arguments");
return EXIT_FAILURE;
}
char* fname = argv[1];
int threadc = atoi(argv[2]);
if (threadc < 1) {
perror("Must use at least one thread");
return EXIT_FAILURE;
}
FILE* file = fopen(fname,"r");
if (file == NULL) {
perror("Could not open file");
return EXIT_FAILURE;
}
// get file size
if (fseek(file, 0, SEEK_END)) {
perror("fseek");
return EXIT_FAILURE;
}
int file_length = ftell(file);
// reset descriptor to beginning
rewind(file);
char* file_contents = malloc(file_length);
if (file_contents == NULL) {
perror("could not allocate buffer for file");
return EXIT_FAILURE;
}
if (fread(file_contents, sizeof(char), file_length, file) == -1) {
perror("unable to read file");
return EXIT_FAILURE;
}
if (fclose(file) == -1) {
perror("unable to close file");
return EXIT_FAILURE;
}
vector* wf_vec = malloc(sizeof(vector));
if (wf_vec == NULL) {
perror("failed to allocate vector");
return EXIT_FAILURE;
}
vec_init(wf_vec);
job_info* info = calloc(threadc, sizeof(job_info));
if (info == NULL) {
perror("failed to allocate job_info");
}
void* res;
pthread_mutex_t mutex;
if (pthread_mutex_init(&mutex, NULL)) {
perror("unable to initialize vector");
return EXIT_FAILURE;
}
// get indices for each thread by dividing the file size by the number of threads
int chunk_size = file_length / threadc;
for (int i = 0; i < threadc; i++) {
info[i].words = wf_vec;
info[i].buffer = malloc(chunk_size + 1);
if (info[i].buffer == NULL) {
perror("unable to allocate memory for buffer");
return EXIT_FAILURE;
}
char* offset = file_contents + (chunk_size * i);
strncpy(info[i].buffer, offset, chunk_size);
info[i].buffer[chunk_size] = '\0'; // set last char to null terminator
info[i].buf_size = chunk_size;
info[i].thread_num = i;
info[i].mutex = mutex;
}
for (int i = 0; i < threadc; i++) {
if (pthread_create(&info[i].id, NULL, count_words, &info[i])) {
perror("pthread_create");
}
}
for (int i = 0; i < threadc; i++) {
if (pthread_join(info[i].id, &res)) {
perror("pthread_join");
return EXIT_FAILURE;
}
}
// cleanup
pthread_mutex_destroy(&mutex);
for (int i = 0; i < threadc; i++) {
free(info[i].buffer);
info[i].buffer = NULL;
}
free(info);
info = NULL;
free(file_contents);
file_contents = NULL;
vec_free(wf_vec);
wf_vec = NULL;
return 0;
}
向量.h:
#ifndef VECTOR_H
#define VECTOR_H
#define INITIAL_VECTOR_CAPACITY 10000
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
#include <string.h>
#include <ctype.h>
typedef struct word_freq {
char* word;
size_t freq;
} word_freq;
typedef struct vector {
word_freq* data;
size_t size;
size_t capacity;
} vector;
void vec_init(vector* vec);
void vec_free(vector* vec);
int vec_push(vector* vec, char* word);
int vec_validate(vector* vec);
int vec_lsearch(vector* vec, char* word);
void vec_qsort(vector* vec, int lo, int hi);
void vec_swap(word_freq* a, word_freq* b);
int vec_partition(vector* vec, int lo, int hi);
#endif
向量.c:
#include "vector.h"
// initialize a vector
void vec_init(vector* vec)
{
vec->data = calloc(INITIAL_VECTOR_CAPACITY, sizeof(word_freq));
if (vec->data == NULL) {
perror("unable to allocate initial capacity");
}
vec->size = 0;
vec->capacity = INITIAL_VECTOR_CAPACITY;
}
// insert a single element into a vector, growing the vector's capacity if necessary
// returns 0 on success
// returns 1 if unable to push
// returns 2 if unable to grow capacity
int vec_push(vector* vec, char* e)
{
word_freq wf;
wf.freq = 1;
wf.word = strdup(e);
vec->data[vec->size++] = wf;
// if vector is at capacity, realloc to double its capacity
if (vec->size == vec->capacity) {
vec->capacity *= 2;
vec->data = realloc(vec->data, vec->capacity * sizeof(word_freq));
if (vec->data == NULL) {
perror("unable to realloc data");
}
}
// verify that operation was a sucess
if (vec->data[vec->size - 1].word == NULL) {
fprintf(stderr, "Error: Inserting word \"%s\" to %ld failed\n", e, vec->size - 1);
return 1;
}
return 0;
}
void vec_free(vector* vec)
{
for (int i = 0; i < vec->size; i++) {
free(vec->data[i].word);
vec->data[i].word = NULL;
}
free(vec->data);
vec->data = NULL;
vec->size = vec->capacity = 0;
free(vec);
vec = NULL;
}
// quicksort implementation
void vec_qsort(vector* vec, int lo, int hi)
{
if (lo >= hi || lo < 0)
return;
int p = vec_partition(vec, lo, hi);
vec_qsort(vec, lo, p - 1); // left side
vec_qsort(vec, p + 1, hi); // right side
}
// partition vector for quicksort
int vec_partition(vector* vec, int lo, int hi)
{
word_freq pivot = vec->data[hi];
// pivot index
int i = (lo - 1);
for (int j = lo; j < hi; j++) {
// sort descending (greatest first)
if (vec->data[j].freq > pivot.freq) {
i++;
vec_swap(&vec->data[i], &vec->data[j]);
}
}
i++;
vec_swap(&vec->data[i], &vec->data[hi]);
return i;
}
// swap two word_freq elements in memory for quicksort
void vec_swap(word_freq* a, word_freq* b)
{
word_freq temp = *a;
*a = *b;
*b = temp;
}
// given an array, check that each of its fields is non-NULL
// return 0 if vector is healthy
// return 1 if data member is null
// return 2 if the string values of any word_freq members are null
int vec_validate(vector* vec)
{
if (vec->data == NULL) {
return 1;
}
for (int i = 0; i < vec->size; i++) {
if (vec->data[i].word == NULL) {
//fprintf(stderr, "data[%d] is null!\n", i);
return 2;
}
}
return 0;
}
// returns the index of the given word in the vector, or -1 if it does not exist
int vec_lsearch(vector* vec, char* target)
{
for (int i = 0; i < vec->size; i++) {
if (vec->data[i].word != NULL && strcmp(vec->data[i].word, target) == 0) {
return i;
}
}
return -1;
}
我遇到的问题是,我的许多价值观在不应该的时候却是
NULL
,而我却找不到原因。当使用超过 1 个线程时,程序大约 60% 的时间会出现段错误。段错误总是发生在 strcmp
中,我在这里比较 vec->data[i].word
和 target
。有时,我的向量中 word
的值将为 NULL。我可以使用 vec_validate
看到这一点,它搜索数组,并找到任何 NULL
值。当我查看 GDB 中由 NULL
报告为 vec_validate
的值时,它们似乎是正常的非空字符串。当我用 valgrind 运行程序时,它工作正常并且没有段错误。我不知道如何解决这个问题。
您的程序存在多个问题:
使用
if (fread(file_contents, sizeof(char), file_length, file) == -1)
检查文件读取错误并不可靠:您应该将返回值(读取的元素数量)与请求的数量 (file_length
) 进行比较。任何短计数都可能表明存在问题。
如果您在具有行尾序列转换的旧系统上运行,则应以二进制模式打开文件,以确保
fread
返回预期的字节数,即文件长度。
要处理大文件,您应该使用类型
long
来表示 int file_length = ftell(file);
如果文件有文字内容,
strncpy(info[i].buffer, offset, chunk_size)
应改为memcpy(info[i].buffer, offset, chunk_size)
,更高效、更明确。避免使用 strncpy
,大多数程序员对其语义了解甚少。您确实知道此函数不会设置空终止符,但是 strncpy
对于代码的读者来说仍然令人困惑且容易出错。
请注意,将内容拆分为恰好为
chunk_size
的块会带来两个问题:如果文件长度不是 chunk_size
的倍数,则尾部字节将被静默截断,并且跨越块边界的任何单词将静默地拆分为2 个字(对于大量线程或小文件,甚至更多)。
[主要]
info[i].mutex = mutex;
复制互斥体字节,这意味着每个线程都有互斥体的副本,违背了锁定公共互斥体以序列化对共享向量结构的访问的目的。作业结构应该有一个指向 main
中定义的单个互斥体的指针。