ActivityManagerService广播并行发送与串行发送怎么实现

其他教程   发布日期:2023年07月25日   浏览次数:342

这篇文章主要讲解了“ActivityManagerService广播并行发送与串行发送怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“ActivityManagerService广播并行发送与串行发送怎么实现”吧!

"并行"广播的发送

本文以 ActivityManagerService之广播(1): 注册与发送 为基础,分析“串行”和“并行”广播的发送流程,并介绍广播 ANR 的原理。

// 1. 获取广播队列
final BroadcastQueue queue = broadcastQueueForIntent(intent);
// 2. 创建广播记录
BroadcastRecord r = new BroadcastRecord(queue, intent, callerApp, callerPackage,
        callerFeatureId, callingPid, callingUid, callerInstantApp, resolvedType,
        requiredPermissions, excludedPermissions, appOp, brOptions, registeredReceivers,
        resultTo, resultCode, resultData, resultExtras, ordered, sticky, false, userId,
        allowBackgroundActivityStarts, backgroundActivityStartsToken,
        timeoutExempt);
// 3. 广播记录加入到并行队列中
queue.enqueueParallelBroadcastLocked(r);
// 4. 调度发送广播
queue.scheduleBroadcastsLocked();

第3步,把广播记录保存到并行队列中

// BroadcastQueue.java
public void enqueueParallelBroadcastLocked(BroadcastRecord r) {
    // mParallelBroadcasts 类型为 ArrayList<BroadcastRecord>
    mParallelBroadcasts.add(r);
    enqueueBroadcastHelper(r);
}

第4步,调度发送广播,最终会调用如下方法

// BroadcastQueue.java
// 此时,参数 fromMsg 为 true,skipOomAdj 为 false
final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    BroadcastRecord r;
    mService.updateCpuStats();
    if (fromMsg) {
        mBroadcastsScheduled = false;
    }
    // 遍历"并行"广播队列
    while (mParallelBroadcasts.size() > 0) {
        r = mParallelBroadcasts.remove(0);
        r.dispatchTime = SystemClock.uptimeMillis();
        r.dispatchClockTime = System.currentTimeMillis();
        final int N = r.receivers.size();
        for (int i=0; i<N; i++) {
            Object target = r.receivers.get(i);
            // 广播发送给动态接收器
            deliverToRegisteredReceiverLocked(r,
                    (BroadcastFilter) target, false, i);
        }
        addBroadcastToHistoryLocked(r);
    }
    // ... 省略"串行"广播的发送 ...
}

虽然名为“并行”广播,但是仍然是从队列取出广播,然后逐个发送给动态接收器。很显然,这里的行为与“并行”的含义并不一致?那么广播的“并行”发送究竟是什么意思?

接着看“并行”广播如何发送给动态接收器的

private void deliverToRegisteredReceiverLocked(BroadcastRecord r,
        BroadcastFilter filter, boolean ordered, int index) {
    // ... 省略一大堆的权限或者异常检测 ...
    // 一个广播可能有多个接收者,因此需要一个数组来保存发送的状态
    r.delivery[index] = BroadcastRecord.DELIVERY_DELIVERED;
    // ordered 目前为 false
    if (ordered) {
        // ...
        }
    } else if (filter.receiverList.app != null) {
        // 马上要发送广播给接收方,因此要暂时解冻接收方的进程
        mService.mOomAdjuster.mCachedAppOptimizer.unfreezeTemporarily(filter.receiverList.app);
    }
    try {
        if (filter.receiverList.app != null && filter.receiverList.app.isInFullBackup()) {
            // ... 处于备份状态中 ...
        } else {
            r.receiverTime = SystemClock.uptimeMillis();
            // 保存允许从后台启动activity的token
            maybeAddAllowBackgroundActivityStartsToken(filter.receiverList.app, r);
            // 添加到省电模式白名单中
            maybeScheduleTempAllowlistLocked(filter.owningUid, r, r.options);
            // 执行广播的发送
            performReceiveLocked(filter.receiverList.app, filter.receiverList.receiver,
                    new Intent(r.intent), r.resultCode, r.resultData,
                    r.resultExtras, r.ordered, r.initialSticky, r.userId);
            // parallel broadcasts are fire-and-forget, not bookended by a call to
            // finishReceiverLocked(), so we manage their activity-start token here
            if (filter.receiverList.app != null
                    && r.allowBackgroundActivityStarts && !r.ordered) {
                postActivityStartTokenRemoval(filter.receiverList.app, r);
            }
        }
        // ordered 目前为 false
        if (ordered) {
            r.state = BroadcastRecord.CALL_DONE_RECEIVE;
        }
    } catch (RemoteException e) {
        // ...
    }
}

