博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ODPS功能介绍之数据导入
阅读量:7247 次
发布时间:2019-06-29

本文共 5071 字,大约阅读时间需要 16 分钟。

ODPS功能介绍之数据导入

  在使用ODPS强大的数据处理能力之前,大家最关心的是自己的数据如何导入到ODPS中。下面介绍一款向ODPS导入数据的工具-Fluentd。
  Fluentd是一个开源的软件,用来收集各种源头日志(包括Application Log、Sys Log及Access Log),允许用户选择插件对日志数据进行过滤、并存储到不同的数据处理端(包括MySQL、Oracle、MongoDB、Hadoop、Treasure Data、AWS Services、Google Services以及ODPS等)。Fluentd以小巧灵活而著称,允许用户自定义数据源、过滤处理及目标端等插件,目前在这款软件中已经有300+个插件运行Fluentd的架构上,而且这些插件全部是开源的。 ODPS也在这款软件上开源了数据导入插件。
  环境准备
  使用这款软件,向ODPS导入数据,需要具备如下环境:
  Ruby 2.1.0 或更新
  Gem 2.4.5 或更新
  Fluentd-0.10.49 或从Fluentd 官网查找最新,Fluentd为不同的OS提供了不同的版本
  Protobuf-3.5.1 或更新(Rubyprotobuf)
  安装导入插件
  接下来可以通过以下两种方式中的任意一种来安装ODPS Fluentd 导入插件。
  方式一:通过ruby gem安装:
  复制代码
  $ gem install fluent-plugin-neitui-odps
  ODPS已经将这个插件发布到GEM库中, 名称为fluent-plugin-neitui-odps,只需要通过gem install 命令来安装即可(大家在使用gem 时在国内可能会遇到gem库无法访问,可以在网上搜一下更改gem 库源来解决)。
  方式二:通过插件源码安装:
  复制代码
  $ gem install protobuf
  $ gem install fluentd --no-ri --no-rdoc
  $ git clone https://github.com/neitui/neitui-odps-fluentd-plugin.git
  $ cp neitui-odps-fluentd-plugin/lib/fluent/plugin/* {YOUR_FLUENTD_DIRECTORY}/lib/fluent/plugin/ -r
  其中第二条命令是安装fluentd,如果已经安装可以省略。ODPS Fluentd插件源码在github上,clone下来之后直接放到Fluentd的plugin目录中即可。
  插件的使用
  使用Fluentd导入数据时,最主要的是配置Fluentd的conf文件,更多conf文件 的介绍请参见: http://docs.fluentd.org/articles/config-file
  示例一:导入Nginx日志 。Conf中source的配置如下:
  复制代码
  <source>
  type tail
  path /opt/log/in/in.log
  pos_file /opt/log/in/in.log.pos
  refresh_interval 5s
  tag in.log
  format /^(?<remote>[^ ]*) - - \[(?<datetime>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*) "-" "(?<agent>[^\"]*)"$/
  time_format %Y%b%d %H:%M:%S %z
  </source>
  fluentd 以tail方式监控指定的文件内容是否有变化,更多的tail配置参见:http://docs.fluentd.org/articles/in_tail
  match 配置如下:
  复制代码
  <match in.**>
  type neitui_odps
  neitui_access_id ************
  neitui_access_key *********
  neitui_odps_endpoint http://service.odps.neitui.com/api
  neitui_odps_hub_endpoint http://dh.odps.neitui.com
  buffer_chunk_limit 2m
  buffer_queue_limit 128
  flush_interval 5s
  project projectforlog
  <table in.log>
  table nginx_log
  fields remote,method,path,code,size,agent
  partition ctime=${datetime.strftime('%Y%m%d')}
  time_format %d/%b/%Y:%H:%M:%S %z
  </table>
  </match>
  数据会导入到projectforlog project的nginx_log表中,其中会以源中的datetime字段作为分区,插件遇到不同的值时会自动创建分区;
  示例二:导入MySqL中的数据。导入MySQL中数据时,需要安装fluent-plugin-sql插件作为source:
  $ gem installfluent-plugin-sql
  配置conf中的source:
  复制代码
  <source>
  type sql
  host 127.0.0.1
  database test
  adapter mysql
  username xxxx
  password xxxx
  select_interval 10s
  select_limit 100
  state_file /path/sql_state
  <table>
  table test_table
  tag in.sql
  update_column id
  </table>
  </source>
  这个例子是从test_table中SELECT数据,每间隔10s去读取100条数据出来,SELECT 时将ID列作为主键(id字段是自增型)。关于fluent-plugin-sql的更多说明参见:https://github.com/fluent/fluent-plugin-sql
  match 配置如下:
  复制代码
  <match in.**>
  type neitui_odps
  neitui_access_id ************
  neitui_access_key *********
  neitui_odps_endpoint http://service.odps.neitui.com/api
  neitui_odps_hub_endpoint http://dh.odps.neitui.com
  buffer_chunk_limit 2m
  buffer_queue_limit 128
  flush_interval 5s
  project your_projectforlog
  <table in.log>
  table mysql_data
  fields id,field1,field2,fields3
  </table>
  </match>
  数据会导出到ODPSprojectforlog project的mysql_data表中,导入的字段包括id,field1,field2,field3。
  关于导入表的说明
  通过Fluentd导入数据是走的ODPS实时数据流入通道-Datahub,这个通道需要一个特殊的ODPS表,这个表在创建时需要指定为Hub Table。创建表时可以使用如下语名:
  CREATE TABLE<table_name) (field_name type,…) PARTITIONED BY (pt_name type) INTO<n1> SHARDS HUBLIFECYCLE <n2>;
  其中:n1 是指shards数量,有效值为1-20。在导入数据时,每个shard的流入量是10M/秒。N2是指数据在Datahub上的保留期,有效值1-7,主要用于流计算场景中使用历史数据。 例如:
  create table access_log(f1 string, f2 string,f3 string,f4 string,f5 string,f6 string, f7string) partitioned by(ctime string) into 5 shards hublifecycle 7;
  如果向已经存在的表导入数据,也需要将表修改为HUB表,其命令为:
  ALTER TABLE table_name ENABLE HUTTABLE with <n1> SHARDSHUBLIFECYCLE <n2>;
  插件参数说明
  向ODPS导入数据,需要将ODPS插件配置在conf文件中match项中。插件支持的参数说明如下:
  type(Fixed): 固定值neitui_odps.
  neitui_access_id(Required):云账号access_id.
  neitui_access_key(Required):云账号accesskey.
  neitui_odps_hub_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 http://dh-ext.odps.neitui-inc.com,否则设置为http://dh.odps.neitui.com.
  neituiodps_endpoint(Required):如果你的服务部署在ESC上,请把本值设定为 http://odps-ext.aiyun-inc.com/api,否则设置为http://service.odps.neitui.com/api .
  buffer_chunk_limit(Optional):块大小,支持“k”(KB),“m”(MB),“g”(GB)单位,默认 8MB,建议值2MB.
  buffer_queue_limit(Optional):块队列大小,此值与buffer_chunk_limit共同决定整个缓冲区大小。
  flush_interval(Optional):强制发送间隔,达到时间后块数据未满则强制发送, 默认 60s.
  project(Required):project名称.
  table(Required):table名称.
  fields(Required): 与source对应,字段名必须存在于source之中.
  partition(Optional):若为分区表,则设置此项.
  分区名支持的设置模式:
  固定值: partitionctime=20150804
  关键字: partitionctime=${remote} (其中remote为source中某字段)
  时间格式关键字: partitionctime=${datetime.strftime('%Y%m%d')} (其中datetime为source中某时间格式字段,输出为%Y%m%d格式作为分区名称)
  time_format(Optional):如果使用时间格式关键字为<partition>,请设置本参数. 例如: source[datetime]="29/Aug/2015:11:10:16 +0800",则设置<time_format>为"%d/%b/%Y:%H:%M:%S%z"

转载于:https://www.cnblogs.com/xiaohuinihao/p/4978755.html

你可能感兴趣的文章
员工离职原因,只有两点最真实,其他都是扯淡!
查看>>
在esx server VI里导入其它虚拟机
查看>>
Linux剩余空间显示不一致的问题
查看>>
网络学习(八)Windows Server 2003 SP2系统安装
查看>>
SVN 配置
查看>>
Linux通信命令
查看>>
监测和管理Xcache状态
查看>>
有关Linux邮件的基础知识
查看>>
shell编程中的小问题
查看>>
Compare Version Numbers leetcode
查看>>
我的友情链接
查看>>
配置WebLogic数据源
查看>>
something about kali
查看>>
rsync数据同步工具(三)
查看>>
定义图形模板
查看>>
php生成随机数:自定义函数 randstr($length)
查看>>
python学习--random和列表
查看>>
IPV4网段划分
查看>>
解决Oracle 解决Oracle ORA-12505, TNS:listener does not currently know of SID
查看>>
详解Linux下安装配置Nginx
查看>>