datawarehouse-practice2

GitHub data warehouse practice project

Posted by 果然 on December 31, 2021

Hive setup

Problem 1: Column length too big for column 'PARAM_VALUE' (max = 21845); use BLOB or TEXT instead
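
This error comes from the MySQL metastore: a utf8 column definition would exceed MySQL's column size limit. One common workaround, sketched here as an assumption to verify against your metastore schema version, is to make the oversized column MEDIUMTEXT instead of a long VARCHAR:

-- run against the Hive metastore database in MySQL; check the column definition for your schema version first
alter table TABLE_PARAMS modify column PARAM_VALUE mediumtext character set utf8;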

Execution engine: Tez

Tez is an execution engine for Hive that outperforms MapReduce (MR). Why? Tez can merge multiple jobs with dependencies into a single job, so intermediate results are written to HDFS only once and there are fewer intermediate stages, which greatly improves job performance.
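
To use it, point Hive at Tez; a minimal sketch, assuming Tez is already installed and its libraries are on the Hive classpath:

-- switch the execution engine for the current session
set hive.execution.engine=tez;
-- switch back to MapReduce if a job misbehaves
set hive.execution.engine=mr;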

Data warehouse layering

Different companies may name the layers differently, but the underlying ideas are the same: the higher the layer, the smaller the data volume.

ODS layer

# Startup log table
# The raw data on HDFS is compressed with lzo, so the inputformat must be one that supports the lzo format.
create database gmall;
use gmall;
drop table if exists ods_start_log;
create external table ods_start_log(`line` string)
partitioned by(`dt` string)
stored as
	inputformat 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
	outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/warehouse/gmall/ods_start_log';

# Load the data
load data inpath '/origin_data/gmall/log/topic_start/xxxx-xx-xx' into table gmall.ods_start_log partition(dt='xxxx-xx-xx');

select * from ods_start_log limit 2;

An event log table, ods_event_log, can be created in the same way, as sketched below.
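
A sketch of the DDL, mirroring ods_start_log (the location path is an assumption that follows the same naming convention):

drop table if exists ods_event_log;
create external table ods_event_log(`line` string)
partitioned by(`dt` string)
stored as
    inputformat 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
    outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
-- location assumed to follow the ods_start_log convention
location '/warehouse/gmall/ods_event_log';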

ODS layer data loading script

#!/bin/bash

# Define variables for easy modification
APP=gmall
hive=/usr/local/hive/bin/hive

# If a date is passed as the first argument, use it; otherwise default to yesterday
if [ -n "$1" ] ;then
   do_date=$1
else 
   do_date=`date -d "-1 day" +%F`
fi 

echo "===日志日期为 $do_date==="
sql="
load data inpath '/origin_data/gmall/log/topic_start/$do_date' into table "$APP".ods_start_log partition(dt='$do_date');

load data inpath '/origin_data/gmall/log/topic_event/$do_date' into table "$APP".ods_event_log partition(dt='$do_date');
"

$hive -e "$sql"

Script schedule: in production this typically runs between 00:30 and 01:00 every morning.
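
A hypothetical crontab entry for that window (the script path and log path below are assumptions):

# load yesterday's logs into ODS at 00:30 every day (paths are hypothetical)
30 0 * * * /usr/local/bin/ods_log.sh >> /tmp/ods_log.cron.log 2>&1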

Hive user-defined functions

There are three kinds of user-defined functions in Hive:
1. user-defined function (UDF);
2. user-defined aggregate function (UDAF);
3. user-defined table-generating function (UDTF).

A UDF operates on a single data row and produces a single row as output.
A UDAF takes multiple input rows and produces one output row, like COUNT and MAX.
A UDTF operates on a single row and produces multiple rows as output, e.g. lateral view explode(); see the example below.
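
A quick illustration of a built-in UDTF (the orders table and its item_list column are made up for this example):

-- orders and item_list are hypothetical; split() turns the comma-separated string into an array,
-- and explode() emits one row per element, joined back to the source row by lateral view
select order_id, item
from orders lateral view explode(split(item_list, ',')) t as item;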

DWD layer

The DWD startup log table is built by splitting each raw JSON line into fields with get_json_object; a quick illustration of the function is below, followed by the loading script.
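
For instance, on a made-up JSON string (recent Hive versions allow a SELECT without a FROM clause):

select get_json_object('{"mid":"m_1","uid":"u_100"}', '$.mid');
-- returns m_1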

vim dwd_start_log.sh
#!/bin/bash
APP=gmall
hive=/usr/local/hive/bin/hive

if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_start_log
partition (dt='$do_date')
select
    get_json_object(line,'$.mid') mid_id,
    get_json_object(line,'$.uid') user_id,
    get_json_object(line,'$.vc') version_code,
    get_json_object(line,'$.vn') version_name,
    get_json_object(line,'$.l') lang,
    get_json_object(line,'$.sr') source,
    get_json_object(line,'$.os') os,
    get_json_object(line,'$.ar') area,
    get_json_object(line,'$.md') model,
    get_json_object(line,'$.ba') brand,
    get_json_object(line,'$.sv') sdk_version,
    get_json_object(line,'$.g') gmail,
    get_json_object(line,'$.hw') height_width,
    get_json_object(line,'$.t') app_time,
    get_json_object(line,'$.nw') network,
    get_json_object(line,'$.ln') lng,
    get_json_object(line,'$.la') lat,
    get_json_object(line,'$.entry') entry,
    get_json_object(line,'$.open_ad_type') open_ad_type,
    get_json_object(line,'$.action') action,
    get_json_object(line,'$.loading_time') loading_time,
    get_json_object(line,'$.detail') detail,
    get_json_object(line,'$.extend1') extend1
from "$APP".ods_start_log
where dt='$do_date';
"

$hive -e "$sql"
#!/bin/bash

# Define variables for easy modification
APP=gmall
hive=/opt/module/hive/bin/hive
if [ -n "$1" ] ;then
    do_date=$1
else 
    do_date=`date -d "-1 day" +%F`  
fi 

sql="
    add jar /usr/local/hive/lib/hivefunction-1.0-SNAPSHOT.jar;
    create temporary function base_analizer as 'com.gree.udf.BaseFieldUDF';
    create temporary function flat_analizer as 'com.gree.udtf.EventJsonUDTF';
    set hive.exec.dynamic.partition.mode=nonstrict;

    insert overwrite table "$APP".dwd_base_event_log
    partition (dt='$do_date')
    select
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        network,
        lng,
        lat,
        app_time,
        event_name,
        event_json,
        server_time
    from
(
select
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[0]   as mid_id,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[1]   as user_id,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[2]   as version_code,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[3]   as version_name,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[4]   as lang,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[5]   as source,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[6]   as os,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[7]   as area,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[8]   as model,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[9]   as brand,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[10]   as sdk_version,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[11]  as gmail,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[12]  as height_width,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[13]  as app_time,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[14]  as network,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[15]  as lng,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[16]  as lat,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[17]  as ops,
    split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[18]  as server_time
    from "$APP".ods_event_log where dt='$do_date'  and base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'' 
    ) sdk_log lateral view flat_analizer(ops) tmp_k as event_name, event_json;
"

$hive -e "$sql" 

DWS layer

Garbled Chinese characters in Hive

For the detailed fix, see the notes on installing MySQL online; the relevant settings are summarized below.

# Add to /etc/my.cnf
init_connect='SET collation_connection = utf8_unicode_ci'
init_connect='SET NAMES utf8'
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake

# Restart the mysqld service
service mysqld restart

# Character-set settings for table, partition and view metadata in the metastore database
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8;
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8 ;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;

Modify the metastore connection URL

<property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://192.168.184.131:3306/hive?createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=UTF-8</value>
    <description>JDBC connect string for a JDBC metastore</description>
</property>

Built-in functions

The collect_set function

Aggregates the values from different rows in the same group into a single set (duplicates removed).

select course,collect_set(area),avg(score) from student group by course;  
# aggregated result:
chinese ["sh","bj"]     79.0
math    ["bj"]  93.5

The concat and concat_ws functions

concat() joins multiple strings into one string; if any argument is NULL, the result is NULL.

select concat(id,',', name) as con from info limit 1;

concat_ws() takes a separator as its first argument. It ignores NULL values that come after the separator argument, but it does not ignore empty strings.

select concat_ws('_',id,name) as con_ws from info limit 1;
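
The difference shows up with NULL arguments; a quick check (the cast keeps the NULL literal typed as a string in Hive):

select concat('a', cast(null as string), 'c');      -- returns NULL
select concat_ws('_', 'a', cast(null as string), 'c'); -- returns a_c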

The date_sub() function

Subtracts a specified time interval from a date.

DATE_SUB(date, INTERVAL expr type)

This is the MySQL syntax.
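
In Hive, date_sub() simply takes the number of days to subtract; a quick sanity check:

select date_sub('2021-12-31', 7);
-- returns 2021-12-24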

Usage of HAVING

HAVING filters the grouped (aggregated) rows, whereas the WHERE clause filters individual rows before aggregation, i.e. it takes effect before GROUP BY and HAVING.
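
A small sketch on the student table used above (the threshold values are arbitrary):

select course, avg(score) avg_score
from student
where score > 0            -- row-level filter, applied before grouping
group by course
having avg(score) >= 60;   -- group-level filter, applied after aggregation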

Differences between row_number(), rank() and dense_rank()
row_number(): ties get distinct, consecutive ranks (the row read first ranks higher), so there are no duplicate ranks.
rank(): gap ranking; rows with equal values (e.g. the same sal) share a rank, so two rows tied for 1st are both ranked 1 and the next row is ranked 3, skipping 2; duplicate ranks exist.
dense_rank(): dense ranking; ties share a rank and the next distinct value gets the immediately following rank, with no gaps. See the example below.
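
A sketch over a hypothetical emp table with ename and sal columns, showing the three functions side by side:

-- emp, ename and sal are hypothetical names for this illustration
select
    ename,
    sal,
    row_number() over(order by sal desc) rn,   -- 1,2,3,4 ... no duplicate ranks
    rank()       over(order by sal desc) rk,   -- 1,1,3,4 ... ties share a rank, then a gap
    dense_rank() over(order by sal desc) drk   -- 1,1,2,3 ... ties share a rank, no gap
from emp;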

ADS layer

Requirement: the number of users active on at least three consecutive days within the last seven days.
Approach: rank() over() combined with date_sub(). Rank each user's active dates in ascending order; for consecutive dates, date_sub(dt, rank) is a constant, so grouping by mid_id and that difference and keeping groups with count(*) >= 3 finds runs of three or more consecutive days.

vim ads_continuity_log.sh

#!/bin/bash

if [ -n "$1" ];then
    do_date=$1
else
    do_date=`date -d "-1 day" +%F`
fi

hive=/opt/module/hive/bin/hive
APP=gmall

echo "-----------导入日期$do_date-----------"

sql="
insert into table "$APP".ads_continuity_uv_count
select 
     '$do_date',
     concat(date_add('$do_date',-6),'_','$do_date') dt,
     count(*) 
from 
(
    select mid_id
    from
    (
        select mid_id
        from 
        (
            select
                mid_id,
                date_sub(dt,rank) date_diff
            from 
            (
                select 
                    mid_id,
                    dt,
                    rank() over(partition by mid_id order by dt) rank
                from "$APP".dws_uv_detail_day
                where dt>=date_add('$do_date',-6) and dt<='$do_date'
            )t1
        )t2
        group by mid_id,date_diff
        having count(*)>=3
    )t3 
    group by mid_id
)t4;
"

$hive -e "$sql"

Finally, make the script executable.

chmod 777 ads_continuity_log.sh