Android源码系列(17) -- QueuedWork

栏目: IOS · Android · 发布时间: 5年前

内容简介:这是个内部工具类,用于跟踪那些未完成的或尚未结束的全局任务,新任务通过方法这个类用于可延迟

一、类签名

这是个内部 工具 类,用于跟踪那些未完成的或尚未结束的全局任务,新任务通过方法 queue 加入。添加 finisher 的runnables,由 waitToFinish 方法保证执行,用于保证任务已被处理完成。

这个类用于 SharedPreference 编辑后修改异步写入到磁盘,所以设计一个在 Activity.onPause 或类似地方等待写入操作机制,而这个机制也能用于其他功能。所有排队的异步任务都在一个独立、专用的线程上处理。

public class QueuedWork

二、常量

可延迟 Runnable 的延迟值。该值设置得尽可能大,但也需要足够小,令延迟难以察觉。

private static final long DELAY = 100;

waitToFinish() 运行超过 MAX_WAIT_TIME_MILLIS 毫秒,发出警告

private static final long MAX_WAIT_TIME_MILLIS = 512;

本类使用的锁对象

private static final Object sLock = new Object();

三、静态变量

用此锁约束任何时候都只有一个线程在处理一个任务,意味着执行顺序就是任务添加的顺序。

因为任务执行过程中会一直持有 sProcessingWork 锁,所以 sProcessingWork 独立于 sLock 避免阻塞整个类的运转

private static Object sProcessingWork = new Object();

存放 Finisher 的链表。通过 added 方法添加,或 removed 方法移除

@GuardedBy("sLock")
private static final LinkedList<Runnable> sFinishers = new LinkedList<>();

QueuedWork内部的静态 HandlerThread

@GuardedBy("sLock")
private static Handler sHandler = null;

存放通过 queue 方法添加任务的队列

@GuardedBy("sLock")
private static final LinkedList<Runnable> sWork = new LinkedList<>();

指示新任务是否能被延迟,默认为 true

@GuardedBy("sLock")
private static boolean sCanDelay = true;

任务执行前等待的时间

@GuardedBy("sLock")
private final static ExponentiallyBucketedHistogram
        mWaitTimes = new ExponentiallyBucketedHistogram(
        16);

正在等待处理的任务数

private static int mNumWaits = 0;

四、静态方法

4.1 getHandler()

HandlerThread实现细节请参考 Android源码系列(11) – HandlerThread 。该文章已经提到 HandlerThread 本质是线程,启动 HandlerThread 即启动子线程,并让任务在该线程上执行。

private static Handler getHandler() {
    synchronized (sLock) {
        if (sHandler == null) {
            HandlerThread handlerThread = new HandlerThread("queued-work-looper",
                    Process.THREAD_PRIORITY_FOREGROUND);
            handlerThread.start();

            sHandler = new QueuedWorkHandler(handlerThread.getLooper());
        }
        return sHandler;
    }
}

4.2 addFinisher(Runnable finisher)

添加 finisher-runnable 等待 queue() 异步处理任务,由 SharedPreferences$Editor.startCommit() 使用

应注意的是,这个方法不会引起 finisher 的执行。这只是调用者的集合,让调用者异步跟踪任务的最新状态。在大多数情况下,调用者(如:SharedPreferences) 会在 add() 后很快又调用 remove() 。这些任务实际在 waitToFinish() 执行

public static void addFinisher(Runnable finisher) {
    synchronized (sLock) {
        sFinishers.add(finisher);
    }
}

4.3 removeFinisher(Runnable finisher)

移除一个已经添加的 finisher-runnable

public static void removeFinisher(Runnable finisher) {
    synchronized (sLock) {
        sFinishers.remove(finisher);
    }
}

4.4 waitToFinish()

触发排队任务马上执行。排队任务在独立的线程异步执行。任务和 finishers 都在这个线程上运行, finishers 通过这种方式实现检查排队任务是否完成。

Activity.onPause()、 BroadcastReceiver.onReceive()之后处理Service.command之后 都会调用此方法,所以异步任务永远不会丢失

public static void waitToFinish() {
    long startTime = System.currentTimeMillis();
    boolean hadMessages = false;

    Handler handler = getHandler();

    synchronized (sLock) {
        if (handler.hasMessages(QueuedWorkHandler.MSG_RUN)) {
            // 延迟任务会在processPendingWork()中处理
            handler.removeMessages(QueuedWorkHandler.MSG_RUN);
        }

        // We should not delay any work as this might delay the finishers
        sCanDelay = false;
    }

    StrictMode.ThreadPolicy oldPolicy = StrictMode.allowThreadDiskWrites();
    try {
        processPendingWork();
    } finally {
        StrictMode.setThreadPolicy(oldPolicy);
    }

    try {
        while (true) {
            Runnable finisher;
            
            // 从sFinishers获取一个Finisher
            synchronized (sLock) {
                finisher = sFinishers.poll();
            }

            if (finisher == null) {
                break;
            }
            
            // 执行Finisher
            finisher.run();
        }
    } finally {
        sCanDelay = true;
    }

    // 统计任务执行的总时间和等待次数
    synchronized (sLock) {
        long waitTime = System.currentTimeMillis() - startTime;

        if (waitTime > 0 || hadMessages) {
            mWaitTimes.add(Long.valueOf(waitTime).intValue());
            mNumWaits++;
        }
    }
}

ActivityThread 类内共有五段源码调用 QueuedWork.waitToFinish();

位置一:

private void handleServiceArgs(ServiceArgsData data) {
    Service s = mServices.get(data.token);
    if (s != null) {
        try {
            if (data.args != null) {
                data.args.setExtrasClassLoader(s.getClassLoader());
                data.args.prepareToEnterProcess();
            }
            int res;
            if (!data.taskRemoved) {
                res = s.onStartCommand(data.args, data.flags, data.startId);
            } else {
                s.onTaskRemoved(data.args);
                res = Service.START_TASK_REMOVED_COMPLETE;
            }

            QueuedWork.waitToFinish();

            try {
                ActivityManager.getService().serviceDoneExecuting(
                        data.token, SERVICE_DONE_EXECUTING_START, data.startId, res);
            } catch (RemoteException e) {
                throw e.rethrowFromSystemServer();
            }
            ensureJitEnabled();
        } catch (Exception e) {
            if (!mInstrumentation.onException(s, e)) {
                throw new RuntimeException(
                        "Unable to start service " + s
                        + " with " + data.args + ": " + e.toString(), e);
            }
        }
    }
}

位置二:

private void handleStopService(IBinder token) {
    Service s = mServices.remove(token);
    if (s != null) {
        try {
            if (localLOGV) Slog.v(TAG, "Destroying service " + s);
            s.onDestroy();
            s.detachAndCleanUp();
            Context context = s.getBaseContext();
            if (context instanceof ContextImpl) {
                final String who = s.getClassName();
                ((ContextImpl) context).scheduleFinalCleanup(who, "Service");
            }

            QueuedWork.waitToFinish();

            try {
                ActivityManager.getService().serviceDoneExecuting(
                        token, SERVICE_DONE_EXECUTING_STOP, 0, 0);
            } catch (RemoteException e) {
                throw e.rethrowFromSystemServer();
            }
        } catch (Exception e) {
            if (!mInstrumentation.onException(s, e)) {
                throw new RuntimeException(
                        "Unable to stop service " + s
                        + ": " + e.toString(), e);
            }
            Slog.i(TAG, "handleStopService: exception for " + token, e);
        }
    } else {
        Slog.i(TAG, "handleStopService: token=" + token + " not found.");
    }
    //Slog.i(TAG, "Running services: " + mServices);
}

位置三:

@Override
public void handlePauseActivity(IBinder token, boolean finished, boolean userLeaving,
        int configChanges, PendingTransactionActions pendingActions, String reason) {
    ActivityClientRecord r = mActivities.get(token);
    if (r != null) {
        if (userLeaving) {
            performUserLeavingActivity(r);
        }

        r.activity.mConfigChangeFlags |= configChanges;
        performPauseActivity(r, finished, reason, pendingActions);

        // Make sure any pending writes are now committed.
        if (r.isPreHoneycomb()) {
            QueuedWork.waitToFinish();
        }
        mSomeActivitiesChanged = true;
    }
}

位置四:

@Override
public void handleStopActivity(IBinder token, boolean show, int configChanges,
        PendingTransactionActions pendingActions, boolean finalStateRequest, String reason) {
    final ActivityClientRecord r = mActivities.get(token);
    r.activity.mConfigChangeFlags |= configChanges;

    final StopInfo stopInfo = new StopInfo();
    performStopActivityInner(r, stopInfo, show, true /* saveState */, finalStateRequest,
            reason);

    if (localLOGV) Slog.v(
        TAG, "Finishing stop of " + r + ": show=" + show
        + " win=" + r.window);

    updateVisibility(r, show);

    // Make sure any pending writes are now committed.
    if (!r.isPreHoneycomb()) {
        QueuedWork.waitToFinish();
    }

    stopInfo.setActivity(r);
    stopInfo.setState(r.state);
    stopInfo.setPersistentState(r.persistentState);
    pendingActions.setStopInfo(stopInfo);
    mSomeActivitiesChanged = true;
}

