MapReduce 第二部:深入分析与实践

news/2025/2/21 7:09:12

在第一部分中,我们了解了MapReduce的基本概念和如何使用Python2编写MapReduce程序进行简单的单词计数。今天,我们将深入探讨如何使用MapReduce处理更复杂的数据源,比如HDFS中的CSV文件,并将结果输出到HDFS。通过更复杂的实践案例,进一步了解MapReduce的应用。

1. 复杂的MapReduce任务概述

在实际生产环境中,数据通常存储在分布式文件系统中,例如HDFS(Hadoop Distributed File System)。MapReduce非常适合于这种场景,能够对HDFS中的大规模数据进行处理。在这部分中,我们将处理一个CSV文件,该文件存储着一些结构化的数据,例如用户访问记录或销售数据。

我们的目标是:

  1. 从HDFS中读取CSV文件。
  2. 进行数据处理(例如统计每个产品的销售总额)。
  3. 将结果输出回HDFS。
  4. 最后,使用HDFS命令检查结果。
2. 处理CSV文件的MapReduce任务

假设我们的CSV文件格式如下:

product_id,product_name,sales_amount
1,Product A,100
2,Product B,200
3,Product A,150
4,Product C,50
5,Product B,300
6,Product A,120

我们的任务是统计每个产品的总销售额,即将product_name作为键,sales_amount作为值,最终输出每个产品的销售总额。

3. 编写MapReduce代码
3.1 Mapper

在Map函数中,我们将每行CSV数据中的product_namesales_amount提取出来,并输出成(product_name, sales_amount)的键值对。

import sys
import csv

def mapper():
    for line in sys.stdin:
        # 跳过文件的表头
        if line.startswith("product_id"):
            continue
        # 读取CSV行并提取product_name和sales_amount
        columns = line.strip().split(",")
        product_name = columns[1]
        sales_amount = int(columns[2])
        
        # 输出 (product_name, sales_amount)
        print(f"{product_name}\t{sales_amount}")

在此代码中,我们首先跳过文件头部(如果有的话),然后从每行数据中提取出产品名称和销售金额,最后输出一个以product_name为键,sales_amount为值的键值对。

3.2 Reducer

Reducer的任务是对来自Mapper的相同product_namesales_amount进行求和,得到每个产品的总销售额。

import sys

def reducer():
    current_product = None
    total_sales = 0
    for line in sys.stdin:
        product_name, sales_amount = line.strip().split("\t")
        sales_amount = int(sales_amount)

        if current_product == product_name:
            total_sales += sales_amount
        else:
            if current_product:
                # 输出 (product_name, total_sales)
                print(f"{current_product}\t{total_sales}")
            current_product = product_name
            total_sales = sales_amount

    if current_product == product_name:
        print(f"{current_product}\t{total_sales}")

此代码的作用是对每个product_name的所有sales_amount进行求和,并输出结果。

3.3 执行MapReduce任务

现在,我们可以通过管道执行MapReduce任务,假设输入数据存储在HDFS中的/user/hadoop/input/sales.csv路径下,输出路径为/user/hadoop/output/sales_result

在终端中执行MapReduce任务:

hadoop fs -cat /user/hadoop/input/sales.csv | python mapper.py | sort | python reducer.py > result.txt

4. 将输出结果存储到HDFS

在前面的步骤中,输出结果保存在本地文件result.txt中。我们希望将结果直接写入HDFS。

为了将输出结果直接输出到HDFS,MapReduce任务通常由Hadoop执行,Hadoop的Streaming API允许我们将Map和Reduce任务提交到集群进行处理。以下是使用Hadoop提交作业的步骤:

  1. 将Python脚本上传到HDFS。
hadoop fs -put mapper.py /user/hadoop/mapper.py
hadoop fs -put reducer.py /user/hadoop/reducer.py
  1. 提交MapReduce作业。
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -input /user/hadoop/input/sales.csv \
    -output /user/hadoop/output/sales_result \
    -mapper "python2 /user/hadoop/mapper.py" \
    -reducer "python2 /user/hadoop/reducer.py"
  1. 查看结果。

