更新時間:2021-10-29 來源:黑馬程序員 瀏覽量:
SparkSQL不僅能夠查詢MySQL數(shù)據(jù)庫中的數(shù)據(jù),還可以向表中插人新的數(shù)據(jù),實現(xiàn)方式的具體代碼如文件4-5所示。
文件4-5 SparkSqlToMysql.scala
import java.util.Properties import org.apachen.spark.rdd.RDD import org.apache.spark.sq1.{DataFrame, SparkSession} //創(chuàng)建樣例類Person case class Person (id: Int, name:String,age: Int) object SparkSqlToMysql { def main(args:ArrayL String]): Unit ={ //1.創(chuàng)建sparkSession對象 val spark: SparkSession=sparksession.builder() .appNamne("SparksqIToMysql") .master("local[2]") . getOrCreate() //2.創(chuàng)建數(shù)據(jù) val data=spark.sparkContext .patgoarrav("3,wangwu,22","4,zhaoliu,26")) //3.按MySQL列名切分數(shù)據(jù) val arRRD:RRD[Arey[String]] =data.map(_.split(",")") //4.RDD關聯(lián)Person樣例類 val personRDD:RDD[Person]= arrRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt). //導人隱式轉(zhuǎn)換 import spark.implicits_ //5.將RDD轉(zhuǎn)換成DataFrame val personDF:DataFrame=personRDD.toDF() //6.設置JDBC配置參數(shù) val prop =new Properties() prop.setProperty("user","root") prop.setProperty("password","123456") prop.setProperty("driver","com.mysql.jdbc.Driver") //7.寫入數(shù)據(jù) personDF.write.mode("append").jdbc( "jdbc:mysql://192.168.121.134:3306/spark","spark.person",prop) personDF.show() } }
在文件4-5中,第5行代碼首先創(chuàng)建case class Person樣例類;第9~ 12行代碼用來創(chuàng)建SparkSession對象;第14~15行代碼則通過spark.SparkContext.parallelize( )方法創(chuàng)建一個RDD,該RDD值表示兩個person數(shù)據(jù);第17~24行代碼表示將數(shù)據(jù)按照逗號切分并匹配case class Person中的字段用于轉(zhuǎn)換成DataFrame對象;第26~29行代碼表示設置JDBC配置參數(shù),訪問MySQL數(shù)據(jù)庫;第31行代碼personDF. write. mode()方法表示設置寫人數(shù)據(jù)方式,該參數(shù)append是一個枚舉類型,枚舉參數(shù)分別有append、overwriteerrorIfExistsignore4個值,分別表示為追加、覆蓋、表如果存在即報錯(該值為默認值)、忽略新保存的數(shù)據(jù)。
運行文件4-5中的代碼,返回sQLyog工具查看當前數(shù)據(jù)表,數(shù)據(jù)表內(nèi)容如圖4-7所示。
MySQL表數(shù)據(jù)怎樣導入到Hive文件中?