Kafka on Pulsar cannot send messages

I deployed Pulsar in both a VM and a Kubernetes (k8s) environment; the Pulsar version is 2.10.1 and the KoP version is 2.10.0.6.
Both the VM and the k8s deployments hit the same problem.
I wrote a small test program with the kafka-clients-2.2.0 library to produce and consume over the Kafka protocol.
With a newly created topic, the first run sends messages normally.
After stopping the program and starting it again, sends no longer go through and an exception is thrown.
The exception is:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for KafkaTopic1-0:120012 ms has passed since batch creation
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
	at com.kafka.test.Producer.run(Producer.java:45)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for KafkaTopic1-0:120012 ms has passed since batch creation
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for KafkaTopic1-0:120002 ms has passed since batch creation
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
	at com.kafka.test.Producer.run(Producer.java:45)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for KafkaTopic1-0:120002 ms has passed since batch creation
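
For reference, the ~120000 ms in the message is the producer's delivery.timeout.ms (kafka-clients default: 2 minutes), the total time allowed for a record from batch creation to delivery. Raising it would not fix the root cause here, but this hypothetical line, added to the Properties setup in the constructor below, shows which knob the error refers to:

	// Hypothetical tuning only: the default delivery.timeout.ms is 120000 ms,
	// which is where "120012 ms has passed since batch creation" comes from.
	props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 180000);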

The test code is:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
	private final KafkaProducer<Integer, String> producer;
	private final String topic;
	private final Boolean isAsync;

	public Producer(String topic, Boolean isAsync) {
		Properties props = new Properties();

		String bootstrapServers = "x.x.x.x:9092";
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		producer = new KafkaProducer<>(props);
		this.topic = topic;
		this.isAsync = isAsync;
	}

	@Override
	public void run() {
		int messageNo = 0;
		while (messageNo < 10) {
			String messageStr = "Message_" + messageNo;
			long startTime = System.currentTimeMillis();
			if (isAsync) { // Send asynchronously
				producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
						new DemoCallBack(startTime, messageNo, messageStr));
			} else { // Send synchronously
				try {
					producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
					System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
				} catch (InterruptedException | ExecutionException e) {
					e.printStackTrace();
				}
			}
			++messageNo;
		}
		// Flush any buffered records and release client resources;
		// without close(), async sends may be lost when the JVM exits.
		producer.close();
	}

	public static void main(String[] args) {
		String topic = "KafkaTopic1";
		Producer producerThread = new Producer(topic, false);
		producerThread.start();
	}
}

class DemoCallBack implements Callback {

	private final long startTime;
	private final int key;
	private final String message;

	public DemoCallBack(long startTime, int key, String message) {
		this.startTime = startTime;
		this.key = key;
		this.message = message;
	}

	public void onCompletion(RecordMetadata metadata, Exception exception) {
		long elapsedTime = System.currentTimeMillis() - startTime;
		if (metadata != null) {
			System.out.println("message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), "
					+ "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
		} else {
			exception.printStackTrace();
		}
	}
}

Can you reproduce this in standalone mode? I cannot reproduce it with standalone.
Also, Kafka client logs are of very limited help when troubleshooting KoP issues. In general, it is best to provide:

  1. the KoP configuration (a minimal standalone sketch follows this list)
  2. broker logs (ideally at debug level, if the issue reproduces reliably)
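
For context, enabling KoP on a standalone broker only takes a few lines in conf/standalone.conf; a minimal sketch (settings as documented in the KoP README, the listener address being an assumption):

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
kafkaListeners=PLAINTEXT://127.0.0.1:9092
allowAutoTopicCreationType=partitioned
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor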

You could also check whether the topic was deleted automatically; see kop/kop.md at 1e814e7f9b1c336bb3e752d9281aed2870205525 · streamnative/kop · GitHub
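
If you want to check this programmatically, here is a minimal sketch using the Pulsar admin client (pulsar-client-admin on the classpath); the HTTP service URL, namespace, and class name are assumptions for illustration:

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;

public class TopicCheck {
	public static void main(String[] args) throws Exception {
		// Assumed broker HTTP service URL; replace with your own.
		PulsarAdmin admin = PulsarAdmin.builder()
				.serviceHttpUrl("http://x.x.x.x:8080")
				.build();
		String topic = "persistent://public/default/KafkaTopic1";
		// The partitioned-topic metadata can outlive the partitions themselves.
		PartitionedTopicMetadata meta = admin.topics().getPartitionedTopicMetadata(topic);
		System.out.println("partitions in metadata: " + meta.partitions);
		// List the topics that actually exist in the namespace right now.
		admin.topics().getList("public/default").forEach(System.out::println);
		admin.close();
	}
}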

Thanks for your reply.

I found that when the problem occurs, Pulsar Manager shows the topic still exists but its partitions have disappeared.
I then changed a couple of broker parameters:
allowAutoTopicCreation=false
brokerDeleteInactiveTopicsEnabled=false
If the same symptom appears in later tests, I will attach logs from the live environment.

That confirms it. The topic still exists while its partitions are gone because Pulsar automatically deleted the inactive partitioned topic but kept its metadata. That bug has been fixed, but the default configuration still preserves the original behavior. Besides setting brokerDeleteInactiveTopicsEnabled=false, another fix is to set brokerDeleteInactivePartitionedTopicMetadataEnabled=true; see Topic metadata not removed for partitioned topics · Issue #10975 · apache/pulsar · GitHub
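
Concretely, either of these broker.conf settings avoids the stale-metadata situation (names as in the post above and the linked issue):

# Option 1: stop deleting inactive topics (and their partitions) altogether
brokerDeleteInactiveTopicsEnabled=false

# Option 2: let the broker clean up the partitioned-topic metadata as well
brokerDeleteInactivePartitionedTopicMetadataEnabled=true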

Thanks for your reply, and best wishes with your work! :+1: