Workbench(5)

clojure特集が有ったり、 雲の中にclojureが出てきたりしてる ようだけど、レールのちからなんだな。相変わらず、世間は騒々しいぞと。

この間、岡山発のマスカットが2房、7000円で出荷されたとニュースで報じていた。製造現場が映って いたけど、温室育ちみたい。石油製品ですか?それとも、太陽熱エネルギー製品ですか? 上の記事同様に 少々、気になるよ。

こちらの葡萄はその後順調に生育を続け、一週間ぐらい前から袋が被せられ始めた。 (ご丁寧な事に、袋が濡れないように、袋の上端に屋根が付いてるよ)背丈程の葡萄棚から 垂れ下がる房に、一袋々被せて行くのは容易な作業じゃなかろうに! いくら、腰の曲がったじいちゃん、 ばーちゃんでも、一日中背伸びして、上を見上げて万歳しながらの作業じゃ、いやになっちゃうよな。 それもこれも、金の成る樹に愛情を注いで、孫の小遣いを捻出する為なのかな?

早朝散歩の時、葡萄棚にいる婆やに聞いてみたよ。袋って何のためにかけるんですか? ってね。 婆やの言う事にゃ、農薬がかからないようにするのが一番の理由とか。粒を一つづつ洗って食べる なんて、みなさんに期待できないからね。第二の理由は、鳥さんのデザートにされない為とか。 果樹園の周りがネットで覆われているのも、そういう理由なのね。ちなみに、上からもぐり込もう 作戦は、多分失敗するでしょう。葉と蔓が棚一杯に広がっていて、緑の天井みたいになってるから。

葡萄クラスターの色は、申し合わせたように緑色です。緑の種類しか作っていないの? って聞いて みたら、そんな事は無いとの事。そのうちに、赤や黒々した巨峰にもなるけど、全ては袋の中で 進化してくので、赤い房がぶら下がっているのは、お目にかかれないそうだ。へぇー、そんなの 知らなかったよ。ちなみに、今の粒は、パチンコ玉ぐらいですかね。

今のこちらの旬は、さくらんぼ。随分と長い期間に渡ってあるな。種類が豊富で、すこしづつ時期が ずれているから、長い期間楽しめるんだな。

桃は尻に色気が出てきたよ。全体はまだ青いけど、割れ目がピンクっぽくなってきた。林檎は、握り拳 ぐらいまで大きくなってきたけど、まだ青い。柿は柿の形をしてるけど、まだ青い。赤くなるまで あと数ヶ月か。

dir

インターネット探検(略してIE)してたんだ。勿論、使うブラウザーはIEじゃないけどね。M$って、 Windowsに限らず、どうしてこんな紛らわしい名前を付けるんかね?

で、見つけたのが、 Pythonイントロスペクション入門なんて 言う記事。Pythonってググルな所でお勧めかと思ったら、IBMにもファンが居たのね。オブジェクトを 手軽に検査するって考え、Lispのそれと一緒だな。Smalltalkを開発した人も、オブジェクトや ソースを手軽に検査出来るのが欲しかったので作ったそうだけど、後でLispがすでに実現してる事を 知って、それならわざわざ開発する事無かったじゃんと、言ったとか。

この記事に出てた、dir()って便利だな。Lispで言う、oblistとかnamespaceに相当すんのかな? どうやって実現してんの? internを。少し探してみたら、面白いのにぶつかった。

# -*- coding: utf-8 -*-
""" mydir.py
組み込み関数dir()をカスタマイズする例
指定のモジュールの名前空間に属する変数名の一覧を出力する
"""
from re import compile

verbose = 0

def all(module):
    if verbose:
        print "-" * 30
        print "name:", module.__name__, "  file:", module.__file__
        print "-" * 30

    count = 0
    for attr in sorted(module.__dict__.keys()):  # 名前空間の内容を調べる
        print "%02d) %s   " % (count+1, attr),
        if attr[0:2] == "__":  # __で始まる属性はビルトイン
            print "<built-in name>"  # ビルトイン属性の出力
        else:
            print getattr(module, attr)  # .__dict__[attr]と同等
        count += 1

    if verbose:
        print "-" * 30
        print module.__name__, "has %d names" % count
        print "-" * 30