MapReduce作业完成后,结果会存储在指定的输出目录(/user/hadoop/output/sales_result)中。我们可以使用HDFS命令查看输出文件:

hadoop fs -cat /user/hadoop/output/sales_result/part-00000

输出结果将会类似于:

Product A    370
Product B    500
Product C    50
5. 总结与优化

在这一部分中,我们介绍了如何使用MapReduce处理存储在HDFS中的CSV文件,并将结果输出回HDFS。通过这个实例,我们看到了如何将Map和Reduce函数与Hadoop的Streaming API结合使用,处理大规模分布式数据。

需要注意的是,MapReduce虽然是一种强大的分布式计算模型,但它的效率可能受限于多个因素:

  1. Shuffle过程:当数据量较大时,Shuffle过程可能导致网络瓶颈,影响性能。
  2. 优化Map和Reduce函数:为提高效率,可以使用适当的数据结构,避免不必要的计算,优化内存使用。

对于大数据任务,除了MapReduce,还有其他高效的处理框架(如Apache Spark),可以根据具体需求进行选择。

通过本教程,您已经能够使用MapReduce处理HDFS上的CSV数据,并将结果输出到HDFS。在实际生产环境中,这一过程可以扩展到更复杂的数据处理任务,例如日志分析、流量统计等。


http://www.niftyadmin.cn/n/5860426.html

相关文章

zero自动化框架搭建---Git安装详解

一、Git下载 下载安装包 官网下载 下载的地址就是官网即可:Git - Downloads 进来直接选择windows的安装包下载 选择安装位置 双击安装包安装,选择安装地址后点击next 选择安装的组件,默认即可 也可按照需要自行选择 Windows Explorer i…

MySQL 三层 B+ 树能存多少数据?

1. B树的基本结构 节点大小:在InnoDB中,B树的每个节点(页)大小通常是16KB。索引项大小:每个索引项的大小取决于主键和指针的大小。假设主键为8字节,指针为6字节,则每个索引项的大小约为14字节。…

leetcode_位运算 190.颠倒二进制位

190. 颠倒二进制位 颠倒给定的 32 位无符号整数的二进制位。 1. 字符串 class Solution:# param n, an integer# return an integerdef reverseBits(self, n):res "" # 创建一个保存结果的空字符串for b in str(bin(n))[2:]:# 遍历n的二进制数res b res # 把每…

线程与线程:从入门到放弃

引言 在计算机科学中,**线程**是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一个进程可以拥有多个线程,这些线程共享进程的内存空间和资源,但每个线程拥有独立的执行栈和程序计数器。 本…

JUC并发—8.并发安全集合二

大纲 1.JDK 1.7的HashMap的死循环与数据丢失 2.ConcurrentHashMap的并发安全 3.ConcurrentHashMap的设计介绍 4.ConcurrentHashMap的put操作流程 5.ConcurrentHashMap的Node数组初始化 6.ConcurrentHashMap对Hash冲突的处理 7.ConcurrentHashMap的并发扩容机制 8.Concu…

第1章大型互联网公司的基础架构——1.11 消息中间件技术

消息队列(Message Queue)是分布式系统中最重要的中间件之一,在服务架构设计中被广泛使用。 1.11.1 通信模式与用途 消息中间件构建了这样的通信模式: 一条消息由生产者创建,并被投递到存放消息的队列中;…

使用Python和正则表达式爬取网页中的URL数据

在数据抓取和网络爬虫开发中,提取网页中的URL是一个常见的需求。无论是用于构建网站地图、分析链接结构,还是进行内容聚合,能够高效地从HTML文档中提取URL都是一个重要的技能。Python作为一种强大的编程语言,结合其正则表达式模块…

ubuntu22.04使用minikube安装k8s

ubuntu使用minikube安装k8s 准备工作安装步骤安装docker安装kubectl安装minikube导入相关镜像安装相关指令启动minikube服务 安装dashboard组件导入相关镜像创建服务账号安装组件本体验证安装结果 准备工作 下载离线安装包,安装包内容如下: 软件说明ki…