I'm trying to write data from Spark into Postgres. I have a DataFrame made up of some strings, some floating-point values, and some integers of varying widths, and I've found that if I don't pass a schema, I get an error like this:
User class threw exception: java.lang.IllegalArgumentException:
Unsupported type in postgresql: ByteType
at org.apache.spark.sql.jdbc.PostgresDialect$.getJDBCType(PostgresDialect.scala:83)
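For reference, a hypothetical DataFrame of the same shape (the real one is produced by an upstream job) reproduces this when written without a schema:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder.getOrCreate()
// Hypothetical stand-in: one string, one wide integer, one narrow (byte) integer.
val repro = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("a", 1L, 1.toByte))),
  StructType(Seq(
    StructField("c1", StringType),
    StructField("c2", LongType),
    StructField("c6", ByteType)   // the column PostgresDialect rejects
  ))
)
// repro.write.jdbc(...) fails with "Unsupported type in postgresql: ByteType"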
If I do pass a schema, it seems that no double-precision data type is recognized:
val postgresSchema = "c1 VARCHAR(10000), c2 BIGINT, c3 BIGINT, c4 FLOAT8, c5 FLOAT8, c6 TINYINT, c7 VARCHAR(10000), c8 VARCHAR(10000)"
dataFrame
  .coalesce(POSTGRES_WRITE_PARTITIONS)
  .write
  .option("createTableColumnTypes", postgresSchema)
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, table, connectionProperties)
I've tried everything I can think of from the PostgreSQL Numeric Data Types docs that might work, and none of the types seem to be accepted:
User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType float8 is not supported.(line 1, pos 86)
User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType double is not supported.(line 1, pos 86)
User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType real is not supported.(line 1, pos 86)
User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
DataType float(53) is not supported.(line 1, pos 86)
I also tried DOUBLE PRECISION as the data type, which gives:
User class threw exception: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'PRECISION' expecting <EOF>(line 1, pos 93)
Any ideas?
Don't put the double-precision columns in createTableColumnTypes. If you declare them in the schema, Spark SQL will create the double-precision columns automatically, as in the sample code below:
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

// schema maps each column name to its source type name;
// DataTypeConverterUtil is our own helper that maps those names to Spark types.
StructField[] structFields = new StructField[schema.size()];
int count = 0;
for (String fieldName : schema.keySet()) {
    String fieldType = schema.get(fieldName).toLowerCase();
    fieldName = fieldName.toLowerCase();
    DataType sparkDataType = DataTypeConverterUtil.getSparkDataTypeFromJavaType(fieldType);
    structFields[count++] = new StructField(fieldName, sparkDataType, true, Metadata.empty());
}
StructType sparkDataSchema = new StructType(structFields);
Dataset<Row> df = sqlContext.read().format("com.databricks.spark.csv")
    .schema(sparkDataSchema)
    .option("delimiter", dataDelimeter).option("header", true).option("quote", "\"")
    .option("escape", "\"").load(paths.split(","));

// customSchema lists createTableColumnTypes for the non-double columns only;
// Spark picks the Postgres type for the doubles itself.
df.write().option("truncate", true).mode(SaveMode.Overwrite)
    .option("createTableColumnTypes", customSchema)
    .jdbc(url, tableName, connectionProperties);
You just have to add the double columns to sparkDataSchema and leave them out of createTableColumnTypes.
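Applied back to the Scala snippet from the question, a minimal sketch of the same idea (my assumptions: c4 and c5 are the double columns and c6 is the ByteType column; createTableColumnTypes accepts a subset of the columns, the rest falling back to Spark's default Postgres mappings, which render DoubleType as FLOAT8; c6 is cast to ShortType first because PostgresDialect has no mapping for ByteType):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.ShortType

// Keep only the columns that need a non-default type; the double columns
// (c4, c5) are omitted so Spark maps them to FLOAT8 on its own.
val postgresSchema = "c1 VARCHAR(10000), c7 VARCHAR(10000), c8 VARCHAR(10000)"

dataFrame
  .withColumn("c6", col("c6").cast(ShortType))  // ByteType has no Postgres mapping; SMALLINT does
  .coalesce(POSTGRES_WRITE_PARTITIONS)
  .write
  .option("createTableColumnTypes", postgresSchema)
  .mode(SaveMode.Overwrite)
  .jdbc(jdbcURL, table, connectionProperties)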