##Gobblin--一个用于Hadoop的统一"数据抽取框架"

日期:2019-10-03编辑作者:动漫动画

CDH Hadoop系列目录:

Gobblin--一个用于Hadoop的统一"数据抽取框架" - lmalds的专栏 - 博客频道 - CSDN.NET http://blog.csdn.net/lmalds/article/details/53940549

Hadoop实战(3)_虚拟机搭建CDH的全分布模式

一、简介

Hadoop实战(4)_Hadoop的集群管理和资源分配

必赢437登录 1

Hadoop实战(5)_Hadoop的运维经验

Gobblin

Hadoop实战(8)_CDH添加Hive服务及Hive基础

Gobblin是 LinkedIn在2015年2月开源的、为必赢电子游戏娱乐,Hadoop提供的一个数据整合框架。 说到将数据导入到HDFS,此类的框架包括:
1、Apache Sqoop2、Apache Flume3、Aegisthus4、Morphlines。。。
1
2
3
4
5

Hadoop实战(9)_Hive进阶及UDF开发

1
2
3
4
5

Sqoop语法说明

Sqoop官方学习文档:

http://archive.cloudera.com/cdh5/cdh/5/sqoop-1.4.6-cdh5.9.0/

Sqoop import是相对于HDFS来讲,即从关系数据库import到HDFS上。

必赢优惠大厅官网,mysql的驱动包放到sqoop/lib下。

其中,Sqoop用于在关系型数据库(RDBMS)和HDFS之间互相传输数据,Flume主要用于对日志文件的收集,Aegisthus主要用于从Cassandra抽取数据,而Morphlines则类似于Gobblin中的转换器,作为插件配合Sqoop和Flume使用。
然而,相对于其他类似框架,Gobblin的设计有3个主要的目标:
1、普遍性2、可扩展性3、可操作性
1
2
必赢437登录,3

案例一:把数据导入到HDFS上

/root/project
mkdir sqoop_prj
cd sqoop_prj/
mkdir DBS
cd DBS/
touch DBS.opt

hadoop fs -mkdir /user/hive/warehouse/DBS
which sqoop

执行opt文件,不能传参,sqoop --options-file aa.opt-m,指定map数,如果抽取的表数据量大,则调大map数。如果-m设置为5,5个线程,则在HDFS上产生5个文件。

把sqoop写到shell脚本的好处,可以传参数。

#!/bin/sh
. /etc/profile

hadoop fs -rmr /user/hive/warehouse/DBS



sqoop import  --connect "jdbc:mysql://cdhmaster:3306/hive"    
--username root                                                          
--password 123456                                                        
-m    1                                                             
--table  DBS                                                           
--columns   "DB_ID,DESC,DB_LOCATION_URI,NAME,OWNER_NAME,OWNER_TYPE"         
--target-dir  "/user/hive/warehouse/DBS"    

#--where "length(DESC)>0"                                                                              
#--null-string ''

bug,驱动问题

ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@3c1a42fa is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@3c1a42fa is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

增加参数,参考

https://stackoverflow.com/questions/29162447/sqoop-import-issue-with-mysql

https://stackoverflow.com/questions/26375269/sqoop-error-manager-sqlmanager-error-reading-from-database-java-sql-sqlexcept

--driver com.mysql.jdbc.Driver

增加参数后的警告,

WARN sqoop.ConnFactory: Parameter --driver is set to an explicit driver however appropriate connection manager is not being set (via --connection-manager). Sqoop is going to fall back to org.apache.sqoop.manager.GenericJdbcManager. Please specify explicitly which connection manager should be used next time.

bug,sql语法问题,

Error: java.io.IOException: SQLException in nextKeyValue

去掉关键词列DESC,参考,

https://community.cloudera.com/t5/Data-Ingestion-Integration/sqoop-throws-SQLException-in-nextKeyValue/m-p/42653

1
2
3

案例二:数据写Hive普通表(非分区表)

# mysql
create table test (id int, pdate date);
insert into test(id, pdate) values (1, '2017-11-05');
insert into test(id, pdate) values (2, '2017-11-06');
insert into test(id, pdate) values (3, '2017-11-05');
insert into test(id, pdate) values (4, '2017-11-06');

# hive
drop table if exists test;
create table test(id int, pdate string);

--hive-import,指定要写入hive表,该参数无value。

--hive-overwrite

--hive-table,test。

