《ApacheNifi概念介绍、源码解析、开发指南(中文).pdf》由会员分享,可在线阅读,更多相关《ApacheNifi概念介绍、源码解析、开发指南(中文).pdf(56页珍藏版)》请在taowenge.com淘文阁网|工程机械CAD图纸|机械工程制图|CAD装配图下载|SolidWorks_CaTia_CAD_UG_PROE_设计图分享下载上搜索。
1、NiFi文档1.初识NiFi1.1.概述NiFi最早是美国国家安全局内部使用的工具,用来投递海量的传感器数据,后来由apache基金会开源。NiFi 基本设计理念:Flow Based Programming,1.2.核心概念 FlowFileFlowFile表示在系统中移动的每个对象,FlowFile由两部分组成:o content内容,即数据本身o attributes属性,每条数据带上的属性信息.以键值对的形式.FlowFile Processoro FlowFile处理器,由它完成对数据的实际处理工作,包括但不限于对数据内容和属性的加载,路由,转换,输出等.o 处理器最灵活之处在于处理
2、器可以读写FlowFile的属性信息,并且用自带的领域特定语言(DSL)对属性进行编程.Connection。由Connections把各个处理器链接起来,从而形成数据处理流程的有向无环图(DAG图).也称数据流,N iFi中 的 Flow.。Connection同时充当处理器间的队列,并且队列的属性高度可配置.o 这些队列可以配置优先级,可以在设置阈值,可以实现反压。Flow Controllero 流控制器对用户不可见的,它充当维护处理器如何连接和管理所有处理器所使用的线程及其分配的重要角色。Flow Controller充当促进处理器之间FlowFiles交换的代理。Process Gr
3、oupo 为了方便管理,把一组特定的处理器及其连接组成的Flow放到一个处理组中去,可以通过输入端口接收数据并通过输出端口发送数据。o 以这种方式,处理组可以通过组合其他组来创建全新组,形成更加复杂的DAG图(Flow流)。1.3.关键特性 Flow流高度可管理 保证交付NiFi的一个核心理念是即使在非常高的规模下,保证交付也是必须的。这是通过有效使用专门的持久化的预写日志(WAL)和内容存储库来实现的。它们的设计可以实现非常高的事务处理速率,有效的负载分散,写入时复制以及发挥传统磁盘读/写的优势。背 压和数据缓冲机制NiFi支持缓冲所有队列数据,以及在这些队列达到指定限制时提供背压的能力,或
4、者在数据达到指定时间时使数据过期失效。可 配置优先级的队列NiFi允许设置一个或多个优先级策略,用于如何从队列中检索数据。默认是先进先出,但有时候应该先拉取最新的数据,最大的数据或其他一些自定义方案。Flow流可配置特定的QoS(延迟v吞吐量,容量损失等)在Flow流中有一些点是很关键的,且不能容忍丢失.或者有时候必须在几秒钟内处理和交付它。NiFi可以对这些问题进行细粒度的特定配置。易于使用 可视化的控制和命令得益于强大的web操作界面.无论多么复杂的数据流都能在w eb界面上直观的呈现,整个数据处理流程,包括设计,控制,反馈和监控都可在web界面完成,一步到位.任何更改都能在界面上立马生效
5、,完全不要部署的过程,对于整个数据流,更可以对中间某个处理器进行单独变更,实时生效.数 据流模板对于设计好的数据流处理流,可以保存为模板来进行复用.模板可以导出成xml文件,导入到其他N iFi中进行多处使用.数据溯源flowfile流 过Flow流时,NiFi会自动记录,索引并提供可用的起源数据,包括导入,导出,转换等。这些信息对于故障排除,优化等很有用处.对 历史数据进行细粒度的恢复NiFi的内容存储库旨在充当历史记录的滚动缓冲区。数据仅在内容存储库过期时或存储空间不足时才会被删除。这与数据起源能力相结合,提供了非常精细的操作功能.包括对数据历史中的某一个点的点击查看内容,下载内容,处理回
6、放等功能.所有数据都可以回溯到它生命周期中很早的某一点.安全机制 系统内部安全Flow流中的流动的数据都可以进行加密传输 用户使用安全支持用户认证和不同级别的用户授权(可读,管理数据流,系统管理)多租户授权可扩展的架构设计 可扩展组件NiFi 的核心设计就是扩展.它的 processors,Controller Services,Reporting Tasks,Prioritizers,and Customer User Interfaces 都是 可扩展的.隔离的类加载器自定义的类加载器保证了扩展的组件简单的依赖关系.点到点的通信协议NiFi实例之间的通信协议是N iFi点到点(S2S)协议
7、。S2s可以轻松,高效,安全地将数据从一个NiFi实例传输到另一个实例。NiFi客户端的库也可以轻松在其他应用程序使用,以通过S2 s来与N iFi实例进行通信。S2s中支持基于套接字的协议和HTTP(S)协议作为底层传输协议,使得可以将代理服务器嵌入到S2s通信中。灵活的扩容模型 更多的N iFi实例可以搭建NiFi集群,也可以不组成集群,多台机器使用点到点协议来协作.更大的并发数量直接修改处理器的并发数1.4.架构匚 OS/Host白 JVM Q Web Server芯 Flow ControllerProcessor 1 Extension Nm FlowFile 塞 Content 塞
8、 ProvenanceRepository Repository RepositoryLocal Storage Web Serverweb服务器的提供基于http的命令和控制API。Flow Controller流量控制器是操作的大脑。它为扩展程序提供运行所需的线程,并管理扩展程序何时接收执行资源的时间表。Processor处理组件 Extensions扩展组件 FlowFile Repository通 过FlowFile Respository可跟踪Flow中处于活动状态的FlowFile的状态。存储库的实现是插件式的,默认是位于指定磁盘分区上的持久性预写日志。Content Reposi
9、toryContent Repository作为FlowFile的存储库,实现是插件式的,默认是一种相当简单的机制,该机制将数据块存储在文件系统中。可以指定多个文件系统存储位置,以便使用不同的物理分区以减少任何单个卷上的争用。Provenance RepositoryProvenance Repository是存储所有来源事件数据的地方。存储库实现是插件式的,默认实现是使用一个或多个物理磁盘卷。在每个位置内,事件数据都被索引并可以搜索。集群Q OS/Host戈不 8、Q OS/HostQ OS/HosIS、OS/HostoZooKeeper Serveroo o Cluster Coordin
10、ator Primary Node ZooKeeper Clienta从NiFi 1.0版本开始,采用了零主群集的范例。NiFi群集中的每个节点都对数据执行相同的任务,但是每个节点都对不同的数据集进行操作。通过ZooKeeper选择一个节点作为集群协调器,并且故障转移由ZooKeeper自动处理。所有群集节点均向群集协调器报告心跳和状态信息。群集协调器负责断开和连接节点。此外,每个群集都有一个主节点,该节点也由ZooKeeper选择。作 为DataFlow管理者,您可以通过任何节点的用户界面(UI)与NiFi群集进行交互。您所做的任何更改都将复制到群集中的所有节点,从而允许多个入口点。2.源代
11、码浅析2.1.1.总体结构 Snifi-1.10.0 n ifi-/SR C/nifi-1.10.0 .github .idea nif i-ap iA n ifi-assem b ly nif i-b o o ts tra p *2 nif i-com m onsA n ifi-d o c ker nif i-do csA B n ifi-ex tern a l nif i-fra m e w o rk-a p i nif i-m aven archetyp es nif i-m ocka*2 n ifi-n ar-b u n d lesA B nif i-to o lkitnifi-com
12、monsnifi-apinifi-framework-apinifi-bootstrapnifi-mocknifi-nar-bundlesnifi-assemblynifi-docsnifi-maven-archetypesnifi-externalnifi-toolkitnifi-docker nifi-api就 是 n ifi的应用程序接口,里面就是定义了整个工程用到的接口,注解,抽象类和枚举等基本的接口和信息.nifi-assembl负 责 n ifi的成品装配,工程打包最后形成的可供部署的压缩包就在这个工程里的target目录内.nifi-bootstarp负 责 nifi这 个 jv
13、m 应用程序的启动相关事宜 nifi-commonsn ifi诸多特性,比如datd-provenance,expression-language.s2s传输的实现就在这里,同时也是 n ifi的工具类集合 nifi-dockern ifi的 docker应用相关 nifi-docsn ifi的文档实现相关 nifi-externaln ifi内部元信息和外部交换,主要用于集群模式下 nifi-framework-api这就是n ifi核心框架层的api,也就是架构图中的Flow Controller那一层,注意这里只是各种接口信息定义,不是实现.nifi-maven-archetypes这里
14、只是为了生成两个maven archetype,一个是n ifi自定义处理器的脚手架,一个是n ifi自定义服务的脚手架,这些脚手架在maven的中央仓库都有提供.nifi-mock用 于 n ifi的 mock测试 nifi-nar-bundlesnifi java工具箱就是这里了.整个n ifi里面大部分的maven工程都是这个工程的子工程,在这个工程里面,一 个 bundle就是一个工具,也对应着上面架构图里的Extension nifi-toolkit这里面是n ifi的命令行工具的实现.nifi也提供了比较丰富的命令行指令.2.1.2.N i f i 程序入口在 nifi-bpoot
15、strap 模块内有一个 org.apache.nifi.bootstrap.RunNifi 的类,该类的 main。方法即为Nifi的启动入口方法。Bjnifi-bootstrapsrc fei main Bi java Bi org.apache.nifi.bootstrap Bi exception Bi notification ESutil G BootstrapCodec Q NIFILIstener System.ouf.p rin tln(,System.out.println(Restarx:Stop Apache N1FJ,:f it is running,and then
16、 stiSystea.out.println(5tatus:Determine if there is a running Instance of ApactSystm.our.printIn(Dump:Write a Thread Duap to the file specified by opticSystea.out.println(Diagnostics:Write diagnostic information to the file spprivate static String!)(final String!)orig)(return Ar rays.copyOfRange(or:
17、1 0 RunNiFipublic static void main(String args)throws lOExceptlon,InterruptedException(1.i faros.length v 1 11 length a 3)4 1SfirtusO RunNiFi(File)Q createSensitiveKeyFile(File):PatG diagnostics(Flle,boolean):voidO dump(File):voidO env():voidQ getBootstrapFlle(Logger,String0 getChildProcesses(String
18、):List 1)duapFlle-new FHe(arg$(l);else(MpfXte null:接着看start方法,里面做了很多前期的准备性工作,主要是加载bootstrap.conf里配置的属性,以及在里面构建另外一个java cm d命令:final ProcessBuilder builder=new ProcessBuilder():mand(cmd)Process process=builder.startf);所以这个start方法是启动了另外一个java进程,这个进程才是真正的NiFi runtime。通过代码跟踪或查看日志,可见,这个cmd命令类似如下:/opt/jdk
19、l.8.0_131/bin/java-classpath/opt/nifi-1.7.1/./conf:/opt/nifi-1.7.1/./lib/logback-core-1.2.3.jar:/opt/nifi-1.7.1/./lib/jetty-schemas-3.1.jar:/opt/nifi-1.7.1/./lib/logback-classic-1.2.3.jar:/opt/nifi-1.7.1/./lib/jul-to-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/jcl-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./
20、lib/nifi-properties-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-runtime-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-framework-api-1.7.1.jar:/opt/nifi-1.7.1/./lib/nifi-nar-utils-1.7.1.jar:/opt/nifi-1.7.1/./lib/javax.servlet-api-3.1.0.jar:/opt/nifi-1.7.1/./lib/log4j-over-slf4j-1.7.25.jar:/opt/nifi-1.7.1/./lib/slf4j
21、-api-1.7.25.jar:/opt/nifi-1.7.1/./lib/nifi-api-1.7.1.jar-Dpiler.disablejsrl99=true-Xmx3g-Xms3g-Djavax.security.auth.useSubjectCredsOnly=true-Djava.security.egd=file:/dev/urandom-D.http.allowRestrictedHeaders=true-D.preferIPv4Stack=true-Djava.awt.headless=true-XX:+UseGlGC-Djava.protocol,handler.pkgs=
22、.www.protocol-Duser.timezone=Asia/Shanghai-Dnifi.properties.file.path=/opt/nifi-l.7.l/./conf/nifi.properties-Dnifi.bootstrap.listen.port=56653-Dapp=NiFi以 盯砺日 og.apache.nifi.NiFi,可以清晰的看到,命令中实际执行的是java类 org.apache.nifi.NiFi的 main方法。2.1.3.N i f i 启动初始化这个org.apache.nifi.NiFi类在以下模块中:nifi-nar-bundlesnifi-
23、framework-bundle+-nifi-framework+-nifi-runtimeNifi-framework模块就是nifi框架的核心代码Org.apache.nifi.NiFi.main。方法如下:Main entry point of the application.*param args things which are ignored7public static void main(String args)LOGGERinfo(Launching NiFi.);y NiFiProperties properties=convertArgumentsToValidatedNi
24、FiProperties(args);new NiFi(properties);catch(final Throwable t)LOGGERerror(Failure to launch NiFi due to +1.1);)Main。方法调用了 NiFi的构造方法:public NiFi(final NiFiProperties properties)throws ClassNotFoundException,lOException,NoSuchMethodException,InstantiationException,HlegalAccessException,IHegalArgumen
25、tException.InvocationTargetException this(properties.ClassLoader.getSystemClassLoader();public NiFi(final NiFiProperties properties.ClassLoader rootClassLoader)throws ClassNotFoundException.lOException,NoSuchMethodException.InstantiationException,HlegalAccessException,IHegalArgumentException,Invocat
26、ionTargetException 第二个构造方法是实际上的构造方法,里面进行了大量初始化操作,以下是非常关键的部分:/expand the narsfinal ExtensionMapping extension Mapping=NarUnpacker.unpack/Vars(properties,systemBundle);/load the extensions classloadersNarClassLoaders narClassLoaders=NarClassLoadersHolder.gef/nstence():narClassLoaders.init(rootClassLoa
27、der,properties.getFrameworkWorkingDirectoryO,properties.getExtensionsWorkingDirectoryO);这部分初始化了 NiFi的Java犷展工具箱,这些工具从Nifi的用户来说就是在NiFi安装目录的lib目录下的各个*.nar包,这个些nar包实际就是NiFi增加了特定额外信息的jar包集合的压缩,本质上还是jar包。以下是我们解压开的一个nar包,结构如下:to META-INF maven org.apache.nifi nifi-pol-narB pom.properties过 pom.xml bundled-d
28、ependenciesA bcpkix-jdk15on-1.60.jarR bcprov-jdk15on-1.60.jar0 commons-codec-1.10.jarR commons-collections4-4.2.jar commons-compress-1.18.jarA commons-csv-1.5.jarR conrwnons-io-2.6.jarA commons-lang3-3.8.1.jar启 commons-text-1.4.jar篇 curvesapi-1.04.jarR jack$on-annotations-2.9.0.jar口 jackson-core-2.9
29、.7.jarR jackson-databind-2.9.7.jar启 jBCrypt-0.4.1.jarR nffi-poi-processors-1.9.2.jarA nifi-processor-utils-1.9.2.jar国 nifi-security-utils-1.9.2.jarA nifi-standard-record-utils-1.9.2.jarA nifi-utils-1.9.2.jar0 poi-4.0.0.jarA poi-ooxml-4.0.0.jarR poi-ooxml-schemas-4.0.0.jar0 xerceslmpl-2.11.0.jar启 xml
30、-apis-1.4.01 jarR xmlbeans-3.0.1.jar DEPENDENCIES LICENSE NOTICE MANIFEST.MF那么回到NiFi的构造方法内,首先是解压这些nar包,并在代码内用ExtensionMapping对象描述,代码如下:/expand the narsfinal ExtensionMapping extensionMapping=NarUnpacker.unpackA/ars(properties.systemBundle);然后加载并初始化这些类加载器:/load the extensions classloadersNarClassLoad
31、ers narClassLoaders=NarClassLoadersHolder.ef/nstance(),narClassLoaders.init(rootClassLoader,properties.getFrameworkWorkingDirectory(),properties.getExtensionsWorkingDirectoryO);/load the framework classloaderfinal ClassLoader frameworkClassLoader=narClassLoaders.getFrameworkBundle().getClassLoader()
32、;if(frameworkClassLoader=null)throw new HlegalStateException(Unable to find the framework NAR ClassLoader.):)final Set narBundles=narClassLoaders.getBundles():在 N iFi的官方介绍中,有两处它的特性介绍是扩展和类加载隔离,这里我们可以对它这两个特性的实现探究竟了.它为每一个nar包构造了一个独立的自定义的类加载器:NarClassLoaderpublic class NarClassLoader extends URLCIassLoa
33、der private static final Logger LOGGER=LoggerFactory.getLogger(NarClassLoader.class);private static final FileFilter JAR_FILTER-new FileFilter()Overridepublic boolean accept(File pathname)final String nameToTest=pathname.getName().toLowerCase();return nameToTest.endsWith(.jar)&pathname.isFile();;目前基
34、本清晰,N iFi的扩展性是由自定义的压缩文件nar包和自定义的类加载器来提供的.接着往下看:/load the server from the framework classloaderThread.currenf777read().setContextClassLoader(frameworkClassLoader);Class jettyServer=Class.for/VameC org.apache.ni?.web.server.JettyServer.true,frameworkClassLoader);Constructor jettyConstructor=jettyServe
35、r.getConstructor(NiFiProperties.class.Set.class);final long startTime=System.nanoT/mef);nifiServer=(NiFiServer)jettyConstructor.newlnstance(properties.narBundles):nifiServer.setExtensionMapping(extensionMapping);nifiServer.setBundles(systemBundle.narBundles);回想架构图V M 的最上层是web server,这 个 web server就是
36、在这里被加载了,这 是 个 jettyserver,继续往下看:if(shutdown)LOGGRinfo(NiFi has been shutdown via NiFi Bootstrap.Will not start Controller);else nifiServer.start();if(bootstrapListener!=null)bootstrapListener.sendStartedStatus(true);)final long duration=System.nanoT/meO startTime:LOGGE/7.info(Controller initializati
37、on took+duration+nanoseconds+(+(int)TimeUnit.SECO/VDS.convert(duration.TrneJn.NANOS ECON DS)+seconds).):)start这 个 nifiServer,这 个 NiFi对象的构造方法这里就全部走完了.2.1.4.N i F i-W e bN i F i S e r ve r.s t a r t。的方法内,代码跳转到n i f i-f r a m e w o r k下的一个了模块n i f i-w e b 内 了。*2 nif i-user-actions *2 nifi-web *2 nif i-
38、custom-ui-utilities nifi-jetty nif i-ui-extension *nifi-web-api *2 nifi-web-content-access nif i-web-content-viewer *nifiweb-docs *2nifiweb-error *2 nif i-web-optimistic-locking nif i-web-security *nifi-web-ui target他 nifi-web.imlm pom.xmlparentorg.apache.nifinifi-fram ew orkl.10.0nifi-w ebpomnifi-w
39、eb-optimistic-l.ockingnifi-custom-ui-utilitiesnifi-web-securitynifi-web-apinifi-web-errornifi-web-docsnifi-web-content-viewernifi-web-uinifi-jettyodulenifi-web-content-access由 target_rUCraEawcrU IEInifi-ui-extension/modules2.1.5.n i f i-j e t t y与Web相关的代码都在这个模块了,包括Server和界面相关的代码,上面提到的NiFiServer的实现类
40、JettyServer 就在子模块 nifi-jetty 内了。!(nifi-web 1 *2 nifl*custom*ui*utilities937eOverride h s rc938 5public void|r t r t(|maint r y Create a standard extension manager and discover extensions fei dssemblyfin a l ExtensionDiscoveringMandger extensioritanager=new StandardExtensionOiscextensiorManager.dIsco
41、verExtension*(systertundle,bundles):Bi org.apache.nifi.web.serverextens lorHaMQer.logClas$Loaderttapplng();G HostHeaderHandler O JettyServer Java/Set the extension manager into the holder which makes i t available to the 5 JettyServerExtens iorHanagerttol.der.jnit(extensiorManager);ServerConnectorCr
42、eator G ServerConfigurationExceptionH Generate docs for extensions feS resourcesDoeGenerator.generte(props,extensionManager,extensionKapping);to test接 着 看JettyServer这个类,上 面 的N iFi构造方法里面最后是先实例化了这个JettyServer,然后调用了 start方法.先看它的构造方法,只看注释,找到了核心方法:/load wars from the bundlefinal Handler warHandlers=loadl
43、nitialWars(bundles),可以看到,其实就 是 把w ar包加载进来了,这 些w ar包 就 是nifi-web下面的子工程,有几个子工程 的pom文件中配置的就是war/packaging接着看这个Start方法:第一句就是 ExtensionManager.discoverExtensions(systemBundle,bundles);就是这里把所有的扩展类加载进JVM 了,看到看到ExtensionManager的注释,这个注释就说明了一切S c a n s th r o ug h th e c l a ssp a th to l o a d a l l F l o wF
44、 H e P r o c e sso r s,F l o wF i l e C o m p a r a to r s,a n d R e p o r ti n g T a sk s usi n g th ese r vi c e p r o vi d e r A P I a n d r un n i n g th r o ug h a l l c l a ssl o a d e r s(r o o t,N A R s).这 个ExtensionManager在加载类的时候,用到了 java的一种比较高级的机制,javaSPI(service provider interface),这种机制在很
45、多框架中比如spring中大量使用final ServiceLoader serviceLoader=ServiceLoader./oad(entry.getKey().bundle.getClassLoader():这个机制解释了为什么写自定义的处理器的时候要在/resources/META-INF/services目录下面写上配置文件.在自定义处理开发的时候,一定要注意写这个配置文件,否则类是加载不进来的接 着 start这 个 jetty server,接着往下看,只看注释,可以看到,大致就是做了 server context以及 filter的注入工作了:/e n sur e th e
46、 a p p r o p r i a te wa r s d e p l o ye d suc c e ssf ul l y b e f o r e i n j e c ti n g th e N i F i c o n te xt a n d se c ur i ty f i l te r s/th i sm ust b e d o n e a f te r sta r ti n g th e se r ve r (a n d e n sur i n g th e r e we r e n o sta r t up f a i l ur e s)2.1.6.n i f i-w e b-a p
47、 i基本到这里,N iFi的实例化和初始化流程基本就有个大致了解了,我们可以接着再进一步,看到nifi-web-api这个工程,这个工程其实就是n ifi的 restful接口工程,n ifi的所有restful接口都是这里实现的,包括处理器的新增,处理器的连接以及处理器的start等在里面随便打开个以resource结尾的类:fenifiweb-api fel main Bi java tliorg.apache.nifi CBaop ta audit Cl autborization registry tS web tBapi ta concurrent Bl config Eldto B
48、i filter ta request G AccsPolicyRourc O AccessResource G ApplicationResource G ConnoctionRAsourc*G ControllerResource G Control terService Resource G Count”如sourc。0 DataTransf er Re source G FlowFileOueueRasource 0 FlowRtsourc*G FunneiResource G InputPortResource&LabeIRtsource G OutputPortResource 9
49、 PraneterContextResource*ProceswfResource O ProvenanceEventResource fte tricw i T/w tpcclfied processor.ttf Tht iGVUCProducesfHedteType.AWJanawJStW)Path(/eAplOperatlon(value Gets a proc-:response=ProcessorEnt Ity.cU5,uthorlzations=Authorization(value CApiResponsesIvalue AplAesponseCcode=M,AptRe$pon*
50、e(code 41,&piRe$pon$e code 2 3.ApiResponse(code 44M,O*piRespome(code 他nessoge=1Message=not luthrrf j,nessage-1 ienr:-,or”:-message=rc.!ur(*x,tvne)$gc=-w i;n out N if:not in app-opr i.public Response getProcessorffApPor(value The processor Id.required=true)PathParaa(Id)fin al String id)throws IIf (Is