2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答(第二题)
第一题链接2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答题目:请使用MapReduce统计 calls.txt中的每个手机号码的,呼叫时长和呼叫次数,被叫时长,被叫次数 ,并输出格式 为 手机号码,呼叫时长,呼叫次数,被叫时长,被叫次数;calls.txt 通话记录样例:18620192711,15733218050,1506628174,1506628265
第一题链接
2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答
题目:请使用MapReduce统计 calls.txt中的每个手机号码的,呼叫时长和呼叫次数,被叫时长,被叫次数 ,并输出格式 为 手机号码,呼叫时长,呼叫次数,被叫时长,被叫次数;
calls.txt 通话记录
样例:18620192711,15733218050,1506628174,1506628265,650000,810000
字段分别为:
呼叫者手机号,接受者手机号,开始时间戳,结束时间戳,呼叫者地址省份编码,接受者地址省份编码
package Demo.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
import java.net.URI;
import java.util.Date;
/**
* send_time = 发送者手机号
* receive_time = 接收者手机号
* talk_time = 通话持续时间
* send_time = 呼叫时长
* receive_time = 被叫时长
* send_count = 呼叫次数
* receive_count = 被叫次数
*/
public class subject2 {
public static class demoMapper extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(",");
String send_phone = split[0];
String receive_phone = split[1];
Date time1 = new Date(Long.parseLong(split[2]) * 1000L);
Date time2 = new Date(Long.parseLong(split[3]) * 1000L);
long talk_time = (time2.getTime() - time1.getTime())/1000;
context.write(new Text(send_phone),new Text("send,"+talk_time));
context.write(new Text(receive_phone),new Text("receive,"+talk_time));
}
}
public static class demoReducer extends Reducer<Text,Text,Text,Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int send_time = 0;
int receive_time = 0;
int send_count = 0;
int receive_count = 0;
for (Text value : values) {
String string = value.toString();
String[] split = string.split(",");
if("send".equals(split[0])){
send_time += Integer.parseInt(split[1]);
send_count++;
}else{
receive_time += Integer.parseInt(split[1]);
receive_count++;
}
}
context.write(new Text(key),new Text(","+send_time+"秒,"+send_count+"次,"+receive_time+"秒,"+receive_count+"次"));
}
}
public static void main(String[] args) throws Exception{
BasicConfigurator.configure();
// 配置mapreduce
Job job = Job.getInstance();
job.setJobName("zhang");
job.setJarByClass(subject2.class);
job.setMapperClass(demoMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(demoReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定路径
Path input1 = new Path("hdfs://master:9000/data/calls.txt");
FileInputFormat.addInputPath(job,input1);
Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在
//获取文件系统对象fs,利用fs来对hdfs中的文件进行操作
FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
if(fs.exists(output)){
fs.delete(output,true);
}
FileOutputFormat.setOutputPath(job,output);
//启动
job.waitForCompletion(true);
}
}
结果为
这题主要的难点在于能不能想到,在Map端写两个 context.write()语句。即传入两个字段的值作为key,同时还要区分value的值
这题要传到reduce端里面的有两个字段,但是这两个字段的值其实是一致的,都是手机号码。因此即使写了两个 context.write() 语句,具体传入的时候也是以一个个的手机号码来送入reduce端的。相同的key值,即相同的手机号会被组合在一起,但是value值需要区分。因为同一个手机号有两个身份,一个身份是呼叫者,对应呼叫时长和呼叫次数。另一个身份是被呼叫者,对应被叫时长和被叫次数。
呼叫次数和和被叫次数,可以通过在reduce端遍历values的数量时用count++的方式来统计
但是map端要传给reduce端,通话时长。因此给value值,也就是通话时长,加上一个前缀。然后在reduce端用equals匹配这个前缀,这样就区分了同一个手机号的呼叫时长与被呼时长
另外注意呼叫时长与被呼时长应该累加,而不是直接输出,因为values里面会有多个通话时间,按照前缀分成两个分区,呼叫时长分区和被呼时长分区,每个分区里面依旧会有多个通话时间的值,此时应该累加得到最终结果。
更多推荐
所有评论(0)