1. Flink版本1.7.2
使用maven构建工程,因此pom.xml添加如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.8</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.10.1</version> </dependency>
response.proto文件
syntax = "proto3";

package com.google.protos;

// NOTE(review): the Java publisher in this article imports
// com.google.protos.GoogleProtobuf.*, so a java_outer_classname option is
// presumably set in the real file — confirm.

// Search response.
message SearchResponse {
  uint64 search_time = 1;
  uint32 code = 2;
  Result results = 3;
}

// Search result.
message Result {
  string id = 1;
  repeated Item items = 2;
}

// Search result item.
message Item {
  string id = 1;
  string name = 2;
  string title = 3;
  string url = 4;
  uint64 publish_time = 5;
  float score = 6;  // weighted score for recommendation or similarity
}
消息示例,包含嵌套对象results以及数组对象items:
{ "search_time":1553650604, "code":200, "results":{ "id":"449", "items":[ { "id":"47", "name":"name47", "title":"标题47", "url":"https://www.google.com.hk/item-47", "publish_time":1552884870, "score":96.03 }, { "id":"2", "name":"name2", "title":"标题2", "url":"https://www.google.com.hk/item-2", "publish_time":1552978902, "score":16.06 }, { "id":"60", "name":"name60", "title":"标题60", "url":"https://www.google.com.hk/item-60", "publish_time":1553444982, "score":62.58 }, { "id":"67", "name":"name67", "title":"标题67", "url":"https://www.google.com.hk/item-67", "publish_time":1553522957, "score":12.17 }, { "id":"15", "name":"name15", "title":"标题15", "url":"https://www.google.com.hk/item-15", "publish_time":1553525421, "score":32.36 }, { "id":"53", "name":"name53", "title":"标题53", "url":"https://www.google.com.hk/item-53", "publish_time":1553109227, "score":52.13 }, { "id":"70", "name":"name70", "title":"标题70", "url":"https://www.google.com.hk/item-70", "publish_time":1552781921, "score":1.72 }, { "id":"53", "name":"name53", "title":"标题53", "url":"https://www.google.com.hk/item-53", "publish_time":1553229003, "score":5.31 }, { "id":"30", "name":"name30", "title":"标题30", "url":"https://www.google.com.hk/item-30", "publish_time":1553282629, "score":26.51 }, { "id":"36", "name":"name36", "title":"标题36", "url":"https://www.google.com.hk/item-36", "publish_time":1552665833, "score":48.76 } ] }}
import com.google.protos.GoogleProtobuf.*;import com.googlecode.protobuf.format.JsonFormat;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.text.DecimalFormat;import java.time.Instant;import java.util.Properties;import java.util.Random;import java.util.concurrent.TimeUnit;/** * @author lynn * @ClassName com.lynn.kafka.SearchResponsePublisher * @Description TODO * @Date 19-3-26 上午8:17 * @Version 1.0 **/public class SearchResponsePublisher { private static final Logger LOG = LoggerFactory.getLogger(SearchResponsePublisher.class); public String randomMessage(int results){ Random random = new Random(); DecimalFormat fmt = new DecimalFormat("##0.00"); SearchResponse.Builder response = SearchResponse.newBuilder(); response.setSearchTime(Instant.now().getEpochSecond()) .setCode(random.nextBoolean()?200:404); Result.Builder result = Result.newBuilder() .setId(""+random.nextInt(1000)); for (int i = 0; i < results; i++) { int number = random.nextInt(100); Item.Builder builder = Item.newBuilder() .setId(number+"") .setName("name"+number) .setTitle("标题"+number) .setUrl("https://www.google.com.hk/item-"+number) .setPublishTime(Instant.now().getEpochSecond() - random.nextInt(1000000)) .setScore(Float.parseFloat(fmt.format(random.nextInt(99) + random.nextFloat()))); result.addItems(builder.build()); } response.setResults(result.build()); return new JsonFormat().printToString(response.build()); } /** * * @param args */ public static void main(String[] args) throws InterruptedException{ if(args.length < 3){ System.err.println("Please input broker.servers and topic and records number!"); System.exit(-1); } String brokers = args[0]; String topic = args[1]; int recordsNumber = Integer.parseInt(args[2]); LOG.info("I will publish {} records...", recordsNumber); SearchResponsePublisher publisher = new 
SearchResponsePublisher();// System.out.println(publisher.randomMessage(10));// if(recordsNumber == 1000) return; Properties props = new Properties(); props.put("bootstrap.servers", brokers); //all:-1 props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); int count = 0; while (count++ < recordsNumber){ producer.send(new ProducerRecord<String, String>(topic, String.valueOf(Instant.now().toEpochMilli()), publisher.randomMessage(10))); TimeUnit.MILLISECONDS.sleep(100); }// producer.flush(); producer.close(); }}
import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.Types;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Json;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.table.sinks.PrintTableSink;import org.apache.flink.types.Row;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
// set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Kafka kafka = new Kafka().version("0.11") .topic(sourceTopic) .startFromEarliest()// .startFromLatest() .property("bootstrap.servers", brokers) .property("group.id", "res") .property("session.timeout.ms", "30000") .sinkPartitionerFixed(); tableEnv.connect(kafka) .withFormat(new Json() .failOnMissingField(false) .deriveSchema()) .withSchema(new Schema() .field("search_time", Types.LONG()) .field("code", Types.INT()) .field("results", Types.ROW( new String[]{"id", "items"}, new TypeInformation[]{ Types.STRING(), ObjectArrayTypeInfo.getInfoFor(Row[].class, //Array.newInstance(Row.class, 10).getClass(), Types.ROW( new String[]{"id", "name", "title", "url", "publish_time", "score"}, new TypeInformation[]{Types.STRING(),Types.STRING(),Types.STRING(),Types.STRING(),Types.LONG(),Types.FLOAT()} ))}) )).inAppendMode().registerTableSource("tb_json");//item[1] item[10] 数组下标从1开始String sql4 = "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n" + "from tb_json"; Table table4 = tableEnv.sqlQuery(sql4); tableEnv.registerTable("tb_item_2", table4); LOG.info("------------------print {} schema------------------", "tb_item_2"); table4.printSchema(); tableEnv.registerTableSink("console4", new String[]{"f0", "f1", "f2", "f3", "f4"}, new TypeInformation[]{ Types.LONG(),Types.INT(), Types.STRING(), Types.STRING(), Types.STRING() }, new PrintTableSink()); table4.insertInto("console4"); // execute program env.execute("Flink Table Json Engine");
select
  search_time,
  code,
  results.id as result_id,        -- sub-field of the nested JSON object
  items[1].name as item_1_name,   -- sub-field of an array element; array indexes start at 1
  items[2].id as item_2_id
from tb_json
嵌套字段可以通过“.”连接符直接获取,而数组元素可以通过“[下标]”获取;下标从1开始,与Java中数组下标从0开始不同。
Schema按照Json对象的嵌套以及数组格式进行定义,即无需将每个字段展平后再定义:将嵌套字段定义为Row类型,数组类型定义为ObjectArrayTypeInfo或BasicArrayTypeInfo。ObjectArrayTypeInfo.getInfoFor的第一个参数为数组的class,可以像示例中那样通过Row[].class或Array.newInstance(Row.class, 10).getClass()获取。
convert方法中的类型判断原本使用==。Java不支持运算符重载,==对对象只做引用比较;可能是由于flink版本的原因,运行时传入的TypeInformation与Types中的常量不是同一个对象,导致==判断失败。因此将此运算符替换为.equals()方法。
JsonRowDeserializationSchema.java
private Object convert(JsonNode node, TypeInformation<?> info) { if (Types.VOID.equals(info) || node.isNull()) { return null; } else if (Types.BOOLEAN.equals(info)) { return node.asBoolean(); } else if (Types.STRING.equals(info)) { return node.asText(); } else if (Types.BIG_DEC.equals(info)) { return node.decimalValue(); } else if (Types.BIG_INT.equals(info)) { return node.bigIntegerValue(); } else if(Types.LONG.equals(info)){ return node.longValue(); } else if(Types.INT.equals(info)){ return node.intValue(); } else if(Types.FLOAT.equals(info)){ return node.floatValue(); } else if(Types.DOUBLE.equals(info)){ return node.doubleValue(); } else if (Types.SQL_DATE.equals(info)) { return Date.valueOf(node.asText()); } else if (Types.SQL_TIME.equals(info)) { // according to RFC 3339 every full-time must have a timezone; // until we have full timezone support, we only support UTC; // users can parse their time as string as a workaround final String time = node.asText(); if (time.indexOf(‘Z‘) < 0 || time.indexOf(‘.‘) >= 0) { throw new IllegalStateException( "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " + "Format: HH:mm:ss‘Z‘"); } return Time.valueOf(time.substring(0, time.length() - 1)); } else if (Types.SQL_TIMESTAMP.equals(info)) { // according to RFC 3339 every date-time must have a timezone; // until we have full timezone support, we only support UTC; // users can parse their time as string as a workaround final String timestamp = node.asText(); if (timestamp.indexOf(‘Z‘) < 0) { throw new IllegalStateException( "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. 
" + "Format: yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z‘"); } return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace(‘T‘, ‘ ‘)); } else if (info instanceof RowTypeInfo) { return convertRow(node, (RowTypeInfo) info); } else if (info instanceof ObjectArrayTypeInfo) { return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo()); } else if (info instanceof BasicArrayTypeInfo) { return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo()); } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { return convertByteArray(node); } else { // for types that were specified without JSON schema // e.g. POJOs try { return objectMapper.treeToValue(node, info.getTypeClass()); } catch (JsonProcessingException e) { throw new IllegalStateException("Unsupported type information ‘" + info + "‘ for node: " + node); } } }
JsonRowSerializationSchema.java
private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) { if (Types.VOID.equals(info) || object == null) { return container.nullNode(); } else if (Types.BOOLEAN.equals(info)) { return container.booleanNode((Boolean) object); } else if (Types.STRING.equals(info)) { return container.textNode((String) object); } else if (Types.BIG_DEC.equals(info)) { // convert decimal if necessary if (object instanceof BigDecimal) { return container.numberNode((BigDecimal) object); } return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue())); } else if (Types.BIG_INT.equals(info)) { // convert integer if necessary if (object instanceof BigInteger) { return container.numberNode((BigInteger) object); } return container.numberNode(BigInteger.valueOf(((Number) object).longValue())); } else if(Types.LONG.equals(info)){ if(object instanceof Long){ return container.numberNode((Long) object); } return container.numberNode(Long.valueOf(((Number) object).longValue())); } else if(Types.INT.equals(info)){ if(object instanceof Integer){ return container.numberNode((Integer) object); } return container.numberNode(Integer.valueOf(((Number) object).intValue())); } else if(Types.FLOAT.equals(info)){ if(object instanceof Float){ return container.numberNode((Float) object); } return container.numberNode(Float.valueOf(((Number) object).floatValue())); } else if(Types.DOUBLE.equals(info)){ if(object instanceof Double){ return container.numberNode((Double) object); } return container.numberNode(Double.valueOf(((Number) object).doubleValue())); } else if (Types.SQL_DATE.equals(info)) { return container.textNode(object.toString()); } else if (Types.SQL_TIME.equals(info)) { final Time time = (Time) object; // strip milliseconds if possible if (time.getTime() % 1000 > 0) { return container.textNode(timeFormatWithMillis.format(time)); } return container.textNode(timeFormat.format(time)); } else if (Types.SQL_TIMESTAMP.equals(info)) { 
return container.textNode(timestampFormat.format((Timestamp) object)); } else if (info instanceof RowTypeInfo) { if (reuse != null && reuse instanceof ObjectNode) { return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object); } else { return convertRow(null, (RowTypeInfo) info, (Row) object); } } else if (info instanceof ObjectArrayTypeInfo) { if (reuse != null && reuse instanceof ArrayNode) { return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); } else { return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object); } } else if (info instanceof BasicArrayTypeInfo) { if (reuse != null && reuse instanceof ArrayNode) { return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); } else { return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object); } } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) { return container.binaryNode((byte[]) object); } else { // for types that were specified without JSON schema // e.g. POJOs try { return mapper.valueToTree(object); } catch (IllegalArgumentException e) { throw new IllegalStateException("Unsupported type information ‘" + info + "‘ for object: " + object, e); } } }
添加文件:
resources/META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
由于打包后kafka-connector jar中与json jar中的同名文件会覆盖,需要将两个文件的内容保留.
参考阿里巴巴blink分支
scala:
BatchCompatibleStreamTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.sinks

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}

