当前位置 博文首页 > qq262593421的博客:SparkSQL使用UDF函数代替MySQL空间函数读取M

    qq262593421的博客:SparkSQL使用UDF函数代替MySQL空间函数读取M

    作者:[db:作者] 时间:2021-08-17 21:43

    一、问题描述

    SparkSQL虽然可以通过JDBC访问MySQL数据，但对于MySQL的空间字段（如POINT、POLYGON等Geometry类型），SparkSQL并没有提供类似MySQL ST_AsText 的内置函数去解析。

    二、问题分析

    SparkSQL没有解析空间类型的内置函数，需要手动编写UDF函数实现。MySQL内部以“4字节SRID + WKB”的格式存储空间数据，因此UDF只需先读出前4个字节作为SRID，再把剩余字节交给JTS的WKBReader解析即可。

    SparkSQL通过JDBC读取MySQL空间字段时，得到的是二进制数据（Byte数组）；而SparkSQL的数据类型中没有Geometry类型，所以UDF不能直接返回Geometry对象，需要将其转成String类型（WKT文本）后返回。

    三、代码实现

    1、自定义UDF函数

      /**
       * Decodes a MySQL internal-format geometry value into a JTS [[Geometry]].
       *
       * The value layout is a 4-byte SRID prefix followed by standard WKB. Spark's JDBC
       * source hands such columns over as raw bytes, so this is the decoding step behind
       * a Spark SQL replacement for MySQL's `ST_AsText`.
       *
       * @param geometryAsBytes raw column bytes (4-byte SRID + WKB); must not be null
       * @return the parsed geometry, with its SRID set from the prefix
       * @throws IllegalArgumentException if the input is null or shorter than five bytes
       * @throws Exception any parse failure propagated from `WKBReader.read`
       */
      @throws[Exception]
      def sparkUDFSTAsText(geometryAsBytes: Array[Byte]): Geometry = {
        // Spark passes SQL NULL into a UDF as a null array; fail with a clear message
        // instead of an opaque NullPointerException on `.length`.
        if (geometryAsBytes == null)
          throw new IllegalArgumentException("Invalid geometry inputStream - null")
        if (geometryAsBytes.length < 5)
          throw new IllegalArgumentException("Invalid geometry inputStream - less than five bytes")
        // The SRID prefix has no byte-order marker of its own, so assume it shares the
        // byte order of the WKB payload: WKB's first byte is 0x00 for big-endian, else little-endian.
        val sridOrder =
          if (geometryAsBytes(4) == 0x00) java.nio.ByteOrder.BIG_ENDIAN
          else java.nio.ByteOrder.LITTLE_ENDIAN
        val srid = java.nio.ByteBuffer.wrap(geometryAsBytes, 0, 4).order(sridOrder).getInt
        // Everything after the 4-byte SRID prefix is plain WKB, parsed by the JTS reader.
        val wkb = java.util.Arrays.copyOfRange(geometryAsBytes, 4, geometryAsBytes.length)
        val geometry = new WKBReader().read(wkb)
        geometry.setSRID(srid)
        geometry
      }

    Java版：

        /**
         * Decodes a MySQL internal-format geometry value (4-byte SRID prefix followed by WKB)
         * into a JTS Geometry whose SRID is taken from the prefix.
         *
         * @param geometryAsBytes raw column bytes; must not be null
         * @return the parsed geometry with its SRID set
         * @throws IllegalArgumentException if the input is null or shorter than five bytes
         * @throws Exception any parse failure propagated from {@code WKBReader.read}
         */
        public Geometry sparkUDFSTAsText(byte[] geometryAsBytes) throws Exception {
            // Spark passes SQL NULL into a UDF as null; reject it explicitly rather than
            // failing with a NullPointerException on .length.
            if (geometryAsBytes == null) {
                throw new IllegalArgumentException("Invalid geometry inputStream - null");
            }
            if (geometryAsBytes.length < 5) {
                throw new IllegalArgumentException("Invalid geometry inputStream - less than five bytes");
            }
            // The SRID prefix carries no byte-order marker; assume it matches the WKB payload,
            // whose first byte is 0x00 for big-endian and otherwise little-endian.
            java.nio.ByteOrder sridOrder = geometryAsBytes[4] == 0x00
                    ? java.nio.ByteOrder.BIG_ENDIAN
                    : java.nio.ByteOrder.LITTLE_ENDIAN;
            int srid = java.nio.ByteBuffer.wrap(geometryAsBytes, 0, 4).order(sridOrder).getInt();
            // Everything after the SRID prefix is standard WKB.
            byte[] wkb = java.util.Arrays.copyOfRange(geometryAsBytes, 4, geometryAsBytes.length);
            Geometry dbGeometry = new WKBReader().read(wkb);
            dbGeometry.setSRID(srid);
            return dbGeometry;
        }

    2、SparkSQL调用UDF函数

        /**
         * Spark SQL UDF body: converts a MySQL geometry column value to WKT text.
         * A SQL NULL column (null bytes) yields NULL instead of failing the whole job.
         */
        def toGeometryText(binary: Array[Byte]): String =
          Option(binary).map(bytes => sparkUDFSTAsText(bytes).toText).orNull
        spark.udf.register("ST_ASTEXT",toGeometryText(_))
        val rddROW: RDD[Row] = spark.sql("SELECT id, ST_ASTEXT(point), ST_ASTEXT(polygon) FROM t_point_polygon").limit(10).rdd
    

    四、知识拓展

    1、MySQL中的空间扩展

    https://www.mysqlzh.com/doc/172.html

    http://dcx.sap.com/1201/zh/dbspatial/pg-api-spatial-st-geometry-type.html

    2、MySQL中的空间类型

    cs