抛开一些细节,直接看 performReceiveLocked()

// BroadcastQueue.java
void performReceiveLocked(ProcessRecord app, IIntentReceiver receiver,
        Intent intent, int resultCode, String data, Bundle extras,
        boolean ordered, boolean sticky, int sendingUser)
        throws RemoteException {
    // 动态广播接收器的进程,应该是存在的
    if (app != null) {
        final IApplicationThread thread = app.getThread();
        if (thread != null) {
            try {
                // 发送广播给接收方进程
                thread.scheduleRegisteredReceiver(receiver, intent, resultCode,
                        data, extras, ordered, sticky, sendingUser,
            } catch (RemoteException ex) {
               // ...
            }
        } else {
            throw new RemoteException("app.thread must not be null");
        }
    } else {
        // ...
    }
}

很简单,就是通过进程 attach 的 IApplicationThread 接口,发送广播给进程。这个过程,暂时先不分析,后面会分析到。

那么,现在来回答一下,何为“并行”广播?其实这个答案,我也是对比了串行广播的发送过程,才得出来的。所谓的"并行"发送,实际上就是把广播逐个发送给动态接收器,但是不需要等待前一个接收器反馈处理结果,就可以发送下一个。而“串行”广播的发送,是需要等待前一个广播接收器反馈处理结果后,才能调度发送下一个广播。

“串行”广播的发送

// 1.获取广播队列
BroadcastQueue queue = broadcastQueueForIntent(intent);
// 2.创建广播记录
BroadcastRecord r = new BroadcastRecord(queue, intent, callerApp, callerPackage,
        callerFeatureId, callingPid, callingUid, callerInstantApp, resolvedType,
        requiredPermissions, excludedPermissions, excludedPackages, appOp, brOptions,
        receivers, resultTo, resultCode, resultData, resultExtras,
        ordered, sticky, false, userId, allowBackgroundActivityStarts,
        backgroundActivityStartsToken, timeoutExempt);
// 3.广播记录加入到串行队列中
queue.enqueueOrderedBroadcastLocked(r);
// 4.调度发送广播
queue.scheduleBroadcastsLocked();

第3步,广播加入到串行队列中

// BroadcastQueue.java
public void enqueueOrderedBroadcastLocked(BroadcastRecord r) {
    mDispatcher.enqueueOrderedBroadcastLocked(r);
    enqueueBroadcastHelper(r);
}
// BroadcastDispatcher.java
void enqueueOrderedBroadcastLocked(BroadcastRecord r) {
    mOrderedBroadcasts.add(r);
}

并行发送的广播保存到 BroadcastQueue#mParallelBroadcasts 中,而串行发送的广播保存到 BroadcastDispatcher#mOrderedBroadcasts 中,为何要这样设计呢?有兴趣的读者可以研究下。

第4步,“串行”广播的调度发送,仍然使用的是 processNextBroadcastLocked() 方法,但是代码量是非常的大,下面将把函数分段解析。

processNextBroadcastLocked() 函数有400多行代码,这个函数里有很多东西都可以抽出来的,但是随着版本的更新,这块代码一直没有优化过。

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    BroadcastRecord r;
    mService.updateCpuStats();
    if (fromMsg) {
        mBroadcastsScheduled = false;
    }
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // 检测 receiver 进程是否死亡
        boolean isDead;
        if (mPendingBroadcast.curApp.getPid() > 0) {
            synchronized (mService.mPidsSelfLocked) {
                ProcessRecord proc = mService.mPidsSelfLocked.get(
                        mPendingBroadcast.curApp.getPid());
                isDead = proc == null || proc.mErrorState.isCrashing();
            }
        } else {
            final ProcessRecord proc = mService.mProcessList.getProcessNamesLOSP().get(
                    mPendingBroadcast.curApp.processName, mPendingBroadcast.curApp.uid);
            isDead = proc == null || !proc.isPendingStart();
        }
        if (!isDead) {
            // 进程仍然存活,结束此次广播的处理流程,继续等待
            // 等待什么呢?等待广播进程起来,并与 AMS 完成 attach application
            // 在 attach application 的过程中,会完成广播的发送
            return;
        } else {
            // 进程死亡,继续处理下一个广播
            mPendingBroadcast.state = BroadcastRecord.IDLE;
            mPendingBroadcast.nextReceiver = mPendingBroadcastRecvIndex;
            mPendingBroadcast = null;
        }
    }

