最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

MapReduce程序設(shè)計

2023-06-28 15:24 作者:thisishui  | 我要投稿

一、數(shù)據(jù)集及程序功能要求

數(shù)據(jù)集stock-daily,包含A股近4000只股票的最近30個交易日的日數(shù)據(jù),根據(jù)此數(shù)據(jù)實現(xiàn)股票風(fēng)險監(jiān)測統(tǒng)計:統(tǒng)計和輸出股票代碼和風(fēng)險值,計算出股票下行指數(shù)

數(shù)據(jù)來源:https://www.joinquant.com/help/api/help?name=JQData

風(fēng)險值統(tǒng)計方法:

忽略股票停牌當(dāng)日數(shù)據(jù)

忽略N/A數(shù)據(jù)行

股價下行指數(shù),(∑?〖(收盤價-開盤價) / (收盤價 - 最低價+1)〗) / 有效數(shù)據(jù)總天數(shù)


二、MapReduce環(huán)境配置(以下配置僅供參考!請根據(jù)實際情況進(jìn)行。如果MapReduce已經(jīng)完成過以下類似的配置或者曾經(jīng)成功執(zhí)行過jar包,請忽略該步驟)

進(jìn)入hadoop安裝目錄下的etc/hadoop目錄

cd /home/hadoop/hadoop-3.1.2/etc/hadoop

1、在mapred-site.xml配置文件中增加兩個配置

vi mapred-site.xml
<property>
 ? <name>mapreduce.admin.user.env</name>
 ? <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
<property>
 ? <name>yarn.app.mapreduce.am.env</name>
 ? <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>


2、在yarn-site.xml配置文件中增加container本地日志查看配置
??!注意:下面配置中的“hadoop安裝目錄”要替換為自己實際的hadoop安裝目錄,比如/home/hadoop/hadoop-3.1.2

vi yarn-site.xml
<property> ?
 ?<name>yarn.nodemanager.log-dirs</name> ?
 ?<value>hadoop安裝目錄/logs/userlogs</value> ?
</property>
<property> ?
 ?<name>yarn.nodemanager.log.retain-seconds</name> ?
 ?<value>108000</value> ?
</property>
<property>
 ?<name>yarn.nodemanager.resource.memory-mb</name>
 ?<value>2048</value>	<!--此項小于1536,mapreduce程序會報錯-->
</property>
<property>
 ?<name>yarn.scheduler.maximum-allocation-mb</name>
 ?<value>2048</value> ? <!--防止一級調(diào)度器請求資源量過大-->
</property>

設(shè)置虛擬內(nèi)存與內(nèi)存的倍率,防止VM不足Container被kill


<property> ?
 ?<name>yarn.nodemanager.vmem-pmem-ratio</name> ?
 ?<value>3</value> ?
</property>

三、MapReduce 程序設(shè)計

1、在IDEA中新建Maven項目,并修改pom.xml文件,在pom.xml文件中的根節(jié)點中,添加一個子節(jié)點,如下圖:

2、查詢maven組件配置https://mvnrepository.com/

需要查詢的組件(自己判斷合適的版本,以自己安裝的hadoop版本為主,以下和hadoop有關(guān)的依賴以hadoop-3.1.2版本為例):

junit

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
 ? ?<groupId>junit</groupId>
 ? ?<artifactId>junit</artifactId>
 ? ?<version>4.12</version>
 ? ?<scope>test</scope>
</dependency>

log4j

<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
 ? ?<groupId>log4j</groupId>
 ? ?<artifactId>log4j</artifactId>
 ? ?<version>1.2.17</version>
</dependency>

hadoop-hdfs

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
 ? ?<groupId>org.apache.hadoop</groupId>
 ? ?<artifactId>hadoop-hdfs</artifactId>
 ? ?<version>3.1.2</version>
 ? ?<scope>test</scope>
</dependency>

hadoop-common

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
 ? ?<groupId>org.apache.hadoop</groupId>
 ? ?<artifactId>hadoop-common</artifactId>
 ? ?<version>3.1.2</version>
</dependency>

hadoop-mapreduce-client-core

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
 ? ?<groupId>org.apache.hadoop</groupId>
 ? ?<artifactId>hadoop-mapreduce-client-core</artifactId>
 ? ?<version>3.1.2</version>
</dependency>

將所有查詢到的組件的XML插入到節(jié)點中去,如下圖:

