馬老師Elasticsearch核心知識篇
? ?KafkaProducer<String, String> producer = new KafkaProducer<>(buildConfigProps());
? ?branches[1].map((k, v) -> new KeyValue<>(k, new ProducerRecord<>(SOURCE_TOPIC, k, v.toJSONString())))
? ? ? ?.foreach((k, v) -> producer.send(v));
? ?KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
? ?kafkaStreams.setUncaughtExceptionHandler(
? ? new RestartUncaughtExceptionHandler(streamsBuilder, props));
? ?kafkaStreams.start();
標(biāo)簽: