1
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <chrono>
#include <mpi.h>
using namespace std;
using namespace std::chrono;
int main(int argc, char **argv) {
? ? MPI_Init(&argc, &argv);
? ? int rank, size;
? ? MPI_Comm_rank(MPI_COMM_WORLD, &rank);
? ? MPI_Comm_size(MPI_COMM_WORLD, &size);
? ? double start_time = MPI_Wtime();
? ? if (rank == 0) {
? ? ? ? // 進程0讀取CSV文件并將其行數(shù)分配給每個進程
? ? ? ? ifstream input("data.csv");
? ? ? ? vector<vector<double>> data;
? ? ? ? string line, value;
? ? ? ? while (getline(input, line)) {
? ? ? ? ? ? vector<double> row;
? ? ? ? ? ? stringstream ss(line);
? ? ? ? ? ? while (getline(ss, value, ',')) {
? ? ? ? ? ? ? ? row.push_back(stod(value));
? ? ? ? ? ? }
? ? ? ? ? ? data.push_back(row);
? ? ? ? }
? ? ? ? int data_len = data.size();
? ? ? ? int chunk_size = data_len / size;
? ? ? ? int extra = data_len % size;
? ? ? ? // 向其他進程廣播數(shù)據(jù)長度和每個進程的數(shù)據(jù)數(shù)量
? ? ? ? MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? // 將數(shù)據(jù)按平均分配給其他進程
? ? ? ? for (int i = 1; i < size; i++) {
? ? ? ? ? ? int start = i * chunk_size;
? ? ? ? ? ? int end = (i + 1) * chunk_size;
? ? ? ? ? ? if (i == size - 1) end += extra;
? ? ? ? ? ? MPI_Send(&data[start][0], (end - start) * 4, MPI_DOUBLE, i, 0, MPI_COMM_WORLD);
? ? ? ? }
? ? } else {
? ? ? ? // 接收廣播的數(shù)據(jù)長度和每個進程的數(shù)據(jù)數(shù)量
? ? ? ? int data_len, chunk_size, extra;
? ? ? ? MPI_Bcast(&data_len, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? MPI_Bcast(&chunk_size, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? MPI_Bcast(&extra, 1, MPI_INT, 0, MPI_COMM_WORLD);
? ? ? ? // 接收平均分配給它們的數(shù)據(jù)
? ? ? ? int start = rank * chunk_size;
? ? ? ? int end = (rank + 1) * chunk_size;
? ? ? ? if (rank == size - 1) end += extra;
? ? ? ? vector<vector<double>> data(end - start, vector<double>(4));
? ? ? ? MPI_Recv(&data[0][0], (end - start) * 4, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
? ? ? ? // 對數(shù)據(jù)進行處理并計算平均數(shù)
? ? ? ? vector<double> result(end - start);
? ? ? ? for (int i = 0; i < end - start; i++) {
? ? ? ? ? ? double avg = 0.0;
? ? ? ? ? ? for (int j = 0; j < 4; j++) {
? ? ? ? ? ? ? ? avg += data[i][j];
? ? ? ? ? ? }
? ? ? ? ? ? result[i] = avg / 4.0;
? ? ? ? }
? ? ? ? // 將計算結果發(fā)送回進程0
? ? ? ? MPI_Send(&result[0], end - start, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
? ? }
? ? if (rank == 0) {
? ? ? ? // 進程0接收其他進程發(fā)送的計算結果并寫回文件
? ? ? ? vector<double> results;
? ? ? ? for (int i = 1; i < size; i++) {
? ? ? ? ? ? int start = i * chunk_size;
? ? ? ? ? ? int end = (i + 1) * chunk_size;
? ? ? ? ? ? if (i == size - 1) end += extra;
? ? ? ? ? ? vector<double> buf(end - start);
? ? ? ? ? ? MPI_Recv(&buf[0], end - start, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
? ? ? ? ? ? results.insert(results.end(), buf.begin(), buf.end());
? ? ? ? }
? ? ? ? ofstream output("data_processed.csv");
? ? ? ? output << "Average" << endl;
? ? ? ? for (double r : results) {
? ? ? ? ? ? output << r << endl;
? ? ? ? }
? ? ? ? // 記錄從開始到結尾的時間并輸出
? ? ? ? double end_time = MPI_Wtime();
? ? ? ? double elapsed_time = end_time - start_time;
? ? ? ? cout << "Total elapsed time: " << elapsed_time << endl;
? ? }
? ? MPI_Finalize();
? ? return 0;
}