jenkins源码分析 —— 接受主节点的远程请求(三)


六月 16 2016

本系列(参考远程执行shell源码分析)中,主节点通过ssh方式(Launch slave agents on Unix machines via SSH)连接slave节点,
而在启动slave节点时,会启动一个ReaderThread线程来接收主节点的请求

SlaveReceivingRemoteRequest.png 

启动ReaderThread

启动slave节点入口为:通过ssh-slaves项目的SSHLauncher类来启动slave

/**
 * Excerpt of the slave launch entry point (the article attributes it to SSHLauncher in ssh-slaves).
 * The visible steps run in order and end by handing off to startSlave; the parts
 * marked "..." are elided in this excerpt.
 *
 * @param computer the slave computer being launched
 * @param listener passed to every visible step
 * @throws InterruptedException declared by the signature; the throwing code is not visible here
 */
public synchronized void launch(final SlaveComputer computer, final TaskListener listener) throws InterruptedException {
  ...
    // Preparation steps, each given the listener (their bodies are not shown here).
    openConnection(listener);
    verifyNoHeaderJunk(listener);
    reportEnvironment(listener);
    // Values resolved here are forwarded to copySlaveJar / startSlave below.
    String java = resolveJava(computer, listener);
    String workingDirectory = getWorkingDirectory(computer);
    copySlaveJar(listener, workingDirectory);

    // Final visible step: start the slave using the resolved java and working directory.
    startSlave(computer, listener, java, workingDirectory);
  ...
}

通过startSlave方法启动slave进程

/**
 * Excerpt of startSlave: only the call that wires the session's streams into the
 * computer's channel is shown; the rest is elided ("...").
 */
private void startSlave(SlaveComputer computer, final TaskListener listener, String java, String workingDirectory) throws IOException {
...
 // Arguments in order: session stdout, session stdin, the listener's logger, and null as the last argument.
 computer.setChannel(session.getStdout(), session.getStdin(), listener.getLogger(), null);
...
}

通过computer.setChannel调用jenkins-core项目SlaveComputer类的setChannel方法

/**
 * Builds a Channel over the given streams and delegates to the
 * setChannel(channel, launchLog, listener) overload.
 *
 * @param in        input stream handed to the channel builder
 * @param out       output stream handed to the channel builder
 * @param launchLog used as the builder's header stream and forwarded to the overload
 * @param listener  forwarded unchanged to the overload
 */
public void setChannel(InputStream in, OutputStream out, OutputStream launchLog, Channel.Listener listener) throws IOException, InterruptedException {
    // Configure the builder: node name, remoting thread pool, NEGOTIATE mode, launchLog as header stream.
    ChannelBuilder cb = new ChannelBuilder(nodeName,threadPoolForRemoting)
       .withMode(Channel.Mode.NEGOTIATE)
       .withHeaderStream(launchLog);

   // Give every registered ChannelConfigurator the chance to adjust the builder before build().
   for (ChannelConfigurator cc : ChannelConfigurator.all()) {
        cc.onChannelBuilding(cb,this);
   }

    // Build the channel on the two streams, then continue in the overload.
    Channel channel = cb.build(in,out);
    setChannel(channel,launchLog,listener);
}

再通过Channel channel = cb.build(in,out)进入到remoting项目ChannelBuilder类的build方法

/**
 * Creates a Channel from this builder and the result of negotiate(is, os).
 *
 * @param is input stream passed to negotiate
 * @param os output stream passed to negotiate
 * @return the newly constructed Channel
 * @throws IOException declared by the signature (source not visible in this excerpt)
 */
public Channel build(InputStream is, OutputStream os) throws IOException {
       // negotiate(...) runs first; its result is the second constructor argument.
       return new Channel(this,negotiate(is,os));
   }

通过negotiate(is,os)方法得到ChunkedCommandTransport对象

跳转到remoting项目Channel类的构造方法

/**
 * Excerpt of the Channel constructor: shows only the registration of a
 * CommandReceiver on the transport; earlier statements are elided ("...").
 */
protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {
   ...
    // Register callbacks on the transport: handle(...) per command, terminate(...) on failure.
    transport.setup(this, new CommandReceiver() {
       public void handle(Command cmd) {
            updateLastHeard();
           if (logger.isLoggable(Level.FINE))
                logger.fine("Received " + cmd);
           try {
                // Run the received command against this channel.
                cmd.execute(Channel.this);
           } catch (Throwable t) {
                // Any failure is logged, not rethrown, so handle(...) itself does not propagate it.
                logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
                // Second entry attaches cmd.createdAt to the log record.
                logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);
           }
       }

       public void terminate(IOException e) {
            // Forward termination to the enclosing Channel.
            Channel.this.terminate(e);
       }
   });
}

Channel(ChannelBuilder settings, CommandTransport transport)构造方法接收到的是ChunkedCommandTransport对象,其继承关系为:
ChunkedCommandTransport ——> AbstractSynchronousByteArrayCommandTransport ——> SynchronousCommandTransport ——> CommandTransport

因此transport.setup实际调用的是SynchronousCommandTransport类的setup方法,由它来启动一个ReaderThread线程

/**
 * Stores the channel and starts a new ReaderThread bound to the given receiver.
 *
 * @param channel  saved into this.channel
 * @param receiver handed to the ReaderThread constructor
 */
public void setup(Channel channel, CommandReceiver receiver) {
   this.channel = channel;
   // A fresh thread is created and started on every call.
   new ReaderThread(receiver).start();
}

读取对象

通过上面的ReaderThread.start()方法启动一个线程,ReaderThread为SynchronousCommandTransport类的内部类,在run()方法中,通过cmd = read()读取对象

/**
 * Thread that repeatedly reads a Command via read() and passes it to the
 * CommandReceiver until channel.isInClosed() returns true.
 * The tail of run() (after the try block) is elided in this excerpt ("...").
 */
private final class ReaderThread extends Thread {
   // Incremented in a finally block, so it counts every read attempt, including failed ones.
   private int commandsReceived = 0;
   // Incremented only after receiver.handle(cmd) has returned.
   private int commandsExecuted = 0;
   private final CommandReceiver receiver;

   public ReaderThread(CommandReceiver receiver) {
       // Thread name includes the channel name.
       super("Channel reader thread: "+channel.getName());
           this.receiver = receiver;
       }

       @Override
       public void run() {
       final String name =channel.getName();
       try {
           while(!channel.isInClosed()) {
                Command cmd = null;
               try {
                    cmd = read();
               } catch (EOFException e) {
                    // EOF is rethrown as an IOException with the EOFException as cause; this leaves the loop.
                    IOException ioe = new IOException("Unexpected termination of the channel");
                    ioe.initCause(e);
                   throw ioe;
               } catch (ClassNotFoundException e) {
                    // An unreadable command is logged and skipped; the loop moves on to the next read.
                    LOGGER.log(Level.SEVERE, "Unable to read a command (channel " + name + ")",e);
                   continue;
               } finally {
                    commandsReceived++;
               }

                // Dispatch the successfully read command to the receiver.
                receiver.handle(cmd);
                commandsExecuted++;
           }
            // Reached only when the loop ends because channel.isInClosed() is true.
            closeRead();
       }
       ...
   }
}

创建command对象,即UserRequest

通过上面的read()方法,调用remoting项目的AbstractSynchronousByteArrayCommandTransport的read方法创建一个command对象,即UserRequest(发送远程请求(二)最终发送的UserRequest)

/**
 * Reads one block via readBlock(channel) and deserializes a Command from it.
 *
 * @return the Command produced by Command.readFrom
 * @throws IOException            declared by the signature
 * @throws ClassNotFoundException declared by the signature
 */
public Command read() throws IOException, ClassNotFoundException {
   // The block's bytes are wrapped in an ObjectInputStreamEx that is given channel.baseClassLoader.
   return Command.readFrom(channel,new ObjectInputStreamEx(
           new ByteArrayInputStream(readBlock(channel)),
            channel.baseClassLoader));
}

执行UserRequest

通过ReaderThread的run()方法里的receiver.handle(cmd)回调上面Channel类的构造方法里面的handle方法,而传入handle方法的cmd参数即通过上面read()得到的UserRequest

/**
 * Abbreviated excerpt of the Channel constructor showing only the handle(...)
 * callback; the surrounding statements are elided ("...").
 */
protected Channel(ChannelBuilder settings, CommandTransport transport) throws IOException {
...
// Callback that receives each Command.
public void handle(Command cmd) {
    updateLastHeard();
   if (logger.isLoggable(Level.FINE))
        logger.fine("Received " + cmd);
   try {
        // Execute the command against the enclosing Channel.
        cmd.execute(Channel.this);
   } catch (Throwable t) {
        // Failures are logged rather than propagated to the caller of handle(...).
        logger.log(Level.SEVERE, "Failed to execute command " + cmd + " (channel " + Channel.this.name + ")", t);
        logger.log(Level.SEVERE, "This command is created here", cmd.createdAt);
   }
}
...
}

通过cmd.execute(Channel.this)来执行UserRequest

先是通过UserRequest的父类Request(继承Command)来执行execute()方法

/**
 * Excerpt of Request.execute: registers this request on the channel and submits a
 * Runnable to channel.executor that performs the request and sends back a response.
 * Error handling and cleanup are elided ("...") in this excerpt.
 *
 * @param channel the channel whose executor runs the request and which carries the response
 */
protected final void execute(final Channel channel) {
    // Record this request under its id before scheduling it.
    channel.executingCalls.put(id,this);
    // Keep the handle returned by submit(...) in the 'future' field.
    future = channel.executor.submit(new Runnable() {

       // channel.lastIoId() as observed at the start of run().
       private int startIoId;

       // Returns 0 when channel.lastIoId() is unchanged since startIoId was taken, otherwise the current value.
       private int calcLastIoId() {
           int endIoId = channel.lastIoId();
           if (startIoId==endIoId) return 0;
           return endIoId;
       }

       public void run() {
            // Append the channel name to the worker thread's name while it runs this request.
            String oldThreadName = Thread.currentThread().getName();
            Thread.currentThread().setName(oldThreadName+" for "+channel.getName());
           try {
                Command rsp;
                CURRENT.set(Request.this);
                startIoId = channel.lastIoId();
               try {
                   // make sure any I/O preceding this has completed
                   channel.pipeWriter.get(lastIoId).get();

                    // Perform the request; the result becomes the payload of the Response.
                    RSP r = Request.this.perform(channel);
                   // normal completion
                   rsp = new Response<RSP,EXC>(id,calcLastIoId(),r);
            ...
                   synchronized (channel) {// expand the synchronization block of the send() method to a check
                       // Send only if the channel's outgoing side is not closed.
                       if(!channel.isOutClosed())
                            channel.send(rsp);
                   }
            ...
       }
   });
}

创建Runnable并提交到Executor

通过channel.executor.submit(new Runnable(){...})创建一个Runnable并通过submit提交到executor

然后通过Request.this.perform(channel)调用remoting项目UserRequest类的perform()方法

/**
 * Excerpt of UserRequest.perform: shows class loader lookup and deserialization
 * of the request payload; the remainder of the method is elided ("...").
 *
 * @param channel the channel the request is performed on
 */
protected UserResponse<RSP,EXC> perform(Channel channel) throws EXC {
   try {
        // Look up the class loader registered on the channel for classLoaderProxy.
        ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);

        RSP r = null;
        // Channel.setCurrent returns a value kept in oldc — presumably the previous current channel, restored in the elided part; TODO confirm.
        Channel oldc = Channel.setCurrent(channel);
       try {
            Object o;
           try {
                // Deserialize the 'request' payload using the class loader obtained above.
                o = deserialize(channel,request,cl);
       ...
}

创建Response

通过new Response<RSP,EXC>(id,calcLastIoId(),r)创建Response对象

发送Response

最终通过channel.send(rsp)把Response对象发送给主节点

标签:
在2016/06/15 16:09上被李立泓创建
 
本站采用XWiki系统搭建