硬核奶爸!用樹莓派做個「智能嬰兒監視器」:啼哭自動通知,還能分析哭聲含義

2020-11-06     大數據文摘

原標題:硬核奶爸!用樹莓派做個「智能嬰兒監視器」:啼哭自動通知,還能分析哭聲含義

大數據文摘出品

來源:Medium

編譯:陳之炎

作為一名新晉奶爸和程式設計師,我在新身份中最常思考的問題就是「照料嬰兒的工作真的無法自動化嗎?

當然,這也許能夠實現,就算有給孩子換尿布的機器人(假設有足夠多的父母同意在自己蹣跚學步的孩子身上測試這樣的設備),願意自動化照料嬰兒的父母還真為數不多。

作為父親,我首先意識到的事情是:嬰兒很多時候都會在哭,即使我在家,也不可能總是能聽到孩子的哭聲。

通常,商用嬰兒監視器可以填補這一空白,它們充當對講機,讓你在另一個房間也能聽到嬰兒的哭聲。

但我很快意識到:商用嬰兒監視器沒有我想像中的理想設備智能:

  • 它們只能充當一個傳聲筒:把聲音從源頭帶到揚聲器,卻無法發現孩子哭聲的含義;
  • 當家長要去到另一個房間裡時,相應要把揚聲器帶到另一個房間,無法在任何其他現有的音頻設備上播放聲音;
  • 揚聲器通常是低功率揚聲器,無法連接到外部揚聲器-這意味著,如果我在另一個房間播放音樂,我可能會聽不到孩子的哭聲,即便監控器和我在同一個房間也無法聽到;
  • 大多數揚聲器都是在低功率無線電波上工作的,這意味著如果嬰兒在他/她的房間裡,而你必須走到樓下,它們才能工作。

因此,我萌生了自製一個更好用的「智能嬰兒監視器」的想法。

說干就干,我先給這個「智能嬰兒監視器」定義了一些需要的功能。

  • 它可以運行於價廉物美的樹莓派(RaspberryPI)與USB麥克風
  • 當孩子開始/停止哭泣時,它應該檢測到孩子的哭聲,並通知我(理想情況下是在我的手機上),或者跟蹤我儀錶板上的數據點,或者運行相應的任務。它不應該是一個單純的對講器,簡單地將聲音從一個源傳遞到另一個兼容的設備
  • 它能夠在揚聲器,智慧型手機,電腦等設備上傳輸音頻。
  • 它不受源和揚聲器之間距離的影響,無需在整個房子裡將揚聲器移來移去。
  • 它還應該有一個攝像頭,可以利用攝像頭對孩子實時監控,當他一開始哭,我便可以抓拍到圖片或嬰兒床的短視頻,以檢查有什麼不對勁。

來看看一個新晉奶爸如何使用工程師的大腦開源工具來完成這項任務吧。

採集音頻樣本

首先,購買一塊樹莓派(RaspberryPi),在SD卡上燒錄好Linux作業系統(建議使用RaspberryPI3或更高版本),運行Tensorflow模型。還可以購買一個與樹莓派兼容的USB麥克風。

然後安裝需要的相關項:

[sudo] apt-get install ffmpeg lame libatlas-base-dev alsa-utils

[sudo] pip3 install tensorflow

第一步,必須記錄足夠的音頻樣本,嬰兒在什麼時候哭,在什麼時候不哭。稍後將利用這些樣本來訓練音頻檢測模型。

注意:在這個例子中,我將展示如何利用聲音檢測來識別嬰兒的哭聲,同樣的精準程序可以用來檢測任何其它類型的聲音-只要它們足夠長(例如:警報或鄰居家的鑽孔聲)。

首先,查看音頻輸入設備:

arecord -l

在樹莓派(RaspberryPI)上,得到以下輸出(注意,有兩個USB麥克風):

**** List of CAPTURE Hardware Devices ****

card 1: Device [USB PnP Sound Device], device 0: USB Audio [USB Audio]

Subdevices: 0/1

Subdevice #0: subdevice #0

card 2: Device_1 [USB PnP Sound Device], device 0: USB Audio [USB Audio]

Subdevices: 0/1

Subdevice #0: subdevice #0

我利用第二個麥克風來記錄聲音-即卡2,設備0。識別它的ALSA方法要麼是hw:2,0(直接訪問硬體設備),要麼是plughw:2,0(如果需要的話,它會輸入採樣率和格式轉換插件)。確保SD卡上有足夠的空間,然後開始錄製一些音頻:

arecord -D plughw:2,0 -c 1 -f cd | lame - audio.mp3

和孩子在同一個房間裡,記錄幾分鐘或幾個小時的音頻-最好是長時間的沉默、嬰兒哭聲和其他與之無關的聲音-,錄音完成後按Ctrl-C。儘可能多的重複這個過程多次,在一天中的不同時刻或不同的日子裡獲取不同的音頻樣本。

標註音頻示例

一旦有了足夠的音頻樣本,就可以把它們複製到電腦上來訓練模型了-可以使用SCP複製文件,也可以直接從SD卡上複製。

把它們都存儲在相同目錄下,例如:~/datasets/sound-detect/audio。另外,為每個示例音頻文件創建一個新文件夾,它包含一個音頻文件(名為audio.mp3)和一個標註文件(名為labels.json),利用它來標記音頻文件中的負/正音頻段,原始數據集的結構如下:

~/datasets/sound-detect/audio

-> sample_1

-> audio.mp3

-> labels.json

-> sample_2

-> audio.mp3

-> labels.json

下面:標註錄製的音頻文件-如果它包含了孩子幾個小時的哭聲,可能會特別受虐。在你最喜歡的音頻播放器或Audacity中打開每個數據集音頻文件,並在每個示例目錄中創建一個新的label.json文件。確定哭泣開始的確切時間和結束時間,並在labels.json中標註為time_string -> label的關鍵值結構。例:

{

"00:00": "negative",

"02:13": "positive",

"04:57": "negative",

"15:41": "positive",

"18:24": "negative"

}

在上面的例子中,00:00到02:12之間的所有音頻段將被標記為負,02:13到04:56之間的所有音頻段將被標記為正,以此類推。

生成數據集

對所有的音頻示例標註完成之後,接下來是生成數據集,最後將它輸入到Tensorflow模型中去。首先,創建了一個名為micmon的通用庫和一組用於聲音監視的實用工具。然後,開始安裝:

git clone [email protected]:/BlackLight/micmon.git

cd micmon

[sudo] pip3 install -r requirements.txt

[sudo] python3 setup.py build install

本模型設計基於音頻的頻率樣本而非原始音頻,因為,在這裡我們想檢測到一個特定的聲音,這個聲音有著特定的「頻譜」標籤,即:基頻(或基頻下降的窄帶範圍)和一組特定的諧波。這些諧波頻率與基波之間的比率既不受振幅的影響(頻率比恆定,與輸入幅度無關),也不受相位的影響(無論何時開始記錄,連續的聲音都會有相同的頻譜特徵)。

這種與振幅和相位無關的特性使得這種方法更有可能訓練出一個魯棒的聲音檢測模型,而不是簡單地將原始音頻樣本饋送到模型中。此外,該模型可以更簡單(可以在不影響性能的情況下將多個頻率分為一組,從而可以有效地實現降維),無論樣本持續時間多長,該模型將50~ 100個頻帶作為輸入值,一秒鐘的原始音頻通常包含44100個數據點,並且輸入的長度隨著樣本的持續時間而增加,並且不太容易發生過擬合。

micmon能計算音頻樣本某些段的FFT(快速傅立葉變換),將結果頻譜分為低通和高通濾波器的頻帶,並將結果保存到一組numpy壓縮(.npz)文件中。可以通過在命令行上執行micmon-datagen命令來實現:

micmon-datagen \

--low 250 --high 2500 --bins 100 \

--sample-duration 2 --channels 1 \

~/datasets/sound-detect/audio ~/datasets/sound-detect/data

在上面的示例中,我們從存儲在~/dataset/sound-detect/audio下的原始音頻樣本生成一個數據集,並將生成的頻譜數據存儲到~/datasets/sound-detect/data. –low和~/datasets/sound-detect/data. --high中, low和high分別表示最低和最高頻率,最低頻率的默認值為20Hz(人耳可聞的最低頻率),最高頻率的默認值為20kHz(健康的年輕人耳可聞的最高頻率)。

