expected_stale_test.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 # Software License Agreement (BSD License)
00003 #
00004 # Copyright (c) 2009, Willow Garage, Inc.
00005 # All rights reserved.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions
00009 # are met:
00010 #
00011 #  * Redistributions of source code must retain the above copyright
00012 #    notice, this list of conditions and the following disclaimer.
00013 #  * Redistributions in binary form must reproduce the above
00014 #    copyright notice, this list of conditions and the following
00015 #    disclaimer in the documentation and/or other materials provided
00016 #    with the distribution.
00017 #  * Neither the name of the Willow Garage nor the names of its
00018 #    contributors may be used to endorse or promote products derived
00019 #    from this software without specific prior written permission.
00020 #
00021 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00022 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00023 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
00024 # FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
00025 # COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
00026 # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
00027 # BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
00028 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00029 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
00030 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
00031 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
00032 # POSSIBILITY OF SUCH DAMAGE.
00033 #
00034 
00035 ##\author Kevin Watts
00036 
00037 ##\brief Tests that expected items from GenericAnalyzer will appear stale
00038 
00039 from __future__ import with_statement
00040 DURATION = 15
00041 PKG = 'diagnostic_aggregator'
00042 import roslib; roslib.load_manifest(PKG)
00043 import rospy, rostest, unittest
00044 from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
00045 from time import sleep
00046 import sys
00047 import threading
00048 
00049 def get_raw_name(agg_name):
00050     return agg_name.split('/')[-1]
00051 
00052 class DiagnosticItem:
00053     def __init__(self, msg):
00054         self.name = get_raw_name(msg.name)
00055         self.level = msg.level
00056         self.message = msg.message
00057 
00058         self.update_time = rospy.get_time()
00059 
00060     def is_stale(self):
00061         return rospy.get_time() - self.update_time > 5
00062 
00063     def update(self, msg):
00064         self.level = msg.level
00065         self.message = msg.message
00066 
00067         self.update_time = rospy.get_time()
00068 
00069 class TestExpectedItemsStale(unittest.TestCase):
00070     def __init__(self, *args):
00071         super(TestExpectedItemsStale, self).__init__(*args)
00072 
00073         self._mutex = threading.Lock()
00074 
00075         self._expecteds = {}
00076         self._agg_expecteds = {}
00077 
00078         rospy.init_node('test_expected_stale')        
00079         self._starttime = rospy.get_time()
00080 
00081         sub = rospy.Subscriber("/diagnostics", DiagnosticArray, self.diag_cb)
00082         sub_agg = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.diag_agg_cb)
00083 
00084     def diag_cb(self, msg):
00085         with self._mutex:
00086             for stat in msg.status:
00087                 if stat.name.find('expected') == 0:
00088                     self._expecteds[stat.name] = DiagnosticItem(stat)
00089 
00090     def diag_agg_cb(self, msg):
00091         with self._mutex:
00092             for stat in msg.status:
00093                 if stat.name.find('expected') > 0:
00094                     self._agg_expecteds[get_raw_name(stat.name)] = DiagnosticItem(stat)
00095 
00096     def test_expecteds_arent_stale(self):
00097         while not rospy.is_shutdown():
00098             sleep(1.0)
00099             if rospy.get_time() - self._starttime > DURATION:
00100                 break
00101 
00102         with self._mutex:
00103             self.assert_(len(self._expecteds) > 0, "No expected items found in raw data!")
00104 
00105             for name, item in self._expecteds.iteritems():
00106                 self.assert_(self._agg_expecteds.has_key(name), "Item %s not found in aggregated diagnostics output" % name)
00107                 if item.is_stale():
00108                     self.assert_(self._agg_expecteds[name].level == 3, "Stale item in diagnostics, but aggregated didn't report as stale. Item: %s" % name)
00109                 else:
00110                     self.assert_(self._agg_expecteds[name].level == item.level, "Diagnostic level of aggregated, raw item don't match for %s" % name)
00111 
00112 if __name__ == '__main__':
00113     rostest.run(PKG, sys.argv[0], TestExpectedItemsStale, sys.argv)


diagnostic_aggregator
Author(s): Kevin Watts
autogenerated on Fri Jan 3 2014 11:19:06