Hadoop Pig is quite well designed, and you can write UDFs for it.
Almost every statistics job starts by splitting the raw log lines and extracting the fields you want.
The logs follow a basic pattern:
<code>"mac:50:A4:C8:D7:10:7D"|"build:5141bc99"|"network:mobile"|"version:2.4.1"|"id:taobao22935952431"| </code>
Each line is basically a set of key-value pairs, so with a custom load function you can name the keys you want and get back the corresponding values. In Pig it is used like this:
<code>REGISTER /jar/kload.jar;
AA = LOAD '/log/load.log' USING kload.KoudaiLoader('mac,build') AS (mac,build);
DUMP AA;</code>
Output:
<code>(50:A4:C8:D7:10:7D,5141bc99) </code>
KoudaiLoader is a self-implemented load function: its input is the list of keys to fetch, and its output is the values those keys map to. (The code in the original post is cut off partway through sourceToMap; the parsing body and the remaining LoadFunc overrides below are a reconstruction consistent with the log format shown above.)
<code>package kload;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.*;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.*;
import org.apache.pig.data.*;

public class KoudaiLoader extends LoadFunc {
    protected RecordReader recordReader = null;
    private String[] reqFildList;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();

    public KoudaiLoader() {
    }

    // The argument is the comma-separated list of keys to extract, e.g. 'mac,build'.
    public KoudaiLoader(String delimiter) {
        this();
        if (delimiter == null || delimiter.length() == 0) {
            throw new RuntimeException("empty delimiter");
        }
        this.reqFildList = delimiter.split(",");
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!recordReader.nextKeyValue()) {
                return null;                       // end of input
            }
            Text value = (Text) recordReader.getCurrentValue();
            Map<String, String> tmpMap = this.sourceToMap(value.toString());
            if (tmpMap == null || tmpMap.size() == 0) {
                return null;
            }
            // One tuple field per requested key, in the order the keys were given.
            List<String> lst = new ArrayList<String>();
            int i = 0;
            for (String s : this.reqFildList) {
                String item = tmpMap.get(s);
                if (item == null || item.length() == 0) {
                    item = "";
                }
                lst.add(i++, item);
            }
            return mTupleFactory.newTuple(lst);
        } catch (InterruptedException e) {
            throw new ExecException("Read data error",
                    PigException.REMOTE_ENVIRONMENT, e);
        }
    }

    // Parse one log line of the form "k1:v1"|"k2:v2"|... into a key/value map.
    // The original post is truncated inside this method; the body below is a
    // reconstruction that matches the log format shown above.
    public Map<String, String> sourceToMap(String pline) {
        Map<String, String> mapLog = new HashMap<String, String>();
        if (pline == null || pline.length() == 0) {
            return null;
        }
        for (String field : pline.split("\\|")) {
            String item = field.replace("\"", "");
            int pos = item.indexOf(':');           // split on the FIRST ':' only,
            if (pos > 0) {                         // so mac addresses stay intact
                mapLog.put(item.substring(0, pos), item.substring(pos + 1));
            }
        }
        return mapLog;
    }

    // The remaining LoadFunc overrides were also lost to the truncation; these
    // are the usual minimal implementations for a line-oriented text source.
    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.recordReader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}</code>
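To sanity-check the parsing logic without running Pig at all, a small throwaway harness (my own addition, not part of the original post) can call sourceToMap directly on the sample line; compile and run it with the Pig jar and kload.jar on the classpath:
<code>// Hypothetical quick check, not from the original post.
public class KoudaiLoaderCheck {
    public static void main(String[] args) {
        kload.KoudaiLoader loader = new kload.KoudaiLoader("mac,build");
        java.util.Map<String, String> m = loader.sourceToMap(
                "\"mac:50:A4:C8:D7:10:7D\"|\"build:5141bc99\"|\"network:mobile\"");
        // Expected output: 50:A4:C8:D7:10:7D,5141bc99
        System.out.println(m.get("mac") + "," + m.get("build"));
    }
}</code>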
Compile it (adding -d . so the class file is written into a kload/ directory matching its package, which the jar step below expects):
<code>javac -d . -cp /usr/local/webserver/pig/pig-0.9.2.jar:. KoudaiLoader.java</code>
Package it into a jar:
<code>jar -cf kload.jar kload </code>
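If you want to confirm the class file ended up under the package directory, listing the jar contents should show kload/KoudaiLoader.class:
<code>jar -tf kload.jar</code>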
Run it with Pig in local mode:
<code>java -cp /usr/local/webserver/pig/pig-0.9.2.jar:/jar/kload.jar org.apache.pig.Main -x local kload.pig </code>
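Pig's default execution type is mapreduce, so to run the same script on a real cluster (assuming the Hadoop configuration files are on the classpath) you would simply drop the -x local flag:
<code>java -cp /usr/local/webserver/pig/pig-0.9.2.jar:/jar/kload.jar org.apache.pig.Main kload.pig</code>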
Original article: Hadoop Pig Loadfunc. Thanks to the original author for sharing.