9.数据采集与监控知识点

时间: 2023-07-11 admin IT培训

9.数据采集与监控知识点

9.数据采集与监控知识点

一、项目介绍

1.1 项目规划

......

1.2 数据采集&监控的项目架构

1)架构一(我们要使用的)

2)架构二(涉及到kafka,spark,flink)

1.3 项目背景

1. 数据从何处来
2. 数据有哪些类型
3. 针对于不同的数据进行搜集的工具
4. 采集数据的指标监控
5. 采集过程的警报
6. 性能优化

1.4 数据分类

搜集的数据,都应该是自己公司的产品的数据。这个项目我们是模拟一些数据。比如C端的日志数据,内容数据,还有RDBMS里的业务数据。

  1. 日志数据
指的是用户与app进行交互时产生的行为数据。比如在什么时间登陆,点击了什么栏目这些数据
  1. 内容数据
app上发表的文件,图片,音频,视频等

3)业务数据

业务系统在RDBMS中存储的表数据,通常是业务中的增删改对应的数据

4)元数据

用来描述表的数据,称之为元数据,比如mysql中的
create table emp (
empno int,
ename varchar(20) comment '姓名',
....
)

在元数据表meta_1中

id     tablename     databasename    
1      emp            sz2003
2      dept           sz2003

另外一张表meta_2

id     columnName    type    comment
1       empno         int      null
1       ename         varchar   姓名
1       job           varchar   
...
2       deptno          int

1.5 数据格式

  1. 日志数据
# 原始base64
eyJjb25ldG50Ijp7ImRpc3RpbmN0X2lkIjoiNjkyNDA3MiIsInByb3BlcnRpZXMiOnsibW9kZWwiOiJIUlktQUwwMGEiLCJuZXR3b3JrX3R5cGUiOiJXSUZJIiwiaXNfY2hhcmdpbmciOiIyIiwiYXBwX3ZlcnNpb24iOiI0LjQuNSIsImVsZW1lbnRfbmFtZSI6IuaIkeeahOmSseWMhemhtSIsImVsZW1lbnRfcGFnZSI6IummlumhtSIsImNhcnJpZXIiOiLkuK3lm73np7vliqgiLCJvcyI6ImFuZHJvaWQiLCJpbWVpIjoiOTM4ZDczMWY0MTg3NGRhMCIsImJhdHRlcnlfbGV2ZWwiOiI2OSIsInNjcmVlbl93aWR0aCI6IjEwODAiLCJkZXZpY2VfaWQiOiJlZDcxZDdkZi0yZjVjLTY2ZDMtY2JmYi01M2Y1NWJjNzg5OTkiLCJjbGllbnRfdGltZSI6IjIwMjAtMDQtMjUwNzo1OTo1MCIsImlwIjoiMTIxLjU2Ljc5LjQiLCJ3aWZpIjoiMSIsIm1hbnVmYWN0dXJlciI6IkhVQVdFSSIsInNjcmVlbl9oZWlnaHQiOiIyMzQwIn0sImV2ZW50IjoiQXBwQ2xpY2sifSwicHJvamVjdCI6Im5ld3MiLCJjdGltZSI6IjE1ODc3NzU3NDUifQo=
# decode之后的json
{"content": {"distinct_id": "6924072", # 用户ID"properties": {"model": "HRY-AL00a", #机型"network_type": "WIFI", #用户网络类型"is_charging": "2", #是否充电中"app_version": "4.4.5", #app版本"element_name": "我的钱包页", #元素名称"element_page": "首页", #元素所在页面"carrier": "中国移动", #运营商"os": "android", #操作系统"imei": "938d731f41874da0", #手机IMEI号"battery_level": "69", #手机电量"screen_width": "1080", #屏幕宽度"device_id": "ed71d7df-2f5c-66d3-cbfb-53f55bc78999", #设备ID"client_time": "2020-04-25 07:59:50",#客户端上报此条日志时间"ip": "121.56.79.4", #客户端IP地址"manufacturer": "HUAWEI", #制造商"screen_height": "2340", #屏幕高度"client_time":"1587771745000" # 客户端上报日志时间},"event": "AppClick" # 事件名称},"project": "news", #产品名称"ctime": "1587775745000" #服务器接收到日志时间
}
  1. 内容数据
{"article_id": 487186016, # 文章ID,新闻的唯一标识"type_name": " 科技", #新闻类型"pub_time": "2020-04-20 19:45:36.919", # 新闻发布时间"title": "小米10pro 30w无线充电对比华为40w有线充电", # 新闻标题"content": "<p>之前做了一个小米10pro 30w无线充电对比华为40w有线充电,米10pro无线充电充满只需要55分钟。这次oppoACE2,看了其他媒体测试的40w快充,前半小时还比不上米10pro,总时间才快了5分钟。然而一个40w,一个30w,一个4000毫安,一个4500毫安。。。这40w真的阳痿</p><p><img 							      					src=\"/												2013b7e.jpg\" style=\"max-width:600px;\"></p>", # 新闻内容html格式"source_name": "今日头条移动端-数码", # 新闻来源"tags": "pro,充电,小时,测试,充满,比不上,需要" # 内容标签
}

3)业务数据

-- 元信息表,已经过简化
CREATE TABLE `meta` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID,主键',`field` varchar(50) NOT NULL DEFAULT '' COMMENT '字段名称',`filed_type` varchar(20) NOT NULL DEFAULT '' COMMENT '字段类型',`field_desc` varchar(255) DEFAULT NULL COMMENT '字段说明',`app_version` varchar(10) NOT NULL DEFAULT '' COMMENT '上线版本号',`status` tinyint(4) DEFAULT '0' COMMENT '字段状态,0 下线 1 上线',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;
-- 广告信息表,已经过简化
CREATE TABLE `ad_info` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增ID,主键',`ad_id` int(11) DEFAULT NULL COMMENT '广告ID',`advertiser_id` int(11) DEFAULT NULL COMMENT '广告商ID,一个广告商会投放多个广告',`advertiser_name` varchar(255) DEFAULT NULL COMMENT '广告商名称',`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

二、项目架构的部署

2.1 LVS 负载均衡

对于负载均衡LVS的环境,因为这部分的内容偏重运维,并不是我们我们大数据课程要掌握的内容,所以在这里我们不搭建这个环境,大家只要知晓工程中我们在高并发场景下,可以使用LVS做负载均衡就可以了。在这里我们结合数据采集的场景,把LVS的作用再说明一下

例如:我们用一台机器来作为数据收集的机器,但是单台的机器的性能是有极限的,比如4C8G单台机器只能抗住我们 10000 QPS,但我们的流量高峰有15000 QPS,那我们再加一台机器就可以了。但是这两台机器如何统一对外提供服务呢,我们的目的简单来讲有两点,第一,对外是一个IP提供服务。 第二,两台机器流量要相对均衡分配,每台机器都分到7500 QPS. 那LVS就可以实现我们的目的,那咱们再说一个LVS最简单的流量调度算法,Round Robin Scheduling(RR) 轮询算法,就是依次把请求分配给RS(real server),就是我们的那两台机器。 LVS有8种调度算法,来满足不同场景的需求,在大数据这门课程中,并不需要我们掌握。另外对于负载均衡(SLB),LVS只是负载均衡的一种实现方式,不要片面的认为LVS就是SLB。对于LVS我们就讲到这里,不再深入。

2.2 Nginx接口部署及其测试

2.2.1 目录结构说明

因为要使用openresty来存储行为数据,也就是要用openresty内嵌的nginx先采集日志,因此设计一下目录结构

如果不是root用户,需要使用sudo指令,如果是root用户,不需要使用sudo.

# 用于存储此应用的各种信息的根目录
sudo mkdir -p /opt/apps/collect-app/
# 用于存储此应用的nginx的主配置文件
sudo mkdir -p /opt/apps/collect-app/conf/
# 用于存储此应用的nginx的access.log  error.log
sudo mkdir -p /opt/apps/collect-app/logs/
# 用于存储此应用的nginx的副配置文件
sudo mkdir -p /opt/apps/collect-app/conf/vhost/
# 拷贝nginx的mine.types文件到此采集项目的conf目录下
sudo cp /usr/local/openresty/nginx/conf/mime.types /opt/apps/collect-app/conf/

2.2.2 主配置文件的配置

编写此采集项目的nginx的主配置文件,命名为main-nginx.conf, 放到/opt/apps/collect-app/conf/ 目录下

# main-nginx.conf
# nginx 用户和组
user root root;
# work进程数,
worker_processes 4;
# 错误日志路径,和日志级别    
error_log logs/collect-app-error.log error;
# nginx pid文件
pid       logs/collect-app-nginx.pid;
# 单个worker最大打开的文件描述符个数
worker_rlimit_nofile 65535;
events
{#使用epoll模型, 一个优化策略use epoll;# 单个worker进程允许的最多连接数worker_connections 65535;
}
http
{include mime.types;default_type application/octet-stream;gzip on;# 指定压缩的最小字节数gzip_min_length 1k;# 16k是 缓冲的单位,4是倍数,可以是其他值作为倍数gzip_buffers 4 16k;gzip_http_version 1.0;# 压缩等级gzip_comp_level 2;# 可以压缩的文件类型gzip_types text/plain application/x-javascript text/css application/xml;#启用应答头"Vary: Accept-Encoding"gzip_vary on;# 参数中的下划线是否忽略,on表示忽略underscores_in_headers on;log_format main'$remote_addr - $remote_user [$time_local] ''$request_length ''"$request" $status $bytes_sent $body_bytes_sent ''"$http_referer" "$http_user_agent" ''"$gzip_ratio" "$request_time" ''"$upstream_addr" "$upstream_status" "$upstream_response_time"';# 定义我们数据采集的 access 日志格式log_format collect-app-log-format '$cad';   open_log_file_cache max=1000 inactive=60s;keepalive_timeout 0;client_max_body_size 20m;include /opt/apps/collect-app/conf/vhost/*.conf;
}

测试主配置文件:因为里面的变量cad还没有定义,就直接使用,所以会报错,继续配置下去就行,不用管

[root@qianfeng01 conf]# openresty -p /opt/apps/collect-app -c conf/main-nginx.conf -t报以下错误
nginx: [emerg] unknown "cad" variable
nginx: configuration file /opt/apps/collect-app/conf/main-nginx.conf test failed

2.2.3 副配置文件的配置

编写此采集项目的nginx的副配置文件,命名为minor-nginx1.conf, 放到/opt/apps/collect-app/conf/vhost/ 目录下

#minor-nginx1.conf
server {listen  8802 default_server;# lua_need_request_body on;client_max_body_size 5M;client_body_buffer_size 5M;   location /data/v1 {set $cad  '';content_by_lua_block {-- cjson模块local cjson = require "cjson"-- 读取请求体信息ngx.req.read_body()-- 请求体信息存放到 body_data变量中local body_data = ngx.req.get_body_data()-- 如果请求体为空,返回错误if body_data == nil  thenngx.say([[{"code":500,"msg":"req body nil"}]])return end-- 定义当前时间local current_time = ngx.now()*1000-- 请求的URL project参数中获取其值local project = ngx.var.arg_project-- 定义一个字典,存放有当前服务为日志增加的信息,如ctime表示接受到请求的时间,ip地址等local data={}data["project"] = projectdata["ctime"] = current_timeif ngx.var.http_x_forwarded_for == nil thendata["ip"] = ngx.var.remote_addr;elsedata["ip"] = ngx.var.http_x_forwarded_forend-- 将增加的信息编码为jsonlocal meta = cjson.encode(data)-- 将编码的json信息做base64 和 body_data拼接local res = ngx.encode_base64(meta) .. "-" .. ngx.unescape_uri(body_data)-- 将数据赋值给我们定义的nginx变量cad中,我们定义的log_format就使用这个变量的值ngx.var.cad = res ngx.say([[{"code":200,"msg":"ok"}]])}access_log  logs/collect-app-access.log  collect-app-log-format;}
}

再次进行测试主配置文件

[root@mei01 conf]# openresty -p /opt/apps/collect-app -c conf/main-nginx.conf -t

2.2.4 测试Nginx的收集功能

1)启动nginx

[root@mei01 conf]# openresty -p /opt/apps/collect-app -c conf/main-nginx.conf 

2)检查nginx

[root@qianfeng01 conf]# ps -ef | grep nginx
root      14716      1  0 14:55 ?        00:00:00 nginx: master process openresty -p /opt/apps/collect-app -c conf/main-nginx.conf
root      14717  14716  5 14:55 ?        00:00:00 nginx: worker process
root      14718  14716  5 14:55 ?        00:00:00 nginx: worker process
root      14719  14716  6 14:55 ?        00:00:00 nginx: worker process
root      14720  14716  6 14:55 ?        00:00:00 nginx: worker process
root      14782   2265  0 14:55 pts/0    00:00:00 grep --color=auto nginx[root@qianfeng01 conf]# netstat -nltp | grep nginx
tcp        0      0 0.0.0.0:8802            0.0.0.0:*               LISTEN      14716/nginx: master

3)模拟用户登录访问网址

[root@qianfeng01 conf]#curl -X POST 'http://localhost:8802/data/v1?project=news' -d  helloworld 
{"code":200,"msg":"ok"}注意:如果没有-d,也就没有发送正文数据即
[root@qianfeng01 conf]#curl -X POST 'http://localhost:8802/data/v1?project=news'
会出现{"code":500,"msg":"req body nil"}

4)检查一下日志文件

[root@mei01 ~]# cd /opt/apps/collect-app/logs
[root@mei01 logs]# cat collect-app-access.log
eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMTI3LjAuMC4xIiwiY3RpbWUiOjE2MDQzMDAyMTA4ODJ9-helloworld可以看到是加密的前缀使用-和正文拼接对前缀进行解密:
[root@mei01 logs]# echo "eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMTI3LjAuMC4xIiwiY3RpbWUiOjE2MDQzMDAyMTA4ODJ9"|base64 -d{"project":"news","ip":"127.0.0.1","ctime":1604300210882}

2.2.5 管理中心注册网址

目的:进行网络穿透,让外网可以识别本机(虚拟机)

1)配置frp客户端

# 下载frp 客户端,这个frp是我们自己编译的,已经修改了源码中的默认地址,并做了相关优化,你下载完直接用即可#创建目录用于存储frp的执行文件
[root@mei01 logs]# mkdir /usr/local/frp/ && cd /usr/local/frp
#下载
wget .33.linux_adm64.tgz 
#解压到当前目录下
tar -xvzf frpc_0.33.linux_adm64.tgz

2)使用frpc注册子域名。注册成功,域名就相当于指向了本

语法: /usr/local/frp/frpc --sd name -l 8802 -u name
注意:name要换成你喜欢的字符串,name如果在管理中心已经存在了,那么会注册失败比如:
[root@mei01 local]# ./frp//frpc http --sd michael1001 -l 8802 -u michael1001
michael1001
2020/11/02 15:27:13 [I] [service.go:282] [ca293e4adaefb386] login to server success, get run id [ca293e4adaefb386], server udp port [0]
2020/11/02 15:27:13 [I] [proxy_manager.go:144] [ca293e4adaefb386] proxy added: [michael1001.5c8e45e2bde49d69]
2020/11/02 15:27:13 [I] [control.go:181] [ca293e4adaefb386] [michael1001.5c8e45e2bde49d69] start proxy success
2020/11/02 15:27:13 [I] [control.go:185] [ca293e4adaefb386] [Forwarding:  -> 127.0.0.0:8802]

3)测试子域名是否指向了本地机器

curl =news -d test_data
返回结果{"code":200,"data":true},表明测试成功

4)将域名配置到管理中心的管理模块中

目的:模拟用户向本机持续不断的发送数据。# 接下来把此地址通过命令,配置到管理中心,你只需要把下方命令中 data_url=后面的地址替换成你的地址, name参数的值换成你的名字拼音全拼
curl -X POST \ \-F data_url==news \-F type=1 \-F name=michael1001
# 请求成功后会有如下图返回值
{"code":200,"data":{"id":2480,"data_url":"=news","type":"1","name":"michael1001","created_at":1604302530,"updated_at":1604302530},"error_code":0,"msg":"ok","status":200}
  1. 检查日志文件中是否在持续增加数据