def grep(module):
    pat = raw_input("Pattern: ")
    p = compile(pat)
    for s in module.__dict__.keys():
        if p.search(s):
            print s

if __name__ == "__main__":
    import mydir
    all(mydir)  # トップレベルファイルとして実行した場合、自身を引数とする

オブジェクトの名前はモジュール毎に __dict__ と言う辞書に登録されてるのね。大きいモジュールだとオブジェクト数が軽く 1000を超えてしまって始末に終えないので、軽めに検索機能のgrepを付けといた。使い方は、こんな感じ

>>> import math
>>> all(math)
01) __doc__    <built-in name>
  :
43) tan    <built-in function tan>
44) tanh    <built-in function tanh>
45) trunc    <built-in function trunc>
>>> grep(math)
Pattern: tan
tan
atanh
atan
tanh
atan2

これで、ちまちまと見ていたら、 degrees とか radians なんてのが見つかった。 おいらも同じようなのを用意したけど、すでに有ったのね。とんだ再発明をしちゃったな。

それから、上記のスクリプトでは、verboseが有効になっていないけど、これを有効にして、all(math)とか すると、ファイルが見つからんと言われて落ちちゃうんだ。mathってbinaryになってるんかな。この際だから ソースを開いてみるかな。その前に下調べをFreeBSDでやっておく。

[sakae@cdr /usr/local/lib/python2.6/lib-dynload]$ nm math.so | lv
   :
00001770 t math_degrees
00004b60 d math_degrees_doc
    :
00001710 t math_radians
000051a0 d math_radians_doc
    :

ドキュメントもbinaryになってるっぽい。そんじゃ、ソースを探検(だから、M$風に略すと、SEするって事 だな)。

static const double degToRad = Py_MATH_PI / 180.0;
static const double radToDeg = 180.0 / Py_MATH_PI;

static PyObject *
math_degrees(PyObject *self, PyObject *arg)
{
    double x = PyFloat_AsDouble(arg);
    if (x == -1.0 && PyErr_Occurred())
        return NULL;
    return PyFloat_FromDouble(x * radToDeg);
}

PyDoc_STRVAR(math_degrees_doc,
"degrees(x)\n\n\
Convert angle x from radians to degrees.");

static PyObject *
math_radians(PyObject *self, PyObject *arg)
{
    double x = PyFloat_AsDouble(arg);
    if (x == -1.0 && PyErr_Occurred())
        return NULL;
    return PyFloat_FromDouble(x * degToRad);
}

PyDoc_STRVAR(math_radians_doc,
"radians(x)\n\n\
Convert angle x from degrees to radians.");


static PyMethodDef math_methods[] = {
         :
    {"degrees",         math_degrees,   METH_O,         math_degrees_doc},
         :
    {"radians",         math_radians,   METH_O,         math_radians_doc},
         :
};

以上、一時調査終了。PyFloat_AsDoubleした値が、-1.0だと、未定義になるんだ。そんな数学的な事は 今まで知らんかったよ。たまには、ソースも見るもんだなあ。で、力を入れて開発されたと言う、SmallTalk では、こういうプリミテェブな所まで、検査出来るんでしょうかね? プリミティブな部分を調べると、 いきなりアセンブラーのコードが浮かび上がってくるのかな?

ETの導きでBugを知る

前回、ET?からの信号をFFTして、その美しさに見とれた。どうせなら、Sメーターの振れ幅を理想的に したらどうなるだろう? 宇宙からやって来る電波は、ドップラーシフトで、帯域が広がっているかも? なんて思ったからだ。

急峻なパルスを作ればいいんだな。

>>> x = zeros(SMPL, dtype=float)
>>> x[0] = 1
>>> run('fft(x) | disp')

これで、スペクトラムと位相グラフが出てくるはずなんですが、表示されるのは、時間領域のグラフ でした。はてな? しょうがない、fftした結果を確認してみるか。

>>> fft(x)
array([ 1.+0.j,  1.+0.j,  1.+0.j, ...,  1.+0.j,  1.+0.j,  1.+0.j])

ありゃ、どこまで行っても、虚部は0jだよ! 今の複素数判定は曰くつきだったな。

def tap(a):                                   # tapping  
    figure()
    if True in iscomplex(a):
       :

こんなんじゃ、全データについて調べても、Falseになっちゃう。すなわち、実数と判定されちゃう。 これは、直しておかないとね。ETさん、Bugを教えてくれてありがとう。さて、直しますか。

def tap(a):                                   # tapping  
    figure()
    if str(a[0]).find('j') > 0:
        :

おいら、テスト・ファーストって実施するような新しい人じゃないけど、たとえ実施してても、こういう 領域が有ったなんて気づいていただろうか? テスト・ファーストより、問題領域の把握の方がはるかに 重要だと思うぞ。

で、このインパルスは、全周波数帯に渡って同一スペクトラム、同一位相になってるので、 フィルターの試験にはかかせないな。impとかの名前で、登録しておくか。余談だけど、インパルスの 発生位置をずらすと(a[100]=1 のように)、 位相が美しく変化してくよ。数学って面白いな。

3M

スリー・エムって、あの会社の事ではない! 3K(きつい、きたない、危険)とか、5S(って、何処の 会社の標語でしたっけ?)、さしすせそ女房(裁縫、躾、炊事、洗濯、掃除)の類のやつだ。

無理、無駄、むら の頭文字だったかな? こういうのが根源にあってBugが生まれて、事故が起こる。 上で見たように、すでに提供されてる機能を再定義なんてのは、苦労してBugを呼び込むようなものだ。 こういうの、『骨折り損のくたびれ儲け』って言うんでしたっけ?

無駄と言う観点から見ると、ループの中で、ifの分岐が有るのは、イマイチいい気分がしない筆頭に なる。(おいら的には、、、ですが。)たとえば、

def en(fn='zmywb.dat'):                       # entry into pipe
    a = []
    with open(fn) as f:
        dats = f.readlines()
    jp = True if dats[0].find('j') > 0 else False
    for n in range(SMPL):
        try:
            v = complex(dats[n]) if jp else float(dats[n])
            a.append(v)
        except:
            v = 0.0+0.0j if jp else 0.0
            a.append(v)
    return array(a) 

forのループ中で、いちいち、複素数か実数かの場合分けをしている。こういうの、気分悪いな。判定を 外に出しちゃえ。

def ent(fn='zmywb.dat'):                       # entry into pipe
    a = []
    with open(fn) as f:
        dats = f.readlines()
    conv = complex  if dats[0].find('j') > 0 else float
    zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0
    for n in range(SMPL):
        try:
            a.append(conv(dats[n]))
        except IndexError:
            a.append(zrov)
    return array(a) 

ついでに、例外の種類も限定して、except句の中の動作をより明確にした。まあ、コメントを きちんと書けよと言われれば、返す言葉はありません。

とまあ、ここまで書いてふと思ったんだけど、Python自身には、オブジェクトのシリアライズと言うか マーシャリングをやってくれるモジュールが用意されてたね。こちらを使うのも一興かも知れんな。

速度計測

今まで速度計測は特にやってこなかった。対話しながらグラフを眺めていたので、実行スピードは気に ならなかったからだ。上で、3Mとかを始めたからには、多少は気にしておくかな。 workbenchシリーズの初回だったかに、timeitを使って、時間を計測しようをやったけど、これを そのまま使おうとすると、実行する世界が違うため、エラーになってしまう。 そんな訳なので、自前のtimeitもどきを書いてみた。

def meas(f=fft, rpt=3, cnt=1000):             # like timeit
    import time
    res = [0] * rpt
    for r in range(rpt):
        t1 = time.time()
        for n in range(cnt):
            f(x)                              # dut
        res[r] = (time.time() - t1)
    print(res)

使い方は、次のようになる。

>>> x=rnd()
>>> meas(f=lff)
[5.694000005722046, 5.616000175476074, 6.052999973297119]

被測定関数名は、measの引数として指定します(省略時は、fftです)。被測定関数は、アリティーが1の 関数(正確には、必須引数が1つの関数)で、引数名は、 x です。xは、meas()の外側で定義されてる必要があります。

被測定関数を1000回連続実行させ、その実行時間を測定。結果は、これを3回繰り返したものとなってます。 1000回繰り返してますから、結果の単位は、ms になります。

lffって、内部では、fft | lf | ifft | getr してるんだけど、どこで一番時間を食っているのかな?

>>> meas()
[0.25, 0.23399996757507324, 0.25]
>>> meas(f=getr)
[2.682999849319458, 2.63700008392334, 2.699000120162964]

上記は、実数入力の関数です。

>>> x = fft(x)
>>> meas(f=lf)
[1.2170000076293945, 1.247999906539917, 1.2170000076293945]
>>> meas(f=ifft)
[0.42100000381469727, 0.437000036239624, 0.42100000381469727]

こちらは、複素数入力の関数です。

fftやifftに時間を食われると思っていたら想像が外れました。一番時間がかかっているのがgetrとは以外な 結果です。こうやって調べてみないとだめですね。

そんじゃ、fftする時のサンプル数の影響を調べてみます。

>>> SMPL=256
>>> x=rnd()
>>> meas()
[0.014999866485595703, 0.016000032424926758, 0.016000032424926758]
>>> SMPL=512
>>> x=rnd()
>>> meas()
[0.031000137329101562, 0.03099989891052246, 0.03099989891052246]
>>> SMPL=1024
>>> x=rnd()
>>> meas()
[0.06200003623962402, 0.04699993133544922, 0.06299996376037598]
>>> SMPL=2048
>>> x=rnd()
>>> meas()
[0.10900020599365234, 0.125, 0.15599989891052246]
>>> SMPL=4096
>>> x=rnd()
>>> meas()
[0.25, 0.23399996757507324, 0.24900007247924805]
>>> SMPL=8192
>>> x=rnd()
>>> meas()
[0.6399998664855957, 0.6550002098083496, 0.623999834060669]
>>> SMPL=16384
>>> x=rnd()
>>> meas()
[1.371999979019165, 1.3259999752044678, 1.310999870300293]
>>> SMPL=32768
>>> x=rnd()
>>> meas()
[4.726000070571899, 4.680000066757202, 4.523999929428101]

ものの本によると、fftの計算時間はサンプル数によって決まりそのオーダーは、(N/2)log(N -1 ) の 乗算と、NlogN の加算が必要との事。上の計測値は、そうなってるかな?

仮に、20KHzで4096ポイントをサンプリングすれば、データが揃うまで、0.2秒かかるから、lffみたいな 豪華なフィルターでも余裕で使えるな。でも、2wayのインターリーブは必須か。Pythonには、A/DとかD/A を扱うモジュールって有るのかな?オーディオモジュールあたりに、ハードの直叩きがこっそり鎮座 してるといいんだけど。

そうそう、このデータを取ったのは、Intelの歴史的遺物、セレロン2.2GHz(だったかな、余り興味無し) Windows7と言う、Wintel機です。そして、meas()を空回ししたデータも載せておきます。

>>> meas(f=(lambda a: True))
[0.0, 0.0, 0.0]
>>> meas(f=(lambda a: True), cnt=1000000)
[0.28099989891052246, 0.21799993515014648, 0.23400020599365234]

It's SONY or CISCO ?

前回だったか、周波数変調波の位相特性を表示した時、右側の高い周波数領域が表示打ち切りで、どうな っているか確認出来なかった。これは、表示に棒グラフを使っていたため、拡大時等の描画に膨大な 時間がかかってしまう為、苦肉の策でエリアを狭めていたからだ。

