MapReduce高級程序設計
注:MapReduce環(huán)境配置,Jar包導出,上傳,hadoop執(zhí)行等操作可以查看文章:MapReduce程序設計,只需替換為本篇文章的代碼即可實現(xiàn)
https://blog.csdn.net/m0_69488210/article/details/131432125
數(shù)據(jù)集:https://download.csdn.net/download/m0_69488210/87959387
1、滾動收益率計算方法:
(1) 忽略N/A所在日的股票數(shù)據(jù),思考:可使用插值算法填充異常N/A數(shù)據(jù),但退市股票同樣會造成N/A數(shù)據(jù),需要識別那種數(shù)據(jù)是退市造成的,而哪種數(shù)據(jù)是異常形成的。
(2)第t日的5日滾動收益
Rt= (C_t - C_(t-5) ) / C_(t-5) ,Ct:第t日收盤價 Rt:第t日滾動收益
(3) 5日滾動正收益率
所有交易日的5日滾動收益為正(賺錢)的概率
所有計算忽略非交易日(節(jié)假日)
2、二次排序和組排序
MapReduce中的二次排序是指在MapReduce任務中對鍵值對進行排序時,除了根據(jù)鍵進行排序之外,還可以根據(jù)值進行排序。在二次排序中,首先按照鍵進行排序,然后對于具有相同鍵的按照指定的值進行排序,最終輸出排序后的鍵值對序列。
組排序是將Map任務輸出的鍵值對按照key進行排序并分組,具有相同key的鍵值對會被劃分到同一組,并發(fā)送給同一個Reduce任務進行處理。這就確保了擁有相同key的鍵值對能夠在Reduce階段被合并處理,從而得出結果。
3、基本代碼邏輯要求:
(1) CodeTimeTuple implements WritableComparable
封裝一個代碼時間類,用以在鍵中存放股票代碼和時間,用以按股票和時間進行二次排序;定義key排序比較器,按股票代碼進行一次排序,按時間進行二次排序
package cn.edu.swpu.secondary;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//自定義Tuple
public class CodeTimeTuple implements WritableComparable<CodeTimeTuple> {
? ?private LongWritable time = new LongWritable();
? ?private Text code = new Text();
? ?public LongWritable getTime() { return time; }
? ?public void setTime(LongWritable time) { this.time = time; }
? ?public Text getCode() { return code; }
? ?public void setCode(Text code) { this.code = code; }
? ?//寫入數(shù)據(jù)至流
? ?//用于框架對數(shù)據(jù)的處理
? ?//注意讀readFields和寫write的順序一致
? ?@Override
? ?public void write(DataOutput dataOutput) throws IOException {
? ? ? ?code.write(dataOutput);
? ? ? ?time.write(dataOutput);
? ?}
? ?//從流中讀取數(shù)據(jù)
? ?//將框架返回的數(shù)據(jù)提取出到對應屬性中來
? ?//注意讀readFields和寫write的順序一致
? ?@Override
? ?public void readFields(DataInput dataInput) throws IOException {
? ? ? ?code.readFields(dataInput);
? ? ? ?time.readFields(dataInput);
? ?}
? ?//Key排序
? ?@Override
? ?public int compareTo(CodeTimeTuple o) {
? ? ? ?//一次排序:股票代碼排序(這里要與組排序邏輯相同)
? ? ? ?int cmp = this.getCode().compareTo(o.getCode());
? ? ? ?//如果股票代碼相同,則按時間排序
? ? ? ?if(cmp != 0)
? ? ? ? ? ?return cmp;
? ? ? ?//二次排序:時間排序,結果乘以-1則降序排列,否則為升序排列
? ? ? ?return this.getTime().compareTo(o.getTime());
? ?}
}
(2) Map extends Mapper
輸入:一行數(shù)據(jù)(一只股票的日數(shù)據(jù))
處理:使用 \t 將字符串split成數(shù)組,提取需要計算的值,并轉為浮點數(shù)
輸出:<代碼時間對象, 收盤價>
? ? ? 遇到無效數(shù)據(jù)不輸出(停牌股票或有N/A數(shù)據(jù)無法提取為浮點數(shù))
? ? ? 在Map階段首先分割傳入的每一行的信息,忽略空置,取出收盤價,股票代碼和日期,把股票編號和日期封裝到CodeTimeTuple的序列化對象tuple里,在map輸出的時候,tuple作為鍵,收盤價為對應的值,在CodeTimeTuple類里實現(xiàn)了按照股票代碼和時間的二次排序,保證傳入reduce的是按照股票代碼和時間二次排序之后的升序排序
(3) GroupSort extends WritableComparator
創(chuàng)建一個排序比較器,修改組排序邏輯,按股票代碼排序
通過組排序保證了傳入Reduce的數(shù)據(jù)是排序之后按照股票代碼分組的數(shù)據(jù),保證了reduce可以合并相同股票代碼的數(shù)據(jù)
(4) Reduce extends Reducer
輸入:<代碼時間對象,[收盤價]>
處理:計算每個5日的滾動收益,并統(tǒng)計滾動收益為正的概率
輸出:<股票代碼,滾動收益為正的概率>
把傳入reduce的值存入到列表中,通過Rt= (C_t? - C_(t-5)? ) / C_(t-5)計算第t日的5日滾動收益,依次判斷每個股票代碼對應的所有的五日滾動收益是否為正,把正數(shù)的數(shù)量除以相同股票代碼的所有數(shù)量就可以得到每個股票代碼的5日滾動收益為正(賺錢)的概率,reduce輸出的鍵為股票代碼,值為每個股票代碼的5日滾動收益為正(賺錢)的概率。
package cn.edu.swpu.secondary;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class Ranking {
? ?public static void main(String[] args) throws Exception {
? ? ? ?//1、獲取job
? ? ? ?Configuration conf = new Configuration();
? ? ? ?Job job = Job.getInstance(conf);
? ? ? ?//2、設置jar包路徑
? ? ? ?job.setJarByClass(Ranking.class);
? ? ? ?//3、關聯(lián)Mapper和Reducer和Grouping
? ? ? ?job.setMapperClass(Map.class);
? ? ? ?job.setGroupingComparatorClass(Grouping.class);
? ? ? ?job.setReducerClass(Reduce.class);
? ? ? ?//4、設置map輸出的kv類型
? ? ? ?job.setMapOutputKeyClass(CodeTimeTuple.class);
? ? ? ?job.setMapOutputValueClass(FloatWritable.class);
? ? ? ?//5、設置最終輸出的kv類型
? ? ? ?job.setOutputKeyClass(Text.class);
? ? ? ?job.setOutputValueClass(FloatWritable.class);
? ? ? ?//6、設置輸入路徑和輸出路徑
? ? ? ?FileInputFormat.setInputPaths(job, new Path(args[0]));
? ? ? ?FileOutputFormat.setOutputPath(job, new Path(args[1]));
? ? ? ?//7、提交job
? ? ? ?boolean res = job.waitForCompletion(true);
? ? ? ?System.exit(res ? 0 : 1);
? ?}
? ?public static String value;
? ?public static class Map extends Mapper<LongWritable, Text, CodeTimeTuple, FloatWritable> {
? ? ? ?private final FloatWritable outV = new FloatWritable();
? ? ? ?@Override
? ? ? ?public void map(LongWritable key, Text value, Context context)
? ? ? ? ? ? ? ?throws IOException, InterruptedException {
? ? ? ? ? ?String line = value.toString();
? ? ? ? ? ?String[] items = line.split("\t");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?if ((items[12].equals("False")) && (items[2].equals("N/A") == false)) {
? ? ? ? ? ? ? ? ? ?CodeTimeTuple tuple = new CodeTimeTuple();
? ? ? ? ? ? ? ? ? ?tuple.setCode(new Text(items[0])); ? ? ?//股票代碼
? ? ? ? ? ? ? ? ? ?Date date = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(items[13] + " 00:00:00");
? ? ? ? ? ? ? ? ? ?tuple.setTime(new LongWritable(date.getTime()));//時間戳
? ? ? ? ? ? ? ? ? ?outV.set(Float.valueOf(items[3]));
? ? ? ? ? ? ? ? ? ?//context.write(tuple, outV);
? ? ? ? ? ? ? ? ? ?context.write(tuple, outV);
? ? ? ? ? ? ? ? ? ?//System.out.println("key: " + items[0] + " line: " + line);
? ? ? ? ? ? ? ?}
? ? ? ? ? ?} catch (ParseException e) {
? ? ? ? ? ? ? ?System.out.println(line);
? ? ? ? ? ? ? ?System.out.println(e.getMessage());
? ? ? ? ? ?}
? ? ? ?}
? ?}
? ?// Reduce過程
? ?public static class Reduce extends Reducer<CodeTimeTuple, FloatWritable, Text, FloatWritable> {
? ? ? ?// 創(chuàng)建一個股票代碼對象作為輸出的key
? ? ? ?Text stockCode = new Text();
? ? ? ?// 創(chuàng)建一個概率對象作為輸出的value
? ? ? ?FloatWritable probability = new FloatWritable();
? ? ? ?int j = 0;
? ? ? ?@Override
? ? ? ?public void reduce(CodeTimeTuple key, Iterable<FloatWritable> val, Context context) throws IOException, InterruptedException {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?String mark = "1";
? ? ? ? ? ? ? ?// 將收盤價列表轉為數(shù)組
? ? ? ? ? ? ? ?List<Float> closePrices = new ArrayList<Float>();
? ? ? ? ? ? ? ?for (FloatWritable value : val) {
? ? ? ? ? ? ? ? ? ?Date date1 = new Date(key.getTime().get());
? ? ? ? ? ? ? ? ? ?String dateString1 = new SimpleDateFormat("yyyy-MM-dd").format(date1);
? ? ? ? ? ? ? ? ? ?closePrices.add(value.get());
? ? ? ? ? ? ? ? ? ?System.out.println("INFO[" + mark + "] key:" + key.getCode().toString() + " " + dateString1 + " value:" + " " + value.toString() + " index:" + j);
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?j++;
? ? ? ? ? ? ? ?// 計算每個5日的滾動收益,并統(tǒng)計滾動收益為正的次數(shù)和總次數(shù)
? ? ? ? ? ? ? ?int positiveCount = 0;
? ? ? ? ? ? ? ?int totalCount = 0;
? ? ? ? ? ? ? ?for (int i = 5; i < closePrices.size(); i++) {
? ? ? ? ? ? ? ? ? ?float returnRate = (closePrices.get(i) - closePrices.get(i - 5)) / closePrices.get(i - 5);
? ? ? ? ? ? ? ? ? ?if (returnRate > 0) {
? ? ? ? ? ? ? ? ? ? ? ?positiveCount++;
? ? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? ? ?totalCount++;
? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ?// 計算滾動收益為正的概率
? ? ? ? ? ? ? ?float positiveProbability = (float) positiveCount / totalCount;
? ? ? ? ? ? ? ?stockCode.set(key.getCode());
? ? ? ? ? ? ? ?probability.set(positiveProbability);
? ? ? ? ? ? ? ?// 輸出<股票代碼,滾動收益為正的概率>
? ? ? ? ? ? ? ?context.write(stockCode, probability);
? ? ? ? ? ?} catch (Exception e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? ? ? ?//System.out.println("ERROR " + key.toString());
? ? ? ? ? ?}
? ? ? ?}
? ?}
? ?//組排序
? ?public static class Grouping extends WritableComparator {
? ? ? ?protected Grouping() {
? ? ? ? ? ?super(CodeTimeTuple.class, true);
? ? ? ?}
? ? ? ?@Override
? ? ? ?public int compare(WritableComparable a, WritableComparable b) {
? ? ? ? ? ?CodeTimeTuple key1 = (CodeTimeTuple) a;
? ? ? ? ? ?CodeTimeTuple key2 = (CodeTimeTuple) b;
? ? ? ? ? ?return key1.getCode().compareTo(key2.getCode());
? ? ? ? ? ?//直接return 0則表示不分組(所有key一個組)
? ? ? ? ? ?//return 0;
? ? ? ?}
? ?}
}