最美情侣中文字幕电影,在线麻豆精品传媒,在线网站高清黄,久久黄色视频

歡迎光臨散文網 會員登陸 & 注冊

MapReduce高級程序設計

2023-06-28 17:10 作者:thisishui  | 我要投稿

注:MapReduce環(huán)境配置,Jar包導出,上傳,hadoop執(zhí)行等操作可以查看文章:MapReduce程序設計,只需替換為本篇文章的代碼即可實現(xiàn)
https://blog.csdn.net/m0_69488210/article/details/131432125
數(shù)據(jù)集:https://download.csdn.net/download/m0_69488210/87959387

1、滾動收益率計算方法:
(1) 忽略N/A所在日的股票數(shù)據(jù),思考:可使用插值算法填充異常N/A數(shù)據(jù),但退市股票同樣會造成N/A數(shù)據(jù),需要識別那種數(shù)據(jù)是退市造成的,而哪種數(shù)據(jù)是異常形成的。
(2)第t日的5日滾動收益
Rt= (C_t - C_(t-5) ) / C_(t-5) ,Ct:第t日收盤價 Rt:第t日滾動收益
(3) 5日滾動正收益率
所有交易日的5日滾動收益為正(賺錢)的概率

  • 所有計算忽略非交易日(節(jié)假日)

2、二次排序和組排序
MapReduce中的二次排序是指在MapReduce任務中對鍵值對進行排序時,除了根據(jù)鍵進行排序之外,還可以根據(jù)值進行排序。在二次排序中,首先按照鍵進行排序,然后對于具有相同鍵的按照指定的值進行排序,最終輸出排序后的鍵值對序列。
組排序是將Map任務輸出的鍵值對按照key進行排序并分組,具有相同key的鍵值對會被劃分到同一組,并發(fā)送給同一個Reduce任務進行處理。這就確保了擁有相同key的鍵值對能夠在Reduce階段被合并處理,從而得出結果。

3、基本代碼邏輯要求:
(1) CodeTimeTuple implements WritableComparable
封裝一個代碼時間類,用以在鍵中存放股票代碼和時間,用以按股票和時間進行二次排序;定義key排序比較器,按股票代碼進行一次排序,按時間進行二次排序

package cn.edu.swpu.secondary;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

//自定義Tuple
public class CodeTimeTuple implements WritableComparable<CodeTimeTuple> {

 ? ?private LongWritable time = new LongWritable();
 ? ?private Text code = new Text();

 ? ?public LongWritable getTime() { return time; }

 ? ?public void setTime(LongWritable time) { this.time = time; }

 ? ?public Text getCode() { return code; }

 ? ?public void setCode(Text code) { this.code = code; }

 ? ?//寫入數(shù)據(jù)至流
 ? ?//用于框架對數(shù)據(jù)的處理
 ? ?//注意讀readFields和寫write的順序一致
 ? ?@Override
 ? ?public void write(DataOutput dataOutput) throws IOException {
 ? ? ? ?code.write(dataOutput);
 ? ? ? ?time.write(dataOutput);
 ? ?}

 ? ?//從流中讀取數(shù)據(jù)
 ? ?//將框架返回的數(shù)據(jù)提取出到對應屬性中來
 ? ?//注意讀readFields和寫write的順序一致
 ? ?@Override
 ? ?public void readFields(DataInput dataInput) throws IOException {
 ? ? ? ?code.readFields(dataInput);
 ? ? ? ?time.readFields(dataInput);
 ? ?}

 ? ?//Key排序
 ? ?@Override
 ? ?public int compareTo(CodeTimeTuple o) {
 ? ? ? ?//一次排序:股票代碼排序(這里要與組排序邏輯相同)
 ? ? ? ?int cmp = this.getCode().compareTo(o.getCode());
 ? ? ? ?//如果股票代碼相同,則按時間排序
 ? ? ? ?if(cmp != 0)
 ? ? ? ? ? ?return cmp;

 ? ? ? ?//二次排序:時間排序,結果乘以-1則降序排列,否則為升序排列
 ? ? ? ?return this.getTime().compareTo(o.getTime());

 ? ?}
}

(2) Map extends Mapper

輸入:一行數(shù)據(jù)(一只股票的日數(shù)據(jù))

處理:使用 \t 將字符串split成數(shù)組,提取需要計算的值,并轉為浮點數(shù)

輸出:<代碼時間對象, 收盤價>

? ? ? 遇到無效數(shù)據(jù)不輸出(停牌股票或有N/A數(shù)據(jù)無法提取為浮點數(shù))

? ? ? 在Map階段首先分割傳入的每一行的信息,忽略空置,取出收盤價,股票代碼和日期,把股票編號和日期封裝到CodeTimeTuple的序列化對象tuple里,在map輸出的時候,tuple作為鍵,收盤價為對應的值,在CodeTimeTuple類里實現(xiàn)了按照股票代碼和時間的二次排序,保證傳入reduce的是按照股票代碼和時間二次排序之后的升序排序


