我刚开始使用Avro(和python)。我想检查架构的演变。我准备了2个模式,首先保存第一个模式的数据,然后追加新数据并使用模式2保存。我编写时没有出现任何错误,但是我无法对数据进行反序列化。我猜我的语法是错误的。我应如何继续将具有新架构的项目添加到现有文件?
schema = avro.schema.Parse(open('user.avsc', "r").read())
writer = DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema)
writer.append({"name": "Anna", "favorite_number": 1})
writer.append({"name": "Jan", "favorite_number": 13, "favorite_color": "blue"})
writer.close()
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
print (user)
reader.close()
{'name': 'Anna', 'favorite_number': 1, 'favorite_color': None}
{'name': 'Jan', 'favorite_number': 13, 'favorite_color': 'blue'}
schema2 = avro.schema.Parse(open('user2.avsc', "r").read())
writer = DataFileWriter(open("users.avro", "ab"), DatumWriter(), schema2)
writer.append({"name": "Eva", "favorite_number": 5, "favorite_food":"raclette"})
writer.append({"name": "Adam", "favorite_number": 122, "favorite_color": "black", "favorite_film": "Gone with the wind"})
writer.close()
reader = DataFileReader(open("users.avro", "rb"), DatumReader())
for user in reader:
print (user)
reader.close()
Invalid UTF-8 input bytes: b'\x01\x04\x14avro.codec\x08null\x16avro.schema\xf0\x05{"type": "record", "n'
{'name': 'Anna', 'favorite_number': 1, 'favorite_color': None}
{'name': 'Jan', 'favorite_number': 13, 'favorite_color': 'blue'}
---------------------------------------------------------------------------
UnicodeDecodeError Traceback (most recent call last)
<ipython-input-128-cbc8ab11fe9f> in <module>
1 reader = DataFileReader(open("users.avro", "rb"), DatumReader())
----> 2 for user in reader:
3 print (user)
4 reader.close()
~\Anaconda3\lib\site-packages\avro\datafile.py in __next__(self)
524 self._read_block_header()
525
--> 526 datum = self.datum_reader.read(self.datum_decoder)
527 self._block_count -= 1
528 return datum
~\Anaconda3\lib\site-packages\avro\io.py in read(self, decoder)
487 if self.reader_schema is None:
488 self.reader_schema = self.writer_schema
--> 489 return self.read_data(self.writer_schema, self.reader_schema, decoder)
490
491 def read_data(self, writer_schema, reader_schema, decoder):
~\Anaconda3\lib\site-packages\avro\io.py in read_data(self, writer_schema, reader_schema, decoder)
532 return self.read_union(writer_schema, reader_schema, decoder)
533 elif writer_schema.type in ['record', 'error', 'request']:
--> 534 return self.read_record(writer_schema, reader_schema, decoder)
535 else:
536 fail_msg = "Cannot read unknown schema type: %s" % writer_schema.type
~\Anaconda3\lib\site-packages\avro\io.py in read_record(self, writer_schema, reader_schema, decoder)
732 readers_field = readers_fields_dict.get(field.name)
733 if readers_field is not None:
--> 734 field_val = self.read_data(field.type, readers_field.type, decoder)
735 read_record[field.name] = field_val
736 else:
~\Anaconda3\lib\site-packages\avro\io.py in read_data(self, writer_schema, reader_schema, decoder)
510 return decoder.read_boolean()
511 elif writer_schema.type == 'string':
--> 512 return decoder.read_utf8()
513 elif writer_schema.type == 'int':
514 return decoder.read_int()
~\Anaconda3\lib\site-packages\avro\io.py in read_utf8(self)
260 except UnicodeDecodeError as exn:
261 logger.error('Invalid UTF-8 input bytes: %r', input_bytes)
--> 262 raise exn
263
264 def check_crc32(self, bytes):
~\Anaconda3\lib\site-packages\avro\io.py in read_utf8(self)
257 input_bytes = self.read_bytes()
258 try:
--> 259 return input_bytes.decode('utf-8')
260 except UnicodeDecodeError as exn:
261 logger.error('Invalid UTF-8 input bytes: %r', input_bytes)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf0 in position 30: invalid continuation byte
根据规范,Avro对象文件只能包含一个架构。
演变过程被定义为具有与写入器架构不同的读取器架构,但仍能够读取旧数据。
例如,您可以读取没有喜欢的电影的文件,但是阅读器模式定义了默认的喜欢的电影“无”