[root@qianfeng01 logs]# cat collect-app-access.log
其中每一条记录都是用-拼接了前缀和正文
如下所示:
eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMzkuMTA2LjIwOC4xMzAiLCJjdGltZSI6MTYwNDMwMjcxOTQ3OX0=-eyJjb250ZW50Ijp7InV1aWQiOiJiYTgyM2I1MC01MmIzLTQxN2QtYTNiOC0zNjJhZTVlYTU3ZTQiLCJkaXN0aW5jdF9pZCI6IjkwIiwiZXZlbnQiOiJBcHBDbGljayIsInByb3BlcnRpZXMiOnsibW9kZWwiOiJpUGhvbmU1cyIsIm5ldHdvcmtfdHlwZSI6IldJRkkiLCJpc19jaGFyZ2luZyI6IiIsImFwcF92ZXJzaW9uIjoiMS4wIiwiZWxlbWVudF9uYW1lIjoiYmFubmVyIiwiZWxlbWVudF9wYWdlIjoi5paw6Ze75YiX6KGo6aG1IiwiY2FycmllciI6IuS4reWbveeUteS/oSIsIm9zIjoiIiwiaW1laSI6IjI3NzU2NzQ3NDExMyIsImJhdHRlcnlfbGV2ZWwiOiI2Iiwic2NyZWVuX3dpZHRoIjoiMjA0OCIsInNjcmVlbl9oZWlnaHQiOiI3NjgiLCJkZXZpY2VfaWQiOiJNRUlZQVhYWFhYZjAzYTg2ZGUxNDNkIiwiY2xpZW50X3RpbWUiOiIyMDIwLTExLTAyIDE1OjM4OjI5IiwiaXAiOiIyMjIuODguNDAuNjAiLCJtYW51ZmFjdHVyZXIiOiJBcHBsZSIsImFydGljbGVfaWQiOiI3MTY4IiwiYWN0aW9uX3R5cGUiOiIifSwidHlwZSI6InRyYWNrIn19解析前缀
[root@qianfeng01 logs]# echo "eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMzkuMTA2LjIwOC4xMzAiLCJjdGltZSI6MTYwNDMwMjcxOTQ3OX0=" | base64 -d
{"project":"news","ip":"39.106.208.130","ctime":1604302719479}[root@qianfeng01 logs]#解析后缀
[root@qianfeng01 logs]# echo "eyJjb250ZW50Ijp7InV1aWQiOiJiYTgyM2I1MC01MmIzLTQxN2QtYTNiOC0zNjJhZTVlYTU3ZTQiLCJkaXN0aW5jdF9pZCI6IjkwIiwiZXZlbnQiOiJBcHBDbGljayIsInByb3BlcnRpZXMiOnsibW9kZWwiOiJpUGhvbmU1cyIsIm5ldHdvcmtfdHlwZSI6IldJRkkiLCJpc19jaGFyZ2luZyI6IiIsImFwcF92ZXJzaW9uIjoiMS4wIiwiZWxlbWVudF9uYW1lIjoiYmFubmVyIiwiZWxlbWVudF9wYWdlIjoi5paw6Ze75YiX6KGo6aG1IiwiY2FycmllciI6IuS4reWbveeUteS/oSIsIm9zIjoiIiwiaW1laSI6IjI3NzU2NzQ3NDExMyIsImJhdHRlcnlfbGV2ZWwiOiI2Iiwic2NyZWVuX3dpZHRoIjoiMjA0OCIsInNjcmVlbl9oZWlnaHQiOiI3NjgiLCJkZXZpY2VfaWQiOiJNRUlZQVhYWFhYZjAzYTg2ZGUxNDNkIiwiY2xpZW50X3RpbWUiOiIyMDIwLTExLTAyIDE1OjM4OjI5IiwiaXAiOiIyMjIuODguNDAuNjAiLCJtYW51ZmFjdHVyZXIiOiJBcHBsZSIsImFydGljbGVfaWQiOiI3MTY4IiwiYWN0aW9uX3R5cGUiOiIifSwidHlwZSI6InRyYWNrIn19" | base64 -d
{"content":{"uuid":"ba823b50-52b3-417d-a3b8-362ae5ea57e4","distinct_id":"90","event":"AppClick","properties":{"model":"iPhone5s","network_type":"WIFI","is_charging":"","app_version":"1.0","element_name":"banner","element_page":"新闻列表页","carrier":"中国电信","os":"","imei":"277567474113","battery_level":"6","screen_width":"2048","screen_height":"768","device_id":"MEIYAXXXXXf03a86de143d","client_time":"2020-11-02 15:38:29","ip":"222.88.40.60","manufacturer":"Apple","article_id":"7168","action_type":""},"type":"track"}}[root@qianfeng01 logs]#

6)注意

当下次再启动模拟数据持续发送时,只需要执行第二步

2.3 flume采集行为数据

2.3.1 flume的source源分析

1. 尽量使用可靠源,因此exec source源被排除
2. 可靠源有spooling dir 和taildir。
3. 由于我们要采集logs目录下的 collect-app-access.log,因此可以使用taildir来监听并采集。不会有一个缺点,就是此日志文件会越来越大。
4. 如果我们要使用spooling dir,它的特点是监听新文件,一旦更名后,后续的新文件就不同于之前的文件同名。注意,3和4的问题都可以想办法解决,在这里我们使用4,并解决它的缺点。

