我是靠谱客的博主 怕孤单含羞草,最近开发中收集的这篇文章主要介绍flink理论干货笔记(6),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

501. ScatterGatherConfiguration用来配置分散-聚集迭代模型。方法有setName、setParalleliam、registerAggregator、setOptNumVertices、getNumberOfVertices、setOptDegrees、addBroadcastSetForxxx、setDirection等

502. Gatter-Sum-Apply模型也是同步迭代进行的模型,有三个阶段,收集、总和、应用。GSA也能计算单源最短路径。调用runGatherSumApplyIteration,参数是GatherFunction、SumFunction、ApplyFunction

503. GSA还能实现PageRank、联通分量等算法; GSA的配置同样有setName、setParallelism、registerAggregator、addBroadcastSetForxxx、setOptNumVertices、setDirection等

504. 分散-聚集模型对内存有较低要求,且程序容易维护,对发送和接受的消息没有并发访问。GSA模型和分散-聚集模型很像,用GSA实现就能用后者实现。

505. gelly支持的图算法有LabelPropagation(社区发现)、联通组件(用collect-sum-apply)、单源最短路径、三角枚举器等; 聚类包括平均聚类系数、全局聚类系数、局部聚类系数。

506. HITS是超链接引发的主题搜索,它类似于PageRank,但顶点得分全部发送到每个邻居,而在pagerank中顶点得分首先除以邻居的数量。

507. Adamic-Adar测量顶点对之间的相似度,作为共享邻居的度的反对数之和。分数是非负且无界。Jaccard指数测量顶点邻域之间的相似度,且被计算为共享邻居的数量除以不同邻居的数量。分数从0到1

508.  VertexInDegree、VertexOutDegree、VertexDegrees、EdgeSourceDegrees、EdgeTargetDegrees、EdgeDegreesPair、VertexDegree、EdgeSourceDegree、EdgeTargetDegree、EdgeDegreePair、MaximumDegree、Simplify、TranslateGraphIds、TranslateVertexValues、TranslateEdgeValues等

509. GridGraph和generate()搭配是网格图生成器。CirculantGraph和generate()搭配是有向循环图生成器。CompleteGraph和generate()是完全图。CycleGraph是无向循环图。EchoGraph是回声图。EmptyGraph是空图。HypercubeGraph是超立方图。PathGraph是路径图。RMatGraph是有向幂律顿图。SingletonEdgeGraph是包含孤立的双路径的无向图。StarGraph是星图。 

510. 二分图即BipartiteGraph,其中一个DataSet表示顶部节点,另一个表示底部节点,还有一个表示二者之间的边。边用BipartiteEdge表示,没有值用NullValue表示。创建方式是fromDataSet。图形转换用Projection,分为顶部和底部projection,也可分为简单projection和完整projection,具体是projectionBottomSimple和projectionBottomFull。

511. flinkml目前支持svm、knn、minmax scaler、als、sos、交叉验证、距离指标、多元线性回归等算法。需要依赖如flink-ml_2.11包。

512. flinkml常用api有:Splitter.trainTestSplit(用于拆分训练集和测试集)、training、testing 、MultipleLinearRegression、setStepsize、setIterations、setConvergenceThreshold、fit、predict等

513. StandardScaler、PolynomialFeatures、setDegree、chainTransformer、chainPredictor; LabeledVector表示(label,features),而DenseVector表示密集向量

514. libSVM格式是通用的ml格式,可用readLibSVM读取,用writeLibSVM保存,它们在MLUtils文件中;用svm分类:
val svm=SVM()
.setBlocks(…)
.setIterations(100)
.setRegularization(…)
.setStepsize(…)
.setSeed(…) 

515. svm.fit(…)和svm.evaluate(…) ;以及kFoldSplit、multiRandomSplit、trainTestHoldoutSplit、
trainTestSplit

516. DistanceMetric可用于自定义距离指标。目前支持Euclidean、squared Euclidean、cosine、Chebyshev、Manhattan、Minkowski、Tanimoto等