当发送一个广播给 receiver 时,如果 receiver 进程没有启动,那么会先 fork 一个 receiver 进程,然后用 mPendingBroadcast 保存待发送的广播。当 receiver 进程起来的时候,会与 AMS 执行 attach application 过程,在这个过程中,会自动把 mPendingBroadcast 保存的广播发送给 receiver 进程。

因此,这里检测到 mPendingBroadcast 不为 null 时,那么 receiver 进程肯定在启动中,只要 receiver 进程没有死亡,就什么也不用做,因为广播会自动发送给 receiver 进程。

接着看下一步的处理

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    // ...
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // ...
    }
    boolean looped = false;
    // 2. 通过 do-while 循环,找到一个现在可以处理的广播
    do {
        final long now = SystemClock.uptimeMillis();
        // 获取一个待处理的广播
        r = mDispatcher.getNextBroadcastLocked(now);
        if (r == null) {
            // ... 没有广播需要处理 ...
            return;
        }
        boolean forceReceive = false;
        // 处理严重超时的广播,有两种情况
        // 一种情况是,在系统还没有起来前,发送的广播得不到执行,发生严重超时
        // 另外一种情况是,在系统起来后,有一些超时豁免的广播,发生了严重超时
        int numReceivers = (r.receivers != null) ? r.receivers.size() : 0;
        if (mService.mProcessesReady && !r.timeoutExempt && r.dispatchTime > 0) {
            if ((numReceivers > 0) &&
                    (now > r.dispatchTime + (2 * mConstants.TIMEOUT * numReceivers))) {
                broadcastTimeoutLocked(false); // forcibly finish this broadcast
                forceReceive = true;
                r.state = BroadcastRecord.IDLE;
            }
        }
        if (r.state != BroadcastRecord.IDLE) {
            return;
        }
        // 当前广播因为某种原因,终止处理,然后处理下一个广播 
        if (r.receivers == null || r.nextReceiver >= numReceivers
                || r.resultAbort || forceReceive) {
            // ...
            // 通知 BroadcastDispatcher ,不处理这个广播了
            mDispatcher.retireBroadcastLocked(r);
            r = null;
            looped = true;
            // 下一次循环,获取下一个广播来处理
            continue;
        }
        // 处理推迟发送广播的情况
        if (!r.deferred) {
            final int receiverUid = r.getReceiverUid(r.receivers.get(r.nextReceiver));
            if (mDispatcher.isDeferringLocked(receiverUid)) {
                // ...
                // 保存推迟发送的广播
                mDispatcher.addDeferredBroadcast(receiverUid, defer);
                r = null;
                looped = true;
                // 下一次循环时,获取下一个广播来处理
                continue;
            }
        }
    } while (r == null);

先从整体看,通过一个 do-while 循环,最终是为了找到下一个处理的广播。为何要用一个循环来寻找呢? 因为广播可能没有接收器,或者已经严重超时,又或者广播需要推迟发送。所以要通过一个循环,找到一个能立即发送的广播。

由于本文主要是为了分析广播发送的整体流程,对于有些细节,只做注释而不做细致分析。需要深入研究的读者,可以在本文的基础上继续分析。

继续接着看下一步

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    BroadcastRecord r;
    mService.updateCpuStats();
    if (fromMsg) {
        mBroadcastsScheduled = false;
    }
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // ...
    }
    boolean looped = false;
    // 2. 通过 do-while 循环,找到一个现在可以处理的广播
    do {
        final long now = SystemClock.uptimeMillis();
        // 获取一个待处理的广播
        r = mDispatcher.getNextBroadcastLocked(now);
        // ...
    } while (r == null);
    // 走到这里,表示已经获取了一个现在可以处理的广播
    int recIdx = r.nextReceiver++;
    // 3. 在发送广播之前,先发送一个超时消息
    r.receiverTime = SystemClock.uptimeMillis();
    if (recIdx == 0) {
        // 在广播开始发送给第一个接收器时,记录发送的时间
        r.dispatchTime = r.receiverTime;
        r.dispatchClockTime = System.currentTimeMillis();
    }
    if (! mPendingBroadcastTimeoutMessage) {
        long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
        setBroadcastTimeoutLocked(timeoutTime);
    }

