博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
adb 简要分析
阅读量:4207 次
发布时间:2019-05-26

本文共 18032 字,大约阅读时间需要 60 分钟。

http://blog.csdn.net/new_abc/article/details/7467537

前面有篇文章介绍了adb install的后面的流程,但前面的通信过程没有怎么介绍,这里主要介绍下adb 启动的流程,以及connect、install的前面流程,这里介绍的都是服务端的。

一、adb 启动流程:

首先看下服务端adb 启动的流程

adb_main主要 调用4个初始化函数:

1、init_transport_registration:初始化本地事务处理,每个client连接都会有一个专门的处理,对每个client都会在服务端有一对套接字对与该client相连的socket上的数据流进行处理。

2、install_listener(local_name, "*smartsocket*", NULL),这个函数 暂时还不知道做什么的;

3、local_init,初始化本地套接字,等待client的连接。

4、init_jdwp这个好像是跟dalvik中间的某个东东通讯的,没去研究。

5、fdevent_loop,事件处理,当相应的套接字上有数据到来时,调用注册的函数进行处理。

这里主要介绍下init_transport_registration,local_init,fdevent_loop这三个函数。

1)、init_transport_registration

[cpp] 
  1. void init_transport_registration(void)  
  2. {  
  3.     int s[2];  
  4.   
  5.     if(adb_socketpair(s)){  
  6.         fatal_errno("cannot open transport registration socketpair");  
  7.     }  
  8.   
  9.     transport_registration_send = s[0];  
  10.     transport_registration_recv = s[1];  
  11.   
  12.     fdevent_install(&transport_registration_fde,  
  13.                     transport_registration_recv,  
  14.                     transport_registration_func,  
  15.                     0);  
  16.   
  17.     fdevent_set(&transport_registration_fde, FDE_READ);  
  18. }  

这里调用adb_socketpair建立一对匿名的已经连接的套接字,分别赋给transport_registration_send和transport_registration_recv,之后调用fdevent_install把transport_registration_recv这个套接字注册到监听到套接字里面,并把相应的处理函数设置为transport_registration_func。

2)、local_init

[cpp] 
  1. void local_init(int port)  
  2. {  
  3.     adb_thread_t thr;  
  4.     void* (*func)(void *);  
  5.   
  6.     if(HOST) {  
  7.         func = client_socket_thread;  
  8.     } else {  
  9.         func = server_socket_thread;  
  10.     }  
  11.   
  12.     D("transport: local %s init\n", HOST ? "client" : "server");  
  13.   
  14.     if(adb_thread_create(&thr, func, (void *)port)) {  
  15.         fatal_errno("cannot create local socket %s thread",  
  16.                     HOST ? "client" : "server");  
  17.     }  
  18. }  
这里是server端,所以这里func为server_socket_thread,这里创建了这个线程:

[cpp] 
  1. static void *server_socket_thread(void * arg)  
  2. {  
  3.     int serverfd, fd;  
  4.     struct sockaddr addr;  
  5.     socklen_t alen;  
  6.     int port = (int)arg;  
  7.   
  8.     D("transport: server_socket_thread() starting\n");  
  9.     serverfd = -1;  
  10.     for(;;) {  
  11.         if(serverfd == -1) {  
  12.             serverfd = socket_inaddr_any_server(port, SOCK_STREAM);  
  13.             if(serverfd < 0) {  
  14.                 D("server: cannot bind socket yet\n");  
  15.                 adb_sleep_ms(1000);  
  16.                 continue;  
  17.             }  
  18.             close_on_exec(serverfd);  
  19.         }  
  20.   
  21.         alen = sizeof(addr);  
  22.         D("server: trying to get new connection from %d\n", port);  
  23.         fd = adb_socket_accept(serverfd, &addr, &alen);  
  24.         if(fd >= 0) {  
  25.             D("server: new connection on fd %d\n", fd);  
  26.             close_on_exec(fd);  
  27.             disable_tcp_nagle(fd);  
  28.             register_socket_transport(fd, "host", port, 1);  
  29.         }  
  30.     }  
  31.     D("transport: server_socket_thread() exiting\n");  
  32.     return 0;  
  33. }  
