I am 喜悦酸奶, a blogger at 靠谱客. This article, collected during recent development, is an introduction to Alink, machine learning on Flink. I found it quite useful and am sharing it here as a reference.

Overview

1. What is Alink?

Alink is a new-generation machine learning algorithm platform that the PAI team of Alibaba's Computing Platform Division has been building on the real-time compute engine Flink since 2017. It provides a rich library of algorithm components and a convenient operating framework, letting developers assemble the full model-development workflow of data processing, feature engineering, model training, and model prediction in one step.

Leveraging Flink's strength in unified batch and stream processing, Alink offers consistent operations for both batch and streaming jobs. In practice, the limitations of Flink's original machine learning library, FlinkML, became apparent: it supports only a dozen or so algorithms, and the data structures it supports are not general enough. Valuing the excellent performance of the underlying Flink engine, the team redesigned and built a new machine learning algorithm library on top of Flink. It went live inside Alibaba Group in 2018 and has been continuously improved and hardened in Alibaba's complex internal business scenarios since.

2. The relationship between FlinkML and Alink

FlinkML is the machine learning library that already exists in the Flink community; it has been around for a long time and is updated slowly. Alink is a complete rewrite based on the new generation of Flink and shares no code with FlinkML. Alink was developed by the PAI team of Alibaba's Computing Platform Division, was used inside Alibaba after it was built, and has now been officially open-sourced.

3. Alink machine learning examples

Example 1:

Environment setup

from pyalink.alink import *
resetEnv()
useLocalEnv(1, config=None)

Use one of the following commands to start using PyAlink:

  • useLocalEnv(parallelism, flinkHome=None, config=None)
  • useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", config=None)
    Call resetEnv() to reset environment and switch to another.

JVM listening on 127.0.0.1:50568
MLEnv(benv=JavaObject id=o2, btenv=JavaObject id=o5, senv=JavaObject id=o3, stenv=JavaObject id=o6)

Data preparation

## read data
URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/review_rating_train.csv"
SCHEMA_STR = "review_id bigint, rating5 bigint, rating3 bigint, review_context string"
LABEL_COL = "rating5"
TEXT_COL = "review_context"
VECTOR_COL = "vec"
PRED_COL = "pred"
PRED_DETAIL_COL = "predDetail"
source = (
    CsvSourceBatchOp()
    .setFilePath(URL)
    .setSchemaStr(SCHEMA_STR)
    .setFieldDelimiter("_alink_")
    .setQuoteChar(None)
)
## split data into train and test sets
trainData = SplitBatchOp().setFraction(0.9).linkFrom(source)
testData = trainData.getSideOutput(0)

Feature engineering

pipeline = (
    Pipeline()
    .add(
        Segment()
        .setSelectedCol(TEXT_COL)
    )
    .add(
        StopWordsRemover()
        .setSelectedCol(TEXT_COL)
    )
    .add(
        DocHashCountVectorizer()
        .setFeatureType("WORD_COUNT")
        .setSelectedCol(TEXT_COL)
        .setOutputCol(VECTOR_COL)
    )
)
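What the three pipeline stages do can be sketched in a few lines of plain Python (a toy illustration, not Alink code; the whitespace tokenizer, the stop-word list, and the 16 hash buckets are all made-up stand-ins):

```python
# Toy sketch of the Segment -> StopWordsRemover -> DocHashCountVectorizer flow.
# Everything here (tokenizer, stop words, 16 buckets) is illustrative only;
# Alink segments Chinese text and uses a much larger hash space.
STOP_WORDS = {"the", "a", "is"}
NUM_FEATURES = 16

def vectorize(text):
    tokens = text.lower().split()                        # "segment" the text
    tokens = [t for t in tokens if t not in STOP_WORDS]  # drop stop words
    vec = [0] * NUM_FEATURES
    for t in tokens:                                     # hashed word counts
        vec[hash(t) % NUM_FEATURES] += 1
    return vec

v = vectorize("the screen is great and the battery is great")
print(sum(v))  # 5 tokens survive: screen, great, and, battery, great
```

The resulting count vector is what `NaiveBayesTextClassifier` consumes in the next step.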

Model training

## naive Bayes model
naiveBayes = (
    NaiveBayesTextClassifier()
    .setVectorCol(VECTOR_COL)
    .setLabelCol(LABEL_COL)
    .setPredictionCol(PRED_COL)
    .setPredictionDetailCol(PRED_DETAIL_COL)
)
%timeit model = pipeline.add(naiveBayes).fit(trainData)

473 ms ± 160 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
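NaiveBayesTextClassifier is a multinomial naive Bayes classifier over the word-count vectors. As a reference point, here is a minimal pure-Python version of the scoring rule with Laplace smoothing (toy data, not Alink's implementation):

```python
import math
from collections import defaultdict

# Minimal multinomial naive Bayes over word-count vectors (toy illustration,
# not Alink's code). Laplace smoothing with alpha = 1.
def train_nb(docs):  # docs: list of (label, {word: count})
    class_docs = defaultdict(int)
    word_counts = defaultdict(lambda: defaultdict(int))
    vocab = set()
    for label, counts in docs:
        class_docs[label] += 1
        for w, c in counts.items():
            word_counts[label][w] += c
            vocab.add(w)
    return class_docs, word_counts, vocab, len(docs)

def predict_nb(model, counts):
    class_docs, word_counts, vocab, n = model
    best, best_lp = None, -math.inf
    for label in class_docs:
        total = sum(word_counts[label].values())
        lp = math.log(class_docs[label] / n)  # log prior
        for w, c in counts.items():
            # smoothed per-word likelihood
            p = (word_counts[label][w] + 1) / (total + len(vocab))
            lp += c * math.log(p)
        if lp > best_lp:
            best, best_lp = label, lp
    return best

model = train_nb([
    ("pos", {"great": 2, "battery": 1}),
    ("pos", {"great": 1, "screen": 1}),
    ("neg", {"broken": 2, "screen": 1}),
])
print(predict_nb(model, {"great": 1}))  # -> pos
```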

Prediction and evaluation

## evaluation
predict = model.transform(testData)
metrics = (
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL)
    .setPredictionDetailCol(PRED_DETAIL_COL)
    .linkFrom(predict)
    .collectMetrics()
)

Print the evaluation results

print("ConfusionMatrix:", metrics.getConfusionMatrix())
print("LabelArray:", metrics.getLabelArray())
print("LogLoss:", metrics.getLogLoss())
print("Accuracy:", metrics.getAccuracy())
print("Kappa:", metrics.getKappa())
print("MacroF1:", metrics.getMacroF1())
print("Label 1 Accuracy:", metrics.getAccuracy("1"))
print("Label 1 Kappa:", metrics.getKappa("1"))
print("Label 1 Precision:", metrics.getPrecision("1"))

ConfusionMatrix: [[4987, 327, 229, 204, 292], [28, 1223, 164, 147, 108], [1, 1, 269, 10, 11], [0, 0, 0, 10, 0], [0, 2, 1, 2, 83]]
LabelArray: ['5', '4', '3', '2', '1']
LogLoss: 2.330945631084851
Accuracy: 0.8114582047166317
Kappa: 0.6190950197563011
MacroF1: 0.5123859853163818
Label 1 Accuracy: 0.9486356340288925
Label 1 Kappa: 0.27179135595030096
Label 1 Precision: 0.9431818181818182
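The scalar metrics are all derived from the confusion matrix. For example, Accuracy and Kappa can be recomputed directly from the matrix printed above:

```python
import numpy as np

# Recompute Accuracy and Cohen's Kappa from the confusion matrix above.
cm = np.array([[4987, 327, 229, 204, 292],
               [28, 1223, 164, 147, 108],
               [1, 1, 269, 10, 11],
               [0, 0, 0, 10, 0],
               [0, 2, 1, 2, 83]])
n = cm.sum()
po = np.trace(cm) / n                      # observed agreement = Accuracy
pe = (cm.sum(0) * cm.sum(1)).sum() / n**2  # chance agreement from the margins
kappa = (po - pe) / (1 - pe)
print(round(po, 4), round(kappa, 4))  # -> 0.8115 0.6191
```

Accuracy is the diagonal mass of the matrix; Kappa discounts it by the agreement one would expect from the row and column margins alone.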

Example 2:

Environment setup

# set env
from pyalink.alink import *
import sys, os
resetEnv()
useLocalEnv(2)

Use one of the following commands to start using PyAlink:

  • useLocalEnv(parallelism, flinkHome=None, config=None)
  • useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", config=None)
    Call resetEnv() to reset environment and switch to another.

JVM listening on 127.0.0.1:51134
JavaObject id=o6

Data preparation

# schema of train data
schemaStr = (
    "id string, click string, dt string, C1 string, banner_pos int, site_id string, "
    "site_domain string, site_category string, app_id string, app_domain string, "
    "app_category string, device_id string, device_ip string, device_model string, "
    "device_type string, device_conn_type string, C14 int, C15 int, C16 int, C17 int, "
    "C18 int, C19 int, C20 int, C21 int"
)
# prepare batch train data
batchTrainDataFn = "http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv"
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(batchTrainDataFn)
    .setSchemaStr(schemaStr)
    .setIgnoreFirstLine(True)
)
# feature columns
labelColName = "click"
vecColName = "vec"
numHashFeatures = 30000
selectedColNames = ["C1", "banner_pos", "site_category", "app_domain",
                    "app_category", "device_type", "device_conn_type",
                    "C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21",
                    "site_id", "site_domain", "device_id", "device_model"]
categoryColNames = ["C1", "banner_pos", "site_category", "app_domain",
                    "app_category", "device_type", "device_conn_type",
                    "site_id", "site_domain", "device_id", "device_model"]
numericalColNames = ["C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21"]
# prepare stream train data
wholeDataFile = "http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv"
data = (
    CsvSourceStreamOp()
    .setFilePath(wholeDataFile)
    .setSchemaStr(schemaStr)
    .setIgnoreFirstLine(True)
)
# split stream into train and eval data
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = spliter
test_stream_data = spliter.getSideOutput(0)

Five steps of online learning

  • Step 1: Feature engineering
  • Step 2: Batch model training
  • Step 3: Online model training (FTRL)
  • Step 4: Online prediction
  • Step 5: Online evaluation

Feature engineering

# set up feature engineering pipeline
feature_pipeline = (
    Pipeline()
    .add(
        StandardScaler()
        .setSelectedCols(numericalColNames)
    )
    .add(
        FeatureHasher()
        .setSelectedCols(selectedColNames)
        .setCategoricalCols(categoryColNames)
        .setOutputCol(vecColName)
        .setNumFeatures(numHashFeatures)
    )
)
# fit and save feature pipeline model
FEATURE_PIPELINE_MODEL_FILE = os.path.join(os.getcwd(), "feature_pipe_model.csv")
feature_pipeline.fit(trainBatchData).save(FEATURE_PIPELINE_MODEL_FILE)
BatchOperator.execute()
# load pipeline model
feature_pipelineModel = PipelineModel.load(FEATURE_PIPELINE_MODEL_FILE)
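FeatureHasher applies the hashing trick: each feature is mapped into one of `numHashFeatures` buckets instead of building an explicit dictionary, so the vector size stays fixed no matter how many distinct values appear. A rough sketch of the idea (illustrative only; Alink's actual hash function and vector layout differ):

```python
# Toy sketch of the feature-hashing trick behind FeatureHasher.
# Illustrative only: Alink uses its own hash function and layout.
NUM_HASH_FEATURES = 30000

def hash_features(row, categorical, numerical):
    vec = {}  # sparse vector: bucket index -> value
    for col in categorical:
        # categorical: hash "col=value" to a bucket, add 1 there
        idx = hash(f"{col}={row[col]}") % NUM_HASH_FEATURES
        vec[idx] = vec.get(idx, 0.0) + 1.0
    for col in numerical:
        # numerical: hash the column name, keep the (scaled) value
        idx = hash(col) % NUM_HASH_FEATURES
        vec[idx] = vec.get(idx, 0.0) + float(row[col])
    return vec

row = {"site_category": "news", "banner_pos": "1", "C14": 0.37}
v = hash_features(row, ["site_category", "banner_pos"], ["C14"])
print(len(v))  # at most 3 non-zero buckets (fewer if hashes collide)
```

Collisions simply add values into the same bucket, which is the usual trade-off of hashing: bounded memory in exchange for occasional feature mixing.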

Batch model training

# train initial batch model
lr = (
    LogisticRegressionTrainBatchOp()
    .setVectorCol(vecColName)
    .setLabelCol(labelColName)
    .setWithIntercept(True)
    .setMaxIter(10)
)
%timeit initModel = lr.linkFrom(feature_pipelineModel.transform(trainBatchData))
59.6 ms ± 14.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Online model training (FTRL)

# ftrl train
model = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(vecColName)
    .setLabelCol(labelColName)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(numHashFeatures)
    .linkFrom(feature_pipelineModel.transform(train_stream_data))
)

19.1 s ± 1.9 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
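FtrlTrainStreamOp implements FTRL-Proximal online learning, and the alpha, beta, L1, and L2 parameters above are exactly the knobs of its per-coordinate update. A minimal pure-Python sketch of that update on a one-feature toy stream (not Alink's implementation):

```python
import math

# Minimal FTRL-Proximal for logistic regression (toy sketch, not Alink's code).
# Per-coordinate state: z (shifted gradient sum) and n (squared-gradient sum).
alpha, beta, l1, l2 = 0.1, 0.1, 0.01, 0.01
z, n, w = 0.0, 0.0, 0.0  # single feature, no intercept

def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))

# toy stream: x > 0 means label 1, x < 0 means label 0
stream = [(1.0, 1), (-1.0, 0)] * 200
for x, y in stream:
    # weight from the FTRL closed form (L1 drives small weights to exactly 0)
    if abs(z) <= l1:
        w = 0.0
    else:
        w = -(z - math.copysign(l1, z)) / ((beta + math.sqrt(n)) / alpha + l2)
    p = sigmoid(w * x)
    g = (p - y) * x                                      # logistic-loss gradient
    sigma = (math.sqrt(n + g * g) - math.sqrt(n)) / alpha
    z += g - sigma * w
    n += g * g

print(sigmoid(w * 1.0) > 0.5, sigmoid(w * -1.0) < 0.5)  # -> True True
```

The same update runs independently for each of the 30,000 hashed coordinates in the real job, which is what makes FTRL cheap enough for a click-through-rate stream.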

Online prediction

# ftrl predict
predResult = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(vecColName)
    .setPredictionCol("pred")
    .setReservedCols([labelColName])
    .setPredictionDetailCol("details")
    .linkFrom(model, feature_pipelineModel.transform(test_stream_data))
)
predResult.print(key="predResult", refreshInterval=30, maxLimit=20)
'DataStream predResult: (Updated on 2019-12-05 15:03:33)'
    click  pred  details
0       0     0  {"0":"0.9046159047711626","1":"0.0953840952288…
1       1     0  {"0":"0.7301554114492774","1":"0.2698445885507…
2       0     0  {"0":"0.9354702479573089","1":"0.0645297520426…
3       1     0  {"0":"0.7472443769874088","1":"0.2527556230125…
4       0     0  {"0":"0.7313933609276811","1":"0.2686066390723…
5       0     0  {"0":"0.7579078017993002","1":"0.2420921982006…
6       0     0  {"0":"0.9658883764493819","1":"0.0341116235506…
7       0     0  {"0":"0.8916428187684737","1":"0.1083571812315…
8       0     0  {"0":"0.964470362868512","1":"0.03552963713148…
9       0     0  {"0":"0.7879843998010425","1":"0.2120156001989…
10      0     0  {"0":"0.7701207324521978","1":"0.2298792675478…
11      0     0  {"0":"0.8816330561252186","1":"0.1183669438747…
12      0     0  {"0":"0.8671197714269967","1":"0.1328802285730…
13      0     0  {"0":"0.9355228418514457","1":"0.0644771581485…
14      0     0  {"0":"0.9098863130943347","1":"0.0901136869056…
15      0     0  {"0":"0.7917622336863489","1":"0.2082377663136…
16      0     0  {"0":"0.8377318499121809","1":"0.1622681500878…
17      0     0  {"0":"0.9647915025127575","1":"0.0352084974872…
18      0     0  {"0":"0.7313985049080408","1":"0.2686014950919…
19      1     0  {"0":"0.8541619467983884","1":"0.1458380532016…

Online evaluation

# ftrl eval
(
    EvalBinaryClassStreamOp()
    .setLabelCol(labelColName)
    .setPredictionCol("pred")
    .setPredictionDetailCol("details")
    .setTimeInterval(10)
    .linkFrom(predResult)
    .link(
        JsonValueStreamOp()
        .setSelectedCol("Data")
        .setReservedCols(["Statistics"])
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
    )
    .print(key="evaluation", refreshInterval=30, maxLimit=20)
)
StreamOperator.execute()
'DataStream evaluation: (Updated on 2019-12-05 15:03:31)'
   Statistics  Accuracy            AUC                 ConfusionMatrix
0  all         0.8286096670786908  0.7182165258211499  [[5535,5007],[112297,561587]]
1  window      0.8464953470502861  0.7283501551891348  [[485,456],[8534,49090]]
2  all         0.830019475336848   0.7191075542108774  [[6020,5463],[120831,610677]]
3  window      0.8455799884444143  0.7227709897015594  [[512,416],[8671,49247]]
4  all         0.8311614455307001  0.719465721678977   [[6532,5879],[129502,659924]]
5  window      0.8444954128440367  0.7259189182276968  [[545,455],[8698,49162]]
6  all         0.8320733080282608  0.7199603254520217  [[7077,6334],[138200,709086]]

Example 3:

Environment setup

from pyalink.alink import *
resetEnv()
useLocalEnv(1, config=None)
Use one of the following commands to start using PyAlink:
- useLocalEnv(parallelism, flinkHome=None, config=None)
- useRemoteEnv(host, port, parallelism, flinkHome=None, localIp="localhost", config=None)
Call resetEnv() to reset environment and switch to another.
JVM listening on 127.0.0.1:57785
JavaObject id=o6

Data preparation

## prepare data
import numpy as np
import pandas as pd
data = np.array([
    [0, 0.0, 0.0, 0.0],
    [1, 0.1, 0.1, 0.1],
    [2, 0.2, 0.2, 0.2],
    [3, 9.0, 9.0, 9.0],
    [4, 9.1, 9.1, 9.1],
    [5, 9.2, 9.2, 9.2]
])
df = pd.DataFrame({"id": data[:, 0], "f0": data[:, 1], "f1": data[:, 2], "f2": data[:, 3]})
inOp = BatchOperator.fromDataframe(df, schemaStr='id double, f0 double, f1 double, f2 double')
FEATURE_COLS = ["f0", "f1", "f2"]
VECTOR_COL = "vec"
PRED_COL = "pred"

Data preprocessing

vectorAssembler = (
    VectorAssembler()
    .setSelectedCols(FEATURE_COLS)
    .setOutputCol(VECTOR_COL)
)

Clustering model

kMeans = (
    KMeans()
    .setVectorCol(VECTOR_COL)
    .setK(2)
    .setPredictionCol(PRED_COL)
)

Prediction

pipeline = Pipeline().add(vectorAssembler).add(kMeans)
%timeit pipeline.fit(inOp).transform(inOp).firstN(9).collectToDataframe()
    id   f0   f1   f2  vec          pred
0  0.0  0.0  0.0  0.0  0.0 0.0 0.0     1
1  1.0  0.1  0.1  0.1  0.1 0.1 0.1     1
2  2.0  0.2  0.2  0.2  0.2 0.2 0.2     1
3  3.0  9.0  9.0  9.0  9.0 9.0 9.0     0
4  4.0  9.1  9.1  9.1  9.1 9.1 9.1     0
5  5.0  9.2  9.2  9.2  9.2 9.2 9.2     0

301 ms ± 25.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
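For reference, plain Lloyd's iterations in NumPy reproduce the same two clusters on this toy data (deterministic initialization from one point of each blob; Alink's KMeans uses its own initialization and stopping criteria):

```python
import numpy as np

# Minimal Lloyd's k-means on the toy data above (illustrative only;
# Alink's KMeans differs in initialization and stopping rule).
X = np.array([[0.0, 0.0, 0.0], [0.1, 0.1, 0.1], [0.2, 0.2, 0.2],
              [9.0, 9.0, 9.0], [9.1, 9.1, 9.1], [9.2, 9.2, 9.2]])
centers = X[[0, 3]].copy()  # deterministic init: one point from each blob
for _ in range(10):
    # assign each point to its nearest center, then recompute the centers
    d = np.linalg.norm(X[:, None, :] - centers[None, :, :], axis=2)
    labels = d.argmin(axis=1)
    centers = np.array([X[labels == k].mean(axis=0) for k in range(2)])
print(labels)  # -> [0 0 0 1 1 1]
```

The points near the origin and the points near (9, 9, 9) end up in separate clusters, matching the `pred` column above (cluster IDs are arbitrary).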


Example 4:

Handwritten digit recognition

  • Train a model with Softmax
  • Predict with the model
  • Evaluate the predictions

Data preparation

URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_dense.csv"
SCHEMA_STR = "label bigint, bitmap string"
mnist_data = (
    CsvSourceBatchOp()
    .setFilePath(URL)
    .setSchemaStr(SCHEMA_STR)
    .setFieldDelimiter(";")
)
spliter = SplitBatchOp().setFraction(0.8)
train = spliter.linkFrom(mnist_data)
test = spliter.getSideOutput(0)

Training + prediction + evaluation

softmax = (
    Softmax()
    .setVectorCol("bitmap")
    .setLabelCol("label")
    .setPredictionCol("pred")
    .setPredictionDetailCol("detail")
    .setEpsilon(0.0001)
    .setMaxIter(200)
)
%timeit model = softmax.fit(train)
res = model.transform(test)
evaluation = EvalMultiClassBatchOp().setLabelCol("label").setPredictionCol("pred")
metrics = evaluation.linkFrom(res).collectMetrics()

20.7 ms ± 3.22 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
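Softmax here is multinomial logistic regression: the model produces one score per digit class, and the softmax function turns those scores into a probability distribution over the ten classes. The function itself, in NumPy:

```python
import numpy as np

# The softmax function at the core of multinomial logistic regression.
# Subtracting the max score first keeps exp() numerically stable.
def softmax(scores):
    e = np.exp(scores - np.max(scores))
    return e / e.sum()

p = softmax(np.array([2.0, 1.0, 0.1]))
print(p.argmax(), round(p.sum(), 6))  # -> 0 1.0
```

The predicted label is the argmax of these probabilities, and the per-class probabilities are what end up in the `detail` column.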

Print results

print("ConfusionMatrix:", metrics.getConfusionMatrix())
print("LabelArray:", metrics.getLabelArray())
print("LogLoss:", metrics.getLogLoss())
print("TotalSamples:", metrics.getTotalSamples())
print("ActualLabelProportion:", metrics.getActualLabelProportion())
print("ActualLabelFrequency:", metrics.getActualLabelFrequency())
print("Accuracy:", metrics.getAccuracy())
print("Kappa:", metrics.getKappa())

ConfusionMatrix: [[170, 3, 5, 0, 1, 7, 2, 2, 1, 0], [2, 154, 2, 1, 14, 3, 6, 9, 0, 2], [9, 3, 174, 0, 3, 3, 3, 3, 0, 0], [0, 0, 1, 162, 5, 4, 2, 6, 0, 7], [5, 9, 2, 5, 160, 1, 8, 1, 0, 0], [11, 4, 2, 0, 4, 187, 1, 2, 1, 1], [2, 5, 2, 2, 6, 1, 170, 4, 1, 0], [0, 2, 8, 4, 2, 4, 8, 180, 6, 1], [1, 3, 3, 1, 3, 1, 3, 3, 209, 0], [2, 2, 2, 0, 3, 1, 1, 2, 0, 179]]
LabelArray: ['9', '8', '7', '6', '5', '4', '3', '2', '1', '0']
LogLoss: None
TotalSamples: 2000
ActualLabelProportion: [0.101, 0.0925, 0.1005, 0.0875, 0.1005, 0.106, 0.102, 0.106, 0.109, 0.095]
ActualLabelFrequency: [202, 185, 201, 175, 201, 212, 204, 212, 218, 190]
Accuracy: 0.8725
Kappa: 0.858283141946106


4. Summary

This document introduced where Alink comes from, its relationship with Flink, and several basic usage examples. Running the example code in JupyterLab yields the training and prediction results directly. Alink is driven from Python here, and its machine-learning workflow is similar to Spark ML; roughly speaking, Alink can be understood as a machine learning framework comparable to Spark ML that additionally supports online processing of streaming data. Further work in this direction will follow.
