YMLiang

利用SparkSQL求股市K线波峰波谷

前几天朋友问了一道题我觉得还不错拿出来分享一下,如何求股市K线的波峰波谷,这里我简单分析一下我的思路,首先K线是一系列无序的数字组成的,根据每个数字与其相邻的数字之间的大小关系我们就能很轻易的求出波峰波谷,下面开始分析

  1. 首先数据我们用scala生成
    • 代码在spark-shell中写,如果在工作中大家遇到类似问题读文件取数据即可,这里简化用spark-shell代替
        val random = new scala.util.Random();
        val arr = (0 to 50).map(x=>{random.nextInt(100)}).zipWithIndex
- zipWithIndex是在生成得随机数后在跟一个从0开始的索引组成一个个类似tuple形式的Vector容器

        arr
        //看一下打印出的类型
        scala> res10: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((85,0), (54,1), (69,2)......
- 将其变为数组

        val arr2 = arr.toArray

- 转化为RDD

        val rdd = sc.makeRDD(arr2)

- rdd转为DF(即DataFrame)

        val df = rdd.toDF("data","id")
        df.show
        +----+---+
        |data| id|
        +----+---+
        |  85|  0|
        |  54|  1|
        |  69|  2|
        |   7|  3|
        |  39|  4|
        |  40|  5|
        |  43|  6|
        |  50|  7|
        |  59|  8|
        |  28|  9|
        |  98| 10|
        |  70| 11|
        |  87| 12|
        |  52| 13|
        |   6| 14|
        |   5| 15|
        |  96| 16|
        |  14| 17|
        |  26| 18|
        |  81| 19|
        +----+---+
    这里show是默认显示20行数据 | show(100)可指定行数

- 将df建立为临时表,之后就可以用我们熟悉的sql语句操作了,提醒一下SparkSQL支持的sql是HiveSQL而它又与Oracle语法类似,但与MySQL语法有些差别

        df.createOrReplaceTempView("temp")

- 为了方便观看我们将他们的列换下位置,用SQL语句的形式再建一个临时表,这种方式在操作df时很常用

        val df2 = sql("select id,data from temp")
        df2.createOrReplaceTempView("temp2")
        sql("select * from temp2").show

        +---+----+
        | id|data|
        +---+----+
        |  0|  85|
        |  1|  54|
        |  2|  69|
        |  3|   7|
        |  4|  39|
        |  5|  40|
  1. 此时我们准备工作完成,开始正式写sql,大致思路为:增加两列,第一列让数列整体上移,第二列让数列整体下移这样就达到了让我们的data列的每一个数字都可以与其相邻的数字作比较的目的,之后用case when end来标识波峰波谷
- 导入隐式转换和sql函数的包

        import spark.implicits._
        import org.apache.spark.sql.functions

        sql("""
        select id,data,
        case when data>data1 and data>data2 then '波峰'
            when data<data1 and data<data2 then '波谷'
        end as data3 from(
        select id,data,lead(data) over(order by id) as data1,lag(data) over(order by id) as data2 from temp2
        )
        """).show

- [case when](http://www.cnblogs.com/aipan/p/7770611.html) 用法和if else思路类似
- 在这里我们用了分析函数lead()和lag(),详细用法请[点击这里](https://blog.csdn.net/pelifymeng2/article/details/70313943),我记这些随缘,忘了就上网查一下,没必要记的太死,关键是一定要有思路

        +---+----+-----+ 
        | id|data|data3|
        +---+----+-----+
        |  0|  85| null|
        |  1|  54|   波谷|
        |  2|  69|   波峰|
        |  3|   7|   波谷|
        |  4|  39| null|
        |  5|  40| null|
        |  6|  43| null|
        |  7|  50| null|
        |  8|  59|   波峰|
        |  9|  28|   波谷|
        | 10|  98|   波峰|
        | 11|  70|   波谷|
        | 12|  87|   波峰|
        | 13|  52| null|
        | 14|   6| null|
        | 15|   5|   波谷|
        | 16|  96|   波峰|
        | 17|  14|   波谷|
        | 18|  26| null|
        | 19|  81|   波峰|
        +---+----+-----+

好啦大功告成,有问题可以联系我我们一起交流,其实一直困扰我的问题是不知道为什么用sparkSql时子查询不给查询语句起别名也不报错,而用oracle和mysql时就会报错……
616581760 微信QQ同号


 评论


博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议

本站使用 Material X 作为主题 , 总访问量为 次 。
Copyright 2018-2019 YMLiang'BLOG   |   京ICP备 - 19039949  |  载入天数...载入时分秒...