这篇文章涵盖了spark与常用关系型数据库交互的所有内容(oracle,sqlserver与mysql类似这里就不详细说明了),这也是我项目中用到最常用的几种,应该可以帮助大家快速开发项目
Spark访问Hive数据库
spark访问hive数据库有两种方式
第一种是原始的jdbc访问hive数据库
import java.sql.DriverManager object HiveJDBC { def main(args: Array[String]): Unit = { //1. 利用反射机制获取数据库驱动类 val driver = "org.apache.hive.jdbc.HiveDriver" Class.forName(driver) //2. 声明hive数据库的url,username,password val (url,username,password) = ("jdbc:hive2://master:10000","spark","") //2.1 获取数据库连接对象 val connection = DriverManager.getConnection(url,username,password) //2.2 编写sql语句 val sql = "select count(*) as count from sogou.sogou500w_ext" //3. 执行sql语句 val statement = connection.prepareStatement(sql) //4. 处理结果集 val result = statement.executeQuery() //4. 循环打印结果 while(result.next()){ println(s"${result.getString("mycount")}") } //5. 关闭连接 result.close() statement.close() connection.close() } }
第二种是通过spark自带的访问数据库的方法,在配置sparkSession时引入
enableHiveSupport()
支持hive的访问import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession object ReadHive { //隐藏打印的配置信息 Logger.getLogger("org").setLevel(Level.ERROR) def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("Spark Hive Demo") .master("local") .enableHiveSupport()//支持hive,这个是关键,没有不行! .getOrCreate() //spark.sparkContext.addJar("/home/ymliang/ReadHive.jar") //利用sparksql访问hive数据库,本质是编写sql语句 spark.sql("use sogou") //show->spark的action算子,打印结果,默认20行数据 spark.sql("select * from sogou.sogou500w_ext").show() spark.stop() } }
Spark访问MySql数据库
spark访问mysql数据库的三种方式
用jdbc方式访问mysql数据库
/** * 读取数据库中的数据 * 备注: * 1、将jdbc驱动拷贝到$SPARK_HOME/conf目录下,是最简单的做法; * 2、明白每一个参数的意思,一个参数不对整个结果出不来; * 3、从数据库从读大量的数据进行分析,不推荐;读取少量的数据是可以接受的,也是常见的做法。 */ object MySqlJDBC { def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local") .appName(s"${this.getClass.getCanonicalName}") .getOrCreate() /** * 配置jdbc参数 * url : mysql的url * driver : mysql的数据库驱动类 * dbtable : 表名 * user : 用户名 * password : 密码 * load() : 加载 */ val jdbcDF = spark.read.format("jdbc") .option("url","jdbc:mysql://master:3306/spark") .option("driver","com.mysql.jdbc.Driver") .option("dbtable","student") .option("user","spark") .option("password","spark") .load() //将读出的数据库信息创建临时表,常用方法是createOrReplaceTempView(创建,如果存在则重新创建视图) jdbcDF.createTempView("tmp") //格式化打印表内容 spark.sql("select * from tmp").show() //打印列的Schema属性 jdbcDF.printSchema } }
创建一个数据库工具类(使用事务和批处理,提高性能)
- 首先为什么要使用批处理?
- 批处理时:数据累积到一定数量,再一次性提交到数据库,减少了与数据库的交互次数,所以效率会大大提高
事务:事务指逻辑上的一组操作,组成这组操作的各个单元,要不全部成功,要不全部不成功,默认是关闭事务的。
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet} /** * 既用事务,也用批处理;(建议在处理大批量的数据时,同时使用批处理和事务) */ object MySqlUtils { /** * 获取一个数据库连接对象 * * @return Connection */ def getConnect(): Connection = { var conn: Connection = null var psts: PreparedStatement = null try { Class.forName("com.mysql.jdbc.Driver") conn = DriverManager.getConnection("jdbc:mysql://master:3306/spark", "spark", "spark") conn.setAutoCommit(false)//将自动提交关闭 } catch { case e:Exception => e.printStackTrace() } conn } /** * 关闭连接 con,psts,res Object */ //Connection conn, ResultSet rs, Statement st def closeRes(con: Connection, rs: ResultSet, psts: PreparedStatement): Unit = { if (con != null) { try { con.close() } catch { case e: Exception => e.printStackTrace() } } if (rs != null) { try { rs.close() } catch { case e: Exception => e.printStackTrace() } } if (psts != null) { try { psts.close() } catch { case e: Exception => e.printStackTrace() } } } }
具体使用这个工具类时,调用方法获取连接关闭链接即可,这里举一个之前做的小demo
import java.sql.{Connection, PreparedStatement} import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} object LogTest1 { //隐藏打印信息 Logger.getLogger("org").setLevel(Level.ERROR) def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName(this.getClass.getCanonicalName) .setMaster("local") val sc = new SparkContext(conf) //读文件 val file = sc.textFile("hdfs://master:9000/sparkTest01.log") //处理文件 //数据文件大致内容如下: //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/ HTTP/1.1" 200 259 //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/head.jsp HTTP/1.1" 200 713 //192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] "GET /MyDemoWeb/body.jsp HTTP/1.1" 200 240 //192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242 //192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/hadoop.jsp HTTP/1.1" 200 242 //192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 val data = file.map(x => { //按"切分 val line = x.split("\"") //切分开取第二个数据,再按/切分 val line2 = line(1).split("/") //取切分后的第三个个数据再按空格切分后取第一个数据即head.jsp //变为tuple形式(xx,1)返回 (line2(2).split("\\s+")(0), 1) }).collect.filter(_._1 != "") //filter即取出数据为不为空的信息,起到过滤的作用 //做wordCount val data2 = data.groupBy(_._1).map(x => { //map后数据类型是 //x.1->String //x.2是Array(String,Int) //所以对x.2再遍历求和即得到出现的次数 (x._1, x._2.map(x => x._2).sum) }).toArray //返回的是数组形式 //JDBC访问mysql数据库,调用了MysqlUtils类获取连接和关闭链接 //并使用了批处理executeBatch(),addBatch() //相比于executeUpdate()效率更高 var conn: Connection = null var psts: PreparedStatement = null //jdbc try { //调用getConnect()获取链接 conn = MySqlUtils.getConnect() //执行sql语句 val psts = conn.prepareStatement("insert into test01 values(?,?)") for (i <- 0 to data2.length - 1) { psts.setString(1, s"${data2(i)._1}") psts.setInt(2, data2(i)._2) // 这样,更新10000条数据,就得访问数据库10000次,造成io的负载,数据量一大会明显察觉速度变慢 psts.executeUpdate() //psts.setString(1, s"${data2(i)._1}") //psts.setInt(2, data2(i)._2) //psts.addBatch()//添加到同一个批处理中 } //批处理 executeBatch() 批量写入,降低IO,提高性能 //psts.executeBatch()//执行批处理 conn.commit(); //执行完后,手动提交事务 conn.setAutoCommit(true); //在把自动提交打开 } catch { case e: Exception => e.printStackTrace() } finally { //调用closeRes()关闭链接 MySqlUtils.closeRes(conn, null, psts) } } }
用spark方式链接mysql,前提是必须创建dataFrame(df),这里以hive写入mysql为例,将hive同mysql整合起来,也是平常非常常用的一种技术
import java.util.Properties
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}/**
spark将最后一条sql语句的查询结果保存到mysql数据库中
*/
object HiveToMySql {
Logger.getLogger(“org”).setLevel(Level.ERROR)def main(args: Array[String]): Unit = {
val spark = SparkSession.builder() .appName("Spark Hive Demo") .master("spark://master:7077") .enableHiveSupport() //支持hive,这个是关键,没有不行! .getOrCreate() //读取hive数据库内容 spark.sql("use sogou") //获取dataFrame val df = spark.sql("select count(*) as mycount from sogou.sogou500w_ext") //创建properties对象,注入mysql配置 val prop = new Properties() prop.put("user", "spark") prop.put("password", "spark") prop.put("driver", "com.mysql.jdbc.Driver") //写入数据时字段名要对应上,顺序可以不与实际的表对应也就是mysql与hive的表列字段要一致 df.write .mode(SaveMode.Overwrite) .jdbc("jdbc:mysql://master:3306/spark", "spark.bb", prop) //写入mysql时可以配置mode(插入),overwrite(覆盖),append(追加),ignore(忽略),error(默认表存在报错)常用是mode和overwrite //resultDF.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.200.150:3306/spark","student",prop)
}
}
以上涵盖了spark与常用关系型数据库交互的所有内容(oracle,sqlserver与mysql类似这里就不详细说明了),希望对大家有所帮助,这也是我项目中用到最常用的几种,应该可以帮助大家快速开发项目