2.3.2 日志文件截断

1)说明

mv在移动数据文件时,不会产生新的inode。然后nginx在原来的目录下产生的同名文件指向的还是同一个inode。如果这样,在每一次移动文件后,flume采集的都会出现重复数据(每次都是从同一个数据块从头开始采集)
所以,应该让nginx在原来的目录下产生的同名文件应该指向一个新的inode.

2)使用脚本进行移动数据

#!/bin/sh
# filename: split-collect-app-access-log.sh
# desc: 此脚本用于切割Nginx Access日志到指定的目录下,供Flume采集使用
# date: 2020-04-28# 帮助
usage() {echo "Usage:"echo "	split-collect-app-access-log.sh [-f log_file] [-d data_dir] [-p pid_file]"echo "Description:"echo "	log_file: nginx access file absolute path"echo "	data_dir: split data dir"	  echo "	pid_file: nginx pid file absolute path"echo "Warning: if no parmas, use default value"exit -1
}
default(){echo  "user default value:"echo	"		log_file=/opt/apps/collect-app/logs/collect-app-access.log"echo	"		data_dir=/opt/apps/collect-app/logs/data/"echo	"		pid_file=/opt/apps/collect-app/logs/collect-app-nginx.pid"# 我们的access日志文件log_file="/opt/apps/collect-app/logs/collect-app-access.log"# 切分后文件所放置的目录data_dir="/opt/apps/collect-app/logs/data/"# Nginx pid 文件pid_file="/opt/apps/collect-app/logs/collect-app-nginx.pid"
}while getopts 'f:d:p:h' OPT; do  case $OPT inf) log_file="$OPTARG";;d) data_dir="$OPTARG";;p) pid_file="$OPTARG";;h) usage;;?) usage;;*) usage;;esac
done# 当没有参数传入时
if [ $# -eq 0 ];thendefault                                        
fi# 重命名access, 注意mv 的过程日志是不会丢失的,因为nginx是以inode来表示数据文件的,而不是文件名,这里mv的操作不会改变inode
if [ ! "${log_file}" ] || [ ! "${data_dir}" ] || [ ! ${pid_file} ]; thenecho "some parmas is empty,please give a value  "exit -1
fi
# 切分之前,先判断日志文件是否有数据,如果有数据再切分,防止切分出来很多空文件
line=`tail -n 1 ${log_file}`
if [ ! "$line" ];thenecho "Warning: access log file no data, do not split!"exit 0
fi 
mv ${log_file} ${data_dir}collect-app-access.$(date +"%s").log
# 向nginx 发送 USR1信号,让其重新打开一个新的日志文件
kill -USR1 `cat ${pid_file}`
echo "finish!"

3)简单测试一下

注意:要提前将data目录创建出来[root@qianfeng01 logs]# /opt/apps/collect-app/scripts/split-collect-app-access-log.sh
会在data目录下发送一个新的文件

4)编写一个定时器来定期执行此脚本

[root@qianfeng01 logs]# crontab -e
0 * * * * /usr/sbin/ntpdate -u ntp1.aliyun.com
* * * * * /opt/apps/collect-app/scripts/split-collect-app-access-log.sh > /dev/null

2.3.3 flume采集方案的编写

1)方案的编写

[root@qianfeng01 scripts]# vi collect-app-access-flume.conf
# filename: collect-app-access-flume.conf
# 定义一个名字为 a1001 的agent
# 定义channel
a1001.channels = ch-1
# 定义source
a1001.sources = src-1
# 定义sink
a1001.sinks = k1
# sink 接到 channel 上
a1001.sinks.k1.channel = ch-1
# source 接到 channel 上
a1001.sources.src-1.channels = ch-1a1001.sources.src-1.type = spooldir
# 数据文件目录
a1001.sources.src-1.spoolDir = /opt/apps/collect-app/logs/data/
# 正则匹配我们需要的数据文件
a1001.sources.src-1.includePattern = ^collect-app.*.log
# 如果想在header信息中加入你传输的文件的文件名,设置下面参数为true,同时设置文件header的key,我们这里设置成fileName,之后你就可以在sink端通过  %{fileName}, 取出header中的fileName变量中的值,这个值就是文件名
# a1001.sources.src-1.basenameHeader = true
# a1001.sources.src-1.basenameHeaderKey = fileName# 积累多少个event后,一起发到channel, 这个值在生成环境中我们需要根据数据量配置batchSize大的下,通常来讲们的batchSize越大,吞吐就高,但是也要受到 channel 的capacity,transactionCapacity的限制,不能大于channel的transactionCapacity值。 
a1001.sources.src-1.batchSize = 100
#a1001.sources.src-1.interceptors=i1
#a1001.sources.src-1.interceptors.i1.type=com.qf.flume.interceptor.B64Interceptor$Builder# channel的配置
a1001.channels.ch-1.type = memory
a1001.channels.ch-1.capacity = 10000
a1001.channels.ch-1.transactionCapacity = 100a1001.sinks.k1.type = hdfs
#a1001.sinks.k1.hdfs.path = hdfs://qianfeng01:8020/sources/news/%{ctime}
a1001.sinks.k1.hdfs.path = hdfs://qianfeng01:8020/sources/news/%Y%m%d/%H%M
a1001.sinks.k1.hdfs.filePrefix = news-%H
a1001.sinks.k1.hdfs.fileSuffix = .gz
a1001.sinks.k1.hdfs.codeC = gzip
a1001.sinks.k1.hdfs.useLocalTimeStamp = true
a1001.sinks.k1.hdfs.writeFormat = Text
a1001.sinks.k1.hdfs.fileType = CompressedStream
# 禁用安装event条数来滚动生成文件
a1001.sinks.k1.hdfs.rollCount = 0
# 如果一个文件达到10M滚动
a1001.sinks.k1.hdfs.rollSize = 10485760
# 1分钟滚动生成新文件,和文件大小的滚动一起,那个先达到,执行那个
a1001.sinks.k1.hdfs.rollInterval = 60# 参加上边连接官网说明,理论上batchSize 越大,吞吐越高。 但是HDFS Sink 调用 Hadoop RPC(包括 open、flush、close ..)超时会抛出异常,如果发生在 flush 数据阶段,部分 event 可能已写入 HDFS,事务回滚后当前 BatchSize 的 event 还会再次写入造成数据重复。 batchSize越大可能重复的数据就越多. 同时batchSize值,不能大于channel的transactionCapacity值
a1001.sinks.k1.hdfs.batchSize = 100
# 每个HDFS SINK 开启多少线程来写文件
a1001.sinks.k1.hdfs.threadsPoolSize = 10
# 如果一个文件超过多长时间没有写入,就自动关闭文件,时间单位是秒
a1001.sinks.k1.hdfs.idleTimeout = 60

