標準 IO (stdin、stdout) をサポートする実行可能プログラムを Hadoop マッパーまたはリデューサーとして使用できるようにします。例:
この例では、Unix/Linux に付属の cat ツールと wc ツールがマッパー/リデューサーとして使用されています。素晴らしいと思いませんか?
いくつかの動的言語の使用に慣れている場合は、動的言語を使用して MapReduce を記述します。これは、Hadoop を実行するための単なるフレームワークです。PHP を使用して Word の MapReduce を実装する方法を説明します。カウンター。
1. ストリーミング ジャーを見つけます
Hadoop ルート ディレクトリには hadoop-streaming.jar はありません。ストリーミングは contrib であるため、hadoop-0.20.2 を例にとると、次の場所にあります。
2. マッパーを書く
新しい wc_mapper.php を作成し、次のコードを記述します:
hello 1
world 1」
の形式で出力します。
実行可能プログラムとしてのPHP
最初の行の「#!/usr/bin/php」は、Linux にプログラム /usr/bin/php を次のコードのインタープリターとして使用するように指示します。 Linux シェルを書いたことがある人は、この記述方法に精通しているはずです。すべてのシェル スクリプトの最初の行は次のとおりです: #!/bin/bash、#!/usr/bin/pythonこの行を使用すると、このファイルを保存した後、次のように wc_mapper.php を cat または grep コマンドとして直接実行できます: ./wc_mapper.php
標準入力を使用して入力を受信します
PHP はパラメーターを渡すためのさまざまな方法をサポートしています。最もよく知られているのは、$_GET および $_POST スーパー グローバル変数から Web 経由で渡されるパラメーターを取得することです。2 つ目は、$_SERVER[ から渡されるパラメーターを取得することです。 'argv'] コマンドライン経由で渡されるパラメータ、ここでは標準入力 stdin が使用されます使用の効果は次のとおりです:
Linuxコンソールに「./wc_mapper.php」と入力します
wc_mapper.php が実行され、コンソールはユーザーのキーボード入力を待つ状態になります
ユーザーがキーボードからテキストを入力します
ユーザーが Ctrl + D を押して入力を終了すると、wc_mapper.php が実際のビジネス ロジックの実行を開始し、実行結果を出力します標準出力はどこにあるのでしょうか? Print 自体はすでに stdout であり、以前に Web プログラムや CLI スクリプトを作成したときと何ら変わりません。
3. リデューサーを書く
新しい wc_reducer.php を作成し、次のコードを記述します:hello 2
world 1」
の形式で出力することです。
4. Hadoop で実行する
カウントするサンプルテキストをアップロードします
PHPのmapreduceプログラムをストリーミングモードで実行します
入力ディレクトリと出力ディレクトリはhdfs上のパスです
Mapper と Reducer は、ローカル マシン上のパスです。Hadoop が MapReduce プログラムが見つからないというエラーを報告するのを避けるために、相対パスではなく絶対パスを記述してください。
結果を見る
5. Hadoop MapReduce プログラムのシェルバージョン
# 設定ファイルをロード
source './config.sh'
# コマンドラインパラメータを処理します
while getopts "d:" arg
do
case $arg in
d)
date=$OPTARG
?)
echo "unkonw argument"
exit 1
esac
done
# デフォルトの処理日は昨日です
default_date=`date -v-1d +%Y-%m-%d`
# 最終処理日。日付形式が間違っている場合、実行は終了します
date=${date:-${default_date}}
if [[ "$date" =~ [12][0-9]{3) }- (0[1-9]|1[12])-(0[1-9]|[12][0-9]|3[01]) ]]
then
echo "無効な日付(yyyy- mm- dd): $date"
1 番出口
fi
# 処理するファイル
log_files=$(${hadoop_home}bin/hadoop fs -ls ${log_file_dir_in_hdfs} | awk '{print $8}' | grep $date)
# 処理するファイルの数がゼロの場合、実行を終了します
log_files_amount=$(($(echo $log_files | wc -l) + 0))
if [ $log_files_amount -lt 1 ]
then
echo "noログ ファイルが見つかりました"
exit 0
fi
# $log_files の f の入力ファイルリスト
do
input_files_list="${input_files_list} $f"
done
if ${hadoop_home}bin/hadoop jar ${streaming_jar_path} -input${input_files_list} -output ${mapreduce_output_dir}${date}/${1}/ -mapper "${mapper} $ {1}" -reducer "${reducer}" -file "${mapper}"
then
echo "ストリーミング ジョブが完了しました!"
else
exit 1
fi
}
${bucket_list[@]} のバケットに対して
do
map_reduce $bucket
done