Spring 框架的一个关键组件是面向切面编程(AOP)框架。面向切面编程把程序逻辑分解成不同的部分，称为关注点；横跨应用程序多个点的功能被称为横切关注点，它们在概念上独立于应用程序的业务逻辑。常见的切面例子有日志记录、审计、声明式事务、安全性和缓存等。
Spring 的切面可以使用下面五种通知(advice)工作: 前置通知(before)、后置通知(after)、返回后通知(after-returning)、抛出异常后通知(after-throwing)和环绕通知(around)。
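下面给出一个示意性的基于注解的切面 sketch，演示其中的前置通知和返回后通知(假设使用 AspectJ 注解风格并已开启 @EnableAspectJAutoProxy；包名 com.example.service 与类名均为举例):

import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.stereotype.Component;

// 示意: 对 service 包下所有方法做日志记录这一横切关注点
@Aspect
@Component
public class LoggingAspect {

    // 前置通知: 目标方法执行之前被调用
    @Before("execution(* com.example.service..*(..))")
    public void logBefore() {
        System.out.println("method is about to execute");
    }

    // 返回后通知: 目标方法正常返回之后被调用
    @AfterReturning("execution(* com.example.service..*(..))")
    public void logAfterReturning() {
        System.out.println("method returned normally");
    }
}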
Spring JdbcTemplate 主要提供以下几类方法:
1、execute方法:可以用于执行任何SQL语句,一般用于执行DDL语句;
2、update方法及batchUpdate方法:update方法用于执行新增、修改、删除等语句;batchUpdate方法用于执行批处理相关语句;
3、query方法及queryForXXX方法:用于执行查询相关语句;
4、call方法:用于执行存储过程、函数相关语句。
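下面是一个示意性的 JdbcTemplate 用法 sketch(假设已经配置好 DataSource，表名 user 及其字段均为举例):

import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;

public class UserDao {

    private final JdbcTemplate jdbcTemplate;

    public UserDao(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    // execute: 执行 DDL 语句
    public void createTable() {
        jdbcTemplate.execute("CREATE TABLE user (id INT PRIMARY KEY, name VARCHAR(64))");
    }

    // update: 执行新增、修改、删除语句
    public int addUser(int id, String name) {
        return jdbcTemplate.update("INSERT INTO user(id, name) VALUES (?, ?)", id, name);
    }

    // queryForXXX: 执行查询语句
    public List<Map<String, Object>> listUsers() {
        return jdbcTemplate.queryForList("SELECT id, name FROM user");
    }
}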
Spring 容器是 Spring 框架的核心。容器负责创建对象，把它们装配在一起，进行配置，并管理从创建到销毁的整个生命周期。Spring 容器使用依赖注入(DI)来管理组成应用程序的组件，这些对象被称为 Spring Bean。
通过读取配置元数据提供的指令，容器知道要实例化、配置和组装哪些对象。配置元数据可以用 XML、Java 注解或 Java 代码来表示。Spring IoC 容器利用 Java 的 POJO 类和配置元数据，生成完全配置好的、可运行的系统或应用程序。
BeanFactory 是最简单的容器，为 DI 提供基本支持，由 org.springframework.beans.factory.BeanFactory 接口定义。BeanFactory 以及相关接口(如 BeanFactoryAware、InitializingBean、DisposableBean)仍然保留在 Spring 中，目的是向后兼容大量与 Spring 整合的第三方框架。
ApplicationContext 容器在 BeanFactory 的基础上增加了更多企业级功能，例如从属性文件解析文本信息、向感兴趣的监听器发布应用程序事件等。该容器由 org.springframework.context.ApplicationContext 接口定义。
被称为 bean 的对象构成了应用程序的支柱，由 Spring IoC 容器管理。bean 是由 Spring IoC 容器实例化、组装和管理的对象，这些 bean 由容器读取配置元数据创建。
作用域 描述
singleton 该作用域将 bean 的定义限制为每个 Spring IoC 容器中的单一实例(默认)。
prototype 该作用域将单一 bean 的定义限制在任意数量的对象实例。
request 该作用域将 bean 的定义限制为 HTTP 请求。只在 web-aware Spring ApplicationContext 的上下文中有效。
session 该作用域将 bean 的定义限制为 HTTP 会话。 只在web-aware Spring ApplicationContext的上下文中有效。
global-session 该作用域将 bean 的定义限制为全局 HTTP 会话。只在 web-aware Spring ApplicationContext 的上下文中有效。
如果作用域设置为 singleton，那么 Spring IoC 容器只会为该 bean 定义创建一个实例。这个单一实例会被缓存起来，之后针对该 bean 的所有请求和引用都返回这个缓存对象。
如果作用域设置为 prototype，那么每次请求该 bean 时，Spring IoC 容器都会创建一个新的实例。一般来说，有状态的 bean 使用 prototype 作用域，无状态的 bean 使用 singleton 作用域。
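一个示意性的 sketch，用基于 Java 的配置演示两种作用域下 getBean 的差别(bean 名称与返回类型均为举例):

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
class ScopeConfig {

    @Bean
    public StringBuilder singletonBean() {      // 默认 singleton
        return new StringBuilder();
    }

    @Bean
    @Scope("prototype")
    public StringBuilder prototypeBean() {      // 每次请求都会创建新实例
        return new StringBuilder();
    }
}

public class ScopeDemo {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(ScopeConfig.class);

        // singleton: 两次 getBean 返回同一个缓存对象
        System.out.println(ctx.getBean("singletonBean") == ctx.getBean("singletonBean")); // true
        // prototype: 每次 getBean 都是新对象
        System.out.println(ctx.getBean("prototypeBean") == ctx.getBean("prototypeBean")); // false

        ctx.close();
    }
}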
理解 Spring bean 的生命周期很容易。当一个 bean 被实例化时,它可能需要执行一些初始化使它转换成可用状态。同样,当 bean 不再需要,并且从容器中移除时,可能需要做一些清除工作。
org.springframework.beans.factory.InitializingBean 接口指定一个单一的方法:
void afterPropertiesSet() throws Exception;
org.springframework.beans.factory.DisposableBean 接口指定一个单一的方法:
void destroy() throws Exception;
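一个示意性的 sketch，让 bean 同时实现上述两个接口(类名与打印内容为举例):

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

public class ConnectionPoolBean implements InitializingBean, DisposableBean {

    // 属性注入完成之后由容器回调，用于初始化
    @Override
    public void afterPropertiesSet() throws Exception {
        System.out.println("init: open connections");
    }

    // 容器销毁 bean 之前回调，用于清理
    @Override
    public void destroy() throws Exception {
        System.out.println("destroy: close connections");
    }
}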
如果有太多 bean 的初始化或销毁方法同名，你不需要在每个 bean 上单独声明 init-method 和 destroy-method。框架通过 <beans> 元素上的 default-init-method 和 default-destroy-method 属性为这种情况提供了灵活的配置方式。
BeanPostProcessor 接口定义了回调方法，你可以实现这些方法来提供自己的实例化逻辑、依赖解析逻辑等。你也可以通过插入一个或多个 BeanPostProcessor 实现，在 Spring 容器完成 bean 的实例化、配置和初始化之后执行一些自定义逻辑。
ApplicationContext 会自动检测实现了 BeanPostProcessor 接口的 bean，把它们注册为后置处理器，并在容器创建其他 bean 时在适当的时机调用它们。
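一个示意性的 BeanPostProcessor sketch，只在初始化前后打印 bean 名称(类名为举例):

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class LoggingBeanPostProcessor implements BeanPostProcessor {

    // 在任意 bean 的初始化回调(afterPropertiesSet/init-method)之前调用
    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        System.out.println("before init: " + beanName);
        return bean;
    }

    // 在初始化回调之后调用
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        System.out.println("after init: " + beanName);
        return bean;
    }
}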
你需要调用 AbstractApplicationContext 类中声明的 registerShutdownHook() 方法来注册一个关闭钩子(shutdown hook)。它将确保 JVM 退出时容器正常关闭，并调用相关的 destroy 方法。
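示意用法 sketch(配置文件名 applicationContext.xml 为举例):

import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {
    public static void main(String[] args) {
        AbstractApplicationContext ctx =
                new ClassPathXmlApplicationContext("applicationContext.xml");
        // 注册关闭钩子, JVM 退出时会自动销毁容器中的单例 bean
        ctx.registerShutdownHook();
    }
}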
为避免人工配置 Spring 注入过于繁琐，可以设置 bean 的 autowiring(自动装配)，让容器在 ApplicationContext 中自动搜索并注入依赖。
可选值 功能说明
no 默认不使用 autowiring。必须显式地使用 <ref> 标签明确地指定 bean。
byName 根据属性名自动装配。此选项将检查容器并根据名字查找与属性完全一致的bean,并将其与属性自动装配。
byType 如果容器中存在一个与指定属性类型相同的bean，那么将与该属性自动装配。如果存在多个该类型的bean，那么将会抛出异常，并指出不能使用byType方式进行自动装配。若没有找到相匹配的bean，则什么事都不发生，属性也不会被设置。如果你不希望这样，那么可以通过设置 dependency-check="objects" 让Spring抛出异常。
constructor 与byType的方式类似,不同之处在于它应用于构造器参数。如果在容器中没有找到与构造器参数类型一致的bean,那么将会抛出异常。
autodetect 通过bean类的自省机制(introspection)来决定是使用constructor还是byType方式进行自动装配。如果发现默认的构造器,那么将使用byType方式。
@Autowired注解注入
@Autowired 往往用于类中的注解注入；在 XML 配置中需要先开启注解支持(例如声明 <context:annotation-config/>)才能使用 @Autowired 标识。
@Autowired
@Qualifier("teacher")
private Teacher teacher;

public void setTeacher(Teacher teacher) {
    this.teacher = teacher;
}
beans标签设置default-autowire参数
在spring的配置文件中可以参照如下设置default-autowire参数
default-autowire="byName"
1 @Required
@Required 注解应用于 bean 属性的 setter 方法。
2 @Autowired
@Autowired 注解可以应用到 bean 属性的 setter 方法,非 setter 方法,构造函数和属性。
3 @Qualifier
通过指定将被装配的确切 bean，@Autowired 和 @Qualifier 注解可以用来消除歧义。
4 JSR-250 Annotations
Spring 支持 JSR-250 的基础的注解,其中包括了 @Resource,@PostConstruct 和 @PreDestroy 注解。
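一个示意性的 JSR-250 注解 sketch(类名与依赖名均为举例):

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.sql.DataSource;

public class ReportService {

    // @Resource 默认按名称注入
    @Resource(name = "dataSource")
    private DataSource dataSource;

    // bean 初始化完成后回调
    @PostConstruct
    public void init() {
        System.out.println("ReportService initialized");
    }

    // bean 销毁前回调
    @PreDestroy
    public void cleanup() {
        System.out.println("ReportService destroyed");
    }
}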
Java 编写的 Web Application 为什么能够监听一个 port?
Web Application 都没有 main 函数，那么该 web 是从哪个入口函数启动的呢?
答案是: 这依赖于 Tomcat 或 WebSphere 这类 server 的实现。以 Tomcat 为例，最重要的就是 Tomcat 的生命周期管理特性。
主要顺序: Bootstrap.java#main() —> init() —> load() —> start()
获取Mesos源码或者tar包:
Mesos 安装(Ubuntu14.04):
$ sudo apt-get update
$ sudo apt-get install -y tar wget git
$ sudo apt-get install -y openjdk-7-jdk
$ sudo apt-get install -y autoconf libtool
$ sudo apt-get -y install build-essential python-dev libcurl4-nss-dev libsasl2-dev libsasl2-modules maven libapr1-dev libsvn-dev
Building Mesos(如果下载了tar.gz的Mesos包,此步可以被跳过):
$ cd mesos
$ ./bootstrap
$ mkdir build
$ cd build
$ ../configure
$ make
$ make check
$ make install
$ cd build
$ ./bin/mesos-master.sh --ip=$(ip) --work_dir=/var/lib/mesos
$ ./bin/mesos-agent.sh --master=$(master_ip):5050 --work_dir=/var/lib/mesos
$ ./src/test-framework --master=$(master_ip):5050
$ ./src/examples/java/test-framework $(master_ip):5050
$ ./src/examples/python/test-framework $(master_ip):5050
kube-controller-manager:负责执行各种控制器,目前有两类:
1.Kubernetes解压之后,进入kubernetes/server目录下,再次解压kubernetes-server-linux-amd64.tar.gz文件,再进入kubernetes/server/kubernetes/server/bin目录下,才能发现kubernetes对应的很多命令。
2.命令list:
Master:
./etcd > /var/log/etcd.log 2>&1 &
./kube-apiserver --insecure-bind-address=9.21.62.27 --insecure-port=8080 --service-cluster-ip-range=172.17.0.0/16 --etcd_servers=http://127.0.0.1:4001 --logtostderr=true --v=0 --log_dir=/home/kubernetes/logs/kube
./kube-controller-manager --v=0 --logtostderr=false --log_dir=/var/log/kube --master=9.21.62.27:8080
./kube-scheduler --master='9.21.62.27:8080' --v=0 --log_dir=/var/log/kube
Slave:
./kube-proxy --logtostderr=false --v=0 --master=http://9.21.62.27:8080
./kubelet --logtostderr=false --v=0 --allow-privileged=false --log_dir=/var/log/kube --address=0.0.0.0 --port=10250 --hostname_override=172.30.13.0 --api_servers=http://9.21.62.27:8080
Test: ./kubectl -s http://9.21.62.27:8080 get services
创建kubernetes Dashboard: ./kubectl -s http://9.21.62.27:8080 create -f ../../../../cluster/addons/dashboard/dashboard-service.yaml --namespace=kube-system
访问地址为: http://9.21.62.27:8080/api/v1/proxy/namespaces/kube-system/services/kubernetes-dashboard/#/dashboard/
基于influxdb,grafana,heapster创建日志分析性能监控: ./kubectl -s http://9.21.62.27:8080 create -f ../../../../../kubernetes/cluster/addons/cluster-monitoring/influxdb/
显示cluster-info信息:
./kubectl -s http://9.21.62.27:8080 --namespace="kube-system" cluster-info
Kubernetes master is running at http://9.21.62.27:8080
Heapster is running at http://9.21.62.27:8080/api/v1/proxy/namespaces/kube-system/services/heapster
Grafana is running at http://9.21.62.27:8080/api/v1/proxy/namespaces/kube-system/services/monitoring-grafana
InfluxDB is running at http://9.21.62.27:8080/api/v1/proxy/namespaces/kube-system/services/monitoring-influxdb
安装kubectl:
通过kubectl来配置本地kubernetes cluster:
Test本地Cluster:
2.需要安装VirtualBox环境:
ubuntu14.04: wget http://download.virtualbox.org/virtualbox/5.0.22/virtualbox-5.0_5.0.22-108108~Ubuntu~trusty_amd64.deb
Redhat 7.1: wget http://download.virtualbox.org/virtualbox/5.0.22/VirtualBox-5.0-5.0.22_108108_el7-1.x86_64.rpm
3.本地利用virtualbox搭建Kubernetes Cluster:
4.Test 搭建的环境:
Requirements:
Docker 1.8.3+ : https://docs.docker.com/engine/installation/#installation
etcd : https://github.com/coreos/etcd/releases
go : https://golang.org/doc/install
etcd安装与Test:
Open another terminal:
容器 = Cgroup + Namespace + rootfs + 容器引擎
Docker 内核特性:
1.devices:设备权限控制。
2.cpuset:分配指定的CPU和内存节点。
3.cpu:控制CPU占用率。
4.cpuacct:统计CPU使用情况。
5.memory:限制内存的使用上限。
6.freezer:冻结(暂停)Cgroup中的进程。
7.net_cls:配合tc(traffic controller)限制网络带宽。
8.net_prio:设置进程的网络流量优先级。
9.huge_tlb:限制HugeTLB的使用。
10.perf_event:运行Perf工具基于Cgroup分组做性能检测。
1.IPC:隔离System V IPC和POSIX消息队列。
2.Network:隔离网络资源。
3.Mount:隔离文件系统挂载点。
4.PID:隔离进程ID。
5.UTS:隔离主机名和域名。
6.User:隔离用户ID和组ID。
Docker镜像:
从image repositories中查看所有images信息: cat /var/lib/docker/image/aufs/repositories.json | python -m json.tool
启动Docker daemon,测试image layer之间的关系:docker daemon -D -s overlay -g /var/lib/docker/
refer: https://linuxcontainers.org/lxcfs/introduction/
refer : http://www.voidcn.com/blog/kc58236582/article/p-4505115.html
LXC is a userspace interface for the Linux kernel containment features. Through a powerful API and simple tools, it lets Linux users easily create and manage system or application containers.
online try: https://linuxcontainers.org/lxd/try-it/
QEMU 最初由大神 Fabrice Bellard 创建: https://bellard.org/
refer : https://wiki.archlinux.org/index.php/QEMU_(%E7%AE%80%E4%BD%93%E4%B8%AD%E6%96%87)
创建硬盘镜像
执行带 resize 选项的 qemu-img 可以调整硬盘驱动镜像的大小，适用于 raw 和 qcow2 格式。例如，要把镜像增大 10 GB，可以运行: qemu-img resize disk_image +10G
qemu-system-* 程序 (例如 qemu-system-i386 或 qemu-system-x86_64, 取决于客户机架构)用来运行虚拟化的客户机. 用法是:
refer : https://wiki.archlinux.org/index.php/libvirt_(%E7%AE%80%E4%BD%93%E4%B8%AD%E6%96%87)
refer : https://www.ibm.com/developerworks/cn/linux/l-libvirt/index.html
mvn archetype:generate -DgroupId=com.mycompany.app -DartifactId=my-app -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
groupId 指: 项目所属组织/组的唯一标识(例如 com.mycompany.app)。
artifactId 指: 不携带 version 的构件(jar/war 包)名称。
Build project:
Maven Phases:
There are two other Maven lifecycles of note beyond the default lifecycle described above: clean and site.
Detail usage: http://maven.apache.org/guides/getting-started/index.html
These are not really file systems; they allow access to file systems from an operating system standpoint.
main.c —> java.c(JLI_Launch函数) 文件:
libjava.so
libjvm.so
See openjdk8/hotspot/src/share/vm/classfile/vmSymbols.hpp
publish -vs- subscribe
notifier_block 结构体
注册函数: notifier_chain_register
网络相关的 chain: inetaddr_chain , inet6addr_chain , and netdev_chain.
函数 notifier_call_chain
该函数返回值 类型 位于 include/linux/notifier.h 文件中。
inetaddr_chain netdev_chain
inetaddr_chain: sends notifications about insertion/removal/change of an IPv4 address on a local interface.
netdev_chain: sends notifications about the registration status of network devices.
Common names for wrappers include [un]register_xxxnotifier, xxx[un]registernotifier, and xxx[un]register.
netdev_rx_stat, its elements are of type netif_rx_stats
包括2个阶段:
Both components built into the kernel and components loaded as modules can be passed input parameters so that users can fine-tune the functionality implemented by the components, override defaults compiled into them, or change them from one system boot to the next. The kernel provides two kinds of macros to define options:
Module options.
module_param
The module parameters are listed under the module's directory in /sys/module. The subdirectory /sys/module/<module>/parameters holds a file for each parameter exported by the module.
subsys_initcall(net_dev_init);
subsys_initcall macros ensure net_dev_init runs before any NIC device drivers register themselves.
main parts of net_dev_init:
/proc/net is created by net_dev_init, via dev_proc_init and dev_mcast_init:
/sbin/modprobe
Invoke when the kernel needs to load a module.
/sbin/hotplug
Invoke when the kernel detects that a new device has been plugged or unplugged from system.
The kernel provides a function named call_usermodehelper to execute such user-space helper.
Two kernel routines, request_module and kobject_hotplug , invoke call_usermodehelper to invoke /sbin/modprobe and /sbin/hotplug
kmod is the kernel module loader that allows kernel components to request the loading of a module by calling request_module.
Hotplug was introduced into the kernel to implement the popular consumer feature known as Plug and Play (PnP). When compiling kernel modules, the object files are placed by default in the directory /lib/modules/kernel_version/. The kobject_hotplug function is invoked by the kernel to respond to the insertion and removal of a device.
modprobe and hotplug create file/directory in /proc/sys/kernel
The registration of a network device takes place in the following situations:
Two main conditions trigger the unregisteration of a device:
Network devices are defined with net_device structures.
include 3 input parameters:
The net_device structure is pretty big; its fields are initialized in chunks by different routines.
Each alloc_xxxdev function passes the right xxx_setup routine to alloc_netdev.
void ether_setup(struct net_device *dev)
{
    dev->change_mtu = eth_change_mtu;
    dev->hard_header = eth_header;
    dev->rebuild_header = eth_rebuild_header;
    dev->set_mac_address = eth_mac_addr;
    dev->hard_header_cache = eth_header_cache;
    dev->header_cache_update = eth_header_cache_update;
    dev->hard_header_parse = eth_header_parse;
    dev->type = ARPHRD_ETHER;
    dev->hard_header_len = ETH_HLEN;
    dev->mtu = 1500;
    dev->addr_len = ETH_ALEN;
    dev->tx_queue_len = 1000;
    dev->flags = IFF_BROADCAST|IFF_MULTICAST;
    memset(dev->broadcast, 0xFF, ETH_ALEN);
}
net_device include:
Each network device is assigned a queuing discipline, used by Traffic Control to implement its QoS mechanisms.
LINK_STATE_START
LINK_STATE_PRESENT
LINK_STATE_NOCARRIER
LINK_STATE_LINKWATCH_EVENT
LINK_STATE_XOFF
LINK_STATE_SHED
LINK_STATE_RX_SCHED
The state of a device with regard to its registration with the network stack is saved in reg_state field of the net_device structure.
NETREG_UNINITIALIZED
NETREG_REGISTERING
NETREG_REGISTERED
NETREG_UNREGISTERING
NETREG_UNREGISTERED
NETREG_RELEASED
Both kernel components and user-space applications are interested in knowing when a network device is registered, unregistered, goes down, or comes up.
netdev_chain: kernel components can register with this notification chain.
All the NETDEV_XXX events that are reported via netdev_chain are listed below.
Notification event list:
NETDEV_UP
NETDEV_GOING_DOWN
NETDEV_DOWN
NETDEV_REGISTER
NETDEV_UNREGISTER
NETDEV_REBOOT
NETDEV_CHANGEADDR
NETDEV_CHANGENAME
NETDEV_CHANGE
Quite a few kernel components register to netdev_chain List:
Virtual devices need to be registered and enabled just like real ones before they can be used.
register_netdevice/unregister_netdevice
The net_device data structure includes a set of function pointers that the kernel uses to interact with the device driver and special kernel features,
establishing device/kernel communication, such as:
Nearly all devices interact with the kernel in two ways:
Every interrupt runs a function called an interrupt handler. IRQ handling functions are defined in kernel/irq/manage.c and are overridden by arch/XXX/kernel/irq.c.
/proc 目录 被 proc_mkdir 函数创建。
/proc/net 目录 被 proc_net_fops_create/proc_net_remove (调用 create_proc_entry/remove_proc_entry) 创建及删除。
/proc/sys 实际上是内核变量,可以read/write。 /proc/sys 被定义在 ctl_table 结构体中, 通过 register_sysctl_table 和 unregister_sysctl_table 来注册和取消注册。
某些目录在系统启动的时候就被创建,某些在运行时刻才被添加。
ioctl 是主要管理net_device的调用函数.
《Linux Networking Internals》是一本讲解 Linux 网络实现的书籍，内容还是不错的。
另外推荐《Understanding the Linux Kernel》，建议在阅读本系列之前先读，关于这本书的笔记也会放在博客上。
Sock Buffer: sk_buff
包括以下内容:
alloc_skb -> dev_alloc_skb
kfree_skb -> dev_kfree_skb
skb_reserve
net_device structure include:
tips: ifconfig/route 通过 ioctl 系统调用来实现; ip/IPROUTE2 使用Netlink socket 来实现.
以x86_64 arch机器为例:
—————————————————— 内核load进内存阶段 (arch/*/boot/)—————————————————-
入口文件是: arch/x86/boot/compressed/head_64.S 注意: kernel 代码不是以 main 函数为入口的。
head_64.S 是 汇编 代码文件, 该文件会 call verify_cpu + make_boot_params(初始化boot_params) + efi_main(处理并返回boot_params),最终里面 call extract_kernel 会调用 入口函数 extract_kernel, 该 函数 位于 arch/x86/boot/compressed/misc.c 会:
1. 拿到 boot_params , 由汇编代码传入该参数 该 参数的结构体 位于 arch/x86/include/uapi/asm/bootparam.h .
2. 通过 sanitize_boot_params 函数 初始化 boot_params 参数部分内容, 代码位于 arch/x86/include/asm/bootparam_utils.h .
3. 通过 boot_params->screen_info 内容, 调用 arch/x86/boot/compressed/console.c 文件中的 console_init 函数, 初始化 tty 的console .
4. (arch/x86/boot/compressed/kaslr.c) choose_random_location 函数 会随机挑选一段内存地址, 用于解压内核 压缩文件。
5. 调用 (lib/decompress_bunzip2.c) __decompress 函数解压缩内核压缩文件, 根据不同的压缩文件类型,调用不同的解压缩函数, 压缩文件区分应该是发生在 编译内核时。
6. 内核文件解压之后会成为 elf 文件, 位于内存中, 通过调用 parse_elf 函数 load 进 内核内容, parse_elf 位于 arch/x86/boot/compressed/misc.c 中。
7. 判断是否 需要重新 分配内存地址, 调用 handle_relocations 函数。
arch/x86/boot/compressed/head_64.S 在解压缩内核之后会执行解压缩之后的内核代码!!!
而与具体 arch 无关、较为 common(legacy) 的文件 arch/x86/boot/header.S 最终会调用 (arch/x86/boot/main.c) 中的 main 函数，该函数会:
1. copy_boot_params 拷贝启动参数
2. console_init 初始化console
3. init_heap
4. validate_cpu
5. set_bios_mode
6. detect_memory
7. keyboard_init
8. query_ist
9. query_apm_bios (if config_apm)
10. query_edd (if config_edd)
11. set_video
12. go_to_protected_mode
—————————————————— 内核启动阶段 (init/main.c) —————————————————-
入口文件是 : init/main.c 启动函数是: start_kernel 该函数会:
rest_init函数所做内容示意图:
内核代码中 调用 alloc_large_system_hash 函数的位置有不少,会构建一些 大的系统 hash table, 名称分别有:
—————————————中断 宏: SAVE_ALL & RESTORE_ALL 用户态和内核态 上下文切换 ————————————
系统调用表: linux/arch/x86/entry/syscalls/syscall_32.tbl
中断结构体: irq_desc
linux/arch/x86/entry/entry_32.S : 系统调用的代码段, 入口应该是:
http://lxr.linux.no/linux+v3.19/arch/x86/kernel/entry_32.S —> ENTRY(system_call)
Linux 进程 fork 关键代码:
http://lxr.linux.no/linux+v3.19/kernel/fork.c#L1185
用户空间的程序默认是通过栈来传递参数的,但对于系统调用来说,内核态和用户态使用的是不同的栈,这使得系统调用的参数只能通过寄存器的方式进行传递。
ELF = Executable and Linkable Format
一般Linux程序编写都会使用到glibc库, 而最终glibc也只是帮助用户进行系统调用而已, 至于glibc如何进行系统调用的?
通过阅读源码发现, 所有的函数都会最终调用glibc的源码中的 sysdep.h , 里面有 INLINE_SYSCALL 定义
Linux中实际上 thread 也是被当成 process 来创建, process和thread 是一样的, 区别在于系统调用时的传参.
refer: http://duartes.org/gustavo/blog/post/kernel-boot-process/
boot: linux ks=nfs:192.168.0.254:/var/ftp/pub/ks.cfg
anaconda 是python写的系统安装工具。
Linux 中有 4个主分区 + 逻辑分区(需要将其中一个主分区变为扩展分区)。
Installation过程中可以切换窗口,Ctrl + Alt + F1,2,3,4,5,6,每个窗口分别有不同的作用。
anaconda-ks.cfg kickstart安装配置文件
python —> anaconda —> install Linux
/tftpboot/n1.cfg配置文件:
对于守护进程而言，一般情况下守护进程都是 detach from shell 的，所以接收不到 SIGHUP 信号；这个信号因此经常被用来触发 reload configuration。
Linux 系统中的主要log都在/var/log目录下:
redhat 中使用了 rsyslog 替代 syslog
w: 显示当前所有的登录用户
多登陆端口通信: (pts/1向pts/0发送消息)
vim /etc/inittab 可以减少默认启动的tty个数!!!!
compress/uncompress 最古老的Unix压缩工具
gzip/gunzip 最广泛的压缩工具,Linux系统中标准压缩工具,对于文本文件能够达到很高压缩率
bzip2/bunzip2 新版Linux压缩工具,比gzip拥有更高的压缩率
tar 打包(备份)作用,参数:
Linux 下文件类型共有7种:
“-“ 文件
d 文件夹
l 链接
b block
c char字符设备
s socket文件
p 管道(pipe)文件
文件分为三个部分进行存储:(目录文件中存储文件名)
1.存储文件名(指向inode号) —> 2.inode(指向块存储单元) —> 3.block (4k为单位)
inode 存储文件的属性, 可以用 stat 命令查看文件inode的内容。
文本文件的操作命令:
wc: 统计文件的行,词,字数
which ls : 查找命令, 查找 $PATH 路径中的文件
locate ls : 查找所有匹配ls字母的文件, 注意!!! locate 命令从数据库中查找文件,而不是直接查找系统文件,数据库位置为/var/lib/slocate/slocate.db. 当数据库没有更新的时候,可能会查找不到。
find
系统服务启动顺序:
|-/etc/inittab 总的配置文件
|—–/etc/rc.d/rc.sysinit 系统初始化文件, 加载许多内容
————fsck.ext3 mount -o rw,remount /dev/sda2 /
————mount -a /etc/fstab
|—–/etc/rc.d/rcX.d/* X为系统运行级别
|———-/etc/rc.d/rcX.d/ServiceXXX1 start 用户定义自启动的service
|———-/etc/rc.d/rcX.d/ServiceXXX2 start
|—–/etc/rc.d/rc.local 最后访问的文件
|-mingetty 启动 /dev/tty1-6 启动tty
|———-login —> bash —> /etc/profile ~/.bash_profile
|-gdm 监控进程是否死掉,重启进程
shutdown -h now/init 0 / halt -p -f / poweroff: 关机
users: 显示当前系统登录的用户
who: 当前登录在本机的用户及来源
w: 当前登录本机的用户及运行的程序
write: 给当前联机的用户发消息
wall: 给所有登录在本机的用户广播消息
last: 查看用户的登录日志
lastlog: 查看每个用户最后登录的情况
finger: 查看用户信息
ping -s 1024 www.baidu.com -s 可以指定测试包的大小,用于测试不同包大小的带宽。
ab -n 1000 -c 1000 www.baidu.com ab 为 Linux 压力测试的命令，模拟 1000 个并发，总共发起 1000 次请求 (Apache HTTP server benchmarking tool)
traceroute www.baidu.com 查看到目标主机的整个转发路径上经过的各个节点
mtr www.baidu.com 查看链路上各节点的丢包率
arping 通过发送 ARP 请求探测指定 IP 地址对应的 MAC 地址，也可用于检测 IP 地址冲突
top
vmstat (Report virtual memory statistics)
netstat (Print network connections, routing tables, interface statistics, masquerade connections, and multicast memberships)
netstat 查看 tcp 连接时，如果:
ESTABLISHED 特别多，可能是 CC(Challenge Collapsar) 攻击，即建立连接攻击
ESTABLISHED 很少而半连接(SYN_RECV)很多，可能是 DDoS 的 syn-flood(洪泛攻击)
抓包工具:
Linux 内核参数:
位于 /proc/sys 目录下, 修改内核参数:
输出内核参数:
从文件中读入内核参数:
解压rpm包
查询rpm包里面内容
查询rpm包里面的install/uninstall script
参考: https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools
到Spark源代码目录下,运行:
Spark 是由 Scala语言编写,而Scala语言的语法糖太多,对于Scala新手的我来说确实困难,花费时间会较多,先暂时放一放。
Summary of the steps for setting up a Storm cluster:
配置:
源代码入口:
运行测试案例:
nimbus 与 supervisor之间通过zookeeper 进行通信
The Offline Image Viewer is a tool to dump the contents of HDFS fsimage files.
command :
Users can access the viewer and get information of the fsimage:
hbase rest api 服务启动:
Follow these instructions for each HBase host fulfilling the REST server role.
To start the REST server as a foreground process, use the following command:
To start the REST server as a background process, use the following command:
To use a different port than the default of 8080, use the -p option.
Endpoint HTTP Verb Description Example
/version/cluster GET Version of HBase running on this cluster
Endpoint HTTP Verb Description Example
/table/schema GET Describe the schema of the specified table.
Endpoint HTTP Verb Description Example
/table/row/column:qualifier/timestamp GET Get the value of a single row. Values are Base-64 encoded.
Endpoint HTTP Verb Description Example
/table/scanner/ PUT Get a Scanner object. Required by all other Scan operations. Adjust the batch parameter to the number of rows the scan should return in a batch. See the next example for adding filters to your Scanner. The scanner endpoint URL is returned as the Location in the HTTP response. The other examples in this table assume that the Scanner endpoint is http://example.com:20550/users/scanner/145869072824375522207.
{
  "type": "PrefixFilter",
  "value": "u123"
}
Endpoint HTTP Verb Description Example
/table/row_key/ PUT Write a row to a table. The row, column qualifier, and value must each be Base-64 encoded. To encode a string, you can use the base64 command-line utility. To decode the string, use base64 -d. The payload is in the --data argument, so the /users/fakerow value is a placeholder. Insert multiple rows by adding them to the
Endpoint HTTP Verb Description Example
/namespaces GET List all namespaces.
Hadoop Cluster High Availability (HA) enable QJM (Quorum Journal Manager): https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html
Hadoop Cluster High Availability (HA) enable using shared storage: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
Hadoop Cluster Service Level Authorization (ACL)
Hadoop Cluster enable HTTP/Web ACL
Integrate with Kerberos .
Centralized Cache Management url link: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html
HDFS Rolling Upgrade url link: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/HdfsRollingUpgrade.html
Storage Policy url link: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
Memory Storage Support url link: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/MemoryStorage.html
Hadoop KMS(Key Management Server) url link: https://hadoop.apache.org/docs/r2.7.2/hadoop-kms/index.html
https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html
https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/NodeManagerRest.html
https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/MapredAppMasterRest.html
https://hadoop.apache.org/docs/r2.7.2/hadoop-yarn/hadoop-yarn-site/HistoryServerRest.html
The View File System (ViewFs) provides a way to manage multiple Hadoop file system namespaces or namespace volumes.
Suppose there are multiple clusters, and each cluster has one or more namenodes. Each namenode has its own namespace, and a namenode belongs to one and only one cluster.
ViewFs implements the Hadoop file system interface just like HDFS and the local file system. 代码位于Hadoop HDFS src 工程的 ViewFS继承类。
Port: 50070 也是 hdfs web portal的端口号.
Authentication when security is off:
Authentication using Kerberos SPNEGO when security is on:
Authentication using Hadoop delegation token when security is on:
List a Directory :
Status of a File/Directory :
Delete a File/Directory :
Rename a File/Directory :
Open and Read a File :
Create and Write to a File :
Append to a File :
Concatenate Files :
Get Content Summary of a Directory :
Get File Checksum :
Get Home Directory :
Set Permission :
Set Owner :
Set Replication Factor :
Set Access or Modification Time :
Get Delegation Token :
Renew Delegation Token :
Cancel Delegation Token :
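除了上面的 REST 接口，这些操作也可以用 Hadoop 的 Java FileSystem API 在程序里完成。下面是一个示意性 sketch(NameNode 地址 hdfs://localhost:9000 与各路径均为举例):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsOps {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 假设 NameNode 运行在 localhost:9000
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);

        // Create and Write to a File
        try (FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
            out.writeUTF("hello hdfs");
        }

        // List a Directory / Status of a File
        for (FileStatus status : fs.listStatus(new Path("/tmp"))) {
            System.out.println(status.getPath() + " replication=" + status.getReplication());
        }

        // Rename, Set Replication Factor, Delete
        fs.rename(new Path("/tmp/demo.txt"), new Path("/tmp/demo-renamed.txt"));
        fs.setReplication(new Path("/tmp/demo-renamed.txt"), (short) 2);
        fs.delete(new Path("/tmp/demo-renamed.txt"), false);

        fs.close();
    }
}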
在前面的文章介绍过,Hadoop的Federation是将整个文件系统划分为子集,每一个Federation中的NameNode负责管理其中一个子集,整个文件系统由这些子集通过挂载mount的方式构建。 Federation与HA结合使用。
官方doc: https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-hdfs/Federation.html
Namespace :
1. Consists of directories, files and blocks.
2. It supports all the namespace related file system operations such as create, delete, modify and list files and directories.
Block Storage Services , this has two parts :
1. Block Management (performed in the Namenode)
Provides Datanode cluster membership by handling registrations, and periodic heart beats.
Processes block reports and maintains location of blocks.
Supports block related operations such as create, delete, modify and get block location.
Manages replica placement, block replication for under replicated blocks, and deletes blocks that are over replicated.
2. Storage - is provided by Datanodes by storing blocks on the local file system and allowing read/write access.
Federation uses multiple independent Namenodes/namespaces to scale the name service horizontally. The Namenodes are federated; the Namenodes are independent and do not require coordination with each other. The Datanodes are used as common storage for blocks by all the Namespaces. Each Datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports.
Federation configuration is backward compatible and allows existing single Namenode configuration to work without any change.
Step 1: Add the dfs.nameservices parameter to your configuration and configure it with a list of comma separated NameServiceIDs. This will be used by the Datanodes to determine the Namenodes in the cluster.
Step 2: For each Namenode and Secondary Namenode/BackupNode/Checkpointer add the following configuration parameters suffixed with the corresponding NameServiceID into the common configuration file:
Namenode
Step 1: Format a Namenode:
Step 2: Format additional Namenodes
Older releases only support a single Namenode; upgrade the cluster to a newer release in order to enable federation.
Perform the following steps:
Snapshots功能非常重要,可以保证HDFS在出现异常情况时可以进行恢复。 Snapshots可以使用在整个HDFS系统上,也可以只对其中的部分文件目录。
HDFS Snapshots are read-only point-in-time copies of file system. Snapshots can be taken on a subtree of the file system or the entire file system.
The HDFS path should be Snapshottable.
Hadoop HDFS 2.x 包含了3种安装模式:
默认情况下,Hadoop被配置成以非分布式模式运行的一个独立Java进程。
Hadoop可以在单节点上以所谓的伪分布式模式运行,此时每一个Hadoop守护进程都作为一个独立的Java进程运行。
不配置Yarn
etc/hadoop/core-site.xml:
etc/hadoop/hdfs-site.xml:
Setup passphraseless ssh login:
The following instructions are to run a MapReduce job locally.
Format the filesystem: 初始化!!!
Start NameNode daemon and DataNode daemon:
The hadoop daemon log output is written to the $(HADOOP_LOG_DIR) directory (defaults to $(HADOOP_HOME)/logs).
Make the HDFS directories required to execute MapReduce jobs:
Copy the input files into the distributed filesystem:
Run some of the examples provided:
Examine the output files: Copy the output files from the distributed filesystem to the local filesystem and examine them:
or
View the output files on the distributed filesystem:
配置Yarn on a Single Node
etc/hadoop/mapred-site.xml
etc/hadoop/yarn-site.xml:
Start ResourceManager daemon and NodeManager daemon:
Browse the web interface for the ResourceManager; by default it is available at:
ResourceManager - http://localhost:8088/
参考:http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html
下载 hadoop2.7.3 版本的压缩包,解压缩到master节点上, 解压路径为 ${Hadoop_Install} .
配置 hadoop cluster 中各个节点之间的passwordless 无密码访问。
到 ${Hadoop_Install}/etc/hadoop/ 目录下 编辑配置文件: core-site.xml hdfs-site.xml mapred-site.xml yarn-site.xml .
core-site.xml : configure important parameters
hdfs-site.xml : configure for NameNode + DataNode
mapred-site.xml : Configure for MapReduce Applications + MapReduce JobHistory Server
yarn-site.xml : Configure for ResourceManager + NodeManager + History Server
Format a new distributed filesystem:
会生成一个name文件夹,里面存储fsimage和editlog文件,记录整个cluster中的文件系统。
Start HDFS NameNode :
Start HDFS DataNode :
Start all Hadoop slaves * :
Start Yarn ResourceManager :
Start Yarn NodeManager :
Start Yarn WebAppProxy server if necessary:
Start all Yarn slaves *:
Start MapReduce JobHistory server :
configure hdfs-site.xml :
Then write the hostname of the data node to be decommissioned (slave2) into the hdfs_exclude.txt file.
Last, force configure reload:
#get children of the root node
#get “/cluster1/leader” as xml (default is json)
#get the data as text
#set a node (data.txt contains the ascii text you want to set on the node)
#create a node
#create a new session
#session heartbeat
#delete a session
RestAPI源码入门:
RestAPI 入口main函数所在文件: org.apache.zookeeper.server.jersey.RestMain
ZooKeeper Source Code 解析:
1.zkServer 脚本启动命令:
2.zkCli 脚本启动命令:
由上可知, ZooKeeper的server启动入口函数为 QuorumPeerMain ,而client的启动入口函数为 ZooKeeperMain。
分布式协同技术诞生于分布式系统中,致力于解决各大分布式系统或分布式计算平台点到点的同步问题。 代表性的有 etcd, ZooKeeper, Consul, Doozerd。 其中:
推荐一个Raft算法动态描述的网站: https://raft.github.io/
至于 Raft 和 Paxos 算法的区别,网上文章有一些,可以阅读一下,但是本人至今没仔细钻研过两个算法的区别,以后如果有时间再补上。
1.logs.InitLogs()
k8s.io/kubernetes/pkg/util/logs 日志管理代码
每30秒刷新一次日志。 使用了github.com/golang/glog
type Factory struct {
clients ClientCache
flags pflag.FlagSet
// Returns interfaces for dealing with arbitrary runtime.Objects.
Object func() (meta.RESTMapper, runtime.ObjectTyper)
// Returns interfaces for dealing with arbitrary
// runtime.Unstructured. This performs API calls to discover types.
UnstructuredObject func() (meta.RESTMapper, runtime.ObjectTyper, error)
// Returns interfaces for decoding objects - if toInternal is set, decoded objects will be converted
// into their internal form (if possible). Eventually the internal form will be removed as an option,
// and only versioned objects will be returned.
Decoder func(toInternal bool) runtime.Decoder
// Returns an encoder capable of encoding a provided object into JSON in the default desired version.
JSONEncoder func() runtime.Encoder
// ClientSet gives you back an internal, generated clientset
ClientSet func() (*internalclientset.Clientset, error)
// Returns a RESTClient for accessing Kubernetes resources or an error.
RESTClient func() (*restclient.RESTClient, error)
// Returns a client.Config for accessing the Kubernetes server.
ClientConfig func() (*restclient.Config, error)
// Returns a RESTClient for working with the specified RESTMapping or an error. This is intended
// for working with arbitrary resources and is not guaranteed to point to a Kubernetes APIServer.
ClientForMapping func(mapping *meta.RESTMapping) (resource.RESTClient, error)
// Returns a RESTClient for working with Unstructured objects.
UnstructuredClientForMapping func(mapping *meta.RESTMapping) (resource.RESTClient, error)
// Returns a Describer for displaying the specified RESTMapping type or an error.
Describer func(mapping *meta.RESTMapping) (kubectl.Describer, error)
// Returns a Printer for formatting objects of the given type or an error.
Printer func(mapping *meta.RESTMapping, options kubectl.PrintOptions) (kubectl.ResourcePrinter, error)
// Returns a Scaler for changing the size of the specified RESTMapping type or an error
Scaler func(mapping *meta.RESTMapping) (kubectl.Scaler, error)
// Returns a Reaper for gracefully shutting down resources.
Reaper func(mapping *meta.RESTMapping) (kubectl.Reaper, error)
// Returns a HistoryViewer for viewing change history
HistoryViewer func(mapping *meta.RESTMapping) (kubectl.HistoryViewer, error)
// Returns a Rollbacker for changing the rollback version of the specified RESTMapping type or an error
Rollbacker func(mapping *meta.RESTMapping) (kubectl.Rollbacker, error)
// Returns a StatusViewer for printing rollout status.
StatusViewer func(mapping *meta.RESTMapping) (kubectl.StatusViewer, error)
// MapBasedSelectorForObject returns the map-based selector associated with the provided object. If a
// new set-based selector is provided, an error is returned if the selector cannot be converted to a
// map-based selector
MapBasedSelectorForObject func(object runtime.Object) (string, error)
// PortsForObject returns the ports associated with the provided object
PortsForObject func(object runtime.Object) ([]string, error)
// ProtocolsForObject returns the <port, protocol> mapping associated with the provided object
ProtocolsForObject func(object runtime.Object) (map[string]string, error)
// LabelsForObject returns the labels associated with the provided object
LabelsForObject func(object runtime.Object) (map[string]string, error)
// LogsForObject returns a request for the logs associated with the provided object
LogsForObject func(object, options runtime.Object) (*restclient.Request, error)
// PauseObject marks the provided object as paused ie. it will not be reconciled by its controller.
PauseObject func(object runtime.Object) (bool, error)
// ResumeObject resumes a paused object ie. it will be reconciled by its controller.
ResumeObject func(object runtime.Object) (bool, error)
// Returns a schema that can validate objects stored on disk.
Validator func(validate bool, cacheDir string) (validation.Schema, error)
// SwaggerSchema returns the schema declaration for the provided group version kind.
SwaggerSchema func(unversioned.GroupVersionKind) (*swagger.ApiDeclaration, error)
// Returns the default namespace to use in cases where no
// other namespace is specified and whether the namespace was
// overridden.
DefaultNamespace func() (string, bool, error)
// Generators returns the generators for the provided command
Generators func(cmdName string) map[string]kubectl.Generator
// Check whether the kind of resources could be exposed
CanBeExposed func(kind unversioned.GroupKind) error
// Check whether the kind of resources could be autoscaled
CanBeAutoscaled func(kind unversioned.GroupKind) error
// AttachablePodForObject returns the pod to which to attach given an object.
AttachablePodForObject func(object runtime.Object) (*api.Pod, error)
// UpdatePodSpecForObject will call the provided function on the pod spec this object supports,
// return false if no pod spec is supported, or return an error.
UpdatePodSpecForObject func(obj runtime.Object, fn func(*api.PodSpec) error) (bool, error)
// EditorEnvs returns a group of environment variables that the edit command
// can range over in order to determine if the user has specified an editor
// of their choice.
EditorEnvs func() []string
// PrintObjectSpecificMessage prints object-specific messages on the provided writer
PrintObjectSpecificMessage func(obj runtime.Object, out io.Writer)
}
const (
    RunV1GeneratorName                          = "run/v1"
    RunPodV1GeneratorName                       = "run-pod/v1"
    ServiceV1GeneratorName                      = "service/v1"
    ServiceV2GeneratorName                      = "service/v2"
    ServiceNodePortGeneratorV1Name              = "service-nodeport/v1"
    ServiceClusterIPGeneratorV1Name             = "service-clusterip/v1"
    ServiceLoadBalancerGeneratorV1Name          = "service-loadbalancer/v1"
    ServiceAccountV1GeneratorName               = "serviceaccount/v1"
    HorizontalPodAutoscalerV1Beta1GeneratorName = "horizontalpodautoscaler/v1beta1"
    HorizontalPodAutoscalerV1GeneratorName      = "horizontalpodautoscaler/v1"
    DeploymentV1Beta1GeneratorName              = "deployment/v1beta1"
    DeploymentBasicV1Beta1GeneratorName         = "deployment-basic/v1beta1"
    JobV1Beta1GeneratorName                     = "job/v1beta1"
    JobV1GeneratorName                          = "job/v1"
    ScheduledJobV2Alpha1GeneratorName           = "scheduledjob/v2alpha1"
    NamespaceV1GeneratorName                    = "namespace/v1"
    ResourceQuotaV1GeneratorName                = "resourcequotas/v1"
    SecretV1GeneratorName                       = "secret/v1"
    SecretForDockerRegistryV1GeneratorName      = "secret-for-docker-registry/v1"
    SecretForTLSV1GeneratorName                 = "secret-for-tls/v1"
    ConfigMapV1GeneratorName                    = "configmap/v1"
)
Eclipse 安装 scala IDE 插件:
http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site
源代码 import进Eclipse
Step 1: Download the code
Download the 0.10.0.0 release and un-tar it.
tar -xzf kafka_2.11-0.10.0.0.tgz
cd kafka_2.11-0.10.0.0
Step 2: Start the server
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don’t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.
bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
…
Now start the Kafka server:
bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
…
Step 3: Create a topic
Let’s create a topic named “test” with a single partition and only one replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
Step 4: Send some messages
Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.
Run the producer and then type a few messages into the console to send to the server.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5: Start a consumer
Kafka also has a command line consumer that will dump out messages to standard output.
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.
Step 6: Setting up a multi-broker cluster
So far we have been running against a single broker, but that’s no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let’s expand our cluster to three nodes (still all on our local machine).
First we make a config file for each of the brokers:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Now edit these new files and set the following properties:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each others data.
We already have Zookeeper and our single node started, so we just need to start the two new nodes:
bin/kafka-server-start.sh config/server-1.properties &
…
bin/kafka-server-start.sh config/server-2.properties &
…
Now create a new topic with a replication factor of three:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Okay but now that we have a cluster how can we know which broker is doing what? To see that run the “describe topics” command:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
Note that in my example node 1 is the leader for the only partition of the topic.
We can run the same command on the original topic we created to see where it is:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.
Let’s publish a few messages to our new topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
…
my test message 1
my test message 2
^C
Now let’s consume these messages:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
…
my test message 1
my test message 2
^C
Now let’s test out fault-tolerance. Broker 1 was acting as the leader so let’s kill it:
ps | grep server-1.properties
7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java…
kill -9 7564
Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
But the messages are still available for consumption even though the leader that took the writes originally is down:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
…
my test message 1
my test message 2
^C
Step 7: Use Kafka Connect to import/export data
Writing data from the console and writing it back to the console is a convenient place to start, but you’ll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data. Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we’ll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, we’ll start by creating some seed data to test with:
echo -e "foo\nbar" > test.txt
Next, we’ll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file. During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and write them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:
cat test.sink.txt
foo
bar
Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
…
The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
echo “Another line” >> test.txt
You should see the line appear in the console consumer output and in the sink file.
Step 8: Use Kafka Streams to process data
Kafka Streams is a client library of Kafka for real-time stream processing and analyzing data stored in Kafka brokers. This quickstart example will demonstrate how to run a streaming application coded in this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easy reading).
KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))
    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")
It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data because it cannot know when it has processed "all" the input data.
We will now prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
Next, we send this input data to the input topic named streams-file-input using the console producer (in practice, stream data will likely be flowing continuously into Kafka where the application will be up and running):
bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-file-input
cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input
We can now run the WordCount demo application to process the input data:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
There won’t be any STDOUT output except log entries as the results are continuously written back into another topic named streams-wordcount-output in Kafka. The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.
We can now inspect the output of the WordCount demo application by reading from its output topic:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
with the following output data being printed to the console:
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
join 1
kafka 3
summit 1
Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:
GET /connectors - return a list of active connectors
POST /connectors - create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
GET /connectors/{name} - get information about a specific connector
GET /connectors/{name}/config - get the configuration parameters for a specific connector
PUT /connectors/{name}/config - update the configuration parameters for a specific connector
GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
GET /connectors/{name}/tasks - get a list of tasks currently running for a connector
GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed
PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)
POST /connectors/{name}/restart - restart a connector (typically because it has failed)
POST /connectors/{name}/tasks/{taskId}/restart - restart an individual task (typically because it has failed)
DELETE /connectors/{name} - delete a connector, halting all tasks and deleting its configuration
Kafka Connect also provides a REST API for getting information about connector plugins:
GET /connector-plugins - return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars
PUT /connector-plugins/{connector-type}/config/validate - validate the provided configuration values against the configuration definition. This API performs per config validation, returns suggested values and error messages during validation.
Kafka has four core APIs:
The producer API wraps the two low-level producers, kafka.producer.SyncProducer and kafka.producer.async.AsyncProducer.
kafka.producer.Producer can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data. It provides the ability to batch multiple produce requests (producer.type=async) before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either queue.time or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the batch of data and lets the kafka.producer.EventHandler serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the event.handler config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the kafka.producer.async.CallbackHandler interface and setting the callback.handler config parameter to that class.
The producer handles the serialization of data through a user-specified Encoder:
The default is the no-op kafka.serializer.DefaultEncoder
The routing decision is influenced by the kafka.producer.Partitioner.
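作为对照，下面给出一个用较新版 kafka-clients 包(org.apache.kafka.clients.producer)发送消息的示意性 Java sketch；它不是上文旧 kafka.producer API 的实现，broker 地址与 topic 名均为举例:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // broker 地址(举例)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 向 test topic 异步发送一条消息
            producer.send(new ProducerRecord<>("test", "key1", "This is a message"));
            producer.flush();
        }
    }
}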
Broker Node Registry:
Broker Topic Registry:
Consumer Id Registry:
Consumer Offsets:
Partition Owner registry:
https://coreos.com/etcd/docs/latest/
https://coreos.com/etcd/docs/latest/api.html
https://coreos.com/etcd/docs/latest/libraries-and-tools.html
Installation Requirements:
Instructions:
客户端:client.go
etcdmain/main.go Main():
1.源码下载:git clone https://github.com/apache/zookeeper.git
2.通过 Eclipse 新建一个 Java project，将 project location 设置为源码下载的位置；Java project 创建成功之后，右键 build.xml 选择 run as Ant build，会自动下载 build project 所需的 jar 包。
多个文件目录下都有build.xml文件,都可以尝试run as Ant build,下载build project所需要的jar包。
3.运行build.xml之后,jar包或许仍不完整,需要自己下载jar包,目录截图如下:
4.至此,zookeeper java工程的编译任务完成。
RestAPI源码入门:
RestAPI 入口main函数所在文件: org.apache.zookeeper.server.jersey.RestMain
ZooKeeper Source Code 解析:
1.zkServer 脚本启动命令:
2.zkCli 脚本启动命令:
由上可知, ZooKeeper的server启动入口函数为 QuorumPeerMain ,而client的启动入口函数为 ZooKeeperMain。
main函数:
=======================================================================
ZooKeeperMain解析源码:
main函数:
==============================================================================================================================
SendThred run函数:
属性: state: socket连接状态，isFirstConnect: socket连接是否为第一次初始化连接，zookeeperSaslClient: SASL用户认证机制
==============================================================================================================================
EventThred run函数:
属性: waitingEvents
ClientCnxn 是核心类:
ZooKeeper 服务:
ZooKeeper 名字空间:
《ZooKeeper分布式过程协同技术详解》
wiki: https://en.wikipedia.org/wiki/Paxos_(computer_science)
ZooKeeper API:
1.create /path data
2.delete /path
3.exists /path
4.setData /path data
5.getData /path
6.getChildren /path
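一个示意性的 ZooKeeper Java 客户端 sketch，对应上面几种基本操作(连接串、超时时间与路径均为举例):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZkApiDemo {
    public static void main(String[] args) throws Exception {
        // 连接本地 ZooKeeper, 会话超时 15s, watcher 仅打印事件
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000,
                (WatchedEvent event) -> System.out.println("event: " + event));

        // create /path data
        zk.create("/demo", "hello".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // exists /path 与 getData /path
        if (zk.exists("/demo", false) != null) {
            System.out.println(new String(zk.getData("/demo", false, null)));
        }

        // setData /path data (版本号 -1 表示不做版本检查)
        zk.setData("/demo", "world".getBytes(), -1);

        // getChildren /path 与 delete /path
        System.out.println(zk.getChildren("/", false));
        zk.delete("/demo", -1);

        zk.close();
    }
}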
ZooKeeper服务器的两种工作模式: 独立模式(standalone)和仲裁模式(quorum)
独立模式(standalone): 单独的ZooKeeper服务器。
仲裁模式(quorum): ZooKeeper集合(ZooKeeper ensemble)。 仲裁模式中,为减少ZooKeeper Server数据同步的延迟,法定人数 的概念被使用,法定人数的大小设置非常重要。
仲裁模式试验:
configure 文件:
z1.cfg
z2.cfg
z3.cfg
启动3个节点命令:
连接集群:
创建 ephemeral 节点:
创建 sequential 节点:
WatchedEvent 数据结构包含以下信息: 通知状态(KeeperState)、事件类型(EventType)和触发事件的 znode 路径(path)。
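一个示意性的 Watcher sketch，演示如何读取 WatchedEvent 中的这几个字段(连接串与路径为举例):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class NodeWatcher implements Watcher {

    @Override
    public void process(WatchedEvent event) {
        // WatchedEvent 携带通知状态、事件类型和 znode 路径
        System.out.println("state = " + event.getState()
                + ", type = " + event.getType()
                + ", path = " + event.getPath());
    }

    public static void main(String[] args) throws Exception {
        NodeWatcher watcher = new NodeWatcher();
        ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 15000, watcher);
        // 在 /demo 上注册一次性 watch, 节点创建/删除/数据变化时会回调 process
        zk.exists("/demo", watcher);
        Thread.sleep(60_000);   // 留出时间观察事件
        zk.close();
    }
}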
Spark 这一大数据分析框架,包含了:
参考: http://spark.apache.org/docs/latest/streaming-programming-guide.html
StreamingContext:
socket:
往9999端口里面输入:
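在本机可以先用 nc -lk 9999 往 9999 端口输入文本。下面是一个示意性的 Spark Streaming(Java API) socket word count sketch，与官方示例类似(类名与 master 设置均为举例):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class NetworkWordCount {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 从 localhost:9999 的 socket 中读取文本行
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // 切分单词并计数
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(word -> new Tuple2<>(word, 1))
                     .reduceByKey((a, b) -> a + b);

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}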
参考: http://spark.apache.org/docs/latest/graphx-programming-guide.html
构建graph的代码:
Graph Operators:
Shortest Path:
参考: http://spark.apache.org/docs/latest/sql-programming-guide.html
Running the Thrift JDBC/ODBC server:
Running the Spark SQL CLI: