java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String

Ritesh Sinha

I am getting this error while running a Storm topology. The topology runs perfectly for 5 minutes without any error and then fails.

Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS is set to 300 seconds, i.e. 5 minutes.

This is my input stream:

{"_id":{"$oid":"556809dbe4b0ef41436f7515"},"body":{"ProductCount":NumberInt(1),"category":null,"correctedWord":"bbtp","field":null,"filter":{},"fromAutocomplete":false,"loggedIn":false,"pageNo":"1","pageSize":"64","percentageMatch":NumberInt(100),"searchTerm":"bbtp","sortOrder":null,"suggestedWords":[]},"envelope":{"IP":"115.115.115.98","actionType":"search","sessionId":"10536088910863418864","timestamp":{"$date":"2015-05-29T06:40:00.000Z"}}}

This is the complete error:

java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__4722$fn__4734$fn__4781.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
    at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112)
    at com.inferlytics.InferlyticsStormConsumer.bolt.QueryNormalizer.execute(QueryNormalizer.java:40)
    at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50)
    at backtype.storm.daemon.executor$fn__4722$tuple_action_fn__4724.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:404)
    at backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    ... 6 more

My topology:

public class TopologyMain {
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
        .getLogger(TopologyMain.class);
private static final String SPOUT_ID = "Feed-Emitter";

/**
 * @param args
 * @throws AlreadyAliveException
 * @throws InvalidTopologyException
 */
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

    int numSpoutExecutors = 1;
    LOG.info("This is SpoutConfig");
    KafkaSpout kspout = QueryCounter();
    TopologyBuilder builder = new TopologyBuilder();
    LOG.info("This is Set Spout");
    builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);

    LOG.info("This is Query-Normalizer bolt");
    builder.setBolt("Query-normalizer", new QueryNormalizer())
    .shuffleGrouping(SPOUT_ID);

    LOG.info("This is Query-ProductCount bolt");
    builder.setBolt("Query-ProductCount", new QueryProductCount(),1)
    .shuffleGrouping("Query-normalizer", "stream1");

    LOG.info("This is Query-SearchTerm bolt");
    builder.setBolt("Query-SearchTerm", new QuerySearchTermCount(),1)
    .shuffleGrouping("Query-normalizer", "stream2");

    LOG.info("This is tick-tuple bolt");
    builder.setBolt("Tick-Tuple", new TickTuple(),1)
    .shuffleGrouping("Query-normalizer", "stream3");

    /*
     * Storm Constants
     * */

    String NIMBUS_HOST = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.NIMBUS_HOST );
    String NIMBUS_THRIFT_PORT = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.NIMBUS_THRIFT_PORT );
    String TOPOLOGY_TICK_TUPLE_FREQ_SECS = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.TOPOLOGY_TICK_TUPLE_FREQ_SECS );
    String STORM_JAR = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.STORM_JAR );
    String SET_NUM_WORKERS = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.SET_NUM_WORKERS );
    String SET_MAX_SPOUT_PENDING = FilePropertyManager.getProperty( ApplicationConstants.STORM_CONSTANTS_FILE,
            ApplicationConstants.SET_MAX_SPOUT_PENDING );
    final int setNumWorkers = Integer.parseInt(SET_NUM_WORKERS);
    final int setMaxSpoutPending = Integer.parseInt(SET_MAX_SPOUT_PENDING);
    final int nimbus_thrift_port = Integer.parseInt(NIMBUS_THRIFT_PORT);
    final int topology_tick_tuple_freq_secs = Integer.parseInt(TOPOLOGY_TICK_TUPLE_FREQ_SECS);


    /*
     * Storm Configurations
     */

    LOG.trace("Setting Configuration");
    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    conf.put(Config.NIMBUS_HOST, NIMBUS_HOST);
    conf.put(Config.NIMBUS_THRIFT_PORT, nimbus_thrift_port);
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, topology_tick_tuple_freq_secs);
    System.setProperty("storm.jar",STORM_JAR );
    conf.setNumWorkers(setNumWorkers);
    conf.setMaxSpoutPending(setMaxSpoutPending);


    if (args != null && args.length > 0) {
        LOG.trace("Storm Topology Submitted On CLuster");
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }

    else
    {   
        LOG.trace("Storm Topology Submitted On Local");
        cluster.submitTopology("Query", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.killTopology("Query");
        LOG.trace("This is ShutDown cluster");
        cluster.shutdown();
    }

        LOG.trace("Method: main finished.");
}


private static KafkaSpout QueryCounter() {

    //Build a kafka spout

    /*
     * Kafka Constants
     */

    final String topic = FilePropertyManager.getProperty( ApplicationConstants.KAFKA_CONSTANTS_FILE,
            ApplicationConstants.TOPIC );

    String zkHostPort = FilePropertyManager.getProperty( ApplicationConstants.KAFKA_CONSTANTS_FILE,
            ApplicationConstants.ZOOKEEPER_CONNECTION_STRING );



    String zkRoot = "/Feed-Emitter";
    String zkSpoutId = "Feed-Emitter-spout";
    ZkHosts zkHosts = new ZkHosts(zkHostPort);

    LOG.trace("This is Inside kafka spout ");
    SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
    spoutCfg.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
    LOG.trace("Returning From kafka spout ");
    return kafkaSpout;

  }

}

My QueryNormalizer bolt:

public class QueryNormalizer extends BaseBasicBolt {

private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
        .getLogger(QueryNormalizer.class);
public void cleanup() {}

/**
 * The bolt will receive the line from the
 * feed file and process it to Normalize this line
 * 
 * The normalize will be put the terms in lower case
 * and split the line to get all terms. 
 */
public void execute(Tuple input, BasicOutputCollector collector) {
    LOG.trace("Method in QueryNormalizer: execute called.");
    String feed = input.getString(0);

    String searchTerm = null;
    String pageNo = null;
    boolean sortOrder = true;
    boolean category = true;
    boolean field = true;
    boolean filter = true;
    String pc = null;
    int ProductCount = 0;
    String timestamp = null;
    String year = null;
    String month = null;
    String day = null;
    String hour = null;

    Calendar calendar = Calendar.getInstance();

    int dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);  
    int weekOfYear = calendar.get(Calendar.WEEK_OF_YEAR); 

    JSONObject obj = null;
    try {
        obj = new JSONObject(feed);
    } catch (JSONException e1) {

        LOG.error( "Json Exception in Query Normalizer", e1 );

    }

    try {
           searchTerm = obj.getJSONObject("body").getString("correctedWord");

           pageNo = obj.getJSONObject("body").getString("pageNo");
           sortOrder = obj.getJSONObject("body").isNull("sortOrder");
           category = obj.getJSONObject("body").isNull("category");
           field = obj.getJSONObject("body").isNull("field");
           filter = obj.getJSONObject("body").getJSONObject("filter").isNull("filters");
           pc = obj.getJSONObject("body").getString("ProductCount").replaceAll("[^\\d]", "");
           ProductCount = Integer.parseInt(pc);
           timestamp = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(10,29);
           year = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(10, 14);
           month = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(15, 17);
           day = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(18, 20);
           hour = (obj.getJSONObject("envelope").get("timestamp")).toString().substring(21, 23);
    } catch (JSONException e) {

        LOG.error( "Parsing Value Exception in Query Normalizer", e );

    }

    searchTerm = searchTerm.trim();

    //Condition to eliminate pagination
     if(!searchTerm.isEmpty()){
         if ((pageNo.equals("1")) && (sortOrder == true) && (category == true) && (field == true) && (filter == true)){
             searchTerm = searchTerm.toLowerCase();

            System.out.println("In QueryProductCount execute: "+searchTerm+","+year+","+month+","+day+","+hour+","+dayOfYear+","+weekOfYear+","+ProductCount);
            System.out.println("Entire Json : "+feed);
            System.out.println("In QuerySearchCount execute : "+searchTerm+","+year+","+month+","+day+","+hour);

            LOG.trace("In QueryNormalizer execute : "+searchTerm+","+year+","+month+","+day+","+hour+","+dayOfYear+","+weekOfYear+","+ProductCount);
            LOG.trace("In QueryNormalizer execute : "+searchTerm+","+year+","+month+","+day+","+hour);

            collector.emit("stream1", new Values(searchTerm , year , month , day , hour , dayOfYear , weekOfYear , ProductCount ));
            collector.emit("stream2", new Values(searchTerm , year , month , day , hour ));
            collector.emit("stream3", new Values());

            }
            LOG.trace("Method in QueryNormalizer: execute finished.");
     }


    }



/**
 * The bolt will only emit the specified streams in collector
 */
public void declareOutputFields(OutputFieldsDeclarer declarer) {

    declarer.declareStream("stream1", new Fields("searchTerm" ,"year" ,"month" ,"day" ,"hour" ,"dayOfYear" ,"weekOfYear" ,"ProductCount"));
    declarer.declareStream("stream2", new Fields("searchTerm" ,"year" ,"month" ,"day" ,"hour"));
    declarer.declareStream("stream3", new Fields());

}
}

The error in the QueryNormalizer class is thrown at this line:

String feed = input.getString(0);

public void execute(Tuple input, BasicOutputCollector collector) {
    LOG.trace("Method in QueryNormalizer: execute called.");
    String feed = input.getString(0);

    String searchTerm = null;

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String at backtype.storm.tuple.TupleImpl.getString(TupleImpl.java:112) at com.inferlytics.InferlyticsStormConsumer.bolt.QueryNormalizer.execute(QueryNormalizer.java:40)
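Since the cast fails inside input.getString(0), the tuple that reaches that line carries a Long rather than a String in its first field. Storm's system tick tuple is one such tuple (its only field is a number, not a String), so one defensive option, shown here purely as a sketch that is not part of the original code, is to skip tick tuples before reading field 0:

// Sketch only (not the original code): ignore Storm's system tick tuples so that
// getString(0) only ever sees the String payload emitted by the Kafka spout's
// StringScheme. Constants here is backtype.storm.Constants.
public void execute(Tuple input, BasicOutputCollector collector) {
    if (input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
            && input.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
        return; // tick tuple: nothing to normalize here
    }
    String feed = input.getString(0);
    // ... rest of the normalization logic unchanged ...
}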

Edit:

After removing Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS from the configuration, the code works correctly, but I still need to implement the tick tuple. How can I achieve that?

I think the problem is in the TickTuple class. Is this the correct way to implement it?

TickTuple

public class TickTuple extends BaseBasicBolt {

private static final long serialVersionUID = 1L;
private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory
        .getLogger(TickTuple.class);


private static final String KEYSPACE = FilePropertyManager.getProperty( ApplicationConstants.CASSANDRA_CONSTANTS_FILE,
        ApplicationConstants.KEYSPACE );
private static final String MONGO_DB = FilePropertyManager.getProperty( ApplicationConstants.MONGO_CONSTANTS_FILE,
        ApplicationConstants.MONGO_DBE );
private static final String TABLE_CASSANDRA_TOP_QUERY = FilePropertyManager.getProperty( ApplicationConstants.CASSANDRA_CONSTANTS_FILE,
        ApplicationConstants.TABLE_CASSANDRA_TOP_QUERY );
private static final String MONGO_COLLECTION_E = FilePropertyManager.getProperty( ApplicationConstants.MONGO_CONSTANTS_FILE,
        ApplicationConstants.MONGO_COLLECTION_E );

public void cleanup() {

}


protected static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}


@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}


@Override
public void execute(Tuple input, BasicOutputCollector collector) {


         try {
            if (isTickTuple(input)) {
                CassExport.cassExp(KEYSPACE, TABLE_CASSANDRA_TOP_QUERY, MONGO_DB, MONGO_COLLECTION_E);
                TruncateCassandraTable.truncateData(TABLE_CASSANDRA_TOP_QUERY);

                LOG.trace("In Truncate");
                return;
             }
        } catch (Exception e) {
            LOG.error("Exception in TickTuple bolt while exporting/truncating", e);
        }



}
}

Can someone suggest the necessary changes to the code?

Ritesh Sinha

The problem was in the TickTuple bolt implementation. I had added conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, topology_tick_tuple_freq_secs) to the main topology configuration. Instead, it has to be added in the bolt where the tick tuple is implemented.

After editing the TickTuple code and adding this snippet, everything works fine:

@Override
public Map<String, Object> getComponentConfiguration() {
    // configure how often a tick tuple will be sent to our bolt
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, topology_tick_tuple_freq_secs);
    return conf;
}

This has to be added to the corresponding bolt, not to the main topology.
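One detail to watch: topology_tick_tuple_freq_secs was defined locally in TopologyMain.main(), so the bolt needs its own copy of the value. A minimal sketch of how the TickTuple bolt could obtain it, assuming the same FilePropertyManager / ApplicationConstants lookup already used for its other constants (the exact wiring is an assumption, not the original code):

public class TickTuple extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    // Assumption: the tick frequency is read from the same properties file that
    // TopologyMain uses for ApplicationConstants.TOPOLOGY_TICK_TUPLE_FREQ_SECS.
    private static final int TICK_FREQ_SECS = Integer.parseInt(
            FilePropertyManager.getProperty(ApplicationConstants.STORM_CONSTANTS_FILE,
                    ApplicationConstants.TOPOLOGY_TICK_TUPLE_FREQ_SECS));

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // Ask Storm to deliver a tick tuple to this bolt every TICK_FREQ_SECS seconds.
        Config conf = new Config();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, TICK_FREQ_SECS);
        return conf;
    }

    // execute(), isTickTuple() and declareOutputFields() stay as shown above.
}

With the frequency declared per component like this, only the TickTuple bolt receives tick tuples, so QueryNormalizer never sees a Long in field 0.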