通過對此範圍做出限定,儘可能多地捕獲希望檢測到的其他類型的音頻背景和無關諧波的聲音。在本案例中, 250-2500赫茲的範圍足以檢測嬰兒的哭聲。

嬰兒的哭聲通常是高頻的(歌劇女高音能達到的最高音符在1000赫茲左右),在這裡設置了至少雙倍的最高頻率,以確保能獲得足夠高的諧波(諧波是更高的頻率),但也不要將最高頻率設得太高,以防止其他背景聲音的諧波。我剪切掉了頻率低於250赫茲的音頻信號-嬰兒的哭聲不太可能發生在低頻段,例如,可以打開一些positive音頻樣本,利用均衡器/頻譜分析儀,檢查哪些頻率在positive樣本中占主導地位,並將數據集集中在這些頻率上。--bins指定了頻率空間的組數(默認值:100),更大的數值意味著更高的頻率解析度/粒度,但如果太高,可能會使模型容易發生過度擬合。

腳本將原始音頻分割成較小的段,並計算每個段的頻譜標籤。示例持續時間指定每個音頻段有多長時間(默認:2秒)。對於持續時間較長的聲音,取更大的值會起到更好的作用,但它同時會減少檢測的時間,而且可能會在短音上失效。對於持續時間較短的聲音,可以取較低的值,但捕獲的片段可能沒有足夠的信息量來可靠地識別聲音。

除了micmon-datagen腳本之外,也可以利用micmonAPI,編寫腳本來生成數據集。例:

import os

from micmon.audio import AudioDirectory, AudioPlayer, AudioFile

from micmon.dataset import DatasetWriter

basedir = os.path.expanduser('~/datasets/sound-detect')

audio_dir = os.path.join(basedir, 'audio')

datasets_dir = os.path.join(basedir, 'data')

cutoff_frequencies = [250, 2500]

# Scan the base audio_dir for labelled audio samples

audio_dirs = AudioDirectory.scan(audio_dir)

# Save the spectrum information and labels of the samples to a

# different compressed file for each audio file.

for audio_dir in audio_dirs:

dataset_file = os.path.join(datasets_dir, os.path.basename(audio_dir.path) + '.npz')

print(f'Processing audio sample {audio_dir.path}')

with AudioFile(audio_dir) as reader, \

DatasetWriter(dataset_file,

low_freq=cutoff_frequencies[0],

high_freq=cutoff_frequencies[1]) as writer:

for sample in reader:

writer += sample

無論是使用micmon-datagen還是使用micmon Python API生成數據集,在過程結束時,應該在~/datasets/sound-detect/data目錄下找到一堆.npz文件,每個標註後的音頻原始文件對應一個數據集。之後,便可以利用這個數據集來訓練神經網絡進行聲音檢測。

訓練模型

micmon利用Tensorflow+Keras來定義和訓練模型,有了PythonAPI,可以很容易地實現。例如:

import os

from tensorflow.keras import layers

from micmon.dataset import Dataset

from micmon.model import Model

# This is a directory that contains the saved .npz dataset files

datasets_dir = os.path.expanduser('~/datasets/sound-detect/data')

# This is the output directory where the model will be saved

model_dir = os.path.expanduser('~/models/sound-detect')

# This is the number of training epochs for each dataset sample

epochs = 2

# Load the datasets from the compressed files.

# 70% of the data points will be included in the training set,

# 30% of the data points will be included in the evaluation set

# and used to evaluate the performance of the model.

datasets = Dataset.scan(datasets_dir, validation_split=0.3)

labels = ['negative', 'positive']

freq_bins = len(datasets[0].samples[0])

# Create a network with 4 layers (one input layer, two intermediate layers and one output layer).

# The first intermediate layer in this example will have twice the number of units as the number

# of input units, while the second intermediate layer will have 75% of the number of

# input units. We also specify the names for the labels and the low and high frequency range

# used when sampling.