/**
  * Defines an external [[TableSink]] that emits a batch [[Table]], for compatibility
  * with the stream connector plugin.
  *
  * @tparam T type of the records in the emitted stream
  */
trait BatchCompatibleStreamTableSink[T] extends TableSink[T] {

  /**
    * Emits the bounded DataStream.
    *
    * @param boundedStream the stream to emit
    * @return the [[DataStreamSink]] produced for the stream
    */
  def emitBoundedStream(boundedStream: DataStream[T]): DataStreamSink[_]
}
PrintTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.sinks

import java.lang.{Boolean => JBool}
import java.util.TimeZone
import java.util.{Date => JDate}
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.runtime.functions.DateTimeFunctions
import org.apache.flink.util.StringUtils

/**
  * A [[TableSink]] that prints every record it receives to the console.
  */
class PrintTableSink()
  extends TableSinkBase[JTuple2[JBool, Row]]
  with BatchCompatibleStreamTableSink[JTuple2[JBool, Row]]
  with UpsertStreamTableSink[Row] {

  /** Attaches a fresh [[PrintSinkFunction]] to the stream and names the sink after it. */
  private def attachPrinter(stream: DataStream[JTuple2[JBool, Row]]) = {
    val printer: PrintSinkFunction = new PrintSinkFunction()
    stream.addSink(printer).name(printer.toString)
  }

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]) =
    attachPrinter(dataStream)

  /** Emits the DataStream. */
  override def emitBoundedStream(boundedStream: DataStream[JTuple2[JBool, Row]]) =
    attachPrinter(boundedStream)

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintTableSink()

  // Key fields and the append-only flag are ignored by this sink.
  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  override def getRecordType: TypeInformation[Row] =
    new RowTypeInfo(getFieldTypes, getFieldNames)
}

