概述
a.readChar() 读取输入字符
println(“请输入任意字符:”)
val ch = StdIn.readChar()
//scala中如何读取文件中的内容
val lines = Source.fromFile("E:/data/hello.txt").getLines()
Source源文件,地址,getLines()转化流数据成迭代数据
隐式转换中
rich 这个类是什么 引用在文件读取richFile 和 比较器接口中comparable
导入scala.io.Source后 即可引用Source中的方法读取文件信息
import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt”)
//返回一个迭代器
val lines = source.getLines()
for(i <- lines)
println(i)
//内容也可以放到数组中
// val arr = source.getLines().toArray
// for(elem <- arr)
// println(elem)
//文件内容直接转换成一个字符串
// val contents = source.mkString
// println(contents)
}
读取字符
按字符读取文件
import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt”)
for(c <- source)
println©
}
读取单词
将文件中的内容 转换成一个单词数组
import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt)
val contents = source.mkString.split(” ") //转化为string然后切割成单词
for(word <- contents)
println(word)
}
读取网络文件
Source 可以直接读取来自URL、等非文件源的内容
import scala.io.Source
object FileDemo extends App{
val source = Source.fromURL(“http://www.baidu.com”, “UTF-8”)
val lines = source.getLines()
for(i <- lines)
println(i)
}
netty
//1.创建SparkConf对象进行配置,然后在创建SparkContext进行操作
val conf = new SparkConf().setAppName("BaseStationDemo").setMaster("local[2]")
val sc = new SparkContext(conf)
kafka 连接
连接生产者
获取配置: properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream(“producer.properties”));
连接生产者: Producer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
//发送数据
ProducerRecord<Integer, String> record = new ProducerRecord(“spark”, i,“11111”);
producer.send(record); 发送数据
Thread.sleep(10000); 间隔时间
producer.close(); 关闭连接
AKKA连接(已过时)
WORK
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
object WorkTest {
def main(args: Array[String]): Unit = {
val host = “127.0.0.1”
val port = “8889”
val config =
s"""
|akka.actor.provider = “akka.remote.RemoteActorRefProvider”
|akka.remote.netty.tcp.hostname = “
h
o
s
t
"
∣
a
k
k
a
.
r
e
m
o
t
e
.
n
e
t
t
y
.
t
c
p
.
p
o
r
t
=
"
host" |akka.remote.netty.tcp.port = "
host"∣akka.remote.netty.tcp.port="port”
|""".stripMargin
val configs: Config = ConfigFactory.parseString(config)
val workerSystem: ActorSystem = ActorSystem(“WorkerSystem”, configs)
val worker: ActorRef = workerSystem.actorOf(Props[Worker2], “Worker”)
worker ! “start”
}
}
class Worker2 extends Actor{
override def preStart(): Unit = {
println(“初始化方法”)
val master = context.actorSelection(“akka.tcp://MasterSystem@127.0.0.1:8888/user/Master”)
master ! “connect”
}
override def receive: Receive = {
case “start” => println(“接收到自己发来的消息”)
case “reply” => {
println(“接收到Master发来的消息”)
}
}
}
MASTER
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
object MasterTest {
def main(args: Array[String]): Unit = {
val host = “127.0.0.1”
val port = “8888”
val config =
s"""
|akka.actor.provider = “akka.remote.RemoteActorRefProvider”
|akka.remote.netty.tcp.hostname = “
h
o
s
t
"
∣
a
k
k
a
.
r
e
m
o
t
e
.
n
e
t
t
y
.
t
c
p
.
p
o
r
t
=
"
host" |akka.remote.netty.tcp.port = "
host"∣akka.remote.netty.tcp.port="port”
|""".stripMargin
val configs: Config = ConfigFactory.parseString(config)
val masterSystem: ActorSystem = ActorSystem(“MasterSystem”, configs)
val master = masterSystem.actorOf(Props[Master], “Master”)
master ! “connect”
}
}
class Master extends Actor{
override def preStart(): Unit = {
println(“初始化方法”)
}
override def receive: Receive = {
case “start” => println(“接收到自己发来的消息”)
case “connect” =>{
println(“向worker发送消息”)
sender ! “reply”
}
}
}
最后
以上就是无限芒果为你收集整理的SPARk连接的全部内容,希望文章能够帮你解决SPARk连接所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复