517. ml.pipeline是flinkml的管道,受sklearn启发,有Estimator、Transformer、Predictor接口。FlinkMLTools. registerFlinkMLTypes用于注册ml类型

518. PolynomialFeatures是多项式特征,支持fit和transform,可以是Vector或LabeledVector类型,方法有setDegree、chainPredictor

519. 随机异常值选择(SOS)是一种无监督的离群值选择算法,为每个数据点输出异常值概率,当其他数据点与之亲和力不足,就被认为是异常值。SOS可用于日志分析、欺诈监测、质量控制等。参数有困惑度、errorTolerance、maxIterations,模型是StochasticOutlierSelection,方法有setPerplexity

520. StandardScaler让数据具有标准的均值和方差,也是一种Transformer,方法有fit和transform,类型也有Vector和LabeledVector,参数有mean、std,方法是setMean和setStd

521. ALS是一种Predictor,有fit和predict方法,参数有numFactors、lambda、Iterations、blocks、seed、temporaryPath

522. 分布式环境中,CoCoA算法用来计算本地的SDCA迭代,然后才合并到全局状态。可用来训练分布式svm,它有fit和predict方法。参数有blocks、iterations、localIterations、Regularization、stepSize、thresholdValue、seed、outputDecisionFunction

523. 思考:flinkml基于table api,也基于datastream和dataset? 

524. Stage也是flink ml的基本组件,它就是flink计算图的一个节点

525. Estimator是一个stage,有fit方法,用于训练。AlgoOperator也是一个stage,有transform方法,是一段计算逻辑。Transformer是一种AlgoOperator。Model是一个Transformer,提供了额外的api

526. 多个Stage可以用Pipeline封装成Estimator,也就是说Pipeline可以act as Estimator,每个stage可以是Estimator、Model、Transformer或AlgoOperator

527. Graph也可以act as Estimator,图其实就是stage构成的DAG。GraphBuilder可用来创建Model,用buildModel方法。Pipeline是一条路,Graph是多条路 

528. 思考:flink ml的Graph的DAG和算子,和flink dataset的DAG和算子,没关系? 

529. flink ml迭代包括有有界迭代和无界迭代,前者是离线,后者是在线

530. 最佳实践,可以用ParamterTool获取配置,方法是fromPropertiesFile、fromArgs、fromSystemProperties

531. env.getConfig.setGlobalJobParameters(paramters)来注册全局参数,然后在任意用户函数访问如getRuntimeContext(). getExecutionConfig(). getGlobalJobParamters();建议用pojo而不是TupleX,如果字段太多 

532. flink默认用slf4j,而它可以使用不同的日志,建议用Logback而不是Log4j;需在pom.xml加入logback-core和logback-classic,而去除log4j(用exclusion);同时添加log4j-over-slf4j,因为flink依赖hadoop,而后者用log4j,因此要从log4j重定向到slf4j,后又记录到Logback

533. ExecutionConfig是执行配置,方法有enableClosureCleaner(闭包清理器)、getNumberOfExecutionRetries(重新执行失败任务的次数)、getExecutionRetryDelay(重新执行任务等待的延迟)、getExecutionMode(执行模式如pipelined)、enableForceKryo(使用kryo)、enableForceAvro(使用avro)、enableObjectReuse(重用对象)、enableSysoutLogging、registerKryoType、disableAutoTypeRegistration、setTaskCancellationInterval等等

534. 实现flink.api.common.Program接口,并自定义getPlan方法,即通过计划打包程序。可通过ExecutionEnvironment. createProgramPlan()从环境创建程序的计划。

535. 调用打包程序的整个过程:
1) 搜索jar清单并查找主类或属性,其中program-class优先于main-class,如不含属性,命令行和web界面都支持手动传递入口点参数。
2) 若入口点类实现了Program接口,则调用getPlan方法来获取计划。
3) 若没有实现上述接口,则调用main方法。 

