对生产者(Producer)的 access mode 不太明白

/**
 * Creates a Pulsar producer of {@code String} messages on {@code myTopic1} and exposes it as a bean.
 *
 * <p>The producer requests {@link ProducerAccessMode#Shared}, i.e. it is willing to publish alongside
 * other producers on the same topic. The access mode is requested by this producer only; it is not
 * a setting stored on the topic.
 *
 * @return the producer built for {@code myTopic1}, typed as {@code Producer<String>} so callers do not
 *         have to work with a raw type
 * @throws PulsarClientException propagated from the builder chain if the producer cannot be created
 */
@Bean
public Producer<String> createPulsarProducer_myTopic1() throws PulsarClientException {
    Producer<String> stringProducer = pulsarClient.newProducer(Schema.STRING)
            .topic(myTopic1)
            .accessMode(ProducerAccessMode.Shared)
            .create();
    // Print the producer name so it can be seen at startup.
    System.out.println("生产者名称:" + stringProducer.getProducerName());
    return stringProducer;
}

比如上面的代码。
使用 Shared 时,多个生产者可以同时在同一个主题上发布消息。这是默认设置。

如果还有其它的代码:
// A second producer on the same topic, this time requesting exclusive access.
Producer<String> stringProducer = pulsarClient.newProducer(Schema.STRING)
        .topic(myTopic1)
        .accessMode(ProducerAccessMode.Exclusive)
        .create();
// Plain ASCII quotes are required here; typographic quotes do not compile as string delimiters.
System.out.println("生产者名称:" + stringProducer.getProducerName());
return stringProducer;

都对myTopic1主题设置Access mode,那么这个Access mode是Shared呢,还是Exclusive呢??

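这时会变成Exclusive,但前提是 Exclusive 生产者先连接成功。access mode 并不是主题上的一个配置,而是每个生产者在连接时各自声明、由 broker 逐个检查的(见下面 broker 端的代码):主题上已有 Exclusive 生产者时,新的 Shared 生产者会创建失败(ProducerBusyException);反过来,主题上已有 Shared 生产者时,新的 Exclusive 生产者会创建失败(ProducerFencedException: "Topic has existing shared producers")。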

  protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
            CompletableFuture<Void> producerQueuedFuture) {
        lock.writeLock().lock();
        try {
            switch (producer.getAccessMode()) {
            case Shared:
                if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                    return FutureUtil.failedFuture(
                            new ProducerBusyException(
                                    "Topic has an existing exclusive producer: " + exclusiveProducerName));
                } else {
                    // Normal producer getting added, we don't need a new epoch
                    return CompletableFuture.completedFuture(topicEpoch);
                }

            case Exclusive:
                if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                    return FutureUtil.failedFuture(
                            new ProducerFencedException(
                                    "Topic has an existing exclusive producer: " + exclusiveProducerName));
                } else if (!producers.isEmpty()) {
                    return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
                } else if (producer.getTopicEpoch().isPresent()
                        && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
                    // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
                    // to be fenced, because a new producer had been present in between.
                    return FutureUtil.failedFuture(new ProducerFencedException(
                            String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
                                    topicEpoch.get(), producer.getTopicEpoch().get())));
                } else {
                    // There are currently no existing producers
                    hasExclusiveProducer = true;
                    exclusiveProducerName = producer.getProducerName();

                    CompletableFuture<Long> future;
                    if (producer.getTopicEpoch().isPresent()) {
                        future = setTopicEpoch(producer.getTopicEpoch().get());
                    } else {
                        future = incrementTopicEpoch(topicEpoch);
                    }
                    future.exceptionally(ex -> {
                        hasExclusiveProducer = false;
                        exclusiveProducerName = null;
                        return null;
                    });

                    return future.thenApply(epoch -> {
                        topicEpoch = Optional.of(epoch);
                        return topicEpoch;
                    });
                }
                case ExclusiveWithFencing:
                    if (hasExclusiveProducer || !producers.isEmpty()) {
                        // clear all waiting producers
                        // otherwise closing any producer will trigger the promotion
                        // of the next pending producer
                        List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
                                new ArrayList<>(waitingExclusiveProducers);
                        waitingExclusiveProducers.clear();
                        waitingExclusiveProducersCopy.forEach((Pair<Producer,
                                                               CompletableFuture<Optional<Long>>> handle) -> {
                            log.info("[{}] Failing waiting producer {}", topic, handle.getKey());
                            handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out"));
                            handle.getKey().close(true);
                        });
                        producers.forEach((k, currentProducer) -> {
                            log.info("[{}] Fencing out producer {}", topic, currentProducer);
                            currentProducer.close(true);
                        });
                    }
                    if (producer.getTopicEpoch().isPresent()
                            && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
                        // If a producer reconnects, but all the topic epoch has already moved forward,
                        // this producer needs to be fenced, because a new producer had been present in between.
                        hasExclusiveProducer = false;
                        return FutureUtil.failedFuture(new ProducerFencedException(
                                String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
                                        topicEpoch.get(), producer.getTopicEpoch().get())));
                    } else {
                        // There are currently no existing producers
                        hasExclusiveProducer = true;
                        exclusiveProducerName = producer.getProducerName();

                        CompletableFuture<Long> future;
                        if (producer.getTopicEpoch().isPresent()) {
                            future = setTopicEpoch(producer.getTopicEpoch().get());
                        } else {
                            future = incrementTopicEpoch(topicEpoch);
                        }
                        future.exceptionally(ex -> {
                            hasExclusiveProducer = false;
                            exclusiveProducerName = null;
                            return null;
                        });

                        return future.thenApply(epoch -> {
                            topicEpoch = Optional.of(epoch);
                            return topicEpoch;
                        });
                    }
            case WaitForExclusive: {
                if (hasExclusiveProducer || !producers.isEmpty()) {
                    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
                    log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
                    waitingExclusiveProducers.add(Pair.of(producer, future));
                    producerQueuedFuture.complete(null);
                    return future;
                } else if (producer.getTopicEpoch().isPresent()
                        && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
                    // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
                    // to be fenced, because a new producer had been present in between.
                    return FutureUtil.failedFuture(new ProducerFencedException(
                            String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
                                    topicEpoch.get(), producer.getTopicEpoch().get())));
                } else {
                    // There are currently no existing producers
                    hasExclusiveProducer = true;
                    exclusiveProducerName = producer.getProducerName();

                    CompletableFuture<Long> future;
                    if (producer.getTopicEpoch().isPresent()) {
                        future = setTopicEpoch(producer.getTopicEpoch().get());
                    } else {
                        future = incrementTopicEpoch(topicEpoch);
                    }
                    future.exceptionally(ex -> {
                        hasExclusiveProducer = false;
                        exclusiveProducerName = null;
                        return null;
                    });

                    return future.thenApply(epoch -> {
                        topicEpoch = Optional.of(epoch);
                        return topicEpoch;
                    });
                }
            }