2)采集方案的运行

[root@qianfeng01 scripts]# flume-ng agent -c /usr/local/flume/conf -f collect-app-access-flume.conf -n a1001 -Dflume.root.logger=INFO,console

3)查看hdfs上的文件

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Cac8qM9r-1615386415992)(%E9%A1%B9%E7%9B%AE%E9%87%87%E9%9B%86%E5%92%8C%E7%9B%91%E6%8E%A7%E9%A1%B9%E7%9B%AE%E7%AC%94%E8%AE%B0.assets/image-20201102165810597.png)]

2.3.4 自定义拦截器

1)说明

因为落地到nginx上的日志数据是base64的,最终分析一定是对解密后的数据进行分析,如果将解密工作放在数据分析阶段,比如,用mr,hive,spark等进行解密,那么工作量会变大,影响效率性能。所以不如在采集阶段,使用flume的拦截器,拦截存储了base64的数据的event,然后重构event,这样性能相对来说,会更好一些。

2)pom.xml

<dependencies><!-- .apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.8.0</version><scope>provided</scope></dependency><!-- .alibaba/fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency>
</dependencies>
<build><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><configuration><!-- put your configurations here --></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals></execution></executions></plugin></plugins>
</build>

3)代码编写


4)打包上传到flume的lib目录下

5)重新修改flume的采集方案

[root@qianfeng01 scripts]# vi collect-app-access-flume.conf
# filename: collect-app-access-flume.conf
# 定义一个名字为 a1001 的agent
# 定义channel
a1001.channels = ch-1
# 定义source
a1001.sources = src-1
# 定义sink
a1001.sinks = k1
# sink 接到 channel 上
a1001.sinks.k1.channel = ch-1
# source 接到 channel 上
a1001.sources.src-1.channels = ch-1a1001.sources.src-1.type = spooldir
# 数据文件目录
a1001.sources.src-1.spoolDir = /opt/apps/collect-app/logs/data/
# 正则匹配我们需要的数据文件
a1001.sources.src-1.includePattern = ^collect-app.*.log
# 如果想在header信息中加入你传输的文件的文件名,设置下面参数为true,同时设置文件header的key,我们这里设置成fileName,之后你就可以在sink端通过  %{fileName}, 取出header中的fileName变量中的值,这个值就是文件名
# a1001.sources.src-1.basenameHeader = true
# a1001.sources.src-1.basenameHeaderKey = fileName# 积累多少个event后,一起发到channel, 这个值在生成环境中我们需要根据数据量配置batchSize大的下,通常来讲们的batchSize越大,吞吐就高,但是也要受到 channel 的capacity,transactionCapacity的限制,不能大于channel的transactionCapacity值。 
a1001.sources.src-1.batchSize = 100
a1001.sources.src-1.interceptors=i1
a1001.sources.src-1.interceptors.i1.type=com.qf.flume.interceptor.Base64Interceptor$MyBuilder# channel的配置
a1001.channels.ch-1.type = memory
a1001.channels.ch-1.capacity = 10000
a1001.channels.ch-1.transactionCapacity = 100a1001.sinks.k1.type = hdfs
a1001.sinks.k1.hdfs.path = hdfs://qianfeng01:8020/sources/news/%{ctime}
a1001.sinks.k1.hdfs.filePrefix = news-%{ctime}-%H
a1001.sinks.k1.hdfs.fileSuffix = .gz
a1001.sinks.k1.hdfs.codeC = gzip
a1001.sinks.k1.hdfs.useLocalTimeStamp = true
a1001.sinks.k1.hdfs.writeFormat = Text
a1001.sinks.k1.hdfs.fileType = CompressedStream
# 禁用安装event条数来滚动生成文件
a1001.sinks.k1.hdfs.rollCount = 0
# 如果一个文件达到10M滚动
a1001.sinks.k1.hdfs.rollSize = 10485760
# 1分钟滚动生成新文件,和文件大小的滚动一起,那个先达到,执行那个
a1001.sinks.k1.hdfs.rollInterval = 60# 参加上边连接官网说明,理论上batchSize 越大,吞吐越高。 但是HDFS Sink 调用 Hadoop RPC(包括 open、flush、close ..)超时会抛出异常,如果发生在 flush 数据阶段,部分 event 可能已写入 HDFS,事务回滚后当前 BatchSize 的 event 还会再次写入造成数据重复。 batchSize越大可能重复的数据就越多. 同时batchSize值,不能大于channel的transactionCapacity值
a1001.sinks.k1.hdfs.batchSize = 100
# 每个HDFS SINK 开启多少线程来写文件
a1001.sinks.k1.hdfs.threadsPoolSize = 10
# 如果一个文件超过多长时间没有写入,就自动关闭文件,时间单位是秒
a1001.sinks.k1.hdfs.idleTimeout = 60

6)设计一个启动flume采集方案的脚本

将启动flume的操作封装到脚本中,脚本的文件也存储到/opt/apps/scripts目录下

脚本名称:collect-app-access-flume-start.sh

[root@qianfeng01 scripts]# vi collect-app-access-flume-start.sh#!/bin/sh
# filename:collect-app-access-flume-start.sh
# desc: 启动采集数据的flume agent,agent 名字为 a1001
# 请写你安装的FLUME的路径
FLUME_HOME=/usr/local/flume/
CONF_FILE=/opt/apps/collect-app/scripts/collect-app-access-flume.conf${FLUME_HOME}/bin/flume-ng agent -c ${FLUME_HOME}/conf -f ${CONF_FILE} -n a1001 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31001

7)启动采集方案

注意:一定要先开启nginx,然后进行启动管理中心,hdfs别忘记开启

[root@qianfeng01 scripts]# chmod u+x collect-app-access-flume-start.sh
[root@qianfeng01 scripts]# ./collect-app-access-flume-start.sh

8)去hdfs上查看文件的样式

2.4 flume采集内容数据

1)说明

我们可以使用flume的http source直接采集内容数据,内容日志指的是公司的一些新闻,宣传信息等,包含了文字,图片,视频,音频等。
注意:我们也需要新闻信息中的时间作为hdfs上的目录,因此也需要拦截器(自己写)

2.4.1 编辑采集方案

使用http+memory+hdfs方案
文件名:collect-app-http-flume.conf
文件位置:/opt/apps/collect-app/scripts/

# filename: collect-app-http-flume.conf
# 定义一个名字为 b1001 的agent
# 定义channel
b1001.channels = ch-1
# 定义source
b1001.sources = src-1
# 定义sink
b1001.sinks = k1
# sink 接到 channel 上
b1001.sinks.k1.channel = ch-1
# source 接到 channel上
b1001.sources.src-1.channels = ch-1# --------------source的配置
b1001.sources.src-1.type = http
# http绑定地址
b1001.sources.src-1.bind=0.0.0.0
# http绑定端口
b1001.sources.src-1.port=9666# --------------channel的配置
b1001.channels.ch-1.type = memory
b1001.channels.ch-1.capacity = 10000
b1001.channels.ch-1.transactionCapacity = 100# --------------sink的配置
b1001.sinks.k1.type = hdfs
b1001.sinks.k1.hdfs.path = hdfs://qianfeng01:8020/sources/news-article/%Y%m%d
b1001.sinks.k1.hdfs.filePrefix = news-%Y%m%d-%H
b1001.sinks.k1.hdfs.fileSuffix = .gz
b1001.sinks.k1.hdfs.codeC = gzip
b1001.sinks.k1.hdfs.useLocalTimeStamp = true
b1001.sinks.k1.hdfs.writeFormat = Text
b1001.sinks.k1.hdfs.fileType = CompressedStream
# 禁用安装event条数来滚动生成文件
b1001.sinks.k1.hdfs.rollCount = 0
# 如果一个文件达到10M滚动
b1001.sinks.k1.hdfs.rollSize = 10485760
# 5分钟滚动生成新文件,和文件大小的滚动一起,那个先达到,执行那个
b1001.sinks.k1.hdfs.rollInterval = 60
# 参加上边连接官网说明,理论上batchSize 越大,吞吐越高。 但是HDFS Sink 调用 Hadoop RPC(包括 open、flush、close ..)超时会抛出异常,如果发生在 flush 数据阶段,部分 event 可能已写入 HDFS,事务回滚后当前 BatchSize 的 event 还会再次写入造成数据重复。 batchSize越大可能重复的数据就越多. 同时batchSize值,不能大于channel的transactionCapacity值
b1001.sinks.k1.hdfs.batchSize = 100
# 每个HDFS SINK 开启多少线程来写文件
b1001.sinks.k1.hdfs.threadsPoolSize = 10
# 如果一个文件超过多长时间没有写入,就自动关闭文件,时间单位是秒
b1001.sinks.k1.hdfs.idleTimeout = 60

3)设计一个脚本用于启动内容数据的采集方案

脚本名称:collect-app-http-flume-start.sh

脚本位置:/opt/apps/collect-app/scripts/

[root@qianfeng01 scripts]# vi collect-app-http-flume-start.sh
#!/bin/bash
FLUME_HOME=/usr/local/flume
CONF_FILE=/opt/apps/collect-app/scripts/collect-app-http-flume.conf${FLUME_HOME}/bin/flume-ng agent -c ${FLUME_HOME}/conf -f ${CONF_FILE} -n b1001 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002

4)使用脚本启动采集方案

[root@qianfeng01 scripts]# chmod u+x  collect-app-http-flume-start.sh
[root@qianfeng01 scripts]# ./collect-app-http-flume-start.sh

