2025年8月12日火曜日

Raspberry Pi Pico W×2台:Bluetooth接続とGPIO制御の実装例

   

今回はPico W同士をBluetoothでつないで、ボタンを押すともう片方のGPIOが反応してLEDやモーターを制御する…という感じの実験をやってみます。前回のモジュール構成をそのまま使いつつ、実装の練習も兼ねて進めていきます。

◆Muエディタの導入の仕方
【micro:bit】Lチカ←こちらではmicro:bitで利用してますが、
Raspbery Pi PicoWでも使えるのでThonny2つ起動できないので片方をMuエディタにします。

1.接続図



2.Pico W×2台:Bluetooth接続

前回は、Raspberry Pi Pico Wを子機(ペリフェラル)として使用し、親機(セントラル)にはAndroidのSerial Bluetooth Terminalを使ってBluetooth接続を行いました。
今回は、その親機側もRaspberry Pi Pico Wで構築し、2台のPico WによるBluetooth通信を試してみます。一方を子機(ペリフェラル)、もう一方を親機(セントラル)として動作させる構成です。

BLEのサンプルコードは、micropython/micropython リポジトリの examples/bluetoothディレクトリに用意されています。
今回はそれらをライブラリのように扱い、Thonny上に取り込んで活用していきます。


子機側の構成


main.py
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
import time

ble = bluetooth.BLE()
p = BLESimplePeripheral(ble)

def on_rx(v):
    print("RX", v)

p.on_write(on_rx)

i = 0
while True:
    if p.is_connected():
        # 1回ずつデータを送信
        data = str(i) + "_"
        print("TX", data)
        p.send(data)
        i += 1
        time.sleep_ms(1000)  # 1秒待機して次のデータを送信



親機側の構成



import bluetooth
import time
from ble_simple_central import BLESimpleCentral

# BLEを初期化してCentral(中央機)として動作
ble = bluetooth.BLE()
central = BLESimpleCentral(ble)

# ペリフェラルが見つからなかったかどうかのフラグ
not_found = False

# スキャン時のコールバック関数
def on_scan(addr_type, addr, name):
    if addr_type is not None:
        # ペリフェラルが見つかった場合の処理
        print("Found peripheral:", addr_type, addr, name)
        central.connect()  # 自動で接続
    else:
        # 見つからなかった場合の処理
        global not_found
        not_found = True
        print("No peripheral found.")

# 受信したデータをそのまま返す(エコー)
def on_rx(v):
    msg = bytes(v).decode().strip()  # 受信したデータをデコード
    print("Received:", msg)

    # 受け取った値を整数に変換し、+100する
    try:
        value = int(msg)  # 受信した値を整数に変換
        value += 100  # 100を加算
        print("Echoed (after adding 100):", value)
        central.write(str(value) + "\r\n", False)  # 加算した値を返す
    except Exception as e:
        print("Failed to echo:", e)

# スキャン開始
central.scan(callback=on_scan)

# 受信通知を設定
central.on_notify(on_rx)

# 接続待機ループ
is_connect = True
while not central.is_connected():
    time.sleep_ms(100)  # 100ms待機
    if not_found:
        # スキャンに失敗したらループを抜ける
        is_connect = False
        break

# 接続成功時の処理
if is_connect:
    print("Connected")

    # 受信データを待ちながら処理を続ける
    while central.is_connected():
        time.sleep(0.1)  # 接続中に何もしないで待機

    # 接続が切れたとき
    print("Disconnected")






3.Pico W×2台:Bluetooth接続でGPIO操作


子機側の構成


main.py
from machine import Pin
import bluetooth
from ble_simple_peripheral import BLESimplePeripheral
import time

ble = bluetooth.BLE()
p = BLESimplePeripheral(ble)

# スイッチのピン設定
sw1 = Pin(16, Pin.IN, Pin.PULL_UP)
sw2 = Pin(17, Pin.IN, Pin.PULL_UP)
sw3 = Pin(18, Pin.IN, Pin.PULL_UP)
sw4 = Pin(19, Pin.IN, Pin.PULL_UP)

def on_rx(v):
    print("RX", v)

p.on_write(on_rx)

# 前回のスイッチ状態を記録する変数
prev_sw = [1, 1, 1, 1]

while True:
    if p.is_connected():
        # 現在の状態を取得
        curr_sw = [sw1.value(), sw2.value(), sw3.value(), sw4.value()]
       
        # 各スイッチをチェック
        for idx, (prev, curr) in enumerate(zip(prev_sw, curr_sw)):
            if prev == 1 and curr == 0:
                # 押された瞬間(立ち下がりエッジ)
                msg = str(idx + 1)
                print("TX", msg)
                p.send(msg)
       
        # 状態を更新
        prev_sw = curr_sw.copy()
       
    time.sleep_ms(50)  # チャタリング防止とCPU節約のための待機




MPY: soft reboot
Starting advertising
New connection 64
TX 1
RX b'101\r\n'
TX 2
RX b'102\r\n'
TX 3
RX b'103\r\n'
TX 4
RX b'104\r\n'

親機側の構成



import bluetooth
import time
from machine import Pin
from ble_simple_central import BLESimpleCentral

# GPIO設定
pin16 = Pin(16, Pin.OUT)
pin17 = Pin(17, Pin.OUT)

# BLEを初期化してCentralとして動作
ble = bluetooth.BLE()
central = BLESimpleCentral(ble)

# ペリフェラルが見つからなかったかどうかのフラグ
not_found = False

# スキャン時のコールバック関数
def on_scan(addr_type, addr, name):
    if addr_type is not None:
        print("Found peripheral:", addr_type, addr, name)
        central.connect()  # 自動で接続
    else:
        global not_found
        not_found = True
        print("No peripheral found.")

# 受信したデータを処理する
def on_rx(v):
    msg = bytes(v).decode().strip()
    print("Received:", msg)

    try:
        value = int(msg)
        if value == 1:
            pin16.on()
            print("GPIO16 ON")
        elif value == 2:
            pin16.off()
            print("GPIO16 OFF")
        elif value == 3:
            pin17.on()
            print("GPIO17 ON")
        elif value == 4:
            pin17.off()
            print("GPIO17 OFF")
        else:
            print("Unknown command:", value)

        # 応答として +100 した値を返す(必要なら)
        central.write(str(value + 100) + "\r\n", False)
    except Exception as e:
        print("Failed to process input:", e)

# スキャン開始
central.scan(callback=on_scan)

# 受信通知を設定
central.on_notify(on_rx)

# 接続待機ループ
is_connect = True
while not central.is_connected():
    time.sleep_ms(100)
    if not_found:
        is_connect = False
        break

if is_connect:
    print("Connected")
    while central.is_connected():
        time.sleep(0.1)
    print("Disconnected")



Found peripheral: 0 b'(\xcd\xc1\x12\xccb' mpy-uart
service (64, 1, 3, UUID(0x1800))
service (64, 4, 6, UUID(0x1801))
service (64, 7, 12, UUID('6e400001-b5a3-f393-e0a9-e50e24dcca9e'))
Connected
Received: 1
GPIO16 ON
Received: 2
GPIO16 OFF
Received: 3
GPIO17 ON
Received: 4
GPIO17 OFF


0 件のコメント:

コメントを投稿

Google Spread Sheetの利用

  以前に ESP32 で作っていたものを、Raspberry Pi Pico W + MicroPythonで再現してみました。 1.Googleスプレッドシートの設定 1.Google Drive → 右クリック → Google スプレッドシート 2.作成して共有をクリック