pom.xml修改完畢后,點擊右下角的"Import Changes"即可將MAVEN庫中的JAR包下載到項目中,默認(rèn)情況下,會從MAVEN官網(wǎng)下載,速度比較慢;可事先配置MAVEN淘寶鏡像庫,基本方法就是在IntelliJ IDEA的安裝目錄下,找到MAVEN插件的安裝目錄,修改其配置文件,將鏡像設(shè)置添加到配置文件中即可


4、根據(jù)題目要求的邏輯實現(xiàn)代碼

基本代碼邏輯:

Map

首先按行通過”\t”分割,在判斷不是空置之后提取收盤價、開盤價、最低價,通過(收盤價-開盤價) / (收盤價 - 最低價+1)的公式計算出當(dāng)日股票下行指數(shù),以股票編號為鍵,當(dāng)日股票下行指數(shù)為值寫入context

輸入:一行數(shù)據(jù)(一只股票的日數(shù)據(jù))

處理:使用 \t 將字符串split成數(shù)組,提取需要計算的值,并轉(zhuǎn)為浮點數(shù)

輸出:<股票代碼, 股票當(dāng)日下行指數(shù)>

? ? ? 遇到無效數(shù)據(jù)不輸出(停牌股票或有N/A數(shù)據(jù)無法提取為浮點數(shù))


Map類

package cn.edu.swpu.scs;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class Map extends Mapper<Object, Text, Text, FloatWritable> {
 ? ?private FloatWritable index = new FloatWritable();
 ? ?private Text code = new Text();

 ? ?@Override
 ? ?public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
 ? ? ? ?String line = value.toString();
 ? ? ? ?String[] infos = line.split("\t");
 ? ? ? ?if ((infos[12].equals("False")) && (infos[2].equals("N/A") == false)) {
 ? ? ? ? ? ?float open = Float.valueOf(infos[2]);
 ? ? ? ? ? ?float close = Float.valueOf(infos[3]);
 ? ? ? ? ? ?float low = Float.valueOf(infos[5]);
 ? ? ? ? ? ?index.set((close-open) / (close - low +1));
 ? ? ? ? ? ?code.set(infos[0]);
 ? ? ? ? ? ?context.write(code, index);
 ? ? ? ?}

 ? ?}
}

Reduce:

Reduce傳入的鍵是股票編號,值是每個編號每天的當(dāng)日股票下行指數(shù)。通過(∑?〖(收盤價-開盤價) / (收盤價 - 最低價+1)〗) / 有效數(shù)據(jù)總天數(shù),可以計算出每個編號的股票下行指數(shù)

輸入:<股票代碼,[股票每日下行指數(shù)]>

處理:計算均值

輸出:<股票代碼,股票下行指數(shù)>


Reduce類

package cn.edu.swpu.scs;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;

public class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
 ? ?private FloatWritable fw=new FloatWritable();

 ? ?@Override
 ? ?public void reduce(Text key, Iterable<FloatWritable> index, Context context)
 ? ? ? ? ? ?throws IOException, InterruptedException {
 ? ? ? ?float sum = 0;
 ? ? ? ?int num = 0;
 ? ? ? ?Iterator<FloatWritable> values = index.iterator();
 ? ? ? ?while (values.hasNext()) {
 ? ? ? ? ? ?sum += values.next().get();
 ? ? ? ? ? ?num++;
 ? ? ? ?}
 ? ? ? ?System.out.print("code: " + key.toString() + " ? ? ?" + "average decline index: " + sum / num + "\n");
 ? ? ? ?fw.set(sum / num);
 ? ? ? ?context.write(key, fw);
 ? ?}
}

StockDaily類(主類)

package cn.edu.swpu.scs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class StockDaily {
 ? ?public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
 ? ? ? ?//1、獲取job
 ? ? ? ?Configuration conf = new Configuration();
 ? ? ? ?Job job = Job.getInstance(conf);
 ? ? ? ?//2、設(shè)置jar包路徑
 ? ? ? ?job.setJarByClass(StockDaily.class);
 ? ? ? ?//3、關(guān)聯(lián)Mapper和Reducer
 ? ? ? ?job.setMapperClass(Map.class);
 ? ? ? ?job.setReducerClass(Reduce.class);
 ? ? ? ?//4、設(shè)置map輸出的kv類型
 ? ? ? ?job.setMapOutputKeyClass(Text.class);
 ? ? ? ?job.setMapOutputValueClass(FloatWritable.class);
 ? ? ? ?//5、設(shè)置最終輸出的kv類型
 ? ? ? ?job.setOutputKeyClass(Text.class);
 ? ? ? ?job.setOutputValueClass(FloatWritable.class);
 ? ? ? ?//6、設(shè)置輸入路徑和輸出路徑
 ? ? ? ?FileInputFormat.setInputPaths(job,new Path(args[0]));
 ? ? ? ?FileOutputFormat.setOutputPath(job,new Path(args[1]));
 ? ? ? ?//7、提交job
 ? ? ? ?boolean res = job.waitForCompletion(true);
 ? ? ? ?System.exit(res?0:1);
 ? ?}
}


