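# bigdata-module-8-flinklearn

**Repository Path**: penglanglang/bigdata-module-8-flinklearn

## Basic Information

- **Project Name**: bigdata-module-8-flinklearn
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-11-23
- **Last Updated**: 2021-11-23

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

#### Introduction

- Task 1: Write the getting-started example in IDEA, implemented in at least one of Java or Scala.
- Task 2: Write example code for the window mechanism.
- Task 3: Write a watermark example and run it successfully; write a state example.
- Task 4: Change the parallelism settings in code and run tests; deepen your understanding of the FlinkKafka source code.
- Task 5: Write a Flink Table example; redraw the job submission flow diagram yourself.

#### Task 1: Getting-started example

Write the getting-started example in IDEA, implemented in at least one of Java or Scala.

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // A socket simulates words being sent in real time; Flink receives them as a stream
        DataStreamSource<String> dataStream =
                executionEnvironment.socketTextStream("centos-master", 7777);
        // Anonymous FlatMapFunction: split each line into (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Long>> tupleSingleOutputStreamOperator =
                dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
                            throws Exception {
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                });
        DataStreamSink<Tuple2<String, Long>> result = tupleSingleOutputStreamOperator
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return stringLongTuple2.f0;
                    }
                })
                // Sum the second field (index 1) per key
                .sum(1)
                // Print the running counts
                .print();
        // Trigger execution of the job
        executionEnvironment.execute("wordcount stream process");
    }
}
```

#### Task 2: window demo

Write example code for the window mechanism.

TimeWindowDemo.java: time-based tumbling and sliding windows

```
import java.text.SimpleDateFormat;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Tumbling window: windows do not overlap.
 *
 * 1. Time-driven
 * 2. Event-driven
 */
public class TimeWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =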
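                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        DataStreamSource<String> dataStreamSource = env.socketTextStream("centos-master", 7777);
        dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // Pair each input line with a random value in [0, 10) and log its arrival time
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.out.println("value: " + value + " random: " + random
                        + " timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // Tumbling 5 s window; timeWindow(Time.seconds(5), Time.seconds(2)) would make it sliding
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Tuple2<String, Integer>> input, Collector<String> out)
                    throws Exception {
                // Sum the random values of all elements in this window
                int sum = 0;
                for (Tuple2<String, Integer> tuple2 : input) {
                    sum += tuple2.f1;
                }
                long start = window.getStart();
                long end = window.getEnd();
                out.collect(key + ": " + sum + " |window_start:" + format.format(start)
                        + " <> window_end:" + format.format(end));
            }
        }).print();
        env.execute("wordcount stream process");
    }
}
```

CountWindowDemo.java: event-based (count) tumbling and sliding windows

```
import java.text.SimpleDateFormat;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * Sliding window: windows may overlap.
 *
 * 1. Time-driven
 * 2. Event-driven
 */
public class CountWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        DataStreamSource<String> dataStreamSource = env.socketTextStream("centos-master", 7777);
        dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // Pair each input line with a random value in [0, 10) and log its arrival time
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.out.println("value: " + value + " random: " + random
                        + " timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        }).setParallelism(1)
                .keyBy(new KeySelector<Tuple2<String, Integer>,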
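                        String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // Fires once a key has 5 elements (tumbling); countWindow(5, 2) would make it sliding
        .countWindow(5)
        .apply(new WindowFunction<Tuple2<String, Integer>, String, String, GlobalWindow>() {
            @Override
            public void apply(String key, GlobalWindow window,
                    Iterable<Tuple2<String, Integer>> input, Collector<String> out)
                    throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> tuple2 : input) {
                    sum += tuple2.f1;
                }
                // A GlobalWindow has no time bounds, so maxTimestamp() is Long.MAX_VALUE
                long maxTimestamp = window.maxTimestamp();
                out.collect(key + ": " + sum + " |window_maxTimestamp:"
                        + format.format(maxTimestamp));
            }
        }).print();
        env.execute("wordcount stream process");
    }
}
```

SessionWindowDemo.java: session-based windows

```
import java.text.SimpleDateFormat;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SessionWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        DataStreamSource<String> dataStreamSource = env.socketTextStream("centos-master", 7777);
        dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                long timeMillis = System.currentTimeMillis();
                int random = new Random().nextInt(10);
                System.out.println("value: " + value + " random: " + random
                        + " timestamp: " + timeMillis + "|" + format.format(timeMillis));
                return new Tuple2<>(value, random);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        })
        // A session closes after 5 s of processing time without new elements for the key
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .apply(new WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window,
                    Iterable<Tuple2<String, Integer>> input, Collector<String> out)
                    throws Exception {
                int sum = 0;
                for (Tuple2<String, Integer> tuple2 : input) {
                    sum += tuple2.f1;
                }
                long start = window.getStart();
                long end = window.getEnd();
                out.collect(key + ": " + sum + " |window_start:" + format.format(start)
                        + " <> window_end:" + format.format(end));
            }
        }).print();
        env.execute("wordcount stream process");
    }
}
```

#### Task 3: watermark demo

Write a watermark example and run it successfully.

```
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WaterMarkDemo {
    public static void main(String[] args) throws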
            Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        // Each input line is expected to look like "key,timestampMillis"
        DataStreamSource<String> data = env.socketTextStream("centos-master", 7777);
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy =
                new WatermarkStrategy<Tuple2<String, Long>>() {
            @Override
            public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Tuple2<String, Long>>() {
                    // Highest event timestamp seen so far
                    private long maxTimeStamp = 0L;

                    @Override
                    public void onEvent(Tuple2<String, Long> event, long eventTimestamp,
                            WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.f1);
                        System.out.println("maxTimeStamp:" + maxTimeStamp
                                + "...format:" + format.format(maxTimeStamp));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        System.out.println(".....onPeriodicEmit....");
                        // Tolerate events that arrive up to 3 s out of order
                        long maxOutOfOrderness = 3000L;
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                // The second field carries the event time in milliseconds
                return element.f1;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator =
                data.map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple2<>(split[0], Long.valueOf(split[1]));
                    }
                }).assignTimestampsAndWatermarks(watermarkStrategy);
        KeyedStream<Tuple2<String, Long>, String> keyed =
                tuple2SingleOutputStreamOperator.keyBy(e -> e.f0);
        SingleOutputStreamOperator<String> result =
                keyed.window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                            Iterable<Tuple2<String, Long>> input, Collector<String> out)
                            throws Exception {
                        System.out.println("..."
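                                + format.format(window.getStart()));
                        // Collect and sort the event timestamps that fell into this window
                        Iterator<Tuple2<String, Long>> iterator = input.iterator();
                        ArrayList<Long> list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            Tuple2<String, Long> next = iterator.next();
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        String summary = "key:" + key + "..." + "list.size:" + list.size()
                                + "...list.first:" + format.format(list.get(0))
                                + "...list.last:" + format.format(list.get(list.size() - 1))
                                + "...window.start:" + format.format(window.getStart())
                                + "..window.end:" + format.format(window.getEnd());
                        out.collect(summary);
                    }
                });
        result.print();
        env.execute();
    }
}
```

#### Task 4: parallelism demo

Change the parallelism settings in code and run tests; deepen your understanding of the FlinkKafka source code.

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Precedence: operator level > env level > client level > system default level
 */
public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment executionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Env-level parallelism
        executionEnvironment.setParallelism(3);
        // A socket simulates words being sent in real time; Flink receives them as a stream
        DataStreamSource<String> dataStream =
                executionEnvironment.socketTextStream("centos-master", 7777);
        // Anonymous FlatMapFunction
        SingleOutputStreamOperator<Tuple2<String, Long>> tupleSingleOutputStreamOperator =
                dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> collector)
                            throws Exception {
                        String[] splits = s.split("\\s");
                        for (String word : splits) {
                            collector.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // Operator-level parallelism overrides the env-level value of 3
                .setParallelism(4);
        DataStreamSink<Tuple2<String, Long>> result = tupleSingleOutputStreamOperator
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return stringLongTuple2.f0;
                    }
                })
                // Optionally split the stream into 5 s time windows:
                //.timeWindow(Time.seconds(5))
                .sum(1)
                .print();
        executionEnvironment.execute("wordcount stream process");
    }
}
```

#### Task 5: Flink Table API demo

Write a Flink Table example; redraw the job submission flow diagram yourself.

```
import static org.apache.flink.table.api.Expressions.$;

import java.text.SimpleDateFormat;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class TableAPIDemo {
    public static void main(String[] args) throws Exception {
        // Flink execution environment env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Build the Table environment tEnv from env
        StreamTableEnvironment tEnv =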
                StreamTableEnvironment.create(env);
        final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        SingleOutputStreamOperator<Tuple2<String, Integer>> data =
                env.socketTextStream("centos-master", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).timeWindow(Time.seconds(5))
                .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
                        String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window,
                            Iterable<Tuple2<String, Integer>> input,
                            Collector<Tuple2<String, Integer>> out) throws Exception {
                        // Pre-aggregate the count of each word within the 5 s window
                        int sum = 0;
                        for (Tuple2<String, Integer> tuple2 : input) {
                            //System.out.println(tuple2);
                            sum += tuple2.f1;
                        }
                        long start = window.getStart();
                        long end = window.getEnd();
                        System.out.println("key: " + key + " start:" + format.format(start)
                                + " end:" + format.format(end));
                        out.collect(new Tuple2<>(key, sum));
                    }
                });
        //Table table = tEnv.fromDataStream(data, $("word"), $("counts"));
        // Register the stream as a view with columns (word, counts)
        tEnv.createTemporaryView("words", data, $("word"), $("counts"));
        // The GROUP BY column must match the selected column name
        Table wordCounts = tEnv.sqlQuery("select word, sum(counts) from words group by word");
        // Retract stream: the Boolean flag marks each row as an insert (true) or a retraction (false)
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(wordCounts, Row.class);
        result.print();
        env.execute();
    }
}
```
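
The watermark demo in Task 3 combines two pieces of arithmetic: assigning an event to a 5 s tumbling window, and emitting a watermark equal to the highest timestamp seen minus a 3 s out-of-orderness allowance. The plain-Java sketch below replays that logic without any Flink dependency, which may help when predicting the demo's output. It is only an approximation: the class `WatermarkSketch` and its methods are hypothetical names, the watermark is recomputed after every element (Flink emits it periodically), and timestamps are assumed to be non-negative with a window offset of 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical plain-Java sketch of tumbling-window assignment and watermark firing. */
final class WatermarkSketch {

    /** Start of the tumbling window containing the timestamp (offset 0, timestamp >= 0). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** Watermark as in the Task 3 demo: highest timestamp seen minus the allowed lateness. */
    static long watermark(long maxTimestamp, long maxOutOfOrderness) {
        return maxTimestamp - maxOutOfOrderness;
    }

    /**
     * Replays timestamps in arrival order and returns the window starts in firing order.
     * A window [start, start + size) fires once the watermark reaches start + size - 1.
     */
    static List<Long> firedWindows(long[] timestamps, long windowSize, long maxOutOfOrderness) {
        TreeSet<Long> open = new TreeSet<>();      // starts of windows that hold data
        List<Long> fired = new ArrayList<>();
        long maxTimestamp = 0L;                    // same initial value as the demo
        long currentWatermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = windowStart(ts, windowSize);
            if (start + windowSize - 1 <= currentWatermark) {
                continue;                          // late element: its window already fired
            }
            open.add(start);
            maxTimestamp = Math.max(maxTimestamp, ts);
            currentWatermark = watermark(maxTimestamp, maxOutOfOrderness);
            // Fire every open window whose last timestamp is covered by the watermark
            while (!open.isEmpty() && open.first() + windowSize - 1 <= currentWatermark) {
                fired.add(open.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Event times in arrival order; 2000 arrives out of order but within the 3 s allowance
        long[] arrivals = {1000L, 4000L, 2000L, 7000L, 8000L, 13000L};
        System.out.println(firedWindows(arrivals, 5000L, 3000L)); // prints [0, 5000]
    }
}
```

With these arrivals, the window starting at 0 fires only when the event at 8000 pushes the watermark to 5000, and the window starting at 5000 fires when the event at 13000 pushes it to 10000. The window starting at 10000 stays open because no later event arrives.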