YMLiang

这篇文章涵盖了spark与常用关系型数据库交互的所有内容(oracle,sqlserver与mysql类似这里就不详细说明了),这也是我项目中用到最常用的几种,应该可以帮助大家快速开发项目

Spark访问Hive数据库

spark访问hive数据库有两种方式

  1. 第一种是原始的jdbc访问hive数据库

    import java.sql.DriverManager
    
    object HiveJDBC {
        def main(args: Array[String]): Unit = {
            //1. 利用反射机制获取数据库驱动类
            val driver = "org.apache.hive.jdbc.HiveDriver"
            Class.forName(driver)
            //2. 声明hive数据库的url,username,password
            val (url,username,password) = ("jdbc:hive2://master:10000","spark","")
            //2.1 获取数据库连接对象 
            val connection = DriverManager.getConnection(url,username,password)
            //2.2 编写sql语句
            val sql = "select count(*) as count from sogou.sogou500w_ext"
            //3. 执行sql语句
            val statement = connection.prepareStatement(sql)
            //4. 处理结果集
            val result = statement.executeQuery()
            //4. 循环打印结果
            while(result.next()){
            println(s"${result.getString("mycount")}")
            }
            //5. 关闭连接
            result.close()
            statement.close()
            connection.close()
        }
    }
    
  2. 第二种是通过spark自带的访问数据库的方法,在配置sparkSession时引入enableHiveSupport()支持hive的访问

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    object ReadHive {
    //隐藏打印的配置信息
    Logger.getLogger("org").setLevel(Level.ERROR)
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
        .appName("Spark Hive Demo")
        .master("local")
        .enableHiveSupport()//支持hive,这个是关键,没有不行!
        .getOrCreate()
        //spark.sparkContext.addJar("/home/ymliang/ReadHive.jar")
        //利用sparksql访问hive数据库,本质是编写sql语句
        spark.sql("use sogou")
        //show->spark的action算子,打印结果,默认20行数据
        spark.sql("select * from sogou.sogou500w_ext").show()
        spark.stop()
    }
    }
    

Spark访问MySql数据库

