Overview
This article is reposted from: Spark Machine Learning Primer: Analyzing Shop Purchase Records in Scala/Java/Python
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20707
(Source: about云开发)
Spark installation directory
[Bash shell]
/Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6
Basic tests
[Bash shell]
./bin/run-example org.apache.spark.examples.SparkPi
[Bash shell]
# Run the same example with 20 local worker threads
MASTER=local[20] ./bin/run-example org.apache.spark.examples.SparkPi
Scala
[Scala]
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark app written in Scala
 */
object ScalaApp {
In the main function, we first initialize the SparkContext we need and use it to access the CSV data file via the textFile method. We then split each raw line on commas and extract the user, product, and price fields, mapping the raw text into structured records:
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")
    // Turn the raw CSV data into a set of (user, product, price) records
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))
Now we have an RDD in which each record is made up of three fields: (user, product, price). For the shop we can compute the following metrics: the total number of purchases, the number of unique customers, the total revenue, and the most popular product.
    // Count the total number of purchases
    val numPurchases = data.count()
    // Count how many unique users made purchases
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()
    // Sum up the total revenue
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()
    // Find the most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)
The final piece of code, which computes the most popular product, demonstrates the Map/Reduce style of computation made popular by Hadoop. First, we map each (user, product, price) record to a (product, 1) pair. Then we run a reduceByKey operation, which sums up the 1s for each product. The transformed RDD then contains the number of purchases per product. With this RDD in hand we can call collect, which returns the result to the driver program as a Scala collection, and sort the records locally in the driver by purchase count. (Note that when processing large amounts of data in practice, we would normally sort in parallel with an operation such as sortByKey; a sketch of that alternative follows the listing below.) Finally, we can print the results to the console:
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
  }
}
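The driver-side sortBy above works because the collected result is tiny; on a real data set the sort itself should stay distributed. A minimal sketch of that parallel alternative, assuming the same data RDD as in the listing (sortByKey and first are standard RDD operations):

[Scala]
// Swap (product, count) to (count, product) so that sortByKey orders by count,
// then take the top record without collecting the whole RDD into the driver
val mostPopularParallel = data
  .map { case (user, product, price) => (product, 1) }
  .reduceByKey(_ + _)
  .map { case (product, count) => (count, product) }
  .sortByKey(ascending = false)
  .first()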
This program can be run with the sbt run command in the project's root directory. If you use an IDE, you can also run it directly from the Scala IDE. The final output should look similar to the following:
...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
14/01/30 10:54:40 INFO spark.SparkContext: Job finished: collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
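For reference, output like this corresponds to an input file along the following lines (a hypothetical data/UserPurchaseHistory.csv; the original post does not show the file, but these rows are consistent with the totals above):

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49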
build.sbt
[Scala]
name := "scala-spark-app"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
[Bash shell]
erichan:scala-spark-app/ $ sbt run
Java 8
[Java]
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

public class JavaApp {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        // Turn the raw CSV data into records of (user, product, price) fields
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));
        // Total number of purchases
        long numPurchases = data.count();
        // Number of unique users who made purchases
        long uniqueUsers = data.map(strings -> strings[0]).distinct().count();
        // Total revenue
        double totalRevenue = data.mapToDouble(strings -> Double.parseDouble(strings[2])).sum();
        // Count purchases per product
        List<Tuple2<String, Integer>> pairs = data.mapToPair(
                new PairFunction<String[], String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String[] strings) throws Exception {
                        return new Tuple2<>(strings[1], 1);
                    }
                }
        ).reduceByKey((i1, i2) -> i1 + i2).collect();
        // Sort by purchase count, descending, to find the most popular product
        pairs.sort((o1, o2) -> -(o1._2() - o2._2()));
        String mostPopular = pairs.get(0)._1();
        int purchases = pairs.get(0)._2();
        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases", mostPopular, purchases));
        sc.stop();
    }
}
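The anonymous PairFunction above is the pre-lambda style; since PairFunction is a functional interface, the same step also compiles as a Java 8 lambda. A minimal sketch, assuming the same data RDD:

[Java]
// Equivalent pair mapping using a lambda instead of the anonymous class
List<Tuple2<String, Integer>> pairsViaLambda = data
        .mapToPair(strings -> new Tuple2<>(strings[1], 1))
        .reduceByKey((i1, i2) -> i1 + i2)
        .collect();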
Maven pom.xml
[XML]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>java-spark-app</groupId>
    <artifactId>java-spark-app</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
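With this pom.xml, one typical way to build the app and submit it via the spark-submit script is sketched below. These commands are a hypothetical addition, not part of the original post; the jar name follows from the artifactId and version above:

[Bash shell]
# Build the jar (produces target/java-spark-app-1.0.jar)
mvn clean package
# Submit it to a local Spark; --class names the main class defined above
/Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6/bin/spark-submit \
  --class JavaApp \
  target/java-spark-app-1.0.jar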
Python
[Python]
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# Turn the raw CSV data into records of (user, product, price) fields
data = sc.textFile("data/UserPurchaseHistory.csv") \
    .map(lambda line: line.split(",")) \
    .map(lambda record: (record[0], record[1], record[2]))
# Total number of purchases
numPurchases = data.count()
# Number of unique users who made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# Total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# Count purchases per product and find the most popular one
products = data.map(lambda record: (record[1], 1.0)) \
    .reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

sc.stop()
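As in the Scala version, the sorted() call above runs in the driver; for larger data sets the counts can be sorted in parallel instead. A minimal sketch, assuming the same data RDD (RDD.sortBy is a standard PySpark operation):

[Python]
# Parallel alternative to driver-side sorted(): sort the counts RDD itself
mostPopularParallel = data.map(lambda record: (record[1], 1.0)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda pair: pair[1], ascending=False) \
    .first()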
[Bash shell]
cd /Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6/bin
./spark-submit pythonapp.py