1
0
Fork 0
cherry-circadian/cherry/circadian/__init__.py

347 lines
11 KiB
Python

# cherry-circadian - adapts lighting based on the sun movement
# Copyright (C) 2021 Bob Carroll <bob.carroll@alum.rit.edu>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import sys
import asyncio
from contextlib import AsyncExitStack
from collections import namedtuple
import logging
import yaml
from dns.asyncresolver import resolve
from asyncio_mqtt import Client
import umsgpack
Rule = namedtuple('Rule', 'bri ct')
class SceneManager(object):
"""
Manages light state changes.
:param client: mqtt client
:param rule: new light state values
:param state: shared state dictionary
"""
def __init__(self, client, rule, state):
self.client = client
self.rule = rule
self.elevation = state['solar']['elevation']
self.direction = state['solar']['direction']
self.auto_on = state['options'].get('auto_on', False)
def get_auto_on_config(self, options):
"""
Gets the solar elevation and direction settings for a light.
:param options: light configuration
:returns: a tuple of elevation and direction values
"""
return options.get('auto_on_elevation'), options.get('auto_on_direction', 1)
def check_auto_on(self, options):
"""
Determiens whether a light should automatically turn on based on configured and current
solar elevation and direction.
:param options: light configuration
:returns: True if the light should turn on, False otherwise
"""
elevation, direction = self.get_auto_on_config(options)
if self.auto_on and elevation is not None and self.direction == direction:
tod = 'morning' if direction else 'evening'
logging.debug(f'Auto-on will trigger in the {tod} at {elevation} degrees')
if direction and self.elevation >= elevation:
return True
elif not direction and self.elevation <= elevation:
return True
return False
def adjust_brightness(self, state):
"""
Adjusts the rule brightness value given an offset.
:param state: light state dictionary
:returns: new brightness value
"""
offset = state['options'].get('brightness_offset', 0)
return max(min(self.rule.bri + offset, 100), 0)
def adjust_colour_temp(self, state):
"""
Adjusts the rule colour temperature value given an offset.
:param state: light state dictionary
:returns: new colour temperature value
"""
offset = state['options'].get('colour_temp_offset', 0)
return max(min(self.rule.ct + offset, 10000), 0)
async def turn_on(self, name, state):
"""
Turns on a light based on current state and configuration.
:param name: light name
:param state: light state dictionary
"""
bri = self.adjust_brightness(state)
ct = self.adjust_colour_temp(state)
payload = {'mode': 'dim',
'brightness': bri,
'effect': {'type': 'temperature',
'value': f'{ct}K'}}
if state.get('on', False) or self.check_auto_on(state['options']):
logging.debug(f'Setting state for light {name}: {payload}')
await self.client.publish(f'light/{name}/state/set', umsgpack.packb(payload))
else:
logging.debug(f'Skipping light {name} because it is off')
def match_step_rule(rules, current):
"""
Gets the closest rule for the given index.
:param rules: dictionary of step rules
:param current: the current step index
:returns: the matched rule
"""
return rules[min(rules.keys(), key=lambda k: abs(k - current))]
async def update_light_states(client, state):
"""
Updates the state of all lights based on the solar state.
:param client mqtt client
:param state: shared state dictionary
"""
if state['paused']:
logging.debug('Circadian automation is paused, skipping')
return
elif state['solar']['elevation'] < 0:
logging.debug('Sun is below the horizon, skipping')
return
index = (state['solar']['elevation'] / state['solar']['noon']) * 100
logging.debug(f'Current step index: {index}')
rule = match_step_rule(state['rules'], index)
logging.debug(f'Matched step rule: {rule}')
scene = SceneManager(client, rule, state)
for light_name, light_state in state['lights'].items():
await scene.turn_on(light_name, light_state)
await client.publish('scene/activate', state['scene_name'])
async def on_solar_state_change(client, state, mutex, messages):
"""
Event handler for receiving solar state changes.
:param client: mqtt client
:param state: shared state dictionary
:param mutex: lock for making changes to shared state
:param messages: mqtt message generator
"""
async for m in messages:
try:
async with mutex:
state['solar'] = umsgpack.unpackb(m.payload)
logging.debug(f'Received solar state: {state["solar"]}')
await update_light_states(client, state)
except Exception as ex:
logging.error(str(ex))
async def on_light_event(client, state, mutex, messages):
"""
Event handler for receiving light 'on' changes.
:param client: mqtt client
:param state: shared state dictionary
:param mutex: lock for making changes to shared state
:param messages: mqtt message generator
"""
async for m in messages:
try:
async with mutex:
_, name, _ = m.topic.split('/', 3)
logging.debug(f'Received light {name} state: {m.payload}')
if name in state['lights']:
current = state['lights'][name]['on']
state['lights'][name]['on'] = bool(int(m.payload))
if not current and 'solar' in state:
await update_light_states(client, state)
except Exception as ex:
logging.error(str(ex))
async def on_pause(client, state, mutex, messages):
"""
Handler for pausing and unpausing the automation.
:param client: mqtt client
:param state: shared state dictionary
:param mutex: lock for making changes to shared state
:param messages: mqtt message generator
"""
async for m in messages:
try:
async with mutex:
logging.debug(f'Setting paused state: {m.payload}')
state['paused'] = bool(int(m.payload))
await client.publish('circadian/paused', m.payload, retain=True)
except Exception as ex:
logging.error(str(ex))
def compute_steps(range_min, range_max, count):
"""
Computes the intervals given a range and step.
:param range_min:
:param range_max:
:param count: the number of steps to iterate over
:returns: a list of intervals
"""
step = (range_max - range_min) / count
result = []
i = 0
while range_min + i * step <= range_max:
result.append(round(range_min + i * step))
i += 1
return result
def make_rule_map(config):
"""
Builds a mapping of step rules to light state rules.
:param config: configuration dictionary
:returns: a dictionary of rules
"""
steps = min(max(config.get('change_steps', 1), 1), 100)
elev = compute_steps(0, 100, steps)
bri = compute_steps(max(config.get('min_brightness', 0), 0),
min(config.get('max_brightness', 100), 100),
steps)
ct = compute_steps(max(config.get('min_colour_temp', 0), 0),
min(config.get('max_colour_temp', 10000), 10000),
steps)
return {e: Rule(b, t) for e, b, t in zip(elev, bri, ct)}
def make_light_map(config):
"""
Builds a mapping of light options and state.
:param config: configuration dictionary
:returns: a dictionary of light states
"""
def transform(in_):
options = {k: v for k, v in in_.items() if k != 'name'}
return {'options': options, 'on': False}
return {x['name']: transform(x) for x in config}
async def get_broker(config):
"""
Gets the mqtt broker address from an SRV record.
:param config: configuration dictionary
:returns: the broker address
"""
broker = config.get('mqtt', {}).get('broker')
if broker is not None:
logging.debug(f'Using pre-defined broker: {broker}')
return broker
answer = await resolve('_mqtt._tcp', 'SRV', search=True)
broker = next((x.target.to_text() for x in answer))
logging.debug(f'Found SRV record: {broker}')
return broker
async def init(config):
"""
Initializes the circadian lighting agent.
:param config: configuration dictionary
"""
state = {'lights': make_light_map(config.get('lights', [])),
'rules': make_rule_map(config.get('options', {})),
'scene_name': config.get('scene_name', 'Circadian'),
'options': config.get('options', {}),
'paused': False}
mutex = asyncio.Lock()
tasks = set()
async with AsyncExitStack() as stack:
client = Client(await get_broker(config), client_id='cherry-circadian')
await stack.enter_async_context(client)
logging.info('Connected to mqtt broker')
topics = {
'circadian/paused/set': on_pause,
'light/+/on': on_light_event,
'sun/state': on_solar_state_change}
for t, cb in topics.items():
manager = client.filtered_messages(t)
messages = await stack.enter_async_context(manager)
task = asyncio.create_task(cb(client, state, mutex, messages))
tasks.add(task)
await client.subscribe('circadian/#')
await client.publish('circadian/paused', 0, retain=True)
await client.subscribe('light/#')
await client.subscribe('sun/#')
await asyncio.gather(*tasks)
def main():
"""
CLI entry point.
"""
if len(sys.argv) != 2:
print('USAGE: cherry-circadian <config file>')
sys.exit(1)
with open(sys.argv[1], 'r') as f:
config = yaml.safe_load(f)
log = config.get('log', {})
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s',
level=log.get('level', logging.ERROR))
try:
asyncio.run(init(config))
except KeyboardInterrupt:
pass