5、編譯代碼

編碼完成后,先run一次項目,讓開發(fā)環(huán)境將Java代碼編譯一次,run命令是先編譯再執(zhí)行,這里會報執(zhí)行錯誤,只要能編譯成功即可;也可運行build --> recompile只執(zhí)行編譯。


四、導(dǎo)出jar包

? 1、依次點擊File-->Project Structure-->Projec Settings-->Artifacts-->"+"-->Jar-->Empty,創(chuàng)建一個名為unnamed的空J(rèn)AR包

2、為JAR添加目錄,目錄結(jié)構(gòu)必須與包名一致,如:包名為cn.edu.swpu.scs,那么目錄結(jié)構(gòu)就必須為/cn/edu/swpu/scs,如下圖:

1為自己命名的的jar包名字,3.為代碼所在包的路徑,我的代碼所在包的路徑為cn.edu.swpu.scs,可以通過多次點擊2指向的按鈕來添加多級路徑

3、在包中添加文件,在剛創(chuàng)建的包目錄中添加class文件

選擇file后,瀏覽至項目的class文件處即可完成添加,如下圖:

點擊OK


注意:要實現(xiàn)編譯,才有此class文件,沒編譯或編譯失敗都無此文件


至此JAR包的定義完成,可以開始執(zhí)行打包了


4、執(zhí)行 Build --> Build Artifacts完成build,完成后,在項目的out目錄中可找到創(chuàng)建的JAR包,如下圖:

直接用資源管理器打開其目錄即可,如下圖:


五、將Jar包上傳到Linux本地并執(zhí)行JAR包

? 1、通過MobaXterm實現(xiàn)上傳文件(包括Jar包和數(shù)據(jù)集文件,注意上傳成功后要將stock-daily.zip文件解壓使用)

?2、在hdfs中創(chuàng)建輸入文件夾

3、把解壓后的輸入文件stock-daily從Linux本地上傳到hdfs的/stock-daily/input中

hadoop fs -put stock-daily /stock-daily/input

?4、執(zhí)行hadoop jar命令:

hadoop jar StockDaily.jar cn.edu.swpu.scs.StockDaily /stock-daily/input/stock-daily /stock-daily/output

StockDaily.jar的位置是通過MobaXterm上傳后默認(rèn)的目錄,具體以實際的目錄為準(zhǔn)

執(zhí)行時,需要保證MapReduce在HDFS上的輸出目錄不存在,否則HDFS會報錯


5、查看運行成功后輸出的文件

hadoop fs -ls /stock-daily/output

查看運行結(jié)果

hadoop fs -cat /stock-daily/output/part-r-00000


附:

? 1、stock-daily數(shù)據(jù)集

https://download.csdn.net/download/m0_69488210/87959387?spm=1001.2014.3001.5503

? 2、stock-daily數(shù)據(jù)說明

['code','isst',open', 'close', 'high', 'low', 'volume', 'money', 'factor', 'high_limit', 'low_limit', 'avg', 'paused', 'date']

code 股票代碼

isst 是否ST

open 開盤價

close 收盤價

high 當(dāng)日最高價

low 當(dāng)日最低價

volume 交易量(手)

money 交易量(萬元)

factor 除權(quán)比例

high_limit 漲停價

low_limit 跌停價

avg 每日均價

paused 是否停牌

date 日期

MapReduce程序設(shè)計的評論 (共 條)

分享到微博請遵守國家法律
巫溪县| 家居| 土默特左旗| 武陟县| 白城市| 新干县| 县级市| 苏尼特左旗| 虎林市| 长春市| 洛隆县| 即墨市| 那曲县| 连山| 淮南市| 永和县| 繁峙县| 宁阳县| 盐源县| 金华市| 剑河县| 金坛市| 于都县| 澄城县| 理塘县| 商丘市| 安陆市| 晋中市| 承德市| 江北区| 吴忠市| 江孜县| 买车| 九台市| 剑阁县| 溧阳市| 隆昌县| 兴安县| 望江县| 东山县| 南投市|