在广播发送给一个 receiver 之前,会先发送一个超时消息。从广播准备发送给一个 receiver 算起,到 receiver 处理完广播,并反馈给 AMS,如果这个时间段超过了一个时间阈值,就会引发 ANR。触发 ANR 的代码设计非常巧妙,后面会具体分析这个过程。

接着看下一步

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    // ...
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // ...
    }
    boolean looped = false;
    // 2. 通过 do-while 循环,找到一个现在可以处理的广播
    do {
        // ...
    } while (r == null);
    int recIdx = r.nextReceiver++;
    // ...
    // 3. 在发送广播之前,先发送一个超时消息
    // 当广播处理超时时,会触发 ANR
    if (! mPendingBroadcastTimeoutMessage) {
        long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
        setBroadcastTimeoutLocked(timeoutTime);
    }
    final BroadcastOptions brOptions = r.options;
    // 4. 获取一个 receiver
    final Object nextReceiver = r.receivers.get(recIdx);
    // 5. 如果这个接收器是动态接收器,先把广播发送给它
    // 注意,这里处理的是有序广播发送给动态接收器的情况
    if (nextReceiver instanceof BroadcastFilter) {
        BroadcastFilter filter = (BroadcastFilter)nextReceiver;
        // 发送广播给动态接收器
        deliverToRegisteredReceiverLocked(r, filter, r.ordered, recIdx);
        if (r.receiver == null || !r.ordered) {
            if (DEBUG_BROADCAST) Slog.v(TAG_BROADCAST, "Quick finishing ["
                    + mQueueName + "]: ordered="
                    + r.ordered + " receiver=" + r.receiver);
            r.state = BroadcastRecord.IDLE;
            scheduleBroadcastsLocked();
        } else {
            if (filter.receiverList != null) {
                maybeAddAllowBackgroundActivityStartsToken(filter.receiverList.app, r);
            }
        }
        // 注意,把广播发送给 动态receiver 后,直接返回
        return;
    }

现在一切就绪,那么开始获取一个 receiver,当这个 receiver 是一个动态接收器时,直接发送广播给它,这个发送过程前面已经分析过。

注意,这里处理的情况是,把有序广播发送给动态接收器。并且发送完成后,直接 return, 也就是结束了此次广播的发送流程。

一个广播可能有多个接收器,为何这里只发送给一个动态接收器,就直接返回了? 这就是从“串行”广播的本质,需要等待当前的广播接收器处理完广播,并返回结果后,才能把广播发送给下一个广播接收器。

接着看下一步

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    // ...
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // ...
    }
    boolean looped = false;
    // 2. 通过 do-while 循环,找到一个现在可以处理的广播
    do {
        final long now = SystemClock.uptimeMillis();
        // 获取一个待处理的广播
        r = mDispatcher.getNextBroadcastLocked(now);
        // ...
    } while (r == null);
    // 走到这里,表示已经获取了一个现在可以处理的广播
    int recIdx = r.nextReceiver++;
    // 3. 在发送广播之前,先发送一个超时消息
    r.receiverTime = SystemClock.uptimeMillis();
    // ...
    if (! mPendingBroadcastTimeoutMessage) {
        long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
        setBroadcastTimeoutLocked(timeoutTime);
    }
    final BroadcastOptions brOptions = r.options;
    // 4. 获取一个 receiver
    final Object nextReceiver = r.receivers.get(recIdx);
    // 5. 如果这个接收器是动态接收器,先把广播发送给它
    // 注意,这里处理的是有序广播发送给动态接收器的情况
    if (nextReceiver instanceof BroadcastFilter) {
        // ...
        return;
    }
    // 走到这里,表示当前的广播接收器,是静态接收器
    // 获取静态接收器的信息
    ResolveInfo info =
        (ResolveInfo)nextReceiver;
    ComponentName component = new ComponentName(
            info.activityInfo.applicationInfo.packageName,
            info.activityInfo.name);
    boolean skip = false;
    // 6. 检测是否不需要把广播发送给静态接收器
    // ... 省略一大堆的检测代码 ...
    String targetProcess = info.activityInfo.processName;
    ProcessRecord app = mService.getProcessRecordLocked(targetProcess,
            info.activityInfo.applicationInfo.uid);
    if (!skip) {
        // 检测是否允许把广播发送给静态接收器
        final int allowed = mService.getAppStartModeLOSP(
                info.activityInfo.applicationInfo.uid, info.activityInfo.packageName,
                info.activityInfo.applicationInfo.targetSdkVersion, -1, true, false, false);
        // 例如,大于等于 O+ 版本的 app ,不允许广播发送给静态接收器
        if (allowed != ActivityManager.APP_START_MODE_NORMAL) {
            // ephemeral app 会返回这个模式
            if (allowed == ActivityManager.APP_START_MODE_DISABLED) {
                skip = true;
            } else if (((r.intent.getFlags()&Intent.FLAG_RECEIVER_EXCLUDE_BACKGROUND) != 0)
                    || (r.intent.getComponent() == null
                        && r.intent.getPackage() == null
                        && ((r.intent.getFlags()
                                & Intent.FLAG_RECEIVER_INCLUDE_BACKGROUND) == 0)
                        && !isSignaturePerm(r.requiredPermissions))) {
                // 打破以上任意一个条件,即可把广播发送给静态接收器
                mService.addBackgroundCheckViolationLocked(r.intent.getAction(),
                        component.getPackageName());
                skip = true;
            }
        }
    }
    // 跳过当前广播的发送
    if (skip) {
        // ...
        return;
    }