这里调用socket_inaddr_any_server建立监听套接字,这里的port默认是5555,再调用adb_socket_accept等待客户连接的到来。

3)、fdevent_loop

[cpp] 
  1. void fdevent_loop()  
  2. {  
  3.     fdevent *fde;  
  4.   
  5.     for(;;) {  
  6. #if DEBUG  
  7.         fprintf(stderr,"--- ---- waiting for events\n");  
  8. #endif  
  9.         fdevent_process();  
  10.   
  11.         while((fde = fdevent_plist_dequeue())) {  
  12.             unsigned events = fde->events;  
  13.             fde->events = 0;  
  14.             fde->state &= (~FDE_PENDING);  
  15.             dump_fde(fde, "callback");  
  16.             fde->func(fde->fd, events, fde->arg);  
  17.         }  
  18.     }  
  19. }  
这里先调用fdevent_process启动select或epoll检测机制,这里面会把有数据到来的套接字消息调用fdevent_plist_enqueue压入list_pending,调用fdevent_plist_dequeue从list_pending取出一条消息,并调用它的处理函数进行处理。

二、adb connect流程:

上面 我们讲server_socket_thread讲到创建了一个线程并监听等待客户连接的到来,当client连接过来的时候, 这里accept会返回,我们接着往下看,会调用register_socket_transport注册这个连接

[cpp] 
  1. void register_socket_transport(int s, const char *serial, int port, int local)  
  2. {  
  3.     atransport *t = calloc(1, sizeof(atransport));  
  4.     D("transport: %p init'ing for socket %d, on port %d\n", t, s, port);  
  5.     if ( init_socket_transport(t, s, port, local) < 0 ) {  
  6.         adb_close(s);  
  7.         free(t);  
  8.         return;  
  9.     }  
  10.     if(serial) {  
  11.         t->serial = strdup(serial);  
  12.     }  
  13.     register_transport(t);  
  14. }  

分配了一个atransport结构并调用init_socket_transport进行初始化

[cpp] 
  1. int init_socket_transport(atransport *t, int s, int adb_port, int local)  
  2. {  
  3.     int  fail = 0;  
  4.   
  5.     t->kick = remote_kick;  
  6.     t->close = remote_close;  
  7.     t->read_from_remote = remote_read;  
  8.     t->write_to_remote = remote_write;  
  9.     t->sfd = s;  
  10.     t->sync_token = 1;  
  11.     t->connection_state = CS_OFFLINE;  
  12.     t->type = kTransportLocal;  
  13.     t->adb_port = 0;  
  14. }  
注意这里的sdf是这个连接的套接字,还有remote_read及remote_write.

接着调用register_transport注册这个事务

[cpp] 
  1. static void register_transport(atransport *transport)  
  2. {  
  3.     tmsg m;  
  4.     m.transport = transport;  
  5.     m.action = 1;  
  6.     D("transport: %p registered\n", transport);  
  7.     if(transport_write_action(transport_registration_send, &m)) {  
  8.         fatal_errno("cannot write transport registration socket\n");  
  9.     }  
  10. }  
注意这里transport_write_action的第一个参数是transport_registration_send,是否有点印象,对的,这个fd就是我们在init_transport_registration建立的一对已连接的匿名套接字中 的一个。这样在transport_registration_recv就会有数据到来。并调用transport_registration_func,

