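Overview
Flink development workflow
I. Custom source
1 Define the entity bean
2 Read data from Kafka
3 Create the stream and convert the records with Gson
Questions: how to define the classes and convert between them;
the steps for reading from Kafka, and how to set a custom starting timestamp for consumption during testing
Code:
// 1.1 Define the entity bean
case class SpeedBean(rts: Long, parserData: Long, carModelId: String, receiveDate: Long, tripId: String, sn: String, sp: Int, ts: Long,
                     et: Int, beanType1: Int = 1) extends OutingTripStatusInfo(sn, beanType1, rts)
// 1.2 Read from Kafka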
val zkCluster = "bigdata06:2181,bigdata09:2181,bigdata10:2181"
val kafkaCluster = "bigdata07:9092,bigdata08:9092,bigdata09:9092"
val topic = "vin_selfdrivingtravel_status"
val timestamp = 1519804800000L
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers",kafkaCluster)
props.setProperty("zookeeper.connect",zkCluster)
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("group.id","selfDrivingTest")
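The question about choosing a custom starting point during testing largely comes down to producing an epoch-millisecond value such as the `timestamp` defined above. The following is a minimal sketch using only `java.time`; `StartTimestamp` and `toEpochMillis` are hypothetical names, not part of the original job. The source does not show which Kafka consumer class is created, so the hand-off is an assumption: in Flink releases whose Kafka consumer exposes `setStartFromTimestamp`, the resulting value could be passed there.

```scala
import java.time.{LocalDateTime, ZoneId}

// Hypothetical helper, not from the original job.
object StartTimestamp {
  // Convert a wall-clock time in the given zone to epoch milliseconds.
  def toEpochMillis(dateTime: LocalDateTime, zone: ZoneId): Long =
    dateTime.atZone(zone).toInstant.toEpochMilli

  // 2018-02-28 16:00 in Asia/Shanghai, i.e. 08:00 UTC on the same day.
  val example: Long =
    toEpochMillis(LocalDateTime.of(2018, 2, 28, 16, 0), ZoneId.of("Asia/Shanghai"))
}
```

Deriving the value from a readable date makes it easier to move the replay point between test runs than editing a raw number by hand.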
// 1.3 Parse the Kafka records
val speedStreamBean = speadStreaming.map(new MapFunction[String, SpeedBean] {
  override def map(t: String): SpeedBean = {
    // Set beanType1 explicitly rather than relying on the case-class default
    gson.fromJson(t, classOf[SpeedBean]).copy(beanType1 = 1)
  }
})
II. Connect the source streams
1 Decide whether a source needs to be converted to a broadcast stream
2 Connect the streams
2.2
streamOutingTripBean.connect(speedStreamBean).map(new CoMapFunction[OutingTripBean, SpeedBean, OutingTripStatusInfo]() {
  override def map1(in1: OutingTripBean): OutingTripStatusInfo = in1
  override def map2(in2: SpeedBean): OutingTripStatusInfo = in2
}).connect(gpsBeanStreamBean).map(new CoMapFunction[OutingTripStatusInfo, GpsBean, OutingTripStatusInfo]() {
  override def map1(in1: OutingTripStatusInfo): OutingTripStatusInfo = in1
  override def map2(in2: GpsBean): OutingTripStatusInfo = in2
})
Questions: 1 How to choose the field that keyBy groups on
2 How to connect multiple streams, which function to use, and whether Flink SQL could do the job
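III. Compute with process
1 keyBy a field of the connected stream
2 Apply process to the keyed data
3 Choose and maintain state
3.1 Declare the state classes
3.2 Choose suitable state types
3.3 Obtain the state through the parent class
3.4 Handle empty state specially
4 Computation logic
1 Initialize in processElement
2 Process each bean type separately
3 Update state
4 Read aggregated data from state
5 Emit results through the collector
1 keyBy and process:
.keyBy(_._vin1).process(keyedProcessFunction = new KeyedProcessFunction[String, OutingTripStatusInfo, outResult]() {
3.1 Declare the state classes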
case class BaseStateInfo(vin: String, SDTTripId: Long, state: Int, StartTime: Long, EndTime: Long)
case class GpsStateInfo(totalDistinceGps: Double, lo: Double, ld: Double, rts: Long, distanceCurrenceDay: Double, maxDistanceDay: Long, maxDistince: Double)
case class DrivingTimeInfo(totalDrivingTime: Long, maxDrivingTime: Long, maxDrivingTimeStart: Long, maxDrivingTimeEnd: Long,
maxDrivingTimeOnce: Long, maxDrivingTimeOneDay: Long, trip_id: String, currencyDrivingTime: Long)
case class SpeedStateInfo(maxSpeed: Long, maxSpeedTime: Long, sumSpeed: Long, speedCount: Long)
3.2 Choose suitable state types
var vin_state: ValueState[BaseStateInfo] = _
var speed_state: ValueState[SpeedStateInfo] = _
var gps_state: ValueState[GpsStateInfo] = _
var drivingTime_state: ValueState[DrivingTimeInfo] = _
3.3 Obtain the state through the parent class
override def open(parameters: Configuration): Unit = {
  vin_state = getRuntimeContext.getState(new ValueStateDescriptor[BaseStateInfo]("0", TypeInformation.of(new TypeHint[BaseStateInfo]() {})))
  speed_state = getRuntimeContext.getState(new ValueStateDescriptor("4", TypeInformation.of(new TypeHint[SpeedStateInfo]() {})))
  gps_state = getRuntimeContext.getState(new ValueStateDescriptor[GpsStateInfo]("2", TypeInformation.of(new TypeHint[GpsStateInfo]() {})))
  drivingTime_state = getRuntimeContext.getState(new ValueStateDescriptor[DrivingTimeInfo]("3", TypeInformation.of(new TypeHint[DrivingTimeInfo]() {})))
}
3.4 Initialize the state data
private def initStatus(statusInfo: OutingTripStatusInfo): Unit = {
  // State is null the first time a key is seen, so seed each one with zero values
  if (vin_state.value() == null) vin_state.update(BaseStateInfo(statusInfo._vin1, 0, 0, 0, 0))
  if (speed_state.value() == null) speed_state.update(SpeedStateInfo(0, 0, 0, 0))
  if (gps_state.value() == null) gps_state.update(GpsStateInfo(0d, 0d, 0d, 0, 0d, 0, 0d))
  if (drivingTime_state.value() == null) drivingTime_state.update(DrivingTimeInfo(0, 0, 0, 0, 0, 0, "", 0))
}
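The body of `processSpeedBean` is not shown in these notes, so the following is only a sketch of how one speed reading might be folded into the zero-initialized `SpeedStateInfo`. The `SpeedAggregation` object and its `update` and `average` methods are hypothetical names; the case class is repeated here only so that the snippet stands on its own.

```scala
// Same shape as the SpeedStateInfo state class declared in 3.1.
case class SpeedStateInfo(maxSpeed: Long, maxSpeedTime: Long, sumSpeed: Long, speedCount: Long)

// Hypothetical helper, not from the original job.
object SpeedAggregation {
  // Fold one speed reading (sp, observed at time ts) into the running state.
  def update(state: SpeedStateInfo, sp: Int, ts: Long): SpeedStateInfo = {
    val isNewMax = sp > state.maxSpeed
    SpeedStateInfo(
      maxSpeed = if (isNewMax) sp.toLong else state.maxSpeed,
      maxSpeedTime = if (isNewMax) ts else state.maxSpeedTime,
      sumSpeed = state.sumSpeed + sp,
      speedCount = state.speedCount + 1
    )
  }

  // Average speed; 0.0 for an untouched state to avoid dividing by zero.
  def average(state: SpeedStateInfo): Double =
    if (state.speedCount == 0) 0.0 else state.sumSpeed.toDouble / state.speedCount
}
```

Inside the keyed function, such a helper would presumably be applied as `speed_state.update(SpeedAggregation.update(speed_state.value(), bean.sp, bean.ts))`. Keeping the arithmetic in a pure function lets it be unit-tested without a Flink runtime.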
4 Computation logic
4.1 Initialize in processElement
4.2 Process each bean type separately
override def processElement(statusInfo: OutingTripStatusInfo, context: KeyedProcessFunction[String, OutingTripStatusInfo, outResult]#Context, collector: Collector[outResult]): Unit = {
  initStatus(statusInfo)
  statusInfo._beanType match {
    case 0 => processOutingTripBean(statusInfo.asInstanceOf[OutingTripBean])
    case 1 => processSpeedBean(statusInfo.asInstanceOf[SpeedBean])
    case 2 => processGpsBean(statusInfo.asInstanceOf[GpsBean])
    case _ => () // unknown bean type: skip it instead of failing with a MatchError
  }
4.3 Update state
  gps_state.update(GpsStateInfo(tmpDistince, bean.lo, bean.ld, bean.rts, tmpDistince, tempMaxTime, tempMaxDistance))
4.4 Read aggregated data from state
  // Total driving time
  val totalDrivingTime = drivingTime_state.value().totalDrivingTime
4.5 Emit results through the collector
  if (trip_id > 0 && vin != null && !"".equals(vin)) {
    collector.collect(outResult(trip_id, vin, state, totalDrivingTime, maxDrivingTime,
      maxDrivingTimeStart, maxDrivingTimeEnd, maxDrivingTimeOnce, maxDrivingTimeOneDay, avgSpead, maxSpeed,
      maxSpeedRts, totalDistance, maxDistance, maxDistanceTime, ct))
  }
}
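The dispatch in 4.2 switches on an integer tag and then casts with `asInstanceOf`. As an alternative sketch, not the approach used in the job above, the same routing can be written as a pattern match on the type itself. The types below (`StatusInfo`, `TripEvent`, `SpeedEvent`, `GpsEvent`) are simplified, hypothetical stand-ins for the article's bean hierarchy, with shortened field lists.

```scala
// Simplified, hypothetical stand-ins for the bean hierarchy.
sealed trait StatusInfo { def vin: String }
final case class TripEvent(vin: String, tripId: Long) extends StatusInfo
final case class SpeedEvent(vin: String, sp: Int) extends StatusInfo
final case class GpsEvent(vin: String, lo: Double, ld: Double) extends StatusInfo

object Dispatch {
  // Route each event by its runtime type; no integer tag or cast is needed.
  def describe(info: StatusInfo): String = info match {
    case t: TripEvent  => s"trip ${t.tripId}"
    case s: SpeedEvent => s"speed ${s.sp}"
    case g: GpsEvent   => s"gps ${g.lo},${g.ld}"
  }
}
```

Because the trait is sealed, the compiler can warn when a new event type is added without a matching case, which an integer tag cannot offer.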
IV. Watermarks
1 30-second snapshot
window(TumblingEventTimeWindows.of(Time.seconds(30)))
V. Sink the data
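To make the 30-second tumbling window concrete, the sketch below computes which window an event timestamp falls into. It is plain arithmetic rather than Flink's own code, and it assumes a zero window offset and non-negative timestamps; `TumblingWindows`, `windowStart`, and `windowRange` are hypothetical names.

```scala
// Hypothetical helper illustrating tumbling-window boundaries.
object TumblingWindows {
  // Start of the tumbling window (zero offset) that contains timestampMs.
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - Math.floorMod(timestampMs, sizeMs)

  // Half-open range [start, end) of that window.
  def windowRange(timestampMs: Long, sizeMs: Long): (Long, Long) = {
    val start = windowStart(timestampMs, sizeMs)
    (start, start + sizeMs)
  }
}
```

For example, `TumblingWindows.windowRange(1519804812345L, 30000L)` places that event in the window starting at `1519804800000L` and ending just before `1519804830000L`; with event-time windows, a result for that range is emitted once the watermark passes its end.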