我有一个具有两个线程的应用程序,一个线程写入队列,第二个线程从队列读取异步。我需要创建第三个生成20个以上的第三个。新创建的线程将一直运行到明确停止为止。那20个线程应该获取“实时”数据以便进行分析。20个中的每个都有唯一的ID /名称。我需要将相关数据(READ线程收集的)发送到正确的线程(20个线程中)。例如,如果数据包含ID(在其中)为2的字符串->我需要将其发送到ID = 2的线程。我的问题是:我应该如何为20个线程中的每个线程持有一个“指针”并向其发送相关数据?(我可以在可运行列表(将保存线程)中搜索id,但随后我需要调用方法“ NewData(string)”以将数据发送到正在运行的线程)。我该怎么办?蒂亚·帕斯(TIA Paz)
使用队列与线程通信可能会更好。然后,您可以将所有队列放在地图中以方便访问。我会推荐一个BlockingQueue
。
public class Test {
// Special stop message to tell the worker to stop.
public static final Message Stop = new Message("Stop!");
static class Message {
final String msg;
// A message to a worker.
public Message(String msg) {
this.msg = msg;
}
public String toString() {
return msg;
}
}
class Worker implements Runnable {
private volatile boolean stop = false;
private final BlockingQueue<Message> workQueue;
public Worker(BlockingQueue<Message> workQueue) {
this.workQueue = workQueue;
}
@Override
public void run() {
while (!stop) {
try {
Message msg = workQueue.poll(10, TimeUnit.SECONDS);
// Handle the message ...
System.out.println("Worker " + Thread.currentThread().getName() + " got message " + msg);
// Is it my special stop message.
if (msg == Stop) {
stop = true;
}
} catch (InterruptedException ex) {
// Just stop on interrupt.
stop = true;
}
}
}
}
Map<Integer, BlockingQueue<Message>> queues = new HashMap<>();
public void test() throws InterruptedException {
// Keep track of my threads.
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 20; i++) {
// Make the queue for it.
BlockingQueue<Message> queue = new ArrayBlockingQueue(10);
// Build its thread, handing it the queue to use.
Thread thread = new Thread(new Worker(queue), "Worker-" + i);
threads.add(thread);
// Store the queue in the map.
queues.put(i, queue);
// Start the process.
thread.start();
}
// Test one.
queues.get(5).put(new Message("Hello"));
// Close down.
for (BlockingQueue<Message> q : queues.values()) {
// Stop each queue.
q.put(Stop);
}
// Join all threads to wait for them to finish.
for (Thread t : threads) {
t.join();
}
}
public static void main(String args[]) {
try {
new Test().test();
} catch (Throwable t) {
t.printStackTrace(System.err);
}
}
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句