536. 并行执行时,每个并行实例处理任务输入数据的子集。要使用保存点的话,还要设置最大并行度。从保存点恢复,可以更改特定算子或整个程序的并行度。

537. setParallelism()可设在单个算子,或在整个env环境级别。甚至客户级别,比如flink run -p 10 ….jar,其中的-p,或者在java/scala代码中,client. run(program, 10,true)的10也是并行度。系统级别是在flink-conf. yaml中设置parallelism.default

538. 除了client个系统级别外,都能设置最大并行度,用setMaxParallelism(),范围一般在127~32768

539. 执行计划的可视化工具是tools/planVisualizer.html,需要设置web.submit.enable为true,端口为8081,可在执行flink作业之前显示执行计划。程序打印用env. getExecutionPlan()

540. 重启策略设置在flink-conf. yaml,参数为restart-strategy,具体有固定延迟、失败率、不重启。还支持自定义策略,调用setRestartStrategy方法。固定延迟用fixedDelayRestart方法,可设置重启次数为3,以及delay为10

541. fixed-delay.attempts是尝试次数,fixed-delay.delay是延迟时间;故障率策略用failure-rate参数,二级参数有max-failures-per-interval、failure-rate-interval、delay,也可以用编程方式,用failureRateRestart,可设置最大失败次数为3,以及delay为10;不重启策略是none,即作业直接失败,代码为noReatart

542. env.java.home是java路径。jobmanager.rpc.address要指向主节点。jobmanager.heap.mb 和taskmanager.heap.mb是每个节点的最大主内存。conf/slaves要配置从节点,运行TM。每台机器的可用cpu数量,是taskmanager.numberOfTaskSlots,而集群cpu总数是parallelism.default,临时文件是taskmanager.tmp.dirs

543. 启动flink集群用start-cluster.sh,停止集群用stop-cluster.sh。前者会在本地启动JM,并利用ssh在所有工作节点启动TM;bin下的jobmanager.sh和taskmanager.sh用于将JM和TM添加到正在运行的集群 

544. yarn上启动,用yarn-session.sh -n 4 -jm 1024m -tm 4096m;或者flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m   …/word count.jar

545. yarn的flink配置会覆盖flink-conf的多个配置,可用-Dfs.overwrite-files=true …就能无需更改配置文件。yarn部署flink后,就会显示JM连接的详细信息。yarn.containers.vcores允许覆盖vcores的数量

546. 分离的yarn会话是让它提交后关闭,用-d或--detached,此时只能用yarn application -kill appId来停止会话。yarn-session.sh -id  xxx用来附加到正在运行的会话

547. flink会在运行单个作业时将用户jar包含到系统类路径中,可以用yarn.per-job-cluster.include-user-jar控制此行为。可设置为order(默认)、first、last,来控制类路径中user-jar的位置,也可以直接disabled

548. flink-conf.yaml可设置yarn.reallocate-failed、yarn.maximum-failure-containers、yarn.application-attempts来表示恢复行为,或用-D参数;失败的yarn会话,可能是hadoop设置、版本不兼容等原因

549. yarn.log-aggregation-enable设为true才能使用yarn聚合日志,然后用yarn logs -applicationId xxid来检索日志;yarn. resourcemanager. web app. address是yarn的资源管理web界面,默认端口是8088

550. yarn集群可用防火墙控制集群的网络流量,用户可以用防火墙向flink提交作业。两个参数可以指定端口,yarn.application - master. port和blob. server. port

551. 在yarn上运行flink,此时JobManager和AM在同一个容器中运行!!!而NM和TaskManager在一个容器?

552. 要在k8s上部署flink作业,要先部署k8s集群,要本地运行k8s,建议用minikube,执行一遍minikube ssh 'sudo ip link set docker0 promisc on' ;k8s中基本flink会话部署有三个组件:部署作业运行JM、部署TM池、公开JM的rest和ui服务

553. 用以下命令启动集群:
kubectl create -f jobmanager-service. yaml
kubectl create -f jobmanager-deployment. yaml
kubectl create -f task manager-deployment.yaml 

