以下に示すkafkaストリームのコーディング部分で何かを試していました
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("iostatin2");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
.mapValues(value -> value.replace("]","" ) )
.mapValues(value -> value.replaceAll("\":\"", "\":"))
.mapValues(value -> value.replaceAll("\":", "\":\""))
.mapValues(value -> value.replaceAll("\",\"", ",\""))
.mapValues(value -> value.replaceAll(",\"", "\",\""))
.mapValues(value -> value.replaceAll(":\"\\{", ":\\{"))
.mapValues(value -> value.replaceAll("\\}\",", "\\},"))
.mapValues(value -> value.replaceAll("\\},\\{" ,"\\}\\},\\{\\{"));
textlines.foreach(new ForeachAction<String, String>() {
@Override
public void apply(String key, String value) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
したがって、foreachaction()関数で
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
この行の値は、変数 'value'がスコープですでに定義されているようなエラーを引き起こしています。だから私はその行を何に置き換える必要があります... plzzは私を助けます..
値は上記のapply()メソッドのパラメーターとしてすでに使用されているため、行の値を変更します textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
vのような他の名前に
textlines.flatMapValues(v -> Arrays.asList(v.split("\\},\\{")));
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加