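Part four of the Apache Zeppelin tutorial series, on how JDBCInterpreter works,
used JDBCInterpreter as an example to walk through how a JDBC statement is actually executed. Below is an overall architecture diagram.
In essence, the web front end sends a request to the server, which calls zengine, then the interpreter, and finally the module that does the actual execution, such as the JDBCInterpreter covered in that article.
This article focuses on the Interpreter module, starting from the following test class: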
zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java
- @Test
- public void testInterpreter2() throws Exception {
- final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
- RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
- server.init(new HashMap<>());
- server.intpEventClient = mock(RemoteInterpreterEventClient.class);
-
- Map<String, String> intpProperties = new HashMap<>();
- intpProperties.put("property_1", "value_1");
- intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");
-
- // create Test1Interpreter in session_1
- server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
- intpProperties, "user_1");
- Test1Interpreter interpreter1 = (Test1Interpreter)
- ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
- .getInnerInterpreter();
- assertEquals(1, server.getInterpreterGroup().getSessionNum());
- assertEquals(1, server.getInterpreterGroup().get("session_1").size());
- assertEquals(2, interpreter1.getProperties().size());
- assertEquals("value_1", interpreter1.getProperty("property_1"));
-
- // create Test2Interpreter in session_1
- server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
- intpProperties, "user_1");
- assertEquals(2, server.getInterpreterGroup().get("session_1").size());
-
-
- final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
- intpContext.setNoteId("note_1");
- intpContext.setParagraphId("paragraph_1");
- intpContext.setGui("{}");
- intpContext.setNoteGui("{}");
- intpContext.setLocalProperties(new HashMap<>());
-
- // single output of SUCCESS
- RemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),
- "COMBO_OUTPUT_SUCCESS", intpContext);
- System.out.println(new Gson().toJson(result));
- //List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();
- //System.out.println(new Gson().toJson(resultMessages));
- /*assertEquals("SUCCESS", result.code);
- assertEquals(2, result.getMsg().size());
- assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
- assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/
- }
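The assertions on getSessionNum() and get("session_1").size() suggest that an interpreter group behaves like a map from session id to a list of interpreters. Here is a minimal sketch of that idea. SessionGroupSketch is a hypothetical class written for illustration, not Zeppelin's real InterpreterGroup, and it stores interpreter names as plain strings to stay self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for an interpreter group; not Zeppelin's actual class. */
class SessionGroupSketch {
    // session id -> names of the interpreters registered in that session
    private final Map<String, List<String>> sessions = new LinkedHashMap<>();

    /** Adds an interpreter to a session, creating the session on first use. */
    void addInterpreterToSession(String interpreterName, String sessionId) {
        sessions.computeIfAbsent(sessionId, id -> new ArrayList<>()).add(interpreterName);
    }

    /** Returns the interpreters of a session, or null if the session does not exist. */
    List<String> get(String sessionId) {
        return sessions.get(sessionId);
    }

    int getSessionNum() {
        return sessions.size();
    }

    public static void main(String[] args) {
        SessionGroupSketch group = new SessionGroupSketch();
        group.addInterpreterToSession("Test1Interpreter", "session_1");
        group.addInterpreterToSession("Test2Interpreter", "session_1");
        System.out.println(group.getSessionNum());
        System.out.println(group.get("session_1"));
    }
}
```

Two interpreters created in the same session leave the session count at one while the list for that session grows, which is the behaviour the test checks.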
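The test code above has been lightly modified: the final assertions are commented out and the result is printed as JSON instead.
createInterpreter instantiates the Interpreter through reflection. The core code is: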
- Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
- Properties p = new Properties();
- p.putAll(properties);
- setSystemProperty(p);
-
- Constructor<Interpreter> constructor =
- replClass.getConstructor(new Class[]{Properties.class});
- Interpreter interpreter = constructor.newInstance(p);
- interpreter.setClassloaderUrls(new URL[]{});
-
- interpreter.setInterpreterGroup(interpreterGroup);
- interpreter.setUserName(userName);
-
- interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
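To make the reflection step easier to follow in isolation, here is a small self-contained sketch of the same pattern: look up a class by name, find its constructor that takes a Properties argument, and instantiate it. SketchInterpreter, EchoInterpreter and ReflectiveFactorySketch are hypothetical names invented for this example. They only mimic the shape of the Zeppelin code and are not part of its API.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Properties;

/** Hypothetical factory that mirrors the reflective instantiation shown above. */
class ReflectiveFactorySketch {
    static SketchInterpreter create(String className, Map<String, String> props)
            throws ReflectiveOperationException {
        // Load the class by name and check that it really is a SketchInterpreter.
        Class<? extends SketchInterpreter> clazz =
                Class.forName(className).asSubclass(SketchInterpreter.class);

        // Copy the plain map into a Properties object, as createInterpreter does.
        Properties p = new Properties();
        p.putAll(props);

        // Every interpreter is expected to expose a public (Properties) constructor.
        Constructor<? extends SketchInterpreter> ctor = clazz.getConstructor(Properties.class);
        return ctor.newInstance(p);
    }

    public static void main(String[] args) throws Exception {
        SketchInterpreter interpreter =
                create(EchoInterpreter.class.getName(), Map.of("property_1", "value_1"));
        System.out.println(interpreter.getClass().getSimpleName());
        System.out.println(interpreter.getProperty("property_1"));
    }
}

/** Hypothetical base type standing in for Zeppelin's Interpreter. */
abstract class SketchInterpreter {
    private final Properties properties;

    protected SketchInterpreter(Properties properties) {
        this.properties = properties;
    }

    public String getProperty(String key) {
        return properties.getProperty(key);
    }
}

/** Hypothetical concrete interpreter with the constructor the factory looks for. */
class EchoInterpreter extends SketchInterpreter {
    public EchoInterpreter(Properties properties) {
        super(properties);
    }
}
```

The benefit of this design is that the server only needs a class name at runtime, so new interpreter types can be plugged in without the server referencing them at compile time.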
The interpret method is what actually runs a specific interpreter. It puts a job on a queue for execution, and the code that finally runs is the jobRun() method of InterpretJob:
- public InterpreterResult jobRun() throws Throwable {
- ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
- try {
- InterpreterContext.set(context);
- // clear the result of last run in frontend before running this paragraph.
- context.out.clear();
-
- InterpreterResult result = null;
-
- // Open the interpreter instance prior to calling interpret().
- // This is necessary because the earliest we can register a hook
- // is from within the open() method.
- LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
- if (!lazy.isOpen()) {
- lazy.open();
- result = lazy.executePrecode(context);
- }
-
- if (result == null || result.code() == Code.SUCCESS) {
- // Add hooks to script from registry.
- // note scope first, followed by global scope.
- // Here's the code after hooking:
- // global_pre_hook
- // note_pre_hook
- // script
- // note_post_hook
- // global_post_hook
- processInterpreterHooks(context.getNoteId());
- processInterpreterHooks(null);
- LOGGER.debug("Script after hooks: {}", script);
- result = interpreter.interpret(script, context);
- }
-
- // data from context.out is prepended to InterpreterResult if both defined
- context.out.flush();
- List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-
- for (InterpreterResultMessage resultMessage : result.message()) {
- // only add non-empty InterpreterResultMessage
- if (!StringUtils.isBlank(resultMessage.getData())) {
- resultMessages.add(resultMessage);
- }
- }
-
- List<String> stringResult = new ArrayList<>();
- for (InterpreterResultMessage msg : resultMessages) {
- if (msg.getType() == InterpreterResult.Type.IMG) {
- LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
- } else {
- LOGGER.debug("InterpreterResultMessage: {}", msg);
- }
- stringResult.add(msg.getData());
- }
- // put result into resource pool
- if (context.getLocalProperties().containsKey("saveAs")) {
- if (stringResult.size() == 1) {
- LOGGER.info("Saving result into ResourcePool as single string: " +
- context.getLocalProperties().get("saveAs"));
- context.getResourcePool().put(
- context.getLocalProperties().get("saveAs"), stringResult.get(0));
- } else {
- LOGGER.info("Saving result into ResourcePool as string list: " +
- context.getLocalProperties().get("saveAs"));
- context.getResourcePool().put(
- context.getLocalProperties().get("saveAs"), stringResult);
- }
- }
- return new InterpreterResult(result.code(), resultMessages);
- } catch (Throwable e) {
- return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
- } finally {
- Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
- InterpreterContext.remove();
- }
- }
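The lazy-open step in jobRun() (check isOpen(), then call open() only once) can be sketched on its own. The code below is a simplified illustration under stated assumptions: SimpleInterpreter, CountingInterpreter and LazyOpenSketch are hypothetical types, and the real LazyOpenInterpreter in Zeppelin does considerably more, for example running precode.

```java
/** Sketch of a lazy-open wrapper: the inner interpreter is opened on first use only. */
class LazyOpenSketch implements SimpleInterpreter {
    private final SimpleInterpreter inner;
    private boolean opened = false;

    LazyOpenSketch(SimpleInterpreter inner) {
        this.inner = inner;
    }

    synchronized boolean isOpen() {
        return opened;
    }

    @Override
    public synchronized void open() {
        // Open the wrapped interpreter at most once, even if called repeatedly.
        if (!opened) {
            inner.open();
            opened = true;
        }
    }

    @Override
    public String interpret(String script) {
        open(); // does nothing after the first call
        return inner.interpret(script);
    }

    public static void main(String[] args) {
        CountingInterpreter inner = new CountingInterpreter();
        LazyOpenSketch lazy = new LazyOpenSketch(inner);
        System.out.println("open before first use: " + lazy.isOpen());
        System.out.println(lazy.interpret("select 1"));
        System.out.println(lazy.interpret("select 2"));
        System.out.println("open() calls on inner: " + inner.openCalls);
    }
}

/** Hypothetical minimal interpreter contract; Zeppelin's Interpreter has many more methods. */
interface SimpleInterpreter {
    void open();

    String interpret(String script);
}

/** Hypothetical interpreter that counts open() calls so the lazy behaviour is observable. */
class CountingInterpreter implements SimpleInterpreter {
    int openCalls = 0;

    @Override
    public void open() {
        openCalls++;
    }

    @Override
    public String interpret(String script) {
        return "ran: " + script;
    }
}
```

Deferring open() this way means that creating an interpreter stays cheap, and the potentially expensive setup is paid only when a paragraph is first run.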
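At this point the code ties back to the JDBC interpreter code from the previous article: this is the path by which the Interpreter module ends up executing a concrete interpreter such as the JDBC one.
The main method of RemoteInterpreterServer starts the server thread, and the Thrift server itself is started inside the run method: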
- public static void main(String[] args) throws Exception {
- String zeppelinServerHost = null;
- int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
- String portRange = ":";
- String interpreterGroupId = null;
- if (args.length > 0) {
- zeppelinServerHost = args[0];
- port = Integer.parseInt(args[1]);
- interpreterGroupId = args[2];
- if (args.length > 3) {
- portRange = args[3];
- }
- }
- RemoteInterpreterServer remoteInterpreterServer =
- new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
- remoteInterpreterServer.start();
-
- /*
- * Registration of a ShutdownHook in case of an unpredictable system call
- * Examples: STRG+C, SIGTERM via kill
- */
- shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
- Runtime.getRuntime().addShutdownHook(shutdownThread);
-
- remoteInterpreterServer.join();
- LOGGER.info("RemoteInterpreterServer thread is finished");
-
- /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
- * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
- * should be part of the next release and solve the problem.
- * We may have other threads that are not terminated successfully.
- */
- if (remoteInterpreterServer.isForceShutdown) {
- LOGGER.info("Force shutting down");
- System.exit(0);
- }
- }
- @Override
- public void run() {
- RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
- new RemoteInterpreterService.Processor<>(this);
- try (TServerSocket tSocket = new TServerSocket(port)){
- server = new TThreadPoolServer(
- new TThreadPoolServer.Args(tSocket)
- .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
- .stopTimeoutUnit(TimeUnit.MILLISECONDS)
- .processor(processor));
-
- if (null != intpEventServerHost && !isTest) {
- Thread registerThread = new Thread(new RegisterRunnable());
- registerThread.setName("RegisterThread");
- registerThread.start();
- }
- LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
- server.serve();
- } catch (TTransportException e) {
- LOGGER.error("Failure in TTransport", e);
- }
- LOGGER.info("RemoteInterpreterServer-Thread finished");
- }
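The control flow of main() and run() above (start a thread, block inside a serve loop, register a shutdown hook, then join) can be tried out without Thrift. The sketch below swaps in a plain java.net.ServerSocket for TThreadPoolServer, so it only illustrates the threading pattern, not the Thrift API. ServerThreadSketch, its shutdown() method and the "ok" reply are all assumptions made for the demo.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in: a plain ServerSocket replaces the Thrift server. */
class ServerThreadSketch extends Thread {
    private final ServerSocket socket;

    ServerThreadSketch() throws IOException {
        // Port 0 asks the OS for any free port (a choice made for this demo).
        this.socket = new ServerSocket(0);
    }

    int getPort() {
        return socket.getLocalPort();
    }

    @Override
    public void run() {
        // Blocks in accept(), much as server.serve() blocks in the original run().
        try {
            while (!socket.isClosed()) {
                try (Socket client = socket.accept()) {
                    client.getOutputStream().write("ok\n".getBytes(StandardCharsets.US_ASCII));
                }
            }
        } catch (IOException e) {
            // Any I/O failure ends the loop; in particular accept() throws
            // once shutdown() has closed the server socket.
        }
    }

    void shutdown() throws IOException {
        socket.close();
    }

    public static void main(String[] args) throws Exception {
        ServerThreadSketch server = new ServerThreadSketch();
        server.start();

        // Close the socket if the JVM is asked to exit, e.g. on Ctrl+C or SIGTERM.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                server.shutdown();
            } catch (IOException ignored) {
                // Nothing useful can be done during JVM shutdown.
            }
        }));

        // Talk to the server once, then stop it and wait for its thread to finish.
        try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(client.getInputStream(), StandardCharsets.US_ASCII))) {
            System.out.println("server replied: " + reader.readLine());
        }
        server.shutdown();
        server.join();
        System.out.println("server thread finished");
    }
}
```

As in the original, the thread that calls start() is free to do other work (here, acting as a client) and only blocks when it reaches join().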
References:
Using mock: https://zhuanlan.zhihu.com/p/51673406