[cpp] 
  1. static void transport_registration_func(int _fd, unsigned ev, void *data)  
  2. {  
  3.     tmsg m;  
  4.     adb_thread_t output_thread_ptr;  
  5.     adb_thread_t input_thread_ptr;  
  6.     int s[2];  
  7.     atransport *t;  
  8.   
  9.     if(!(ev & FDE_READ)) {  
  10.         return;  
  11.     }  
  12.   
  13.     if(transport_read_action(_fd, &m)) {  
  14.         fatal_errno("cannot read transport registration socket");  
  15.     }  
  16.   
  17.     t = m.transport;  
  18.   
  19.     if(m.action == 0){  
  20.         D("transport: %p removing and free'ing %d\n", t, t->transport_socket);  
  21.   
  22.             /* IMPORTANT: the remove closes one half of the 
  23.             ** socket pair.  The close closes the other half. 
  24.             */  
  25.         fdevent_remove(&(t->transport_fde));  
  26.         adb_close(t->fd);  
  27.   
  28.         adb_mutex_lock(&transport_lock);  
  29.         t->next->prev = t->prev;  
  30.         t->prev->next = t->next;  
  31.         adb_mutex_unlock(&transport_lock);  
  32.   
  33.         run_transport_disconnects(t);  
  34.   
  35.         if (t->product)  
  36.             free(t->product);  
  37.         if (t->serial)  
  38.             free(t->serial);  
  39.   
  40.         memset(t,0xee,sizeof(atransport));  
  41.         free(t);  
  42.   
  43.         update_transports();  
  44.         return;  
  45.     }  
  46.   
  47.     /* don't create transport threads for inaccessible devices */  
  48.     if (t->connection_state != CS_NOPERM) {  
  49.         /* initial references are the two threads */  
  50.         t->ref_count = 2;  
  51.   
  52.         if(adb_socketpair(s)) {  
  53.             fatal_errno("cannot open transport socketpair");  
  54.         }  
  55.   
  56.         D("transport: %p (%d,%d) starting\n", t, s[0], s[1]);  
  57.   
  58.         t->transport_socket = s[0];  
  59.         t->fd = s[1];  
  60.   
  61.         D("transport: %p install %d\n", t, t->transport_socket );  
  62.         fdevent_install(&(t->transport_fde),  
  63.                         t->transport_socket,  
  64.                         transport_socket_events,  
  65.                         t);  
  66.   
  67.         fdevent_set(&(t->transport_fde), FDE_READ);  
  68.   
  69.         if(adb_thread_create(&input_thread_ptr, input_thread, t)){  
  70.             fatal_errno("cannot create input thread");  
  71.         }  
  72.   
  73.         if(adb_thread_create(&output_thread_ptr, output_thread, t)){  
  74.             fatal_errno("cannot create output thread");  
  75.         }  
  76.     }  
  77.   
  78.         /* put us on the master device list */  
  79.     adb_mutex_lock(&transport_lock);  
  80.     t->next = &transport_list;  
  81.     t->prev = transport_list.prev;  
  82.     t->next->prev = t;  
  83.     t->prev->next = t;  
  84.     adb_mutex_unlock(&transport_lock);  
  85.   
  86.     t->disconnects.next = t->disconnects.prev = &t->disconnects;  
  87.   
  88.     update_transports();  
  89. }  

在这个函数中,先调用transport_read_action读取前面写入的tmsg数据,m.action =1,所以不执行,这里t->connection_state!= CS_NOPERM,前面赋值了为CS_OFFLINE。adb_socketpair又建立两个本地通信套接字,分别赋给t->transport_socket和t->fd,接着调用fdevent_install把t->transport_socket添加进去,创建input_thread 和output_thread两线程。

[cpp] 
  1. static void *input_thread(void *_t)  
  2. {  
  3.     atransport *t = _t;  
  4.     apacket *p;  
  5.     int active = 0;  
  6.   
  7.     D("to_remote: starting input_thread for %p, reading from fd %d\n",  
  8.        t, t->fd);  
  9.   
  10.     for(;;){  
  11.         if(read_packet(t->fd, &p)) {  
  12.             D("to_remote: failed to read apacket from transport %p on fd %d\n",   
  13.                t, t->fd );  
  14.             break;  
  15.         }  
  16.         if(p->msg.command == A_SYNC){  
  17.             if(p->msg.arg0 == 0) {  
  18.                 D("to_remote: transport %p SYNC offline\n", t);  
  19.                 put_apacket(p);  
  20.                 break;  
  21.             } else {  
  22.                 if(p->msg.arg1 == t->sync_token) {  
  23.                     D("to_remote: transport %p SYNC online\n", t);  
  24.                     active = 1;  
  25.                 } else {  
  26.                     D("to_remote: trandport %p ignoring SYNC %d != %d\n",  
  27.                       t, p->msg.arg1, t->sync_token);  
  28.                 }  
  29.             }  
  30.         } else {  
  31.             if(active) {  
  32.                 D("to_remote: transport %p got packet, sending to remote\n", t);  
  33.                 t->write_to_remote(p, t);  
  34.             } else {  
  35.                 D("to_remote: transport %p ignoring packet while offline\n", t);  
  36.             }  
  37.         }  
  38.   
  39.         put_apacket(p);  
  40.     }  
  41.   
  42.     // this is necessary to avoid a race condition that occured when a transport closes  
  43.     // while a client socket is still active.  
  44.     close_all_sockets(t);  
  45.   
  46.     D("to_remote: thread is exiting for transport %p, fd %d\n", t, t->fd);  
  47.     kick_transport(t);  
  48.     transport_unref(t);  
  49.     return 0;  
  50. }  
