Jenkins Source Code Analysis: Sending a Remote Request (Part 2)


June 15, 2016

This article walks through the source code involved when the Jenkins master node sends a remote request to a slave node.

SendingtheRemoteRequest.png

Entry point for a job executing a shell command

The entry point is the CommandInterpreter class in the jenkins-core project.

public boolean perform(AbstractBuild<?,?> build, Launcher launcher, TaskListener listener) throws InterruptedException {
    ...
    r = join(launcher.launch().cmds(buildCommandLine(script)).envs(envVars).stdout(listener).pwd(ws).start());
    ...
}

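The start method at the end of the call chain above launches a new process.

Execution jumps to the Launcher class in the jenkins-core project, where start is defined on the nested ProcStarter class: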

public Proc start() throws IOException {
    return launch(this);
}

This in turn calls the launch method of RemoteLauncher, a nested class of Launcher.

public Proc launch(ProcStarter ps) throws IOException {
    ...
    return new ProcImpl(getChannel().call(new RemoteLaunchCallable(ps.commands, ps.masks, ps.envs, in, ps.reverseStdin, out, ps.reverseStdout, err, ps.reverseStderr, ps.quiet, workDir, listener)));
    ...
}
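
The hand-off from start() to launch(this) can be illustrated with a small sketch. It assumes a simplified design in which a fluent starter object collects the launch parameters and then passes itself to whichever launcher created it, so the same call chain can run locally or remotely. All names here (FluentStarterSketch, Starter, LocalLauncher and this RemoteLauncher) are hypothetical stand-ins, not the Jenkins classes, and the launchers only return a description string instead of starting a process.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical names throughout; this is not the Jenkins Launcher API.
class FluentStarterSketch {

    // The launcher decides where and how the described process actually runs.
    interface Launcher {
        String launch(Starter starter);
    }

    // Collects launch parameters fluently, then hands itself to its launcher.
    static class Starter {
        private final Launcher launcher;
        final List<String> cmds = new ArrayList<>();
        String pwd = ".";

        Starter(Launcher launcher) {
            this.launcher = launcher;
        }

        Starter cmds(String... args) {
            cmds.addAll(Arrays.asList(args));
            return this;
        }

        Starter pwd(String dir) {
            this.pwd = dir;
            return this;
        }

        // Same shape as the start() method in the listing: delegate to launch(this).
        String start() {
            return launcher.launch(this);
        }
    }

    // Stand-in for running the command on the local machine.
    static class LocalLauncher implements Launcher {
        @Override
        public String launch(Starter s) {
            return "local: " + s.cmds + " in " + s.pwd;
        }
    }

    // Stand-in for packaging the parameters and sending them to another node.
    static class RemoteLauncher implements Launcher {
        @Override
        public String launch(Starter s) {
            return "remote: " + s.cmds + " in " + s.pwd;
        }
    }

    public static void main(String[] args) {
        String result = new Starter(new RemoteLauncher())
                .cmds("sh", "-xe", "build.sh")
                .pwd("/ws")
                .start();
        System.out.println(result); // remote: [sh, -xe, build.sh] in /ws
    }
}
```

The caller never needs to know which launcher it holds; only the launch implementation differs between the local and the remote case.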

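Sending the remote request via Channel.call()

getChannel().call() invokes the call method of the Channel class in the remoting project (the code that ships as slave.jar on the slave node).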

public <V,T extends Throwable> V call(Callable<V,T> callable) throws IOException, T, InterruptedException {
    ...
    request = new UserRequest<V, T>(this, callable);
    UserResponse<V,T> r = request.call(this);
    return r.retrieve(this, UserRequest.getClassLoader(callable));
    ...
}

new UserRequest<V, T>(this, callable) then serializes the callable (here a RemoteLaunchCallable, which implements the Callable interface) into a byte array.
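
As a rough illustration of turning a callable into bytes and running it on the receiving side, here is a minimal sketch that uses plain Java serialization only. EchoTask and the helper methods are hypothetical and not part of remoting; the real UserRequest additionally handles class loaders and exported objects, which this sketch leaves out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.Callable;

// Hypothetical demo class; none of these names come from Jenkins or remoting.
class CallableSerializationSketch {

    // A task that can travel as bytes: it is both Callable and Serializable.
    static class EchoTask implements Callable<String>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String command;

        EchoTask(String command) {
            this.command = command;
        }

        @Override
        public String call() {
            return "ran: " + command;
        }
    }

    // Sender side: turn the task into a byte array.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Receiver side: rebuild the task from the bytes and execute it.
    static Object deserializeAndCall(byte[] payload) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            Callable<?> task = (Callable<?>) ois.readObject();
            return task.call();
        } catch (Exception e) {
            throw new IllegalStateException("could not run deserialized task", e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new EchoTask("echo hello"));
        System.out.println(deserializeAndCall(payload)); // ran: echo hello
    }
}
```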

Initializing UserRequest via new UserRequest

This is the constructor of the UserRequest class in the remoting project.

public UserRequest(Channel local, Callable<?,EXC> c) throws IOException {
    exports = local.startExportRecording();
    try {
        request = serialize(c,local);
    } finally {
        exports.stopRecording();
    }

    this.toString = c.toString();
    ClassLoader cl = getClassLoader(c);
    classLoaderProxy = RemoteClassLoader.export(cl,local);
}

1. The serialize method serializes the RemoteLaunchCallable into a byte array; the actual work is done in _serialize:

private byte[] _serialize(Object o, final Channel channel) throws IOException {
    Channel old = Channel.setCurrent(channel);
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos;
        if (channel.remoteCapability.supportsMultiClassLoaderRPC())
            oos = new MultiClassLoaderSerializer.Output(channel,baos);
        else
            oos = new ObjectOutputStream(baos);

        oos.writeObject(o);
        return baos.toByteArray();
    } finally {
        Channel.setCurrent(old);
    }
}
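
The setCurrent / finally pair in the listing follows a common "set, use, restore" pattern: code that runs during serialization can look up the current channel, and the previous value is put back afterwards even if an exception is thrown. A minimal sketch of the pattern, assuming the current value is kept in a thread-local (an assumption of this sketch) and using a plain String in place of a Channel; all names are hypothetical:

```java
// Hypothetical demo of the "set current, do work, restore previous" pattern.
class CurrentContextSketch {

    // Assumption: the "current" value is tracked per thread.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Installs a new current value and returns the one it replaced.
    static String setCurrent(String context) {
        String old = CURRENT.get();
        CURRENT.set(context);
        return old;
    }

    static String current() {
        return CURRENT.get();
    }

    // Does some work with the given context installed, then restores the old one.
    static String describeWith(String context) {
        String old = setCurrent(context);
        try {
            return "serializing with " + current();
        } finally {
            setCurrent(old); // runs even if the work above throws
        }
    }

    public static void main(String[] args) {
        System.out.println(describeWith("channel-A")); // serializing with channel-A
        System.out.println(current());                 // null
    }
}
```

Because the previous value is restored rather than simply cleared, nested calls on the same thread keep working.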

2. Create a ClassLoader proxy

ClassLoader cl = getClassLoader(c);
classLoaderProxy = RemoteClassLoader.export(cl,local);

3. Back in getChannel().call(), send the UserRequest to the slave node

UserResponse<V,T> r = request.call(this);
return r.retrieve(this, UserRequest.getClassLoader(callable));

request.call(this) invokes the call method of the Request class in the remoting project, which sends the UserRequest.

// Sends this request to a remote system, and blocks until we receives a response.
public final RSP call(Channel channel) throws EXC, InterruptedException, IOException {
    ...
    // Channel.send() locks channel, and there are other call sequences
    // (  like Channel.terminate()->Request.abort()->Request.onCompleted()  )
    // that locks channel -> request, so lock objects in the same order
    synchronized(channel) {
        synchronized(this) {
            response=null;

            channel.pendingCalls.put(id,this);
            channel.send(this);
        }
    }
    ...
}

channel.send(this) makes the channel write the UserRequest object to its output stream.

The calling thread then waits for the response.
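
The "register in pendingCalls, send, then block" sequence can be sketched in isolation. This is a simplified model under stated assumptions, not the remoting implementation: the Request and Channel classes below are hypothetical, the remote side is simulated by a local thread instead of an output stream, and the channel-then-request lock ordering from the listing is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a pending-call table; not the remoting classes.
class PendingCallSketch {

    static class Request {
        final int id;
        private Object response;   // guarded by this
        private boolean completed; // guarded by this

        Request(int id) {
            this.id = id;
        }

        // Called by the receiving thread once the response has arrived.
        synchronized void onCompleted(Object rsp) {
            response = rsp;
            completed = true;
            notifyAll();
        }

        // Blocks the calling thread until onCompleted has run.
        synchronized Object awaitResponse() {
            try {
                while (!completed) {
                    wait();
                }
                return response;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    static class Channel {
        final Map<Integer, Request> pendingCalls = new ConcurrentHashMap<>();
        private final AtomicInteger nextId = new AtomicInteger();

        Object call(String payload) {
            Request req = new Request(nextId.incrementAndGet());
            pendingCalls.put(req.id, req); // register before sending
            send(req.id, payload);
            return req.awaitResponse();
        }

        // Stand-in for writing to the output stream: a thread plays the remote side.
        private void send(int id, String payload) {
            new Thread(() -> onResponse(id, "done: " + payload)).start();
        }

        // Matches an incoming response to the request that is waiting for it.
        void onResponse(int id, Object rsp) {
            Request req = pendingCalls.remove(id);
            if (req != null) {
                req.onCompleted(rsp);
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(new Channel().call("echo hello")); // done: echo hello
    }
}
```

Registering the request before sending it matters: if the response could arrive before the entry exists in the table, there would be nothing to match it against.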

Created by 李立泓 on 2016/06/14 16:06
 