/**
  * Sink function that writes every incoming tuple to standard output, one line per
  * record, prefixed with the 1-based subtask index and a "(+)" / "(-)" marker taken
  * from the tuple's boolean flag.
  */
class PrintSinkFunction() extends RichSinkFunction[JTuple2[JBool, Row]] {

  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val streamingContext = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    prefix = "task-" + (streamingContext.getIndexOfThisSubtask + 1) + "> "
  }

  override def invoke(in: JTuple2[JBool, Row]): Unit = {
    val record = in.f1
    // Render each field; SQL date/time values get a fixed pattern, everything else
    // goes through the array-aware toString.
    val rendered = (0 until record.getArity).map { idx =>
      record.getField(idx) match {
        case date: Date =>
          DateTimeFunctions.dateFormat(date.getTime, "yyyy-MM-dd")
        case time: Time =>
          DateTimeFunctions.dateFormat(time.getTime, "HH:mm:ss")
        case timestamp: Timestamp =>
          DateTimeFunctions.dateFormat(timestamp.getTime, "yyyy-MM-dd HH:mm:ss.SSS")
        case other =>
          StringUtils.arrayAwareToString(other)
      }
    }
    val line = rendered.mkString(",")
    val marker = if (in.f0) "(+)" else "(-)"
    System.out.println(prefix + marker + line)
  }

  override def close(): Unit = {
    this.prefix = ""
  }

  override def toString: String = "Print to System.out"
}