德阳吧

您现在的位置是:首页 > 生活经验 > 正文

生活经验

程序并行配置不正确(程序并行配置不正确怎么解决)

zhiyongz2024-05-17生活经验

程序并行配置不正确(程序并行配置不正确怎么解决)1、代码实现逻辑

package one;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.j *** a.functions.KeySelector;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * @program: Flink_learn * @description: 并行度的设置 * 针对每个算子设置的并行度的优先级高于全局并行度 * 本程序需要两个任务插槽 * @author: Mr.逗 * @create: 2021-09-14 15:40 **/public class Example3 { public static void main(String[] args) { // 获取流处理的运行时环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行任务的数量为1 // 需要1个任务插槽 env.setParalleli *** (1); //读取数据源 // 并行度设置为1 DataStreamSource<String> stream = env.fromElements("hello world", "hello world").setParalleli *** (1); // map操作 // 这里使用的flatMap方法 // map: 针对流中的每一个元素,输出一个元素 // flatMap:针对流中的每一个元素,输出0个,1个或者多个元素 // 并行度设置为2 SingleOutputStreamOperator<WordWithCount> m *** edStream = stream // 输入泛型:String; 输出泛型:WordWithCount .flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String v, Collector<WordWithCount> out) throws Exception { String[] words = v.split(" "); for (String w : words) { // 使用collect方法向下游发送数据 out.collect(new WordWithCount(w, 1L)); } } }).setParalleli *** (2); //分组shuffle // 第一个泛型:流中元素的泛型 // 第二个泛型:key的泛型 KeyedStream<WordWithCount, String> keyedStream = m *** edStream.keyBy(new KeySelector<WordWithCount, String>() { @Override public String getKey(WordWithCount v) throws Exception { return v.word; } }); // reduce操作 // reduce会维护一个累加器 // 第一条数据到来,作为累加器输出 // 第二条数据到来,和累加器进行聚合操作,然后输出累加器 // 累加器和流中元素的类型是一样的 SingleOutputStreamOperator<WordWithCount> reduce = keyedStream.reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception { return new WordWithCount(v1.word, v1.count + v2.count); } }); //输出 reduce.print(); String name = Example3.class.getName(); try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } // POJO类 // 1. 必须是公有类 // 2. 所有字段必须是public // 3. 必须有空构造器 // 模拟了case class public static class WordWithCount { public String word; public Long count; public WordWithCount() { } public WordWithCount(String word, Long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}'; } }}2、结果展示

WordWithCount{word='hello', count=1}WordWithCount{word='world', count=1}WordWithCount{word='hello', count=2}WordWithCount{word='world', count=2}