棒グラフって、ようするに長方形をいちいち描いて中を塗りつぶしてる訳だから、時間がかかるのは 当たり前だ。見栄えのする棒グラフは、EXCELに任せる事にして、なるべく軽い表示が欲しい。 例をつらつら見てたら、垂直線が使えるようなので、試してみた。関数名は、tapの仲間っつう事で、tp にしといた。

def tp(a):                                   # full range of freq. domain 
    figure()
    if str(a[0]).find('j') > 0:
        subplot(211)
        f = geta(a)
        vlines(arange(SMPL/2), [0], f[:SMPL/2])           
        subplot(212)
        p = gett(nb(a))
        vlines(arange(SMPL/2), [0], p[:SMPL/2])
    else:
        plot(a)
    return array(a)

def dsp(a):                                   # put on pipe end
    tp(a)
    show()

これで表示させて、拡大してくと、どうも何処かで見た雰囲気だなあ。ああ、思い出したよ。 ソニーのCMで、ちらっと出てきた、線が音楽に合せて上下に踊るやつとそっくりだなあ。

他でも、見たぞ。CISCOのロゴに似てるな。このロゴを 再現するWAVEデータでも作ってみるかな。勿論、結果は、FFTしてから得ると言う、暇潰しには もってこいの方法でね。

最後に、今日のワークベンチです。

from pylab import *
from numpy import *
from numpy.fft import *
from math import *
from cmath import phase

SMPL= 4096                                    # sampling points, must be 2**n
r2d = lambda r: 180 * r / pi                  # radian to degree
d2r = lambda d: d * pi  / 180                 # degree to radian
th  = lambda z: r2d(phase(z))                 # angle
getr = lambda a: array([x.real for x in a])   # real
geti = lambda a: array([x.imag for x in a])   # imag
geta = lambda a: array([abs(x) for x in a])   # power
gett = lambda a: array([th(x) for x in a])    # phase
mix  = lambda a,b: array((a + b))             # add array
mul  = lambda a,b: array((a * b))             # product array

def fm(fs, fc=300, m=0.5):                    # freq. modulation
    for n in range(SMPL):
        fs[n] = cos(2 * pi * (fc + m * fs[n]) * n / SMPL)
    return array(fs)

def pm(fs, fc=300, m=0.5):                    # phase modulation
    for n in range(SMPL):
        fs[n] = cos(m * fs[n] + 2 * pi * fc * n / SMPL)
    return array(fs)

def lf(a, fc=20):                             # low pass filter
       for n in range(SMPL):
           a[n] = 0+0j if fc <= n < (SMPL - fc) else a[n]
       return array(a)

def lff(a, fc=20):                            # lf module
    v = lf(fft(a), fc)
    return array(getr(ifft(v)))

def tap(a):                                   # tapping  
    figure()
    if str(a[0]).find('j') > 0:
        #subplots_adjust(hspace=0.001) 
        subplot(211)
        f = geta(a)
        bar(arange(SMPL/8), f[:SMPL/8])        
        subplot(212)
        p = gett(nb(a))
        bar(arange(SMPL/8), p[:SMPL/8])
    else:
        plot(a)
    return array(a)

def tp(a):                                   # full range of freq. domain 
    figure()
    if str(a[0]).find('j') > 0:
        subplot(211)
        f = geta(a)
        vlines(arange(SMPL/2), [0], f[:SMPL/2])           
        subplot(212)
        p = gett(nb(a))
        vlines(arange(SMPL/2), [0], p[:SMPL/2])
    else:
        plot(a)
    return array(a)

def disp(a):                                  # put on pipe end
    tap(a)
    show()

def dsp(a):                                   # put on pipe end
    tp(a)
    show()

def nb(a, th=1e-4):                           # round to 0+0j when less th
    for n in range(SMPL):
        a[n] = 0+0j if abs(a[n]) < th else a[n]
    return a

def dt(a):                                    # display time domain
    figure()
    plot(a)
    show()

def df(a):                                    # display freq domain
    figure()
    #plot(a[:SMPL/2])
    #bar(arange(SMPL/2), a[:SMPL/2])
    bar(arange(SMPL/8), a[:SMPL/8])
    show()

def rnd(g=1):                                 # random data
    import random
    wave = []
    for n in range(SMPL):
        wave.append(g * (random.random() - 0.5))
    return array(wave)

def imp():                                    # implus (for filter test)
    a = zeros(SMPL, dtype=float)
    a[0] = 1
    return a

def st(fs1=3, g=1.0):                         # single tone
    wave = []
    for n in range(SMPL):
        wave.append(g * cos(2 * pi * fs1 * n / SMPL) )
    return array(wave)

def tt(fs1=3, fs2=5, g=0.8):                  # two tone (for audio)
    wave = []
    for n in range(SMPL):
        wave.append(g * (cos(2 * pi * fs1 * n / SMPL) +
                         sin(2 * pi * fs2 * n / SMPL)) )
    return array(wave)

def vo(fs1=2, fs2=3, fs3=4, fs4=5, g=0.7):    # like voice
    wave = []
    for n in range(SMPL):
        wave.append(g * (0.4 * cos(2 * pi * fs1 * n / SMPL) +
                         0.8 * sin(2 * pi * fs2 * n / SMPL) +
                         0.6 * cos(2 * pi * fs3 * n / SMPL) +
                         0.2 * sin(2 * pi * fs4 * n / SMPL)) )
    return array(wave)

def dd(msg='2133321331213116', m=0.5, s=-0.5): # dot dash (msg=JPL)
    wave = SMPL * [s]
    dots = 0
    for n in msg:
        if n == '1' or n == '3':
            dots += (1 + int(n))
        elif n == '2' or n == '6':
            dots += int(n)
        else:
            pass
    t = int(SMPL / dots)
    ref = [m] * 3 * t
    st = 0
    for n in msg:
        if n == '1' or n == '3':
            wave[st:st+t*int(n)] = ref[0:t*int(n)]
            st += (t * (int(n) + 1))
        elif n == '2' or n == '6':
            st += (t * int(n))
        else:
            pass
    return array(wave)

def am(fs, fc=300, m=0.5):                    # amplitude modurator
    for n in range(SMPL):
        fs[n] = (1 + m * fs[n] ) * cos(2 * pi * fc * n / SMPL)
    return fs

def dsb(fs, fc=300):                          # double side band
    for n in range(SMPL):
        fs[n] = fs[n] * cos(2 * pi * fc * n / SMPL)
    return fs

def saveon(a, fn='zmywb.dat'):                # data save
    with open(fn, 'w') as f:
        for n in range(SMPL):
            f.write(str(a[n]))
            f.write('\n')
    return array(a)
 
def en(fn='zmywb.dat'):                       # entry into pipe
    a = []
    with open(fn) as f:
        dats = f.readlines()
    conv = complex  if dats[0].find('j') > 0 else float
    zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0
    for n in range(SMPL):
        try:
            a.append(conv(dats[n]))
        except IndexError:
            a.append(zrov)
    return array(a) 

def meas(f=fft, rpt=3, cnt=1000):             # like timeit
    import time
    res = [0] * rpt
    for r in range(rpt):
        t1 = time.time()
        for n in range(cnt):
            f(x)                              # dut
        res[r] = (time.time() - t1)
    print(res)

def run(wire):
    try:
        exlude = set(['show', 'show()', ])
        cmds = wire.split('|')
        for n, cmd in enumerate(cmds):
            cmd = cmd.strip()
            if (n == 0) or cmd in exlude:
                do_cmd = (cmd if cmd[-1] == ')' else (cmd + '()' ))
            else:
                do_cmd = (cmd.replace('(', '(val,', 1) if cmd[-1] == ')'
                          else (cmd + '(val)' ))
            val = eval(do_cmd)
        return val
    except Exception as e:
        print("Error: {0}".format(e))

def wb():                   #################### my REPL
    while True:
        run(raw_input('wb> '))

if __name__ == '__main__': wb()