Kafka Source Code Analysis 4: How the Broker Handles Produce Requests


This post walks through how a Kafka broker handles the messages that producers send to it.

How the Kafka server processes a producer request

The entry point is in KafkaApis.scala: the request type is determined from request.header.apiKey.

def handle(request: RequestChannel.Request) {
  try {
    trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
      format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
    ApiKeys.forId(request.header.apiKey) match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // ... cases for the other ApiKeys elided ...

For produce requests (ApiKeys.PRODUCE), handleProduceRequest calls replicaManager.appendRecords:

// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
  timeout = produceRequest.timeout.toLong,
  requiredAcks = produceRequest.acks,
  internalTopicsAllowed = internalTopicsAllowed,
  isFromClient = true,
  entriesPerPartition = authorizedRequestInfo,
  responseCallback = sendResponseCallback)

// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
produceRequest.clearPartitionRecords()
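
For context, this is roughly what a client does to end up in this PRODUCE path. A minimal sketch using the standard Java producer API from Scala; the broker address and topic name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")                           // arrives at the broker as requiredAcks = -1

val producer = new KafkaProducer[String, String](props)
// each batch sent by the producer becomes a PRODUCE request dispatched to handleProduceRequest
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
producer.close()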

ReplicaManager.scala
appendRecords first writes the messages to the partition's leader. If requiredAcks == -1, the response is only sent back once every replica in the ISR has caught up with the write; the ISR followers replicate by fetching from the leader, much like ordinary consumers.

/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  isFromClient: Boolean,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Object] = None) {
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
    }

    // 1. required acks = -1
    // 2. there is data to append
    // 3. at least one partition append was successful (fewer errors than partitions)
    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)
    }
    responseCallback(responseStatus)
  }
}
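
The three numbered conditions above are what delayedProduceRequestRequired checks, and isValidRequiredAcks simply accepts -1, 0 and 1. A sketch reconstructed from those comments (the exact field used to detect a failed local append is an assumption here):

private def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0

// Only park the request in the purgatory when acks=all, there is data, and at least one
// partition was appended locally (fewer errors than partitions); otherwise respond immediately.
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}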

Appending the messages to the local log

/**
 * Append the messages to the local replica logs
 */
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             isFromClient: Boolean,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  trace("Append [%s] to local log ".format(entriesPerPartition))
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        val partitionOpt = getPartition(topicPartition)
        val info = partitionOpt match {
          case Some(partition) =>
            if (partition eq ReplicaManager.OfflinePartition)
              throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
            partition.appendRecordsToLeader(records, isFromClient, requiredAcks)

          case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
            .format(topicPartition, localBrokerId))
        }

        val numAppendedMessages =
          if (info.firstOffset == -1L || info.lastOffset == -1L)
            0
          else
            info.lastOffset - info.firstOffset + 1

        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
          .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
        (topicPartition, LogAppendResult(info))
      } catch {
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderForPartitionException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: KafkaStorageException |
                 _: InvalidTimestampException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        case t: Throwable =>
          brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
          brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
          error("Error processing append operation on partition %s".format(topicPartition), t)
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))
      }
    }
  }
}

Partition.scala
appendRecordsToLeader appends the records to the leader replica's local log:

def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val log = leaderReplica.log.get
        val minIsr = log.config.minInSyncReplicas
        val inSyncSize = inSyncReplicas.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
            .format(topicPartition, inSyncSize, minIsr))
        }

        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        // probably unblock some follower fetch requests since log end offset has been updated
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderReplica))

      case None =>
        throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()

  info
}
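
To make the min.insync.replicas guard concrete, here is a toy walkthrough of the condition above with made-up example values (this is illustrative, not broker code):

// Example: a topic with replication.factor = 3 and min.insync.replicas = 2,
// a producer sending with acks=all (requiredAcks = -1), and both followers out of the ISR.
val minIsr = 2        // log.config.minInSyncReplicas
val inSyncSize = 1    // only the leader remains in the ISR
val requiredAcks = -1 // acks=all

// The append is rejected before touching the log and the producer sees NotEnoughReplicasException.
// With acks=0 or acks=1 the same write would still be accepted.
val rejected = inSyncSize < minIsr && requiredAcks == -1   // true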

Log.scala
appendAsLeader delegates to the private append method with assignOffsets = true:

private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    if (appendInfo.shallowCount == 0)
      return appendInfo

    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validRecords = trimInvalidBytes(records, appendInfo)

    // they are valid, insert them in the log
    lock synchronized {
      if (assignOffsets) {
        // assign offsets to the message set
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        appendInfo.firstOffset = offset.value
        val now = time.milliseconds
        val validateAndOffsetAssignResult = try {
          LogValidator.validateMessagesAndAssignOffsets(validRecords,
            offset,
            now,
            appendInfo.sourceCodec,
            appendInfo.targetCodec,
            config.compact,
            config.messageFormatVersion.messageFormatVersion,
            config.messageTimestampType,
            config.messageTimestampDifferenceMaxMs,
            leaderEpoch,
            isFromClient)
        } catch {
          case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
        }
        validRecords = validateAndOffsetAssignResult.validatedRecords
        appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
        appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
        appendInfo.lastOffset = offset.value - 1
        if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
          appendInfo.logAppendTime = now

        // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
        // format conversion)
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            if (batch.sizeInBytes > config.maxMessageSize) {
              // we record the original message set size instead of the trimmed size
              // to be consistent with pre-compression bytesRejectedRate recording
              brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
              brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
              throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                .format(batch.sizeInBytes, config.maxMessageSize))
            }
          }
        }
      } else {
        // we are taking the offsets we are given
        if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
          throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
      }

      // update the epoch cache with the epoch stamped onto the message by the leader
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
      }

      // check that the message set size does not exceed config.segmentSize
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
          .format(validRecords.sizeInBytes, config.segmentSize))
      }

      // now that we have valid records, offsets assigned, and timestamps updated, we need to
      // validate the idempotent/transactional state of the producers and collect some metadata
      val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
      maybeDuplicate.foreach { duplicate =>
        appendInfo.firstOffset = duplicate.firstOffset
        appendInfo.lastOffset = duplicate.lastOffset
        appendInfo.logAppendTime = duplicate.timestamp
        return appendInfo
      }

      // if the current segment is full, roll over to a new segment
      // maybe roll the log if this segment is full
      val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
        maxTimestampInMessages = appendInfo.maxTimestamp,
        maxOffsetInMessages = appendInfo.lastOffset)

      val logOffsetMetadata = LogOffsetMetadata(
        messageOffset = appendInfo.firstOffset,
        segmentBaseOffset = segment.baseOffset,
        relativePositionInSegment = segment.size)

      // the actual write is performed by the segment
      segment.append(firstOffset = appendInfo.firstOffset,
        largestOffset = appendInfo.lastOffset,
        largestTimestamp = appendInfo.maxTimestamp,
        shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
        records = validRecords)

      // update the producer state
      for ((producerId, producerAppendInfo) <- updatedProducers) {
        producerAppendInfo.maybeCacheTxnFirstOffsetMetadata(logOffsetMetadata)
        producerStateManager.update(producerAppendInfo)
      }

      // update the transaction index with the true last stable offset. The last offset visible
      // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
      for (completedTxn <- completedTxns) {
        val lastStableOffset = producerStateManager.completeTxn(completedTxn)
        segment.updateTxnIndex(completedTxn, lastStableOffset)
      }

      // always update the last producer id map offset so that the snapshot reflects the current offset
      // even if there isn't any idempotent data being written
      producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)

      // increment the log end offset
      updateLogEndOffset(appendInfo.lastOffset + 1)

      // update the first unstable offset (which is used to compute LSO)
      updateFirstUnstableOffset()

      trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
        .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))

      // if the number of unflushed messages exceeds the flush interval, fsync the log once
      if (unflushedMessages >= config.flushInterval)
        flush()

      appendInfo
    }
  }
}
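
A note on that final flush check: config.flushInterval corresponds to the topic config flush.messages (broker-level log.flush.interval.messages, which defaults to Long.MaxValue, so out of the box Kafka leaves fsync to the OS page cache and relies on replication for durability). A sketch of the decision, assuming unflushedMessages counts the messages appended since the last flush (log end offset minus recovery point):

// Sketch under the assumption stated above; not a copy of Log.scala.
def shouldFlush(logEndOffset: Long, recoveryPoint: Long, flushIntervalMessages: Long): Boolean =
  (logEndOffset - recoveryPoint) >= flushIntervalMessages

// e.g. with flush.messages = 10000, an explicit flush/fsync is triggered once
// 10000 messages have accumulated since the last recovery point.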

LogSegment.scala
append writes the records into the segment's log file; once the bytes appended since the last index entry exceed indexIntervalBytes (topic config index.interval.bytes), an entry is also added to the offset index and the time index.

@nonthreadsafe
def append(firstOffset: Long,
           largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  if (records.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
    val physicalPosition = log.sizeInBytes()
    if (physicalPosition == 0)
      rollingBasedTimestamp = Some(largestTimestamp)

    // append the messages
    require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
    val appendedBytes = log.append(records)
    trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")

    // Update the in memory max timestamp and corresponding offset.
    if (largestTimestamp > maxTimestampSoFar) {
      maxTimestampSoFar = largestTimestamp
      offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
    }

    // append an entry to the index (if needed)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(firstOffset, physicalPosition)
      timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
      bytesSinceLastIndexEntry = 0
    }
    bytesSinceLastIndexEntry += records.sizeInBytes
  }
}
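
The offset index is therefore sparse: with the default index.interval.bytes of 4096, roughly one index entry is written per 4 KB of log data rather than one per message. A toy simulation of the bookkeeping above, using made-up batch sizes:

// Count how many index entries a sequence of appended batches would produce.
val indexIntervalBytes = 4096
val batchSizes = Seq(1000, 1500, 2000, 3000, 500)   // made-up batch sizes in bytes

var bytesSinceLastIndexEntry = 0
var indexEntries = 0
for (size <- batchSizes) {
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    indexEntries += 1
    bytesSinceLastIndexEntry = 0
  }
  bytesSinceLastIndexEntry += size
}
// indexEntries == 1: an entry is written just before the 4th batch,
// once 1000 + 1500 + 2000 = 4500 bytes have accumulated.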

FileRecords.java

public int append(MemoryRecords records) throws IOException {
    int written = records.writeFullyTo(channel);
    size.getAndAdd(written);
    return written;
}

The bytes are written through the FileChannel. Note that this only puts them in the OS page cache; they are forced to disk when the log is flushed (the flush()/fsync path seen in Log.append above).
MemoryRecords.java

/**
 * Write all records to the given channel (including partial records).
 * @param channel The channel to write to
 * @return The number of bytes written
 * @throws IOException For any IO errors writing to the channel
 */
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    buffer.mark();
    int written = 0;
    while (written < sizeInBytes())
        written += channel.write(buffer);
    buffer.reset();
    return written;
}
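
Two details worth calling out: a single channel.write call may consume fewer bytes than remain in the buffer, which is why the loop is needed, and the write itself only reaches the page cache, so durability requires an explicit force/fsync. A standalone sketch of the same pattern against a plain FileChannel (the file path is a placeholder):

import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

// Keep writing until the buffer is drained; write() is not guaranteed to take it all in one call.
def writeFully(channel: FileChannel, buffer: ByteBuffer): Int = {
  var written = 0
  while (buffer.hasRemaining)
    written += channel.write(buffer)
  written
}

val channel = FileChannel.open(Paths.get("/tmp/segment-demo.log"),
  StandardOpenOption.CREATE, StandardOpenOption.WRITE)
writeFully(channel, ByteBuffer.wrap("hello kafka".getBytes("UTF-8")))
channel.force(true)   // only now are the bytes guaranteed to be on disk
channel.close()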

 
