
Apache Zeppelin Tutorial Series, Part 5: How the Interpreter Works


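The previous article, "Apache Zeppelin Tutorial Series, Part 4: How JDBCInterpreter Works", used JDBCInterpreter as an example to walk through how a JDBC statement is actually executed. The overall architecture is shown below.

[Figure: overall architecture diagram]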

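In short, the web front end sends a request to the server, the server calls zengine, zengine calls the interpreter module, and the call finally reaches the concrete execution module, such as the JDBCInterpreter covered in the previous article.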

This article focuses on the Interpreter module. The best entry point is the following test class:

zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java

    @Test
    public void testInterpreter2() throws Exception {
      final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost",
          RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", "groupId", true);
      server.init(new HashMap<>());
      server.intpEventClient = mock(RemoteInterpreterEventClient.class);

      Map<String, String> intpProperties = new HashMap<>();
      intpProperties.put("property_1", "value_1");
      intpProperties.put("zeppelin.interpreter.localRepo", "/tmp");

      // create Test1Interpreter in session_1
      server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(),
          intpProperties, "user_1");
      Test1Interpreter interpreter1 = (Test1Interpreter)
          ((LazyOpenInterpreter) server.getInterpreterGroup().get("session_1").get(0))
              .getInnerInterpreter();
      assertEquals(1, server.getInterpreterGroup().getSessionNum());
      assertEquals(1, server.getInterpreterGroup().get("session_1").size());
      assertEquals(2, interpreter1.getProperties().size());
      assertEquals("value_1", interpreter1.getProperty("property_1"));

      // create Test2Interpreter in session_1
      server.createInterpreter("group_1", "session_1", Test2Interpreter.class.getName(),
          intpProperties, "user_1");
      assertEquals(2, server.getInterpreterGroup().get("session_1").size());

      final RemoteInterpreterContext intpContext = new RemoteInterpreterContext();
      intpContext.setNoteId("note_1");
      intpContext.setParagraphId("paragraph_1");
      intpContext.setGui("{}");
      intpContext.setNoteGui("{}");
      intpContext.setLocalProperties(new HashMap<>());

      // single output of SUCCESS
      RemoteInterpreterResult result = server.interpret("session_1", Test2Interpreter.class.getName(),
          "COMBO_OUTPUT_SUCCESS", intpContext);
      System.out.println(new Gson().toJson(result));
      //List<InterpreterResultMessage> resultMessages = intpContext.out.toInterpreterResultMessage();
      //System.out.println(new Gson().toJson(resultMessages));
      /*assertEquals("SUCCESS", result.code);
      assertEquals(2, result.getMsg().size());
      assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData());
      assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData());*/
    }

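The test code above has been lightly modified from the original: the final assertions are commented out and the result is printed as JSON instead.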

createInterpreter instantiates the Interpreter through reflection and registers it in the session wrapped in a LazyOpenInterpreter. The core code is:

    Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className);
    Properties p = new Properties();
    p.putAll(properties);
    setSystemProperty(p);

    Constructor<Interpreter> constructor =
        replClass.getConstructor(new Class[]{Properties.class});
    Interpreter interpreter = constructor.newInstance(p);
    interpreter.setClassloaderUrls(new URL[]{});

    interpreter.setInterpreterGroup(interpreterGroup);
    interpreter.setUserName(userName);
    interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId);
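
To make the reflection step easier to try in isolation, here is a minimal, self-contained sketch of the same idea: look up a class by name, call its `Properties` constructor, and wrap the instance so that `open()` is deferred until first use. `MiniInterpreter`, `EchoInterpreter`, and `LazyWrapper` are hypothetical stand-ins invented for this example; they are not Zeppelin classes, and the real `Interpreter` and `LazyOpenInterpreter` do considerably more.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class ReflectiveCreateSketch {

    /** Hypothetical stand-in for Zeppelin's Interpreter base class. */
    public abstract static class MiniInterpreter {
        protected final Properties properties;
        protected MiniInterpreter(Properties properties) { this.properties = properties; }
        public abstract void open();
        public abstract String interpret(String script);
    }

    /** Hypothetical concrete interpreter: prefixes the script with a configured string. */
    public static class EchoInterpreter extends MiniInterpreter {
        public EchoInterpreter(Properties properties) { super(properties); }
        @Override public void open() { /* a real interpreter would acquire resources here */ }
        @Override public String interpret(String script) {
            return properties.getProperty("prefix", "") + script;
        }
    }

    /** Defers open() on the wrapped interpreter until the first interpret() call. */
    public static class LazyWrapper extends MiniInterpreter {
        private final MiniInterpreter inner;
        private boolean open = false;
        public LazyWrapper(MiniInterpreter inner) { super(inner.properties); this.inner = inner; }
        public synchronized boolean isOpen() { return open; }
        @Override public synchronized void open() {
            if (!open) { inner.open(); open = true; }
        }
        @Override public String interpret(String script) {
            open();
            return inner.interpret(script);
        }
    }

    /** Instantiates the named class through its (Properties) constructor and wraps it. */
    public static LazyWrapper create(String className, Map<String, String> props) {
        try {
            Class<?> clazz = Class.forName(className);
            Properties p = new Properties();
            p.putAll(props);
            Constructor<?> ctor = clazz.getConstructor(Properties.class);
            return new LazyWrapper((MiniInterpreter) ctor.newInstance(p));
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        LazyWrapper lazy = create(EchoInterpreter.class.getName(),
            Collections.singletonMap("prefix", "> "));
        System.out.println(lazy.isOpen());              // false: nothing opened yet
        System.out.println(lazy.interpret("select 1")); // > select 1
        System.out.println(lazy.isOpen());              // true: opened on first use
    }
}
```

The wrapper is why the test earlier has to cast to `LazyOpenInterpreter` and call `getInnerInterpreter()` before it can inspect the `Test1Interpreter` instance.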

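The interpret method is where a concrete interpreter actually gets executed. It wraps the request in a job, puts the job on a queue, and the code that finally runs is the jobRun() method of InterpretJob: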

    public InterpreterResult jobRun() throws Throwable {
      ClassLoader currentThreadContextClassloader = Thread.currentThread().getContextClassLoader();
      try {
        InterpreterContext.set(context);
        // clear the result of last run in frontend before running this paragraph.
        context.out.clear();

        InterpreterResult result = null;

        // Open the interpreter instance prior to calling interpret().
        // This is necessary because the earliest we can register a hook
        // is from within the open() method.
        LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
        if (!lazy.isOpen()) {
          lazy.open();
          result = lazy.executePrecode(context);
        }

        if (result == null || result.code() == Code.SUCCESS) {
          // Add hooks to script from registry.
          // note scope first, followed by global scope.
          // Here's the code after hooking:
          //     global_pre_hook
          //     note_pre_hook
          //     script
          //     note_post_hook
          //     global_post_hook
          processInterpreterHooks(context.getNoteId());
          processInterpreterHooks(null);
          LOGGER.debug("Script after hooks: {}", script);
          result = interpreter.interpret(script, context);
        }

        // data from context.out is prepended to InterpreterResult if both defined
        context.out.flush();
        List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();

        for (InterpreterResultMessage resultMessage : result.message()) {
          // only add non-empty InterpreterResultMessage
          if (!StringUtils.isBlank(resultMessage.getData())) {
            resultMessages.add(resultMessage);
          }
        }

        List<String> stringResult = new ArrayList<>();
        for (InterpreterResultMessage msg : resultMessages) {
          if (msg.getType() == InterpreterResult.Type.IMG) {
            LOGGER.debug("InterpreterResultMessage: IMAGE_DATA");
          } else {
            LOGGER.debug("InterpreterResultMessage: {}", msg);
          }
          stringResult.add(msg.getData());
        }
        // put result into resource pool
        if (context.getLocalProperties().containsKey("saveAs")) {
          if (stringResult.size() == 1) {
            LOGGER.info("Saving result into ResourcePool as single string: " +
                context.getLocalProperties().get("saveAs"));
            context.getResourcePool().put(
                context.getLocalProperties().get("saveAs"), stringResult.get(0));
          } else {
            LOGGER.info("Saving result into ResourcePool as string list: " +
                context.getLocalProperties().get("saveAs"));
            context.getResourcePool().put(
                context.getLocalProperties().get("saveAs"), stringResult);
          }
        }
        return new InterpreterResult(result.code(), resultMessages);
      } catch (Throwable e) {
        return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e));
      } finally {
        Thread.currentThread().setContextClassLoader(currentThreadContextClassloader);
        InterpreterContext.remove();
      }
    }
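
The comment in jobRun() about hook ordering is easy to misread: the note-scope hooks are applied first, yet the global hooks end up outermost. The sketch below shows why, under the assumption that each call simply prepends the pre-hook and appends the post-hook to the current script. The `HookOrderSketch` class, its map-based registry, and the use of `""` as the global key are all made up for illustration; Zeppelin's real processInterpreterHooks goes through a hook registry and listener that are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

public class HookOrderSketch {
    // Hypothetical registry: scope key -> {preHook, postHook}; "" stands for global scope.
    private final Map<String, String[]> hooks = new HashMap<>();
    private String script;

    public HookOrderSketch(String script) { this.script = script; }

    /** Registers a pre/post hook pair for a note, or for global scope when noteId is null. */
    public void register(String noteId, String pre, String post) {
        hooks.put(noteId == null ? "" : noteId, new String[]{pre, post});
    }

    /** Wraps the current script with the hooks of the given scope, if any are registered. */
    public void processHooks(String noteId) {
        String[] h = hooks.get(noteId == null ? "" : noteId);
        if (h == null) {
            return;
        }
        if (h[0] != null) script = h[0] + "\n" + script;
        if (h[1] != null) script = script + "\n" + h[1];
    }

    public String getScript() { return script; }

    /** Applies note scope first and global scope second, the same call order as jobRun(). */
    public static String demo() {
        HookOrderSketch job = new HookOrderSketch("script");
        job.register(null, "global_pre_hook", "global_post_hook");
        job.register("note_1", "note_pre_hook", "note_post_hook");
        job.processHooks("note_1"); // inner wrapping
        job.processHooks(null);     // outer wrapping
        return job.getScript();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because each call wraps whatever is already there, the scope processed last lands on the outside, which reproduces the five-line order listed in the comment.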

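At this point the code ties back to the JDBC interpreter code from the previous article: this is the path by which the Interpreter module ends up executing a concrete interpreter such as the JDBC one.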
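
The "enqueue a job, then run it" flow described earlier can be sketched with plain JDK classes. This is only an approximation: it assumes a single-worker FIFO queue built on `ExecutorService`, whereas Zeppelin uses its own scheduler and job classes, and `MiniJob` and the string results are hypothetical. What it does preserve is the shape of jobRun(): any failure inside the job is caught and turned into an error result rather than propagated to the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class JobQueueSketch {

    /** Hypothetical job: "runs" a script and converts any failure into an ERROR result. */
    static class MiniJob implements Callable<String> {
        private final String script;
        MiniJob(String script) { this.script = script; }

        @Override
        public String call() {
            try {
                if (script.isEmpty()) {
                    throw new IllegalArgumentException("empty script");
                }
                return "SUCCESS: ran " + script;
            } catch (Throwable t) {
                // Same idea as jobRun(): report the failure as a result instead of throwing.
                return "ERROR: " + t.getMessage();
            }
        }
    }

    /** Submits one job to a single-worker queue and blocks until its result is ready. */
    public static String submitAndWait(String script) {
        ExecutorService queue = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = queue.submit(new MiniJob(script));
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "ERROR: interrupted";
        } catch (ExecutionException e) {
            return "ERROR: " + e.getCause();
        } finally {
            queue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndWait("select 1")); // SUCCESS: ran select 1
        System.out.println(submitAndWait(""));         // ERROR: empty script
    }
}
```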

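The main method of RemoteInterpreterServer starts the server as a thread and waits for it with join(). The Thrift server-side service itself is started inside the thread's run method: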

    public static void main(String[] args) throws Exception {
      String zeppelinServerHost = null;
      int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
      String portRange = ":";
      String interpreterGroupId = null;
      if (args.length > 0) {
        zeppelinServerHost = args[0];
        port = Integer.parseInt(args[1]);
        interpreterGroupId = args[2];
        if (args.length > 3) {
          portRange = args[3];
        }
      }
      RemoteInterpreterServer remoteInterpreterServer =
          new RemoteInterpreterServer(zeppelinServerHost, port, interpreterGroupId, portRange);
      remoteInterpreterServer.start();

      /*
       * Registration of a ShutdownHook in case of an unpredictable system call
       * Examples: STRG+C, SIGTERM via kill
       */
      shutdownThread = remoteInterpreterServer.new ShutdownThread(ShutdownThread.CAUSE_SHUTDOWN_HOOK);
      Runtime.getRuntime().addShutdownHook(shutdownThread);

      remoteInterpreterServer.join();
      LOGGER.info("RemoteInterpreterServer thread is finished");

      /* TODO(pdallig): Remove System.exit(0) if the thrift server can be shut down successfully.
       * https://github.com/apache/thrift/commit/9cb1c794cd39cfb276771f8e52f0306eb8d462fd
       * should be part of the next release and solve the problem.
       * We may have other threads that are not terminated successfully.
       */
      if (remoteInterpreterServer.isForceShutdown) {
        LOGGER.info("Force shutting down");
        System.exit(0);
      }
    }

    @Override
    public void run() {
      RemoteInterpreterService.Processor<RemoteInterpreterServer> processor =
          new RemoteInterpreterService.Processor<>(this);
      try (TServerSocket tSocket = new TServerSocket(port)){
        server = new TThreadPoolServer(
            new TThreadPoolServer.Args(tSocket)
                .stopTimeoutVal(DEFAULT_SHUTDOWN_TIMEOUT)
                .stopTimeoutUnit(TimeUnit.MILLISECONDS)
                .processor(processor));

        if (null != intpEventServerHost && !isTest) {
          Thread registerThread = new Thread(new RegisterRunnable());
          registerThread.setName("RegisterThread");
          registerThread.start();
        }
        LOGGER.info("Launching ThriftServer at {}:{}", this.host, this.port);
        server.serve();
      } catch (TTransportException e) {
        LOGGER.error("Failure in TTransport", e);
      }
      LOGGER.info("RemoteInterpreterServer-Thread finished");
    }
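
The control flow of main() and run() (start a server thread, register a shutdown hook, block in join() until the serving loop returns) can be reproduced without Thrift. In the sketch below a `CountDownLatch` stands in for the blocking `server.serve()` call, and `ServerThreadSketch` and its methods are hypothetical names chosen for this example. No sockets are opened and nothing here is Thrift or Zeppelin API.

```java
import java.util.concurrent.CountDownLatch;

public class ServerThreadSketch extends Thread {
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private volatile boolean served = false;

    @Override
    public void run() {
        served = true; // stands in for the point where server.serve() begins
        try {
            stopSignal.await(); // block like a serving loop until shutdown is requested
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the "server" to stop; safe to call more than once. */
    public void shutdown() { stopSignal.countDown(); }

    public boolean hasServed() { return served; }

    /** Starts the thread, requests shutdown, and waits for the thread to finish. */
    public static boolean startStopJoin() {
        ServerThreadSketch server = new ServerThreadSketch();
        server.start();
        server.shutdown();
        try {
            server.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return server.hasServed() && !server.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        ServerThreadSketch server = new ServerThreadSketch();
        server.start();
        // Covers Ctrl+C or SIGTERM, like the ShutdownThread registered in main() above.
        Runtime.getRuntime().addShutdownHook(new Thread(server::shutdown));
        server.shutdown(); // in a real server this would be triggered by a shutdown request
        server.join();
        System.out.println("server thread finished");
    }
}
```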

References:

Using mock: https://zhuanlan.zhihu.com/p/51673406

Apache Thrift Series in Detail (1): Overview and Getting Started (Juejin)
