Jenkins Source Code Analysis: Executing the Master Node's Remote Request (Part 4)


June 17, 2016

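This article walks through the source code that a Jenkins slave node runs when it executes a remote request from the master node.

As described in the previous article, "Receiving the Master Node's Remote Request (Part 3)", a UserRequest is executed by wrapping it in a Runnable and submitting that Runnable to an Executor; the Executor is then responsible for running the build.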

[Figure: SlaveExecutingRemoteRequest.png]
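The hand-off described above can be sketched in a few lines. This is a minimal illustration, not the remoting library's code: the Request interface and the dispatch method are hypothetical names introduced here, and a CompletableFuture stands in for whatever response mechanism the real channel uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RequestDispatchSketch {
    /** Hypothetical stand-in for a received request; not the real UserRequest. */
    interface Request {
        String perform();
    }

    /** Wraps the request in a Runnable and hands it to the executor. */
    static CompletableFuture<String> dispatch(Executor executor, Request request) {
        CompletableFuture<String> response = new CompletableFuture<>();
        Runnable task = () -> {
            try {
                response.complete(request.perform());
            } catch (RuntimeException e) {
                // Failures travel back to the caller instead of killing the worker thread.
                response.completeExceptionally(e);
            }
        };
        executor.execute(task);
        return response;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(dispatch(pool, () -> "build finished").get());
        } finally {
            pool.shutdown();
        }
    }
}
```

The thread that reads from the channel only creates and submits the task, so a long-running build does not block it from receiving further requests.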

Executing the request

The code that actually executes the request is the perform() method of the UserRequest class:

protected UserResponse<RSP,EXC> perform(Channel channel) throws EXC {
    try {
        ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy);

        RSP r = null;
        Channel oldc = Channel.setCurrent(channel);
        try {
            Object o;
            try {
                o = deserialize(channel,request,cl);
            } catch (ClassNotFoundException e) {
                throw new ClassNotFoundException("Failed to deserialize the Callable object. Perhaps you needed to implement DelegatingCallable?",e);
            } catch (RuntimeException e) {
                throw new Error("Failed to deserialize the Callable object.",e);
            }

            Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o;
            if(!channel.isArbitraryCallableAllowed() && !(callable instanceof RPCRequest))
                throw new SecurityException("Execution of "+callable.toString()+" is prohibited because the channel is restricted");

            callable = channel.decorators.wrapUserRequest(callable);

            ClassLoader old = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(cl);
            // execute the service
            try {
                r = callable.call();
            } finally {
                Thread.currentThread().setContextClassLoader(old);
            }
        } finally {
            Channel.setCurrent(oldc);
        }

        return new UserResponse<RSP,EXC>(serialize(r,channel),false);
        ...
}
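One pattern in perform() is worth isolating: the thread's context class loader is swapped before callable.call() and restored in a finally block. The sketch below shows only that pattern. The class name ContextClassLoaderSketch and the helper callWith are made up for this illustration, and an empty URLClassLoader stands in for the loader imported from the channel.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

class ContextClassLoaderSketch {
    /** Runs the task with cl as the context class loader, then restores the previous loader. */
    static <T> T callWith(ClassLoader cl, Callable<T> task) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader old = current.getContextClassLoader();
        current.setContextClassLoader(cl);
        try {
            return task.call();
        } finally {
            // Restored even if the task throws, so pooled threads are not left with a stale loader.
            current.setContextClassLoader(old);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the loader obtained from the channel (assumption for this demo).
        ClassLoader custom = new URLClassLoader(new URL[0],
                ContextClassLoaderSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        boolean seenInside = callWith(custom,
                () -> Thread.currentThread().getContextClassLoader() == custom);

        System.out.println("loader visible inside task: " + seenInside);
        System.out.println("loader restored afterwards: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

Because executor threads are reused, skipping the restore step would let one request's class loader leak into the next request handled by the same thread.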

Creating the ClassLoader

In UserRequest.perform(), the ClassLoader is obtained with ClassLoader cl = channel.importedClassLoaders.get(classLoaderProxy).

Deserializing the Callable

In UserRequest.perform(), Object o = deserialize(channel,request,cl) followed by Callable<RSP,EXC> callable = (Callable<RSP,EXC>)o yields the Callable object, namely the RemoteLaunchCallable that was sent in "Sending the Remote Request (Part 2)".

The code of the deserialize() method:

/*package*/ static Object deserialize(final Channel channel, byte[] data, ClassLoader defaultClassLoader) throws IOException, ClassNotFoundException {
    ByteArrayInputStream in = new ByteArrayInputStream(data);

    ObjectInputStream ois;
    if (channel.remoteCapability.supportsMultiClassLoaderRPC()) {
        // this code is coupled with the ObjectOutputStream subtype above
        ois = new MultiClassLoaderSerializer.Input(channel, in);
    } else {
        ois = new ObjectInputStreamEx(in, defaultClassLoader);
    }
    return ois.readObject();
}
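The else branch relies on an ObjectInputStream that resolves classes against a caller-supplied ClassLoader. The following is a rough sketch of that idea and not the source of ObjectInputStreamEx: the names LoaderAwareInputSketch and LoaderAwareInput are invented here, and the fallback to super.resolveClass is an assumption made so that primitive class descriptors still resolve.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Arrays;

class LoaderAwareInputSketch {
    /** Hypothetical analogue of ObjectInputStreamEx: resolves classes through a given loader. */
    static final class LoaderAwareInput extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInput(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitives such as int.
                return super.resolveClass(desc);
            }
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
            oos.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new LoaderAwareInput(new ByteArrayInputStream(data), loader)) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> cmd = new ArrayList<>(Arrays.asList("echo", "hello"));
        Object copy = deserialize(serialize(cmd), LoaderAwareInputSketch.class.getClassLoader());
        System.out.println(copy);
    }
}
```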

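MultiClassLoaderSerializer.Input extends ObjectInputStream. It overrides resolveClass and resolveProxyClass and adds a private readClassLoader method, which together give an ObjectInputStream that knows which ClassLoader each class should come from.

The reason for doing this: during deserialization, if the definition (bytecode) of the object's class cannot be found locally, deserialization fails. By overriding ObjectOutputStream.annotateClass and ObjectInputStream.resolveClass, however, the class definition can be transferred from the master node to the slave node and loaded there through a ClassLoader.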

class MultiClassLoaderSerializer {
    ...
    static final class Input extends ObjectInputStream {
        private final Channel channel;
        private final List<ClassLoader> classLoaders = new ArrayList<ClassLoader>();

        Input(Channel channel, InputStream in) throws IOException {
            super(in);
            this.channel = channel;
        }

        private ClassLoader readClassLoader() throws IOException, ClassNotFoundException {
            ...
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            ...
        }

        @Override
        protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
            ...
        }
    }
    ...
}
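The pairing of annotateClass on the writing side with resolveClass on the reading side can be shown with plain JDK classes. The sketch below is a simplified illustration and does not reproduce MultiClassLoaderSerializer: the writer emits a single boolean per class descriptor (whether the class has a non-bootstrap loader), and the reader consumes that boolean to pick a loader. All class names are invented for the example, proxy classes are not handled, and no bytecode is transferred.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AnnotatedStreamSketch {
    /** Hypothetical payload; plays the role of the serialized Callable. */
    static final class Job implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final ArrayList<String> cmds;

        Job(String name, List<String> cmds) {
            this.name = name;
            this.cmds = new ArrayList<>(cmds);
        }
    }

    /** Writer side: appends one tag after every class descriptor. */
    static final class TaggingOutput extends ObjectOutputStream {
        TaggingOutput(OutputStream out) throws IOException {
            super(out);
        }

        @Override
        protected void annotateClass(Class<?> c) throws IOException {
            writeBoolean(c.getClassLoader() != null); // false means bootstrap class
        }
    }

    /** Reader side: consumes the tag in the same order and picks the loader from it. */
    static final class TagReadingInput extends ObjectInputStream {
        private final ClassLoader appLoader;

        TagReadingInput(InputStream in, ClassLoader appLoader) throws IOException {
            super(in);
            this.appLoader = appLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            boolean fromAppLoader = readBoolean();
            ClassLoader cl = fromAppLoader ? appLoader : null; // null selects the bootstrap loader
            return Class.forName(desc.getName(), false, cl);
        }
    }

    static Job roundTrip(Job job) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new TaggingOutput(bytes)) {
            oos.writeObject(job);
        }
        try (ObjectInputStream ois = new TagReadingInput(
                new ByteArrayInputStream(bytes.toByteArray()), Job.class.getClassLoader())) {
            return (Job) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Job copy = roundTrip(new Job("demo", Arrays.asList("echo", "hello")));
        System.out.println(copy.name + " " + copy.cmds);
    }
}
```

The two overrides must stay symmetric: whatever annotateClass writes for a class descriptor, resolveClass has to read back in the same order, which is why the comment in deserialize() notes that the input stream is coupled with the matching ObjectOutputStream subtype.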

Executing callable.call

The callable.call() invocation in UserRequest.perform() calls the call method of RemoteLaunchCallable, a nested class of the Launcher class in the jenkins-core project:

private static class RemoteLaunchCallable extends MasterToSlaveCallable<RemoteProcess,IOException> {
    ...
    public RemoteProcess call() throws IOException {
        Launcher.ProcStarter ps = new LocalLauncher(listener).launch();
        ps.cmds(cmd).masks(masks).envs(env).stdin(in).stdout(out).stderr(err).quiet(quiet);
        if(workDir!=null)   ps.pwd(workDir);
        if (reverseStdin)   ps.writeStdin();
        if (reverseStdout)  ps.readStdout();
        if (reverseStderr)  ps.readStderr();

        final Proc p = ps.start();
        ...
}

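In the code above, Launcher.ProcStarter ps = new LocalLauncher(listener).launch() builds the ProcStarter, and final Proc p = ps.start() ultimately invokes the launch(ProcStarter) method of LocalLauncher, another nested class of Launcher: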

public static class LocalLauncher extends Launcher {
    ...
    @Override
    public Proc launch(ProcStarter ps) throws IOException {
        if (!ps.quiet) {
            maskedPrintCommandLine(ps.commands, ps.masks, ps.pwd);
        }

        EnvVars jobEnv = inherit(ps.envs);

        // replace variables in command line
        String[] jobCmd = new String[ps.commands.size()];
        for ( int idx = 0 ; idx < jobCmd.length; idx++ )
            jobCmd[idx] = jobEnv.expand(ps.commands.get(idx));

        return new LocalProc(jobCmd, Util.mapToEnv(jobEnv),
                ps.reverseStdin ?LocalProc.SELFPUMP_INPUT:ps.stdin,
                ps.reverseStdout?LocalProc.SELFPUMP_OUTPUT:ps.stdout,
                ps.reverseStderr?LocalProc.SELFPUMP_OUTPUT:ps.stderr,
                toFile(ps.pwd));
    }
    ...
}
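The "replace variables in command line" loop expands environment variable references in every command token before the process is started. The sketch below illustrates that step with a deliberately simplified expander. It is not the implementation behind EnvVars.expand: the class CommandExpansionSketch, the regular expression, and the choice to leave unknown variables untouched are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CommandExpansionSketch {
    // Matches ${NAME} or $NAME; a simplification chosen for this sketch.
    private static final Pattern VAR = Pattern.compile("\\$\\{(\\w+)\\}|\\$(\\w+)");

    /** Replaces variable references in one token with values from env. */
    static String expand(String token, Map<String, String> env) {
        Matcher m = VAR.matcher(token);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1) != null ? m.group(1) : m.group(2);
            String value = env.get(name);
            // Unknown variables are kept verbatim in this sketch.
            String replacement = value != null ? value : m.group();
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Mirrors the loop in launch(): expands every command token into a new array. */
    static String[] expandAll(List<String> commands, Map<String, String> env) {
        String[] result = new String[commands.size()];
        for (int idx = 0; idx < result.length; idx++) {
            result[idx] = expand(commands.get(idx), env);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("WORKSPACE", "/tmp/ws");
        env.put("JOB_NAME", "demo");
        List<String> cmd = Arrays.asList("/bin/sh", "$WORKSPACE/build.sh", "${JOB_NAME}-1");
        System.out.println(Arrays.toString(expandAll(cmd, env)));
    }
}
```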

The LocalProc constructor looks like this:

public LocalProc(String[] cmd,String[] env,InputStream in,OutputStream out,OutputStream err,File workDir) throws IOException {
    this( calcName(cmd),
          stderr(environment(new ProcessBuilder(cmd),env).directory(workDir), err==null || err== SELFPUMP_OUTPUT),
          in, out, err );
}

The java.lang.ProcessBuilder used here is what ultimately runs the Execute Shell step of a Jenkins job.
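To make the last step concrete, here is a minimal sketch of launching a process with java.lang.ProcessBuilder, setting its environment and working directory, and copying its output to a stream. It is not LocalProc: the class ProcessLaunchSketch and its run method are hypothetical, merging stderr into stdout is a simplification chosen for this demo, and the demo runs the current JVM's own java launcher so that it does not depend on a shell being installed.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ProcessLaunchSketch {
    /** Starts cmd in workDir with extra environment variables and copies its output to out. */
    static int run(List<String> cmd, Map<String, String> env, File workDir, OutputStream out)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(cmd);
        pb.environment().putAll(env);   // added on top of the inherited environment
        pb.directory(workDir);
        pb.redirectErrorStream(true);   // simplification: stderr is merged into stdout

        Process p = pb.start();
        p.getOutputStream().close();    // this sketch sends nothing to the child's stdin
        try (InputStream in = p.getInputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
        out.flush();
        return p.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Locate the java launcher of the running JVM (assumes a standard JDK/JRE layout).
        String java = new File(new File(System.getProperty("java.home"), "bin"), "java").getPath();
        int exit = run(Arrays.asList(java, "-version"),
                Collections.singletonMap("BUILD_TAG", "demo"),
                new File(System.getProperty("user.dir")),
                System.out);
        System.out.println("exit code: " + exit);
    }
}
```

Draining the child's output before calling waitFor() matters: a process that fills its output pipe while nobody is reading will block and never exit.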

Created by 李立泓 on 2016/06/17 11:28
 