Source code for polyglot.nodeserver_manager

''' The element management module for Polyglot '''

from collections import OrderedDict
import copy
import json
import logging
import os
from polyglot import SOURCE_DIR
from polyglot.utils import AsyncFileReader, Queue, Empty, MyProcessLookupError
from polyglot.version import PGVERSION
import polyglot.nodeserver_helpers as helpers
import random
import string
import subprocess
import sys
import threading
import time

_LOGGER = logging.getLogger(__name__)
ELEMENT = 'core'
SERVER_TYPES = {'python': [sys.executable]}
NS_QUIT_WAIT_TIME = 5

# Increment this version number each time a breaking change is made or a
# major new message (feature) is added to the API between the node server
# manager (implemented by this source file) and its clients.
# This allows the client an opportunity to adjust its behavior to suit the
# installed version of Polyglot -- keep in mind that the client node server
# is independent of Polyglot, and may not even be implemented in Python --
# and thus has no other way to know about the Polyglot server itself.
PGAPIVER = '1'

[docs]class NodeServerManager(object): """ Node Server Manager :param pglot: The parent Polyglot object :ivar pglot: The parent Polyglot object :ivar servers: Dictionary of active Node Servers """ servers = OrderedDict() def __init__(self, pglot): self.pglot = pglot def __getitem__(self, key): """ Get server by base name. """ return self.servers[key] @property def config(self): """ Node Server configuration block. """ output = [] for nsbase_url, nodeserver in self.servers.items(): output.append({'platform': nodeserver.platform, 'url_base': nsbase_url, 'name': nodeserver.name, 'profile_number': nodeserver.profile_number, 'config': nodeserver.config}) return output
[docs] def start_server(self, ns_platform, profile_number, nsname=None, base=None, config=None): """ starts a node server """ # pylint: disable=broad-except _LOGGER.info('Starting Node Server: %s:%s', ns_platform, nsname) # find node server path = helpers.get_path(ns_platform) # read node server attributes try: def_file = os.path.join(path, 'server.json') definition = json.loads(open(def_file).read()) except (IOError, ValueError, KeyError): raise ValueError("Error reading server.json for {}".format(path)) # parse server attributes try: nstype = definition['type'] nsexe = os.path.join(path, definition['executable']) except KeyError: raise ValueError( "server.json for {} is missing type or executable." .format(ns_platform)) # get server base name while base in self.servers or base is None: base = random_string(5) # create sandbox sandbox = self.pglot.config.nodeserver_sandbox(ns_platform) # create server try: server = NodeServer(self.pglot, ns_platform, profile_number, nstype, nsexe, nsname or ns_platform, config or {}, sandbox) except Exception: _LOGGER.exception('Node Server %s could not start', ns_platform) raise ValueError( "Error starting Node Server: {}.", ns_platform) # store server self.servers[base] = server return True
[docs] def load(self): """ Initial load of the active Node Servers """ _LOGGER.info('Loading Node Servers') nsconfigs = self.pglot.config.get("nodeservers", []) for count, nsconfig in enumerate(nsconfigs, 1): ns_platform = nsconfig.get("platform", None) profile_number = nsconfig.get("profile_number", None) url_base = nsconfig.get("url_base", None) name = nsconfig.get("name", ns_platform) config = nsconfig.get("config", {}) if None in [ns_platform, profile_number]: _LOGGER.error( 'Bad Node Server configuration in config file. ' + 'Node Server %d', count) else: try: self.start_server( ns_platform, profile_number, name, url_base, config) except ValueError as err: _LOGGER.error(err.args[0])
[docs] def delete(self, base_url): """ Remove a server from Polyglot. """ node_server = self.servers[base_url] node_server.send_exit() for _ in range(10): if not node_server.alive: break time.sleep(0.5) else: node_server.kill() del self.servers[base_url]
[docs] def unload(self): """ Unload all node servers """ # request node server shutdowns for node_server in self.servers.values(): node_server.send_exit() # wait for node servers to quit gracefully ns_running = any([node_server.alive for node_server in self.servers.values()]) timer = 0 while ns_running and timer < NS_QUIT_WAIT_TIME: time.sleep(1) timer += 1 ns_running = any([node_server.alive for node_server in self.servers.values()]) # kill any remaining node servers for node_server in self.servers.values(): if node_server.alive: node_server.kill() _LOGGER.warning( 'Timed out waiting for Node Server %s to quit. ' + 'Terminated Node Server.', node_server.name) _LOGGER.info('Unloaded Node Servers')
[docs]class NodeServer(object): """ Node Server Class """ # pylint: disable=too-many-instance-attributes def __init__(self, pglot, ns_platform, profile_number, nstype, nsexe, nsname, config, sandbox): # build run command if nstype in SERVER_TYPES: cmd = copy.deepcopy(SERVER_TYPES[nstype]) else: _LOGGER.error("Unrecognized server type %s for %s", nstype, ns_platform) raise TypeError('bad server type') cmd.append(nsexe) self.pglot = pglot self.isy_version = self.pglot.isy_version self.config = config self._cmd = cmd self.platform = ns_platform self.profile_number = profile_number self.type = nstype self.exe = nsexe self.path = os.path.dirname(nsexe) self.name = nsname self.sandbox = sandbox self.pgver = PGVERSION self.pgapiver = PGAPIVER self.params = {'isyver': self.isy_version, 'sandbox': self.sandbox, 'name': self.name, 'pgver': self.pgver, 'pgapiver': self.pgapiver, 'profile': self.profile_number} self._proc = None self._inq = None self._rqq = None self._lastping = None self._lastpong = None # define handlers isy = self.pglot.elements.isy self._handlers = {'status': isy.report_node_status, 'command': isy.report_command, 'add': isy.node_add, 'change': isy.node_change, 'remove': isy.node_remove, 'restcall': isy.restcall, 'request': isy.report_request_status} self.start()
[docs] def start(self): """ start the node server """ # start process proc = subprocess.Popen( self._cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=1, env={'PYTHONPATH': SOURCE_DIR}, cwd=self.sandbox) self._proc = proc self._inq = Queue() self._rqq = Queue(maxsize=4096) self._lastping = None self._lastpong = None # start Threads self._threads = {} self._threads['stdout'] = AsyncFileReader(self._proc.stdout, self._recv_out) self._threads['stderr'] = AsyncFileReader(self._proc.stderr, self._recv_err) self._threads['requests'] = threading.Thread(target=self._request_handler) self._threads['requests'].daemon = True self._threads['stdin'] = threading.Thread(target=self._send_in) self._threads['stdin'].daemon = True for _, thread in self._threads.items(): thread.start() # wait, then send config time.sleep(1) self.send_params() self.send_config() _LOGGER.info('Started Node Server: %s:%s (%s)', self.platform, self.name, self._proc.pid)
[docs] def restart(self): """ restart the nodeserver """ self.send_exit() for _ in range(10): if not self.alive: break time.sleep(0.5) else: self.kill() self.start()
@property def definition(self): """ Return the server defintion from server.json """ def_file = os.path.join(self.path, 'server.json') instruct_file = os.path.join(self.path, 'instructions.txt') definition = json.loads(open(def_file).read()) definition['running'] = self.alive and self.responding definition['instructions'] = open(instruct_file).read() definition['profile_number'] = self.profile_number return definition @property def profile(self): """ Return the profile.zip data. """ return open(os.path.join(self.path, 'profile.zip'), 'rb').read() @property def alive(self): """ Indicates if the Node Server is running. """ try: os.kill(self._proc.pid, 0) except MyProcessLookupError: return False return self._inq is not None @property def responding(self): """ Indicates if the Node Server is responding. """ if self._lastping is None: # node server has not been pinged self.send_ping() self._lastping = time.time() return True elif time.time() - self._lastping >= 30: # last ping has expired (more than 30 seconds old) if self._lastpong and self._lastpong > self._lastping: # pong was received self.send_ping() self._lastping = time.time() return True else: # pong was not received _LOGGER.warning('Node Server %s: time since last pong: %5.2f', self.name, (time.time() - self._lastpong)) return False else: # ping hasn't expired, we have to assume responding return True # manage IO def _send_in(self): """ Write pending input to node server. Kill process if unresponsive. """ while True and self._inq: try: # try to get a line from the queue line = self._inq.get(True, 5) except Empty: # no line in queue, check if the Node Server is responding if not self.responding: _LOGGER.error( 'Node Server %s has stopped responding.', self.name) self._inq = None self._rqq = None self._proc.kill() else: try: # found line, try to write it self._proc.stdin.write('{}\n'.format(line)) self._proc.stdin.flush() except IOError: # stdin pipe is broken. process is likely dead. _LOGGER.error( 'Node Server %s has exited unexpectedly.', self.name) self._inq = None self._rqq = None self._proc.kill() else: # line wrote successfully _LOGGER.debug('%s STDIN: %s', self.name, line) if self._inq: self._inq.task_done() def _request_handler(self): """ Read and process network requests for a node server """ while True and self._rqq: msg = self._rqq.get(True) # parse message command = list(msg.keys())[0] arguments = msg[command] seq = arguments.get('seq', None) ts = time.time() _LOGGER.debug('%8s [%d] (%5.2f) _request_handler: command=%s seq=%s', self.name, (0 if self._rqq is None else self._rqq.qsize()), 0.0, command, ('' if seq is None else seq)) fun = self._handlers.get(command) if fun: result = fun(self.profile_number, **arguments) if seq and result: self._mk_cmd('result', **result) # Signal that this is handled if self._rqq: self._rqq.task_done() _LOGGER.debug('%8s [%d] (%5.2f) _request_handler: completed.', self.name, (0 if self._rqq is None else self._rqq.qsize()), (time.time() - ts)) def _recv_out(self, line): """ Process node server output. """ l = (line[:57] + '...') if len(line) > 60 else line _LOGGER.debug('%8s [%d] (%5.2f) STDOUT: %s', self.name, (0 if self._rqq is None else self._rqq.qsize()), 0.0, l) ts = time.time() # parse message message = json.loads(line) command = list(message.keys())[0] arguments = message[command] # direct command if command == 'pong': # store pong time self._lastpong = time.time() elif command == 'config': # store new configuration in config file self.config = arguments self.pglot.update_config() elif command == 'install': # install node server on isy # [future] implement when documentation is available raise NotImplementedError('Install command is not yet supported.') elif command == 'exit': # node server is done. Kill it. Clean up is automatic. self._proc.kill() self._inq = None self._rqq = None else: fun = self._handlers.get(command) if fun and self._rqq: self._rqq.put(message, True, 30) else: _LOGGER.error('Node Server %s delivered bad command %s', self.name, command) _LOGGER.debug('%8s [%d] (%5.2f) Done: %s', self.name, (0 if self._rqq is None else self._rqq.qsize()), (time.time() - ts), l) def _recv_err(self, line): """ Process error stream from node server. """ if line.startswith('**INFO: '): _LOGGER.info('%s: %s', self.name, line) elif line.startswith('**DEBUG: '): _LOGGER.debug('%s: %s', self.name, line) elif line.startswith('**WARNING: '): _LOGGER.warning('%s: %s', self.name, line) else: _LOGGER.error('%s: %s', self.name, line) # handle output def _mk_cmd(self, cmd_code, **kwargs): """ Enqueue a command for transmission to server. """ msg = json.dumps({cmd_code: kwargs}) if self._inq: self._inq.put(msg, True, 5)
[docs] def send_config(self): """ Send configuration to Node Server. """ self._mk_cmd('config', **self.config)
[docs] def send_params(self): """ Send parameters to Node Server. """ self._mk_cmd('params', **self.params)
[docs] def send_install(self, profile_number=None): """ Send install command to Node Server. """ if not profile_number: profile_number = self.profile_number else: self.profile_number = profile_number self.pglot.update_config() self._mk_cmd('install', profile_number=self.profile_number)
[docs] def send_query(self, node_address, request_id=None): """ Send query command to Node Server. """ self._mk_cmd('query', node_address=node_address, request_id=request_id)
[docs] def send_status(self, node_address, request_id=None): """ Send status request to Node Server. """ self._mk_cmd('status', node_address=node_address, request_id=request_id)
[docs] def send_addall(self, request_id=None): """ Send add all request to Node Server. """ self._mk_cmd('add_all', request_id=request_id)
[docs] def send_added(self, node_address, node_def_id, primary_node_address, name): """ Send node added confirmation to Node Server. """ self._mk_cmd('added', node_address=node_address, node_def_id=node_def_id, primary_node_address=primary_node_address, name=name)
[docs] def send_removed(self, node_address): """ Send node removed confirmation to Node Server. """ self._mk_cmd('removed', node_address=node_address)
[docs] def send_renamed(self, node_address, name): """ Send node renamed confirmation to Node Server. """ self._mk_cmd('renamed', node_address=node_address, name=name)
[docs] def send_enabled(self, node_address): """ Send node enabled confirmation to Node Server. """ self._mk_cmd('enabled', node_address=node_address)
[docs] def send_disabled(self, node_address): """ Send node disabled confirmation to Node Server. """ self._mk_cmd('disabled', node_address=node_address)
[docs] def send_cmd(self, node_address, command, value=None, uom=None, request_id=None, **kwargs): """ Send run command signal to Node Server. """ self._mk_cmd('cmd', node_address=node_address, command=command, value=value, uom=uom, request_id=request_id, **kwargs)
[docs] def send_ping(self): """ Send Ping request to the Node Server. """ self._mk_cmd('ping')
[docs] def send_exit(self): """ Send exit command to the Node Server. """ self._mk_cmd('exit')
[docs] def kill(self): """ Kill the node server process. """ try: self._proc.kill() except MyProcessLookupError: pass
[docs]def random_string(length): """ Generate a random string of uppercase, lowercase, and digits """ library = string.ascii_uppercase + string.ascii_lowercase + string.digits return ''.join(random.choice(library) for _ in range(length))