input_thread就是从t->fd读数据,然后调用write_to_remote,write_to_remote中有一句writex(t->sfd,&p->msg, sizeof(amessage) + length)发给远端

再来看下output_thread

[cpp] 
  1. static void *output_thread(void *_t)  
  2. {  
  3.     atransport *t = _t;  
  4.     apacket *p;  
  5.   
  6.     D("from_remote: starting thread for transport %p, on fd %d\n", t, t->fd );  
  7.   
  8.     D("from_remote: transport %p SYNC online (%d)\n", t, t->sync_token + 1);  
  9.     p = get_apacket();  
  10.     p->msg.command = A_SYNC;  
  11.     p->msg.arg0 = 1;  
  12.     p->msg.arg1 = ++(t->sync_token);  
  13.     p->msg.magic = A_SYNC ^ 0xffffffff;  
  14.     if(write_packet(t->fd, &p)) {  
  15.         put_apacket(p);  
  16.         D("from_remote: failed to write SYNC apacket to transport %p", t);  
  17.         goto oops;  
  18.     }  
  19.   
  20.     D("from_remote: data pump  for transport %p\n", t);  
  21.     for(;;) {  
  22.         p = get_apacket();  
  23.   
  24.         if(t->read_from_remote(p, t) == 0){  
  25.             D("from_remote: received remote packet, sending to transport %p\n",  
  26.               t);  
  27.             if(write_packet(t->fd, &p)){  
  28.                 put_apacket(p);  
  29.                 D("from_remote: failed to write apacket to transport %p", t);  
  30.                 goto oops;  
  31.             }  
  32.         } else {  
  33.             D("from_remote: remote read failed for transport %p\n", p);  
  34.             put_apacket(p);  
  35.             break;  
  36.         }  
  37.     }  
  38.   
  39.     D("from_remote: SYNC offline for transport %p\n", t);  
  40.     p = get_apacket();  
  41.     p->msg.command = A_SYNC;  
  42.     p->msg.arg0 = 0;  
  43.     p->msg.arg1 = 0;  
  44.     p->msg.magic = A_SYNC ^ 0xffffffff;  
  45.     if(write_packet(t->fd, &p)) {  
  46.         put_apacket(p);  
  47.         D("from_remote: failed to write SYNC apacket to transport %p", t);  
  48.     }  
  49.   
  50. oops:  
  51.     D("from_remote: thread is exiting for transport %p\n", t);  
  52.     kick_transport(t);  
  53.     transport_unref(t);  
  54.     return 0;  
  55. }  
