我是靠谱客的博主 无限芒果,最近开发中收集的这篇文章主要介绍SPARk连接,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

a.readChar() 读取输入字符

println(“请输入任意字符:”)
val ch = StdIn.readChar()

//scala中如何读取文件中的内容

val lines = Source.fromFile("E:/data/hello.txt").getLines()

Source源文件,地址,getLines()转化流数据成迭代数据

隐式转换中

rich 这个类是什么 引用在文件读取richFile 和 比较器接口中comparable

导入scala.io.Source后 即可引用Source中的方法读取文件信息

import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt”)
//返回一个迭代器
val lines = source.getLines()
for(i <- lines)
println(i)
//内容也可以放到数组中
// val arr = source.getLines().toArray
// for(elem <- arr)
// println(elem)
//文件内容直接转换成一个字符串
// val contents = source.mkString
// println(contents)
}

读取字符

按字符读取文件
import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt”)
for(c <- source)
println©
}

读取单词

将文件中的内容 转换成一个单词数组

import scala.io.Source
object FileDemo extends App{
val source = Source.fromFile(“E:/data/hello.txt)
val contents = source.mkString.split(” ") //转化为string然后切割成单词
for(word <- contents)
println(word)
}

读取网络文件

Source 可以直接读取来自URL、等非文件源的内容

import scala.io.Source
object FileDemo extends App{
val source = Source.fromURL(“http://www.baidu.com”, “UTF-8”)
val lines = source.getLines()
for(i <- lines)
println(i)
}

netty

//1.创建SparkConf对象进行配置,然后在创建SparkContext进行操作

 val conf = new SparkConf().setAppName("BaseStationDemo").setMaster("local[2]")
 val sc = new SparkContext(conf)

kafka 连接

连接生产者
获取配置: properties.load(MyKafkaProducer.class.getClassLoader().getResourceAsStream(“producer.properties”));
连接生产者: Producer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
//发送数据
ProducerRecord<Integer, String> record = new ProducerRecord(“spark”, i,“11111”);
producer.send(record); 发送数据
Thread.sleep(10000); 间隔时间
producer.close(); 关闭连接

AKKA连接(已过时)

WORK
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object WorkTest {
def main(args: Array[String]): Unit = {
val host = “127.0.0.1”
val port = “8889”
val config =
s"""
|akka.actor.provider = “akka.remote.RemoteActorRefProvider”
|akka.remote.netty.tcp.hostname = “ h o s t " ∣ a k k a . r e m o t e . n e t t y . t c p . p o r t = " host" |akka.remote.netty.tcp.port = " host"akka.remote.netty.tcp.port="port”
|""".stripMargin
val configs: Config = ConfigFactory.parseString(config)
val workerSystem: ActorSystem = ActorSystem(“WorkerSystem”, configs)
val worker: ActorRef = workerSystem.actorOf(Props[Worker2], “Worker”)
worker ! “start”

}

}
class Worker2 extends Actor{
override def preStart(): Unit = {
println(“初始化方法”)
val master = context.actorSelection(“akka.tcp://MasterSystem@127.0.0.1:8888/user/Master”)
master ! “connect”
}
override def receive: Receive = {
case “start” => println(“接收到自己发来的消息”)
case “reply” => {
println(“接收到Master发来的消息”)
}

}
}

MASTER

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

object MasterTest {
def main(args: Array[String]): Unit = {
val host = “127.0.0.1”
val port = “8888”
val config =
s"""
|akka.actor.provider = “akka.remote.RemoteActorRefProvider”
|akka.remote.netty.tcp.hostname = “ h o s t " ∣ a k k a . r e m o t e . n e t t y . t c p . p o r t = " host" |akka.remote.netty.tcp.port = " host"akka.remote.netty.tcp.port="port”
|""".stripMargin
val configs: Config = ConfigFactory.parseString(config)
val masterSystem: ActorSystem = ActorSystem(“MasterSystem”, configs)
val master = masterSystem.actorOf(Props[Master], “Master”)
master ! “connect”

}
}
class Master extends Actor{
override def preStart(): Unit = {
println(“初始化方法”)
}
override def receive: Receive = {
case “start” => println(“接收到自己发来的消息”)
case “connect” =>{
println(“向worker发送消息”)
sender ! “reply”
}
}
}

最后

以上就是无限芒果为你收集整理的SPARk连接的全部内容,希望文章能够帮你解决SPARk连接所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部