最新消息: 电脑我帮您提供丰富的电脑知识,编程学习,软件下载,win7系统下载。

Data Lake AWS无服务器Amazon S3

IT培训 admin 2浏览 0评论

Data Lake AWS无服务器Amazon S3

我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。为了使该数据可用,我必须在AWS Glue数据目录中对其架构进行分类。

[我使用由Amazon S3触发器调用的AWS Lambda函数来执行此操作,以启动对数据进行分类的AWS Glue搜寻器。

搜寻器完成表定义的创建后,使用Amazon CloudWatch Events规则调用第二个Lambda函数。此步骤将启动一个AWS Glue ETL作业,以处理数据并将其输出到另一个Amazon S3存储桶(我们称为已处理区域)中。 AWS Glue ETL作业将数据转换为Apache Parquet格式并将其存储在已处理的S3存储桶中

Lambda以运行搜寻器:

var AWS = require('aws-sdk');
var glue = new AWS.Glue();
var sqs = new AWS.SQS();
    exports.handler = function(event, context,callback) {
        console.log(JSON.stringify(event, null, 3));
        if(event.Records.length > 0 && event.Records[0].eventSource == 'aws:sqs'){
            startCrawler('datacrawler', function(err2,data2){
                if(err2) callback(err2)
                else callback(null,data2)
            })
        }else{
        var dbName = 'datacatalog';
        var params = {
            DatabaseInput: {
                Name: dbName,
                Description: 'Rede Post database',
            }
        };
        glue.createDatabase(params, function(err, data) {
                var params1 = {
                    DatabaseName: dbName,
                    Name: 'datacrawler',
                    Role: 'service-role/rede-data-lake-GlueLabRole-1OI9OXN93676F',
                    Targets: {
                        S3Targets: [{ Path: 's3://rede-data-lake-raws3bucket-1qgllh1leebin/' }]
                    },
                    Description: 'crawler test'
                };
                glue.createCrawler(params1, function(err1, data1) {
                    startCrawler('datacrawler', function(err2,data2){
                        if(err2) callback(err2)
                        else callback(null,data2)
                    })
                });
        });
    };
};
function startCrawler(name,callback){
    var params = {
        Name: name,
    };
    glue.startCrawler(params, function(err, data) {
        if (err){
            console.log(JSON.stringify(err,null,3 ))
            var params1 = {
                MessageBody: 'retry',
                QueueUrl: ''
            };
            sqs.sendMessage(params1, function(err1, data1) {
                if (err1) callback(err1);
                else     callback(null, data1)
            });
        }
        else{
            callback(null, data)
        }
    });
    }

[Cloud Watch Event规则:

{
  "detail-type": [
    "Glue Crawler State Change"
  ],
  "source": [
    "aws.glue"
  ],
  "detail": {
    "crawlerName": [
      "datacrawler"
    ],
    "state": [
      "Succeeded"
    ]
  }
}

Lambda运行胶水作业:

var AWS = require('aws-sdk');
var sns = new AWS.SNS( { region: "us-east-2" });
var s3 = new AWS.S3();
var glue = new AWS.Glue({apiVersion: '2017-03-31'});
    exports.handler = function(event, context, callback) {
    console.log(JSON.stringify(event, null, 3));
        var params = {
            JobName: 'GlueSalesJob',
            Timeout: 20,
        };
        glue.startJobRun(params, function(err1, data1) {
            if (err1) {
                console.log(err1, err1.stack);}
            else {
                console.log(data1);}
        });
        console.log(JSON.stringify(event, null, 3));
    };

当我们只处理一个文件和一个胶水工作时,一切正常,我看不到如何缩放它。

想象一下,我有各种不同的文件到达原始区域,每个文件进入一个文件夹,对于每个文件,我必须运行AWS Glue搜寻器和AWS Glue ETL作业,并将其存储在已处理区域存储桶中的一个文件夹中。 >

例如:SaleFile,installmentsFile,DebitFiles等...

我如何通过传递应为每个文件运行的作业的名称来调用第二个lambda?基本上我需要确定提取文件或文件夹以调用适当的胶水作业。

有人可以帮助我找到解决方案吗?感谢您的帮助。我对Amazon非常陌生。

我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。制作...

回答如下:

好!您快要到了:-)

Data Lake AWS无服务器Amazon S3

我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。为了使该数据可用,我必须在AWS Glue数据目录中对其架构进行分类。

[我使用由Amazon S3触发器调用的AWS Lambda函数来执行此操作,以启动对数据进行分类的AWS Glue搜寻器。

搜寻器完成表定义的创建后,使用Amazon CloudWatch Events规则调用第二个Lambda函数。此步骤将启动一个AWS Glue ETL作业,以处理数据并将其输出到另一个Amazon S3存储桶(我们称为已处理区域)中。 AWS Glue ETL作业将数据转换为Apache Parquet格式并将其存储在已处理的S3存储桶中

Lambda以运行搜寻器:

var AWS = require('aws-sdk');
var glue = new AWS.Glue();
var sqs = new AWS.SQS();
    exports.handler = function(event, context,callback) {
        console.log(JSON.stringify(event, null, 3));
        if(event.Records.length > 0 && event.Records[0].eventSource == 'aws:sqs'){
            startCrawler('datacrawler', function(err2,data2){
                if(err2) callback(err2)
                else callback(null,data2)
            })
        }else{
        var dbName = 'datacatalog';
        var params = {
            DatabaseInput: {
                Name: dbName,
                Description: 'Rede Post database',
            }
        };
        glue.createDatabase(params, function(err, data) {
                var params1 = {
                    DatabaseName: dbName,
                    Name: 'datacrawler',
                    Role: 'service-role/rede-data-lake-GlueLabRole-1OI9OXN93676F',
                    Targets: {
                        S3Targets: [{ Path: 's3://rede-data-lake-raws3bucket-1qgllh1leebin/' }]
                    },
                    Description: 'crawler test'
                };
                glue.createCrawler(params1, function(err1, data1) {
                    startCrawler('datacrawler', function(err2,data2){
                        if(err2) callback(err2)
                        else callback(null,data2)
                    })
                });
        });
    };
};
function startCrawler(name,callback){
    var params = {
        Name: name,
    };
    glue.startCrawler(params, function(err, data) {
        if (err){
            console.log(JSON.stringify(err,null,3 ))
            var params1 = {
                MessageBody: 'retry',
                QueueUrl: ''
            };
            sqs.sendMessage(params1, function(err1, data1) {
                if (err1) callback(err1);
                else     callback(null, data1)
            });
        }
        else{
            callback(null, data)
        }
    });
    }

[Cloud Watch Event规则:

{
  "detail-type": [
    "Glue Crawler State Change"
  ],
  "source": [
    "aws.glue"
  ],
  "detail": {
    "crawlerName": [
      "datacrawler"
    ],
    "state": [
      "Succeeded"
    ]
  }
}

Lambda运行胶水作业:

var AWS = require('aws-sdk');
var sns = new AWS.SNS( { region: "us-east-2" });
var s3 = new AWS.S3();
var glue = new AWS.Glue({apiVersion: '2017-03-31'});
    exports.handler = function(event, context, callback) {
    console.log(JSON.stringify(event, null, 3));
        var params = {
            JobName: 'GlueSalesJob',
            Timeout: 20,
        };
        glue.startJobRun(params, function(err1, data1) {
            if (err1) {
                console.log(err1, err1.stack);}
            else {
                console.log(data1);}
        });
        console.log(JSON.stringify(event, null, 3));
    };

当我们只处理一个文件和一个胶水工作时,一切正常,我看不到如何缩放它。

想象一下,我有各种不同的文件到达原始区域,每个文件进入一个文件夹,对于每个文件,我必须运行AWS Glue搜寻器和AWS Glue ETL作业,并将其存储在已处理区域存储桶中的一个文件夹中。 >

例如:SaleFile,installmentsFile,DebitFiles等...

我如何通过传递应为每个文件运行的作业的名称来调用第二个lambda?基本上我需要确定提取文件或文件夹以调用适当的胶水作业。

有人可以帮助我找到解决方案吗?感谢您的帮助。我对Amazon非常陌生。

我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。制作...

回答如下:

好!您快要到了:-)

与本文相关的文章

发布评论

评论列表 (0)

  1. 暂无评论