Всем привет. Есть датафрейм в дельте. Добавляю колонку. Сохраняю по тому же пути, где находится таблица. Идет ошибка.
Ошибка такая:
File "/root/Borneo/populate_delta_lake.py", line 100, in put_file_when_table_exists
.save(full_path)
File "/spark/python/lib/
pyspark.zip/pyspark/sql/readwriter.py", line 830, in save
self._jwrite.save(path)
File "/spark/python/lib/
py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in
call answer, self.gateway_client, self.target_id,
self.name)
File "/spark/python/lib/
pyspark.zip/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/spark/python/lib/
py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling
o390.save.
: java.lang.NullPointerException
(далее длиннный стектрейс)
Вот добавление колонки:
structList = [(
x.name,x.dataType) for x in dfSourceSchema if (x.name.upper() in colDiffAdd)]
dfMergedSchema = self.dfSink.where("1=0") # Create empty dataframe to merge Sink schema
for i in structList:
dfMergedSchema = dfMergedSchema.withColumn(i[0],lit(None).cast(i[1]))
(dfMergedSchema.write
.format("delta")
.option("mergeSchema","true")
.mode("append")
.save(full_path)
)
Может кто-то боролся с этим?