馬老師工作流引擎-activiti
?KTable<Windowed<String>, List<String>> kTable = kStream.groupBy((k, v) -> "defaultKey")
? ? ? ?.windowedBy(TimeWindows.of(windowSize).grace(Duration.ZERO))
? ? ? ?.aggregate(() -> new ArrayList<>(), (k, v, agg) -> {
? ? ? ? ?System.out.println("========== aggregate record ==========");
? ? ? ? ?log.info("k: {}, v: {}, agg: {}", k, v, agg);
? ? ? ? ?if (!signal.equals(v)) {
? ? ? ? ? ?agg.add(v);
? ? ? ? ?}
? ? ? ? ?aggRecordMap.put(lastMsgTimeKey, System.currentTimeMillis());
標簽: