Enterprise Search Engine Development: The Connector (Part 7)
Published: 2019-06-25


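Before continuing with the TimedCancelable and QueryTraverser classes, it helps to get familiar with the ThreadPool and TaskHandle classes. Together they run the connector's work on a thread pool.

The source of the ThreadPool class is as follows: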

    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    /**
     * Pool for running {@link TimedCancelable}, time limited tasks.
     *
     * <p>
     * Users are provided a {@link TaskHandle} for each task. The {@link TaskHandle}
     * supports canceling the task and determining if the task is done running.
     *
     * <p>
     * The ThreadPool enforces a configurable maximum time interval for tasks. Each
     * task is guarded by a time out task that will cancel the primary task
     * if the primary task does not complete within the allowed interval.
     *
     * <p>
     * Task cancellation includes two actions that are visible for the task's
     * {@link TimedCancelable}
     * <ol>
     * <li>Calling {@link Future#cancel(boolean)} to send the task an interrupt and
     * mark it as done.
     * <li>Calling {@link TimedCancelable#cancel()} to send the task a second signal
     * that it is being canceled. This signal has the benefit that it does not
     * depend on the task's interrupt handling policy.
     * </ol>
     * Once a task has been canceled its {@link TaskHandle#isDone()} method will
     * immediately start returning true.
     *
     * <p>
     * {@link ThreadPool} performs the following processing when a task completes
     * <ol>
     * <li>Cancel the time out task for the completed task.
     * <li>Log exceptions that indicate the task did not complete normally.
     * </ol>
     */
    public class ThreadPool {
      private static final Logger LOGGER =
          Logger.getLogger(ThreadPool.class.getName());

      /**
       * The default amount of time in milliseconds to wait for tasks to complete
       * during shutdown.
       */
      public static final int DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 10 * 1000;

      /**
       * Configured amount of time to let tasks run before automatic cancellation.
       */
      private final long maximumTaskLifeMillis;

      /**
       * ExecutorService for running submitted tasks. Tasks are only submitted
       * through completionService.
       */
      private final ExecutorService executor = Executors.newCachedThreadPool(
          new ThreadNamingThreadFactory("ThreadPoolExecutor"));

      /**
       * CompletionService for running submitted tasks. All tasks are submitted
       * through this CompletionService to provide blocking, queued access to
       * completion information.
       */
      private final CompletionService<?> completionService =
          new ExecutorCompletionService<Object>(executor);

      /**
       * Dedicated ExecutorService for running the CompletionTask. The completion
       * task is run in its own ExecutorService so that it can be shut down after
       * the executor for submitted tasks has been shut down and drained of running
       * tasks.
       */
      private final ExecutorService completionExecutor =
          Executors.newSingleThreadExecutor(
              new ThreadNamingThreadFactory("ThreadPoolCompletion"));

      /**
       * Dedicated ScheduledThreadPoolExecutor for running time out tasks. Each
       * primary task is guarded by a time out task that is scheduled to run when
       * the primary task's maximum life time expires. When the time out task runs
       * it cancels the primary task.
       */
      private final ScheduledThreadPoolExecutor timeoutService =
          new ScheduledThreadPoolExecutor(1,
              new ThreadNamingThreadFactory("ThreadPoolTimeout"));

      /**
       * Create a {@link ThreadPool}.
       *
       * @param taskLifeSeconds minimum number of seconds to allow a task to run
       *        before automatic cancellation.
       */
      // TODO: This method, called from Spring, multiplies the supplied [soft]
      // timeout value by 2. The actual value wants to be 2x or 1.5x of a user
      // configured soft value. However, Spring v2 does not provide a convenient
      // mechanism to do arithmetic on configuration properties. Once we move to
      // Spring v3, the calculation should be done in the Spring XML definition
      // file rather than here.
      public ThreadPool(int taskLifeSeconds) {
        this.maximumTaskLifeMillis = taskLifeSeconds * 2 * 1000L;
        completionExecutor.execute(new CompletionTask());
      }

      /**
       * Shut down the {@link ThreadPool}. After this returns
       * {@link ThreadPool#submit(TimedCancelable)} will return null.
       *
       * @param interrupt true if the threads executing tasks should
       *        be interrupted; otherwise, in-progress tasks are allowed to
       *        complete normally.
       * @param waitMillis maximum amount of time to wait for tasks to complete.
       * @return true if all the running tasks terminated and
       *         false if some running task did not terminate.
       * @throws InterruptedException if interrupted while waiting.
       */
      boolean shutdown(boolean interrupt, long waitMillis)
          throws InterruptedException {
        if (interrupt) {
          executor.shutdownNow();
        } else {
          executor.shutdown();
        }
        timeoutService.shutdown();
        try {
          return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
        } finally {
          completionExecutor.shutdownNow();
          timeoutService.shutdownNow();
        }
      }

      /**
       * Submit a {@link TimedCancelable} for execution and return a
       * {@link TaskHandle} for the running task or null if the task has not been
       * accepted. After {@link ThreadPool#shutdown(boolean, long)} returns this
       * will always return null.
       */
      public TaskHandle submit(TimedCancelable cancelable) {
        // When timeoutTask is run it will cancel 'cancelable'.
        TimeoutTask timeoutTask = new TimeoutTask(cancelable);

        // timeoutFuture will be used to cancel timeoutTask when 'cancelable'
        // completes.
        FutureTask<?> timeoutFuture = new FutureTask<Object>(timeoutTask, null);

        // cancelTimeoutRunnable runs 'cancelable'. When 'cancelable' completes
        // cancelTimeoutRunnable cancels 'timeoutTask'. This saves system
        // resources. In addition it prevents timeout task from running and
        // calling cancel after 'cancelable' completes successfully.
        CancelTimeoutRunnable cancelTimeoutRunnable =
            new CancelTimeoutRunnable(cancelable, timeoutFuture);

        // taskFuture is used to cancel 'cancelable' and to determine if
        // 'cancelable' is done.
        FutureTask<?> taskFuture =
            new FutureTask<Object>(cancelTimeoutRunnable, null);
        TaskHandle handle =
            new TaskHandle(cancelable, taskFuture, System.currentTimeMillis());
        timeoutTask.setTaskHandle(handle);
        try {
          // Schedule timeoutTask to run when 'cancelable's maximum run interval
          // has expired.
          timeoutService.schedule(timeoutFuture, maximumTaskLifeMillis,
              TimeUnit.MILLISECONDS);

          // TODO(strellis): test/handle timer pop/cancel before submit. In
          // production with a 30 minute timeout this should never happen.
          completionService.submit(taskFuture, null);
        } catch (RejectedExecutionException re) {
          if (!executor.isShutdown()) {
            LOGGER.log(Level.SEVERE, "Unable to execute task", re);
          }
          handle = null;
        }
        return handle;
      }

      /**
       * A {@link Runnable} for running {@link TimedCancelable} that has been
       * guarded by a timeout task. This will cancel the timeout task when the
       * {@link TimedCancelable} completes. If the timeout task has already run then
       * canceling it has no effect.
       */
      private class CancelTimeoutRunnable implements Runnable {
        private final Future<?> timeoutFuture;
        private final TimedCancelable cancelable;

        /**
         * Constructs a {@link CancelTimeoutRunnable}.
         *
         * @param cancelable the {@link TimedCancelable} this runs.
         * @param timeoutFuture the {@link Future} for canceling the timeout task.
         */
        CancelTimeoutRunnable(TimedCancelable cancelable,
            Future<?> timeoutFuture) {
          this.timeoutFuture = timeoutFuture;
          this.cancelable = cancelable;
        }

        public void run() {
          try {
            cancelable.run();
          } finally {
            timeoutFuture.cancel(true);
            timeoutService.purge();
          }
        }
      }

      /**
       * A task that cancels another task that is running a {@link TimedCancelable}.
       * The {@link TimeoutTask} should be scheduled to run when the interval for
       * the {@link TimedCancelable} to run expires.
       */
      private static class TimeoutTask implements Runnable {
        final TimedCancelable timedCancelable;
        private volatile TaskHandle taskHandle;

        TimeoutTask(TimedCancelable timedCancelable) {
          this.timedCancelable = timedCancelable;
        }

        public void run() {
          if (taskHandle == null) {
            throw new IllegalStateException(
                "Run TimeoutTask called with null taskHandle.");
          }
          timedCancelable.timeout(taskHandle);
        }

        void setTaskHandle(TaskHandle taskHandle) {
          this.taskHandle = taskHandle;
        }
      }

      /**
       * A task that gets completion information from all the tasks that run in a
       * {@link CompletionService} and logs uncaught exceptions that cause the tasks
       * to fail.
       */
      private class CompletionTask implements Runnable {
        private void completeTask() throws InterruptedException {
          Future<?> future = completionService.take();
          try {
            future.get();
          } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // TODO(strellis): Should we call cancelable.cancel() if we get an
            // exception?
            if (cause instanceof InterruptedException) {
              LOGGER.log(Level.INFO, "Batch terminated due to an interrupt.",
                  cause);
            } else {
              LOGGER.log(Level.SEVERE, "Batch failed with unhandled exception",
                  cause);
            }
          }
        }

        public void run() {
          try {
            while (!Thread.currentThread().isInterrupted()) {
              completeTask();
            }
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          }
          LOGGER.info("Completion task shutdown.");
        }
      }

      /**
       * A {@link ThreadFactory} that adds a prefix to thread names assigned
       * by {@link Executors#defaultThreadFactory()} to provide diagnostic
       * context in stack traces.
       */
      private static class ThreadNamingThreadFactory implements ThreadFactory {
        private final ThreadFactory delegate = Executors.defaultThreadFactory();
        private final String namePrefix;

        ThreadNamingThreadFactory(String namePrefix) {
          this.namePrefix = namePrefix + "-";
        }

        public Thread newThread(Runnable r) {
          Thread t = delegate.newThread(r);
          t.setName(namePrefix + t.getName());
          return t;
        }
      }
    }
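The guarding pattern in submit() is easier to follow in isolation. The following is a minimal sketch using only the JDK, not the connector's own code: TimeoutGuardSketch and runGuarded are hypothetical names, and the timeout task here calls Future.cancel(true) directly, whereas the real TimeoutTask delegates to TimedCancelable.timeout(taskHandle).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "primary task guarded by a timeout task".
class TimeoutGuardSketch {

  /**
   * Runs 'work' and cancels it if it is still running after maxMillis.
   * Returns true if the work was cancelled, false if it completed by itself.
   */
  static boolean runGuarded(Runnable work, long maxMillis) {
    ExecutorService executor = Executors.newCachedThreadPool();
    ScheduledExecutorService timeoutService =
        Executors.newSingleThreadScheduledExecutor();
    try {
      // Filled in below, before the timeout task can possibly run.
      AtomicReference<Future<?>> primaryRef = new AtomicReference<>();

      // When the timeout task runs it cancels (and interrupts) the primary task.
      FutureTask<Void> timeoutFuture =
          new FutureTask<>(() -> primaryRef.get().cancel(true), null);

      // The primary task cancels its own guard as soon as the work finishes.
      FutureTask<Void> primaryFuture = new FutureTask<>(() -> {
        try {
          work.run();
        } finally {
          timeoutFuture.cancel(false);
        }
      }, null);
      primaryRef.set(primaryFuture);

      timeoutService.schedule(timeoutFuture, maxMillis, TimeUnit.MILLISECONDS);
      executor.execute(primaryFuture);

      try {
        primaryFuture.get();
        return false;              // completed within the allowed interval
      } catch (CancellationException e) {
        return true;               // the guard fired first
      } catch (ExecutionException e) {
        throw new IllegalStateException("work failed", e.getCause());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting", e);
      }
    } finally {
      executor.shutdownNow();
      timeoutService.shutdownNow();
    }
  }

  public static void main(String[] args) {
    Runnable slow = () -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // stop when cancelled
      }
    };
    System.out.println("fast cancelled: " + runGuarded(() -> { }, 5_000));
    System.out.println("slow cancelled: " + runGuarded(slow, 200));
  }
}
```

As in ThreadPool, both tasks are built as FutureTask objects up front so that each side holds a handle it can use to cancel the other before either is handed to an executor.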

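This class relies mainly on Java's concurrency framework (java.util.concurrent): ExecutorService, CompletionService, FutureTask and ScheduledThreadPoolExecutor.

The TaskHandle class wraps the state of a running task (its TimedCancelable, its Future and its start time) and provides methods for cancelling the task and checking whether it is done. Its source is as follows: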
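For readers less familiar with CompletionService, here is a small sketch of the idea behind CompletionTask: take finished futures in completion order and inspect each one for a failure. CompletionSketch and drain are hypothetical names, and the sketch collects outcomes into a list instead of logging them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of draining a CompletionService.
class CompletionSketch {

  /** Submits all tasks and reports one outcome per task, in completion order. */
  static List<String> drain(List<Callable<String>> tasks) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletionService<String> completionService =
        new ExecutorCompletionService<>(executor);
    List<String> outcomes = new ArrayList<>();
    try {
      for (Callable<String> task : tasks) {
        completionService.submit(task);
      }
      for (int i = 0; i < tasks.size(); i++) {
        // take() blocks until some submitted task has finished.
        Future<String> done = completionService.take();
        try {
          outcomes.add("ok: " + done.get());
        } catch (ExecutionException e) {
          // The task threw; the original exception is the cause.
          outcomes.add("failed: " + e.getCause().getMessage());
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      executor.shutdown();
    }
    return outcomes;
  }

  public static void main(String[] args) {
    List<Callable<String>> tasks = new ArrayList<>();
    tasks.add(() -> "batch finished");
    tasks.add(() -> { throw new IllegalStateException("boom"); });
    for (String outcome : drain(tasks)) {
      System.out.println(outcome);
    }
  }
}
```

ThreadPool runs the equivalent loop on its own single-thread executor so that completion handling never competes with the submitted tasks for a worker thread.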

    import java.util.concurrent.Future;

    /**
     * Handle for the management of a {@link TimedCancelable} primary task.
     */
    public class TaskHandle {
      /**
       * The primary {@link TimedCancelable} that is run by this task to
       * perform some useful work.
       */
      final TimedCancelable cancelable;

      /**
       * The {@link Future} for the primary task.
       */
      final Future<?> taskFuture;

      /**
       * The time the task starts.
       */
      final long startTime;

      /**
       * Create a TaskHandle.
       *
       * @param cancelable {@link TimedCancelable} for the primary task.
       * @param taskFuture {@link Future} for the primary task.
       * @param startTime startTime for the primary task.
       */
      TaskHandle(TimedCancelable cancelable, Future<?> taskFuture,
          long startTime) {
        this.cancelable = cancelable;
        this.taskFuture = taskFuture;
        this.startTime = startTime;
      }

      /**
       * Cancel the primary task and the time out task.
       */
      public void cancel() {
        cancelable.cancel();
        taskFuture.cancel(true);
      }

      /**
       * Return true if the primary task has completed.
       */
      public boolean isDone() {
        return taskFuture.isDone();
      }
    }
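TaskHandle.cancel() sends two signals: the task's own cancel() and then Future.cancel(true). The sketch below shows why the first signal matters when a task does not react to interrupts. PollingTask is a hypothetical stand-in for a TimedCancelable (whose interface is not shown in this article), and TwoSignalCancelSketch is an illustrative name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the two cancellation signals.
class TwoSignalCancelSketch {

  /** A task that ignores interrupts and only stops when its flag is set. */
  static class PollingTask implements Runnable {
    private volatile boolean cancelled;
    final CountDownLatch started = new CountDownLatch(1);
    final CountDownLatch finished = new CountDownLatch(1);

    public void run() {
      started.countDown();
      try {
        while (!cancelled) {
          Thread.yield();  // stands in for work that never checks interrupts
        }
      } finally {
        finished.countDown();
      }
    }

    void cancel() {
      cancelled = true;
    }
  }

  /** Returns { isDone right after cancel, task actually stopped }. */
  static boolean[] demo() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      PollingTask task = new PollingTask();
      Future<?> future = executor.submit(task);
      task.started.await();

      // Same order as TaskHandle.cancel().
      task.cancel();        // cooperative signal, independent of interrupts
      future.cancel(true);  // interrupt and mark the future as done

      boolean doneImmediately = future.isDone();
      boolean stopped = task.finished.await(5, TimeUnit.SECONDS);
      return new boolean[] {doneImmediately, stopped};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    boolean[] result = demo();
    System.out.println("isDone after cancel: " + result[0]);
    System.out.println("task stopped: " + result[1]);
  }
}
```

Without the cancel() call, this PollingTask would keep spinning after Future.cancel(true), even though isDone() already reports true.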

---------------------------------------------------------------------------

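This series, Enterprise Search Engine Development: The Connector, is my own original work.

Please credit the source when reposting: 博客园 (cnblogs), 刺猬的温驯

Link to this article: http://www.cnblogs.com/chenying99/archive/2013/03/19/2968401.html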
