Skip to content

Commit a9fa9d6

Browse files
Add roller shade and hubmini matter support (#315)
* Add roller shade and hubmini matter support * add unit test * chore(pre-commit.ci): auto fixes * add unit tests for passive data, stop and set position case * chore(pre-commit.ci): auto fixes --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent cf04817 commit a9fa9d6

10 files changed

+578
-1
lines changed

switchbot/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from .devices.lock import SwitchbotLock
3030
from .devices.plug import SwitchbotPlugMini
3131
from .devices.relay_switch import SwitchbotRelaySwitch
32+
from .devices.roller_shade import SwitchbotRollerShade
3233
from .discovery import GetSwitchbotDevices
3334
from .models import SwitchBotAdvertisement
3435

@@ -58,6 +59,7 @@
5859
"SwitchbotPlugMini",
5960
"SwitchbotPlugMini",
6061
"SwitchbotRelaySwitch",
62+
"SwitchbotRollerShade",
6163
"SwitchbotSupportedType",
6264
"SwitchbotSupportedType",
6365
"close_stale_connections",

switchbot/adv_parser.py

+14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .adv_parsers.contact import process_wocontact
1818
from .adv_parsers.curtain import process_wocurtain
1919
from .adv_parsers.hub2 import process_wohub2
20+
from .adv_parsers.hubmini_matter import process_hubmini_matter
2021
from .adv_parsers.humidifier import process_evaporative_humidifier, process_wohumidifier
2122
from .adv_parsers.keypad import process_wokeypad
2223
from .adv_parsers.leak import process_leak
@@ -30,6 +31,7 @@
3031
process_worelay_switch_1pm,
3132
)
3233
from .adv_parsers.remote import process_woremote
34+
from .adv_parsers.roller_shade import process_worollershade
3335
from .const import SwitchbotModel
3436
from .models import SwitchBotAdvertisement
3537

@@ -216,6 +218,18 @@ class SwitchbotSupportedType(TypedDict):
216218
"func": process_woremote,
217219
"manufacturer_id": 89,
218220
},
221+
",": {
222+
"modelName": SwitchbotModel.ROLLER_SHADE,
223+
"modelFriendlyName": "Roller Shade",
224+
"func": process_worollershade,
225+
"manufacturer_id": 2409,
226+
},
227+
"%": {
228+
"modelName": SwitchbotModel.HUBMINI_MATTER,
229+
"modelFriendlyName": "HubMini Matter",
230+
"func": process_hubmini_matter,
231+
"manufacturer_id": 2409,
232+
},
219233
}
220234

221235
_SWITCHBOT_MODEL_TO_CHAR = {
+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
"""Hubmini matter parser."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any
6+
7+
8+
def process_hubmini_matter(
9+
data: bytes | None, mfr_data: bytes | None
10+
) -> dict[str, Any]:
11+
"""Process Hubmini matter sensor manufacturer data."""
12+
temp_data = None
13+
14+
if mfr_data:
15+
temp_data = mfr_data[13:16]
16+
17+
if not temp_data:
18+
return {}
19+
20+
_temp_sign = 1 if temp_data[1] & 0b10000000 else -1
21+
_temp_c = _temp_sign * (
22+
(temp_data[1] & 0b01111111) + ((temp_data[0] & 0b00001111) / 10)
23+
)
24+
_temp_f = (_temp_c * 9 / 5) + 32
25+
_temp_f = (_temp_f * 10) / 10
26+
humidity = temp_data[2] & 0b01111111
27+
28+
if _temp_c == 0 and humidity == 0:
29+
return {}
30+
31+
paraser_data = {
32+
"temp": {"c": _temp_c, "f": _temp_f},
33+
"temperature": _temp_c,
34+
"fahrenheit": bool(temp_data[2] & 0b10000000),
35+
"humidity": humidity,
36+
}
37+
return paraser_data

switchbot/adv_parsers/roller_shade.py

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""Library to handle connection with Switchbot."""
2+
3+
from __future__ import annotations
4+
5+
6+
def process_worollershade(
7+
data: bytes | None, mfr_data: bytes | None, reverse: bool = True
8+
) -> dict[str, bool | int]:
9+
"""Process woRollerShade services data."""
10+
if mfr_data is None:
11+
return {}
12+
13+
device_data = mfr_data[6:]
14+
15+
_position = max(min(device_data[2] & 0b01111111, 100), 0)
16+
_calibrated = bool(device_data[2] & 0b10000000)
17+
_in_motion = bool(device_data[1] & 0b00000110)
18+
_light_level = (device_data[3] >> 4) & 0b00001111
19+
_device_chain = device_data[3] & 0b00001111
20+
21+
return {
22+
"calibration": _calibrated,
23+
"battery": data[2] & 0b01111111 if data else None,
24+
"inMotion": _in_motion,
25+
"position": (100 - _position) if reverse else _position,
26+
"lightLevel": _light_level,
27+
"deviceChain": _device_chain,
28+
"sequence_number": device_data[0],
29+
}

switchbot/const/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,5 @@ class SwitchbotModel(StrEnum):
6363
RELAY_SWITCH_1 = "Relay Switch 1"
6464
REMOTE = "WoRemote"
6565
EVAPORATIVE_HUMIDIFIER = "Evaporative Humidifier"
66+
ROLLER_SHADE = "Roller Shade"
67+
HUBMINI_MATTER = "HubMini Matter"

switchbot/devices/base_cover.py

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
# Cover keys
1212
COVER_COMMAND = "4501"
13+
ROLLERSHADE_COMMAND = "4701"
14+
CONTROL_SOURCE = "00"
1315

1416
# For second element of open and close arrs we should add two bytes i.e. ff00
1517
# First byte [ff] stands for speed (00 or ff - normal, 01 - slow) *

switchbot/devices/roller_shade.py

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
"""Library to handle connection with Switchbot."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from typing import Any
7+
8+
from ..models import SwitchBotAdvertisement
9+
from .base_cover import CONTROL_SOURCE, ROLLERSHADE_COMMAND, SwitchbotBaseCover
10+
from .device import REQ_HEADER, SwitchbotSequenceDevice, update_after_operation
11+
12+
_LOGGER = logging.getLogger(__name__)
13+
14+
15+
OPEN_KEYS = [
16+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0100",
17+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}0000",
18+
]
19+
CLOSE_KEYS = [
20+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}0164",
21+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}0064",
22+
]
23+
POSITION_KEYS = [
24+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}01{CONTROL_SOURCE}01",
25+
f"{REQ_HEADER}{ROLLERSHADE_COMMAND}05{CONTROL_SOURCE}",
26+
] # +actual_position
27+
STOP_KEYS = [f"{REQ_HEADER}{ROLLERSHADE_COMMAND}00{CONTROL_SOURCE}01"]
28+
29+
30+
class SwitchbotRollerShade(SwitchbotBaseCover, SwitchbotSequenceDevice):
31+
"""Representation of a Switchbot Roller Shade."""
32+
33+
def __init__(self, *args: Any, **kwargs: Any) -> None:
34+
"""Switchbot roller shade constructor."""
35+
# The position of the roller shade is saved returned with 0 = open and 100 = closed.
36+
# the definition of position is the same as in Home Assistant.
37+
38+
self._reverse: bool = kwargs.pop("reverse_mode", True)
39+
super().__init__(self._reverse, *args, **kwargs)
40+
41+
def _set_parsed_data(
42+
self, advertisement: SwitchBotAdvertisement, data: dict[str, Any]
43+
) -> None:
44+
"""Set data."""
45+
in_motion = data["inMotion"]
46+
previous_position = self._get_adv_value("position")
47+
new_position = data["position"]
48+
self._update_motion_direction(in_motion, previous_position, new_position)
49+
super()._set_parsed_data(advertisement, data)
50+
51+
@update_after_operation
52+
async def open(self, mode: int = 0) -> bool:
53+
"""Send open command. 0 - performance mode, 1 - unfelt mode."""
54+
self._is_opening = True
55+
self._is_closing = False
56+
return await self._send_multiple_commands(OPEN_KEYS)
57+
58+
@update_after_operation
59+
async def close(self, speed: int = 0) -> bool:
60+
"""Send close command. 0 - performance mode, 1 - unfelt mode."""
61+
self._is_closing = True
62+
self._is_opening = False
63+
return await self._send_multiple_commands(CLOSE_KEYS)
64+
65+
@update_after_operation
66+
async def stop(self) -> bool:
67+
"""Send stop command to device."""
68+
self._is_opening = self._is_closing = False
69+
return await self._send_multiple_commands(STOP_KEYS)
70+
71+
@update_after_operation
72+
async def set_position(self, position: int, mode: int = 0) -> bool:
73+
"""Send position command (0-100) to device. 0 - performance mode, 1 - unfelt mode."""
74+
position = (100 - position) if self._reverse else position
75+
self._update_motion_direction(True, self._get_adv_value("position"), position)
76+
return await self._send_multiple_commands(
77+
[
78+
f"{POSITION_KEYS[0]}{position:02X}",
79+
f"{POSITION_KEYS[1]}{mode:02X}{position:02X}",
80+
]
81+
)
82+
83+
def get_position(self) -> Any:
84+
"""Return cached position (0-100) of Curtain."""
85+
# To get actual position call update() first.
86+
return self._get_adv_value("position")
87+
88+
async def get_basic_info(self) -> dict[str, Any] | None:
89+
"""Get device basic settings."""
90+
if not (_data := await self._get_basic_info()):
91+
return None
92+
93+
_position = max(min(_data[5], 100), 0)
94+
_direction_adjusted_position = (100 - _position) if self._reverse else _position
95+
_previous_position = self._get_adv_value("position")
96+
_in_motion = bool(_data[4] & 0b00000011)
97+
self._update_motion_direction(
98+
_in_motion, _previous_position, _direction_adjusted_position
99+
)
100+
101+
return {
102+
"battery": _data[1],
103+
"firmware": _data[2] / 10.0,
104+
"chainLength": _data[3],
105+
"openDirection": (
106+
"clockwise" if _data[4] & 0b10000000 == 128 else "anticlockwise"
107+
),
108+
"fault": bool(_data[4] & 0b00010000),
109+
"solarPanel": bool(_data[4] & 0b00001000),
110+
"calibration": bool(_data[4] & 0b00000100),
111+
"calibrated": bool(_data[4] & 0b00000100),
112+
"inMotion": _in_motion,
113+
"position": _direction_adjusted_position,
114+
"timers": _data[6],
115+
}
116+
117+
def _update_motion_direction(
118+
self, in_motion: bool, previous_position: int | None, new_position: int
119+
) -> None:
120+
"""Update opening/closing status based on movement."""
121+
if previous_position is None:
122+
return
123+
if in_motion is False:
124+
self._is_closing = self._is_opening = False
125+
return
126+
127+
if new_position != previous_position:
128+
self._is_opening = new_position > previous_position
129+
self._is_closing = new_position < previous_position

switchbot/discovery.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,14 @@ async def get_tempsensors(self) -> dict[str, SwitchBotAdvertisement]:
111111
plus_meters = await self._get_devices_by_model("i")
112112
io_meters = await self._get_devices_by_model("w")
113113
hub2_meters = await self._get_devices_by_model("v")
114-
return {**base_meters, **plus_meters, **io_meters, **hub2_meters}
114+
hubmini_matter_meters = await self._get_devices_by_model("%")
115+
return {
116+
**base_meters,
117+
**plus_meters,
118+
**io_meters,
119+
**hub2_meters,
120+
**hubmini_matter_meters,
121+
}
115122

116123
async def get_contactsensors(self) -> dict[str, SwitchBotAdvertisement]:
117124
"""Return all WoContact/Contact sensor devices with services data."""

0 commit comments

Comments
 (0)