There are several use cases that require exclusive access for a single writer, of which few examples are:

  • Ensuring a linear non-interleaved history of messages
  • Providing basic mechanism for leader election

ProducerAccessMode.Exclusive 目前适合单分区的Topic,内部实现机制,在broker内部,通过 private volatile boolean hasExclusiveProducer 来判断,设计之初是为function 的选主使用的。多分区可能会出现, 如果对一个多分区的Topic同时创建多个producer,举一个例子如果 producer1 对 topic- partition-0创建成功了,同时producer2 对 topic- partition-1也创建成功了, 但这样会导致最终producer1和producer2 都会报错,需要注意。

 case Exclusive:
                if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                    return FutureUtil.failedFuture(
                            new ProducerFencedException(
                                    "Topic has an existing exclusive producer: " + exclusiveProducerName));
                } else if (!producers.isEmpty()) {
                    return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
                } else if (producer.getTopicEpoch().isPresent()
                        && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
                    // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
                    // to be fenced, because a new producer had been present in between.
                    return FutureUtil.failedFuture(new ProducerFencedException(
                            String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
                                    topicEpoch.get(), producer.getTopicEpoch().get())));
                } else {
                    // There are currently no existing producers
                    hasExclusiveProducer = true;
                    exclusiveProducerName = producer.getProducerName();

                    CompletableFuture<Long> future;
                    if (producer.getTopicEpoch().isPresent()) {
                        future = setTopicEpoch(producer.getTopicEpoch().get());
                    } else {
                        future = incrementTopicEpoch(topicEpoch);
                    future.exceptionally(ex -> {
                        hasExclusiveProducer = false;
                        exclusiveProducerName = null;
                        return null;

                    return future.thenApply(epoch -> {
                        topicEpoch = Optional.of(epoch);
                        return topicEpoch;

@liudezhi2098 请问这个leader选主是pulsar内部使用的吗?