1. 模拟实现 Kafka 的生产者与消费者(原生 API)
解决相关依赖:<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.0</version></dependency>
生产者:
packagecom.zy.kafka;importjava.util.Properties;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;publicclassKafkaTest { publicstaticvoidmain(String[] args) { //1.加载配置文件 //1.1封装配置文件对象 Properties prps=newProperties(); //配置broker地址 prps.put("bootstrap.servers", "hadoop02:9092"); //配置ack级别:0 1 -1(all) prps.put("acks", "all"); //重试次数 prps.put("retries", 3); prps.put("batch.size", 16384); prps.put("linger.ms",1); prps.put("buffer.memory", 33554432); //指定(message的K-V)的序列化 prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //2.创建生产者对象(指定的key和value的泛型) Producer<String, String>producer=new KafkaProducer<>(prps); //生产者发送消息 for(inti=0;i<100;i++) { /** * ProducerRecord<String, String>(topic, value) * topic:主题名称 * key: * value: */ //消息的封装对象 ProducerRecord<String, String>pr=newProducerRecord<String, String>("test_topic", "key"+i, "value"+i); producer.send(pr); }producer.close(); }}
消费者:
packagecom.zy.kafka;importjava.util.Arrays;importjava.util.Properties;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.Producer;importorg.apache.kafka.clients.producer.ProducerRecord;publicclassKafkaTest { publicstaticvoidmain(String[] args) { //1.加载配置文件 //1.1封装配置文件对象 Properties prps=newProperties(); //配置broker地址 prps.put("bootstrap.servers", "hadoop02:9092"); //指定消费的组的ID prps.put("group.id", "test"); //是否启动自动提交(是否自动提交反馈信息,向zookeeper提交) prps.put("enable.auto.commit", "true"); //自动提交的时间间隔 prps.put("auto.commit.interval.ms", "1000"); //指定(message的K-V)的序列化 prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建kafka的消费者 KafkaConsumer<String, String>consumer=newKafkaConsumer<>(prps); //添加消费主题 consumer.subscribe(Arrays.asList("kafka_test")); //开始消费 while(true) { //设置从哪里开始消费,返回的是一个消费记录 ConsumerRecords<String, String>poll = consumer.poll(10); for(ConsumerRecord<String, String>p:poll) { System.out.printf("offset=%d,key=%s,value=%s\n",p.offset(),p.key(),p.value()); } } }}
import java.io.IOException;import java.io.InputStream;import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import kafka.admin.TopicCommand;public class KafkaAPI { public static void main(String[] args) throws IOException { /* kafka-topics.sh --create --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 --replication-factor 3 --partitions 10 --topic kafka_test11 */ //创建一个topic String ops[]=new String []{ "--create", "--zookeeper","hadoop01:2181,hadoop02:2181,hadoop03:2181", "--replication-factor","3", "--topic","zy_topic","--partitions","5" }; String list[]=new String[] { "--list", "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181" }; //以命令的方式提交 TopicCommand.main(list); }}