MapReduce程序設(shè)計
一、數(shù)據(jù)集及程序功能要求
數(shù)據(jù)集stock-daily,包含A股近4000只股票的最近30個交易日的日數(shù)據(jù),根據(jù)此數(shù)據(jù)實現(xiàn)股票風(fēng)險監(jiān)測統(tǒng)計:統(tǒng)計和輸出股票代碼和風(fēng)險值,計算出股票下行指數(shù)
數(shù)據(jù)來源:https://www.joinquant.com/help/api/help?name=JQData
風(fēng)險值統(tǒng)計方法:
忽略股票停牌當(dāng)日數(shù)據(jù)
忽略N/A數(shù)據(jù)行
股價下行指數(shù),(∑?〖(收盤價-開盤價) / (收盤價 - 最低價+1)〗) / 有效數(shù)據(jù)總天數(shù)
二、MapReduce環(huán)境配置(以下配置僅供參考!請根據(jù)實際情況進(jìn)行。如果MapReduce已經(jīng)完成過以下類似的配置或者曾經(jīng)成功執(zhí)行過jar包,請忽略該步驟)
進(jìn)入hadoop安裝目錄下的etc/hadoop目錄
cd /home/hadoop/hadoop-3.1.2/etc/hadoop
1、在mapred-site.xml配置文件中增加兩個配置
vi mapred-site.xml
<property>
? <name>mapreduce.admin.user.env</name>
? <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
<property>
? <name>yarn.app.mapreduce.am.env</name>
? <value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
2、在yarn-site.xml配置文件中增加container本地日志查看配置
??!注意:下面配置中的“hadoop安裝目錄”要替換為自己實際的hadoop安裝目錄,比如/home/hadoop/hadoop-3.1.2
vi yarn-site.xml
<property> ?
?<name>yarn.nodemanager.log-dirs</name> ?
?<value>hadoop安裝目錄/logs/userlogs</value> ?
</property>
<property> ?
?<name>yarn.nodemanager.log.retain-seconds</name> ?
?<value>108000</value> ?
</property>
<property>
?<name>yarn.nodemanager.resource.memory-mb</name>
?<value>2048</value> <!--此項小于1536,mapreduce程序會報錯-->
</property>
<property>
?<name>yarn.scheduler.maximum-allocation-mb</name>
?<value>2048</value> ? <!--防止一級調(diào)度器請求資源量過大-->
</property>
設(shè)置虛擬內(nèi)存與內(nèi)存的倍率,防止VM不足Container被kill
<property> ?
?<name>yarn.nodemanager.vmem-pmem-ratio</name> ?
?<value>3</value> ?
</property>
三、MapReduce 程序設(shè)計
1、在IDEA中新建Maven項目,并修改pom.xml文件,在pom.xml文件中的根節(jié)點中,添加一個子節(jié)點,如下圖:
2、查詢maven組件配置https://mvnrepository.com/
需要查詢的組件(自己判斷合適的版本,以自己安裝的hadoop版本為主,以下和hadoop有關(guān)的依賴以hadoop-3.1.2版本為例):
junit
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
? ?<groupId>junit</groupId>
? ?<artifactId>junit</artifactId>
? ?<version>4.12</version>
? ?<scope>test</scope>
</dependency>
log4j
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
? ?<groupId>log4j</groupId>
? ?<artifactId>log4j</artifactId>
? ?<version>1.2.17</version>
</dependency>
hadoop-hdfs
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
? ?<groupId>org.apache.hadoop</groupId>
? ?<artifactId>hadoop-hdfs</artifactId>
? ?<version>3.1.2</version>
? ?<scope>test</scope>
</dependency>
hadoop-common
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
? ?<groupId>org.apache.hadoop</groupId>
? ?<artifactId>hadoop-common</artifactId>
? ?<version>3.1.2</version>
</dependency>
hadoop-mapreduce-client-core
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
? ?<groupId>org.apache.hadoop</groupId>
? ?<artifactId>hadoop-mapreduce-client-core</artifactId>
? ?<version>3.1.2</version>
</dependency>
將所有查詢到的組件的XML插入到節(jié)點中去,如下圖:



pom.xml修改完畢后,點擊右下角的"Import Changes"即可將MAVEN庫中的JAR包下載到項目中,默認(rèn)情況下,會從MAVEN官網(wǎng)下載,速度比較慢;可事先配置MAVEN淘寶鏡像庫,基本方法就是在IntelliJ IDEA的安裝目錄下,找到MAVEN插件的安裝目錄,修改其配置文件,將鏡像設(shè)置添加到配置文件中即可
4、根據(jù)題目要求的邏輯實現(xiàn)代碼
基本代碼邏輯:
Map
首先按行通過”\t”分割,在判斷不是空置之后提取收盤價、開盤價、最低價,通過(收盤價-開盤價) / (收盤價 - 最低價+1)的公式計算出當(dāng)日股票下行指數(shù),以股票編號為鍵,當(dāng)日股票下行指數(shù)為值寫入context
輸入:一行數(shù)據(jù)(一只股票的日數(shù)據(jù))
處理:使用 \t 將字符串split成數(shù)組,提取需要計算的值,并轉(zhuǎn)為浮點數(shù)
輸出:<股票代碼, 股票當(dāng)日下行指數(shù)>
? ? ? 遇到無效數(shù)據(jù)不輸出(停牌股票或有N/A數(shù)據(jù)無法提取為浮點數(shù))
Map類
package cn.edu.swpu.scs;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Map extends Mapper<Object, Text, Text, FloatWritable> {
? ?private FloatWritable index = new FloatWritable();
? ?private Text code = new Text();
? ?@Override
? ?public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ?String line = value.toString();
? ? ? ?String[] infos = line.split("\t");
? ? ? ?if ((infos[12].equals("False")) && (infos[2].equals("N/A") == false)) {
? ? ? ? ? ?float open = Float.valueOf(infos[2]);
? ? ? ? ? ?float close = Float.valueOf(infos[3]);
? ? ? ? ? ?float low = Float.valueOf(infos[5]);
? ? ? ? ? ?index.set((close-open) / (close - low +1));
? ? ? ? ? ?code.set(infos[0]);
? ? ? ? ? ?context.write(code, index);
? ? ? ?}
? ?}
}
Reduce:
Reduce傳入的鍵是股票編號,值是每個編號每天的當(dāng)日股票下行指數(shù)。通過(∑?〖(收盤價-開盤價) / (收盤價 - 最低價+1)〗) / 有效數(shù)據(jù)總天數(shù),可以計算出每個編號的股票下行指數(shù)
輸入:<股票代碼,[股票每日下行指數(shù)]>
處理:計算均值
輸出:<股票代碼,股票下行指數(shù)>
Reduce類
package cn.edu.swpu.scs;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class Reduce extends Reducer<Text, FloatWritable, Text, FloatWritable> {
? ?private FloatWritable fw=new FloatWritable();
? ?@Override
? ?public void reduce(Text key, Iterable<FloatWritable> index, Context context)
? ? ? ? ? ?throws IOException, InterruptedException {
? ? ? ?float sum = 0;
? ? ? ?int num = 0;
? ? ? ?Iterator<FloatWritable> values = index.iterator();
? ? ? ?while (values.hasNext()) {
? ? ? ? ? ?sum += values.next().get();
? ? ? ? ? ?num++;
? ? ? ?}
? ? ? ?System.out.print("code: " + key.toString() + " ? ? ?" + "average decline index: " + sum / num + "\n");
? ? ? ?fw.set(sum / num);
? ? ? ?context.write(key, fw);
? ?}
}
StockDaily類(主類)
package cn.edu.swpu.scs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class StockDaily {
? ?public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
? ? ? ?//1、獲取job
? ? ? ?Configuration conf = new Configuration();
? ? ? ?Job job = Job.getInstance(conf);
? ? ? ?//2、設(shè)置jar包路徑
? ? ? ?job.setJarByClass(StockDaily.class);
? ? ? ?//3、關(guān)聯(lián)Mapper和Reducer
? ? ? ?job.setMapperClass(Map.class);
? ? ? ?job.setReducerClass(Reduce.class);
? ? ? ?//4、設(shè)置map輸出的kv類型
? ? ? ?job.setMapOutputKeyClass(Text.class);
? ? ? ?job.setMapOutputValueClass(FloatWritable.class);
? ? ? ?//5、設(shè)置最終輸出的kv類型
? ? ? ?job.setOutputKeyClass(Text.class);
? ? ? ?job.setOutputValueClass(FloatWritable.class);
? ? ? ?//6、設(shè)置輸入路徑和輸出路徑
? ? ? ?FileInputFormat.setInputPaths(job,new Path(args[0]));
? ? ? ?FileOutputFormat.setOutputPath(job,new Path(args[1]));
? ? ? ?//7、提交job
? ? ? ?boolean res = job.waitForCompletion(true);
? ? ? ?System.exit(res?0:1);
? ?}
}
5、編譯代碼
編碼完成后,先run一次項目,讓開發(fā)環(huán)境將Java代碼編譯一次,run命令是先編譯再執(zhí)行,這里會報執(zhí)行錯誤,只要能編譯成功即可;也可運行build --> recompile只執(zhí)行編譯。

