我是靠谱客的博主 积极小蝴蝶,最近开发中收集的这篇文章主要介绍Spark_通过Spark实现ip地址查询,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

标题

          • 1.需求分析
          • 2.数据准备
          • 3.代码思路
          • 4.代码


1.需求分析

在互联网中,我们经常会见到城市热点图这样的报表数据,例如在百度统计中,会统计今年的热门旅游城市、热门报考学校等,会将这样的信息显示在热点图中。我们根据每个用户的IP地址,与我们的IP地址段进行比较,确认每个IP落在哪一个IP端内,获取经纬度,然后绘制热力图
在这里插入图片描述
我们需要通过日志信息(运行商或者网站自己生成)和城市ip段信息来判断用户的ip段,统计热点经纬度。

2.数据准备

ip日志信息:
在ip日志信息中,只需要关心ip这一个维度就可以了
在这里插入图片描述

城市ip段信息:
第一第二字段分别为:ip地址的起始结束
第三第四字段分别对应:ip始末解析为数字后的结果
最后两个字段分别对应经度纬度
此次我们需要关心的是3,4,和最后两个字段
在这里插入图片描述

创建数据库:
用于对结果进行存储

CREATE DATABASE `spark`;
USE `spark`;

DROP TABLE IF EXISTS `iplocation`;

CREATE TABLE `iplocation` (
  `longitude` varchar(32) DEFAULT NULL,
  `latitude` varchar(32) DEFAULT NULL,
  `total_count` varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.代码思路

1、加载城市ip段信息,获取ip起始数字和结束数字,经度,维度
2、 加载日志数据,获取ip信息,然后转换为数字,和ip段比较
3、 比较的时候采用二分法查找,找到对应的经度和维度
4、 然后对经度和维度做单词计数

4.代码
package cn.itcast.rdd

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

//todo:利用spark实现ip地址查询
object Iplocation {

  //将ip地址转换为Long类型   192.168.200.100
  def ip2Long(ip: String): Long = {
    val ips: Array[String] = ip.split("\.")
    var ipNum:Long=0L
    //遍历数组
    for(i <- ips){
      ipNum=i.toLong | ipNum << 8L
    }
    ipNum
  }

  //利用二分查询,找到long类型数字在数组中的下标     ip开始数字  ip结束数字   经度    维度
  def binarySearch(ipNum: Long, broadcastValue: Array[(String, String, String, String)]): Int ={
    var start=0
    var end=broadcastValue.length-1

    while(start <=end){
      val middle=(start+end)/2

      if(ipNum >=broadcastValue(middle)._1.toLong && ipNum <=broadcastValue(middle)._2.toLong){
        return middle
      }


      if(ipNum < broadcastValue(middle)._1.toLong){
        end=middle-1
      }

      if(ipNum > broadcastValue(middle)._2.toLong){
        start=middle+1
      }
    }

    -1
  }

  //将数据写入到mysql表中
  val data2Mysql=(iter:Iterator[((String, String), Int)]) =>{
    //创建数据库连接
    var conn:Connection=null
    //创建PreparedStatement
    var ps:PreparedStatement=null

    //sql语句  ?表示占位符
    val sql="insert into iplocation(longitude,latitude,total_count)  values(?,?,?)"
    try {
      //构建连接
      conn = DriverManager.getConnection("jdbc:mysql:///spark?serverTimezone=GMT%2B8&useSSL=false","root","123456")
      ps = conn.prepareStatement(sql)

      //遍历迭代器
      iter.foreach(line => {
        //给?赋值
        ps.setString(1, line._1._1)
        ps.setString(2, line._1._2)
        ps.setLong(3, line._2)
        //执行sql语句
        ps.execute()
      })
    }catch {
      case e:Exception =>println(e)
    }finally {
      //关闭数据库连接
      if(ps!=null){
        ps.close()
      }
      if(conn!=null){
        conn.close()
      }
    }

  }


  def main(args: Array[String]): Unit = {
    //1、创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Iplocation").setMaster("local[2]")
    //2、创建SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //3、读取城市ip信息日志数据,获取(ip开始数字、结束数字、经度、维度)
    val city_ips: RDD[(String, String, String, String)] = sc.textFile("H:\大数据实时资料\3、Spark\2、spark第二天教案文档\资料\服务器访问日志根据ip地址查找区域\用户ip上网记录以及ip字典\ip.txt").map(x=>x.split("\|")).map(x=>(x(2),x(3),x(x.length-2),x(x.length-1)))
    //把城市ip信息数据通过广播变量下发到每一个worker节点
    val broadcastIps: Broadcast[Array[(String, String, String, String)]] = sc.broadcast(city_ips.collect())

    //4、读取运营商日志数据,获取所有的ip地址
    val ips: RDD[String] = sc.textFile("H:\大数据实时资料\3、Spark\2、spark第二天教案文档\资料\服务器访问日志根据ip地址查找区域\用户ip上网记录以及ip字典\20090121000132.394251.http.format").map(x=>x.split("\|")(1))

    //5、遍历运营商日志数据,获取每一个ip地址,然后去广播变量中去匹配,获取当前ip地址对应经纬度
    val result: RDD[((String, String), Int)] = ips.mapPartitions(iter => {
      // 获取广播变量中的值
      val broadcastValue: Array[(String, String, String, String)] = broadcastIps.value
      //遍历迭代器,获取每一个ip地址
      iter.map(ip => {
        //把ip地址转换成long类型数字
        val ipNum: Long = ip2Long(ip)
        //拿到long类型的数字去广播变量值中去匹配,获取对应下标  用到二分查询
        val index: Int = binarySearch(ipNum, broadcastValue)
        //返回结果数据   ((经度,维度),1)
        ((broadcastValue(index)._3, broadcastValue(index)._4), 1)
      })
    })
    //6、统计相同经度和维度出现总次数
    val finalResult: RDD[((String, String), Int)] = result.reduceByKey(_+_)

    //7、打印输出
    finalResult.foreach(println)

    // 把结果数据写入到mysql表中
    finalResult.foreachPartition(data2Mysql)


    //8、关闭sc
    sc.stop()
  }
}

运行结果:
在这里插入图片描述

数据库:
在这里插入图片描述

最后

以上就是积极小蝴蝶为你收集整理的Spark_通过Spark实现ip地址查询的全部内容,希望文章能够帮你解决Spark_通过Spark实现ip地址查询所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部