我是Kafka和Avro的菜鸟。所以我一直在尝试让生产者/消费者运行。到目前为止,我已经能够使用以下方法生成和使用简单的字节和字符串:生产者的配置:Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.ser
我正在尝试简单读取存储在HDFS中的Avro文件。当它在本地文件系统上时,我发现了如何读取它....FileReaderreader=DataFileReader.openReader(newFile(filename),newGenericDatumReader());for(GenericRecorddatum:fileReader){Stringvalue=datum.get(1).toString();System.out.println("value="value);}reader.close();但是,我的文件在HDFS中。我无法为openReader提供路径或FSData
无法使用Flume推特代理读取和解析流推特数据创建的文件,既不使用Java也不使用Avro工具。我的需求是将avro格式转换成JSON格式。当使用任何一种方法时,我得到异常:org.apache.avro.AvroRuntimeException:java.io.IOException:Blocksizeinvalidortoolargeforthisimplementation:-40我在伪节点集群中使用Hadoopvanilla配置,hadoop版本是2.7.1Flume版本为1.6.0twitter代理的flume配置文件和解析avro文件的java代码附在下面:TwitterA
我正在使用Avro,我有一个GenericRecord.我想提取clientId,deviceName,holder从中。在Avro架构中,clientId是整数,deviceName是字符串和holder是一个map。clientId在avro架构中:{"name":"clientId","type":["null","int"],"doc":"hello"}deviceName在avro架构中:{"name":"deviceName","type":["null","string"],"doc":"test"}holder在avro架构中:{"name":"holder","typ
我的KafkaProducer能够使用KafkaAvroSerializer将对象序列化到我的主题。但是,KafkaConsumer.poll()返回反序列化的GenericRecord而不是我的序列化类。MyKafkaProducerKafkaProducerproducer;try(InputStreamprops=Resources.getResource("producer.props").openStream()){Propertiesproperties=newProperties();properties.load(props);properties.put(Produc
使用AvroJavaAPI,我可以创建一个简单的记录模式,例如:SchemaschemaWithTimestamp=SchemaBuilder.record("MyRecord").namespace("org.demo").fields().name("timestamp").type().longType().noDefault().endRecord();如何使用逻辑类型标记架构字段,特别是:https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/LogicalTypes.TimestampMillis.html
我有一些像这样的json数据:{"id":1998983092,"name":"TestName1","type":"searchstring","creationDate":"2017-06-06T13:49:15.091+0000","lastModificationDate":"2017-06-28T14:53:19.698+0000","lastModifiedUsername":"testuser@test.com","lockedQuery":false,"lockedByUsername":null}我能够毫无问题地将lockedQuery空值添加到GenericReco
我正在测试一个新的模式注册表,它加载和检索不同类型的avro模式。在测试过程中,我需要创建一堆不同类型的avro模式。由于它涉及很多排列,我决定以编程方式创建模式。我正在使用apacheavroSchemaBuilder这样做。我使用以下方法创建了avro:SchemaoldSchema=SchemaBuilder.record("abc").aliases("records").fields().name("field_null").type("null").noDefault().endRecord();这成功了。创建的avro看起来像:{"type":"record","name
我将Apacheavro架构与Kafka0.0.8V结合使用。我在生产者/消费者端使用相同的模式。架构中没有任何更改。但是当我尝试使用消息时,我在消费者那里遇到了一些异常(exception)。为什么会出现此错误?制作人publicvoidsendFile(Stringtopic,GenericRecordpayload,Schemaschema)throwsCoreException,IOException{BinaryEncoderencoder=null;ByteArrayOutputStreamout=null;try{DatumWriterwriter=newSpecific
我正在使用Apacheavro进行数据序列化。因为,数据有一个固定的模式,我不希望模式成为序列化数据的一部分。在以下示例中,模式是avro文件“users.avro”的一部分。Useruser1=newUser();user1.setName("Alyssa");user1.setFavoriteNumber(256);Useruser2=newUser("Ben",7,"red");Useruser3=User.newBuilder().setName("Charlie").setFavoriteColor("blue").setFavoriteNumber(null).build(