model = Model(

[

layers.Input(shape=(freq_bins,)),

layers.Dense(int(2 * freq_bins), activation='relu'),

layers.Dense(int(0.75 * freq_bins), activation='relu'),

layers.Dense(len(labels), activation='softmax'),

],

labels=labels,

low_freq=datasets[0].low_freq,

high_freq=datasets[0].high_freq

# Train the model

for epoch in range(epochs):

for i, dataset in enumerate(datasets):

print(f'[epoch {epoch+1}/{epochs}] [audio sample {i+1}/{len(datasets)}]')

model.fit(dataset)

evaluation = model.evaluate(dataset)

print(f'Validation set loss and accuracy: {evaluation}')

# Save the model

model.save(model_dir, overwrite=True)

運行此腳本後(在對模型的準確性感到滿意後),可以在~/models/sound-detect目錄下找保存的新模型。在我的這個例子中,我採集~5小時的聲音就足夠用了,通過定義一個較優的頻率範圍來訓練模型,準確率大於98%。如果是在計算機上訓練模型,只需將其複製到RaspberryPI,便可以準備進入下一步了。

利用模型進行預測

這時候,製作一個腳本:利用以前訓練過的模型,當孩子開始哭的時候,通知我們:

import os

from micmon.audio import AudioDevice

from micmon.model import Model

model_dir = os.path.expanduser('~/models/sound-detect')

model = Model.load(model_dir)

audio_system = 'alsa' # Supported: alsa and pulse

audio_device = 'plughw:2,0' # Get list of recognized input devices with arecord -l

with AudioDevice(audio_system, device=audio_device) as source:

for sample in source:

source.pause() # Pause recording while we process the frame

prediction = model.predict(sample)

print(prediction)

source.resume() # Resume recording

在RaspberryPI上運行腳本,並讓它運行一段時間-如果在過去2秒內沒有檢測到哭聲,它將在標準輸出中列印negative,如果在過去2秒內檢測到哭聲否,則在標準輸出中列印positive。

然而,如果孩子哭了,簡單地將消息列印到標準輸出中並沒有太大作用-我們希望得到明確實時通知!

可以利用Platypush來實現這個功能。在本例中,我們將使用pushbullet集成在檢測到cry時向我們的手機發送消息。接下來安裝Redis(Platypush用於接收消息)和Platypush,利用HTTP和Pushbullet來集成:

[sudo] apt-get install redis-server

[sudo] systemctl start redis-server.service

[sudo] systemctl enable redis-server.service

[sudo] pip3 install 'platypush[http,pushbullet]'

將Pushbullet應用程式安裝在智慧型手機上,到pushbullet.com上以獲取API token。然後創建一個~/.config/platypush/config.yaml文件,該文件啟用HTTP和Pushbullet集成:

backend.http:

enabled: True

pushbullet:

token: YOUR_TOKEN

接下來,對前面的腳本進行修改,不讓它將消息列印到標準輸出,而是觸發一個可以被Platypush hook捕獲的自定義事件CustomEvent:

#!/usr/bin/python3

import argparse

import logging

import os

import sys

from platypush import RedisBus

from platypush.message.event.custom import CustomEvent

from micmon.audio import AudioDevice

from micmon.model import Model

logger = logging.getLogger('micmon')

def get_args():

parser = argparse.ArgumentParser()

parser.add_argument('model_path', help='Path to the file/directory containing the saved Tensorflow model')

parser.add_argument('-i', help='Input sound device (e.g. hw:0,1 or default)', required=True, dest='sound_device')

parser.add_argument('-e', help='Name of the event that should be raised when a positive event occurs', required=True, dest='event_type')

parser.add_argument('-s', '--sound-server', help='Sound server to be used (available: alsa, pulse)', required=False, default='alsa', dest='sound_server')

parser.add_argument('-P', '--positive-label', help='Model output label name/index to indicate a positive sample (default: positive)', required=False, default='positive', dest='positive_label')

parser.add_argument('-N', '--negative-label', help='Model output label name/index to indicate a negative sample (default: negative)', required=False, default='negative', dest='negative_label')

parser.add_argument('-l', '--sample-duration', help='Length of the FFT audio samples (default: 2 seconds)', required=False, type=float, default=2., dest='sample_duration')

parser.add_argument('-r', '--sample-rate', help='Sample rate (default: 44100 Hz)', required=False, type=int, default=44100, dest='sample_rate')

parser.add_argument('-c', '--channels', help='Number of audio recording channels (default: 1)', required=False, type=int, default=1, dest='channels')

parser.add_argument('-f', '--ffmpeg-bin', help='FFmpeg executable path (default: ffmpeg)', required=False, default='ffmpeg', dest='ffmpeg_bin')

parser.add_argument('-v', '--verbose', help='Verbose/debug mode', required=False, action='store_true', dest='debug')

parser.add_argument('-w', '--window-duration', help='Duration of the look-back window (default: 10 seconds)', required=False, type=float, default=10., dest='window_length')

parser.add_argument('-n', '--positive-samples', help='Number of positive samples detected over the window duration to trigger the event (default: 1)', required=False, type=int, default=1, dest='positive_samples')

opts, args = parser.parse_known_args(sys.argv[1:])

return opts

def main():

args = get_args()

if args.debug:

logger.setLevel(logging.DEBUG)

model_dir = os.path.abspath(os.path.expanduser(args.model_path))

model = Model.load(model_dir)

window = []

cur_prediction = args.negative_label

bus = RedisBus()

with AudioDevice(system=args.sound_server,

device=args.sound_device,

sample_duration=args.sample_duration,

sample_rate=args.sample_rate,

channels=args.channels,

ffmpeg_bin=args.ffmpeg_bin,

debug=args.debug) as source:

for sample in source:

source.pause() # Pause recording while we process the frame

prediction = model.predict(sample)

logger.debug(f'Sample prediction: {prediction}')

has_change = False

if len(window) < args.window_length:

window += [prediction]

else:

window = window[1:] + [prediction]

positive_samples = len([pred for pred in window if pred == args.positive_label])

if args.positive_samples <= positive_samples and \

prediction == args.positive_label and \

cur_prediction != args.positive_label:

cur_prediction = args.positive_label

has_change = True

logging.info(f'Positive sample threshold detected ({positive_samples}/{len(window)})')

elif args.positive_samples > positive_samples and \

prediction == args.negative_label and \

cur_prediction != args.negative_label:

cur_prediction = args.negative_label

has_change = True

logging.info(f'Negative sample threshold detected ({len(window)-positive_samples}/{len(window)})')

if has_change:

evt = CustomEvent(subtype=args.event_type, state=prediction)

bus.post(evt)

source.resume() # Resume recording

if __name__ == '__main__':

main()

將上面的腳本保存為~/bin/micmon_detect.py。如果在滑動窗口時間內上檢測到positive_samples樣本(為了減少預測錯誤或臨時故障引起的噪聲),則腳本觸發事件,並且它只會在當前預測從negative到positive的情況下觸發事件。然後,它被分派給Platypush。對於其它不同的聲音模型(不一定是哭泣嬰兒),該腳本也是通用的,對應其它正/負標籤、其它頻率範圍和其它類型的輸出事件,這個腳本也能工作。

創建一個Platypush hook來對事件作出響應,並向設備發送通知。首先,創建 Platypush腳本目錄:

mkdir -p ~/.config/platypush/scripts

cd ~/.config/platypush/scripts

# Define the directory as a module

touch __init__.py

# Create a script for the baby-cry events

vi babymonitor.py

babymonitor.py的內容為:

from platypush.context import get_plugin

from platypush.event.hook import hook

from platypush.message.event.custom import CustomEvent

@hook(CustomEvent, subtype='baby-cry', state='positive')

def on_baby_cry_start(event, **_):

pb = get_plugin('pushbullet')

pb.send_note(title='Baby cry status', body='The baby is crying!')

@hook(CustomEvent, subtype='baby-cry', state='negative')

def on_baby_cry_stop(event, **_):

pb = get_plugin('pushbullet')

pb.send_note(title='Baby cry status', body='The baby stopped crying - good job!')

為Platypush創建一個服務文件,並啟動/啟用服務,這樣它就會在終端上啟動:

mkdir -p ~/.config/systemd/user

wget -O ~/.config/systemd/user/platypush.service \

https://raw.githubusercontent.com/BlackLight/platypush/master/examples/systemd/platypush.service

systemctl --user start platypush.service

systemctl --user enable platypush.service

為嬰兒監視器創建一個服務文件-如:

~/.config/systemd/user/babymonitor.service:

[Unit]

Description=Monitor to detect my baby's cries

After=network.target sound.target

[Service]

ExecStart=/home/pi/bin/micmon_detect.py -i plughw:2,0 -e baby-cry -w 10 -n 2 ~/models/sound-detect

Restart=always

RestartSec=10

[Install]

WantedBy=default.target

該服務將啟動ALSA設備plughw:2,0上的麥克風監視器,如果在過去10秒內檢測到至少2個positive 2秒樣本,並且先前的狀態為negative,則會觸發state=positive事件;如果在過去10秒內檢測到少於2個positive樣本,並且先前的狀態為positive,則state=negative。然後可以啟動/啟用服務:

systemctl --user start babymonitor.service

systemctl --user enable babymonitor.service

確認一旦嬰兒開始哭泣,就會在手機上收到通知。如果沒有收到通知,可以檢查一下音頻示例的標籤、神經網絡的架構和參數,或樣本長度/窗口/頻帶等參數是否正確。

此外,這是一個相對基本的自動化例子-可以為它添加更多的自動化任務。例如,可以向另一個Platypush設備發送請求(例如:在臥室或客廳),用TTS插件大聲提示嬰兒在哭。還可以擴展micmon_detect.py腳本,以便捕獲的音頻樣本也可以通過HTTP流-例如使用Flask包裝器和ffmpeg進行音頻轉換。另一個有趣的用例是,當嬰兒開始/停止哭泣時,將數據點發送到本地資料庫(可以參考我先前關於「如何使用Platypush+PostgreSQL+Mosquitto+Grafana創建靈活和自我管理的儀錶板」的文章https://towardsdatascience.com/how-to-build-your-home-infrastructure-for-data-collection-and-visualization-and-be-the-real-owner-af9b33723b0c):這是一組相當有用的數據,可以用來跟蹤嬰兒睡覺、醒著或需要喂食時的情況。雖然監測寶寶一直是我開發micmon的初衷,但是同樣的程序也可以用來訓練和檢測其它類型聲音的模型。最後,可以考慮使用一組良好的電源或鋰電池組,這樣監視器便可以便攜化了。

安裝寶貝攝像頭

有了一個好的音頻饋送和檢測方法之後,還可以添加一個視頻饋送,以保持對孩子的監控。一開始,我 在RaspberryPI3上安裝了一個PiCamera用於音頻檢測,後來,我發現這個配置相當不切實際。想想看:一個RaspberryPi 3、一個附加的電池包和一個攝像頭,組合在一起會相當笨拙;如果你找到一個輕型相機,可以很容易地安裝在支架或靈活的手臂上,而且可以四處移動,這樣,無論他/她在哪裡,都可以密切關注孩子。最終,我選擇了體積較小的RaspberryPi Zero,它與PiCamera兼容,再配一個小電池。

嬰兒監視器攝像頭模塊的第一個原型

同樣,先插入一個燒錄了與RaspberryPI兼容的作業系統的SD卡。然後在其插槽中插入一個與RaspberryPI兼容的攝像頭,確保攝像頭模塊在raspi-config中啟用,安裝集成有PiCamera的Platypush:

[sudo] pip3 install 'platypush[http,camera,picamera]'

然後在~/.config/platypush/config.yaml:中添加相機配置:

camera.pi:

listen_port: 5001

在Platypush重新啟動時檢查此配置,並通過HTTP從攝像頭獲取快照:

wget http://raspberry-pi:8008/camera/pi/photo.jpg

或在瀏覽器中打開視頻:

http://raspberry-pi:8008/camera/pi/video.mjpg

同樣,當應用程式啟動時,可以創建一個hook,該hook通過TCP/H264啟動攝像頭饋送:

mkdir -p ~/.config/platypush/scripts

cd ~/.config/platypush/scripts

touch __init__.py

vi camera.py

也可以通過VLC:播放視頻。

vlc tcp/h264://raspberry-pi:5001

在手機上通過VLC應用程式或利用RPi Camera Viewer應用程式觀看視頻。

從想法到最後實現效果還不錯,這也算是一個新晉奶爸從護理瑣事中脫身的自我救贖吧。

原文連結:

https://towardsdatascience.com/create-your-own-smart-baby-monitor-with-a-raspberrypi-and-tensorflow-5b25713410ca

文章來源: https://twgreatdaily.com/zh/OJjonHUB2uKmW_kOxR08.html