(3) GroupSort extends WritableComparator

創(chuàng)建一個排序比較器,修改組排序邏輯,按股票代碼排序

通過組排序保證了傳入Reduce的數(shù)據(jù)是排序之后按照股票代碼分組的數(shù)據(jù),保證了reduce可以合并相同股票代碼的數(shù)據(jù)


(4) Reduce extends Reducer

輸入:<代碼時間對象,[收盤價]>

處理:計算每個5日的滾動收益,并統(tǒng)計滾動收益為正的概率

輸出:<股票代碼,滾動收益為正的概率>

把傳入reduce的值存入到列表中,通過Rt= (C_t? - C_(t-5)? ) / C_(t-5)計算第t日的5日滾動收益,依次判斷每個股票代碼對應的所有的五日滾動收益是否為正,把正數(shù)的數(shù)量除以相同股票代碼的所有數(shù)量就可以得到每個股票代碼的5日滾動收益為正(賺錢)的概率,reduce輸出的鍵為股票代碼,值為每個股票代碼的5日滾動收益為正(賺錢)的概率。

package cn.edu.swpu.secondary;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class Ranking {
 ? ?public static void main(String[] args) throws Exception {

 ? ? ? ?//1、獲取job
 ? ? ? ?Configuration conf = new Configuration();
 ? ? ? ?Job job = Job.getInstance(conf);
 ? ? ? ?//2、設置jar包路徑
 ? ? ? ?job.setJarByClass(Ranking.class);
 ? ? ? ?//3、關聯(lián)Mapper和Reducer和Grouping
 ? ? ? ?job.setMapperClass(Map.class);
 ? ? ? ?job.setGroupingComparatorClass(Grouping.class);
 ? ? ? ?job.setReducerClass(Reduce.class);
 ? ? ? ?//4、設置map輸出的kv類型
 ? ? ? ?job.setMapOutputKeyClass(CodeTimeTuple.class);
 ? ? ? ?job.setMapOutputValueClass(FloatWritable.class);

 ? ? ? ?//5、設置最終輸出的kv類型
 ? ? ? ?job.setOutputKeyClass(Text.class);
 ? ? ? ?job.setOutputValueClass(FloatWritable.class);
 ? ? ? ?//6、設置輸入路徑和輸出路徑
 ? ? ? ?FileInputFormat.setInputPaths(job, new Path(args[0]));
 ? ? ? ?FileOutputFormat.setOutputPath(job, new Path(args[1]));
 ? ? ? ?//7、提交job
 ? ? ? ?boolean res = job.waitForCompletion(true);
 ? ? ? ?System.exit(res ? 0 : 1);

 ? ?}

 ? ?public static String value;

 ? ?public static class Map extends Mapper<LongWritable, Text, CodeTimeTuple, FloatWritable> {
 ? ? ? ?private final FloatWritable outV = new FloatWritable();
 ? ? ? ?@Override
 ? ? ? ?public void map(LongWritable key, Text value, Context context)
 ? ? ? ? ? ? ? ?throws IOException, InterruptedException {
 ? ? ? ? ? ?String line = value.toString();
 ? ? ? ? ? ?String[] items = line.split("\t");
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?if ((items[12].equals("False")) && (items[2].equals("N/A") == false)) {
 ? ? ? ? ? ? ? ? ? ?CodeTimeTuple tuple = new CodeTimeTuple();
 ? ? ? ? ? ? ? ? ? ?tuple.setCode(new Text(items[0])); ? ? ?//股票代碼
 ? ? ? ? ? ? ? ? ? ?Date date = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").parse(items[13] + " 00:00:00");
 ? ? ? ? ? ? ? ? ? ?tuple.setTime(new LongWritable(date.getTime()));//時間戳
 ? ? ? ? ? ? ? ? ? ?outV.set(Float.valueOf(items[3]));
 ? ? ? ? ? ? ? ? ? ?//context.write(tuple, outV);
 ? ? ? ? ? ? ? ? ? ?context.write(tuple, outV);
 ? ? ? ? ? ? ? ? ? ?//System.out.println("key: " + items[0] + " line: " + line);
 ? ? ? ? ? ? ? ?}
 ? ? ? ? ? ?} catch (ParseException e) {
 ? ? ? ? ? ? ? ?System.out.println(line);
 ? ? ? ? ? ? ? ?System.out.println(e.getMessage());
 ? ? ? ? ? ?}
 ? ? ? ?}
 ? ?}


 ? ?// Reduce過程
 ? ?public static class Reduce extends Reducer<CodeTimeTuple, FloatWritable, Text, FloatWritable> {
 ? ? ? ?// 創(chuàng)建一個股票代碼對象作為輸出的key
 ? ? ? ?Text stockCode = new Text();
 ? ? ? ?// 創(chuàng)建一個概率對象作為輸出的value
 ? ? ? ?FloatWritable probability = new FloatWritable();

 ? ? ? ?int j = 0;

 ? ? ? ?@Override
 ? ? ? ?public void reduce(CodeTimeTuple key, Iterable<FloatWritable> val, Context context) throws IOException, InterruptedException {
 ? ? ? ? ? ?try {
 ? ? ? ? ? ? ? ?String mark = "1";
 ? ? ? ? ? ? ? ?// 將收盤價列表轉為數(shù)組
 ? ? ? ? ? ? ? ?List<Float> closePrices = new ArrayList<Float>();
 ? ? ? ? ? ? ? ?for (FloatWritable value : val) {
 ? ? ? ? ? ? ? ? ? ?Date date1 = new Date(key.getTime().get());
 ? ? ? ? ? ? ? ? ? ?String dateString1 = new SimpleDateFormat("yyyy-MM-dd").format(date1);
 ? ? ? ? ? ? ? ? ? ?closePrices.add(value.get());
 ? ? ? ? ? ? ? ? ? ?System.out.println("INFO[" + mark + "] key:" + key.getCode().toString() + " " + dateString1 + " value:" + " " + value.toString() + " index:" + j);
 ? ? ? ? ? ? ? ?}
 ? ? ? ? ? ? ? ?j++;
 ? ? ? ? ? ? ? ?// 計算每個5日的滾動收益,并統(tǒng)計滾動收益為正的次數(shù)和總次數(shù)
 ? ? ? ? ? ? ? ?int positiveCount = 0;
 ? ? ? ? ? ? ? ?int totalCount = 0;
 ? ? ? ? ? ? ? ?for (int i = 5; i < closePrices.size(); i++) {
 ? ? ? ? ? ? ? ? ? ?float returnRate = (closePrices.get(i) - closePrices.get(i - 5)) / closePrices.get(i - 5);
 ? ? ? ? ? ? ? ? ? ?if (returnRate > 0) {
 ? ? ? ? ? ? ? ? ? ? ? ?positiveCount++;
 ? ? ? ? ? ? ? ? ? ?}
 ? ? ? ? ? ? ? ? ? ?totalCount++;
 ? ? ? ? ? ? ? ?}
 ? ? ? ? ? ? ? ?// 計算滾動收益為正的概率
 ? ? ? ? ? ? ? ?float positiveProbability = (float) positiveCount / totalCount;

 ? ? ? ? ? ? ? ?stockCode.set(key.getCode());
 ? ? ? ? ? ? ? ?probability.set(positiveProbability);
 ? ? ? ? ? ? ? ?// 輸出<股票代碼,滾動收益為正的概率>
 ? ? ? ? ? ? ? ?context.write(stockCode, probability);
 ? ? ? ? ? ?} catch (Exception e) {
 ? ? ? ? ? ? ? ?e.printStackTrace();
 ? ? ? ? ? ? ? ?//System.out.println("ERROR " + key.toString());
 ? ? ? ? ? ?}
 ? ? ? ?}
 ? ?}

 ? ?//組排序
 ? ?public static class Grouping extends WritableComparator {
 ? ? ? ?protected Grouping() {
 ? ? ? ? ? ?super(CodeTimeTuple.class, true);
 ? ? ? ?}
 ? ? ? ?@Override
 ? ? ? ?public int compare(WritableComparable a, WritableComparable b) {
 ? ? ? ? ? ?CodeTimeTuple key1 = (CodeTimeTuple) a;
 ? ? ? ? ? ?CodeTimeTuple key2 = (CodeTimeTuple) b;
 ? ? ? ? ? ?return key1.getCode().compareTo(key2.getCode());
 ? ? ? ? ? ?//直接return 0則表示不分組(所有key一個組)
 ? ? ? ? ? ?//return 0;
 ? ? ? ?}
 ? ?}
}


MapReduce高級程序設計的評論 (共 條)

分享到微博請遵守國家法律
钟祥市| 涞水县| 湖北省| 肥城市| 句容市| 贡觉县| 邯郸县| 莫力| 子长县| 吉水县| 旺苍县| 赣州市| 吕梁市| 铁岭县| 巩留县| 临漳县| 翁源县| 马尔康县| 榆中县| 南岸区| 四川省| 溆浦县| 白河县| 汾阳市| 黄大仙区| 宁晋县| 临猗县| 虞城县| 北海市| 丰城市| 五台县| 图木舒克市| 广汉市| 甘德县| 吉林市| 长泰县| 巢湖市| 泰安市| 天峨县| 米泉市| 张掖市|