Skip to content

Commit a011382

Browse files
committed
driver: add LAA drivers
Add drivers for all LAA resource types. This includes serial console, power control, USB gadget mass storage, USB port control, button control, LED control, temperature reading, watt measurement and file upload with URL download support. Signed-off-by: Anders Roxell <anders.roxell@linaro.org>
1 parent c0e26bb commit a011382

2 files changed

Lines changed: 379 additions & 0 deletions

File tree

labgrid/driver/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,7 @@
5050
from .dediprogflashdriver import DediprogFlashDriver
5151
from .httpdigitaloutput import HttpDigitalOutputDriver
5252
from .eth008digitaloutput import Eth008DigitalOutputDriver
53+
from .laadriver import LAASerialDriver, LAAPowerDriver, \
54+
LAAUSBGadgetMassStorageDriver, LAAUSBDriver, \
55+
LAAButtonDriver, LAALedDriver, LAATempDriver, LAAWattDriver, \
56+
LAAProviderDriver

labgrid/driver/laadriver.py

Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
import os
2+
import shutil
3+
import tempfile
4+
import time
5+
from importlib import import_module
6+
from urllib.parse import urlparse
7+
from urllib.request import urlopen
8+
9+
import attr
10+
11+
from ..factory import target_factory
12+
from ..protocol import ConsoleProtocol, PowerProtocol
13+
from ..step import step
14+
from .common import Driver
15+
from .consoleexpectmixin import ConsoleExpectMixin
16+
from .exception import ExecutionError
17+
from .powerdriver import PowerResetMixin
18+
19+
20+
def _get_laam():
21+
try:
22+
return import_module('laam')
23+
except ModuleNotFoundError:
24+
raise ModuleNotFoundError(
25+
"laam package not found, install labgrid[laa]"
26+
)
27+
28+
29+
@target_factory.reg_driver
30+
@attr.s(eq=False)
31+
class LAASerialDriver(ConsoleExpectMixin, Driver, ConsoleProtocol):
32+
"""Driver for serial console via LAA WebSocket bridge"""
33+
bindings = {"port": "LAASerialPort", }
34+
35+
txdelay = attr.ib(default=0.0, validator=attr.validators.instance_of(float))
36+
timeout = attr.ib(default=3.0, validator=attr.validators.instance_of(float))
37+
38+
def __attrs_post_init__(self):
39+
super().__attrs_post_init__()
40+
self._laam = _get_laam()
41+
self._laa = None
42+
self._conn = None
43+
44+
def on_activate(self):
45+
self._laa = self._laam.LAA(self.port.laa_identity)
46+
self._conn = self._laa.serials.connect_pexpect(self.port.serial_name)
47+
48+
def on_deactivate(self):
49+
if self._conn is not None:
50+
self._conn.close()
51+
self._conn = None
52+
self._laa = None
53+
54+
def _read(self, size: int = 1, timeout: float = 0.0, max_size: int = None):
55+
# ConnectPexpect.read_nonblocking() returns str, labgrid expects bytes
56+
data = self._conn.read_nonblocking(size, timeout)
57+
return data.encode("utf-8", errors="replace")
58+
59+
def _write(self, data: bytes):
60+
return self._conn.send(data)
61+
62+
63+
@target_factory.reg_driver
64+
@attr.s(eq=False)
65+
class LAAPowerDriver(Driver, PowerResetMixin, PowerProtocol):
66+
"""Driver for DUT power control via LAA"""
67+
bindings = {"port": "LAAPowerPort", }
68+
69+
delay = attr.ib(default=2.0, validator=attr.validators.instance_of(float))
70+
71+
def __attrs_post_init__(self):
72+
super().__attrs_post_init__()
73+
self._laam = _get_laam()
74+
self._laa = None
75+
76+
def on_activate(self):
77+
self._laa = self._laam.LAA(self.port.laa_identity)
78+
79+
def on_deactivate(self):
80+
self._laa = None
81+
82+
def _execute_sequence(self, sequence):
83+
for vbus, state in sequence:
84+
try:
85+
self._laa.laacli.power(vbus, state)
86+
except self._laam.exceptions.LAAError as e:
87+
raise ExecutionError(
88+
f"LAA power command failed ({vbus} {state}): {e}"
89+
) from e
90+
91+
@Driver.check_active
92+
@step()
93+
def on(self):
94+
self._execute_sequence(self.port.power_on)
95+
96+
@Driver.check_active
97+
@step()
98+
def off(self):
99+
self._execute_sequence(self.port.power_off)
100+
101+
@Driver.check_active
102+
@step()
103+
def cycle(self):
104+
if self.port.power_cycle is not None:
105+
self._execute_sequence(self.port.power_cycle)
106+
else:
107+
self.off()
108+
time.sleep(self.delay)
109+
self.on()
110+
111+
112+
@target_factory.reg_driver
113+
@attr.s(eq=False)
114+
class LAAUSBGadgetMassStorageDriver(Driver):
115+
"""Driver for USB gadget mass storage control via LAA"""
116+
bindings = {"port": "LAAUSBGadgetMassStorage", }
117+
118+
def __attrs_post_init__(self):
119+
super().__attrs_post_init__()
120+
self._laam = _get_laam()
121+
self._laa = None
122+
123+
def on_activate(self):
124+
self._laa = self._laam.LAA(self.port.laa_identity)
125+
126+
def on_deactivate(self):
127+
self._laa = None
128+
129+
@Driver.check_active
130+
@step()
131+
def on(self):
132+
try:
133+
self._laa.laacli.usbg_ms("on", self.port.image)
134+
except self._laam.exceptions.LAAError as e:
135+
raise ExecutionError(f"LAA usbg-ms on failed: {e}") from e
136+
137+
@Driver.check_active
138+
@step()
139+
def off(self):
140+
try:
141+
self._laa.laacli.usbg_ms("off", "")
142+
except self._laam.exceptions.LAAError as e:
143+
raise ExecutionError(f"LAA usbg-ms off failed: {e}") from e
144+
145+
146+
@target_factory.reg_driver
147+
@attr.s(eq=False)
148+
class LAAUSBDriver(Driver):
149+
"""Driver for USB port control via LAA"""
150+
bindings = {"port": "LAAUSBPort", }
151+
152+
def __attrs_post_init__(self):
153+
super().__attrs_post_init__()
154+
self._laam = _get_laam()
155+
self._laa = None
156+
157+
def on_activate(self):
158+
self._laa = self._laam.LAA(self.port.laa_identity)
159+
160+
def on_deactivate(self):
161+
self._laa = None
162+
163+
@Driver.check_active
164+
@step()
165+
def on(self):
166+
for port in self.port.usb_ports:
167+
try:
168+
self._laa.laacli.usb(port, "on")
169+
except self._laam.exceptions.LAAError as e:
170+
raise ExecutionError(
171+
f"LAA USB command failed (port {port} on): {e}"
172+
) from e
173+
174+
@Driver.check_active
175+
@step()
176+
def off(self):
177+
for port in self.port.usb_ports:
178+
try:
179+
self._laa.laacli.usb(port, "off")
180+
except self._laam.exceptions.LAAError as e:
181+
raise ExecutionError(
182+
f"LAA USB command failed (port {port} off): {e}"
183+
) from e
184+
185+
186+
@target_factory.reg_driver
187+
@attr.s(eq=False)
188+
class LAAButtonDriver(Driver):
189+
"""Driver for virtual button control via LAA"""
190+
bindings = {"port": "LAAButtonPort", }
191+
192+
def __attrs_post_init__(self):
193+
super().__attrs_post_init__()
194+
self._laam = _get_laam()
195+
self._laa = None
196+
197+
def on_activate(self):
198+
self._laa = self._laam.LAA(self.port.laa_identity)
199+
200+
def on_deactivate(self):
201+
self._laa = None
202+
203+
@Driver.check_active
204+
@step()
205+
def press(self, button):
206+
if button not in self.port.buttons:
207+
raise ExecutionError(
208+
f"Button '{button}' not in resource buttons {self.port.buttons}"
209+
)
210+
try:
211+
self._laa.laacli.button(button, "on")
212+
except self._laam.exceptions.LAAError as e:
213+
raise ExecutionError(f"LAA button command failed ({button} on): {e}") from e
214+
215+
@Driver.check_active
216+
@step()
217+
def release(self, button):
218+
if button not in self.port.buttons:
219+
raise ExecutionError(
220+
f"Button '{button}' not in resource buttons {self.port.buttons}"
221+
)
222+
try:
223+
self._laa.laacli.button(button, "off")
224+
except self._laam.exceptions.LAAError as e:
225+
raise ExecutionError(
226+
f"LAA button command failed ({button} off): {e}"
227+
) from e
228+
229+
230+
@target_factory.reg_driver
231+
@attr.s(eq=False)
232+
class LAALedDriver(Driver):
233+
"""Driver for LED control via LAA"""
234+
bindings = {"port": "LAALed", }
235+
236+
def __attrs_post_init__(self):
237+
super().__attrs_post_init__()
238+
self._laam = _get_laam()
239+
self._laa = None
240+
241+
def on_activate(self):
242+
self._laa = self._laam.LAA(self.port.laa_identity)
243+
244+
def on_deactivate(self):
245+
self._laa = None
246+
247+
@Driver.check_active
248+
@step()
249+
def on(self):
250+
try:
251+
self._laa.laacli.led("on")
252+
except self._laam.exceptions.LAAError as e:
253+
raise ExecutionError(f"LAA LED on failed: {e}") from e
254+
255+
@Driver.check_active
256+
@step()
257+
def off(self):
258+
try:
259+
self._laa.laacli.led("off")
260+
except self._laam.exceptions.LAAError as e:
261+
raise ExecutionError(f"LAA LED off failed: {e}") from e
262+
263+
264+
@target_factory.reg_driver
265+
@attr.s(eq=False)
266+
class LAATempDriver(Driver):
267+
"""Driver for temperature sensor reading via LAA"""
268+
bindings = {"port": "LAATempSensor", }
269+
270+
def __attrs_post_init__(self):
271+
super().__attrs_post_init__()
272+
self._laam = _get_laam()
273+
self._laa = None
274+
275+
def on_activate(self):
276+
self._laa = self._laam.LAA(self.port.laa_identity)
277+
278+
def on_deactivate(self):
279+
self._laa = None
280+
281+
@Driver.check_active
282+
@step()
283+
def get_temp(self, probe):
284+
try:
285+
return self._laa.laacli.temp(probe)
286+
except self._laam.exceptions.LAAError as e:
287+
raise ExecutionError(f"LAA temp command failed ({probe}): {e}") from e
288+
289+
290+
@target_factory.reg_driver
291+
@attr.s(eq=False)
292+
class LAAWattDriver(Driver):
293+
"""Driver for power measurement via LAA"""
294+
bindings = {"port": "LAAWattMeter", }
295+
296+
def __attrs_post_init__(self):
297+
super().__attrs_post_init__()
298+
self._laam = _get_laam()
299+
self._laa = None
300+
301+
def on_activate(self):
302+
self._laa = self._laam.LAA(self.port.laa_identity)
303+
304+
def on_deactivate(self):
305+
self._laa = None
306+
307+
@Driver.check_active
308+
@step()
309+
def get_watts(self, vbus):
310+
try:
311+
return self._laa.laacli.watt(vbus)
312+
except self._laam.exceptions.LAAError as e:
313+
raise ExecutionError(f"LAA watt command failed ({vbus}): {e}") from e
314+
315+
316+
@target_factory.reg_driver
317+
@attr.s(eq=False)
318+
class LAAProviderDriver(Driver):
319+
"""Driver for uploading files to LAA file storage.
320+
321+
Files are served via TFTP to the DUT. Source can be a local path
322+
or a URL."""
323+
bindings = {"provider": "LAAProvider", }
324+
325+
def __attrs_post_init__(self):
326+
super().__attrs_post_init__()
327+
self._laam = _get_laam()
328+
self._laa = None
329+
330+
def on_activate(self):
331+
self._laa = self._laam.LAA(self.provider.laa_identity)
332+
333+
def on_deactivate(self):
334+
self._laa = None
335+
336+
@Driver.check_active
337+
@step(args=['source'], result=True)
338+
def stage(self, source):
339+
"""Upload a file to the LAA. Source can be a local path or URL.
340+
341+
Returns the filename as stored on the LAA."""
342+
is_url = source.startswith("http://") or source.startswith("https://")
343+
344+
if is_url:
345+
name = os.path.basename(urlparse(source).path)
346+
with tempfile.NamedTemporaryFile(delete=False) as tmp:
347+
tmp_path = tmp.name
348+
try:
349+
with urlopen(source) as r, open(tmp_path, "wb") as f: # noqa: S310
350+
shutil.copyfileobj(r, f)
351+
self._laa.files.push(name, tmp_path)
352+
finally:
353+
os.unlink(tmp_path)
354+
else:
355+
name = os.path.basename(source)
356+
if not os.path.exists(source):
357+
raise ExecutionError(f"file not found: {source}")
358+
self._laa.files.push(name, source)
359+
360+
return name
361+
362+
@Driver.check_active
363+
@step(result=True)
364+
def list(self):
365+
"""Return list of files on the LAA."""
366+
return self._laa.files.list()
367+
368+
@Driver.check_active
369+
@step(args=['name'])
370+
def remove(self, name):
371+
"""Remove a file from the LAA."""
372+
try:
373+
self._laa.files.remove(name)
374+
except self._laam.exceptions.LAAError as e:
375+
raise ExecutionError(f"LAA file remove failed ({name}): {e}") from e

0 commit comments

Comments
 (0)