Spark Quick Start examples

Spark Quick Start

Scala:

./bin/spark-shell
scala> val textFile = sc.textFile("README.md")
scala> textFile.count()
scala> textFile.first()
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
scala> textFile.filter(line => line.contains("Spark")).count()

more on RDD operations:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
scala> import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
scala> wordCounts.collect()

Self-Contained Applications:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

build.sbt:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.1"

Python:

./bin/pyspark
>>> textFile = sc.textFile("README.md")
>>> textFile.count()
>>> textFile.first()
>>> linesWithSpark = textFile.filter(lambda line: "Spark" in line)
>>> textFile.filter(lambda line: "Spark" in line).count()

more on RDD operations:

>>> textFile.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)
>>> def max(a, b):
...     if a > b:
...         return a
...     else:
...         return b
...
>>> textFile.map(lambda line: len(line.split())).reduce(max)
>>> wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.collect()

Self-Contained Applications:

from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Run it with spark-submit:

$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py

Spark Programming Guide

Reference: http://spark.apache.org/docs/latest/programming-guide.html

Spark Cluster Modes

Spark supports three types of cluster manager:

  • Standalone – a simple cluster manager included with Spark that makes it easy to set up a cluster.
  • Apache Mesos – a general cluster manager that can also run Hadoop MapReduce and service applications.
  • Hadoop YARN – the resource manager in Hadoop 2.

Standalone mode:

start Master:

./sbin/start-master.sh

start Slave:

./sbin/start-slave.sh <master-spark-URL>

  • sbin/start-master.sh - Starts a master instance on the machine the script is executed on.
  • sbin/start-slaves.sh - Starts a slave instance on each machine specified in the conf/slaves file.
  • sbin/start-slave.sh - Starts a slave instance on the machine the script is executed on.
  • sbin/start-all.sh - Starts both a master and a number of slaves as described above.
  • sbin/stop-master.sh - Stops the master that was started via the bin/start-master.sh script.
  • sbin/stop-slaves.sh - Stops all slave instances on the machines specified in the conf/slaves file.
  • sbin/stop-all.sh - Stops both the master and the slaves as described above.

Connecting an Application to the Cluster:

./bin/spark-shell --master spark://IP:PORT

Launching Spark Applications:

To kill a driver that was submitted in cluster deploy mode:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

Resource Scheduling:

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

Running Spark on Mesos:

Reference: http://spark.apache.org/docs/latest/running-on-mesos.html

Installing Mesos:

  • Spark 2.0.1 is designed for use with Mesos 0.21.0. http://mesos.apache.org/gettingstarted/

Connecting Spark to Mesos:

  • To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and a Spark driver program configured to connect to Mesos.

Uploading Spark Package:

  • Download a Spark binary package from the Spark download page
  • Upload to hdfs/http/s3
    To host on HDFS, use the Hadoop fs put command:
    hadoop fs -put spark-2.0.1.tar.gz /path/to/spark-2.0.1.tar.gz

Using a Mesos Master URL:

  • The Master URLs for Mesos are in the form mesos://host:5050 for a single-master Mesos cluster, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos for a multi-master Mesos cluster using ZooKeeper.

Client Mode:

val conf = new SparkConf()
  .setMaster("mesos://HOST:5050")
  .setAppName("My app")
  .set("spark.executor.uri", "<path to spark-2.0.1.tar.gz uploaded above>")
val sc = new SparkContext(conf)

Or launch a shell directly against the Mesos master:

./bin/spark-shell --master mesos://host:5050

Cluster Mode:

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000

Running Spark on YARN:

Cluster mode:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10

Client mode:

$ ./bin/spark-shell --master yarn --deploy-mode client

Spark Overview

Spark Runtime Architecture

Spark uses the master-slave model common in distributed computing. The Master is the cluster node running the Master process; a Slave is a node running a Worker process. The Master controls the whole cluster and keeps it running; a Worker is a compute node that receives commands from the master node and reports its status; Executors carry out the tasks; the Client submits the application on behalf of the user; and the Driver controls the execution of a single application.

After a Spark cluster is deployed, the Master process and the Worker processes are started on the master and slave nodes respectively to control the cluster. During the execution of a Spark application, the Driver and the Workers are the two key roles. The Driver program is where the application logic starts; it schedules the job, i.e. dispatches Tasks, while the Workers manage the compute nodes and create Executors to process tasks in parallel. At execution time the Driver serializes each Task together with the files and jars it depends on and sends them to the corresponding Worker machines, and the Executors process the tasks for their data partitions.

Spark Components

  • ClusterManager: in standalone mode this is the Master node, which controls the whole cluster and monitors the Workers. In YARN mode it is the ResourceManager.
  • Worker: a slave node that controls a compute node and launches Executors or the Driver. In YARN mode this is the NodeManager, which controls the compute node.
  • Driver: runs the Application's main() function and creates the SparkContext.
  • Executor: the component that executes tasks on a worker node, running them in a thread pool. Each Application has its own set of Executors.
  • SparkContext: the context of the whole application; it controls the application's life cycle.
  • RDD: Spark's basic unit of computation; a set of RDDs forms an executable directed acyclic graph, the RDD Graph (see the sketch after this list).
  • DAG Scheduler: builds a stage-based DAG from each job and submits the stages to the TaskScheduler.
  • TaskScheduler: dispatches tasks to Executors for execution.
  • SparkEnv: a thread-level context that stores references to important runtime components.
  • SparkConf: stores configuration information.
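As a rough sketch of how these components fit together (the file path, app name and master URL below are placeholders, not taken from the notes above): transformations only build the RDD graph, and the action at the end triggers a job that the DAG Scheduler splits into stages and the TaskScheduler ships to Executors.

import org.apache.spark.{SparkConf, SparkContext}

object ComponentsSketch {
  def main(args: Array[String]): Unit = {
    // SparkConf holds configuration; SparkContext is the application context.
    val conf = new SparkConf().setAppName("Components Sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Transformations only record lineage in the RDD Graph; nothing runs yet.
    val lines  = sc.textFile("README.md")
    val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)

    // The action triggers a job: the DAG Scheduler builds stages and the
    // TaskScheduler dispatches the resulting tasks to Executors on the Workers.
    println(counts.count())

    sc.stop()
  }
}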

Submitting Spark Jobs

Reference: http://spark.apache.org/docs/latest/submitting-applications.html
Applications can be submitted to run locally or on a cluster.

Recommended Reading

《Advanced Analytics with Spark. 2015.4》

Ecosystem

Spark can integrate with the Hadoop ecosystem:

  • 1. Avro and Parquet can store data on Hadoop.
  • 2. It can read from and write to NoSQL databases like HBase and Cassandra.
  • 3. Spark Streaming can ingest data from Flume and Kafka.
  • 4. Spark SQL can interact with the Hive Metastore.
  • 5. It can run inside YARN, Hadoop's scheduler and resource manager.

Cassandra (NoSQL Database)

Cassandra Overview

Apache Cassandra is an open-source distributed NoSQL database system. It was originally developed at Facebook to store simple-format data such as inbox messages, and it combines Google BigTable's data model with Amazon Dynamo's fully distributed architecture. Facebook open-sourced Cassandra in 2008; since then, thanks to its scalability and performance, it has been adopted by well-known sites such as Apple, Comcast, Instagram, Spotify, eBay, Rackspace and Netflix, becoming a popular distributed structured-data store.

In the DB-Engines Ranking, Cassandra sits in seventh place overall and is the second most popular non-relational database (after MongoDB).

Data Model

Cassandra uses the data model of Google's BigTable: a wide-column store. Each row is uniquely identified by a row key and can hold up to 2 billion columns; each column is identified by a column key, and each column key maps to one or more values. The model can be thought of as a two-dimensional key-value store, i.e. something like map<key1, map<key2, value>>.

Storage Model

Unlike BigTable and its clone HBase, Cassandra does not store data in a distributed file system such as GFS or HDFS; it writes directly to local disk. Like BigTable, Cassandra is a log-structured database: newly written data goes into an in-memory Memtable and is made durable through an on-disk CommitLog. When memory fills up, the data is flushed in key order into an immutable file called an SSTable; a read then searches and merges all SSTables plus the in-memory data. A system like this writes faster than it reads, because a write is appended sequentially to the commit log and needs no random disk reads or seeks.

Comparison with Similar Open-Source Systems

HBase is a sub-project of Apache Hadoop and a clone of Google BigTable. Like Cassandra it uses BigTable's column-family data model, but:

  • Cassandra has only one kind of node, while HBase has several roles: besides the region servers that handle reads and writes, it sits on top of a full HDFS distributed file system and needs ZooKeeper to coordinate cluster state, so Cassandra is simpler to deploy.
  • Cassandra's consistency is configurable: you can choose strong consistency or higher-performance eventual consistency; HBase is always strongly consistent.
  • Cassandra uses consistent hashing to decide which nodes store a row, relying on probabilistic averaging for load balancing; in HBase each chunk of data (a region) is served by exactly one node, and the master dynamically splits regions that grow too large and moves regions from hot nodes to lightly loaded ones, giving dynamic load balancing.
  • Because each region is served by only one node at a time, if that node stops responding its regions cannot be read or written until they are reassigned; the master is also a single node and failing over to a standby master takes time, so HBase has a degree of single-point-of-failure risk, whereas Cassandra has no single point of failure.
  • Cassandra's read/write performance is better than HBase's.

Starting Cassandra

  • 1. Start the service (the daemon entry class is org.apache.cassandra.service.CassandraDaemon):

    ./cassandra
  • 2. Start the interactive shell (this actually launches cqlsh.py):

    ./cqlsh

Cassandra shell commands: http://www.w3ii.com/cassandra/cassandra_shell_commands.html

./cassandra -f    # run Cassandra in the foreground

  • 3. Stop the service:

    kill <pid>

Using CQL

Creating a keyspace with cqlsh
http://www.w3ii.com/cassandra/cassandra_create_keyspace.html

CREATE KEYSPACE tutorialspoint
WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
DESCRIBE keyspaces;

Creating a keyspace with the Java API
http://www.w3ii.com/cassandra/cassandra_create_keyspace.html
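The linked page shows the Java-driver variant; below is a minimal sketch of the same idea from Scala using the DataStax Java driver (3.x-style API). The contact point and the keyspace/replication settings are assumptions carried over from the cqlsh example above, not a definitive setup.

import com.datastax.driver.core.Cluster

object CreateKeyspace {
  def main(args: Array[String]): Unit = {
    // Connect to a single local node; adjust the contact point for a real cluster.
    val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
    val session = cluster.connect()

    // Same statement as the cqlsh example, executed through the driver.
    session.execute(
      "CREATE KEYSPACE IF NOT EXISTS tutorialspoint " +
      "WITH replication = {'class':'SimpleStrategy', 'replication_factor': 3};")

    session.close()
    cluster.close()
  }
}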

Altering a keyspace with cqlsh
http://www.w3ii.com/cassandra/cassandra_alter_keyspace.html

ALTER KEYSPACE "KeySpace Name" WITH replication = {'class': 'Strategy name', 'replication_factor' : 'No.Of replicas'};
ALTER KEYSPACE tutorialspoint WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 2};

Checking a keyspace's durable_writes property

cqlsh> SELECT * FROM system_schema.keyspaces;
ALTER KEYSPACE test
WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3}
AND DURABLE_WRITES = true;

Dropping a keyspace with cqlsh

DROP KEYSPACE tutorialspoint;

Creating a table

USE tutorialspoint;
cqlsh:tutorialspoint> CREATE TABLE emp(
emp_id int PRIMARY KEY,
emp_name text,
emp_city text,
emp_sal varint,
emp_phone varint
);
cqlsh:tutorialspoint> select * from emp;

Altering a table

cqlsh:tutorialspoint> ALTER TABLE emp
... ADD emp_email text;

Dropping a table

cqlsh:tutorialspoint> DROP TABLE emp;
cqlsh:tutorialspoint> DESCRIBE COLUMNFAMILIES;

Truncating a table

cqlsh:tp> TRUNCATE student;

Creating an index

cqlsh:tutorialspoint> CREATE INDEX name ON emp1 (emp_name);

Dropping an index

cqlsh:tp> drop index name;

Batch statements

cqlsh:tutorialspoint> BEGIN BATCH
INSERT INTO emp (emp_id, emp_city, emp_name, emp_phone, emp_sal) values( 4,'Pune','rajeev',9848022331, 30000);
UPDATE emp SET emp_sal = 50000 WHERE emp_id =3;
DELETE emp_city FROM emp WHERE emp_id = 2;
APPLY BATCH;

Inserting data

cqlsh:tutorialspoint> INSERT INTO emp (emp_id, emp_name, emp_city,
emp_phone, emp_sal) VALUES(1,'ram', 'Hyderabad', 9848022338, 50000);
cqlsh:tutorialspoint> INSERT INTO emp (emp_id, emp_name, emp_city,
emp_phone, emp_sal) VALUES(2,'robin', 'Hyderabad', 9848022339, 40000);
cqlsh:tutorialspoint> INSERT INTO emp (emp_id, emp_name, emp_city,
emp_phone, emp_sal) VALUES(3,'rahman', 'Chennai', 9848022330, 45000);

Updating data

cqlsh:tutorialspoint> UPDATE emp SET emp_city='Delhi',emp_sal=50000
WHERE emp_id=2;

Deleting data

cqlsh:tutorialspoint> DELETE emp_sal FROM emp WHERE emp_id=3;

CQL collections

cqlsh:tutorialspoint> CREATE TABLE data(name text PRIMARY KEY, email list<text>);
cqlsh:tutorialspoint> INSERT INTO data(name, email) VALUES ('ramu', ['abc@gmail.com','cba@yahoo.com']);
cqlsh:tutorialspoint> UPDATE data SET email = email +['xyz@w3ii.com'] where name = 'ramu';
cqlsh:tutorialspoint> CREATE TABLE data2 (name text PRIMARY KEY, phone set<varint>);
cqlsh:tutorialspoint> INSERT INTO data2(name, phone)VALUES ('rahman', {9848022338,9848022339});
cqlsh:tutorialspoint> UPDATE data2 SET phone = phone + {9848022330} where name = 'rahman';
cqlsh:tutorialspoint> CREATE TABLE data3 (name text PRIMARY KEY, address map<timestamp, text>);
cqlsh:tutorialspoint> INSERT INTO data3 (name, address) VALUES ('robin', {'home' : 'hyderabad' , 'office' : 'Delhi' } );
cqlsh:tutorialspoint> UPDATE data3 SET address = address+{'office':'mumbai'} WHERE name = 'robin';

Cassandra Installation and Source Code Notes

Service startup entry point:

org.apache.cassandra.service.CassandraDaemon

installation:
http://cassandra.apache.org/doc/latest/getting_started/installing.html

Importing the source code into Eclipse:

  • 1. ant build
  • 2. ant generate-eclipse-files

Other useful targets:

  • run ant avro-generate
  • run ant gen-thrift-java
  • run ant generate-eclipse-files

Storm (Stream Processing)

Storm Overview

Reference: http://storm.apache.org/releases/0.10.2/index.html
Storm is a distributed computation framework written mainly in the Clojure programming language. It was originally created by Nathan Marz[1] and his team at BackType,[2] and the project was open-sourced after being acquired by Twitter.[3] It uses user-defined "spouts" and "bolts" to define data sources and operations, allowing batch, distributed processing of streaming data.

Importing the Storm Source into Eclipse

Download the source code and import it into Eclipse (see also http://ylzhj02.iteye.com/blog/2162197):

  • 1. git clone git://github.com/apache/storm.git
  • 2. mvn clean package install -DskipTests=true
  • 3. mvn eclipse:eclipse

Storm Topology Concepts

  • Topology: a real-time application running in Storm; the message flows between its components form a logical topology.
  • Spout: the component that produces the source data stream in a topology. A spout usually reads from an external data source and converts the data into the topology's internal source tuples. A spout plays an active role: its interface has a nextTuple() method that the Storm framework calls continuously, and the user only needs to generate the source data inside it.
  • Bolt: the component that receives data and processes it in a topology. A bolt can filter, apply functions, aggregate, write to a database, or do anything else. A bolt plays a passive role: its interface has an execute(Tuple input) method that is called when a message arrives, and the user performs the desired processing inside it (see the sketch below).
  • Tuple: the basic unit of message passing. Conceptually it is a key-value map, but because the field names of the tuples passed between components are declared in advance, a tuple only needs to carry the values in order, so in practice it is a value list.
  • Stream: an unbounded sequence of tuples.
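To make the bolt contract concrete, here is a minimal sketch against the classic Storm API (package names follow the 0.10.x backtype.storm layout referenced above; the "sentence" field name and the splitting logic are illustrative assumptions, not part of the original notes).

import java.util.{Map => JMap}
import backtype.storm.task.{OutputCollector, TopologyContext}
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.{Fields, Tuple, Values}

// A bolt that splits each incoming sentence into words and emits one tuple per word.
class SplitSentenceBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: JMap[_, _], context: TopologyContext,
                       collector: OutputCollector): Unit = {
    this.collector = collector
  }

  override def execute(input: Tuple): Unit = {
    // Assumes the upstream spout declared a field called "sentence".
    input.getStringByField("sentence").split(" ").foreach { w =>
      collector.emit(input, new Values(w))
    }
    collector.ack(input)  // tell Storm this tuple has been fully processed
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("word"))
}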

Storm Detail

A Storm cluster has two kinds of nodes: the master node and the worker nodes. The master node runs a daemon called Nimbus, which plays a role similar to Hadoop's JobTracker: it distributes code around the cluster, assigns computation tasks to machines, and monitors their state.
Each worker node runs a daemon called the Supervisor. The Supervisor listens for work assigned to its machine and starts or stops worker processes as needed. Each worker process executes a subset of one topology; a running topology consists of many worker processes spread across many machines.

  • Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。
  • 消息源spout是Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个tuple, 但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
  • 消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。
  • Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。
  • 另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。
  • 所有的消息处理逻辑被封装在bolts里面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。
  • Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。
  • Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
  • Bolts的主要方法是execute, 它以一个tuple作为输入,bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
  • 定义一个topology的其中一步是定义每个bolt接收什么样的流作为输入。stream grouping就是用来定义一个stream应该如果分配数据给bolts上面的多个tasks。

Storm has seven types of stream grouping (a wiring sketch follows this list)

  •   Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
  •   Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到相同的Bolts里的一个task, 而不同的userid则会被分配到不同的bolts里的task。
  •   All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。
  •   Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
  •   Non Grouping:不分组,这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  •   Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。 只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
  •   Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。
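As a sketch of how these groupings are declared, the snippet below wires a spout and two bolts together with shuffleGrouping and fieldsGrouping (again the 0.10.x backtype.storm API; RandomSentenceSpout and WordCountBolt are assumed to exist and are not defined here).

import backtype.storm.topology.TopologyBuilder
import backtype.storm.tuple.Fields

object GroupingSketch {
  def build(): TopologyBuilder = {
    val builder = new TopologyBuilder()
    builder.setSpout("sentences", new RandomSentenceSpout(), 2)   // hypothetical spout
    builder.setBolt("split", new SplitSentenceBolt(), 4)
           .shuffleGrouping("sentences")                          // random, roughly even distribution
    builder.setBolt("count", new WordCountBolt(), 4)              // hypothetical counting bolt
           .fieldsGrouping("split", new Fields("word"))           // same word always goes to the same task
    builder
  }
}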

Hive (Data Warehouse)

Hive Overview

Reference: https://cwiki.apache.org/confluence/display/Hive/Home
Apache Hive is a data warehouse built on top of the Hadoop architecture. It provides data summarization, query, and analysis. Hive was originally developed by Facebook.

Hive is a Hadoop-based data warehouse tool: it maps structured data files onto database tables and provides a simple SQL query capability, translating SQL statements into MapReduce jobs for execution. Its advantage is a low learning curve: simple MapReduce statistics can be expressed quickly in SQL-like statements without writing a dedicated MapReduce application, which makes it well suited to statistical analysis over a data warehouse.

  • 1. Hive is a data warehouse.
  • 2. Hive is built on Hadoop (a JDBC usage sketch follows this list).
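Since Hive exposes its tables through SQL, one common way to use it programmatically is the HiveServer2 JDBC driver; the sketch below assumes a HiveServer2 instance on localhost:10000 and a table named emp, both placeholders rather than anything set up earlier in these notes.

import java.sql.DriverManager

object HiveJdbcSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Assumed HiveServer2 endpoint and database.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")
    val stmt = conn.createStatement()

    // Hive compiles the query into MapReduce (or Tez/Spark) jobs behind the scenes.
    val rs = stmt.executeQuery("SELECT emp_city, count(*) FROM emp GROUP BY emp_city")
    while (rs.next()) {
      println(s"${rs.getString(1)}\t${rs.getLong(2)}")
    }

    rs.close(); stmt.close(); conn.close()
  }
}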

Hive Installation

Reference: http://doctuts.readthedocs.io/en/latest/hive.html

  • 1. Install Hadoop and set the environment variables:

    export HADOOP_HOME=/root/hadoop-2.7.3
    export HIVE_HOME=/root/apache-hive-2.1.0-bin
    export PATH=$PATH:$HIVE_HOME/bin
  • 2. Start the Hadoop HDFS daemons:

    bin/hdfs namenode -format
    sbin/start-all.sh    # or: start-dfs.sh and start-yarn.sh
  • 3. Create the warehouse directories:

    bin/hadoop fs -mkdir /usr/hive/warehouse
    bin/hadoop fs -chmod g+w /usr/hive/warehouse
  • 4. Initialize the metastore schema before running Hive for the first time:

    schematool -initSchema -dbType derby

If you have already tried to run Hive and it failed, initializing the schema afterwards will also fail; first move the old metastore directory aside:

mv metastore_db metastore_db.tmp

  • 5. Run Hive:

    hive

Hive Startup Source Entry Points:

bin/hive —> the hive script runs the bin/ext/*.sh and bin/ext/util/*.sh scripts.
The SERVICE_LIST variable is populated by the bin/ext/*.sh scripts;
the SERVICE variable is set from the startup command; the default is SERVICE="cli".

—> execHiveCmd launches the Java entry point: org.apache.hive.beeline.cli.HiveCli or org.apache.hadoop.hive.cli.CliDriver

Hive vs HBase

Similarities:

  • 1. Both HBase and Hive are built on top of Hadoop and use Hadoop as their underlying storage.

Differences:

  • 1. Hive is a batch system built on Hadoop to reduce the work of writing MapReduce jobs; HBase is a project that compensates for Hadoop's lack of real-time operations.
  • 2. Think of it like working with an RDBMS: for full-table scans use Hive + Hadoop; for indexed access use HBase + Hadoop.
  • 3. A Hive query is a set of MapReduce jobs and can take anywhere from 5 minutes to many hours; HBase is very efficient, certainly far more efficient than Hive.
  • 4. Hive itself does not store or compute data; it relies entirely on HDFS and MapReduce, and Hive tables are purely logical.
  • 5. Hive borrows Hadoop's MapReduce to execute some of its commands.
  • 6. HBase tables are physical, not logical; HBase provides a huge in-memory hash table that search engines use to store indexes, making lookups convenient.
  • 7. HBase is column-oriented storage.
  • 8. HDFS is the underlying storage: HDFS stores the files, while HBase organizes them.
  • 9. Hive needs HDFS to store its files and MapReduce as its computation framework.

Importing the Hive source code into Eclipse:

    1. mvn clean install -DskipTests
    2. mvn eclipse:eclipse -DdownloadSources -DdownloadJavadocs

Hive Topics

HBase (Non-Relational Distributed Database)

HBase Overview

HBase is an open-source, non-relational, distributed database. https://azure.microsoft.com/en-us/documentation/articles/hdinsight-hbase-tutorial-get-started-linux/

Pros and Cons of HBase as a NoSQL Database

HBase, Cassandra and Bigtable are all column-oriented distributed storage systems.

HBase basic units (a client-API sketch follows this list):

  • Table: Collection of rows present.
  • Row: Collection of column families.
  • Column Family: Collection of columns.
  • Column: Collection of key-value pairs.
  • Namespace: Logical grouping of tables.
  • Cell: A {row, column, version} tuple exactly specifies a cell definition in HBase.
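To make these units concrete, here is a minimal sketch with the HBase 1.x Java client API from Scala: it writes one cell (row key, column family, column qualifier, value) and reads it back. The table name 'emp' and column family 'info' are assumptions for illustration.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()                      // reads hbase-site.xml from the classpath
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("emp"))   // assumed table

    // Row "1001", column family "info", qualifier "name" -> one cell.
    val put = new Put(Bytes.toBytes("1001"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("ram"))
    table.put(put)

    val result = table.get(new Get(Bytes.toBytes("1001")))
    println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))))

    table.close()
    connection.close()
  }
}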

Column-oriented vs row-oriented storage:

Advantages of HBase:
1. Columns can be added dynamically, and empty columns are not stored, which saves space.
2. HBase splits data automatically, so storage scales horizontally.
3. HBase supports highly concurrent reads and writes.

Disadvantages of HBase:
1. It does not support conditional queries; lookups go through the row key only.
2. For now there is no automatic failover of the master server; if the master goes down, the whole storage system is unavailable.

HBase Architecture

HBase Data Flow

The read and write paths between the client and the HFile are shown in the diagram below.

  • Step 1) The client wants to write data; it first contacts the Region Server and then the region.
  • Step 2) The region hands the data to the MemStore associated with the column family.
  • Step 3) Data is first stored in the MemStore, where it is sorted by row key, and is later flushed to an HFile. The MemStore lives in the Region Server's main memory, while HFiles are written to HDFS.
  • Step 4) The client wants to read data from the regions.
  • Step 5) The client can read directly from the MemStore and request data from it.
  • Step 6) Otherwise the client goes to the HFiles to get the data; the data is fetched and returned to the client.

HBase vs HDFS

Importing the HBase source code into Eclipse:

    1. mvn clean install -DskipTests
    2. mvn eclipse:eclipse -DdownloadSources -DdownloadJavadocs

HBase Source Code Entry Points

Hadoop Source Code Entry Points

Importing the Hadoop source code into Eclipse:

    1. mvn clean install -DskipTests
    2. mvn eclipse:eclipse -DdownloadSources -DdownloadJavadocs

Hadoop Source Code Entry Points

These are the entry points for reading the Hadoop source code; starting from these main classes you can follow the implementation.

Hadoop command:
# the core commands
if [ "$COMMAND" = "fs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.hadoop.util.VersionInfo
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
if [[ -n "${YARN_OPTS}" ]] || [[ -n "${YARN_CLIENT_OPTS}" ]]; then
echo "WARNING: Use \"yarn jar\" to launch YARN applications." 1>&2
fi
elif [ "$COMMAND" = "key" ] ; then
CLASS=org.apache.hadoop.crypto.key.KeyShell
elif [ "$COMMAND" = "checknative" ] ; then
CLASS=org.apache.hadoop.util.NativeLibraryChecker
elif [ "$COMMAND" = "distcp" ] ; then
CLASS=org.apache.hadoop.tools.DistCp
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel
elif [ "$COMMAND" = "archive" ] ; then
CLASS=org.apache.hadoop.tools.HadoopArchives
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
elif [ "$COMMAND" = "credential" ] ; then
CLASS=org.apache.hadoop.security.alias.CredentialShell
elif [ "$COMMAND" = "trace" ] ; then
CLASS=org.apache.hadoop.tracing.TraceAdmin
elif [ "$COMMAND" = "classpath" ] ; then
if [ "$#" -gt 1 ]; then
CLASS=org.apache.hadoop.util.Classpath
Hdfs command:
if [ "$COMMAND" = "namenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.NameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NAMENODE_OPTS"
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
elif [ "$COMMAND" = "secondarynamenode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_SECONDARYNAMENODE_OPTS"
elif [ "$COMMAND" = "datanode" ] ; then
CLASS='org.apache.hadoop.hdfs.server.datanode.DataNode'
if [ "$starting_secure_dn" = "true" ]; then
HADOOP_OPTS="$HADOOP_OPTS -jvm server $HADOOP_DATANODE_OPTS"
else
HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
fi
elif [ "$COMMAND" = "journalnode" ] ; then
CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
elif [ "$COMMAND" = "dfs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "dfsadmin" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSAdmin
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "haadmin" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSHAAdmin
CLASSPATH=${CLASSPATH}:${TOOL_PATH}
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "fsck" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DFSck
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"
elif [ "$COMMAND" = "balancer" ] ; then
CLASS=org.apache.hadoop.hdfs.server.balancer.Balancer
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_BALANCER_OPTS"
elif [ "$COMMAND" = "mover" ] ; then
CLASS=org.apache.hadoop.hdfs.server.mover.Mover
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_MOVER_OPTS}"
elif [ "$COMMAND" = "storagepolicies" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.StoragePolicyAdmin
elif [ "$COMMAND" = "jmxget" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.JMXGet
elif [ "$COMMAND" = "oiv" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewerPB
elif [ "$COMMAND" = "oiv_legacy" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.offlineImageViewer.OfflineImageViewer
elif [ "$COMMAND" = "oev" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.offlineEditsViewer.OfflineEditsViewer
elif [ "$COMMAND" = "fetchdt" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.DelegationTokenFetcher
elif [ "$COMMAND" = "getconf" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.GetConf
elif [ "$COMMAND" = "groups" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.GetGroups
elif [ "$COMMAND" = "snapshotDiff" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff
elif [ "$COMMAND" = "lsSnapshottableDir" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.snapshot.LsSnapshottableDir
elif [ "$COMMAND" = "portmap" ] ; then
CLASS=org.apache.hadoop.portmap.Portmap
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_PORTMAP_OPTS"
elif [ "$COMMAND" = "nfs3" ] ; then
CLASS=org.apache.hadoop.hdfs.nfs.nfs3.Nfs3
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_NFS3_OPTS"
elif [ "$COMMAND" = "cacheadmin" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.CacheAdmin
elif [ "$COMMAND" = "crypto" ] ; then
CLASS=org.apache.hadoop.hdfs.tools.CryptoAdmin
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.hadoop.util.VersionInfo
elif [ "$COMMAND" = "debug" ]; then
CLASS=org.apache.hadoop.hdfs.tools.DebugAdmin
elif [ "$COMMAND" = "classpath" ]; then
if [ "$#" -gt 0 ]; then
CLASS=org.apache.hadoop.util.Classpath
org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter
org.apache.hadoop.hdfs.nfs.nfs3.PrivilegedNfsGatewayStarter
Yarn:
elif [ "$COMMAND" = "rmadmin" ] ; then
CLASS='org.apache.hadoop.yarn.client.cli.RMAdminCLI'
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "scmadmin" ] ; then
CLASS='org.apache.hadoop.yarn.client.SCMAdmin'
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "application" ] ||
[ "$COMMAND" = "applicationattempt" ] ||
[ "$COMMAND" = "container" ]; then
CLASS=org.apache.hadoop.yarn.client.cli.ApplicationCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
set -- $COMMAND $@
elif [ "$COMMAND" = "node" ] ; then
CLASS=org.apache.hadoop.yarn.client.cli.NodeCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "queue" ] ; then
CLASS=org.apache.hadoop.yarn.client.cli.QueueCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "resourcemanager" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/rm-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.resourcemanager.ResourceManager'    # entry: main —> serviceInit —> serviceStart
YARN_OPTS="$YARN_OPTS $YARN_RESOURCEMANAGER_OPTS"
if [ "$YARN_RESOURCEMANAGER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_RESOURCEMANAGER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "historyserver" ] ; then
echo "DEPRECATED: Use of this command to start the timeline server is deprecated." 1>&2
echo "Instead use the timelineserver command for it." 1>&2
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/ahs-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer'
YARN_OPTS="$YARN_OPTS $YARN_HISTORYSERVER_OPTS"
if [ "$YARN_HISTORYSERVER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_HISTORYSERVER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "timelineserver" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/timelineserver-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer'
YARN_OPTS="$YARN_OPTS $YARN_TIMELINESERVER_OPTS"
if [ "$YARN_TIMELINESERVER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_TIMELINESERVER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "sharedcachemanager" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/scm-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
if [ "$YARN_SHAREDCACHEMANAGER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_SHAREDCACHEMANAGER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "nodemanager" ] ; then
CLASSPATH=${CLASSPATH}:$YARN_CONF_DIR/nm-config/log4j.properties
CLASS='org.apache.hadoop.yarn.server.nodemanager.NodeManager'    # entry: main —> serviceInit —> serviceStart
YARN_OPTS="$YARN_OPTS -server $YARN_NODEMANAGER_OPTS"
if [ "$YARN_NODEMANAGER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_NODEMANAGER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "proxyserver" ] ; then
CLASS='org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer'
YARN_OPTS="$YARN_OPTS $YARN_PROXYSERVER_OPTS"
if [ "$YARN_PROXYSERVER_HEAPSIZE" != "" ]; then
JAVA_HEAP_MAX="-Xmx""$YARN_PROXYSERVER_HEAPSIZE""m"
fi
elif [ "$COMMAND" = "version" ] ; then
CLASS=org.apache.hadoop.util.VersionInfo
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "logs" ] ; then
CLASS=org.apache.hadoop.yarn.client.cli.LogsCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "daemonlog" ] ; then
CLASS=org.apache.hadoop.log.LogLevel
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
elif [ "$COMMAND" = "cluster" ] ; then
CLASS=org.apache.hadoop.yarn.client.cli.ClusterCLI
YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
else
CLASS=$COMMAND
fi

Hadoop MapReduce

Hadoop MapReduce Flow Diagram

In Hadoop, what do the five steps map -> combine -> partition -> shuffle -> reduce each do?

  • combine and partition are functions; the only real intermediate step is shuffle!
  • combine happens on both the map side and the reduce side; its job is to merge key/value pairs that share the same key, and it can be customized.
  • The combine function merges the <key,value> pairs produced by one map function (many pairs sharing a key) into a new <key2,value2> pair, and that new pair becomes input to the reduce function.
  • value2 can also be thought of as values, because there are several of them. The purpose of this merging is to reduce network traffic.
  • partition splits each map node's output and routes it by key to different reducers; it can also be customized. You can think of it as classification.
  • The job of partition is to classify the data. When writing a program, MapReduce uses HashPartitioner to do this classification for us, and we can also define our own.
  • shuffle is the whole process between map and reduce, and it includes combine and partition on both sides.
  • Map output is distributed to the reducers by the partitioner; after a reducer finishes its reduce work, the output goes through the OutputFormat.
  • The main function in the shuffle phase is fetchOutputs(), which copies the map-phase output to the local disk of the reduce node.

摘自aboutyun社区,一些值得思考的问题
1.Shuffle的定义是什么?
2.map task与reduce task的执行是否在不同的节点上?
3.Shuffle产生的意义是什么?
4.每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据该如何处理?
5.在map task执行时,它是如何读取HDFS的?
6.读取的Split与block的对应关系可能是什么?
7.MapReduce提供Partitioner接口,它的作用是什么?
8.溢写是在什么情况下发生?
9.溢写是为什么不影响往缓冲区写map结果的线程?
10.当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对谁的排序?
11.哪些场景才能使用Combiner呢?
12.Merge的作用是什么?
13.reduce中Copy过程采用是什么协议?
14.reduce中merge过程有几种方式,与map有什么相似之处?
15.溢写过程中如果有很多个key/value对需要发送到某个reduce端去,那么如何处理这些key/value值

Shuffle产生的意义是什么?
在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的就是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,Shuffle过程的期望可以有:
完整地从map task端拉取数据到reduce 端。
在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
减少磁盘IO对task执行的影响。

每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据该如何处理?
每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

MapReduce provides a Partitioner interface; what is it for?
The Partitioner decides, based on the key (or value) and the number of reducers, which reduce task the current output pair should be handed to. The default hashes the key and takes it modulo the number of reduce tasks. This default is only meant to spread load evenly across reducers; if you have your own partitioning needs, you can write a custom Partitioner and set it on the job.
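A hedged sketch of a custom Partitioner against the org.apache.hadoop.mapreduce API: it routes keys by their first letter instead of the default hash-modulo behaviour, purely as an illustration (the class and routing rule are not from the original notes).

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Partitioner

// Illustrative partitioner: keys starting with a..m go to partition 0,
// everything else to the last partition. getPartition must return a value in [0, numPartitions).
class FirstLetterPartitioner extends Partitioner[Text, IntWritable] {
  override def getPartition(key: Text, value: IntWritable, numPartitions: Int): Int = {
    val first = key.toString.headOption.getOrElse('z').toLower
    if (first <= 'm') 0 else numPartitions - 1
  }
}

// Registered on the job with: job.setPartitionerClass(classOf[FirstLetterPartitioner])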

什么是溢写?
在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。

溢写是为什么不影响往缓冲区写map结果的线程?
溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill.percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对谁的排序?
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

溢写过程中如果有很多个key/value对需要发送到某个reduce端去,那么如何处理这些key/value值?
如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。

In which situations can a Combiner be used?
The Combiner's output becomes the Reducer's input, and the Combiner must never change the final result. So the Combiner should only be used when the Reducer's input key/value and output key/value types are identical and the final result is unaffected, for example sums or maximums. Use Combiners carefully: used well they help job efficiency, used badly they corrupt the reducer's final result.
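As a sketch, the word-count style job below registers the reducer class as the combiner too, which is safe because summing is associative and commutative. It reuses TokenizerMapper and IntSumReducer from the standard hadoop-mapreduce-examples WordCount; input and output paths come from the command line and are placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.examples.WordCount
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

object CombinerSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration(), "word count with combiner")
    job.setJarByClass(getClass)
    job.setMapperClass(classOf[WordCount.TokenizerMapper])
    job.setCombinerClass(classOf[WordCount.IntSumReducer])  // combiner types match the reducer input
    job.setReducerClass(classOf[WordCount.IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}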

Merge的作用是什么?
最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge

每个reduce task不断的通过什么协议从JobTracker那里获取map task是否完成的信息?
每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息

reduce中Copy过程采用是什么协议?
Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。

reduce中merge过程有几种方式?
merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

Map 过程


整个流程分了四步。简单些可以这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

当然这里的每一步都可能包含着多个步骤与细节,下面对细节来说明:

  • 1.在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系可能是多对一,默认是一对一。在WordCount例子里,假设map的输入数据都是像“aaa”这样的字符串。

  • 2.在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。这个job有多个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。

MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

在WordCount例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

  • 3.这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写,字面意思很直观。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size spill percent = 100MB 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节是,如果有很多个key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。

在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于WordCount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等同于Reducer。

如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

  • 4.每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。

至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。

Reduce 过程

如map端的细节图,Shuffle在reduce端的过程也能用图上标明的三点来概括。当前reduce copy数据的前提是它要从JobTracker获得有哪些map task已执行结束。Reducer真正运行之前,所有的时间都是在拉取数据,做merge,且不断重复地在做。如前面的方式一样,下面也分段地描述reduce 端的Shuffle细节:

  • 1.Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

  • 2.Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,merge有三种形式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。

  • 3.Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。

Map-Reduce 过程图片

Map-Reduce 过程例子

HDFS Command

Hadoop FS Command

Reference: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html

The File System (FS) shell includes various shell-like commands that directly interact with the Hadoop Distributed File System (HDFS) as well as other file systems that Hadoop supports, such as Local FS, HFTP FS, S3 FS, and others. The FS shell is invoked by:

bin/hadoop fs <args>

All FS shell commands take path URIs as arguments. The URI format is

scheme://authority/path

For HDFS the scheme is hdfs , and for the Local FS the scheme is file. The scheme and authority are optional. If not specified, the default scheme specified in the configuration is used. An HDFS file or directory such as /parent/child can be specified as hdfs://namenodehost/parent/child or simply as /parent/child (given that your configuration is set to point to hdfs://namenodehost ).
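The same scheme/authority resolution applies when using the Hadoop FileSystem API directly; a minimal sketch (the namenode host, port and paths below are placeholders):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsShellSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()   // picks up fs.defaultFS from core-site.xml if present

    // Explicit scheme and authority...
    val hdfs = FileSystem.get(new URI("hdfs://namenodehost:8020"), conf)
    // ...or rely on the configured default, as "/parent/child" does on the command line.
    val default = FileSystem.get(conf)

    // Rough equivalents of `hadoop fs -mkdir` and `hadoop fs -ls`.
    default.mkdirs(new Path("/user/hadoop/dir1"))
    default.listStatus(new Path("/user/hadoop")).foreach(s => println(s.getPath))
  }
}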

The args can be one of:

  • appendToFile
  • cat
  • checksum
  • chgrp
  • chmod
  • chown
  • copyFromLocal
  • copyToLocal
  • count
  • cp
  • createSnapshot
  • deleteSnapshot
  • df
  • du
  • dus
  • expunge
  • find
  • get
  • getfacl
  • getfattr
  • getmerge
  • help
  • ls
  • lsr
  • mkdir
  • moveFromLocal
  • moveToLocal
  • mv
  • put
  • renameSnapshot
  • rm
  • rmdir
  • rmr
  • setfacl
  • setfattr
  • setrep
  • stat
  • tail
  • test
  • text
  • touchz
  • truncate
  • usage

Hadoop FS command usage:

hadoop fs :
[-ls <path>]
[-lsr <path>]
[-du <path>]
[-dus <path>]
[-count[-q] <path>]
[-mv <src> <dst>]
[-cp <src> <dst>]
[-rm [-skipTrash] <path>]
[-rmr [-skipTrash] <path>]
[-put <localsrc> ... <dst>]
[-copyFromLocal <localsrc> ... <dst>]
[-moveFromLocal <localsrc> ... <dst>]
[-getmerge <src> <localdst> [addnl]]
[-cat <src>]
[-text <src>]
[-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>]
[-moveToLocal [-crc] <src> <localdst>]
[-mkdir <path>]
[-tail [-f] <file>]
[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
[-chown [-R] [OWNER][:[GROUP]] PATH...]
[-chgrp [-R] GROUP PATH...]
[-help [cmd]]

Hadoop dfsadmin commands:

hadoop dfsadmin :
[-report]
Report basic file system information and statistics.
[-safemode enter | leave | get | wait]
Safe mode maintenance commands.
[-saveNamespace]
Save the current namespace.
[-refreshNodes]
Re-read the hosts and exclude files so that new nodes, or nodes being decommissioned, are re-recognized by the NameNode.
[-finalizeUpgrade]
Finalize an HDFS upgrade.
[-upgradeProgress status | details | force]
[-metasave filename]
Save the NameNode's primary data structures to filename in the directory specified by the hadoop.log.dir property.
[-setQuota <quota> <dirname>...<dirname>]
Set a quota for each directory, limiting the number of names under the directory tree.
[-clrQuota <dirname>...<dirname>]
Clear the quota setting for each directory.
[-setBalancerBandwidth <bandwidth in bytes per second>]
Set the bandwidth used during load balancing.

appendToFile

Usage: hadoop fs -appendToFile <localsrc> ... <dst>

Append single src, or multiple srcs from local file system to the destination file system. Also reads input from stdin and appends to destination file system.

hadoop fs -appendToFile localfile /user/hadoop/hadoopfile
hadoop fs -appendToFile localfile1 localfile2 /user/hadoop/hadoopfile
hadoop fs -appendToFile localfile hdfs://nn.example.com/hadoop/hadoopfile
hadoop fs -appendToFile - hdfs://nn.example.com/hadoop/hadoopfile Reads the input from stdin.

Exit Code:

Returns 0 on success and 1 on error.

cat

Usage: hadoop fs -cat URI [URI …]

Copies source paths to stdout.

Example:

hadoop fs -cat hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
hadoop fs -cat file:///file3 /user/hadoop/file4

Exit Code:

Returns 0 on success and -1 on error.

checksum

Usage: hadoop fs -checksum URI

Returns the checksum information of a file.

Example:

hadoop fs -checksum hdfs://nn1.example.com/file1
hadoop fs -checksum file:///etc/hosts

chgrp

Usage: hadoop fs -chgrp [-R] GROUP URI [URI …]

Change group association of files. The user must be the owner of files, or else a super-user. Additional information is in the Permissions Guide.

Options

  • The -R option will make the change recursively through the directory structure.

chmod

Usage: hadoop fs -chmod [-R] <MODE[,MODE]... | OCTALMODE> URI [URI ...]

Change the permissions of files. With -R, make the change recursively through the directory structure. The user must be the owner of the file, or else a super-user. Additional information is in the Permissions Guide.

Options

  • The -R option will make the change recursively through the directory structure.

chown

Usage: hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]

Change the owner of files. The user must be a super-user. Additional information is in the Permissions Guide.

Options

  • The -R option will make the change recursively through the directory structure.

copyFromLocal

Usage: hadoop fs -copyFromLocal <localsrc> URI

Similar to put command, except that the source is restricted to a local file reference.

Options:

  • The -f option will overwrite the destination if it already exists.

copyToLocal

Usage: hadoop fs -copyToLocal [-ignorecrc] [-crc] URI <localdst>

Similar to get command, except that the destination is restricted to a local file reference.

count

Usage: hadoop fs -count [-q] [-h] [-v] <paths>

Count the number of directories, files and bytes under the paths that match the specified file pattern. The output columns with -count are: DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME

The output columns with -count -q are: QUOTA, REMAINING_QUOTA, SPACE_QUOTA, REMAINING_SPACE_QUOTA, DIR_COUNT, FILE_COUNT, CONTENT_SIZE, PATHNAME

The -h option shows sizes in human readable format.

The -v option displays a header line.

Example:

hadoop fs -count hdfs://nn1.example.com/file1 hdfs://nn2.example.com/file2
hadoop fs -count -q hdfs://nn1.example.com/file1
hadoop fs -count -q -h hdfs://nn1.example.com/file1
hdfs dfs -count -q -h -v hdfs://nn1.example.com/file1

Exit Code:

Returns 0 on success and -1 on error.

cp

Usage: hadoop fs -cp [-f] [-p | -p[topax]] URI [URI …]

Copy files from source to destination. This command allows multiple sources as well in which case the destination must be a directory.

‘raw.*’ namespace extended attributes are preserved if (1) the source and destination filesystems support them (HDFS only), and (2) all source and destination pathnames are in the /.reserved/raw hierarchy. Determination of whether raw.* namespace xattrs are preserved is independent of the -p (preserve) flag.

Options:

  • The -f option will overwrite the destination if it already exists.
  • The -p option will preserve file attributes [topx] (timestamps, ownership, permission, ACL, XAttr). If -p is specified with no arg, then preserves timestamps, ownership, permission. If -pa is specified, then preserves permission also because ACL is a super-set of permission. Determination of whether raw namespace extended attributes are preserved is independent of the -p flag.

Example:

hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2
hadoop fs -cp /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/dir

Exit Code:

Returns 0 on success and -1 on error.

createSnapshot

See HDFS Snapshots Guide.

deleteSnapshot

See HDFS Snapshots Guide.

df

Usage: hadoop fs -df [-h] URI [URI …]

Displays free space.

Options:

  • The -h option will format file sizes in a “human-readable” fashion (e.g 64.0m instead of 67108864)

Example:

hadoop dfs -df /user/hadoop/dir1

du

Usage: hadoop fs -du [-s] [-h] URI [URI …]

Displays sizes of files and directories contained in the given directory or the length of a file in case its just a file.

Options:

  • The -s option will result in an aggregate summary of file lengths being displayed, rather than the individual files.
  • The -h option will format file sizes in a “human-readable” fashion (e.g 64.0m instead of 67108864)

Example:

hadoop fs -du /user/hadoop/dir1 /user/hadoop/file1 hdfs://nn.example.com/user/hadoop/dir1

Exit Code: Returns 0 on success and -1 on error.

dus

Usage: hadoop fs -dus

Displays a summary of file lengths.

Note: This command is deprecated. Instead use hadoop fs -du -s.

expunge

Usage: hadoop fs -expunge

Empty the Trash. Refer to the HDFS Architecture Guide for more information on the Trash feature.

find

Usage: hadoop fs -find <path> ... <expression> ...

Finds all files that match the specified expression and applies selected actions to them. If no path is specified then defaults to the current working directory. If no expression is specified then defaults to -print.

The following primary expressions are recognised:

  • -name pattern
  • -iname pattern
    Evaluates as true if the basename of the file matches the pattern using standard file system globbing. If -iname is used then the match is case insensitive.

  • -print
  • -print0
    Always evaluates to true. Causes the current pathname to be written to standard output. If the -print0 expression is used then an ASCII NULL character is appended.

The following operators are recognised:

  • expression -a expression
  • expression -and expression
  • expression expression
    Logical AND operator for joining two expressions. Returns true if both child expressions return true. Implied by the juxtaposition of two expressions and so does not need to be explicitly specified. The second expression will not be applied if the first fails.

Example:

hadoop fs -find / -name test -print

Exit Code:

Returns 0 on success and -1 on error.

get

Usage: hadoop fs -get [-ignorecrc] [-crc] <src> <localdst>

Copy files to the local file system. Files that fail the CRC check may be copied with the -ignorecrc option. Files and CRCs may be copied using the -crc option.

Example:

hadoop fs -get /user/hadoop/file localfile
hadoop fs -get hdfs://nn.example.com/user/hadoop/file localfile

Exit Code:

Returns 0 on success and -1 on error.

getfacl

Usage: hadoop fs -getfacl [-R] <path>

Displays the Access Control Lists (ACLs) of files and directories. If a directory has a default ACL, then getfacl also displays the default ACL.

Options:

  • -R: List the ACLs of all files and directories recursively.
  • path: File or directory to list.

Examples:

hadoop fs -getfacl /file
hadoop fs -getfacl -R /dir

Exit Code:

Returns 0 on success and non-zero on error.

getfattr

Usage: hadoop fs -getfattr [-R] -n name | -d [-e en] <path>

Displays the extended attribute names and values (if any) for a file or directory.

Options:

  • -R: Recursively list the attributes for all files and directories.
  • -n name: Dump the named extended attribute value.
  • -d: Dump all extended attribute values associated with pathname.
  • -e encoding: Encode values after retrieving them. Valid encodings are “text”, “hex”, and “base64”. Values encoded as text strings are enclosed in double quotes (“), and values encoded as hexadecimal and base64 are prefixed with 0x and 0s, respectively.
  • path: The file or directory.

Examples:

hadoop fs -getfattr -d /file
hadoop fs -getfattr -R -n user.myAttr /dir

Exit Code:

Returns 0 on success and non-zero on error.

getmerge

Usage: hadoop fs -getmerge [-nl] <src> <localdst>

Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Optionally -nl can be set to enable adding a newline character (LF) at the end of each file.

Examples:

hadoop fs -getmerge -nl /src /opt/output.txt
hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt

Exit Code:

Returns 0 on success and non-zero on error.

help

Usage: hadoop fs -help

Return usage output.

ls

Usage: hadoop fs -ls [-d] [-h] [-R]

Options:

  • -d: Directories are listed as plain files.
  • -h: Format file sizes in a human-readable fashion (eg 64.0m instead of 67108864).
  • -R: Recursively list subdirectories encountered.

For a file ls returns stat on the file with the following format:

permissions number_of_replicas userid groupid filesize modification_date modification_time filename

For a directory it returns list of its direct children as in Unix. A directory is listed as:

permissions userid groupid modification_date modification_time dirname

Files within a directory are ordered by filename by default.

Example:

hadoop fs -ls /user/hadoop/file1

Exit Code:

Returns 0 on success and -1 on error.

lsr

Usage: hadoop fs -lsr

Recursive version of ls.

Note: This command is deprecated. Instead use hadoop fs -ls -R

mkdir

Usage: hadoop fs -mkdir [-p] <paths>

Takes path uri’s as argument and creates directories.

Options:

  • The -p option behavior is much like Unix mkdir -p, creating parent directories along the path.

Example:

hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
hadoop fs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

Exit Code:

Returns 0 on success and -1 on error.

moveFromLocal

Usage: hadoop fs -moveFromLocal <localsrc> <dst>

Similar to put command, except that the source localsrc is deleted after it’s copied.

moveToLocal

Usage: hadoop fs -moveToLocal [-crc] <src> <dst>

Displays a “Not implemented yet” message.

mv

Usage: hadoop fs -mv URI [URI …]

Moves files from source to destination. This command allows multiple sources as well in which case the destination needs to be a directory. Moving files across file systems is not permitted.

Example:

hadoop fs -mv /user/hadoop/file1 /user/hadoop/file2
hadoop fs -mv hdfs://nn.example.com/file1 hdfs://nn.example.com/file2 hdfs://nn.example.com/file3 hdfs://nn.example.com/dir1

Exit Code:

Returns 0 on success and -1 on error.

put

Usage: hadoop fs -put <localsrc> ... <dst>

Copy single src, or multiple srcs from local file system to the destination file system. Also reads input from stdin and writes to destination file system.

hadoop fs -put localfile /user/hadoop/hadoopfile
hadoop fs -put localfile1 localfile2 /user/hadoop/hadoopdir
hadoop fs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile Reads the input from stdin.

Exit Code:

Returns 0 on success and -1 on error.

renameSnapshot

See HDFS Snapshots Guide.

rm

Usage: hadoop fs -rm [-f] [-r |-R] [-skipTrash] URI [URI …]

Delete files specified as args.

Options:

  • The -f option will not display a diagnostic message or modify the exit status to reflect an error if the file does not exist.
  • The -R option deletes the directory and any content under it recursively.
  • The -r option is equivalent to -R.
  • The -skipTrash option will bypass trash, if enabled, and delete the specified file(s) immediately. This can be useful when it is necessary to delete files from an over-quota directory.

Example:

hadoop fs -rm hdfs://nn.example.com/file /user/hadoop/emptydir

Exit Code:

Returns 0 on success and -1 on error.

rmdir

Usage: hadoop fs -rmdir [--ignore-fail-on-non-empty] URI [URI ...]

Delete a directory.

Options:

  • --ignore-fail-on-non-empty: When using wildcards, do not fail if a directory still contains files.

Example:

hadoop fs -rmdir /user/hadoop/emptydir

rmr

Usage: hadoop fs -rmr [-skipTrash] URI [URI …]

Recursive version of delete.

Note: This command is deprecated. Instead use hadoop fs -rm -r

setfacl

Usage: hadoop fs -setfacl [-R] [-b | -k | -m | -x <acl_spec>] <path> | [--set <acl_spec> <path>]

Sets Access Control Lists (ACLs) of files and directories.

Options:

  • -b: Remove all but the base ACL entries. The entries for user, group and others are retained for compatibility with permission bits.
  • -k: Remove the default ACL.
  • -R: Apply operations to all files and directories recursively.
  • -m: Modify ACL. New entries are added to the ACL, and existing entries are retained.
  • -x: Remove specified ACL entries. Other ACL entries are retained.
  • --set: Fully replace the ACL, discarding all existing entries. The acl_spec must include entries for user, group, and others for compatibility with permission bits.
  • acl_spec: Comma separated list of ACL entries.
  • path: File or directory to modify.

Examples:

hadoop fs -setfacl -m user:hadoop:rw- /file
hadoop fs -setfacl -x user:hadoop /file
hadoop fs -setfacl -b /file
hadoop fs -setfacl -k /dir
hadoop fs -setfacl --set user::rw-,user:hadoop:rw-,group::r--,other::r-- /file
hadoop fs -setfacl -R -m user:hadoop:r-x /dir
hadoop fs -setfacl -m default:user:hadoop:r-x /dir

Exit Code:

Returns 0 on success and non-zero on error.

setfattr

Usage: hadoop fs -setfattr -n name [-v value] | -x name <path>

Sets an extended attribute name and value for a file or directory.

Options:

  • -b: Remove all but the base ACL entries. The entries for user, group and others are retained for compatibility with permission bits.
  • -n name: The extended attribute name.
  • -v value: The extended attribute value. There are three different encoding methods for the value. If the argument is enclosed in double quotes, then the value is the string inside the quotes. If the argument is prefixed with 0x or 0X, then it is taken as a hexadecimal number. If the argument begins with 0s or 0S, then it is taken as a base64 encoding.
  • -x name: Remove the extended attribute.
  • path: The file or directory.

Examples:

hadoop fs -setfattr -n user.myAttr -v myValue /file
hadoop fs -setfattr -n user.noValue /file
hadoop fs -setfattr -x user.myAttr /file

Exit Code:

Returns 0 on success and non-zero on error.

setrep

Usage: hadoop fs -setrep [-R] [-w] <numReplicas> <path>

Changes the replication factor of a file. If path is a directory then the command recursively changes the replication factor of all files under the directory tree rooted at path.

Options:

  • The -w flag requests that the command wait for the replication to complete. This can potentially take a very long time.
  • The -R flag is accepted for backwards compatibility. It has no effect.

Example:

hadoop fs -setrep -w 3 /user/hadoop/dir1

Exit Code:

Returns 0 on success and -1 on error.

stat

Usage: hadoop fs -stat [format] <path> ...

Print statistics about the file/directory at <path> in the specified format. Format accepts filesize in blocks (%b), type (%F), group name of owner (%g), name (%n), block size (%o), replication (%r), user name of owner(%u), and modification date (%y, %Y). %y shows UTC date as "yyyy-MM-dd HH:mm:ss" and %Y shows milliseconds since January 1, 1970 UTC. If the format is not specified, %y is used by default.

Example:

  • hadoop fs -stat “%F %u:%g %b %y %n” /file

Exit Code: Returns 0 on success and -1 on error.

tail

Usage: hadoop fs -tail [-f] URI

Displays last kilobyte of the file to stdout.

Options:

  • The -f option will output appended data as the file grows, as in Unix.

Example:

hadoop fs -tail pathname

Exit Code: Returns 0 on success and -1 on error.

test

Usage: hadoop fs -test -[defsz] URI

Options:

  • -d: if the path is a directory, return 0.
  • -e: if the path exists, return 0.
  • -f: if the path is a file, return 0.
  • -s: if the path is not empty, return 0.
  • -z: if the file is zero length, return 0.

Example:

hadoop fs -test -e filename

text

Usage: hadoop fs -text <src>

Takes a source file and outputs the file in text format. The allowed formats are zip and TextRecordInputStream.

touchz

Usage: hadoop fs -touchz URI [URI …]

Create a file of zero length.

Example:

hadoop fs -touchz pathname

Exit Code: Returns 0 on success and -1 on error.

truncate

Usage: hadoop fs -truncate [-w] <length> <paths>

Truncate all files that match the specified file pattern to the specified length.

Options:

  • The -w flag requests that the command waits for block recovery to complete, if necessary. Without -w flag the file may remain unclosed for some time while the recovery is in progress. During this time file cannot be reopened for append.

Example:

hadoop fs -truncate 55 /user/hadoop/file1 /user/hadoop/file2
hadoop fs -truncate -w 127 hdfs://nn1.example.com/user/hadoop/file1

usage

Usage: hadoop fs -usage command

Return the help for an individual command.

HDFS Overview

HDFS Distributed File System

  • Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.

Hadoop HDFS 1.x vs 2.x


Hadoop HDFS comes in two major versions, 1.x and 2.x, with significant differences between them:

  1. 1.x only supports MapReduce computation. 2.x supports Spark, MPI, Hama, Giraph and other computation models in addition to MapReduce.
  2. In 1.x the MR component both dispatches tasks and schedules resources. In 2.x a separate component, YARN, handles resource scheduling, and MR only dispatches tasks.
  3. 1.x cannot provide HA: a single NameNode manages all nodes. 2.x supports HA with a second NameNode, though only cold failover. 2.x also supports Federation, which spreads the NameNode's load across file system mount points. Note that HA and Federation can be combined.

Online material lists ten differences, but from a technical/architectural point of view these three changes are the core ones.

Hadoop HDFS 2.x Architecture Diagram

NameNode

The manager of the HDFS distributed file system: it is responsible for the file system namespace, cluster configuration information, and replication of stored data.

Secondary NameNode

Not a hot standby for the NameNode. It assists the NameNode by periodically merging the FSImage and EditLog. If the NameNode fails, the FSImage and EditLog stored locally on the Secondary NameNode can be used to recover it as a NameNode, but some files may be lost, because the FSImage and EditLog on the Secondary NameNode are not updated in real time.

DataNode

The DataNode is the node where file data is stored: it holds the file blocks and periodically reports block information to the NameNode. Worth noting: the HDFS block size matters; it is usually 64 MB or 128 MB. Blocks that are too small put enormous management pressure on the NameNode, while blocks that are too large can waste disk space.

Client

The client talks to the NameNode to find where a file is stored, then talks to the DataNodes to read or write the data; it can also manage the whole HDFS file system.

File Write Path

  1. The client sends a file write request to the NameNode.
  2. Based on the file size and block configuration, the NameNode returns information about the DataNodes it manages.
  3. The client splits the file into blocks and, using the DataNode addresses, writes them in order to each DataNode.

File Read Path

  1. The client sends a file read request to the NameNode.
  2. The NameNode returns the DataNode locations where the file is stored.
  3. The client reads the file data (see the sketch below).
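A hedged sketch of both paths using the Hadoop FileSystem API (the client's negotiation with the NameNode for block locations happens inside these calls; the path is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsReadWriteSketch {
  def main(args: Array[String]): Unit = {
    val fs   = FileSystem.get(new Configuration())
    val file = new Path("/user/hadoop/demo.txt")

    // Write path: the client asks the NameNode where to place blocks,
    // then streams the data to the chosen DataNodes.
    val out = fs.create(file, true)
    out.writeBytes("hello hdfs\n")
    out.close()

    // Read path: the client gets block locations from the NameNode,
    // then reads the blocks from the DataNodes.
    val in  = fs.open(file)
    val buf = new Array[Byte](1024)
    val n   = in.read(buf)
    println(new String(buf, 0, n))
    in.close()
  }
}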

HDFS Pros and Cons

Pros

  1. Handles very large files.
  2. Streaming data access: write once, read many times.
  3. Runs on inexpensive commodity hardware.

Cons

  1. Not suited to low-latency data access.
  2. Cannot efficiently store large numbers of small files.
  3. No support for multiple concurrent writers or arbitrary file modification.

For installing Hadoop HDFS 2.x, see the separate article on this blog.

Hadoop HDFS 2.x has three installation modes:

  1. Standalone
  2. Pseudo-Distributed Operation
  3. Cluster

About

Welcome to SStar1314, my personal blog, and SStar1314, my GitHub home page. I started this site in November 2016. For more than two years I had been recording my research notes in Evernote and kept meaning to start a blog, move those notes over, and use the opportunity to go back through what I had learned.

Technologies and Topics I Follow

OpenStack cloud computing framework: Keystone/Nova/Neutron/Glance/Cinder/Swift/Ironic
Keystone: RabbitMQ, Qpid, AMQP (Producer + Consumer + Exchange + Queue)
Nova: nova-scheduler, nova-compute, virtualization
Neutron: OVS, Linux Bridge, vlan, vxlan, gre.
Provider network ---- Self-defined network
Virtualization: KVM/Xen
Big data: Hadoop/Spark
Hadoop: MapReduce, HDFS, Yarn, HBase, Hive, Storm. Hadoop 1.x vs 2.x
Spark: RDD, Spark-shell, Spark SQL, Spark Streaming, MLlib, GraphX
NoSQL: HBase, Cassandra, MongoDB, Redis
Containers: Docker, Kubernetes, rkt, CoreOS
Data center operating systems: DCOS, Mesos, Marathon, Chronos
Distributed coordination: ZooKeeper, etcd
Others: Kafka, Elastic Search, Logstash

More info: my GitHub home page