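I'm 大气鲜花, a blogger at 靠谱客. I collected this article on Flink during recent development work and thought it was quite good, so I'm sharing it here in the hope that it serves as a useful reference.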

Overview

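Flink development workflow
I Custom source
1 Define the entity bean
2 Read the data from Kafka
3 Build the stream and convert the JSON with Gson
Questions: how should the classes be defined, and how do you convert between them?
What are the steps for reading from Kafka, and during testing, how can consumption start from a custom point in time?
Code: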
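//1.1 Define the entity bean

// SpeedBean is the speed event. Like every event type it extends the shared parent OutingTripStatusInfo,
// passing sn, a bean-type tag (1 = speed) and rts
case class SpeedBean(rts: Long, parserData: Long, carModelId: String, receiveDate: Long, tripId: String, sn: String, sp: Int, ts: Long,
                     et: Int, beanType1: Int = 1) extends OutingTripStatusInfo(sn, beanType1, rts)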

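//1.2 Read from Kafka

import java.util.Properties
import org.apache.flink.streaming.api.scala._

val zkCluster = "bigdata06:2181,bigdata09:2181,bigdata10:2181"
val kafkaCluster = "bigdata07:9092,bigdata08:9092,bigdata09:9092"
val topic = "vin_selfdrivingtravel_status"
// Start point for consumption during tests, in epoch ms: 2018-03-01 00:00:00 (UTC+8)
val timestamp = 1519804800000L

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", kafkaCluster)
// Only the old Kafka 0.8 consumer reads zookeeper.connect; newer consumers do not use it
props.setProperty("zookeeper.connect", zkCluster)
// Flink's Kafka consumer overrides these two settings; records are decoded by the
// DeserializationSchema passed to the consumer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("group.id", "selfDrivingTest")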
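The hard-coded `timestamp` answers the testing question from section I: Flink's Kafka consumer (Kafka 0.10 and later) can start each partition at the first record whose timestamp is at or after a given value, via `setStartFromTimestamp(timestamp)`. Rather than hard-coding the millisecond value, the minimal sketch below derives it from a calendar date with `java.time`. `StartTimestamp` and `startOfDayMillis` are hypothetical names, and the Asia/Shanghai zone is an assumption that matches the value above.

```scala
import java.time.{LocalDate, ZoneId}

object StartTimestamp {
  // Epoch milliseconds of 00:00 on `date` in time zone `zone`,
  // e.g. a value to hand to the consumer's setStartFromTimestamp(...)
  def startOfDayMillis(date: LocalDate, zone: ZoneId): Long =
    date.atStartOfDay(zone).toInstant.toEpochMilli
}
```

`StartTimestamp.startOfDayMillis(LocalDate.of(2018, 3, 1), ZoneId.of("Asia/Shanghai"))` returns `1519804800000L`, the same value as `timestamp`.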

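//1.3 Parse the Kafka records

import com.google.gson.Gson
import org.apache.flink.api.common.functions.MapFunction

// speadStreaming is the DataStream[String] of raw JSON read from the Kafka topic in 1.2
val speedStreamBean = speadStreaming.map(new MapFunction[String, SpeedBean] {
  // Gson is not Serializable, so build it lazily on each task instead of capturing an outer instance
  @transient lazy val gson = new Gson()
  override def map(t: String): SpeedBean = {
    // Gson bypasses Scala default arguments, so set the type tag explicitly
    gson.fromJson(t, classOf[SpeedBean]).copy(beanType1 = 1)
  }
})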

II Connect the source streams
1 Decide whether a source needs to be converted into a broadcast stream
2 Connect the streams

2.2 
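// connect pairs two streams of different types; each CoMapFunction upcasts both sides to
// OutingTripStatusInfo, and a second connect adds the GPS stream to the merged result
streamOutingTripBean.connect(speedStreamBean).map(new CoMapFunction[OutingTripBean, SpeedBean, OutingTripStatusInfo]() {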
  override def map1(in1: OutingTripBean): OutingTripStatusInfo = in1

  override def map2(in2: SpeedBean): OutingTripStatusInfo = in2
}).connect(gpsBeanStreamBean).map(new CoMapFunction[OutingTripStatusInfo, GpsBean, OutingTripStatusInfo]() {
  override def map1(in1: OutingTripStatusInfo): OutingTripStatusInfo = in1

  override def map2(in2: GpsBean): OutingTripStatusInfo = in2
})

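Questions: 1 How should the keyBy field be chosen for grouping?
2 How can more than two streams be connected, with which function, and could Flink SQL do this instead?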
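On question 1, the key has to be a field that every event type carries, which is why the process step in section III keys on `_vin1` from the shared parent `OutingTripStatusInfo`. The plain-Scala model below is not Flink code, and its `Event` class is a hypothetical stand-in; it only illustrates the guarantee `keyBy` provides: every event for one vehicle ends up under the same key, whichever source stream it came from, so the per-vehicle state sees all of them.

```scala
object KeyByModel {
  // Hypothetical stand-in for OutingTripStatusInfo: every bean type carries the VIN.
  final case class Event(vin: String, beanType: Int, rts: Long)

  // Models keyBy(_.vin): all events sharing a VIN land in one group
  // (groupBy keeps input order within each group).
  def partitionByVin(events: Seq[Event]): Map[String, Seq[Event]] =
    events.groupBy(_.vin)
}
```

On question 2: `connect` always pairs exactly two streams, which is why 2.2 chains two `connect` calls. Streams that already share one type can instead be merged in a single `union(...)` call.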

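III Start the computation with process
1 keyBy the connected stream on a field
2 Process the keyed data with process
3 Choose and maintain state
3.1 Declare the state classes
3.2 Choose suitable state types
3.3 Obtain the state through the parent class's runtime context
3.4 Handle empty state specially
4 Computation logic
1 Initialize in processElement
2 Process each event type separately
3 Update the state
4 Read the aggregates back from state
5 Emit the results via the Collector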

1 Key by the VIN and process:

.keyBy(_._vin1).process(keyedProcessFunction = new KeyedProcessFunction[String, OutingTripStatusInfo, outResult]() {

3.1 Declare the state classes

case class BaseStateInfo(vin: String, SDTTripId: Long, state: Int, StartTime: Long, EndTime: Long)
case class GpsStateInfo(totalDistinceGps: Double, lo: Double, ld: Double, rts: Long, distanceCurrenceDay: Double, maxDistanceDay: Long, maxDistince: Double)
case class DrivingTimeInfo(totalDrivingTime: Long, maxDrivingTime: Long, maxDrivingTimeStart: Long, maxDrivingTimeEnd: Long,
                                 maxDrivingTimeOnce: Long, maxDrivingTimeOneDay: Long, trip_id: String, currencyDrivingTime: Long)
case class SpeedStateInfo(maxSpeed: Long, maxSpeedTime: Long, sumSpeed: Long, speedCount: Long)
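The fields of `SpeedStateInfo` suggest how each `SpeedBean` is folded into state, but the article does not show `processSpeedBean`, so the following is a hypothetical sketch: keep the highest speed and when it occurred, and accumulate a sum and a count so that an average such as the `avgSpead` emitted in 4.5 can be computed as `sumSpeed / speedCount`. The names `SpeedStateOps`, `update`, and `avgSpeed`, and the use of `ts` as the time of the maximum, are assumptions.

```scala
object SpeedStateOps {
  // Same fields as SpeedStateInfo in 3.1.
  final case class SpeedStateInfo(maxSpeed: Long, maxSpeedTime: Long, sumSpeed: Long, speedCount: Long)

  // Hypothetical update rule: fold one speed sample `sp`, observed at time `ts`, into the state.
  def update(s: SpeedStateInfo, sp: Int, ts: Long): SpeedStateInfo = {
    val isNewMax = sp > s.maxSpeed
    SpeedStateInfo(
      maxSpeed = if (isNewMax) sp.toLong else s.maxSpeed,
      maxSpeedTime = if (isNewMax) ts else s.maxSpeedTime,
      sumSpeed = s.sumSpeed + sp,
      speedCount = s.speedCount + 1
    )
  }

  // Average over all samples; returns 0.0 before the first sample to avoid dividing by zero.
  def avgSpeed(s: SpeedStateInfo): Double =
    if (s.speedCount == 0) 0.0 else s.sumSpeed.toDouble / s.speedCount
}
```

Keeping a sum and a count, rather than a running average, lets the state stay exact with integer arithmetic until the average is actually needed.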

3.2 Choose suitable state types

// One ValueState per aggregate; Flink keeps a separate value for each key (VIN)
var vin_state: ValueState[BaseStateInfo] = _
var speed_state: ValueState[SpeedStateInfo] = _
var gps_state: ValueState[GpsStateInfo] = _
var drivingTime_state: ValueState[DrivingTimeInfo] = _

3.3 Obtain the state through the parent class's runtime context

 override def open(parameters: Configuration): Unit = {
    vin_state = getRuntimeContext.getState(new ValueStateDescriptor[BaseStateInfo]("0", TypeInformation.of(new TypeHint[BaseStateInfo]() {})))
    speed_state = getRuntimeContext.getState(new ValueStateDescriptor("4", TypeInformation.of(new TypeHint[SpeedStateInfo]() {})))
    gps_state = getRuntimeContext.getState(new ValueStateDescriptor[GpsStateInfo]("2", TypeInformation.of(new TypeHint[GpsStateInfo]() {})))
    drivingTime_state = getRuntimeContext.getState(new ValueStateDescriptor[DrivingTimeInfo]("3", TypeInformation.of(new TypeHint[DrivingTimeInfo]() {})))
  }

3.4 Initialize the state data

private def initStatus(statusInfo: OutingTripStatusInfo): Unit = {
        if (vin_state.value() == null) vin_state.update(BaseStateInfo(statusInfo._vin1, 0, 0, 0, 0))
        if (speed_state.value() == null) speed_state.update(SpeedStateInfo(0, 0, 0, 0))
        if (gps_state.value() == null) gps_state.update(GpsStateInfo(0d, 0d, 0d, 0, 0d, 0, 0d))
        if (drivingTime_state.value() == null) drivingTime_state.update(DrivingTimeInfo(0, 0, 0, 0, 0, 0, "", 0))
      }
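`ValueState.value()` returns `null` for a key that has never been written (when the descriptor has no default value), which is why `initStatus` seeds every state with zeros before processing. The minimal plain-Scala model below replaces Flink's per-key `ValueState` with a hypothetical in-memory `SimpleValueState` and shows a read-with-default helper, `getOrInit` (also hypothetical), so the null check lives in one place instead of being repeated for each state.

```scala
object StateInit {
  // Hypothetical in-memory stand-in for one key's ValueState[T]:
  // value() returns null until update() is called, mirroring Flink's ValueState
  // when the descriptor has no default value.
  final class SimpleValueState[T >: Null] {
    private var current: T = null
    def value(): T = current
    def update(v: T): Unit = current = v
  }

  // Seed the state with `default` on first access (what initStatus does for
  // each state), then return the stored value.
  def getOrInit[T >: Null](state: SimpleValueState[T], default: => T): T = {
    if (state.value() == null) state.update(default)
    state.value()
  }
}
```

The by-name `default` parameter means the zero value is only constructed when the state is actually empty.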
4 Computation logic

4.1 Initialize in processElement
4.2 Process each event type separately

  override def processElement(statusInfo: OutingTripStatusInfo, context: KeyedProcessFunction[String, OutingTripStatusInfo, outResult]#Context, collector: Collector[outResult]): Unit = {
    initStatus(statusInfo)
    statusInfo._beanType match {
      case 0 => processOutingTripBean(statusInfo.asInstanceOf[OutingTripBean])
      case 1 => processSpeedBean(statusInfo.asInstanceOf[SpeedBean])
      case 2 => processGpsBean(statusInfo.asInstanceOf[GpsBean])
    }
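Matching on the integer `_beanType` and then casting works, but an unexpected tag throws a `MatchError`, and a tag that does not fit the object fails at the cast. A sketch of an alternative, assuming the three bean types could share a sealed parent (the hypothetical `StatusEvent` hierarchy below stands in for `OutingTripStatusInfo` and its subclasses), matches on the runtime type instead, so no cast is needed and the compiler can check that every subtype is handled.

```scala
object Dispatch {
  // Hypothetical sealed stand-ins for OutingTripBean, SpeedBean and GpsBean.
  sealed trait StatusEvent { def vin: String }
  final case class TripEvent(vin: String, tripId: Long) extends StatusEvent
  final case class SpeedEvent(vin: String, sp: Int) extends StatusEvent
  final case class GpsEvent(vin: String, lo: Double, ld: Double) extends StatusEvent

  // Type-based dispatch: no integer tag, no asInstanceOf, and the compiler
  // warns if a new subtype is added without a matching case.
  def describe(e: StatusEvent): String = e match {
    case TripEvent(_, tripId) => s"trip $tripId"
    case SpeedEvent(_, sp)    => s"speed $sp"
    case GpsEvent(_, lo, ld)  => s"gps $lo,$ld"
  }
}
```

In the real job each branch would call `processOutingTripBean`, `processSpeedBean`, or `processGpsBean`; a sealed parent requires all subclasses to be defined in the same source file.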

4.3 Update the state

  gps_state.update(GpsStateInfo(tmpDistince, bean.lo, bean.ld, bean.rts, tmpDistince, tempMaxTime, tempMaxDistance))
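This update stores the latest point (`lo`, `ld`) and its time `rts`, so the next GPS fix can be measured against it. The article does not show how `tmpDistince` is computed. A common choice, sketched here as an assumption, is the haversine great-circle distance between the stored point and the new one, added to the running total. `GeoDistance`, `haversineKm`, and `addLeg` are hypothetical names, and `lo`/`ld` are assumed to be longitude/latitude in degrees.

```scala
object GeoDistance {
  // Mean Earth radius in kilometres.
  private val EarthRadiusKm = 6371.0

  // Haversine great-circle distance between two (longitude, latitude) points given in degrees.
  def haversineKm(lo1: Double, ld1: Double, lo2: Double, ld2: Double): Double = {
    val dLat = math.toRadians(ld2 - ld1)
    val dLon = math.toRadians(lo2 - lo1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(ld1)) * math.cos(math.toRadians(ld2)) * math.pow(math.sin(dLon / 2), 2)
    // min guards against a exceeding 1.0 through floating-point rounding
    2 * EarthRadiusKm * math.asin(math.sqrt(math.min(1.0, a)))
  }

  // Add the leg from the previous point to the running total. The zero-initialised
  // state from 3.4 (rts == 0) has no previous point, so the first fix adds nothing
  // instead of a bogus jump from (0, 0).
  def addLeg(total: Double, prevLo: Double, prevLd: Double, prevRts: Long, lo: Double, ld: Double): Double =
    if (prevRts == 0L) total else total + haversineKm(prevLo, prevLd, lo, ld)
}
```

One degree of latitude comes out at roughly 111.19 km with this radius, which is a quick sanity check for the units.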

4.4 Read the aggregates back from state

// Total driving time
        val totalDrivingTime = drivingTime_state.value().totalDrivingTime

4.5 Emit the results via the Collector

 if (trip_id > 0 && vin != null && !"".equals(vin)) {
          collector.collect(outResult(trip_id, vin, state, totalDrivingTime, maxDrivingTime,
            maxDrivingTimeStart, maxDrivingTimeEnd, maxDrivingTimeOnce, maxDrivingTimeOneDay, avgSpead, maxSpeed,
            maxSpeedRts, totalDistance, maxDistance, maxDistanceTime, ct))
        }
      }

IV Watermarks
1 A 30-second snapshot (tumbling event-time window)

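// Event-time windows need timestamps and watermarks assigned upstream; each window spans 30 s of event time
.window(TumblingEventTimeWindows.of(Time.seconds(30)))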
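A tumbling event-time window groups records by their own timestamps rather than by arrival time, and it only emits once the watermark passes its end, which is how this step ties into watermarks. The arithmetic is sketched below in plain Scala for the 30-second size above, assuming the default zero offset and non-negative epoch-millisecond timestamps. `WindowMath` is a hypothetical name, not a Flink class. Windows are aligned to the epoch, so each covers `[start, start + 30000)`, and Flink's event-time trigger fires once the watermark reaches the window's last millisecond.

```scala
object WindowMath {
  val SizeMs: Long = 30000L // Time.seconds(30)

  // Start of the tumbling window containing `ts` (epoch ms, non-negative, zero offset).
  def windowStart(ts: Long, size: Long = SizeMs): Long = ts - (ts % size)

  // The window covers [start, start + size); the event-time trigger fires once the
  // watermark reaches the window's last millisecond, start + size - 1.
  def fires(watermark: Long, start: Long, size: Long = SizeMs): Boolean =
    watermark >= start + size - 1
}
```

For example, a record stamped 15 seconds after the test start time (`1519804815000L`) falls into the window starting at `1519804800000L`, which fires only when the watermark reaches `1519804829999L`.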

V Sink the data

This content was contributed by users or collected from the web for learning and reference; copyright belongs to the original author.