本文出自明月工作室:https://www.freebytes.net/it/java/kafka-consumer-producer-base-code.html
1、开启zookeeper和kafka服务,并创建topic:topic-1。
2、建立一个maven项目。
3、引入maven依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
4、编写消息回调类:
package net.freebytes.kafka.ks;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
/**
* kafka消息发送成功的回调类
*
* @author 千里明月
* @date 2021/8/4 23:07
*/
public class ProducerCallback implements Callback {
private long startTime;
private int key;
private String message;
public ProducerCallback(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
/**
* 回调方法
*
* @author 千里明月
* @date 2021/8/9
**/
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
long duration = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println("key=" + key + ",value=" + message + ",partition=" + metadata.partition() + ",offset=" + metadata.offset() + ",duration=" + duration);
} else {
e.printStackTrace();
}
}
}
5、编写消息生产者类:
package net.freebytes.kafka.ks;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* 生产者
*
* @author 千里明月
* @date 2021/8/4 22:38
*/
public class Producer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("client.id", "client-1");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<Integer, Object> producer = new KafkaProducer<>(properties);
//topic名
String topic = "topic-1";
int key = 1;
while (true) {
Thread.sleep(1000);
//发送的消息
String msg = "message_" + key++;
Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, msg), new ProducerCallback(System.currentTimeMillis(), key, msg));
//等待消息发送
result.get();
}
}
}
6、编写消息消费者类:
package net.freebytes.kafka.ks;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* 消费者
*
* @author 千里明月
* @date 2021/8/4 22:38
*/
public class Consumer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//topic名
String topic = "topic-1";
consumer.subscribe(Arrays.asList(topic));
try {
while (true) {
//每秒拉取一次消息
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
//输出消息
System.out.println("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
}
}
} finally {
//关闭消费者
consumer.close();
}
}
}
可在gitee上查看项目源码: