Workbench(5)
clojure特集が有ったり、 雲の中にclojureが出てきたりしてる ようだけど、レールのちからなんだな。相変わらず、世間は騒々しいぞと。
この間、岡山発のマスカットが2房、7000円で出荷されたとニュースで報じていた。製造現場が映って いたけど、温室育ちみたい。石油製品ですか?それとも、太陽熱エネルギー製品ですか? 上の記事同様に 少々、気になるよ。
こちらの葡萄はその後順調に生育を続け、一週間ぐらい前から袋が被せられ始めた。 (ご丁寧な事に、袋が濡れないように、袋の上端に屋根が付いてるよ)背丈程の葡萄棚から 垂れ下がる房に、一袋々被せて行くのは容易な作業じゃなかろうに! いくら、腰の曲がったじいちゃん、 ばーちゃんでも、一日中背伸びして、上を見上げて万歳しながらの作業じゃ、いやになっちゃうよな。 それもこれも、金の成る樹に愛情を注いで、孫の小遣いを捻出する為なのかな?
早朝散歩の時、葡萄棚にいる婆やに聞いてみたよ。袋って何のためにかけるんですか? ってね。 婆やの言う事にゃ、農薬がかからないようにするのが一番の理由とか。粒を一つづつ洗って食べる なんて、みなさんに期待できないからね。第二の理由は、鳥さんのデザートにされない為とか。 果樹園の周りがネットで覆われているのも、そういう理由なのね。ちなみに、上からもぐり込もう 作戦は、多分失敗するでしょう。葉と蔓が棚一杯に広がっていて、緑の天井みたいになってるから。
葡萄クラスターの色は、申し合わせたように緑色です。緑の種類しか作っていないの? って聞いて みたら、そんな事は無いとの事。そのうちに、赤や黒々した巨峰にもなるけど、全ては袋の中で 進化してくので、赤い房がぶら下がっているのは、お目にかかれないそうだ。へぇー、そんなの 知らなかったよ。ちなみに、今の粒は、パチンコ玉ぐらいですかね。
今のこちらの旬は、さくらんぼ。随分と長い期間に渡ってあるな。種類が豊富で、すこしづつ時期が ずれているから、長い期間楽しめるんだな。
桃は尻に色気が出てきたよ。全体はまだ青いけど、割れ目がピンクっぽくなってきた。林檎は、握り拳 ぐらいまで大きくなってきたけど、まだ青い。柿は柿の形をしてるけど、まだ青い。赤くなるまで あと数ヶ月か。
dir
インターネット探検(略してIE)してたんだ。勿論、使うブラウザーはIEじゃないけどね。M$って、 Windowsに限らず、どうしてこんな紛らわしい名前を付けるんかね?
で、見つけたのが、 Pythonイントロスペクション入門なんて 言う記事。Pythonってググルな所でお勧めかと思ったら、IBMにもファンが居たのね。オブジェクトを 手軽に検査するって考え、Lispのそれと一緒だな。Smalltalkを開発した人も、オブジェクトや ソースを手軽に検査出来るのが欲しかったので作ったそうだけど、後でLispがすでに実現してる事を 知って、それならわざわざ開発する事無かったじゃんと、言ったとか。
この記事に出てた、dir()って便利だな。Lispで言う、oblistとかnamespaceに相当すんのかな? どうやって実現してんの? internを。少し探してみたら、面白いのにぶつかった。
# -*- coding: utf-8 -*- """ mydir.py 組み込み関数dir()をカスタマイズする例 指定のモジュールの名前空間に属する変数名の一覧を出力する """ from re import compile verbose = 0 def all(module): if verbose: print "-" * 30 print "name:", module.__name__, " file:", module.__file__ print "-" * 30 count = 0 for attr in sorted(module.__dict__.keys()): # 名前空間の内容を調べる print "%02d) %s " % (count+1, attr), if attr[0:2] == "__": # __で始まる属性はビルトイン print "<built-in name>" # ビルトイン属性の出力 else: print getattr(module, attr) # .__dict__[attr]と同等 count += 1 if verbose: print "-" * 30 print module.__name__, "has %d names" % count print "-" * 30 def grep(module): pat = raw_input("Pattern: ") p = compile(pat) for s in module.__dict__.keys(): if p.search(s): print s if __name__ == "__main__": import mydir all(mydir) # トップレベルファイルとして実行した場合、自身を引数とする
オブジェクトの名前はモジュール毎に __dict__ と言う辞書に登録されてるのね。大きいモジュールだとオブジェクト数が軽く 1000を超えてしまって始末に終えないので、軽めに検索機能のgrepを付けといた。使い方は、こんな感じ
>>> import math >>> all(math) 01) __doc__ <built-in name> : 43) tan <built-in function tan> 44) tanh <built-in function tanh> 45) trunc <built-in function trunc> >>> grep(math) Pattern: tan tan atanh atan tanh atan2
これで、ちまちまと見ていたら、 degrees とか radians なんてのが見つかった。 おいらも同じようなのを用意したけど、すでに有ったのね。とんだ再発明をしちゃったな。
それから、上記のスクリプトでは、verboseが有効になっていないけど、これを有効にして、all(math)とか すると、ファイルが見つからんと言われて落ちちゃうんだ。mathってbinaryになってるんかな。この際だから ソースを開いてみるかな。その前に下調べをFreeBSDでやっておく。
[sakae@cdr /usr/local/lib/python2.6/lib-dynload]$ nm math.so | lv : 00001770 t math_degrees 00004b60 d math_degrees_doc : 00001710 t math_radians 000051a0 d math_radians_doc :
ドキュメントもbinaryになってるっぽい。そんじゃ、ソースを探検(だから、M$風に略すと、SEするって事 だな)。
static const double degToRad = Py_MATH_PI / 180.0; static const double radToDeg = 180.0 / Py_MATH_PI; static PyObject * math_degrees(PyObject *self, PyObject *arg) { double x = PyFloat_AsDouble(arg); if (x == -1.0 && PyErr_Occurred()) return NULL; return PyFloat_FromDouble(x * radToDeg); } PyDoc_STRVAR(math_degrees_doc, "degrees(x)\n\n\ Convert angle x from radians to degrees."); static PyObject * math_radians(PyObject *self, PyObject *arg) { double x = PyFloat_AsDouble(arg); if (x == -1.0 && PyErr_Occurred()) return NULL; return PyFloat_FromDouble(x * degToRad); } PyDoc_STRVAR(math_radians_doc, "radians(x)\n\n\ Convert angle x from degrees to radians."); static PyMethodDef math_methods[] = { : {"degrees", math_degrees, METH_O, math_degrees_doc}, : {"radians", math_radians, METH_O, math_radians_doc}, : };
以上、一時調査終了。PyFloat_AsDoubleした値が、-1.0だと、未定義になるんだ。そんな数学的な事は 今まで知らんかったよ。たまには、ソースも見るもんだなあ。で、力を入れて開発されたと言う、SmallTalk では、こういうプリミテェブな所まで、検査出来るんでしょうかね? プリミティブな部分を調べると、 いきなりアセンブラーのコードが浮かび上がってくるのかな?
ETの導きでBugを知る
前回、ET?からの信号をFFTして、その美しさに見とれた。どうせなら、Sメーターの振れ幅を理想的に したらどうなるだろう? 宇宙からやって来る電波は、ドップラーシフトで、帯域が広がっているかも? なんて思ったからだ。
急峻なパルスを作ればいいんだな。
>>> x = zeros(SMPL, dtype=float) >>> x[0] = 1 >>> run('fft(x) | disp')
これで、スペクトラムと位相グラフが出てくるはずなんですが、表示されるのは、時間領域のグラフ でした。はてな? しょうがない、fftした結果を確認してみるか。
>>> fft(x) array([ 1.+0.j, 1.+0.j, 1.+0.j, ..., 1.+0.j, 1.+0.j, 1.+0.j])
ありゃ、どこまで行っても、虚部は0jだよ! 今の複素数判定は曰くつきだったな。
def tap(a): # tapping figure() if True in iscomplex(a): :
こんなんじゃ、全データについて調べても、Falseになっちゃう。すなわち、実数と判定されちゃう。 これは、直しておかないとね。ETさん、Bugを教えてくれてありがとう。さて、直しますか。
def tap(a): # tapping figure() if str(a[0]).find('j') > 0: :
おいら、テスト・ファーストって実施するような新しい人じゃないけど、たとえ実施してても、こういう 領域が有ったなんて気づいていただろうか? テスト・ファーストより、問題領域の把握の方がはるかに 重要だと思うぞ。
で、このインパルスは、全周波数帯に渡って同一スペクトラム、同一位相になってるので、 フィルターの試験にはかかせないな。impとかの名前で、登録しておくか。余談だけど、インパルスの 発生位置をずらすと(a[100]=1 のように)、 位相が美しく変化してくよ。数学って面白いな。
3M
スリー・エムって、あの会社の事ではない! 3K(きつい、きたない、危険)とか、5S(って、何処の 会社の標語でしたっけ?)、さしすせそ女房(裁縫、躾、炊事、洗濯、掃除)の類のやつだ。
無理、無駄、むら の頭文字だったかな? こういうのが根源にあってBugが生まれて、事故が起こる。 上で見たように、すでに提供されてる機能を再定義なんてのは、苦労してBugを呼び込むようなものだ。 こういうの、『骨折り損のくたびれ儲け』って言うんでしたっけ?
無駄と言う観点から見ると、ループの中で、ifの分岐が有るのは、イマイチいい気分がしない筆頭に なる。(おいら的には、、、ですが。)たとえば、
def en(fn='zmywb.dat'): # entry into pipe a = [] with open(fn) as f: dats = f.readlines() jp = True if dats[0].find('j') > 0 else False for n in range(SMPL): try: v = complex(dats[n]) if jp else float(dats[n]) a.append(v) except: v = 0.0+0.0j if jp else 0.0 a.append(v) return array(a)
forのループ中で、いちいち、複素数か実数かの場合分けをしている。こういうの、気分悪いな。判定を 外に出しちゃえ。
def ent(fn='zmywb.dat'): # entry into pipe a = [] with open(fn) as f: dats = f.readlines() conv = complex if dats[0].find('j') > 0 else float zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0 for n in range(SMPL): try: a.append(conv(dats[n])) except IndexError: a.append(zrov) return array(a)
ついでに、例外の種類も限定して、except句の中の動作をより明確にした。まあ、コメントを きちんと書けよと言われれば、返す言葉はありません。
とまあ、ここまで書いてふと思ったんだけど、Python自身には、オブジェクトのシリアライズと言うか マーシャリングをやってくれるモジュールが用意されてたね。こちらを使うのも一興かも知れんな。
速度計測
今まで速度計測は特にやってこなかった。対話しながらグラフを眺めていたので、実行スピードは気に ならなかったからだ。上で、3Mとかを始めたからには、多少は気にしておくかな。 workbenchシリーズの初回だったかに、timeitを使って、時間を計測しようをやったけど、これを そのまま使おうとすると、実行する世界が違うため、エラーになってしまう。 そんな訳なので、自前のtimeitもどきを書いてみた。
def meas(f=fft, rpt=3, cnt=1000): # like timeit import time res = [0] * rpt for r in range(rpt): t1 = time.time() for n in range(cnt): f(x) # dut res[r] = (time.time() - t1) print(res)
使い方は、次のようになる。
>>> x=rnd() >>> meas(f=lff) [5.694000005722046, 5.616000175476074, 6.052999973297119]
被測定関数名は、measの引数として指定します(省略時は、fftです)。被測定関数は、アリティーが1の 関数(正確には、必須引数が1つの関数)で、引数名は、 x です。xは、meas()の外側で定義されてる必要があります。
被測定関数を1000回連続実行させ、その実行時間を測定。結果は、これを3回繰り返したものとなってます。 1000回繰り返してますから、結果の単位は、ms になります。
lffって、内部では、fft | lf | ifft | getr してるんだけど、どこで一番時間を食っているのかな?
>>> meas() [0.25, 0.23399996757507324, 0.25] >>> meas(f=getr) [2.682999849319458, 2.63700008392334, 2.699000120162964]
上記は、実数入力の関数です。
>>> x = fft(x) >>> meas(f=lf) [1.2170000076293945, 1.247999906539917, 1.2170000076293945] >>> meas(f=ifft) [0.42100000381469727, 0.437000036239624, 0.42100000381469727]
こちらは、複素数入力の関数です。
fftやifftに時間を食われると思っていたら想像が外れました。一番時間がかかっているのがgetrとは以外な 結果です。こうやって調べてみないとだめですね。
そんじゃ、fftする時のサンプル数の影響を調べてみます。
>>> SMPL=256 >>> x=rnd() >>> meas() [0.014999866485595703, 0.016000032424926758, 0.016000032424926758] >>> SMPL=512 >>> x=rnd() >>> meas() [0.031000137329101562, 0.03099989891052246, 0.03099989891052246] >>> SMPL=1024 >>> x=rnd() >>> meas() [0.06200003623962402, 0.04699993133544922, 0.06299996376037598] >>> SMPL=2048 >>> x=rnd() >>> meas() [0.10900020599365234, 0.125, 0.15599989891052246] >>> SMPL=4096 >>> x=rnd() >>> meas() [0.25, 0.23399996757507324, 0.24900007247924805] >>> SMPL=8192 >>> x=rnd() >>> meas() [0.6399998664855957, 0.6550002098083496, 0.623999834060669] >>> SMPL=16384 >>> x=rnd() >>> meas() [1.371999979019165, 1.3259999752044678, 1.310999870300293] >>> SMPL=32768 >>> x=rnd() >>> meas() [4.726000070571899, 4.680000066757202, 4.523999929428101]
ものの本によると、fftの計算時間はサンプル数によって決まりそのオーダーは、(N/2)log(N -1 ) の 乗算と、NlogN の加算が必要との事。上の計測値は、そうなってるかな?
仮に、20KHzで4096ポイントをサンプリングすれば、データが揃うまで、0.2秒かかるから、lffみたいな 豪華なフィルターでも余裕で使えるな。でも、2wayのインターリーブは必須か。Pythonには、A/DとかD/A を扱うモジュールって有るのかな?オーディオモジュールあたりに、ハードの直叩きがこっそり鎮座 してるといいんだけど。
そうそう、このデータを取ったのは、Intelの歴史的遺物、セレロン2.2GHz(だったかな、余り興味無し) Windows7と言う、Wintel機です。そして、meas()を空回ししたデータも載せておきます。
>>> meas(f=(lambda a: True)) [0.0, 0.0, 0.0] >>> meas(f=(lambda a: True), cnt=1000000) [0.28099989891052246, 0.21799993515014648, 0.23400020599365234]
It's SONY or CISCO ?
前回だったか、周波数変調波の位相特性を表示した時、右側の高い周波数領域が表示打ち切りで、どうな っているか確認出来なかった。これは、表示に棒グラフを使っていたため、拡大時等の描画に膨大な 時間がかかってしまう為、苦肉の策でエリアを狭めていたからだ。
棒グラフって、ようするに長方形をいちいち描いて中を塗りつぶしてる訳だから、時間がかかるのは 当たり前だ。見栄えのする棒グラフは、EXCELに任せる事にして、なるべく軽い表示が欲しい。 例をつらつら見てたら、垂直線が使えるようなので、試してみた。関数名は、tapの仲間っつう事で、tp にしといた。
def tp(a): # full range of freq. domain figure() if str(a[0]).find('j') > 0: subplot(211) f = geta(a) vlines(arange(SMPL/2), [0], f[:SMPL/2]) subplot(212) p = gett(nb(a)) vlines(arange(SMPL/2), [0], p[:SMPL/2]) else: plot(a) return array(a) def dsp(a): # put on pipe end tp(a) show()
これで表示させて、拡大してくと、どうも何処かで見た雰囲気だなあ。ああ、思い出したよ。 ソニーのCMで、ちらっと出てきた、線が音楽に合せて上下に踊るやつとそっくりだなあ。
他でも、見たぞ。CISCOのロゴに似てるな。このロゴを 再現するWAVEデータでも作ってみるかな。勿論、結果は、FFTしてから得ると言う、暇潰しには もってこいの方法でね。
最後に、今日のワークベンチです。
from pylab import * from numpy import * from numpy.fft import * from math import * from cmath import phase SMPL= 4096 # sampling points, must be 2**n r2d = lambda r: 180 * r / pi # radian to degree d2r = lambda d: d * pi / 180 # degree to radian th = lambda z: r2d(phase(z)) # angle getr = lambda a: array([x.real for x in a]) # real geti = lambda a: array([x.imag for x in a]) # imag geta = lambda a: array([abs(x) for x in a]) # power gett = lambda a: array([th(x) for x in a]) # phase mix = lambda a,b: array((a + b)) # add array mul = lambda a,b: array((a * b)) # product array def fm(fs, fc=300, m=0.5): # freq. modulation for n in range(SMPL): fs[n] = cos(2 * pi * (fc + m * fs[n]) * n / SMPL) return array(fs) def pm(fs, fc=300, m=0.5): # phase modulation for n in range(SMPL): fs[n] = cos(m * fs[n] + 2 * pi * fc * n / SMPL) return array(fs) def lf(a, fc=20): # low pass filter for n in range(SMPL): a[n] = 0+0j if fc <= n < (SMPL - fc) else a[n] return array(a) def lff(a, fc=20): # lf module v = lf(fft(a), fc) return array(getr(ifft(v))) def tap(a): # tapping figure() if str(a[0]).find('j') > 0: #subplots_adjust(hspace=0.001) subplot(211) f = geta(a) bar(arange(SMPL/8), f[:SMPL/8]) subplot(212) p = gett(nb(a)) bar(arange(SMPL/8), p[:SMPL/8]) else: plot(a) return array(a) def tp(a): # full range of freq. domain figure() if str(a[0]).find('j') > 0: subplot(211) f = geta(a) vlines(arange(SMPL/2), [0], f[:SMPL/2]) subplot(212) p = gett(nb(a)) vlines(arange(SMPL/2), [0], p[:SMPL/2]) else: plot(a) return array(a) def disp(a): # put on pipe end tap(a) show() def dsp(a): # put on pipe end tp(a) show() def nb(a, th=1e-4): # round to 0+0j when less th for n in range(SMPL): a[n] = 0+0j if abs(a[n]) < th else a[n] return a def dt(a): # display time domain figure() plot(a) show() def df(a): # display freq domain figure() #plot(a[:SMPL/2]) #bar(arange(SMPL/2), a[:SMPL/2]) bar(arange(SMPL/8), a[:SMPL/8]) show() def rnd(g=1): # random data import random wave = [] for n in range(SMPL): wave.append(g * (random.random() - 0.5)) return array(wave) def imp(): # implus (for filter test) a = zeros(SMPL, dtype=float) a[0] = 1 return a def st(fs1=3, g=1.0): # single tone wave = [] for n in range(SMPL): wave.append(g * cos(2 * pi * fs1 * n / SMPL) ) return array(wave) def tt(fs1=3, fs2=5, g=0.8): # two tone (for audio) wave = [] for n in range(SMPL): wave.append(g * (cos(2 * pi * fs1 * n / SMPL) + sin(2 * pi * fs2 * n / SMPL)) ) return array(wave) def vo(fs1=2, fs2=3, fs3=4, fs4=5, g=0.7): # like voice wave = [] for n in range(SMPL): wave.append(g * (0.4 * cos(2 * pi * fs1 * n / SMPL) + 0.8 * sin(2 * pi * fs2 * n / SMPL) + 0.6 * cos(2 * pi * fs3 * n / SMPL) + 0.2 * sin(2 * pi * fs4 * n / SMPL)) ) return array(wave) def dd(msg='2133321331213116', m=0.5, s=-0.5): # dot dash (msg=JPL) wave = SMPL * [s] dots = 0 for n in msg: if n == '1' or n == '3': dots += (1 + int(n)) elif n == '2' or n == '6': dots += int(n) else: pass t = int(SMPL / dots) ref = [m] * 3 * t st = 0 for n in msg: if n == '1' or n == '3': wave[st:st+t*int(n)] = ref[0:t*int(n)] st += (t * (int(n) + 1)) elif n == '2' or n == '6': st += (t * int(n)) else: pass return array(wave) def am(fs, fc=300, m=0.5): # amplitude modurator for n in range(SMPL): fs[n] = (1 + m * fs[n] ) * cos(2 * pi * fc * n / SMPL) return fs def dsb(fs, fc=300): # double side band for n in range(SMPL): fs[n] = fs[n] * cos(2 * pi * fc * n / SMPL) return fs def saveon(a, fn='zmywb.dat'): # data save with open(fn, 'w') as f: for n in range(SMPL): f.write(str(a[n])) f.write('\n') return array(a) def en(fn='zmywb.dat'): # entry into pipe a = [] with open(fn) as f: dats = f.readlines() conv = complex if dats[0].find('j') > 0 else float zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0 for n in range(SMPL): try: a.append(conv(dats[n])) except IndexError: a.append(zrov) return array(a) def meas(f=fft, rpt=3, cnt=1000): # like timeit import time res = [0] * rpt for r in range(rpt): t1 = time.time() for n in range(cnt): f(x) # dut res[r] = (time.time() - t1) print(res) def run(wire): try: exlude = set(['show', 'show()', ]) cmds = wire.split('|') for n, cmd in enumerate(cmds): cmd = cmd.strip() if (n == 0) or cmd in exlude: do_cmd = (cmd if cmd[-1] == ')' else (cmd + '()' )) else: do_cmd = (cmd.replace('(', '(val,', 1) if cmd[-1] == ')' else (cmd + '(val)' )) val = eval(do_cmd) return val except Exception as e: print("Error: {0}".format(e)) def wb(): #################### my REPL while True: run(raw_input('wb> ')) if __name__ == '__main__': wb()