spark访问mysql数据库的三种方式

  1. 用jdbc方式访问mysql数据库

    /**
    * 读取数据库中的数据
    * 备注:
    * 1、将jdbc驱动拷贝到$SPARK_HOME/conf目录下,是最简单的做法;
    * 2、明白每一个参数的意思,一个参数不对整个结果出不来;
    * 3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。
    */
    object MySqlJDBC {
        def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder()
            .master("local")
            .appName(s"${this.getClass.getCanonicalName}")
            .getOrCreate()
            /**
            * 配置jdbc参数
            * url : mysql的url
            * driver : mysql的数据库驱动类
            * dbtable : 表名
            * user : 用户名
            * password : 密码
            * load() : 加载
            */
            val jdbcDF = spark.read.format("jdbc")
            .option("url","jdbc:mysql://master:3306/spark")
            .option("driver","com.mysql.jdbc.Driver")
            .option("dbtable","student")
            .option("user","spark")
            .option("password","spark")
            .load()
            //将读出的数据库信息创建临时表,常用方法是createOrReplaceTempView(创建,如果存在则重新创建视图)
            jdbcDF.createTempView("tmp")
            //格式化打印表内容
            spark.sql("select * from tmp").show()
            //打印列的Schema属性
            jdbcDF.printSchema
        }
    }
    
  2. 创建一个数据库工具类(使用事务和批处理,提高性能)

    • 首先为什么要使用批处理?
    • 批处理时:数据累积到一定数量,再一次性提交到数据库,减少了与数据库的交互次数,所以效率会大大提高
    • 事务:事务指逻辑上的一组操作,组成这组操作的各个单元,要不全部成功,要不全部不成功,默认是关闭事务的。

      import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
      
      /**
      *  既用事务,也用批处理;(建议在处理大批量的数据时,同时使用批处理和事务)
      */
      object MySqlUtils {
      /**
      * 获取一个数据库连接对象
      *
      * @return Connection
      */
      def getConnect(): Connection = {
          var conn: Connection = null
          var psts: PreparedStatement = null
          try {
          Class.forName("com.mysql.jdbc.Driver")
          conn = DriverManager.getConnection("jdbc:mysql://master:3306/spark", "spark", "spark")
          conn.setAutoCommit(false)//将自动提交关闭
          } catch {
          case e:Exception => e.printStackTrace()
          }
          conn
      }
      
      /**
          * 关闭连接 con,psts,res Object
          */
      //Connection conn, ResultSet rs, Statement st
      def closeRes(con: Connection, rs: ResultSet, psts: PreparedStatement): Unit = {
          if (con != null) {
              try {
                  con.close()
              } catch {
                  case e: Exception => e.printStackTrace()
              }
          }
          if (rs != null) {
              try {
                  rs.close()
              } catch {
                  case e: Exception => e.printStackTrace()
              }
          }
          if (psts != null) {
              try {
                  psts.close()
              } catch {
                  case e: Exception => e.printStackTrace()
              }
          }
      }
      }
      
    • 具体使用这个工具类时,调用方法获取连接关闭链接即可,这里举一个之前做的小demo

      import java.sql.{Connection, PreparedStatement}
      import org.apache.log4j.{Level, Logger}
      import org.apache.spark.{SparkConf, SparkContext}
      
      object LogTest1 {
      //隐藏打印信息
      Logger.getLogger("org").setLevel(Level.ERROR)
      def main(args: Array[String]): Unit = {
          val conf = new SparkConf()
          .setAppName(this.getClass.getCanonicalName)
          .setMaster("local")
          val sc = new SparkContext(conf)
          //读文件
          val file = sc.textFile("hdfs://master:9000/sparkTest01.log")
          //处理文件
          //数据文件大致内容如下:
          //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259
          //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713
          //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240
          //192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
          //192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242
          //192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240
          val data = file.map(x => {
              //按"切分
              val line = x.split("\"")
              //切分开取第二个数据,再按/切分
              val line2 = line(1).split("/")
              //取切分后的第三个个数据再按空格切分后取第一个数据即head.jsp
              //变为tuple形式(xx,1)返回
              (line2(2).split("\\s+")(0), 1)
          }).collect.filter(_._1 != "")
          //filter即取出数据为不为空的信息,起到过滤的作用
          //做wordCount
          val data2 = data.groupBy(_._1).map(x => {
          //map后数据类型是
          //x.1->String
          //x.2是Array(String,Int)
          //所以对x.2再遍历求和即得到出现的次数
          (x._1, x._2.map(x => x._2).sum)
          }).toArray
          //返回的是数组形式
      
          //JDBC访问mysql数据库,调用了MysqlUtils类获取连接和关闭链接
          //并使用了批处理executeBatch(),addBatch()
          //相比于executeUpdate()效率更高
      
          var conn: Connection = null
          var psts: PreparedStatement = null
          //jdbc
          try {
              //调用getConnect()获取链接
              conn = MySqlUtils.getConnect()
              //执行sql语句
              val psts = conn.prepareStatement("insert into test01 values(?,?)")
              for (i <- 0 to data2.length - 1) {
                  psts.setString(1, s"${data2(i)._1}")
                  psts.setInt(2, data2(i)._2)
                  // 这样,更新10000条数据,就得访问数据库10000次,造成io的负载,数据量一大会明显察觉速度变慢
                  psts.executeUpdate()
                  //psts.setString(1, s"${data2(i)._1}")
                  //psts.setInt(2, data2(i)._2)
                  //psts.addBatch()//添加到同一个批处理中
              }
              //批处理 executeBatch() 批量写入,降低IO,提高性能
              //psts.executeBatch()//执行批处理
              conn.commit(); //执行完后,手动提交事务
              conn.setAutoCommit(true); //在把自动提交打开
          } catch {
              case e: Exception => e.printStackTrace()
          } finally {
              //调用closeRes()关闭链接
              MySqlUtils.closeRes(conn, null, psts)
          }
      }
      }
      
  1. 用spark方式链接mysql,前提是必须创建dataFrame(df),这里以hive写入mysql为例,将hive同mysql整合起来,也是平常非常常用的一种技术

    import java.util.Properties
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{SaveMode, SparkSession}

    /**

    • spark将最后一条sql语句的查询结果保存到mysql数据库中
      */
      object HiveToMySql {
      Logger.getLogger(“org”).setLevel(Level.ERROR)

      def main(args: Array[String]): Unit = {

      val spark = SparkSession.builder()
      .appName("Spark Hive Demo")
      .master("spark://master:7077")
      .enableHiveSupport() //支持hive,这个是关键,没有不行!
      .getOrCreate()
      //读取hive数据库内容
      spark.sql("use sogou")
      //获取dataFrame
      val df = spark.sql("select count(*) as mycount from sogou.sogou500w_ext")
      //创建properties对象,注入mysql配置
      val prop = new Properties()
      prop.put("user", "spark")
      prop.put("password", "spark")
      prop.put("driver", "com.mysql.jdbc.Driver")
      //写入数据时字段名要对应上,顺序可以不与实际的表对应也就是mysql与hive的表列字段要一致
      df.write
      .mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://master:3306/spark", "spark.bb", prop)
      //写入mysql时可以配置mode(插入),overwrite(覆盖),append(追加),ignore(忽略),error(默认表存在报错)常用是mode和overwrite
      //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
      

      }
      }

以上涵盖了spark与常用关系型数据库交互的所有内容(oracle,sqlserver与mysql类似这里就不详细说明了),希望对大家有所帮助,这也是我项目中用到最常用的几种,应该可以帮助大家快速开发项目

 评论


博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Material X 作为主题 , 总访问量为 次 。
Copyright 2018-2019 YMLiang'BLOG   |   京ICP备 - 19039949  |  载入天数...载入时分秒...