如果这个 reciever 是静态接收器,那么在把广播发送给它之前,首先得进行一大堆的检测。最常见的就是权限,但是这里展示了一段 Android O+ 限制广播发送给静态接收器的限制,有兴趣的读者可以详细分析。

接着看下一步

final void processNextBroadcastLocked(boolean fromMsg, boolean skipOomAdj) {
    // ...
    // 把并行广播发送给动态接收器
    while (mParallelBroadcasts.size() > 0) {
        // ...
    }
    // 1. 处理 receiver 进程正在启动的情况
    if (mPendingBroadcast != null) {
        // ...
    }
    boolean looped = false;
    // 2. 通过 do-while 循环,找到一个现在可以处理的广播
    do {
        final long now = SystemClock.uptimeMillis();
        // 获取一个待处理的广播
        r = mDispatcher.getNextBroadcastLocked(now);
        // ...
    } while (r == null);
    // 走到这里,表示已经获取了一个现在可以处理的广播
    int recIdx = r.nextReceiver++;
    // 3. 在发送广播之前,先发送一个超时消息
    r.receiverTime = SystemClock.uptimeMillis();
    // ...
    if (! mPendingBroadcastTimeoutMessage) {
        long timeoutTime = r.receiverTime + mConstants.TIMEOUT;
        setBroadcastTimeoutLocked(timeoutTime);
    }
    final BroadcastOptions brOptions = r.options;
    // 4. 获取一个 receiver
    final Object nextReceiver = r.receivers.get(recIdx);
    // 5. 如果这个接收器是动态接收器,先把广播发送给它
    // 注意,这里处理的是有序广播发送给动态接收器的情况
    if (nextReceiver instanceof BroadcastFilter) {
        // ...
        return;
    }
    // 走到这里,表示当前的广播接收器,是静态接收器
    ResolveInfo info =
        (ResolveInfo)nextReceiver;
    ComponentName component = new ComponentName(
            info.activityInfo.applicationInfo.packageName,
            info.activityInfo.name);
    boolean skip = false;
    // 6. 检测是否不需要把广播发送给静态接收器
    // ... 省略一大堆的检测代码 ...
    String targetProcess = info.activityInfo.processName;
    ProcessRecord app = mService.getProcessRecordLocked(targetProcess,
            info.activityInfo.applicationInfo.uid);
    // ...
    // 跳过当前广播的发送
    if (skip) {
        // ...
        return;
    }
    // 现在可以把广播发送给静态接收器了
    // ...
    // 7. 静态接收器的进程正在运行,那么就把广播发送给它
    if (app != null && app.getThread() != null && !app.isKilled()) {
        try {
            app.addPackage(info.activityInfo.packageName,
                    info.activityInfo.applicationInfo.longVersionCode, mService.mProcessStats);
            maybeAddAllowB						
						
						
						
						
						
						
					

以上就是ActivityManagerService广播并行发送与串行发送怎么实现的详细内容,更多关于ActivityManagerService广播并行发送与串行发送怎么实现的资料请关注九品源码其它相关文章!