output_thread先往t->fd写一个A_SYNC命令,然后循环调用read_from_remote,remote_read 中有(readx(t->sfd,p->data, p->msg.data_length),然后write_packet把数据写到t->fd。上面output_thread中往t->fdk中写了一个A_SYNC命令,这样就触发了t->transport_socket上的读操作,并调用transport_socket_events

[cpp] 
  1. static void transport_socket_events(int fd, unsigned events, void *_t)  
  2. {  
  3.     if(events & FDE_READ){  
  4.         apacket *p = 0;  
  5.         if(read_packet(fd, &p)){  
  6.             D("failed to read packet from transport socket on fd %d\n", fd);  
  7.         } else {  
  8.             handle_packet(p, (atransport *) _t);  
  9.         }  
  10.     }  
  11. }  
这里先从fd读数据,然后调用handle_packet处理,对于A_SYNC,有相应的如下代码:

[cpp] 
  1. void handle_packet(apacket *p, atransport *t)  
  2. {  
  3.    ...  
  4.    case A_SYNC:  
  5.         if(p->msg.arg0){  
  6.             send_packet(p, t);  
  7.             if(HOST) send_connect(t);  
  8.         } else {  
  9.             t->connection_state = CS_OFFLINE;  
  10.             handle_offline(t);  
  11.             send_packet(p, t);  
  12.         }  
  13.         return;  
  14.    ...  
  15. }  

p->msg.arg0= 1;所以直接send_packet,这里面会调用write_packet(t->transport_socket,&p),这样与它相应的对端就可以收到数据,在input_thread中会从t->fd读出数据并发给远端,注意这里远端连接的套接字并没有加入fdevent,而是在output_thread中阻塞在read上面,直到Client端有数据到来。客户端收到A_SYNC命令后,会发送一个connect消息过来,收到数据后就转发给t->fd,触发t->transport_socket,执行transport_socket_events同样调用handle_packet处理收到的数据。
[cpp] 
  1. void handle_packet(apacket *p, atransport *t)  
  2. {  
  3.     ...  
  4.     case A_CNXN: /* CONNECT(version, maxdata, "system-id-string") */  
  5.             /* XXX verify version, etc */  
  6.         if(t->connection_state != CS_OFFLINE) {  
  7.             t->connection_state = CS_OFFLINE;  
  8.             handle_offline(t);  
  9.         }  
  10.         parse_banner((char*) p->data, t);  
  11.         handle_online();  
  12.         if(!HOST) send_connect(t);  
  13.         break;  
  14.       ....  
  15. }  
调用send_connect发送版本等信息到远端.

三、adb install 通讯过程

同样消息从远端的sfd送到t->fd,触发t->transport_socket执行handle_packet,这里Client先发送一条A_OPEN消息

[cpp] 
  1. void handle_packet(apacket *p, atransport *t)  
  2. {  
  3.    ...  
  4.    case A_OPEN: /* OPEN(local-id, 0, "destination") */  
  5.         if(t->connection_state != CS_OFFLINE) {  
  6.             char *name = (char*) p->data;  
  7.             name[p->msg.data_length > 0 ? p->msg.data_length - 1 : 0] = 0;  
  8.             s = create_local_service_socket(name);  
  9.             if(s == 0) {  
  10.                 send_close(0, p->msg.arg0, t);  
  11.             } else {  
  12.                 s->peer = create_remote_socket(p->msg.arg0, t);  
  13.                 s->peer->peer = s;  
  14.                 send_ready(s->id, s->peer->id, t);  
  15.                 s->ready(s);  
  16.             }  
  17.         }  
  18.         break;  
  19.    ...  
  20. }  

这里的name 是”sync:”,用于通知服务端同步读取client发送过来的文件,调用create_local_service_socket创建一个本地服务的socket

[cpp] 
  1. asocket *create_local_service_socket(const char *name)  
  2. {  
  3.     asocket *s;  
  4.     int fd;  
  5.     fd = service_to_fd(name);  
  6.     if(fd < 0) return 0;  
  7.   
  8.     s = create_local_socket(fd);  
  9.     D("LS(%d): bound to '%s'\n", s->id, name);  
  10.     return s;  
  11. }  

在service_to_fd中这里对应于create_service_thread(file_sync_service, NULL);

[cpp] 
  1. static int create_service_thread(void (*func)(intvoid *), void *cookie)  
  2. {  
  3.     stinfo *sti;  
  4.     adb_thread_t t;  
  5.     int s[2];  
  6.   
  7.     if(adb_socketpair(s)) {  
  8.         printf("cannot create service socket pair\n");  
  9.         return -1;  
  10.     }  
  11.   
  12.     sti = malloc(sizeof(stinfo));  
  13.     if(sti == 0) fatal("cannot allocate stinfo");  
  14.     sti->func = func;  
  15.     sti->cookie = cookie;  
  16.     sti->fd = s[1];  
  17.   
  18.     if(adb_thread_create( &t, service_bootstrap_func, sti)){  
  19.         free(sti);  
  20.         adb_close(s[0]);  
  21.         adb_close(s[1]);  
  22.         printf("cannot create service thread\n");  
  23.         return -1;  
  24.     }  
  25.   
  26.     D("service thread started, %d:%d\n",s[0], s[1]);  
  27.     return s[0];  
  28. }  

在这里面又调用adb_socketpair创建了两个本地通信套接字,并返回其中s[0],这里还创建一个线程:service_bootstrap_func,执行void *service_bootstrap_func(void *x)

[cpp] 
  1. void *service_bootstrap_func(void *x)  
  2. {  
  3.     stinfo *sti = x;  
  4.     sti->func(sti->fd, sti->cookie);  
  5.     free(sti);  
  6.     return 0;  
  7. }  

这里的func是file_sync_service,sti->fd= s[1]。

[cpp] 
  1. void file_sync_service(int fd, void *cookie)  
  2. {  
  3.     syncmsg msg;  
  4.     char name[1025];  
  5.     unsigned namelen;  
  6.   
  7.     char *buffer = malloc(SYNC_DATA_MAX);  
  8.     if(buffer == 0) goto fail;  
  9.   
  10.     for(;;) {  
  11.         D("sync: waiting for command\n");  
  12.   
  13.         if(readx(fd, &msg.req, sizeof(msg.req))) {  
  14.             fail_message(fd, "command read failure");  
  15.             break;  
  16.         }  
  17.         namelen = ltohl(msg.req.namelen);  
  18.         if(namelen > 1024) {  
  19.             fail_message(fd, "invalid namelen");  
  20.             break;  
  21.         }  
  22.         if(readx(fd, name, namelen)) {  
  23.             fail_message(fd, "filename read failure");  
  24.             break;  
  25.         }  
  26.         name[namelen] = 0;  
  27.   
  28.         msg.req.namelen = 0;  
  29.         D("sync: '%s' '%s'\n", (char*) &msg.req, name);  
  30.   
  31.         switch(msg.req.id) {  
  32.         case ID_STAT:  
  33.             if(do_stat(fd, name)) goto fail;  
  34.             break;  
  35.         case ID_LIST:  
  36.             if(do_list(fd, name)) goto fail;  
  37.             break;  
  38.         case ID_SEND:  
  39.             if(do_send(fd, name, buffer)) goto fail;  
  40.             break;  
  41.         case ID_RECV:  
  42.             if(do_recv(fd, name, buffer)) goto fail;  
  43.             break;  
  44.         case ID_QUIT:  
  45.             goto fail;  
  46.         default:  
  47.             fail_message(fd, "unknown command");  
  48.             goto fail;  
  49.         }  
  50.     }  
  51.   
  52. fail:  
  53.     if(buffer != 0) free(buffer);  
  54.     D("sync: done\n");  
  55.     adb_close(fd);  
  56. }  
这里就阻塞在读上面了,等待套接字上数据的到来。

回到create_local_service_socket,继续调用create_local_socket,并把刚返回的s[0]传进来了

[cpp] 
  1. asocket *create_local_socket(int fd)  
  2. {  
  3.     asocket *s = calloc(1, sizeof(asocket));  
  4.     if(s == 0) fatal("cannot allocate socket");  
  5.     install_local_socket(s);  
  6.     s->fd = fd;  
  7.     s->enqueue = local_socket_enqueue;  
  8.     s->ready = local_socket_ready;  
  9.     s->close = local_socket_close;  
  10.   
  11.     fdevent_install(&s->fde, fd, local_socket_event_func, s);  
  12. /*    fdevent_add(&s->fde, FDE_ERROR); */  
  13.     //fprintf(stderr, "Created local socket in create_local_socket \n");  
  14.     D("LS(%d): created (fd=%d)\n", s->id, s->fd);  
  15.     return s;  
  16. }  

注意这里asocket类型变量t的enqueue为local_socket_enqueue,这里fd就是前面 返回的s[0],安装到fdevent,这里的处理函数是local_socket_event_func.,这里的local_socket_event_func函数是在s[1],即进行文件操作的那端需要往对端发送消息时触发的。

回到handle_packet,再调用create_remote_socket,创建一个asocket,初始化,把这个peer初始化为他的对端。

[cpp] 
  1. asocket *create_remote_socket(unsigned id, atransport *t)  
  2. {  
  3.     asocket *s = calloc(1, sizeof(aremotesocket));  
  4.     adisconnect*  dis = &((aremotesocket*)s)->disconnect;  
  5.   
  6.     if(s == 0) fatal("cannot allocate socket");  
  7.     s->id = id;  
  8.     s->enqueue = remote_socket_enqueue;  
  9.     s->ready = remote_socket_ready;  
  10.     s->close = remote_socket_close;  
  11.     s->transport = t;  
  12.   
  13.     dis->func   = remote_socket_disconnect;  
  14.     dis->opaque = s;  
  15.     add_transport_disconnect( t, dis );  
  16.     D("RS(%d): created\n", s->id);  
  17.     return s;  
  18. }  

赋值给s->peer;然后调用send_ready发送A_OKAY,最后调用s->ready把s->fd加入fdevent

A_OPEN处理完了,就会在与Client连接的套接字字上收到A_WRTE,

[cpp] 
  1. void handle_packet(apacket *p, atransport *t)  
  2. {  
  3.     asocket *s;  
  4.     ...  
  5.     case A_WRTE:  
  6.         if(t->connection_state != CS_OFFLINE) {  
  7.             if((s = find_local_socket(p->msg.arg1))) {  
  8.                 unsigned rid = p->msg.arg0;  
  9.                 p->len = p->msg.data_length;  
  10.   
  11.                 if(s->enqueue(s, p) == 0) {  
  12.                     D("Enqueue the socket\n");  
  13.                     send_ready(s->id, rid, t);  
  14.                 }  
  15.                 return;  
  16.             }  
  17.         }  
  18.         break;  
  19.     ....  
  20. }  

先调用find_local_socket找到前面建立的local_service_socket,调用s->enqueue,这里调用local_socket_enqueue

[cpp] 
  1. static int local_socket_enqueue(asocket *s, apacket *p)  
  2. {  
  3.    ...  
  4.        while(p->len > 0) {  
  5.         int r = adb_write(s->fd, p->ptr, p->len);  
  6.         if(r > 0) {  
  7.             p->len -= r;  
  8.             p->ptr += r;  
  9.             continue;  
  10.         }  
  11.         if((r == 0) || (errno != EAGAIN)) {  
  12.             D( "LS(%d): not ready, errno=%d: %s\n", s->id, errno, strerror(errno) );  
  13.             s->close(s);  
  14.             return 1; /* not ready (error) */  
  15.         } else {  
  16.             break;  
  17.         }  
  18.     }  
  19.    ...  
  20. }  

这里把数据写到s->fd,也即前面的s[0],往s[0]写数据时候会触发file_sync_service读数据,这个函数阻塞在readx读数据,上面写了数据后,这里就会去读。所有数据都处理完了则发送send_ready

数据的发送过程:


先发送ID_STAT,查看文件是否存在,不存在的话再发送ID_SEND发送文件,文件发送完毕发送ID_QUIT退出,关关闭套接字

这里在client发送了ID_STAT时会回一个消息给client,就是通过往s[1]里面写,而s[0]是加到了select或poll中监听的,这个时候 s[0]就会触发,执行local_socket_event_func

[cpp] 
  1. static void local_socket_event_func(int fd, unsigned ev, void *_s)  
  2. {  
  3.      ....  
  4.     if(ev & FDE_READ){  
  5.         apacket *p = get_apacket();  
  6.         unsigned char *x = p->data;  
  7.         size_t avail = MAX_PAYLOAD;  
  8.         int r;  
  9.         int is_eof = 0;  
  10.   
  11.         while(avail > 0) {  
  12.             r = adb_read(fd, x, avail);  
  13.             if(r > 0) {  
  14.                 avail -= r;  
  15.                 x += r;  
  16.                 continue;  
  17.             }  
  18.             if(r < 0) {  
  19.                 if(errno == EAGAIN) break;  
  20.                 if(errno == EINTR) continue;  
  21.             }  
  22.   
  23.                 /* r = 0 or unhandled error */  
  24.             is_eof = 1;  
  25.             break;  
  26.         }  
  27.   
  28.         if((avail == MAX_PAYLOAD) || (s->peer == 0)) {  
  29.             put_apacket(p);  
  30.         } else {  
  31.             p->len = MAX_PAYLOAD - avail;  
  32.   
  33.             r = s->peer->enqueue(s->peer, p);  
  34.   
  35.             if(r < 0) {  
  36.                     /* error return means they closed us as a side-effect 
  37.                     ** and we must return immediately. 
  38.                     ** 
  39.                     ** note that if we still have buffered packets, the 
  40.                     ** socket will be placed on the closing socket list. 
  41.                     ** this handler function will be called again 
  42.                     ** to process FDE_WRITE events. 
  43.                     */  
  44.                 return;  
  45.             }  
  46.        }  
  47.      ....  
  48. }  
这里的s->peer->enpueue即是remote_socket_enqueue

[cpp] 
  1. static int remote_socket_enqueue(asocket *s, apacket *p)  
  2. {  
  3.     D("Calling remote_socket_enqueue\n");  
  4.     p->msg.command = A_WRTE;  
  5.     p->msg.arg0 = s->peer->id;  
  6.     p->msg.arg1 = s->id;  
  7.     p->msg.data_length = p->len;  
  8.     send_packet(p, s->transport);  
  9.     return 1;  
  10. }  
这里往transport写数据又回到了前面的流程了,会在input_thread中会从t->fd读出数据并发给远端。

具体文件接收过程:

先接收消息头,判断是否为文件内容消息,以及是否发送完毕消息:

如果是文件内容消息,则接收数据并盘入文件

如果是发送完毕消息,则跳出循环,发送ID_OKAY消息。

文件接收完毕,Client会再发个A_OPEN消息name = shell:pm install /data/local/tmp/sipDemo.apk,这个时候会重新建立一个create_local_socket,最后调用create_subprocess,这里会打开伪终端/dev/ptmx。( 通过函数open()打开设备“/dev/ptmx”,可以得到一对伪终端的主从设备,得到的fd是主设备的文件描述符)在父进程里使用fd,子进程中打开从设备的设备名可以通过函数ptsname(),在子进程中重定位标准输出到伪终端,这样子进程中的有输出父进程就能得到,注意在pm.java  runInstall函数中有这样几行代码


[cpp] 
  1.     private void runInstall() {  
  2.     .....  
  3.   PackageInstallObserver obs = new PackageInstallObserver();  
  4.         try {  
  5.             mPm.installPackage(Uri.fromFile(new File(apkFilePath)), obs, installFlags,  
  6.                     installerPackageName);  
  7.   
  8.             synchronized (obs) {  
  9.                 while (!obs.finished) {  
  10.                     try {  
  11.                         obs.wait();  
  12.                     } catch (InterruptedException e) {  
  13.                     }  
  14.                 }  
  15.                 if (obs.result == PackageManager.INSTALL_SUCCEEDED) {  
  16.                     System.out.println("Success");  
  17.                 } else {  
  18.                     System.err.println("Failure ["  
  19.                             + installFailureToString(obs.result)  
  20.                             + "]");  
  21.                 }  
  22.             }  
  23.         } catch (RemoteException e) {  
  24.             System.err.println(e.toString());  
  25.             System.err.println(PM_NOT_RUNNING_ERR);  
  26.         }  
  27.   ...,  
  28. }  
这里用System.err.println就是打到标准输出设备,也就是伪终端了

安装完毕后,发送结果给Client,Client还会发一个A_OPEN消息,name = shell:rm /data/local/tmp/sipDemo.apk,删除这个apk最后发送A_CLSE删除这个连接

Adb处理install流程:


参考:

你可能感兴趣的文章
DELETE_DROP
查看>>
Oracle优化08-并行执行
查看>>
Oracle优化05-执行计划
查看>>
Oracle优化09-绑定变量
查看>>
Oracle优化03-Latch和等待
查看>>
Oracle优化12-10053事件
查看>>
Oracle优化10-SQL_TRACE
查看>>
Oracle-数据字典解读
查看>>
Oracle优化02-锁和阻塞
查看>>
Oracle优化06-Hint
查看>>
Oracle-动态性能视图解读
查看>>
ORACLE常用性能监控SQL【一】
查看>>
ORACLE常用性能监控SQL【二】
查看>>
Oracle-临时表空间(组)解读
查看>>
Oracle-PL/SQL基础
查看>>
Oracle-使用awrrpt.sql生成AWR报告
查看>>
Oracle实例迁移_真实场景实操
查看>>
Oracle启动和停止的方式详解
查看>>
Oracle-AWR性能报告解读
查看>>
Oracle自动备份脚本(Linux)
查看>>