1
2
3
4
5
6
7
8
9 """
10 Services Twisted de collection et de publication de données.
11 """
12
13 import locale, gettext, os, pwd, shutil, random
14 from glob import glob
15 from ConfigParser import ConfigParser
16
17
18 from zephir.monitor.agentmanager import ZEPHIRAGENTS_DATADIR
# Translation domain and message-catalogue directory for the agents.
APP = 'zephir-agents'
DIR = os.path.join(ZEPHIRAGENTS_DATADIR, 'i18n')
# gettext.install() publishes _() in the builtins namespace; the log
# messages further down in this module call _() without importing it.
# NOTE(review): presumably done this early so that the modules imported
# below can use _() as well -- confirm before moving this call.
# The `unicode` keyword only exists in the Python 2 gettext API.
gettext.install(APP, DIR, unicode=False)
22
23
24 from twisted.application import internet, service
25 from twisted.internet import utils, reactor
26 from twisted.web import resource, server, static, util, xmlrpc
27 from twisted.python import syslog
28
29 from zephir.monitor.agentmanager import config as cfg
30 from zephir.monitor.agentmanager.util import ensure_dirs, md5file, md5files, log
31 from zephir.monitor.agentmanager.web_resources import ZephirServerResource
32 from zephir.monitor.agentmanager.clientmanager import ClientManager
33
# Optional Zephir client integration.  `registered` is 1 when every
# import below succeeded and 0 otherwise; later code tests the flag
# (for instance before calling update_sudoers()).
try:
    import zephir.zephir_conf.zephir_conf as conf_zeph
    from zephir.lib_zephir import zephir_proxy, convert, zephir_dir, update_sudoers
    from zephir.lib_zephir import log as zeph_log
    registered = 1
except:
    # NOTE(review): the bare except turns *any* failure raised while
    # importing (not only ImportError) into "not registered".  This looks
    # like deliberate best-effort behaviour -- confirm before narrowing it.
    registered = 0
42
44 """Main Twisted service for Zephir apps"""
45
def __init__(self, config, root_resource=None, serve_static=False):
    """Build the top-level service.

    config        -- mapping of settings; it is laid over a copy of
                     cfg.DEFAULT_CONFIG, so keys it omits keep their
                     default value
    root_resource -- web resource to publish under.  When None, a new
                     root resource is created and served over TCP on
                     config['webserver_port']; otherwise the given
                     resource is reused and no web server is started
                     here
    serve_static  -- when true, config['static_web_dir'] is published
                     as the 'static' child of the root resource
    """
    service.MultiService.__init__(self)

    merged = cfg.DEFAULT_CONFIG.copy()
    merged.update(config)
    self.config = merged

    # Neither sub-service exists yet.
    self.updater = None
    self.publisher = None

    # Only done when the module-level Zephir imports succeeded.
    if registered:
        update_sudoers()

    if root_resource is not None:
        self.root_resource = root_resource
    else:
        self.root_resource = resource.Resource()
        port = self.config['webserver_port']
        site = server.Site(self.root_resource)
        webserver = internet.TCPServer(port, site)
        webserver.setServiceParent(service.IServiceCollection(self))

    if serve_static:
        static_files = static.File(self.config['static_web_dir'])
        self.root_resource.putChild('static', static_files)
67
68
69
70
72 assert self.updater is None
73 self.updater = UpdaterService(self.config, self, self.root_resource)
74 return self
75
77 assert self.publisher is None
78 self.publisher = PublisherService(self.config, self, self.root_resource)
79 return self
80
82 assert self.updater is None
83 assert self.publisher is None
84 self.updater = UpdaterService(self.config, self, self.root_resource)
85 self.publisher = PublisherService(self.config, self, self.root_resource,
86 show_clients_page = False,
87 live_agents={self.config['host_ref']: self.updater.agents})
88 return self
89
90
91
92
94 """Schedules measures, data serialisation and upload."""
95
def __init__(self, config, parent, root_resource):
    """Create the updater and attach it to its parent and to the web tree.

    config        -- complete configuration mapping (no default values
                     are filled in here)
    parent        -- service this updater registers itself under
    root_resource -- web resource under which this object is published
                     as the 'xmlrpc' child
    """
    service.MultiService.__init__(self)
    xmlrpc.XMLRPC.__init__(self)
    # Holds the log observer that was replaced, if any; None until then.
    self.old_obs = None
    self.config = config

    self.update_static_data()

    loc, enc = locale.getdefaultlocale()
    log.msg(_('default locale: %s encoding: %s') % (loc, enc))
    # getdefaultlocale() may report a UTF locale as 'utf', 'utf8' or
    # 'UTF-8' depending on the Python version and platform, and the
    # encoding can also be None.  Comparing against the exact string
    # 'utf' missed the common spellings, so match on the prefix instead.
    if enc is not None and enc.lower().startswith('utf'):
        log.msg(_('Warning: locale encoding %s broken in RRD graphs, set e.g: LC_ALL=fr_FR') % enc)
    self.agents = self.load_agents()

    self.setServiceParent(service.IServiceCollection(parent))
    root_resource.putChild('xmlrpc', self)
