使用AWS Glue与AWS Kinesis构建的流式ETL作业(一)——数据实时采集

news/2024/6/17 3:00:50 标签: aws, etl, 原型模式

大纲

    • 1 数据采集准备工作
      • 1.1 研究的背景
      • 1.2 使用Glue构建流式ETL的原因
      • 1.3 无服务器流式ETL架构
      • 1.4 架构
      • 1.5 AWS Kinesis Data Stream创建
      • 1.6 AWS CloudWatch数据筛选
        • 1.6.1 AWS IAM角色权限
          • 1.6.1.1 可信实体
          • 1.6.1.2 策略
      • 1.7 AWS Kinesis中的数据验证
        • 1.7.1 验证代码
        • 1.7.2 结果
      • 1.8 总结

1 数据采集准备工作

1.1 研究的背景

更高效的从项目的数据集中提取有意义的数据,并进行统计分析。

1.2 使用Glue构建流式ETL的原因

AWS Glue中的流式ETL是基于Apache Spark的结构化流引擎。该引擎提供一种高容错、可扩展且易于实现的方法,能够实现端到端的流处理。

1.3 无服务器流式ETL架构

在此流式ETL架构中,将使用AWS Lambda模拟创建日志和创建AWS CloudWatch指标,并将其以流的形式发布至AWS Kinesis Data Streams中。我们还将在AWS Glue中创建一项流式ETL作业,该作业以微批次(间隔性批次处理)的形式获取连续生成的stream数据,并对数据进行转换、聚合,最后将结果传递至接收器。开发人员利用这部分结果生成可视化图表或在下游流程中继续使用。

1.4 架构

在这里插入图片描述

1.5 AWS Kinesis Data Stream创建

我们使用AWS Kinesis Data Stream来实时捕获数据,它可以从数十万个数据源提取并存储数据流,其中包括:

  • 日志和事件数据采集(如AWS CloudWatch)
  • 设备数据捕获
  • 移动数据采集
  • 游戏数据源

此案例中,我们将从CloudWatch中进行数据采集

步骤图例
1、入口在这里插入图片描述
2、创建(按需模式无需手动预置和扩展数据流)在这里插入图片描述

1.6 AWS CloudWatch数据筛选

前置条件:已准备好用来进行数据采集的AWS CloudWatch
我们将会在某个AWS CloudWatch日志组中创建日志筛选条件

步骤图例
1、入口在这里插入图片描述
2、选择上步中创建的AWS Kinesis在这里插入图片描述
3、AWS IAM角色(需要有AWS Kinesis Data Stream的权限在这里插入图片描述权限与实体见下方“AWS IAM角色权限”
4、配置筛选条件(可根据日志格式自定义)(例如:图中配置为筛选包含"is_save_kinesis"的数据在这里插入图片描述
5、测试数据(可以选定某条日志流,或自定义数据进行测试结果显示)在这里插入图片描述
6、完成日志筛选条件创建(每个日志组最多只能创建两条在这里插入图片描述
1.6.1 AWS IAM角色权限
1.6.1.1 可信实体
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "logs.【区域】.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringLike": {
                    "aws:SourceArn": "【CloudWatch的ARN】"
                }
            }
        }
     ]
}
1.6.1.2 策略
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "WriteOutputKinesis",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": [
                "【Kinesis Data Stream的ARN】"
            ]
        }
    ]
}

1.7 AWS Kinesis中的数据验证

前置条件:一个已绑定上 以AWS Kinesis作为触发器的AWS Lambda实例
此案例也可使用AWS Lambda来实现数据流的处理。每当AWS Kinesis Data Stream中传入数据时,就会触发绑定了Kinesis的AWS Lambda,由AWS Lambda来对数据进行清洗、转换和存储。
在我们向被监测的AWS CloudWatch中发送一条日志数据后,将会在AWS Kinesis Data Stream控制台监控到数据的流入。
在这里插入图片描述
在这里插入图片描述
接下来,我们将会验证解析一下Kinesis Data Stream中的数据与格式。
原始数据存储在event.Records[0].kinesis.data中(下一步的ETL工作中,我们会从此处获取数据)

1.7.1 验证代码
def lambda_handler(event, context):
    raw_kinesis_records = event['Records']
    # records = deaggregate_records(raw_kinesis_records)
    records = raw_kinesis_records
    for record in records:
        #Kinesis data is base64 encoded so decode here
        payload = base64.b64decode(record["kinesis"]["data"], validate = False)
        data = gzip.decompress(payload).decode("utf-8")
		print(data)
1.7.2 结果

其中的message为我们的原始数据的字符串
在这里插入图片描述

1.8 总结

在此案例中,我们使用了CloudWatch + Kinesis Data Stream完成了前期的数据实时采集的工作,并且,使用了Lambda来作为触发器来对数据进行了一个验证操作(也可使用Lambda来进行ETL工作)。


http://www.niftyadmin.cn/n/5249466.html

相关文章

自动化操作脚本

文章目录 vbsopenCV pyautogui vbs SSH连接并执行指令操作 Dim WshShell Set WshShellWScript.CreateObject("WScript.Shell") WshShell.Run "cmd.exe" WScript.Sleep 1000 WshShell.SendKeys "ssh xcmg10.27.40.103" WshShell.SendKeys &qu…

IP地址如何用于流量管理?

随着互联网的普及和网络流量的不断增加,流量管理成为了网络运营中至关重要的一环。而IP地址作为互联网中的重要标识符,也可以被广泛应用于流量管理中。 IP地址是互联网协议(IP)中用于标识和定位网络设备的32位二进制地址。通过IP地…

屏幕分辨率修改工具SwitchResX mac功能特点

SwitchResX mac是可用于修改和管理显示器的分辨率和刷新率。 SwitchResX mac功能和特点 支持多种分辨率和刷新率:SwitchResX可以添加和管理多种分辨率和刷新率,包括自定义分辨率和刷新率。 自动切换分辨率:SwitchResX可以根据应用程序和窗口…

docker 资源控制

Docker的资源控制 对容器使用宿主机的资源进行限制,如cpu,内存,磁盘I/O Docker使用linux自带的功能cgroup(control grouos)是linux内核系统提供的一种可以限制,记录,隔离进程组使用的物理资源 Docker借助这个机制&…

JDBC链接MySQL,实现对Goods表的增删改查并封装JDBC

项目目录结构 数据库配置 1.创建goods表 2.创建goods实体 package homework.MyJDBC;public class Goods {private int id;private String gId;private String gName;private float gPrice;private int gNum;public int getId() {return id;}public void setId(int id) {this.i…

JAVA实现敏感词高亮或打码过滤:sensitive-word

练手项目中实现发表文章时检测文章是否带有敏感词,以及对所有敏感词的一键过滤功能 文章目录 效果预览实现步骤 效果预览 随便复制一篇内容到输入框 机器审核文章存在敏感词,弹消息提示并进入人工审核阶段(若机器审核通过,则无需审…

正则表达式(7):转义符

正则表达式(7):正则表达式(5):转义符 本博文转载自 此处,我们来认识一个常用符号,它就是反斜杠 “\” 反斜杠有什么作用呢?先不着急解释,先来看个小例子。 …

LambdaUpdateWrapper表达式新写法解释

解释下面java代码的意思&#xff0c;代码如下所示&#xff1a;paymentRecordMapper.update(null, new LambdaUpdateWrapper<FcPaymentRecord>().set(FcPaymentRecord::getLoanAccount, qjylRepaymentApplyReqDTO.getPayseqno()) .eq(FcPayme…