Notes on kafka-manager 1.3.3.18 not displaying consumers with Kafka 2.1.0

Category: Backend · Published: 5 years ago

I am running Kafka 2.1.0 and managing it with the latest kafka-manager, 1.3.3.18. I wrote a producer and a consumer program, but after running them the consumer's group would not show up in kafka-manager no matter what I tried.

The producer code is as follows:

package com.kafka.producer;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerDemo {

    public static void main(String[] args) {
        int i = 0;
        while (true) {
            i++;
            try {
                // Send one record synchronously, then wait 2 seconds
                send("test", String.format("test_%d", i), "123");
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(String.format("Kafka write: %d", i));
        }
    }

    private static Producer<String, Object> producer;

    private static final String server = "127.0.0.1:9092";

    static {
        Properties props = buildProducerConfig();
        producer = new KafkaProducer<>(props);
    }

    private static Properties buildProducerConfig() {
        Properties props = new Properties();
        // bootstrap.servers is the address of the Kafka cluster, i.e. the broker address
        props.put("bootstrap.servers", server);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Sends a record and blocks until the send completes
    public static RecordMetadata send(String topic, String key, Object obj) throws InterruptedException, ExecutionException {
        return producer.send(new ProducerRecord<String, Object>(topic, key, obj)).get();
    }

    // Sends a record asynchronously and prints the stack trace if the send fails
    public static void sendAsync(String topic, String key, Object obj) {
        producer.send(new ProducerRecord<String, Object>(topic, key, obj), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    System.out.println(ExceptionUtils.getStackTrace(e));
                }
            }
        });
    }
}

The consumer program is as follows:

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {

    public static void main(String[] args) {
        consume();
    }

    private static KafkaConsumer<String, Object> consumer;

    private static final String server = "127.0.0.1:9092";

    static {
        Properties props = buildConsumerConfig();
        consumer = new KafkaConsumer<>(props);
    }

    private static Properties buildConsumerConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", server);
        // consumer group
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        // how often the offsets of consumed messages are committed
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void consume() {
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            // poll for records, waiting up to 100 ms
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.printf("partition=%d,offset = %d, key = %s, value = %s\n", record.partition(),
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

The consumer group still did not show up in kafka-manager. To find the cause, I checked the kafka-manager log and found the following error:

[warn] k.m.a.c.KafkaManagedOffsetCache - Failed to process a message from offset topic on cluster test-Kafka!

kafka.common.KafkaException: Unknown offset schema version 3

at kafka.manager.utils.one10.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:428) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]

at kafka.manager.utils.one10.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:532) ~[kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]

at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:332) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]

at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$4.apply(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]

at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.12.jar:na]

at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:308) [kafka-manager.kafka-manager-1.3.3.18-sans-externalized.jar:na]

at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]

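My initial diagnosis was that the problem lay in kafka-manager, so I decided to dig deeper. It turned out that kafka-manager is written in Scala, which I don't know, and at first I had no idea where to start.

Still, programs are all much alike, so I went ahead and looked for the cause. The error in the log points at GroupMetadataManager.scala:428, so the fault was presumably there.

A Google search turned up no good solution, so I opened an issue on the kafka-manager GitHub project and found that someone had gotten the consumers to display after modifying the source. Following his hints, I modified the Scala source, recompiled and repackaged, and the problem was finally solved.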
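To make the failure mode concrete, here is a minimal Java sketch (not kafka-manager's actual code, which is Scala) of a version-to-schema lookup. The class and method names are hypothetical, and each schema is reduced to a comma-separated list of field names taken from the schemas in the patch in this post. It only illustrates why a table that stops at version 1 fails on a record written with version 3, and why registering versions 2 and 3 lets the lookup succeed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a version -> schema table; schemas are plain strings here.
class OffsetSchemaLookup {

    // Table that only knows offset value versions 0 and 1.
    static Map<Integer, String> unpatched() {
        Map<Integer, String> schemas = new LinkedHashMap<>();
        schemas.put(0, "offset, metadata, timestamp");
        schemas.put(1, "offset, metadata, commit_timestamp, expire_timestamp");
        return schemas;
    }

    // Same table with versions 2 and 3 registered as well.
    static Map<Integer, String> patched() {
        Map<Integer, String> schemas = unpatched();
        schemas.put(2, "offset, metadata, commit_timestamp");
        schemas.put(3, "offset, leader_epoch, metadata, commit_timestamp");
        return schemas;
    }

    // Looks up the schema for a version and throws if the version is not registered.
    static String schemaForOffset(Map<Integer, String> schemas, int version) {
        String schema = schemas.get(version);
        if (schema == null) {
            throw new IllegalStateException("Unknown offset schema version " + version);
        }
        return schema;
    }

    public static void main(String[] args) {
        try {
            schemaForOffset(unpatched(), 3);
        } catch (IllegalStateException e) {
            System.out.println("without versions 2 and 3: " + e.getMessage());
        }
        System.out.println("with versions 2 and 3: " + schemaForOffset(patched(), 3));
    }
}
```

The point of the sketch is that the reader of the offsets topic needs an entry for every value version the broker may write; anything missing from the table surfaces as an exception rather than as a displayed consumer group.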

The changes to the Scala source are as follows:

git diff origin/master
diff --git a/app/kafka/manager/utils/one10/GroupMetadataManager.scala b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
index 85771cd..f16b1a3 100644
--- a/app/kafka/manager/utils/one10/GroupMetadataManager.scala
+++ b/app/kafka/manager/utils/one10/GroupMetadataManager.scala
@@ -368,6 +368,25 @@ object GroupMetadataManager {
     new Field(SUBSCRIPTION_KEY, BYTES),
     new Field(ASSIGNMENT_KEY, BYTES))
+  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
+    new Field("offset", INT64),
+    new Field("leader_epoch", INT32),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
+  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
+  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
+
   private val PROTOCOL_TYPE_KEY = "protocol_type"
   private val GENERATION_KEY = "generation"
   private val PROTOCOL_KEY = "protocol"
@@ -388,6 +407,12 @@ object GroupMetadataManager {
     new Field(LEADER_KEY, NULLABLE_STRING),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
+  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -398,13 +423,18 @@ object GroupMetadataManager {
   // map of version of offset value schemas
   private val OFFSET_VALUE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
+    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
+
   private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
   // map of version of group metadata value schemas
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+
   private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -545,6 +575,20 @@ object GroupMetadataManager {
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
         OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else if (version == 2) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
+      } else if (version == 3) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
+        val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
+
+        // val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
       } else {
         throw new IllegalStateException("Unknown offset message version")
       }
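For readers who, like me, are more comfortable in Java than Scala, the version 3 branch of the patch can be sketched roughly as follows. This is a simplified stand-alone illustration, not kafka-manager code: the class OffsetValueV3Demo and its methods are hypothetical, and the byte layout is an assumption based on the Kafka protocol's primitive encodings (big-endian integers, and STRING as an INT16 length followed by UTF-8 bytes) applied to the field order of OFFSET_COMMIT_VALUE_SCHEMA_V3. A negative leader epoch is mapped to an empty Optional, as the commented-out line in the patch hints.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical stand-alone demo; not part of kafka-manager or Kafka.
class OffsetValueV3Demo {

    // Decoded fields of a version 3 offset commit value.
    static final class OffsetValue {
        final long offset;
        final Optional<Integer> leaderEpoch;
        final String metadata;
        final long commitTimestamp;

        OffsetValue(long offset, Optional<Integer> leaderEpoch, String metadata, long commitTimestamp) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.metadata = metadata;
            this.commitTimestamp = commitTimestamp;
        }
    }

    // Builds a sample payload: version, offset, leader_epoch, metadata, commit_timestamp.
    static ByteBuffer encode(long offset, int leaderEpoch, String metadata, long commitTimestamp) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 4 + 2 + meta.length + 8);
        buf.putShort((short) 3);           // value schema version
        buf.putLong(offset);               // INT64 offset
        buf.putInt(leaderEpoch);           // INT32 leader_epoch
        buf.putShort((short) meta.length); // STRING: INT16 length prefix
        buf.put(meta);                     // STRING: UTF-8 bytes
        buf.putLong(commitTimestamp);      // INT64 commit_timestamp
        buf.flip();
        return buf;
    }

    // Reads the version first, then the fields in schema order.
    static OffsetValue decode(ByteBuffer buf) {
        short version = buf.getShort();
        if (version != 3) {
            throw new IllegalStateException("Unknown offset message version " + version);
        }
        long offset = buf.getLong();
        int leaderEpoch = buf.getInt();
        byte[] meta = new byte[buf.getShort()];
        buf.get(meta);
        long commitTimestamp = buf.getLong();
        Optional<Integer> epochOpt = leaderEpoch < 0 ? Optional.empty() : Optional.of(leaderEpoch);
        return new OffsetValue(offset, epochOpt, new String(meta, StandardCharsets.UTF_8), commitTimestamp);
    }

    public static void main(String[] args) {
        OffsetValue v = decode(encode(42L, -1, "", 1546300800000L));
        System.out.println("offset=" + v.offset + ", leaderEpoch=" + v.leaderEpoch
                + ", metadata='" + v.metadata + "', commitTimestamp=" + v.commitTimestamp);
    }
}
```

Note that the patch itself reads the leader epoch but does not pass it on: the version 3 branch still builds OffsetAndMetadata from offset, metadata and commit timestamp only.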

The complete code of app/kafka/manager/utils/one10/GroupMetadataManager.scala is as follows:

/*

* Licensed to the Apache Software Foundation (ASF) under one or more

* contributor license agreements. See the NOTICE file distributed with

* this work for additional information regarding copyright ownership.

* The ASF licenses this file to You under the Apache License, Version 2.0

* (the "License"); you may not use this file except in compliance with

* the License. You may obtain a copy of the License at

*

*    http://www.apache.org/licenses/LICENSE-2.0

*

* Unless required by applicable law or agreed to in writing, software

* distributed under the License is distributed on an "AS IS" BASIS,

* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

* See the License for the specific language governing permissions and

* limitations under the License.

*/

package kafka.manager.utils.one10

import java.io.PrintStream

import java.nio.ByteBuffer

import java.nio.charset.StandardCharsets

import java.util.UUID

import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata}

import kafka.utils.{Logging, nonthreadsafe}

import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.kafka.clients.consumer.internals.{ConsumerProtocol, PartitionAssignor}

import org.apache.kafka.common.TopicPartition

import org.apache.kafka.common.protocol.types.Type._

import org.apache.kafka.common.protocol.types._

import org.apache.kafka.common.utils.Utils

import scala.collection.JavaConverters._

import scala.collection.{Seq, immutable, mutable, _}

/**

* Case class used to represent group metadata for the ListGroups API

*/

case class GroupOverview(groupId: String,

protocolType: String)

/**

* We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset

* commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving

* information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit

* being materialized.

*/

case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {

def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get

}

/**

* Group contains the following metadata:

*

*  Membership metadata:

*  1. Members registered in this group

*  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)

*  3. Protocol metadata associated with group members

*

*  State metadata:

*  1. group state

*  2. generation id

*  3. leader id

*/

@nonthreadsafe

class GroupMetadata(val groupId: String

, var protocolType: Option[String]

, var generationId: Int

, var protocol: Option[String]

, var leaderId: Option[String]

) extends Logging {

private val members =  new mutable.HashMap[String, MemberMetadata]

private val offsets =  new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]

private val pendingOffsetCommits =  new mutable.HashMap[TopicPartition, OffsetAndMetadata]

private val pendingTransactionalOffsetCommits =  new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()

private var receivedTransactionalOffsetCommits =  false

private var receivedConsumerOffsetCommits =  false

var newMemberAdded: Boolean =  false

def has(memberId: String) = members.contains(memberId)

def get(memberId: String) = members(memberId)

def isLeader(memberId: String): Boolean = leaderId.contains(memberId)

def leaderOrNull: String = leaderId.orNull

def protocolOrNull: String = protocol.orNull

def add(member: MemberMetadata) {

if (members.isEmpty)

this.protocolType = Some(member.protocolType)

assert(groupId == member.groupId)

assert(this.protocolType.orNull == member.protocolType)

assert (supportsProtocols(member.protocols))

if (leaderId.isEmpty)

leaderId = Some(member.memberId)

members.put(member.memberId, member)

}

def remove(memberId: String) {

members.remove(memberId)

if (isLeader(memberId)) {

leaderId = if (members.isEmpty) {

None

} else {

Some(members.keys.head)

}

}

}

def allMembers = members.keySet

def allMemberMetadata = members.values.toList

//  TODO: decide if ids should be predictable or random
def generateMemberIdSuffix = UUID.randomUUID().toString

private def candidateProtocols = {

//  get the set of protocols that are commonly supported by all members
allMemberMetadata

.map(_.protocols)

.reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)

}

def supportsProtocols(memberProtocols: Set[String]) = {

members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty

}

def overview: GroupOverview = {

GroupOverview(groupId, protocolType.getOrElse(""))

}

def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],

pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {

this.offsets ++= offsets

this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets

}

def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {

if (pendingOffsetCommits.contains(topicPartition)) {

if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)

throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +

"in the log.")

if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))

offsets.put(topicPartition, offsetWithCommitRecordMetadata)

}

pendingOffsetCommits.get(topicPartition) match {

case Some(stagedOffset)  if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>

pendingOffsetCommits.remove(topicPartition)

case _ =>

// The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case

// its entries would be removed from the cache by the `removeOffsets` method.

}

}

def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {

pendingOffsetCommits.get(topicPartition) match {

case Some(pendingOffset)  if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)

case _ =>

}

}

def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {

receivedConsumerOffsetCommits =  true

pendingOffsetCommits ++= offsets

}

def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $offsets is pending")

receivedTransactionalOffsetCommits =  true

val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,

mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])

offsets.foreach {  case (topicPartition, offsetAndMetadata) =>

producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))

}

}

def hasReceivedConsistentOffsetCommits : Boolean = {

!receivedConsumerOffsetCommits || !receivedTransactionalOffsetCommits

}

/* Remove a pending transactional offset commit if the actual offset commit record was not written to the log.

* We will return an error and the client will retry the request, potentially to a different coordinator.

*/

def failPendingTxnOffsetCommit(producerId: Long, topicPartition: TopicPartition): Unit = {

pendingTransactionalOffsetCommits.get(producerId) match {

case Some(pendingOffsets) =>

val pendingOffsetCommit = pendingOffsets.remove(topicPartition)

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetCommit failed " +

s"to be appended to the log")

if (pendingOffsets.isEmpty)

pendingTransactionalOffsetCommits.remove(producerId)

case _ =>

//  We may hit this case if the partition in question has emigrated already.
}

}

def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,

commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {

pendingTransactionalOffsetCommits.get(producerId) match {

case Some(pendingOffset) =>

if (pendingOffset.contains(topicPartition)

&& pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)

pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)

case _ =>

//  We may hit this case if the partition in question has emigrated.
}

}

/* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written

* to the log.

*/

def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {

val pendingOffsetsOpt = pendingTransactionalOffsetCommits.remove(producerId)

if (isCommit) {

pendingOffsetsOpt.foreach { pendingOffsets =>

pendingOffsets.foreach {  case (topicPartition, commitRecordMetadataAndOffset) =>

if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)

throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +

s"and groupId $groupId even though the offset commit record itself hasn't been appended to the log.")

val currentOffsetOpt = offsets.get(topicPartition)

if (currentOffsetOpt.forall(_.olderThan(commitRecordMetadataAndOffset))) {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +

"committed and loaded into the cache.")

offsets.put(topicPartition, commitRecordMetadataAndOffset)

} else {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offset $commitRecordMetadataAndOffset " +

s"committed, but not loaded since its offset is older than current offset $currentOffsetOpt.")

}

}

}

} else {

trace(s"TxnOffsetCommit for producer $producerId and group $groupId with offsets $pendingOffsetsOpt aborted")

}

}

def activeProducers = pendingTransactionalOffsetCommits.keySet

def hasPendingOffsetCommitsFromProducer(producerId: Long) =

pendingTransactionalOffsetCommits.contains(producerId)

def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {

topicPartitions.flatMap { topicPartition =>

pendingOffsetCommits.remove(topicPartition)

pendingTransactionalOffsetCommits.foreach {  case (_, pendingOffsets) =>

pendingOffsets.remove(topicPartition)

}

val removedOffset = offsets.remove(topicPartition)

removedOffset.map(topicPartition -> _.offsetAndMetadata)

}.toMap

}

def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {

val expiredOffsets = offsets

.filter {

case (topicPartition, commitRecordMetadataAndOffset) =>

commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)

}

.map {

case (topicPartition, commitRecordOffsetAndMetadata) =>

(topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)

}

offsets --= expiredOffsets.keySet

expiredOffsets.toMap

}

def allOffsets = offsets.map {  case (topicPartition, commitRecordMetadataAndOffset) =>

(topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)

}.toMap

def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)

//  visible for testing
def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)

def numOffsets = offsets.size

def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty || pendingTransactionalOffsetCommits.nonEmpty

override def toString: String = {

"GroupMetadata(" +

s"groupId=$groupId, " +

s"generation=$generationId, " +

s"protocolType=$protocolType, " +

s"members=$members)"

}

}

/**

* Messages stored for the group topic has versions for both the key and value fields. Key

* version is used to indicate the type of the message (also to differentiate different types

* of messages from being compacted together if they have the same field values); and value

* version is used to evolve the messages within their data types:

*

* key version 0:       group consumption offset

*    -> value version 0:       [offset, metadata, timestamp]

*

* key version 1:       group consumption offset

*    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]

*

* key version 2:       group metadata

*     -> value version 0:       [protocol_type, generation, protocol, leader, members]

*/

object GroupMetadataManager {

private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort

private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort

private val CURRENT_GROUP_KEY_SCHEMA_VERSION2 = 3.toShort

private val OFFSET_COMMIT_KEY_SCHEMA =  new Schema( new Field("group", STRING),

new Field("topic", STRING),

new Field("partition", INT32))

private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")

private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")

private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")

private val OFFSET_COMMIT_VALUE_SCHEMA_V0 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")

private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")

private val OFFSET_COMMIT_VALUE_SCHEMA_V1 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64),

new Field("expire_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")

private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")

// newly added: offset value schemas for versions 2 and 3
private val OFFSET_COMMIT_VALUE_SCHEMA_V2 =  new Schema( new Field("offset", INT64),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")

private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")

private val OFFSET_COMMIT_VALUE_SCHEMA_V3 =  new Schema(

new Field("offset", INT64),

new Field("leader_epoch", INT32),

new Field("metadata", STRING, "Associated metadata.", ""),

new Field("commit_timestamp", INT64))

private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")

private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")

private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")

private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")

// end of the additions for versions 2 and 3

private val GROUP_METADATA_KEY_SCHEMA =  new Schema( new Field("group", STRING))

private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")

private val MEMBER_ID_KEY = "member_id"

private val CLIENT_ID_KEY = "client_id"

private val CLIENT_HOST_KEY = "client_host"

private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"

private val SESSION_TIMEOUT_KEY = "session_timeout"

private val SUBSCRIPTION_KEY = "subscription"

private val ASSIGNMENT_KEY = "assignment"

private val MEMBER_METADATA_V0 =  new Schema(

new Field(MEMBER_ID_KEY, STRING),

new Field(CLIENT_ID_KEY, STRING),

new Field(CLIENT_HOST_KEY, STRING),

new Field(SESSION_TIMEOUT_KEY, INT32),

new Field(SUBSCRIPTION_KEY, BYTES),

new Field(ASSIGNMENT_KEY, BYTES))

private val MEMBER_METADATA_V1 =  new Schema(

new Field(MEMBER_ID_KEY, STRING),

new Field(CLIENT_ID_KEY, STRING),

new Field(CLIENT_HOST_KEY, STRING),

new Field(REBALANCE_TIMEOUT_KEY, INT32),

new Field(SESSION_TIMEOUT_KEY, INT32),

new Field(SUBSCRIPTION_KEY, BYTES),

new Field(ASSIGNMENT_KEY, BYTES))

// newly added: member metadata schema for version 2
private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1

private val PROTOCOL_TYPE_KEY = "protocol_type"

private val GENERATION_KEY = "generation"

private val PROTOCOL_KEY = "protocol"

private val LEADER_KEY = "leader"

private val MEMBERS_KEY = "members"

private val GROUP_METADATA_VALUE_SCHEMA_V0 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V0)))

private val GROUP_METADATA_VALUE_SCHEMA_V1 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V1)))

private val GROUP_METADATA_VALUE_SCHEMA_V2 =  new Schema(

new Field(PROTOCOL_TYPE_KEY, STRING),

new Field(GENERATION_KEY, INT32),

new Field(PROTOCOL_KEY, NULLABLE_STRING),

new Field(LEADER_KEY, NULLABLE_STRING),

new Field(MEMBERS_KEY,  new ArrayOf(MEMBER_METADATA_V2)))

//  map of versions to key schemas as data types
private val MESSAGE_TYPE_SCHEMAS = Map(

0 -> OFFSET_COMMIT_KEY_SCHEMA,

1 -> OFFSET_COMMIT_KEY_SCHEMA,

2 -> GROUP_METADATA_KEY_SCHEMA

)

//  map of version of offset value schemas
private val OFFSET_VALUE_SCHEMAS = Map(

0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,

1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,

2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,

3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3

)

private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort

//  map of version of group metadata value schemas
private val GROUP_VALUE_SCHEMAS = Map(

0 -> GROUP_METADATA_VALUE_SCHEMA_V0,

1 -> GROUP_METADATA_VALUE_SCHEMA_V1,

2 -> GROUP_METADATA_VALUE_SCHEMA_V2)

private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort

private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)

private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)

private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)

private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)

private def schemaForKey(version: Int) = {

val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown offset schema version " + version)

}

}

private def schemaForOffset(version: Int) = {

val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)

println("version is:"+version+", schemaOpt is: "+schemaOpt)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown offset schema version " + version)

}

}

private def schemaForGroup(version: Int) = {

val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)

schemaOpt match {

case Some(schema) => schema

case _ =>  throw new KafkaException("Unknown group metadata version " + version)

}

}

/**

* Generates the key for offset commit message for given (group, topic, partition)

*

* @return key for offset commit message

*/

def offsetCommitKey(group: String, topicPartition: TopicPartition,

versionId: Short = 0): Array[Byte] = {

val key =  new Struct(CURRENT_OFFSET_KEY_SCHEMA)

key.set(OFFSET_KEY_GROUP_FIELD, group)

key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)

key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + key.sizeOf)

byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)

key.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Generates the key for group metadata message for given group

*

* @return key bytes for group metadata message

*/

def groupMetadataKey(group: String): Array[Byte] = {

val key =  new Struct(CURRENT_GROUP_KEY_SCHEMA)

key.set(GROUP_KEY_GROUP_FIELD, group)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + key.sizeOf)

byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)

key.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Generates the payload for offset commit message from given offset and metadata

*

* @param offsetAndMetadata consumer's current offset and metadata

* @return payload for offset commit message

*/

def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {

//  generate commit value with schema version 1
val value =  new Struct(CURRENT_OFFSET_VALUE_SCHEMA)

value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)

value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)

value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)

value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)

val byteBuffer = ByteBuffer.allocate(2  /*  version  */ + value.sizeOf)

byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)

value.writeTo(byteBuffer)

byteBuffer.array()

}

/**

* Decodes the offset messages' key

*

* @param buffer input byte-buffer

* @return a GroupTopicPartition object

*/

def readMessageKey(buffer: ByteBuffer): BaseKey = {

val version = buffer.getShort

val keySchema = schemaForKey(version)

val key = keySchema.read(buffer)

if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {

//  version 0 and 1 refer to offset
val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]

val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]

val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]

OffsetKey(version, GroupTopicPartition(group,  new TopicPartition(topic, partition)))

} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {

// version 2 refers to group metadata
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

GroupMetadataKey(version, group)

} else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION2) { // newly added

// version 3 is handled the same way as group metadata here
val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]

GroupMetadataKey(version, group)

} else {

throw new IllegalStateException("Unknown version " + version + " for group metadata message")

}

}

/**
 * Decodes the offset messages' payload and retrieves offset and metadata from it
 *
 * @param buffer input byte-buffer
 * @return an offset-metadata object from the message
 */

def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
  if (buffer == null) { // tombstone
    null
  } else {
    val version = buffer.getShort
    val valueSchema = schemaForOffset(version)
    val value = valueSchema.read(buffer)

    if (version == 0) {
      val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
      val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
      val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
      OffsetAndMetadata(offset, metadata, timestamp)
    } else if (version == 1) {
      val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
      val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
      val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
      val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
      OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
    } else if (version == 2) {
      val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
      val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
      val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
      OffsetAndMetadata(offset, metadata, commitTimestamp)
    } else if (version == 3) {
      val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
      // leaderEpoch is read from the record but not passed on to OffsetAndMetadata here
      val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
      val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
      val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
      OffsetAndMetadata(offset, metadata, commitTimestamp)
    } else {
      throw new IllegalStateException("Unknown offset message version")
    }
  }
}

/**
 * Decodes the group metadata messages' payload and retrieves its member metadata from it
 *
 * @param buffer input byte-buffer
 * @return a group metadata object from the message
 */

def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
  if (buffer == null) { // tombstone
    null
  } else {
    val version = buffer.getShort
    val valueSchema = schemaForGroup(version)
    val value = valueSchema.read(buffer)

    if (version == 0 || version == 1) {
      val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
      val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
      val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
      val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
      val memberMetadataArray = value.getArray(MEMBERS_KEY)

      // rebuild one MemberMetadata per member entry stored in the record
      val members = memberMetadataArray.map { memberMetadataObj =>
        val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
        val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
        val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
        val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
        val subscription: PartitionAssignor.Subscription = ConsumerProtocol.deserializeSubscription(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
        val assignment: PartitionAssignor.Assignment = ConsumerProtocol.deserializeAssignment(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])

        val member = new MemberMetadata(memberId
          , groupId
          , clientId
          , clientHost
          , protocolType
          , List((protocol, subscription.topics().asScala.toSet))
          , assignment.partitions().asScala.map(tp => (tp.topic(), tp.partition())).toSet)
        member
      }

      val finalProtocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)

      val group = new GroupMetadata(groupId = groupId
        , generationId = generationId
        , protocolType = finalProtocolType
        , protocol = Option(protocol)
        , leaderId = Option(leaderId)
      )
      members.foreach(group.add)
      group
    } else {
      throw new IllegalStateException("Unknown group metadata message version")
    }
  }
}

// Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
// (specify --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
class OffsetsMessageFormatter  extends MessageFormatter {

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {

// Only print if the message is an offset record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case offsetKey: OffsetKey =>

val groupTopicPartition = offsetKey.key

val value = consumerRecord.value

val formattedValue =

if (value ==  null ) "NULL"

else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString

output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))

output.write("::".getBytes(StandardCharsets.UTF_8))

output.write(formattedValue.getBytes(StandardCharsets.UTF_8))

output.write("\n".getBytes(StandardCharsets.UTF_8))

case _ =>  //  no-op
}

}

}

//  Formatter for use with tools to read group metadata history
class GroupMetadataMessageFormatter  extends MessageFormatter {

def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {

Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {

// Only print if the message is a group metadata record.
// We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
case groupMetadataKey: GroupMetadataKey =>

val groupId = groupMetadataKey.key

val value = consumerRecord.value

val formattedValue =

if (value ==  null ) "NULL"

else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString

output.write(groupId.getBytes(StandardCharsets.UTF_8))

output.write("::".getBytes(StandardCharsets.UTF_8))

output.write(formattedValue.getBytes(StandardCharsets.UTF_8))

output.write("\n".getBytes(StandardCharsets.UTF_8))

case _ =>  //  no-op
}

}

}

}

case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {

def  this (group: String, topic: String, partition: Int) =

this (group,  new TopicPartition(topic, partition))

override def toString: String =

"[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)

}

trait BaseKey{

def version: Short

def key: Any

}

case class OffsetKey(version: Short, key: GroupTopicPartition)  extends BaseKey {

override def toString: String = key.toString

}

case class GroupMetadataKey(version: Short, key: String)  extends BaseKey {

override def toString: String = key

}
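
The version dispatch in `readMessageKey` above can be tried out without a broker. The following is only a minimal sketch in plain Java, with no Kafka dependency: the class `KeyVersionSketch` and its helper methods are hypothetical names invented for this illustration, and the byte layout (a 2-byte version, strings written as a 2-byte length plus UTF-8 bytes, the partition as a 4-byte int) is an assumption about how such keys are laid out, not something taken from the listing. It handles only versions 0, 1 and 2 and rejects everything else, which is the behavior that makes a reader fail when records arrive in a version it does not know.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration class; not part of kafka-manager or Kafka.
public class KeyVersionSketch {

    // Assumed string layout: 2-byte length followed by UTF-8 bytes.
    static void putString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    static String getString(ByteBuffer buf) {
        byte[] bytes = new byte[buf.getShort()];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Builds an offset-style key: version, group, topic, partition.
    static byte[] offsetKey(short version, String group, String topic, int partition) {
        ByteBuffer buf = ByteBuffer.allocate(512);
        buf.putShort(version);
        putString(buf, group);
        putString(buf, topic);
        buf.putInt(partition);
        return Arrays.copyOf(buf.array(), buf.position());
    }

    // Builds a group-metadata-style key: version, group.
    static byte[] groupKey(short version, String group) {
        ByteBuffer buf = ByteBuffer.allocate(512);
        buf.putShort(version);
        putString(buf, group);
        return Arrays.copyOf(buf.array(), buf.position());
    }

    // Reads the leading version and decodes the rest accordingly.
    static String describeKey(ByteBuffer buf) {
        short version = buf.getShort();
        if (version == 0 || version == 1) {
            String group = getString(buf);
            String topic = getString(buf);
            int partition = buf.getInt();
            return "offset[" + group + "," + topic + "," + partition + "]";
        } else if (version == 2) {
            return "group[" + getString(buf) + "]";
        } else {
            throw new IllegalStateException("Unknown version " + version + " for group metadata message");
        }
    }

    public static void main(String[] args) {
        System.out.println(describeKey(ByteBuffer.wrap(offsetKey((short) 1, "g1", "test", 0))));
        System.out.println(describeKey(ByteBuffer.wrap(groupKey((short) 2, "g1"))));
    }
}
```

Running `main` prints `offset[g1,test,0]` and then `group[g1]`. Passing a key whose version is 3 to `describeKey` throws the `IllegalStateException`, which is the situation the extra branches in the listing are meant to avoid.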