113
115 """initialize zephir services"""
116 service.MultiService.startService(self)
117 reactor.callLater(2,self.schedule_all)
118
119
120
121 self.old_obs = log.theLogPublisher.observers[0]
122 try:
123 from zephir.backend import config as conf_zeph
124 log_prefix = 'zephir_backend'
125 except:
126 log_prefix = 'zephiragents'
127 new_obs = syslog.SyslogObserver(log_prefix, options=syslog.DEFAULT_OPTIONS, facility=syslog.DEFAULT_FACILITY)
128 log.addObserver(new_obs.emit)
129 log.removeObserver(self.old_obs)
130 if registered != 0:
131
132
133 self.setup_uucp()
134
135 if os.path.isfile(os.path.join(zephir_dir,'reboot.lck')):
136 try:
137 zeph_log('REBOOT',0,'redémarrage du serveur terminé')
138 os.unlink(os.path.join(zephir_dir,'reboot.lck'))
139 except:
140 pass
141
143 """stops zephir services"""
144 if self.old_obs:
145 log.removeObserver(log.theLogPublisher.observers[0])
146 log.addObserver(self.old_obs)
147 service.MultiService.stopService(self)
148
150 """Charge tous les agents du répertoire de configurations."""
151 log.msg(_("Loading agents from %s...") % self.config['config_dir'])
152 loaded_agents = {}
153 list_agents = glob(os.path.join(self.config['config_dir'], "*.agent"))
154 for f in list_agents:
155 log.msg(_(" from %s:") % os.path.basename(f))
156 h = { 'AGENTS': None }
157 execfile(f, globals(), h)
158 assert h.has_key('AGENTS')
159 for a in h['AGENTS']:
160 assert not loaded_agents.has_key(a.name)
161
162 a.init_data(os.path.join(self.config['state_dir'],
163 self.config['host_ref'],
164 a.name))
165 a.manager = self
166 a.archive()
167 loaded_agents[a.name] = a
168 log.msg(_(" %s, period %d") % (a.name, a.period))
169 log.msg(_("Loaded."))
170 return loaded_agents
171
172
173
174
176 """Planifie les mesures périodiques d'un agent."""
177 assert self.agents.has_key(agent_name)
178 if self.agents[agent_name].period > 0:
179 timer = internet.TimerService(self.agents[agent_name].period,
180 self.wakeup_for_measure, agent_name)
181 timer.setName(agent_name)
182 timer.setServiceParent(service.IServiceCollection(self))
183
184
190
191
193 """Planifie tous les agents chargés.
194 Démarre le cycle de mesures périodiques de chaque agent
195 chargé. La première mesure est prise immédiatement.
196 """
197 for agent_name in self.agents.keys():
198
199
200 for action_dir in (os.path.join(self.config['action_dir'],'eole'), self.config['action_dir']):
201 f_actions = os.path.join(action_dir, "%s.actions" % agent_name)
202 if os.path.isfile(f_actions):
203 actions = {}
204 execfile(f_actions, globals(), actions)
205 for item in actions.keys():
206 if item.startswith('action_'):
207 setattr(self.agents[agent_name], item, actions[item])
208
209 self.schedule(agent_name)
210
211
213 assert self.agents.has_key(agent_name)
214 return self.getServiceNamed(agent_name)
215
216
217
218
220 ensure_dirs(self.config['uucp_dir'])
221 self.update_static_data()
222
223 try:
224 reload(conf_zeph)
225
226
227
228
229 if not os.path.isfile('/etc/init.d/zephir'):
230 for st_dir in os.listdir(self.config['state_dir']):
231 if st_dir != str(conf_zeph.id_serveur):
232 shutil.rmtree(os.path.join(self.config['state_dir'],st_dir))
233
234 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
235 except:
236 period = 0
237
238 if period < 30:
239 period = self.config['upload_period']
240 log.msg(_('Using default period : %s seconds') % period)
241
242
243 delay = random.randrange(30,period)
244 reactor.callLater(delay,self.wakeup_for_upload)
245
247 original = os.path.join(self.config['config_dir'], 'site.cfg')
248 if os.path.isfile(original):
249 destination = cfg.client_data_dir(self.config, self.config['host_ref'])
250 ensure_dirs(destination)
251 need_copy = False
252 try:
253 org_mtime = os.path.getmtime(original)
254 dest_mtime = os.path.getmtime(os.path.join(destination, 'site.cfg'))
255 except OSError:
256 need_copy = True
257 if need_copy or (org_mtime > dest_mtime):
258 shutil.copy(original, destination)
259
261
262 try:
263 reload(conf_zeph)
264 period = convert(zephir_proxy.serveurs.get_timeout(conf_zeph.id_serveur)[1])
265 except:
266 period = 0
267
268 if period < 30:
269 period = self.config['upload_period']
270 log.msg(_('Using default period : %s seconds') % period)
271
272
273 if recall:
274 reactor.callLater(period,self.wakeup_for_upload)
275
276
277 for agent in self.agents.values():
278 agent.archive()
279
280 self.update_static_data()
281
282 try:
283 assert conf_zeph.id_serveur != 0
284 client_dir = os.path.join(self.config['tmp_data_dir'],str(conf_zeph.id_serveur))
285 except:
286 client_dir = os.path.join(self.config['tmp_data_dir'],self.config['host_ref'])
287 try:
288
289 if os.path.isdir(client_dir):
290 shutil.rmtree(client_dir)
291 os.makedirs(client_dir)
292 except:
293 pass
294 args = ['-Rf',os.path.abspath(os.path.join(cfg.client_data_dir(self.config, self.config['host_ref']),'site.cfg'))]
295 ignore_file = os.path.abspath(os.path.join(self.config['state_dir'],'ignore_list'))
296 if os.path.exists(ignore_file):
297 args.append(ignore_file)
298
299
300 for agent_name in self.agents.keys():
301 args.append(os.path.abspath(cfg.agent_data_dir(self.config, self.config['host_ref'],agent_name)))
302 args.append(os.path.abspath(client_dir))
303 res = utils.getProcessOutput('/bin/cp', args = args)
304 res.addCallbacks(self._make_archive,
305 lambda x: log.msg(_("/!\ copy failed (%s)\n"
306 "data: %s")
307 % (x, self.config['state_dir'])))
308
310
311 rep_src = "/usr/share/eole/creole"
312 rep_conf = "/etc/eole"
313 data = []
314 for src, dst, pattern in md5files[cfg.distrib_version]:
315 if src == 'variables.eol':
316
317 orig_eol = os.path.join(rep_conf, 'config.eol')
318 if os.path.isfile(orig_eol):
319 var_eol = os.path.join(rep_src, 'variables.eol')
320
321 conf = ConfigParser()
322 conf.read(orig_eol)
323 var_names = conf.sections()
324 var_names.sort()
325 f_var = file(var_eol, 'w')
326 for var_name in var_names:
327 if var_name != 'mode_zephir':
328 f_var.write("%s:%s\n" % (var_name, ''.join(eval(conf.get(var_name, 'val')))))
329 f_var.close()
330 if os.path.isdir(os.path.join(rep_src,src)):
331 fics = os.listdir(os.path.join(rep_src,src))
332 fics = [(os.path.join(src,fic),os.path.join(dst,fic)) for fic in fics]
333 else:
334 fics = [(src,dst)]
335 for fic, fic_dst in fics:
336 if os.path.isfile(os.path.join(rep_src,fic)):
337 if (pattern is None) or fic.endswith(pattern):
338 md5res = md5file(os.path.join(rep_src,fic))
339 data.append("%s %s\n" % (md5res, fic_dst))
340 try:
341 assert conf_zeph.id_serveur != 0
342 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % str(conf_zeph.id_serveur)), "w")
343 except:
344 outf = file(os.path.join(self.config['tmp_data_dir'],"config%s.md5" % self.config['host_ref']), "w")
345 outf.writelines(data)
346 outf.close()
347
349 """génère une liste des paquets installés
350 """
351 try:
352 assert conf_zeph.id_serveur != 0
353 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % str(conf_zeph.id_serveur)))
354 except:
355 cmd_pkg = ("/usr/bin/dpkg-query -W >" + os.path.join(self.config['tmp_data_dir'],"packages%s.list" % self.config['host_ref']))
356 os.system(cmd_pkg)
357
359 self._check_md5()
360 self._get_packages()
361
362 try:
363 assert conf_zeph.id_serveur != 0
364 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % str(conf_zeph.id_serveur))
365 except:
366 tarball = os.path.join(self.config['uucp_dir'],'site%s.tar' % self.config['host_ref'])
367 tar_cwd = os.path.dirname(os.path.abspath(self.config['tmp_data_dir']))
368 tar_dir = os.path.basename(os.path.abspath(self.config['tmp_data_dir']))
369 res = utils.getProcessOutput('/bin/tar',
370 args = ('czf', tarball,
371 '--exclude', 'private',
372 '-C', tar_cwd,
373 tar_dir))
374 res.addCallbacks(self._try_chown,
375 lambda x: log.msg(_("/!\ archiving failed (%s)\n"
376 "data: %s\narchive: %s")
377 % (x, self.config['state_dir'], tarball)),
378 callbackArgs = [tarball])
379
381 try:
382 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
383 uid = os.getuid()
384 os.chown(tarball, uucp_uid, uucp_gid)
385 except OSError, e:
386 log.msg("/!\ chown error, check authorizations (%s)" % e)
387
388
389 try:
390 uucp_uid, uucp_gid = pwd.getpwnam('uucp')[2:4]
391 os.chown('/usr/share/zephir/deffered_logs', uucp_uid, uucp_gid)
392 except:
393 log.msg("/!\ chown error on deffered_logs")
394 os.system('/usr/share/zephir/scripts/zephir_client call &> /dev/null')
395
396
397
398
400 """@return: Liste des agents chargés"""
401 return self.agents.keys()
402 xmlrpc_list_agents.signature = [['array']]
403
405 """@return: Liste des agents chargés et structure d'affichage"""
406 try:
407 menu = {}
408 for name, agent in self.agents.items():
409 if agent.section != None:
410 if not menu.has_key(agent.section):
411 menu[agent.section] = []
412 menu[agent.section].append((name, agent.description))
413 return menu
414 except Exception, e:
415 log.msg(e)
416 xmlrpc_agents_menu.signature = [['struct']]
417
419 """
420 @return: Les statuts des agents listés dans un dictionnaire
421 C{{nom:status}}. Le status est lui-même un dictionnaire avec
422 pour clés C{'level'} et C{'message'}. Seuls les noms d'agents
423 effectivement chargés apparaîtront parmi les clés du
424 dictionnaire.
425 """
426 result = {}
427 if len(agent_name_list) == 0:
428 agent_name_list = self.agents.keys()
429 for agent_name in agent_name_list:
430 if self.agents.has_key(agent_name):
431 result[agent_name] = self.agents[agent_name].check_status().to_dict()
432 return result
433 xmlrpc_status_for_agents.signature = [['string', 'struct']]
434
442
446
447
449 """Serves the web interface for current agent data"""
450
def __init__(self, config, parent, root_resource,
             live_agents=None, show_clients_page=True):
    """Create the publisher and mount its pages on the web tree.

    config            -- complete configuration mapping (no default
                         values are filled in here)
    parent            -- service this publisher registers itself under
    root_resource     -- web resource receiving the 'agents' child and
                         the redirect installed on the empty path
    live_agents       -- handed unchanged to ClientManager
    show_clients_page -- when true the empty path redirects to
                         './agents/'; otherwise it redirects to the
                         page of config['host_ref'] below it
    """
    service.MultiService.__init__(self)
    self.config = config
    self.show_clients_page = show_clients_page
    self.manager = ClientManager(self.config, live_agents)

    self.setServiceParent(service.IServiceCollection(parent))

    agents_page = ZephirServerResource(self.config, self.manager)
    root_resource.putChild('agents', agents_page)

    if self.show_clients_page:
        landing = './agents/'
    else:
        landing = './agents/' + (self.config['host_ref'] + '/')
    root_resource.putChild('', util.Redirect(landing))
468
469
470
471
472
473
474
475