内容简介:路径:src/main/resources/user.json
- 编译Schema
- 依赖于Avro实现 自定义的序列化器和反序列化器
引入依赖
<dependency> <groupId>com.twitter</groupId> <artifactId>bijection-avro_2.12</artifactId> <version>0.9.6</version> </dependency>
Schema
路径:src/main/resources/user.json
{ "type": "record", "name": "User", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "age", "type": "int"} ] }
发送消息
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath(); Schema schema = new Schema.Parser().parse(new File(schemaFilePath)); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", ByteArraySerializer.class.getName()); Producer<String, byte[]> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { GenericData.Record record = new GenericData.Record(schema); record.put("id", i); record.put("name", "zhongmingmao" + i); record.put("age", i); byte[] bytes = recordInjection.apply(record); ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("zhongmingmao", bytes); producer.send(producerRecord); TimeUnit.SECONDS.sleep(1); } producer.close();
消费消息
String schemaFilePath = BijectionProducer.class.getClassLoader().getResource("user.json").getPath(); Schema schema = new Schema.Parser().parse(new File(schemaFilePath)); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "zhongmingmao"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", ByteArrayDeserializer.class.getName()); KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("zhongmingmao")); try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(100); for (ConsumerRecord<String, byte[]> record : records) { GenericRecord genericRecord = recordInjection.invert(record.value()).get(); log.info("id={}, name={}, age={}, partition={}, offset={}", genericRecord.get("id"), genericRecord.get("name"), genericRecord.get("age"), record.partition(), record.offset()); } TimeUnit.SECONDS.sleep(1); } } finally { consumer.close(); }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 【每日笔记】【Go学习笔记】2019-01-04 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-02 Codis笔记
- 【每日笔记】【Go学习笔记】2019-01-07 Codis笔记
- Golang学习笔记-调度器学习
- Vue学习笔记(二)------axios学习
- 算法/NLP/深度学习/机器学习面试笔记
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Parsing Techniques
Dick Grune、Ceriel J.H. Jacobs / Springer / 2010-2-12 / USD 109.00
This second edition of Grune and Jacobs' brilliant work presents new developments and discoveries that have been made in the field. Parsing, also referred to as syntax analysis, has been and continues......一起来看看 《Parsing Techniques》 这本书的介绍吧!