四、導(dǎo)出jar包
? 1、依次點擊File-->Project Structure-->Projec Settings-->Artifacts-->"+"-->Jar-->Empty,創(chuàng)建一個名為unnamed的空J(rèn)AR包

2、為JAR添加目錄,目錄結(jié)構(gòu)必須與包名一致,如:包名為cn.edu.swpu.scs,那么目錄結(jié)構(gòu)就必須為/cn/edu/swpu/scs,如下圖:
1為自己命名的的jar包名字,3.為代碼所在包的路徑,我的代碼所在包的路徑為cn.edu.swpu.scs,可以通過多次點擊2指向的按鈕來添加多級路徑

3、在包中添加文件,在剛創(chuàng)建的包目錄中添加class文件

選擇file后,瀏覽至項目的class文件處即可完成添加,如下圖:


點擊OK
注意:要實現(xiàn)編譯,才有此class文件,沒編譯或編譯失敗都無此文件
至此JAR包的定義完成,可以開始執(zhí)行打包了
4、執(zhí)行 Build --> Build Artifacts完成build,完成后,在項目的out目錄中可找到創(chuàng)建的JAR包,如下圖:

直接用資源管理器打開其目錄即可,如下圖:

五、將Jar包上傳到Linux本地并執(zhí)行JAR包
? 1、通過MobaXterm實現(xiàn)上傳文件(包括Jar包和數(shù)據(jù)集文件,注意上傳成功后要將stock-daily.zip文件解壓使用)

?2、在hdfs中創(chuàng)建輸入文件夾

3、把解壓后的輸入文件stock-daily從Linux本地上傳到hdfs的/stock-daily/input中
hadoop fs -put stock-daily /stock-daily/input
?4、執(zhí)行hadoop jar命令:
hadoop jar StockDaily.jar cn.edu.swpu.scs.StockDaily /stock-daily/input/stock-daily /stock-daily/output
StockDaily.jar的位置是通過MobaXterm上傳后默認(rèn)的目錄,具體以實際的目錄為準(zhǔn)
執(zhí)行時,需要保證MapReduce在HDFS上的輸出目錄不存在,否則HDFS會報錯




5、查看運行成功后輸出的文件
hadoop fs -ls /stock-daily/output


查看運行結(jié)果
hadoop fs -cat /stock-daily/output/part-r-00000

附:
? 1、stock-daily數(shù)據(jù)集
https://download.csdn.net/download/m0_69488210/87959387?spm=1001.2014.3001.5503
? 2、stock-daily數(shù)據(jù)說明
['code','isst',open', 'close', 'high', 'low', 'volume', 'money', 'factor', 'high_limit', 'low_limit', 'avg', 'paused', 'date']
code 股票代碼
isst 是否ST
open 開盤價
close 收盤價
high 當(dāng)日最高價
low 當(dāng)日最低價
volume 交易量(手)
money 交易量(萬元)
factor 除權(quán)比例
high_limit 漲停價
low_limit 跌停價
avg 每日均價
paused 是否停牌
date 日期