5)测试一条数据

[root@qianfeng01 scripts]# curl -X POST http://qianfeng01:9666  -d  '[{"header":{"name":"article"},"body":"123"}]'

2.4.2 配置管理中心

1)注册子域名

语法:/opt/soft/frp/frpc http  --sd name -article -l 9666 -u namename要换成自己喜欢的字符串比如:
[root@qianfeng01 ~]# /usr/local/frp/frpc http  --sd michael1001-article -l 9666 -u michael1001
michael1001-article
2020/11/03 10:53:53 [I] [service.go:282] [5e1c7471caa46afa] login to server success, get run id [5e1c7471caa46afa], server udp port [0]
2020/11/03 10:53:53 [I] [proxy_manager.go:144] [5e1c7471caa46afa] proxy added: [michael1001.1a426ec54c06d9ed]
2020/11/03 10:53:53 [I] [control.go:181] [5e1c7471caa46afa] [michael1001.1a426ec54c06d9ed] start proxy success
2020/11/03 10:53:53 [I] [control.go:185] [5e1c7471caa46afa] [Forwarding:  -> 127.0.0.0:9666 是子域名

2)测试子域名是否有效

[root@mei01 scripts]# curl -X POST   -d  '[{"header":{"name":"article"},"body":"123"}]'
  1. 配置到管理中心,让他持续发送数据
curl -X POST \ \-F data_url= \-F type=2 \-F name=michael1001注意:michael1001  要换成自己的出现以下代码,表示成功{"code":200,"data":{"id":2546,"data_url":"","type":"2","name":"michael1001","created_at":1604372283,"updated_at":1604372283},"error_code":0,"msg":"ok","status":200}[

4)注意:下次启动时,只需要执行第一步,不是注册而是启动

2.5 sqoop采集业务数据

2.5.1 说明

要连接北京的mysql。# 只读权限
HOST: mysql.yihongyeyan.com
PORT: 3306
USER: qf001
PWD: QF-common1001-###测试:
1)可以使用navicat进行连接测试
2)也可以使用sqoop 相关指令进行测试
sqoop list-databases \
--connect jdbc:mysql://mysql.yihongyeyan.com:3306 \
--username qf001 \
--password QF-common1001-###

2.5.2 编写脚本

文件名:collect-app-mysql-sqoop-hdfs.sh

存储位置:/opt/apps/collect-app/scripts/

[root@qianfeng01 scripts]# collect-app-mysql-sqoop-hdfs.sh
#!/bin/sh
# filename: collect-app-mysql-sqoop-hdfs.sh
# desc: 导入mysql业务表 [meta,ad_info]数据到hdfs
# date: 2020-04-29
# 请写你安装的SQOOP的路径
SQOOP_HOME=/usr/local/sqoopMYSQL_CONNECT=jdbc:mysql://mysql.yihongyeyan.com:3306/biz
MYSQL_USERNAME=qf001
MYSQL_PWD=QF-common1001-#### 定义一个日期参数,如果指定日期,就导入该日期前一天的数据,否则默认为昨日数据
exec_date=$1
if [  "${exec_date}" ] ;thenexec_date=`date -d "${exec_date} 1 days ago" +%Y%m%d`
else exec_date=`date -d "1 days ago" +%Y%m%d`
fi echo "collect-app-mysql-sqoop-hdfs.sh exce date: ${exec_date}"SQL_DATE=`date -d "${exec_date} 1 days" +%Y-%m-%d` # ad_info表数据# --split-by 列作为并行导入时的分隔列
# -m 3个并行度
# $CONDITIONS SQOOP执行SQL查询时,会加上 1=0 先去验证SQL语法是否正确如果正确,真执行的时候会变成 1=1
${SQOOP_HOME}/bin/sqoop import \
--connect ${MYSQL_CONNECT}  \
--username ${MYSQL_USERNAME} \
--password ${MYSQL_PWD}  \
-e "select  id ,cast(ad_id as char(50)) as ad_id,cast(advertiser_id as char(50)) as advertiser_id,advertiser_name,DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s')as create_time from ad_info where 1=1 and create_time<='${SQL_DATE} 00:00:00'  AND \$CONDITIONS" \
--split-by id \
--target-dir /sources/news-biz/biz/ad_info/${exec_date} \
--delete-target-dir \
--m 3# meta 表数据
${SQOOP_HOME}/bin/sqoop import \
--connect ${MYSQL_CONNECT}  \
--username ${MYSQL_USERNAME} \
--password ${MYSQL_PWD}  \
-e "select id,field,field_type,field_desc,app_version,cast(status as char(50)) as status,DATE_FORMAT(create_time,'%Y-%m-%d %H:%i:%s')as create_time from meta where 1=1 and create_time<='${SQL_DATE} 00:00:00'  AND \$CONDITIONS" \
--split-by id \
--target-dir /sources/news-biz/biz/meta/${exec_date} \
--delete-target-dir \
--m 2

2.5.3 运行脚本

[root@qianfeng01 scripts]# bash collect-app-mysql-sqoop-hdfs.sh
或者修改权限500
[root@qianfeng01 scripts]# chmod 500 collect-app-mysql-sqoop-hdfs.sh
[root@qianfeng01 scripts]# ./collect-app-mysql-sqoop-hdfs.sh注意:如果报以下错误:
Exception in thread "main" java.lang.NoClassDefFoundError: org/json/JSONObject需要将java-json.jar导入到sqoop的lib目录