1use std::{fmt::Display, pin::Pin, time::Duration};
4
5use btleplug::{
6 api::{
7 BDAddr, Central as _, Characteristic, Manager as _, Peripheral, ValueNotification,
8 WriteType,
9 },
10 platform::Manager,
11};
12use futures::{stream::StreamExt, Stream};
13use tracing::{debug, error, trace, warn};
14
15use crate::{
16 info::{ble_spec_by_service_uuid, model_by_ble_service_uuid, ConnInfo, LedgerInfo},
17 Error, Exchange, Transport,
18};
19
20pub struct BleTransport {
22 manager: Manager,
23 peripherals: Vec<(LedgerInfo, btleplug::platform::Peripheral)>,
24}
25
26#[derive(Clone, Debug, PartialEq)]
28pub struct BleInfo {
29 name: String,
30 addr: BDAddr,
31}
32
33impl Display for BleInfo {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 write!(f, "{}", self.name)
36 }
37}
38
39pub struct BleDevice {
41 pub info: BleInfo,
42 mtu: u8,
43 p: btleplug::platform::Peripheral,
44 c_write: Characteristic,
45 c_read: Characteristic,
46}
47
48impl BleTransport {
49 pub async fn new() -> Result<Self, Error> {
50 let manager = Manager::new().await?;
52
53 Ok(Self {
54 manager,
55 peripherals: vec![],
56 })
57 }
58
59 async fn scan_internal(
61 &self,
62 duration: Duration,
63 ) -> Result<Vec<(LedgerInfo, btleplug::platform::Peripheral)>, Error> {
64 let mut matched = vec![];
65
66 let adapters = self.manager.adapters().await?;
68
69 for adapter in adapters.iter() {
71 let info = adapter.adapter_info().await?;
72 debug!("Scan with adapter {info}");
73
74 adapter.start_scan(Default::default()).await?;
79
80 tokio::time::sleep(duration).await;
81
82 let mut peripherals = adapter.peripherals().await?;
84 if peripherals.is_empty() {
85 debug!("No peripherals found on adaptor {info}");
86 continue;
87 }
88
89 for p in peripherals.drain(..) {
91 let (properties, _connected) = (p.properties().await?, p.is_connected().await?);
93
94 let properties = match properties {
96 Some(v) => v,
97 None => {
98 debug!("Failed to fetch properties for peripheral: {p:?}");
99 continue;
100 }
101 };
102
103 debug!("Peripheral: {p:?} props: {properties:?}");
104
105 let Some(model) = properties
106 .services
107 .iter()
108 .find_map(model_by_ble_service_uuid)
109 else {
110 continue;
111 };
112
113 matched.push((
115 LedgerInfo {
116 model,
117 conn: BleInfo {
118 name: properties.local_name.unwrap_or(String::new()),
119 addr: properties.address,
120 }
121 .into(),
122 },
123 p,
124 ));
125 }
126 }
127
128 Ok(matched)
129 }
130}
131
132impl Transport for BleTransport {
134 type Filters = ();
135 type Info = BleInfo;
136 type Device = BleDevice;
137
138 async fn list(&mut self, _filters: Self::Filters) -> Result<Vec<LedgerInfo>, Error> {
140 let devices = self.scan_internal(Duration::from_millis(1000)).await?;
142
143 let info: Vec<_> = devices.iter().map(|d| d.0.clone()).collect();
145
146 self.peripherals = devices;
148
149 Ok(info)
150 }
151
152 async fn connect(&mut self, info: Self::Info) -> Result<Self::Device, Error> {
156 let (d, p) = match self
158 .peripherals
159 .iter()
160 .find(|(d, _p)| d.conn == info.clone().into())
161 {
162 Some(v) => v,
163 None => {
164 warn!("No device found matching: {info:?}");
165 return Err(Error::NoDevices);
166 }
167 };
168 let i = match &d.conn {
169 ConnInfo::Ble(i) => i,
170 _ => unreachable!(),
171 };
172
173 let name = &i.name;
174
175 let properties = p
177 .properties()
178 .await?
179 .ok_or(Error::CannotReadBleDeviceProperties)?;
180
181 debug!("peripheral {name}: {p:?} properties: {properties:?}");
182
183 let specs = properties
186 .services
187 .iter()
188 .find_map(ble_spec_by_service_uuid)
189 .ok_or(Error::CannotFindBleDeviceSpecs)?;
190
191 if !p.is_connected().await? {
193 if let Err(e) = p.connect().await {
194 warn!("Failed to connect to {name}: {e:?}");
195 return Err(Error::Ble(e));
196 }
197
198 if !p.is_connected().await? {
199 warn!("Not connected to {name}");
200 return Err(Error::NotConnectedAfterSuccessfulBleConnect);
201 }
202 }
203
204 p.discover_services().await?;
206
207 let characteristics = p.characteristics();
208
209 trace!("Characteristics: {characteristics:?}");
210
211 let c_write = characteristics.iter().find(|c| c.uuid == specs.write_uuid);
212 let c_read = characteristics.iter().find(|c| c.uuid == specs.notify_uuid);
213
214 let (c_write, c_read) = match (c_write, c_read) {
215 (Some(w), Some(r)) => (w, r),
216 _ => {
217 error!("Failed to match read and write characteristics for {name}");
218 return Err(Error::MissingReadOrWriteBleCharacteristics);
219 }
220 };
221
222 let mut d = BleDevice {
224 info: info.clone(),
225 mtu: 23,
226 p: p.clone(),
227 c_write: c_write.clone(),
228 c_read: c_read.clone(),
229 };
230
231 match d.fetch_mtu().await {
233 Ok(mtu) => d.mtu = mtu,
234 Err(e) => {
235 warn!("Failed to fetch MTU: {:?}", e);
236 }
237 }
238
239 debug!("using MTU: {}", d.mtu);
240
241 Ok(d)
242 }
243}
244
245const BLE_HEADER_LEN: usize = 3;
246
247impl BleDevice {
248 async fn write_command(&mut self, cmd: u8, payload: &[u8]) -> Result<(), Error> {
250 let mut data = Vec::with_capacity(payload.len() + 2);
252 data.extend_from_slice(&(payload.len() as u16).to_be_bytes()); data.extend_from_slice(payload); debug!("TX cmd: 0x{cmd:02x} payload: {data:02x?}");
256
257 for (i, c) in data.chunks(self.mtu as usize - BLE_HEADER_LEN).enumerate() {
259 let mut buff = Vec::with_capacity(self.mtu as usize);
261 let cmd = match i == 0 {
262 true => cmd,
263 false => 0x03,
264 };
265
266 buff.push(cmd); buff.extend_from_slice(&(i as u16).to_be_bytes()); buff.extend_from_slice(c);
269
270 debug!("Write chunk {i}: {:02x?}", buff);
271
272 self.p
273 .write(&self.c_write, &buff, WriteType::WithResponse)
274 .await?;
275 }
276
277 Ok(())
278 }
279
280 async fn read_data(
282 &mut self,
283 mut notifications: Pin<Box<dyn Stream<Item = ValueNotification> + Send>>,
284 ) -> Result<Vec<u8>, Error> {
285 let v = match notifications.next().await {
287 Some(v) => v.value,
288 None => {
289 return Err(Error::Closed);
290 }
291 };
292
293 debug!("RX: {:02x?}", v);
294
295 if v.len() < 5 {
297 error!("response too short");
298 return Err(Error::UnexpectedResponse);
299 } else if v[0] != 0x05 {
300 error!("unexpected response type: {:?}", v[0]);
301 return Err(Error::UnexpectedResponse);
302 }
303
304 let len = v[4] as usize;
306 if len == 0 {
307 return Err(Error::EmptyResponse);
308 }
309
310 trace!("Expecting response length: {}", len);
311
312 let mut buff = Vec::with_capacity(len);
314 buff.extend_from_slice(&v[5..]);
315
316 while buff.len() < len {
319 let v = match notifications.next().await {
321 Some(v) => v.value,
322 None => {
323 error!("Failed to fetch next chunk from peripheral");
324 self.p.unsubscribe(&self.c_read).await?;
325 return Err(Error::Closed);
326 }
327 };
328
329 debug!("RX: {v:02x?}");
330
331 buff.extend_from_slice(&v[5..]);
335 }
336
337 Ok(buff)
338 }
339
340 async fn fetch_mtu(&mut self) -> Result<u8, Error> {
342 self.p.subscribe(&self.c_read).await?;
344 let mut n = self.p.notifications().await?;
345
346 self.write_command(0x08, &[]).await?;
348
349 let mtu = match n.next().await {
351 Some(r) if r.value[0] == 0x08 && r.value.len() == 6 => {
352 debug!("RX: {:02x?}", r);
353 r.value[5]
354 }
355 Some(r) => {
356 warn!("Unexpected MTU response: {r:02x?}");
357 return Err(Error::UnexpectedMtuResponse);
358 }
359 None => {
360 warn!("Failed to request MTU");
361 return Err(Error::Closed);
362 }
363 };
364
365 self.p.unsubscribe(&self.c_read).await?;
367
368 Ok(mtu)
369 }
370
371 pub(crate) async fn is_connected(&self) -> Result<bool, Error> {
372 let c = self.p.is_connected().await?;
373 Ok(c)
374 }
375}
376
377impl Exchange for BleDevice {
379 async fn exchange(&mut self, command: &[u8], timeout: Duration) -> Result<Vec<u8>, Error> {
380 self.p.subscribe(&self.c_read).await?;
382 let notifications = self.p.notifications().await?;
383
384 if let Err(e) = self.write_command(0x05, command).await {
386 self.p.unsubscribe(&self.c_read).await?;
387 return Err(e);
388 }
389
390 debug!("Await response");
391
392 let buff = match tokio::time::timeout(timeout, self.read_data(notifications)).await {
394 Ok(Ok(v)) => v,
395 Ok(Err(e)) => {
396 self.p.unsubscribe(&self.c_read).await?;
397 return Err(e);
398 }
399 Err(e) => {
400 self.p.unsubscribe(&self.c_read).await?;
401 return Err(e.into());
402 }
403 };
404
405 Ok(buff)
406 }
407}