网页制作与网站建设试卷,建设银行个人网上银行登录入口,云南最近出了什么流行病,莱州网站建设报价map
map是大家非常熟悉的大数据操作算子#xff0c;主要用于将数据流中的数据进行转换#xff0c;形成新的数据流。简单来说#xff0c;就是一个“一一映射”#xff0c;消费一个元素就产出一个元素。 我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要…map
map是大家非常熟悉的大数据操作算子主要用于将数据流中的数据进行转换形成新的数据流。简单来说就是一个“一一映射”消费一个元素就产出一个元素。 我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现返回值类型还是DataStream不过泛型流中的元素类型可能改变。
public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),new WaterSensor(sensor_2, 2, 2));// 方式一传入匿名类实现MapFunctionstream.map(new MapFunctionWaterSensor, String() {Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二传入MapFunction的实现类// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunctionWaterSensor, String {Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}
面代码中MapFunction实现类的泛型类型与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候需要指定两个泛型分别是输入事件和输出事件的类型还需要重写一个map()方法定义从一个输入事件转换为另一个输出事件的具体逻辑。
Filter
filter转换操作顾名思义是对数据流执行一个过滤通过一个布尔条件表达式设置过滤条件对于每一个流内元素进行判断若为true则元素正常输出若为false则元素被过滤掉。 进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口而FilterFunction内要实现filter()方法就相当于一个返回布尔类型的条件表达式。
public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));// 方式一传入匿名类实现FilterFunctionstream.filter(new FilterFunctionWaterSensor() {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}}).print();// 方式二传入FilterFunction实现类// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunctionWaterSensor {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}}
}
FlatMap
flatMap操作又称为扁平映射主要是将数据流中的整体一般是集合类型拆分成一个一个的个体使用。消费一个元素可以产生0到多个元素。flatMap可以认为是“扁平化”flatten和“映射”map两步操作的结合也就是先按照某种规则对数据进行打散拆分同map一样flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参返回值类型取决于所传参数的具体逻辑可以与原数据流相同也可以不同。 案例需求如果输入的数据是sensor_1只打印vc如果输入的数据是sensor_2既打印ts又打印vc。
public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}/*** TODO flatmap 一进多出包含0出* 对于s1的数据一进一出* 对于s2的数据一进2出* 对于s3的数据一进0出类似于过滤的效果** map怎么控制一进一出* 》 使用 return** flatmap怎么控制的一进多出* 》 通过 Collector来输出 调用几次就输出几条***/public static class MyFlatMap implements FlatMapFunctionWaterSensor, String {Overridepublic void flatMap(WaterSensor value, CollectorString out) throws Exception {if (value.id.equals(sensor_1)) {out.collect(String.valueOf(value.vc));} else if (value.id.equals(sensor_2)) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}