本文共 18032 字,大约阅读时间需要 60 分钟。
http://blog.csdn.net/new_abc/article/details/7467537
前面有篇文章介绍了adb install的后面的流程,但前面的通信过程没有怎么介绍,这里主要介绍下adb 启动的流程,以及connect、install的前面流程,这里介绍的都是服务端的。
一、adb 启动流程:
首先看下服务端adb 启动的流程
adb_main主要 调用4个初始化函数:
1、init_transport_registration:初始化本地事务处理,每个client连接都会有一个专门的处理,对每个client都会在服务端有一对套接字对与该client相连的socket上的数据流进行处理。
2、install_listener(local_name, "*smartsocket*", NULL),这个函数 暂时还不知道做什么的;
3、local_init,初始化本地套接字,等待client的连接。
4、init_jdwp这个好像是跟dalvik中间的某个东东通讯的,没去研究。
5、fdevent_loop,事件处理,当相应的套接字上有数据到来时,调用注册的函数进行处理。
这里主要介绍下init_transport_registration,local_init,fdevent_loop这三个函数。
1)、init_transport_registration
- void init_transport_registration(void)
- {
- int s[2];
-
- if(adb_socketpair(s)){
- fatal_errno("cannot open transport registration socketpair");
- }
-
- transport_registration_send = s[0];
- transport_registration_recv = s[1];
-
- fdevent_install(&transport_registration_fde,
- transport_registration_recv,
- transport_registration_func,
- 0);
-
- fdevent_set(&transport_registration_fde, FDE_READ);
- }
这里调用adb_socketpair建立一对匿名的已经连接的套接字,分别赋给transport_registration_send和transport_registration_recv,之后调用fdevent_install把transport_registration_recv这个套接字注册到监听到套接字里面,并把相应的处理函数设置为transport_registration_func。
2)、local_init
- void local_init(int port)
- {
- adb_thread_t thr;
- void* (*func)(void *);
-
- if(HOST) {
- func = client_socket_thread;
- } else {
- func = server_socket_thread;
- }
-
- D("transport: local %s init\n", HOST ? "client" : "server");
-
- if(adb_thread_create(&thr, func, (void *)port)) {
- fatal_errno("cannot create local socket %s thread",
- HOST ? "client" : "server");
- }
- }
这里是server端,所以这里func为server_socket_thread,这里创建了这个线程:
- static void *server_socket_thread(void * arg)
- {
- int serverfd, fd;
- struct sockaddr addr;
- socklen_t alen;
- int port = (int)arg;
-
- D("transport: server_socket_thread() starting\n");
- serverfd = -1;
- for(;;) {
- if(serverfd == -1) {
- serverfd = socket_inaddr_any_server(port, SOCK_STREAM);
- if(serverfd < 0) {
- D("server: cannot bind socket yet\n");
- adb_sleep_ms(1000);
- continue;
- }
- close_on_exec(serverfd);
- }
-
- alen = sizeof(addr);
- D("server: trying to get new connection from %d\n", port);
- fd = adb_socket_accept(serverfd, &addr, &alen);
- if(fd >= 0) {
- D("server: new connection on fd %d\n", fd);
- close_on_exec(fd);
- disable_tcp_nagle(fd);
- register_socket_transport(fd, "host", port, 1);
- }
- }
- D("transport: server_socket_thread() exiting\n");
- return 0;
- }
这里调用socket_inaddr_any_server建立监听套接字,这里的port默认是5555,再调用adb_socket_accept等待客户连接的到来。
3)、fdevent_loop
- void fdevent_loop()
- {
- fdevent *fde;
-
- for(;;) {
- #if DEBUG
- fprintf(stderr,"--- ---- waiting for events\n");
- #endif
- fdevent_process();
-
- while((fde = fdevent_plist_dequeue())) {
- unsigned events = fde->events;
- fde->events = 0;
- fde->state &= (~FDE_PENDING);
- dump_fde(fde, "callback");
- fde->func(fde->fd, events, fde->arg);
- }
- }
- }
这里先调用fdevent_process启动select或epoll检测机制,这里面会把有数据到来的套接字消息调用fdevent_plist_enqueue压入list_pending,调用fdevent_plist_dequeue从list_pending取出一条消息,并调用它的处理函数进行处理。
二、adb connect流程:
上面 我们讲server_socket_thread讲到创建了一个线程并监听等待客户连接的到来,当client连接过来的时候, 这里accept会返回,我们接着往下看,会调用register_socket_transport注册这个连接
- void register_socket_transport(int s, const char *serial, int port, int local)
- {
- atransport *t = calloc(1, sizeof(atransport));
- D("transport: %p init'ing for socket %d, on port %d\n", t, s, port);
- if ( init_socket_transport(t, s, port, local) < 0 ) {
- adb_close(s);
- free(t);
- return;
- }
- if(serial) {
- t->serial = strdup(serial);
- }
- register_transport(t);
- }
分配了一个atransport结构并调用init_socket_transport进行初始化
- int init_socket_transport(atransport *t, int s, int adb_port, int local)
- {
- int fail = 0;
-
- t->kick = remote_kick;
- t->close = remote_close;
- t->read_from_remote = remote_read;
- t->write_to_remote = remote_write;
- t->sfd = s;
- t->sync_token = 1;
- t->connection_state = CS_OFFLINE;
- t->type = kTransportLocal;
- t->adb_port = 0;
- }
注意这里的sdf是这个连接的套接字,还有remote_read及remote_write.
接着调用register_transport注册这个事务
- static void register_transport(atransport *transport)
- {
- tmsg m;
- m.transport = transport;
- m.action = 1;
- D("transport: %p registered\n", transport);
- if(transport_write_action(transport_registration_send, &m)) {
- fatal_errno("cannot write transport registration socket\n");
- }
- }
注意这里transport_write_action的第一个参数是transport_registration_send,是否有点印象,对的,这个fd就是我们在init_transport_registration建立的一对已连接的匿名套接字中 的一个。这样在transport_registration_recv就会有数据到来。并调用transport_registration_func,
- static void transport_registration_func(int _fd, unsigned ev, void *data)
- {
- tmsg m;
- adb_thread_t output_thread_ptr;
- adb_thread_t input_thread_ptr;
- int s[2];
- atransport *t;
-
- if(!(ev & FDE_READ)) {
- return;
- }
-
- if(transport_read_action(_fd, &m)) {
- fatal_errno("cannot read transport registration socket");
- }
-
- t = m.transport;
-
- if(m.action == 0){
- D("transport: %p removing and free'ing %d\n", t, t->transport_socket);
-
-
-
-
- fdevent_remove(&(t->transport_fde));
- adb_close(t->fd);
-
- adb_mutex_lock(&transport_lock);
- t->next->prev = t->prev;
- t->prev->next = t->next;
- adb_mutex_unlock(&transport_lock);
-
- run_transport_disconnects(t);
-
- if (t->product)
- free(t->product);
- if (t->serial)
- free(t->serial);
-
- memset(t,0xee,sizeof(atransport));
- free(t);
-
- update_transports();
- return;
- }
-
-
- if (t->connection_state != CS_NOPERM) {
-
- t->ref_count = 2;
-
- if(adb_socketpair(s)) {
- fatal_errno("cannot open transport socketpair");
- }
-
- D("transport: %p (%d,%d) starting\n", t, s[0], s[1]);
-
- t->transport_socket = s[0];
- t->fd = s[1];
-
- D("transport: %p install %d\n", t, t->transport_socket );
- fdevent_install(&(t->transport_fde),
- t->transport_socket,
- transport_socket_events,
- t);
-
- fdevent_set(&(t->transport_fde), FDE_READ);
-
- if(adb_thread_create(&input_thread_ptr, input_thread, t)){
- fatal_errno("cannot create input thread");
- }
-
- if(adb_thread_create(&output_thread_ptr, output_thread, t)){
- fatal_errno("cannot create output thread");
- }
- }
-
-
- adb_mutex_lock(&transport_lock);
- t->next = &transport_list;
- t->prev = transport_list.prev;
- t->next->prev = t;
- t->prev->next = t;
- adb_mutex_unlock(&transport_lock);
-
- t->disconnects.next = t->disconnects.prev = &t->disconnects;
-
- update_transports();
- }
在这个函数中,先调用transport_read_action读取前面写入的tmsg数据,m.action =1,所以不执行,这里t->connection_state!= CS_NOPERM,前面赋值了为CS_OFFLINE。adb_socketpair又建立两个本地通信套接字,分别赋给t->transport_socket和t->fd,接着调用fdevent_install把t->transport_socket添加进去,创建input_thread 和output_thread两线程。
- static void *input_thread(void *_t)
- {
- atransport *t = _t;
- apacket *p;
- int active = 0;
-
- D("to_remote: starting input_thread for %p, reading from fd %d\n",
- t, t->fd);
-
- for(;;){
- if(read_packet(t->fd, &p)) {
- D("to_remote: failed to read apacket from transport %p on fd %d\n",
- t, t->fd );
- break;
- }
- if(p->msg.command == A_SYNC){
- if(p->msg.arg0 == 0) {
- D("to_remote: transport %p SYNC offline\n", t);
- put_apacket(p);
- break;
- } else {
- if(p->msg.arg1 == t->sync_token) {
- D("to_remote: transport %p SYNC online\n", t);
- active = 1;
- } else {
- D("to_remote: trandport %p ignoring SYNC %d != %d\n",
- t, p->msg.arg1, t->sync_token);
- }
- }
- } else {
- if(active) {
- D("to_remote: transport %p got packet, sending to remote\n", t);
- t->write_to_remote(p, t);
- } else {
- D("to_remote: transport %p ignoring packet while offline\n", t);
- }
- }
-
- put_apacket(p);
- }
-
-
-
- close_all_sockets(t);
-
- D("to_remote: thread is exiting for transport %p, fd %d\n", t, t->fd);
- kick_transport(t);
- transport_unref(t);
- return 0;
- }
input_thread就是从t->fd读数据,然后调用write_to_remote,write_to_remote中有一句writex(t->sfd,&p->msg, sizeof(amessage) + length)发给远端
再来看下output_thread
- static void *output_thread(void *_t)
- {
- atransport *t = _t;
- apacket *p;
-
- D("from_remote: starting thread for transport %p, on fd %d\n", t, t->fd );
-
- D("from_remote: transport %p SYNC online (%d)\n", t, t->sync_token + 1);
- p = get_apacket();
- p->msg.command = A_SYNC;
- p->msg.arg0 = 1;
- p->msg.arg1 = ++(t->sync_token);
- p->msg.magic = A_SYNC ^ 0xffffffff;
- if(write_packet(t->fd, &p)) {
- put_apacket(p);
- D("from_remote: failed to write SYNC apacket to transport %p", t);
- goto oops;
- }
-
- D("from_remote: data pump for transport %p\n", t);
- for(;;) {
- p = get_apacket();
-
- if(t->read_from_remote(p, t) == 0){
- D("from_remote: received remote packet, sending to transport %p\n",
- t);
- if(write_packet(t->fd, &p)){
- put_apacket(p);
- D("from_remote: failed to write apacket to transport %p", t);
- goto oops;
- }
- } else {
- D("from_remote: remote read failed for transport %p\n", p);
- put_apacket(p);
- break;
- }
- }
-
- D("from_remote: SYNC offline for transport %p\n", t);
- p = get_apacket();
- p->msg.command = A_SYNC;
- p->msg.arg0 = 0;
- p->msg.arg1 = 0;
- p->msg.magic = A_SYNC ^ 0xffffffff;
- if(write_packet(t->fd, &p)) {
- put_apacket(p);
- D("from_remote: failed to write SYNC apacket to transport %p", t);
- }
-
- oops:
- D("from_remote: thread is exiting for transport %p\n", t);
- kick_transport(t);
- transport_unref(t);
- return 0;
- }
output_thread先往t->fd写一个A_SYNC命令,然后循环调用read_from_remote,remote_read 中有(readx(t->sfd,p->data, p->msg.data_length),然后write_packet把数据写到t->fd。上面output_thread中往t->fdk中写了一个A_SYNC命令,这样就触发了t->transport_socket上的读操作,并调用transport_socket_events
- static void transport_socket_events(int fd, unsigned events, void *_t)
- {
- if(events & FDE_READ){
- apacket *p = 0;
- if(read_packet(fd, &p)){
- D("failed to read packet from transport socket on fd %d\n", fd);
- } else {
- handle_packet(p, (atransport *) _t);
- }
- }
- }
这里先从fd读数据,然后调用handle_packet处理,对于A_SYNC,有相应的如下代码:
- void handle_packet(apacket *p, atransport *t)
- {
- ...
- case A_SYNC:
- if(p->msg.arg0){
- send_packet(p, t);
- if(HOST) send_connect(t);
- } else {
- t->connection_state = CS_OFFLINE;
- handle_offline(t);
- send_packet(p, t);
- }
- return;
- ...
- }
p->msg.arg0= 1;所以直接send_packet,这里面会调用write_packet(t->transport_socket,&p),这样与它相应的对端就可以收到数据,在input_thread中会从t->fd读出数据并发给远端,注意这里远端连接的套接字并没有加入fdevent,而是在output_thread中阻塞在read上面,直到Client端有数据到来。客户端收到A_SYNC命令后,会发送一个connect消息过来,收到数据后就转发给t->fd,触发t->transport_socket,执行transport_socket_events同样调用handle_packet处理收到的数据。
- void handle_packet(apacket *p, atransport *t)
- {
- ...
- case A_CNXN:
-
- if(t->connection_state != CS_OFFLINE) {
- t->connection_state = CS_OFFLINE;
- handle_offline(t);
- }
- parse_banner((char*) p->data, t);
- handle_online();
- if(!HOST) send_connect(t);
- break;
- ....
- }
调用send_connect发送版本等信息到远端.
三、adb install 通讯过程
同样消息从远端的sfd送到t->fd,触发t->transport_socket执行handle_packet,这里Client先发送一条A_OPEN消息
- void handle_packet(apacket *p, atransport *t)
- {
- ...
- case A_OPEN:
- if(t->connection_state != CS_OFFLINE) {
- char *name = (char*) p->data;
- name[p->msg.data_length > 0 ? p->msg.data_length - 1 : 0] = 0;
- s = create_local_service_socket(name);
- if(s == 0) {
- send_close(0, p->msg.arg0, t);
- } else {
- s->peer = create_remote_socket(p->msg.arg0, t);
- s->peer->peer = s;
- send_ready(s->id, s->peer->id, t);
- s->ready(s);
- }
- }
- break;
- ...
- }
这里的name 是”sync:”,用于通知服务端同步读取client发送过来的文件,调用create_local_service_socket创建一个本地服务的socket
- asocket *create_local_service_socket(const char *name)
- {
- asocket *s;
- int fd;
- fd = service_to_fd(name);
- if(fd < 0) return 0;
-
- s = create_local_socket(fd);
- D("LS(%d): bound to '%s'\n", s->id, name);
- return s;
- }
在service_to_fd中这里对应于create_service_thread(file_sync_service, NULL);
- static int create_service_thread(void (*func)(int, void *), void *cookie)
- {
- stinfo *sti;
- adb_thread_t t;
- int s[2];
-
- if(adb_socketpair(s)) {
- printf("cannot create service socket pair\n");
- return -1;
- }
-
- sti = malloc(sizeof(stinfo));
- if(sti == 0) fatal("cannot allocate stinfo");
- sti->func = func;
- sti->cookie = cookie;
- sti->fd = s[1];
-
- if(adb_thread_create( &t, service_bootstrap_func, sti)){
- free(sti);
- adb_close(s[0]);
- adb_close(s[1]);
- printf("cannot create service thread\n");
- return -1;
- }
-
- D("service thread started, %d:%d\n",s[0], s[1]);
- return s[0];
- }
在这里面又调用adb_socketpair创建了两个本地通信套接字,并返回其中s[0],这里还创建一个线程:service_bootstrap_func,执行void *service_bootstrap_func(void *x)
- void *service_bootstrap_func(void *x)
- {
- stinfo *sti = x;
- sti->func(sti->fd, sti->cookie);
- free(sti);
- return 0;
- }
这里的func是file_sync_service,sti->fd= s[1]。
- void file_sync_service(int fd, void *cookie)
- {
- syncmsg msg;
- char name[1025];
- unsigned namelen;
-
- char *buffer = malloc(SYNC_DATA_MAX);
- if(buffer == 0) goto fail;
-
- for(;;) {
- D("sync: waiting for command\n");
-
- if(readx(fd, &msg.req, sizeof(msg.req))) {
- fail_message(fd, "command read failure");
- break;
- }
- namelen = ltohl(msg.req.namelen);
- if(namelen > 1024) {
- fail_message(fd, "invalid namelen");
- break;
- }
- if(readx(fd, name, namelen)) {
- fail_message(fd, "filename read failure");
- break;
- }
- name[namelen] = 0;
-
- msg.req.namelen = 0;
- D("sync: '%s' '%s'\n", (char*) &msg.req, name);
-
- switch(msg.req.id) {
- case ID_STAT:
- if(do_stat(fd, name)) goto fail;
- break;
- case ID_LIST:
- if(do_list(fd, name)) goto fail;
- break;
- case ID_SEND:
- if(do_send(fd, name, buffer)) goto fail;
- break;
- case ID_RECV:
- if(do_recv(fd, name, buffer)) goto fail;
- break;
- case ID_QUIT:
- goto fail;
- default:
- fail_message(fd, "unknown command");
- goto fail;
- }
- }
-
- fail:
- if(buffer != 0) free(buffer);
- D("sync: done\n");
- adb_close(fd);
- }
这里就阻塞在读上面了,等待套接字上数据的到来。
回到create_local_service_socket,继续调用create_local_socket,并把刚返回的s[0]传进来了
- asocket *create_local_socket(int fd)
- {
- asocket *s = calloc(1, sizeof(asocket));
- if(s == 0) fatal("cannot allocate socket");
- install_local_socket(s);
- s->fd = fd;
- s->enqueue = local_socket_enqueue;
- s->ready = local_socket_ready;
- s->close = local_socket_close;
-
- fdevent_install(&s->fde, fd, local_socket_event_func, s);
-
-
- D("LS(%d): created (fd=%d)\n", s->id, s->fd);
- return s;
- }
注意这里asocket类型变量t的enqueue为local_socket_enqueue,这里fd就是前面 返回的s[0],安装到fdevent,这里的处理函数是local_socket_event_func.,这里的local_socket_event_func函数是在s[1],即进行文件操作的那端需要往对端发送消息时触发的。
回到handle_packet,再调用create_remote_socket,创建一个asocket,初始化,把这个peer初始化为他的对端。
- asocket *create_remote_socket(unsigned id, atransport *t)
- {
- asocket *s = calloc(1, sizeof(aremotesocket));
- adisconnect* dis = &((aremotesocket*)s)->disconnect;
-
- if(s == 0) fatal("cannot allocate socket");
- s->id = id;
- s->enqueue = remote_socket_enqueue;
- s->ready = remote_socket_ready;
- s->close = remote_socket_close;
- s->transport = t;
-
- dis->func = remote_socket_disconnect;
- dis->opaque = s;
- add_transport_disconnect( t, dis );
- D("RS(%d): created\n", s->id);
- return s;
- }
赋值给s->peer;然后调用send_ready发送A_OKAY,最后调用s->ready把s->fd加入fdevent
A_OPEN处理完了,就会在与Client连接的套接字字上收到A_WRTE,
- void handle_packet(apacket *p, atransport *t)
- {
- asocket *s;
- ...
- case A_WRTE:
- if(t->connection_state != CS_OFFLINE) {
- if((s = find_local_socket(p->msg.arg1))) {
- unsigned rid = p->msg.arg0;
- p->len = p->msg.data_length;
-
- if(s->enqueue(s, p) == 0) {
- D("Enqueue the socket\n");
- send_ready(s->id, rid, t);
- }
- return;
- }
- }
- break;
- ....
- }
先调用find_local_socket找到前面建立的local_service_socket,调用s->enqueue,这里调用local_socket_enqueue
- static int local_socket_enqueue(asocket *s, apacket *p)
- {
- ...
- while(p->len > 0) {
- int r = adb_write(s->fd, p->ptr, p->len);
- if(r > 0) {
- p->len -= r;
- p->ptr += r;
- continue;
- }
- if((r == 0) || (errno != EAGAIN)) {
- D( "LS(%d): not ready, errno=%d: %s\n", s->id, errno, strerror(errno) );
- s->close(s);
- return 1;
- } else {
- break;
- }
- }
- ...
- }
这里把数据写到s->fd,也即前面的s[0],往s[0]写数据时候会触发file_sync_service读数据,这个函数阻塞在readx读数据,上面写了数据后,这里就会去读。所有数据都处理完了则发送send_ready
数据的发送过程:
先发送ID_STAT,查看文件是否存在,不存在的话再发送ID_SEND发送文件,文件发送完毕发送ID_QUIT退出,关关闭套接字
这里在client发送了ID_STAT时会回一个消息给client,就是通过往s[1]里面写,而s[0]是加到了select或poll中监听的,这个时候 s[0]就会触发,执行local_socket_event_func
- static void local_socket_event_func(int fd, unsigned ev, void *_s)
- {
- ....
- if(ev & FDE_READ){
- apacket *p = get_apacket();
- unsigned char *x = p->data;
- size_t avail = MAX_PAYLOAD;
- int r;
- int is_eof = 0;
-
- while(avail > 0) {
- r = adb_read(fd, x, avail);
- if(r > 0) {
- avail -= r;
- x += r;
- continue;
- }
- if(r < 0) {
- if(errno == EAGAIN) break;
- if(errno == EINTR) continue;
- }
-
-
- is_eof = 1;
- break;
- }
-
- if((avail == MAX_PAYLOAD) || (s->peer == 0)) {
- put_apacket(p);
- } else {
- p->len = MAX_PAYLOAD - avail;
-
- r = s->peer->enqueue(s->peer, p);
-
- if(r < 0) {
-
-
-
-
-
-
-
-
- return;
- }
- }
- ....
- }
这里的s->peer->enpueue即是remote_socket_enqueue
- static int remote_socket_enqueue(asocket *s, apacket *p)
- {
- D("Calling remote_socket_enqueue\n");
- p->msg.command = A_WRTE;
- p->msg.arg0 = s->peer->id;
- p->msg.arg1 = s->id;
- p->msg.data_length = p->len;
- send_packet(p, s->transport);
- return 1;
- }
这里往transport写数据又回到了前面的流程了,会在input_thread中会从t->fd读出数据并发给远端。
具体文件接收过程:
先接收消息头,判断是否为文件内容消息,以及是否发送完毕消息:
如果是文件内容消息,则接收数据并盘入文件
如果是发送完毕消息,则跳出循环,发送ID_OKAY消息。
文件接收完毕,Client会再发个A_OPEN消息name = shell:pm install /data/local/tmp/sipDemo.apk,这个时候会重新建立一个create_local_socket,最后调用create_subprocess,这里会打开伪终端/dev/ptmx。( 通过函数open()打开设备“/dev/ptmx”,可以得到一对伪终端的主从设备,得到的fd是主设备的文件描述符)在父进程里使用fd,子进程中打开从设备的设备名可以通过函数ptsname(),在子进程中重定位标准输出到伪终端,这样子进程中的有输出父进程就能得到,注意在pm.java runInstall函数中有这样几行代码
- private void runInstall() {
- .....
- PackageInstallObserver obs = new PackageInstallObserver();
- try {
- mPm.installPackage(Uri.fromFile(new File(apkFilePath)), obs, installFlags,
- installerPackageName);
-
- synchronized (obs) {
- while (!obs.finished) {
- try {
- obs.wait();
- } catch (InterruptedException e) {
- }
- }
- if (obs.result == PackageManager.INSTALL_SUCCEEDED) {
- System.out.println("Success");
- } else {
- System.err.println("Failure ["
- + installFailureToString(obs.result)
- + "]");
- }
- }
- } catch (RemoteException e) {
- System.err.println(e.toString());
- System.err.println(PM_NOT_RUNNING_ERR);
- }
- ...,
- }
这里用System.err.println就是打到标准输出设备,也就是伪终端了
安装完毕后,发送结果给Client,Client还会发一个A_OPEN消息,name = shell:rm /data/local/tmp/sipDemo.apk,删除这个apk最后发送A_CLSE删除这个连接
Adb处理install流程:
参考: