How to write double-precision floats from Spark to Postgres over JDBC

Question

I'm trying to write data from Spark to Postgres. I have a DataFrame made up of some strings, some floating-point values, and some integers of varying widths, and I've found that if I don't pass a schema I get an error like this:

User class threw exception: java.lang.IllegalArgumentException:
Unsupported type in postgresql: ByteType 
at org.apache.spark.sql.jdbc.PostgresDialect$.getJDBCType(PostgresDialect.scala:83)
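For context, here is a minimal sketch of the kind of DataFrame involved (the column names and values below are made up for illustration); c6 is the byte-width integer column that PostgresDialect has no mapping for:

// Hypothetical reproducer; an existing SparkSession named `spark` is assumed.
import spark.implicits._

case class Record(c1: String, c2: Long, c3: Long, c4: Double, c5: Double,
                  c6: Byte, c7: String, c8: String)

val dataFrame = Seq(Record("a", 1L, 2L, 0.5, 1.5, 1.toByte, "x", "y")).toDF()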

If I do pass a schema, I can't find any double-precision data type that it recognizes:

val postgresSchema = "c1 VARCHAR(10000), c2 BIGINT, c3 BIGINT, c4 FLOAT8, c5 FLOAT8, c6 TINYINT, c7 VARCHAR(10000), c8 VARCHAR(10000)"

dataFrame
  .coalesce(POSTGRES_WRITE_PARTITIONS)
  .write
  .option("createTableColumnTypes", postgresSchema)
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, table, connectionProperties)

I've tried everything I can think of that might work based on the PostgreSQL Numeric Data Types docs, and none of the types seems to be accepted:

User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType float8 is not supported.(line 1, pos 86)

User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType double is not supported.(line 1, pos 86)

User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType real is not supported.(line 1, pos 86)

User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType float(53) is not supported.(line 1, pos 86)

I also tried DOUBLE PRECISION as the data type, which gives:

User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'PRECISION' expecting <EOF>(line 1, pos 93)

Any ideas?

postgresql apache-spark jdbc
1 Answer

Don't put the double-precision columns in createTableColumnTypes. If they are declared in the DataFrame's schema, Spark SQL will create them as double-precision columns automatically; see the sample code below:

    // schema is (presumably) a Map<String, String> of column name -> Java type name;
    // DataTypeConverterUtil.getSparkDataTypeFromJavaType is the answerer's own helper
    // that turns those names into Spark DataTypes.
    StructField[] structFields = new StructField[schema.size()];
    int count = 0;
    for (String fieldName : schema.keySet()) {
        String fieldType = schema.get(fieldName).toLowerCase();
        fieldName = fieldName.toLowerCase();
        DataType sparkDataType = DataTypeConverterUtil.getSparkDataTypeFromJavaType(fieldType);
        structFields[count++] = new StructField(fieldName, sparkDataType, true, Metadata.empty());
    }
    StructType sparkDataSchema = new StructType(structFields);

    // Read the CSV with the explicit schema, so the double columns come in as DoubleType.
    Dataset<Row> df = sqlContext.read().format("com.databricks.spark.csv").schema(sparkDataSchema)
            .option("delimiter", dataDelimeter).option("header", true).option("quote", "\"")
            .option("escape", "\"").load(paths.split(","));

    // customSchema (createTableColumnTypes) only overrides the non-double columns;
    // Spark maps DoubleType to DOUBLE PRECISION on its own.
    df.write().option("truncate", true).mode(SaveMode.Overwrite).option("createTableColumnTypes", customSchema)
            .jdbc(url, tableName, connectionProperties);

You just have to include the double columns in sparkDataSchema, as mentioned above.
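Applied back to the schema in the question, a rough sketch (assuming c4 and c5 are DoubleType in the DataFrame and c6 is the ByteType column): leave the double columns out of createTableColumnTypes and only override the columns that need it, and widen the byte column before writing since the dialect has no mapping for it.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Only override the DDL for columns that need it; DoubleType columns (c4, c5)
// are mapped to DOUBLE PRECISION by Spark automatically.
val postgresSchema = "c1 VARCHAR(10000), c2 BIGINT, c3 BIGINT, c7 VARCHAR(10000), c8 VARCHAR(10000)"

dataFrame
  // ByteType has no Postgres mapping in this dialect, so widen it first
  // (assumption: c6 is the byte-width integer column)
  .withColumn("c6", col("c6").cast("smallint"))
  .coalesce(POSTGRES_WRITE_PARTITIONS)
  .write
  .option("createTableColumnTypes", postgresSchema)
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, table, connectionProperties)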