位置五:

// TODO: This method should be changed to use {@link #performStopActivityInner} to perform to
// stop operation on the activity to reduce code duplication and the chance of fixing a bug in
// one place and missing the other.
private void handleSleeping(IBinder token, boolean sleeping) {
    ActivityClientRecord r = mActivities.get(token);

    if (r == null) {
        Log.w(TAG, "handleSleeping: no activity for token " + token);
        return;
    }

    if (sleeping) {
        if (!r.stopped && !r.isPreHoneycomb()) {
            callActivityOnStop(r, true /* saveState */, "sleeping");
        }

        // Make sure any pending writes are now committed.
        if (!r.isPreHoneycomb()) {
            QueuedWork.waitToFinish();
        }

        // Tell activity manager we slept.
        try {
            ActivityManager.getService().activitySlept(r.token);
        } catch (RemoteException ex) {
            throw ex.rethrowFromSystemServer();
        }
    } else {
        if (r.stopped && r.activity.mVisibleFromServer) {
            r.activity.performRestart(true /* start */, "handleSleeping");
            r.setState(ON_START);
        }
    }
}

4.5 queue(Runnable work, boolean shouldDelay)

安排任务异步执行。 work 是新加入的任务, shouldDelay 指明本任务是否能延迟处理。

SharedPreferences.Editor.appy()源码间接调用此方法,调用时参数 shouldDelaytrue

public static void queue(Runnable work, boolean shouldDelay) {
    Handler handler = getHandler();

    synchronized (sLock) {
        // 任务放入sWork列表中
        sWork.add(work);

        // 然后放入一个Message作为通知
        if (shouldDelay && sCanDelay) {
            // sCanDelay默认为true
            handler.sendEmptyMessageDelayed(QueuedWorkHandler.MSG_RUN, DELAY);
        } else {
            handler.sendEmptyMessage(QueuedWorkHandler.MSG_RUN);
        }
    }
}

4.6 hasPendingWork()

当队列存在等待的异步任务时返回 true

public static boolean hasPendingWork() {
    synchronized (sLock) {
        return !sWork.isEmpty();
    }
}

4.7 processPendingWork()

从任务队列中取任务执行

private static void processPendingWork() {
    // 此处使用sProcessingWork锁,保证sWork
    synchronized (sProcessingWork) {
        LinkedList<Runnable> work;

        synchronized (sLock) {
            // 克隆sWork为work实例,work内任务将送去执行
            work = (LinkedList<Runnable>) sWork.clone();
            // 清除sWork
            sWork.clear();

            // 利用QueuedWorkHandler.MSG_RUN移除相关Messages,避免以后收到通知但任务早已执行
            getHandler().removeMessages(QueuedWorkHandler.MSG_RUN);
        }
        
        // 根据work任务排列顺序依次执行
        if (work.size() > 0) {
            for (Runnable w : work) {
                w.run();
            }
        }
    }
}

五、QueuedWorkHandler

收到 msg.what == MSG_RUN 消息的通知,去调用processPendingWork()方法

private static class QueuedWorkHandler extends Handler {
    static final int MSG_RUN = 1;

    QueuedWorkHandler(Looper looper) {
        super(looper);
    }

    public void handleMessage(Message msg) {
        // 收到Message
        if (msg.what == MSG_RUN) {
            // 触发方法调用,把sWork内所有任务批量送去执行
            processPendingWork();
        }
    }
}

以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数据挖掘十大算法

数据挖掘十大算法

(美)吴信东(Xindong Wu)、(美),库玛尔 ,(Vipin Kumar) / 李文波、吴素研 / 清华大学出版社 / 2013-5 / 39.00元

《世界著名计算机教材精选:数据挖掘十大算法》详细介绍了在实际中用途最广、影响最大的十种数据挖掘算法,这十种算法是数据挖掘领域的顶级专家进行投票筛选的,覆盖了分类、聚类、统计学习、关联分析和链接分析等重要的数据挖掘研究和发展主题。《世界著名计算机教材精选:数据挖掘十大算法》对每一种算法都进行了多个角度的深入剖析,包括算法历史、算法过程、算法特性、软件实现、前沿发展等,此外,在每章最后还给出了丰富的习......一起来看看 《数据挖掘十大算法》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

Markdown 在线编辑器
Markdown 在线编辑器

Markdown 在线编辑器

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具