Reposted from: Getting Started with Spark Machine Learning — Analyzing Store Purchase Records in Scala/Java/Python
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20707
(Source: aboutyun)
Spark installation directory
[Bash shell]
/Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6
Basic test
[Bash shell]
./bin/run-example org.apache.spark.examples.SparkPi
[Bash shell]
MASTER=local[20] ./bin/run-example org.apache.spark.examples.SparkPi
scala
[Scala]
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
 * A simple Spark application written in Scala
 */
object ScalaApp {
In the main function, we initialize the required SparkContext object and use it to access our CSV data file through the textFile method. We then split each raw line on the comma delimiter and extract the user, product, and price fields, mapping the raw text into structured records:

[Scala]
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "First Spark App")
    // Convert the raw CSV data into records of the form (user, product, price)
    val data = sc.textFile("data/UserPurchaseHistory.csv")
      .map(line => line.split(","))
      .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2)))
Now we have an RDD in which each record is made up of (user, product, price). For the store we can compute the total number of purchases, the number of unique customers, the total revenue, and the most popular product:

[Scala]
    // Count the total number of purchases
    val numPurchases = data.count()
    // Count how many distinct users made purchases
    val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count()
    // Sum the prices to get the total revenue
    val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum()
    // Find the most popular product
    val productsByPopularity = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)
The last piece of code, which computes the most popular product, illustrates the Map/Reduce style of computation made popular by Hadoop. First, we map the (user, product, price) records to (product, 1) pairs. Then we run a reduceByKey operation, which sums the 1s for each unique product. The transformed RDD now contains the number of purchases per product. On that RDD we call collect, which returns the results to the driver program as a Scala collection, and we then sort the records by purchase count locally on the driver. (Note that when dealing with large amounts of data we would normally sort in parallel with an operation such as sortByKey; see the sketch after the full listing.) Finally, we print the results to the terminal:

[Scala]
    println("Total purchases: " + numPurchases)
    println("Unique users: " + uniqueUsers)
    println("Total revenue: " + totalRevenue)
    println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2))
  }
}
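To make the parallel-sorting note above concrete, here is a minimal sketch that is not part of the original listing. It replaces the collect-and-sort step inside main by flipping each (product, count) pair to (count, product) and using sortByKey, so the sort stays distributed and only the top record is fetched to the driver:

[Scala]
    // Sketch only: sort by purchase count on the cluster instead of on the driver
    val mostPopularDistributed = data
      .map { case (user, product, price) => (product, 1) }
      .reduceByKey(_ + _)                                 // (product, total purchases)
      .map { case (product, count) => (count, product) }  // flip so the count becomes the key
      .sortByKey(false)                                   // descending, sorted in parallel
      .first()                                            // (count, product) of the top seller

This trades an extra shuffle for not having to hold the full product list in driver memory, which only pays off once the number of distinct products is large.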
You can run this program by executing sbt run from the project's base directory or, if you use an IDE, by running it directly from the Scala IDE. The final output should look similar to the following:
...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
14/01/30 10:54:40 INFO spark.SparkContext: Job finished: collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
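The post never shows the contents of data/UserPurchaseHistory.csv. A five-line sample that is consistent with the output above (5 purchases, 4 unique users, total revenue 39.91, iPhone Cover bought twice) would look like the following; treat it as an illustrative reconstruction rather than the exact file:

John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49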
build.sbt
[Scala]
name := "scala-spark-app"

version := "1.0"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0"
[Bash shell]
erichan:scala-spark-app/ $ sbt run
java 8
[Java]
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.List;

public class JavaApp {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        // Convert the raw CSV data into String[] records of (user, product, price)
        JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));

        // Total number of purchases
        long numPurchases = data.count();
        // Number of distinct users that made purchases
        long uniqueUsers = data.map(strings -> strings[0]).distinct().count();
        // Total revenue
        double totalRevenue = data.mapToDouble(strings -> Double.parseDouble(strings[2])).sum();

        // Most popular product: map to (product, 1), sum per product, then sort on the driver
        List<Tuple2<String, Integer>> pairs = data.mapToPair(
            new PairFunction<String[], String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String[] strings) throws Exception {
                    return new Tuple2(strings[1], 1);
                }
            }
        ).reduceByKey((i1, i2) -> i1 + i2).collect();
        pairs.sort((o1, o2) -> -(o1._2() - o2._2()));
        String mostPopular = pairs.get(0)._1();
        int purchases = pairs.get(0)._2();

        System.out.println("Total purchases: " + numPurchases);
        System.out.println("Unique users: " + uniqueUsers);
        System.out.println("Total revenue: " + totalRevenue);
        System.out.println(String.format("Most popular product: %s with %d purchases", mostPopular, purchases));

        sc.stop();
    }
}
Maven pom.xml
[XML]
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>java-spark-app</groupId>
    <artifactId>java-spark-app</artifactId>
    <version>1.0</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.4.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
python
[Python]
from pyspark import SparkContext

sc = SparkContext("local[2]", "First Spark App")
# Convert the raw CSV data into records of the form (user, product, price)
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line: line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# Total number of purchases
numPurchases = data.count()
# Number of distinct users that made purchases
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# Total revenue
totalRevenue = data.map(lambda record: float(record[2])).sum()
# Most popular product
products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]

print "Total purchases: %d" % numPurchases
print "Unique users: %d" % uniqueUsers
print "Total revenue: %2.2f" % totalRevenue
print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1])

sc.stop()
[Bash shell]
cd /Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6/bin
./spark-submit pythonapp.py