554. 先在终端运行kubectl proxy。再在浏览器访问flink ui。终止flink会话,用kubectl  delete -f jobmanager-deployment.yaml、kubectl delete -f task manager-deployment.yaml、kubectl delete -f jobmanager-service.yaml

555. flink docker镜像托管在dockerhub,如flink:larest、flink:1.5、flink:1.5-hadoop27等。flink-container模块的build.sh可用于创建此类镜像。

556. docker compose是一种本地运行docker容器的便携方式,命令有docker-compose up(或加-d,表示后台)、docker-compose scale taskmanager=N 以及docker-compose kill ,可以访问8081的web ui 

557. 要用命令提交作业,必须将jar复制到JM容器,如docker cp path/to/jar "$xxx" : /job.jar,然后执行docker exec -t -i "$xxx"flink run /job.jar

558. JM可以配置HA,即有一个master JM,和多个slave JM,主从之间没有明显区别。必须依赖zk!!才能进行分布式协调!!!

559. conf/masters是主节点文件,包含所有JM的主机地址和端口,如jobManagerAddress1: webUIPort1~X,可以用high - availability. jobmanager. port更改端口设置

560. flink-conf.yaml还需要添加以下配置:high-availability:zookeeper、high-availability.  zookeeper. quorum:address1:2181、high-availability. zookeeper. path. root:/flink、high-availability. cluster-id/default_ns、high-availability. storageDir:hdfs///flink/recovery;conf/zoo.cfg也要配置,如server. 0=localhost:2888:3888 

561. 启动zk quorum命令为start-zookeeper-quorum.sh;stop-zookeeper-quorum.sh停止zk守护进程;yarn中可以配置最大尝试次数,yarn.resourcemanager.am.max-attempts,在yarn-site.xml;flink-conf. yaml可以配置yarn.application-attempts,注意它不能超过yarn中配置的最大尝试次数

562. 如果zk以kerberos安全模式运行,需要设置以下参数:zookeeper.sasl.service-name、zookeeper.sasl.login-context-name

563. CheckpointConfig config=env.getCheckpointConfig()
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)表示取消作业时保存检查点,而DELETE_ON_CANCELLATION表示取消作业时删除检查点。

564. state.checkpoints.dir:hdfs:///checkpoints/用来保存元数据和数据文件。可以在setStateBackend时设置。flink run -s: checkpointMetaDataPath[:runArgs] 

565. 可以用. uid("")来手动为算子分配id,可以确定每个算子的状态。算子id对于保存点非常有用。可以用webui从保存点恢复。触发保存点用flink save point :jobId[:targetDir];使用yarn触发保存点用flink savepoint :jobId [:targetDir] -yid:yarnAppId;使用保存点取消作业用flink cancel -s [:targetDir] :jobId;从保存点恢复用flink run -s :savepointPath [:runArgs]

566. 以上命令加-n(或者--allowNonRestoredState)来跳过无法映射到新程序的状态,即允许未恢复状态; flink savepoint -d:savepointPath是处理保存点;state.savepoint.dir是保存点目录,可以是hdfs:///flink/savepoints;无状态算子不是保存点的一部分。可以给所有算子分配id,如果确定它无状态,可以跳过该uid

567. 状态后台有MemoryStateBackend、Fs…、RocksDB…这几种,默认用MSB,建议MSB用于本地开发和调试以及没有状态的作业。FSB将快照写入文件系统,建议FSB用于大状态、长窗口以及所有高可用设置。RSB将整个RocksDB保存到文件系统,建议和FSB类似。RSB是目前唯一提供增量检查点的后端!! 

568. 可以为每个作业设置状态后台,用env.setStateBackend(new XxStateBackend(…)),默认状态后台在flink-conf.yaml设置,即state.backend,可以是jobmanager(即内存)、filesystem(FSB)、rocksdb(RSB),而state.checkpoints.dir定义检查点和元数据目录

569. 算子启动检查点时间(checkpoint_start_delay)和在对齐期间缓冲的数据量,都是重要的数据。当存在瞬态背压、数据偏移或网络问题,这些值可能会很高,但如果一直很高,意味着flink将许多资源放入了检查点。getCheckpointConfig().setMinPauseBetweenCheckpoint()是调整检查点之间的最短持续时间

570. 异步快照比同步更好,要尽可能使状态检查点异步。RDB和基于堆的状态后台都支持完全异步快照。大状态应该保持为被keys化状态,而不是算子状态。

571. 建议对大型状态使用RDB的增量检查点。计时器可以选择存在堆上还是RDB中,前者性能更好,后者扩展性更高,后者可能定时器数量超过可用主内存而溢出到磁盘。使用后者需要配置state.backend.rocksdb.timer-serverice.factory,可选择heap或rocksdb

572. 用setOptions将选项传给RDB,如RocksDBStateBackend. setOption(new MyOption()),预定义选项使用setPredefinedOptions() 

573. 容量规划是确定该使用多少资源让flink作业可靠运行。需要考虑反压、带宽、并行度。

574. 检查点和保存点可以压缩,默认用snappy算法。可以自定义算法。execution. setUseSnapshotCompression()激活压缩。任务本地恢复,思想是让状态快照除了写入分布式存储以外,还写入本地副本(仍需分布式)。可以有效降低恢复时间。激活本地恢复使用state.backend.local-recovery以及选项CheckpointOptions. LOCAL_RECOVERY 

575. 以下状态后台支持本地恢复,FSB和RSB,其中被keys化状态是状态最大的部分。未来会涵盖算子的状态和计时器。

576. flink-conf.yaml完整配置包括hdfs相关、核心配置、JM相关、TM相关、akka相关、rest相关、blob相关、心跳相关、ssl相关、netty相关、web前端相关、fs相关、编译优化相关、runtime相关,以及资源管理相关(RM)、yarn相关、mesos相关、HA相关、zk相关、kerberos相关、env相关、state相关、可查询状态相关(query)、metrics相关、 历史服务器相关

577. 网络缓冲区会自动避免过度缓冲,因此越多越好。从flink1.5开始在堆外分配,具体是taskmanager.memory.off-heap; taskmanager. network. memory. fraction以及.min、.max可用于定义网络缓冲区的内存

578. 可直接设置网络缓冲区的数量, 公式为#slots-per-TM^2 * #TMs*4,如20个8插槽机器的集群,大约要用5000个网络缓冲区来获得最佳吞吐量;具体用以下参数配置缓冲区数量和大小:taskmanager.network.numOfBuffers和taskmanager. memory. segment-size;临时io目录由taskmanager. tmp. dirs指定 

579. taskmanager. numberOfTaskSlots是TM处理槽的个数,可以为整个app或各个算子设置插槽数。一般与cpu核心数成比例。最终还得看并行度,即setParallelism() 

580. 最大并行度一般不能在作业运行时修改,除非完全重启。因此要提前设置它,注意高于128的最高值会导致被keys化后台稍微更大的状态快照。0<p<=max p<=2^15

581. 为算子提供稳定的id,用setUid(uid),避免了Job Graph更改导致新的uuid; RDB的性能比基于内存的MDB差,但它是唯一支持大状态和异步快照的后台,建议考虑使用它,除非你确信永远不超过主内存;建议配置JM的HA 

582. flink命令有:flink run(-p、-q、-d、-c、-m)、flink info、flink list(-s、-r、-a、-m)、flink cancel、flink stop、flink modify

583. 停止比取消更优雅,它需要实现StoppableFunction接口,然后调用stop(),而取消调用cancel(),后者如果没有停止,会定期中断线程

584. 保存点命令flink savepoint jobId xxx,而用yarn触发需要加上-yid  <yarnAppId> ;  flink cancel -s <savepoint Dir> <jobId>即只有保存点成功才会取消该作业; flink run -s <savepoint Dir>从保存点恢复;flink savepoint -d <savepoint dir>配置保存点或加上-j <jarFile>

585. start-scala-shell.sh local是flink的交互式scala shell,会自动绑定benv和senv表示批和流环境。可以添加外部依赖项,用-a或-addclasspath <path/to/jar> 

586. start-scala-shell.sh  remote是远程,需要指定host和port;start-scala-shell.sh  yarn -n 2是将flink部署到yarn,其中TM个数为2

587. flink kerberos目的是
1) 用kafka为集群作业启动安全数据访问
2) 对zk进行身份认证(如配置为SASL)
3) 验证Hadoop组件(如hdfs hbase)

588. 内部结构基于flink. runtime. security. modules. SecurityModule;Hadoop使用UserGroupInformation(UGI)类来建立进程范围的登录用户上下文;JAAS配置的kerberos可用于zk、kafka等。

589. 独立模式部署时,要将keytab,即security. kerberos. login. keytab存在于所有集群节点的指示路径上;yarn/mesos部署时,确保keys表文件存在于上述路径。会自动从客户端复制keytab到flink容器; 使用kinit时(仅yarn),避免了生成keys表的复杂性。flink  cli获取hadoop授权令牌(适用于hdfs和hbase),缺点是集群是短暂的,因为委托令牌一般一周内过期。 

590. flink集群所有内部连接都用ssl来身份验证和加密,都需要提供证书作为共享密钥。外部连接都用rest/http,也可配置为需要ssl连接。委托给代理的理由是,代理提供更多身份验证选项。 

591. 可单独为内部和外部启动ssl,用security.ssl.internal. enabled和security.ssl.rest.enabled,对于内部连接,还有以下配置,taskmanager. data. ssl. enabled、blob. service. ssl. enabled、akka. ssl. enabled 

592. ssl配置需要配置keystore和truststores,相关参数在security.ssl.internal.下面;外部连接的配置在security.ssl.rest.下面;添加security.ssl.algorithms来更新一组强大的密码套件;用keytool程序生成keys证书和信任库 

593. 具体命令为keytool  -genkeypair -alias flink. internal -keystore internal. keystore -dname  "…" -storepass xxx -keypass xxx -keyalg RSA -keysize 4096 以及keytool -exportcert -keystore …和keytool -importcert -keystore…等等

594. 部署到yarn时,命令为flink run -m yarn-cluster -yt deploy-keys/ flinkapp.jar,要将自定义CA证书添加到yarn代理节点的java默认信任库中

595. flink文件系统用flink.core.fs.FileSystem表示,支持以下:local、s3、maprfs、Swift fs、hdfs、ftp等;默认配置是用fs.default-scheme指定。连接限制用fs.<scheme>. limit. xxx指定 

596. 升级应用程序和flink,需要用保存点重启。修改应用程序要保持状态兼容,具体要匹配算子状态(依赖算子id),以及限制修改用户函数和算子。注意,算子状态可以是用户定义的,也可以是内部的。

597. 升级flink步骤,1. 旧版flink获取要迁移的作业的保存点 2. 从保存点恢复新版flink下的作业

598. 就地升级,在获取保存点后,要1. 停止/取消所有正在运行的作业 2. 关闭旧版flink的集群 3. 将flink升级到集群上的较新版本 4. 在新版本下重启集群

599. 浅拷贝升级,要1. 从保存点恢复前,除了旧版flink安装外,还要设置新版安装 2. 使用新的flink安装从保存点恢复3. 停止并关闭旧版集群

600. 前提条件,要了解是否存在将保存点迁移到新版本相关的任何api的更改。是否为作业的算子设置了明确的uid。注意,不支持半异步模式的RDB状态迁移,只支持全异步模式。 
 
 

 


  

最后

以上就是怕孤单含羞草为你收集整理的flink理论干货笔记(6)的全部内容,希望文章能够帮你解决flink理论干货笔记(6)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部