package avro;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;

import java.util.ArrayList;
import java.util.List;

/**
 * Converts a bean into an Avro {@link GenericRecord}, using either a schema
 * derived from the bean class via {@link ReflectData} or a schema supplied by
 * the caller. PropertyExtractors / PropertyExtractor are helper classes (not
 * shown here) that read bean property values by field name.
 */
public class BeanToRecordConverter<E> {

    private final PropertyExtractors extractors = new PropertyExtractors();

    private final Class<?> type;
    private final Schema schema;

    public BeanToRecordConverter(Class<E> type) {
        this.type = type;
        this.schema = ReflectData.get().getSchema(type);
    }

    public BeanToRecordConverter(Class<E> type, Schema schema) {
        this.type = type;
        this.schema = schema;
    }

    public GenericRecord convert(E bean) {
        try {
            return convertBeanToRecord(bean, schema);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private GenericRecord convertBeanToRecord(Object bean, Schema schema) throws Exception {
        Class<?> beanClass = bean.getClass();
        PropertyExtractors.PropertyExtractor extractor = extractors.getOrCreate(beanClass);

        GenericRecord result = new GenericData.Record(schema);

        for (Schema.Field field : schema.getFields()) {
            Schema fieldSchema = field.schema();
            Schema.Type fieldType = fieldSchema.getType();
            String name = field.name();
            Object value = extractor.extract(bean, name);

            // Primitive field: copy the value straight through.
            if (isSimpleType(fieldType)) {
                result.put(name, value);
                continue;
            }

            // Nested record: convert the nested bean recursively.
            if (fieldType == Schema.Type.RECORD) {
                result.put(name, convertBeanToRecord(value, fieldSchema));
                continue;
            }

            // Array field: let's assume the bean property is always a List.
            if (fieldType == Schema.Type.ARRAY) {
                @SuppressWarnings("unchecked")
                List<Object> elements = (List<Object>) value;
                Schema elementSchema = fieldSchema.getElementType();

                // A list of primitives can be stored as-is.
                if (isSimpleType(elementSchema.getType())) {
                    result.put(name, elements);
                    continue;
                }

                // A list of nested beans: convert each element recursively.
                List<GenericRecord> results = new ArrayList<>(elements.size());
                for (Object element : elements) {
                    results.add(convertBeanToRecord(element, elementSchema));
                }
                result.put(name, results);
            }
        }

        return result;
    }

    // "Simple" here means the value can be put on the record without conversion.
    public static boolean isSimpleType(Schema.Type type) {
        return type == Schema.Type.STRING
                || type == Schema.Type.INT
                || type == Schema.Type.LONG;
    }
}
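For context, the write side looks roughly like this (a minimal sketch, not my exact code: the class name, the output path, and the wiring are placeholders; ScanResultsRow is the bean from the stack trace below):

package avro;

import com.sherlock.dao.ScanResultsRow;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

import java.io.IOException;
import java.util.List;

public class ScanResultsWriter {

    // Converts each bean to a GenericRecord against the reflect-derived schema
    // and writes it to the Parquet file at the (placeholder) output path.
    public static void write(List<ScanResultsRow> rows, Path outputPath) throws IOException {
        Schema schema = ReflectData.get().getSchema(ScanResultsRow.class);
        BeanToRecordConverter<ScanResultsRow> converter =
                new BeanToRecordConverter<>(ScanResultsRow.class, schema);

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(outputPath)
                .withSchema(schema)
                .build()) {
            for (ScanResultsRow row : rows) {
                writer.write(converter.convert(row));
            }
        }
    }
}

Writing the file appears to work; reading it back then fails with: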
org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/home/lrao/scanwork/a423448b-fc42-46b6-a0f6-88f10fcdb653/a423448b-fc42-46b6-a0f6-88f10fcdb653.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:243)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:129)
java.lang.ClassCastException: com.sherlock.dao.ScanResultsRow cannot be cast to org.apache.avro.generic.IndexedRecord
at org.apache.avro.generic.GenericData.setField(GenericData.java:569)
at org.apache.parquet.avro.AvroRecordConverter.set(AvroRecordConverter.java:295)
at org.apache.parquet.avro.AvroRecordConverter$1.add(AvroRecordConverter.java:109)
at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary(AvroConverters.java:62)
at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:323)
at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:371)
at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:218)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:129)
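The read side is essentially just the Avro Parquet reader (again a sketch with a placeholder class name and path; the actual reader setup may differ slightly):

package avro;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

import java.io.IOException;

public class ScanResultsReader {

    // Reads the records back as GenericRecords from the (placeholder) input path.
    public static void read(Path inputPath) throws IOException {
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputPath)
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}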
I get the above errors. Any idea what I'm missing?