Kafka-API

 

IDEA shortcuts:
Ctrl+H   view a type's implementation classes (pick what to new)
Ctrl+R   replace
Ctrl+Alt+L   reformat code
Ctrl+F   find
Ctrl+Alt+V   extract variable

 


Kafka Producer Java API

Creating a producer

Without a callback

public class CustomProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        // Kafka cluster address
        properties.put("bootstrap.servers", "hadoop101:9092, hadoop102:9092, hadoop103:9092");
        // acks=-1 (all)
        properties.put("acks", "all");
        properties.put("retries", 0);
        // size-based batching
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // time-based batching
        properties.put("linger.ms", 1);
        // client-side buffer size
        properties.put("buffer.memory", 33554432);
        // key/value serializers
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 9; i++) {
            producer.send(new ProducerRecord<String, String>("first", "" + i, "Hello" + i));
        }
        //Thread.sleep(1000);
        // If close() is forgotten, sending falls back to the batching conditions
        // (size-based and time-based); if neither threshold is reached, the records are not sent.
        producer.close();
    }
}

new ProducerRecord<String, String>("topic", partition, "key", "value") — the record can also name an explicit partition (an int), as sketched below.
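A minimal sketch of sending one record to an explicit partition (partition 0 and the key/value strings here are only illustrative):

// assumes the producer from the example above; partition 0 is an arbitrary illustrative choice
producer.send(new ProducerRecord<String, String>("first", 0, "key0", "value0"));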

 

With a callback

A producer with a callback: the callback is invoked once for every message sent,
whether or not the send succeeded.

public class CustomProducerCompletion {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop101:9092, hadoop102:9092, hadoop103:9092");
        properties.put("acks", "all");
        properties.put("retries", 2);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // custom partitioner: ProducerConfig.PARTITIONER_CLASS_CONFIG
        //properties.put("partitioner.class", "com.atguigu.kafka.producer.CustomPartitioner");
        // interceptor chain
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                Arrays.asList("com.atguigu.kafka.interceptor.TimeStampInterceptor",
                        "com.atguigu.kafka.interceptor.CountInterceptor"));

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 9; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "1", "Hi" + i), new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata != null) {
                        System.out.println("Topic:" + recordMetadata.topic() + "\t"
                                + "Partition:" + recordMetadata.partition() + "\t"
                                + "offset:" + recordMetadata.offset());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

Custom partitioner

Override the key-to-partition rule to choose the partition yourself.

public class CustomPartitioner implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0; // controls which partition a record goes to
    }

    public void close() {
    }

    /**
     * Configuration properties can be read here.
     * @param config
     */
    public void configure(Map<String, ?> config) {
    }
}
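The stub above sends every record to partition 0. As a sketch of a more typical rule (an assumption, not the implementation from these notes), the partition can be chosen by hashing the key across the topic's partitions:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // number of partitions of the target topic, taken from the cluster metadata
    int numPartitions = cluster.partitionCountForTopic(topic);
    if (keyBytes == null) {
        return 0; // records without a key all go to partition 0 in this sketch
    }
    // spread keyed records across partitions using a non-negative hash of the key bytes
    return (java.util.Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
}

The partitioner is enabled through the partitioner.class property (ProducerConfig.PARTITIONER_CLASS_CONFIG) shown commented out in the producer above.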

Kafka Consumer Java API

High-level API

Offsets do not need to be managed manually.

poll takes a timeout (in ms)
subscribe subscribes to topics
several topics can be consumed at the same time
Arrays.asList turns the array of topic names into a collection

// High-level API
public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Kafka cluster address
        properties.put("bootstrap.servers", "hadoop101:9092, hadoop102:9092, hadoop103:9092");
        // consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kris");
        // whether offsets are committed automatically (managed by the Kafka cluster)
        properties.put("enable.auto.commit", "true");
        // how often offsets are committed
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // key/value deserializers
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList("first"));
        while (true) {
            // poll pulls data, with a timeout in milliseconds
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Topic:" + record.topic() + "\t"
                        + "Partition:" + record.partition() + "\t"
                        + "Offset:" + record.offset() + "\t"
                        + "key:" + record.key() + "\t"
                        + "value:" + record.value());
            }
        }
    }
}
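If offsets should be committed by hand instead of auto-committed, a minimal sketch (an assumption, not part of the original notes) is to disable enable.auto.commit and commit after each batch is processed:

properties.put("enable.auto.commit", "false");
...
while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // process the record here
    }
    // synchronously commit the offsets of the records returned by this poll
    kafkaConsumer.commitSync();
}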

Low-level API

find the partition leader
  fetch from a given offset
  save the offset yourself
read the messages

public class LowLevelConsumer {
    public static void main(String[] args) {
        // 1. cluster
        ArrayList<String> list = new ArrayList<>();
        list.add("hadoop101");
        list.add("hadoop102");
        list.add("hadoop103");
        // 2. topic
        String topic = "first";
        // 3. partition
        int partition = 2;
        // 4. offset
        long offset = 10;
        // 5. find the leader
        String leader = getLeader(list, topic, partition);
        // 6. connect to the leader and fetch the data
        getData(leader, topic, partition, offset);
    }

    private static void getData(String leader, String topic, int partition, long offset) {
        // 1. create a SimpleConsumer
        SimpleConsumer consumer = new SimpleConsumer(leader, 9092, 2000, 1024 * 1024 * 2, "getData");
        // 2./3. build the fetch request with FetchRequestBuilder
        FetchRequestBuilder builder = new FetchRequestBuilder();
        FetchRequestBuilder requestBuilder = builder.addFetch(topic, partition, offset, 1024 * 1024);
        FetchRequest fetchRequest = requestBuilder.build();
        // 4. send the request and get the response
        FetchResponse fetchResponse = consumer.fetch(fetchRequest);
        // 5. parse the response
        ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topic, partition);
        // 6. iterate over the message set
        for (MessageAndOffset messageAndOffset : messageAndOffsets) {
            long message_offset = messageAndOffset.offset();
            Message message = messageAndOffset.message();
            // 7. parse the message; payload() is the message body
            ByteBuffer byteBuffer = message.payload();
            byte[] bytes = new byte[byteBuffer.limit()];
            byteBuffer.get(bytes);
            // 8. print the data
            System.out.println("offset:" + message_offset + "\t" + "value:" + new String(bytes));
        }
    }

    private static String getLeader(ArrayList<String> list, String topic, int partition) {
        // 1. ask each broker in turn for the leader
        for (String host : list) {
            // 2. create a SimpleConsumer
            SimpleConsumer consumer = new SimpleConsumer(host, 9092, 2000, 1024 * 1024, "getLeader");
            // 3./4. build the TopicMetadataRequest
            TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
            // 5. get the TopicMetadataResponse
            TopicMetadataResponse response = consumer.send(request);
            // 6. parse the response
            List<TopicMetadata> topicsMetadata = response.topicsMetadata();
            // 7. iterate over the topic metadata
            for (TopicMetadata topicMetadata : topicsMetadata) {
                List<PartitionMetadata> partitionsMetadata = topicMetadata.partitionsMetadata();
                // 8. iterate over the partition metadata
                for (PartitionMetadata partitionMetadata : partitionsMetadata) {
                    // 9. match the partition and return its leader host
                    if (partitionMetadata.partitionId() == partition) {
                        BrokerEndPoint endPoint = partitionMetadata.leader();
                        return endPoint.host();
                    }
                }
            }
        }
        return null;
    }
}

Kafka Producer Interceptors

Flume intercepts events through its interceptor chain; Kafka interceptors work on messages.

onSend is called once for every record that is sent.
onAcknowledgement is called when the record is acknowledged (or the send fails), before the user callback runs.
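The interceptor classes referenced in the producer above (TimeStampInterceptor, CountInterceptor) are not listed in these notes; the following is only a sketch of what the timestamp interceptor could look like (the class name matches the config, the body is an assumption):

public class TimeStampInterceptor implements ProducerInterceptor<String, String> {

    // called once for every record before it is sent; prefixes the value with the current timestamp
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord<String, String>(record.topic(), record.partition(), record.key(),
                System.currentTimeMillis() + "," + record.value());
    }

    // called when the broker acknowledges the record (or the send fails), before the user callback
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
    }
}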

https://blog.csdn.net/stark_summer/article/details/50144591

Kafka vs. Flume

In an enterprise setting you must be clear about how the streaming data collection frameworks Flume and Kafka are positioned:

Flume (developed by Cloudera):

suits multiple producers;

suits cases where there are not many downstream data consumers;

suits operations where data-safety requirements are not high;

suits integration with the Hadoop ecosystem.

Kafka (developed by LinkedIn):

suits cases where there are many downstream data consumers;

suits operations with higher data-safety requirements, since replication is supported.

A commonly used model is therefore:

online data --> Flume --> Kafka --> Flume (add or drop this stage depending on the scenario) --> HDFS

vim flume-kafka.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

tail -F follows the file in real time; -c +0 makes it start reading from the beginning of the file.

[kris@hadoop101 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-kafka.conf

[kris@hadoop101 datas]$ cat > flume.log
Hello

[kris@hadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic first
Hello

 