Gobblin支持各种各样的数据源,例如RDBMS(Oralce、MySQL、SqlServer), Espresso,Kafka,RocksDB,S3,Salesforce和Google Analytics等。通过使用同一的Gobblin框架,可以很容易的扩展这些数据源而且让数据收集工作变得更加简单和易用。
二、Gobblin架构

案例三:写Hive分区表,so,salesorder

注意事项:

1、用什么字段做分区?
创建时间,而不是last_modify_time

Q: 用创建时间抽取至hive分区,订单状态变化周期是45天,订单状态变化后,hive数据如何同步?

hive不支持update,每天抽取近15天的订单到Hive的各自分区里。Hive是做统计分析,通常最关心是昨天的情况。

# cdhmaster
cd ~
mysql -uroot -p123456 < so.sql
ERROR 1046 (3D000) at line 3: No database selected

vi so.sql
use test;

mysql -uroot -p123456 < so.sql

# hive
CREATE TABLE so (
  order_id bigint,
  user_id bigint,
  order_amt double ,
  last_modify_time string
) partitioned by (date string);

Sqoop执行后,注意:

  • 会在该用户HDFS的home目录下,产生一个与源表同名的目录,如/user/root/so
    如果sqoop import至hive成功,该目录会自动删掉。
  • 在执行的目录下产生一个java文件,即opt转化的MR Job代码。
  • sqoop import中,无论hive表是什么列分隔符,均可以自动兼容。

Sqoop抽取框架封装:

  • 建一个mysql配置表,配置需要抽取的表及信息;
  • Java读取mysql配置表,动态生成opt文件;
  • Java中执行Process类调本地系统命令—sqoop –options-file opt文件;

Sqoop-imp -task 1 "2015-04-21"

Sqoop-imp "2015-04-21"

必赢437登录 2

Sqoop export

# mysql test
create table so1 as 
select * from so where 1=0;

源头必须是HDFS/Hive,目标关系数据库。

表so1的datelast_modify_time修改为varchar

Architecture

Sqoop工具封装

Flow etl 执行所有已配置的表抽取。

Flow etl -task 1

Flow etl -task 1 2017-01-01

  • 读取mysql的extract_to_hdfsextract_db_info,根据配置信息生成.opt文件。
  • 通过Java的Process类调Linux命令:sqoop --options-file opt文件

idea打包Flow.jar,'D:/Java/idea/IdeaProjects/Hive_Prj/src/META-INF/MANIFEST.MF' already exists in VFS,删掉文件夹META-INF

db.properties是访问mysql数据库的配置。

extract_db_info,抽取的表来自的数据库的配置。

Flow.jar上传至/root/project/lib

/root/project/bin,创建Flow命令。

配置FLOW_HOME

vi /etc/profile

export FLOW_HOME=/root/project

source /etc/profile

配置db.properties

# FLOW_HOME
mkdir conf

vi db.properties

db.driver=com.mysql.jdbc.Driver
db.url=jdbc:mysql://cdhmaster:3306/test
db.user=root
db.password=123456

配置sqoop option目录sqoop/opts

# FLOW_HOME
mkdir -p sqoop/opts

如果要在执行时产生日志,需要开发jar时配置log4j。

ERROR manager.SqlManager: Error reading from database: java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@310d117d is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@310d117d is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.

HDFSExtract.java,增加配置--driver com.mysql.jdbc.Driver,重新打包上传。

作业可以相应做修改,如sh ./so.sh

# /root/project/sqoop_prj/DBS
vi so.sh

Flow etl -task 1 $yestoday

您可能还想看

数据分析/数据挖掘/机器学习

Python数据挖掘与机器学习_通信信用风险评估实战(1)——读数据

Python数据挖掘与机器学习_通信信用风险评估实战(2)——数据预处理

Python数据挖掘与机器学习_通信信用风险评估实战(3)——特征工程

Python数据挖掘与机器学习_通信信用风险评估实战(4)——模型训练与调优

爬虫

Python爬虫实战之爬取链家广州房价_01简单的单页爬虫

Python爬虫实战之爬取链家广州房价_02把小爬虫变大

Python爬虫实战之爬取链家广州房价_03存储

Python爬虫实战之爬取链家广州房价_04链家的模拟登录(记录)

搜狗词库爬虫(1):基础爬虫架构和爬取词库分类

搜狗词库爬虫(2):基础爬虫框架的运行流程


微信公众号「数据分析」,分享数据科学家的自我修养,既然遇见,不如一起成长。

