Data Lake AWS无服务器Amazon S3
我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。为了使该数据可用,我必须在AWS Glue数据目录中对其架构进行分类。
[我使用由Amazon S3触发器调用的AWS Lambda函数来执行此操作,以启动对数据进行分类的AWS Glue搜寻器。
搜寻器完成表定义的创建后,使用Amazon CloudWatch Events规则调用第二个Lambda函数。此步骤将启动一个AWS Glue ETL作业,以处理数据并将其输出到另一个Amazon S3存储桶(我们称为已处理区域)中。 AWS Glue ETL作业将数据转换为Apache Parquet格式并将其存储在已处理的S3存储桶中
Lambda以运行搜寻器:
var AWS = require('aws-sdk');
var glue = new AWS.Glue();
var sqs = new AWS.SQS();
exports.handler = function(event, context,callback) {
console.log(JSON.stringify(event, null, 3));
if(event.Records.length > 0 && event.Records[0].eventSource == 'aws:sqs'){
startCrawler('datacrawler', function(err2,data2){
if(err2) callback(err2)
else callback(null,data2)
})
}else{
var dbName = 'datacatalog';
var params = {
DatabaseInput: {
Name: dbName,
Description: 'Rede Post database',
}
};
glue.createDatabase(params, function(err, data) {
var params1 = {
DatabaseName: dbName,
Name: 'datacrawler',
Role: 'service-role/rede-data-lake-GlueLabRole-1OI9OXN93676F',
Targets: {
S3Targets: [{ Path: 's3://rede-data-lake-raws3bucket-1qgllh1leebin/' }]
},
Description: 'crawler test'
};
glue.createCrawler(params1, function(err1, data1) {
startCrawler('datacrawler', function(err2,data2){
if(err2) callback(err2)
else callback(null,data2)
})
});
});
};
};
function startCrawler(name,callback){
var params = {
Name: name,
};
glue.startCrawler(params, function(err, data) {
if (err){
console.log(JSON.stringify(err,null,3 ))
var params1 = {
MessageBody: 'retry',
QueueUrl: ''
};
sqs.sendMessage(params1, function(err1, data1) {
if (err1) callback(err1);
else callback(null, data1)
});
}
else{
callback(null, data)
}
});
}
[Cloud Watch Event规则:
{
"detail-type": [
"Glue Crawler State Change"
],
"source": [
"aws.glue"
],
"detail": {
"crawlerName": [
"datacrawler"
],
"state": [
"Succeeded"
]
}
}
Lambda运行胶水作业:
var AWS = require('aws-sdk');
var sns = new AWS.SNS( { region: "us-east-2" });
var s3 = new AWS.S3();
var glue = new AWS.Glue({apiVersion: '2017-03-31'});
exports.handler = function(event, context, callback) {
console.log(JSON.stringify(event, null, 3));
var params = {
JobName: 'GlueSalesJob',
Timeout: 20,
};
glue.startJobRun(params, function(err1, data1) {
if (err1) {
console.log(err1, err1.stack);}
else {
console.log(data1);}
});
console.log(JSON.stringify(event, null, 3));
};
当我们只处理一个文件和一个胶水工作时,一切正常,我看不到如何缩放它。
想象一下,我有各种不同的文件到达原始区域,每个文件进入一个文件夹,对于每个文件,我必须运行AWS Glue搜寻器和AWS Glue ETL作业,并将其存储在已处理区域存储桶中的一个文件夹中。 >
例如:SaleFile,installmentsFile,DebitFiles等...
我如何通过传递应为每个文件运行的作业的名称来调用第二个lambda?基本上我需要确定提取文件或文件夹以调用适当的胶水作业。
有人可以帮助我找到解决方案吗?感谢您的帮助。我对Amazon非常陌生。
我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。制作...
回答如下:好!您快要到了:-)
Data Lake AWS无服务器Amazon S3
我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。为了使该数据可用,我必须在AWS Glue数据目录中对其架构进行分类。
[我使用由Amazon S3触发器调用的AWS Lambda函数来执行此操作,以启动对数据进行分类的AWS Glue搜寻器。
搜寻器完成表定义的创建后,使用Amazon CloudWatch Events规则调用第二个Lambda函数。此步骤将启动一个AWS Glue ETL作业,以处理数据并将其输出到另一个Amazon S3存储桶(我们称为已处理区域)中。 AWS Glue ETL作业将数据转换为Apache Parquet格式并将其存储在已处理的S3存储桶中
Lambda以运行搜寻器:
var AWS = require('aws-sdk');
var glue = new AWS.Glue();
var sqs = new AWS.SQS();
exports.handler = function(event, context,callback) {
console.log(JSON.stringify(event, null, 3));
if(event.Records.length > 0 && event.Records[0].eventSource == 'aws:sqs'){
startCrawler('datacrawler', function(err2,data2){
if(err2) callback(err2)
else callback(null,data2)
})
}else{
var dbName = 'datacatalog';
var params = {
DatabaseInput: {
Name: dbName,
Description: 'Rede Post database',
}
};
glue.createDatabase(params, function(err, data) {
var params1 = {
DatabaseName: dbName,
Name: 'datacrawler',
Role: 'service-role/rede-data-lake-GlueLabRole-1OI9OXN93676F',
Targets: {
S3Targets: [{ Path: 's3://rede-data-lake-raws3bucket-1qgllh1leebin/' }]
},
Description: 'crawler test'
};
glue.createCrawler(params1, function(err1, data1) {
startCrawler('datacrawler', function(err2,data2){
if(err2) callback(err2)
else callback(null,data2)
})
});
});
};
};
function startCrawler(name,callback){
var params = {
Name: name,
};
glue.startCrawler(params, function(err, data) {
if (err){
console.log(JSON.stringify(err,null,3 ))
var params1 = {
MessageBody: 'retry',
QueueUrl: ''
};
sqs.sendMessage(params1, function(err1, data1) {
if (err1) callback(err1);
else callback(null, data1)
});
}
else{
callback(null, data)
}
});
}
[Cloud Watch Event规则:
{
"detail-type": [
"Glue Crawler State Change"
],
"source": [
"aws.glue"
],
"detail": {
"crawlerName": [
"datacrawler"
],
"state": [
"Succeeded"
]
}
}
Lambda运行胶水作业:
var AWS = require('aws-sdk');
var sns = new AWS.SNS( { region: "us-east-2" });
var s3 = new AWS.S3();
var glue = new AWS.Glue({apiVersion: '2017-03-31'});
exports.handler = function(event, context, callback) {
console.log(JSON.stringify(event, null, 3));
var params = {
JobName: 'GlueSalesJob',
Timeout: 20,
};
glue.startJobRun(params, function(err1, data1) {
if (err1) {
console.log(err1, err1.stack);}
else {
console.log(data1);}
});
console.log(JSON.stringify(event, null, 3));
};
当我们只处理一个文件和一个胶水工作时,一切正常,我看不到如何缩放它。
想象一下,我有各种不同的文件到达原始区域,每个文件进入一个文件夹,对于每个文件,我必须运行AWS Glue搜寻器和AWS Glue ETL作业,并将其存储在已处理区域存储桶中的一个文件夹中。 >
例如:SaleFile,installmentsFile,DebitFiles等...
我如何通过传递应为每个文件运行的作业的名称来调用第二个lambda?基本上我需要确定提取文件或文件夹以调用适当的胶水作业。
有人可以帮助我找到解决方案吗?感谢您的帮助。我对Amazon非常陌生。
我试图使用Amazon Simple Storage Service(Amazon S3)作为主要数据存储来构建无服务器数据湖。提取的数据位于我们称为原始区域的Amazon S3存储桶中。制作...
回答如下:好!您快要到了:-)