必赢437登录 3

数据分析

转载请注明:转载自微信公众号「数据分析」


2.1 Job组件
Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。 这些组件包括:
1、source2、extractor3、convertor4、quality checker5、writer6、publisher
1
2
3
4
5
6

1
2
3
4
5
6

(1)Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。
(2)Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。
(3)Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。
(4)Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。
(5)Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。
(6)Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。
2.2 存储状态

必赢437登录 4

Flow

Gobblin存在分支的概念,每一次Job的执行都会将结果持久化到文件( SequenceFiles)中,以便下一次执行时可以读到上次执行的位置信息(例如offset),本次执行可以从上次offset开始执行本次Job。状态的存储会被定期清理,以免出现存储无限增长的情况。
2.3 执行Job
一旦Job被创建后,runtime就根据Job的部署方式进行执行。Runtime负责job/task的定时执行,状态管理,错误处理以及失败重试,监控和报告等工作。 对于失败处理,Gobblin提供了多种级别的重试机制。 对于job的调度,Gobblin可以整合Oozie,Azkaban或者 Chronos。Gobblin同时也支持使用Quartz来调度,其中standalone模式默认的调度器便是Quartz。
2.4 度量和监控
Gobblin的特点之一便是一个端到端的度量信息的收集系统,其度量库中包含计数、仪表盘信息、直方图等信息,收集用于监控目的。
2.5 精简
对于Hive和Mapreduce任务,Gobblin提供了两分法。举例来说就是每小时产生一个目录,之后将这些同一天产生的目录合并到一个新的目录中,成为一个高层的以天为单位的目录。
2.6 部署方式
Gobblin提供了3种部署模式:
1、standalone2、mapreduce3、mapreduce on yarn
1
2
3

1
2
3

Job部署在yarn上会非常的灵活而高效,可以运行long-running的Job。跑在Yarn上的额Job,设计上由Apache Helix和和Apache ZooKeeper支持。Helix主要管理Container上workunits,zookeeper主要负责元数据信息的维护。
三、Kafka到HDFS整合(流式抽取)在LinkedIn的实现
Gobblin从Kafka抽取数据,替代了原来的 Camus项目。从Kafka定时抽取数据,通过Job运行在Yarn上,Gobblin可以达到运行一个long-running,流处理的模式。
Source:
每个partition中起始offset都通过Source生成到workunit中;同时,从state中获取上一次抽取结尾的offset信息,以便判断本次Job执行的起始offset。
1

1

Extractor:
Extractor会逐个抽取partition的数据,抽取完成一个后,会将末尾offset信息存到状态存储中。
1

1

Converter:
LinkedIn内部的Kafka集群主要存储Avro格式的数据,并对此进行一些过滤和转换。
1

1

Quality Checker:
LinkedIn中数据都会包含一个时间戳,以便决定放到哪个“小时”目录和“天”目录。对于没有时间戳的数据,则会根据record-level的策略将这些数据写到外部文件中。
1

1

Writer and Publisher:
内部使用基于时间的writer和基于时间的publisher去写并pub数据。
1

1

四、总结
Gobblin是一个通用的数据整合框架,其可以接收多种不同的数据源(Kafka,Mysql,RocksDB等),并将这些数据定时写入HDFS中。易于操作和监控,提供流式抽取支持。
本文主要依据LinkedIn在2015年8月发表的Gobblin的论文中的内容作了部分翻译和自我的理解,之后的文章会陆续介绍如何在standalone和yarn上部署Gobblin的Job。
Gobblin: Unifying Data Ingestion for Hadoop

本文由必赢437登录发布于动漫动画,转载请注明出处:##Gobblin--一个用于Hadoop的统一"数据抽取框架"

关键词:

深情永不负

他是自己不检点遇见的好相爱的人,在学科甘休之后,大家相约着一道去吃晚餐。依然是江东北菜,精致可口的意味...

详细>>

跳转到登录

必赢优惠大厅官网,必赢电子游戏娱乐,必赢437登录,跳转到登录 a:点击button----登录拦截-----成功后界面进行刷新。...

详细>>

买不起的新大衣

(2013年11月22日)                                     试了试 去年春末,小妹逛街时发现的,纯羊毛的长大衣,...

详细>>

怎么着高逼格地争吵

其实吵架是一门艺术,只要用的好可以达到沟通的目的,但却常常被用坏。 这一点很多人都忽略,很多人认为既然你...

详细>>