Workbench(5)
clojure特集が有ったり、 雲の中にclojureが出てきたりしてる ようだけど、レールのちからなんだな。相変わらず、世間は騒々しいぞと。
この間、岡山発のマスカットが2房、7000円で出荷されたとニュースで報じていた。製造現場が映って いたけど、温室育ちみたい。石油製品ですか?それとも、太陽熱エネルギー製品ですか? 上の記事同様に 少々、気になるよ。
こちらの葡萄はその後順調に生育を続け、一週間ぐらい前から袋が被せられ始めた。 (ご丁寧な事に、袋が濡れないように、袋の上端に屋根が付いてるよ)背丈程の葡萄棚から 垂れ下がる房に、一袋々被せて行くのは容易な作業じゃなかろうに! いくら、腰の曲がったじいちゃん、 ばーちゃんでも、一日中背伸びして、上を見上げて万歳しながらの作業じゃ、いやになっちゃうよな。 それもこれも、金の成る樹に愛情を注いで、孫の小遣いを捻出する為なのかな?
早朝散歩の時、葡萄棚にいる婆やに聞いてみたよ。袋って何のためにかけるんですか? ってね。 婆やの言う事にゃ、農薬がかからないようにするのが一番の理由とか。粒を一つづつ洗って食べる なんて、みなさんに期待できないからね。第二の理由は、鳥さんのデザートにされない為とか。 果樹園の周りがネットで覆われているのも、そういう理由なのね。ちなみに、上からもぐり込もう 作戦は、多分失敗するでしょう。葉と蔓が棚一杯に広がっていて、緑の天井みたいになってるから。
葡萄クラスターの色は、申し合わせたように緑色です。緑の種類しか作っていないの? って聞いて みたら、そんな事は無いとの事。そのうちに、赤や黒々した巨峰にもなるけど、全ては袋の中で 進化してくので、赤い房がぶら下がっているのは、お目にかかれないそうだ。へぇー、そんなの 知らなかったよ。ちなみに、今の粒は、パチンコ玉ぐらいですかね。
今のこちらの旬は、さくらんぼ。随分と長い期間に渡ってあるな。種類が豊富で、すこしづつ時期が ずれているから、長い期間楽しめるんだな。
桃は尻に色気が出てきたよ。全体はまだ青いけど、割れ目がピンクっぽくなってきた。林檎は、握り拳 ぐらいまで大きくなってきたけど、まだ青い。柿は柿の形をしてるけど、まだ青い。赤くなるまで あと数ヶ月か。
dir
インターネット探検(略してIE)してたんだ。勿論、使うブラウザーはIEじゃないけどね。M$って、 Windowsに限らず、どうしてこんな紛らわしい名前を付けるんかね?
で、見つけたのが、 Pythonイントロスペクション入門なんて 言う記事。Pythonってググルな所でお勧めかと思ったら、IBMにもファンが居たのね。オブジェクトを 手軽に検査するって考え、Lispのそれと一緒だな。Smalltalkを開発した人も、オブジェクトや ソースを手軽に検査出来るのが欲しかったので作ったそうだけど、後でLispがすでに実現してる事を 知って、それならわざわざ開発する事無かったじゃんと、言ったとか。
この記事に出てた、dir()って便利だな。Lispで言う、oblistとかnamespaceに相当すんのかな? どうやって実現してんの? internを。少し探してみたら、面白いのにぶつかった。
# -*- coding: utf-8 -*-
""" mydir.py
組み込み関数dir()をカスタマイズする例
指定のモジュールの名前空間に属する変数名の一覧を出力する
"""
from re import compile
verbose = 0
def all(module):
if verbose:
print "-" * 30
print "name:", module.__name__, " file:", module.__file__
print "-" * 30
count = 0
for attr in sorted(module.__dict__.keys()): # 名前空間の内容を調べる
print "%02d) %s " % (count+1, attr),
if attr[0:2] == "__": # __で始まる属性はビルトイン
print "<built-in name>" # ビルトイン属性の出力
else:
print getattr(module, attr) # .__dict__[attr]と同等
count += 1
if verbose:
print "-" * 30
print module.__name__, "has %d names" % count
print "-" * 30
def grep(module):
pat = raw_input("Pattern: ")
p = compile(pat)
for s in module.__dict__.keys():
if p.search(s):
print s
if __name__ == "__main__":
import mydir
all(mydir) # トップレベルファイルとして実行した場合、自身を引数とする
オブジェクトの名前はモジュール毎に __dict__ と言う辞書に登録されてるのね。大きいモジュールだとオブジェクト数が軽く 1000を超えてしまって始末に終えないので、軽めに検索機能のgrepを付けといた。使い方は、こんな感じ
>>> import math >>> all(math) 01) __doc__ <built-in name> : 43) tan <built-in function tan> 44) tanh <built-in function tanh> 45) trunc <built-in function trunc> >>> grep(math) Pattern: tan tan atanh atan tanh atan2
これで、ちまちまと見ていたら、 degrees とか radians なんてのが見つかった。 おいらも同じようなのを用意したけど、すでに有ったのね。とんだ再発明をしちゃったな。
それから、上記のスクリプトでは、verboseが有効になっていないけど、これを有効にして、all(math)とか すると、ファイルが見つからんと言われて落ちちゃうんだ。mathってbinaryになってるんかな。この際だから ソースを開いてみるかな。その前に下調べをFreeBSDでやっておく。
[sakae@cdr /usr/local/lib/python2.6/lib-dynload]$ nm math.so | lv
:
00001770 t math_degrees
00004b60 d math_degrees_doc
:
00001710 t math_radians
000051a0 d math_radians_doc
:
ドキュメントもbinaryになってるっぽい。そんじゃ、ソースを探検(だから、M$風に略すと、SEするって事 だな)。
static const double degToRad = Py_MATH_PI / 180.0;
static const double radToDeg = 180.0 / Py_MATH_PI;
static PyObject *
math_degrees(PyObject *self, PyObject *arg)
{
double x = PyFloat_AsDouble(arg);
if (x == -1.0 && PyErr_Occurred())
return NULL;
return PyFloat_FromDouble(x * radToDeg);
}
PyDoc_STRVAR(math_degrees_doc,
"degrees(x)\n\n\
Convert angle x from radians to degrees.");
static PyObject *
math_radians(PyObject *self, PyObject *arg)
{
double x = PyFloat_AsDouble(arg);
if (x == -1.0 && PyErr_Occurred())
return NULL;
return PyFloat_FromDouble(x * degToRad);
}
PyDoc_STRVAR(math_radians_doc,
"radians(x)\n\n\
Convert angle x from degrees to radians.");
static PyMethodDef math_methods[] = {
:
{"degrees", math_degrees, METH_O, math_degrees_doc},
:
{"radians", math_radians, METH_O, math_radians_doc},
:
};
以上、一時調査終了。PyFloat_AsDoubleした値が、-1.0だと、未定義になるんだ。そんな数学的な事は 今まで知らんかったよ。たまには、ソースも見るもんだなあ。で、力を入れて開発されたと言う、SmallTalk では、こういうプリミテェブな所まで、検査出来るんでしょうかね? プリミティブな部分を調べると、 いきなりアセンブラーのコードが浮かび上がってくるのかな?
ETの導きでBugを知る
前回、ET?からの信号をFFTして、その美しさに見とれた。どうせなら、Sメーターの振れ幅を理想的に したらどうなるだろう? 宇宙からやって来る電波は、ドップラーシフトで、帯域が広がっているかも? なんて思ったからだ。
急峻なパルスを作ればいいんだな。
>>> x = zeros(SMPL, dtype=float)
>>> x[0] = 1
>>> run('fft(x) | disp')
これで、スペクトラムと位相グラフが出てくるはずなんですが、表示されるのは、時間領域のグラフ でした。はてな? しょうがない、fftした結果を確認してみるか。
>>> fft(x) array([ 1.+0.j, 1.+0.j, 1.+0.j, ..., 1.+0.j, 1.+0.j, 1.+0.j])
ありゃ、どこまで行っても、虚部は0jだよ! 今の複素数判定は曰くつきだったな。
def tap(a): # tapping
figure()
if True in iscomplex(a):
:
こんなんじゃ、全データについて調べても、Falseになっちゃう。すなわち、実数と判定されちゃう。 これは、直しておかないとね。ETさん、Bugを教えてくれてありがとう。さて、直しますか。
def tap(a): # tapping
figure()
if str(a[0]).find('j') > 0:
:
おいら、テスト・ファーストって実施するような新しい人じゃないけど、たとえ実施してても、こういう 領域が有ったなんて気づいていただろうか? テスト・ファーストより、問題領域の把握の方がはるかに 重要だと思うぞ。
で、このインパルスは、全周波数帯に渡って同一スペクトラム、同一位相になってるので、 フィルターの試験にはかかせないな。impとかの名前で、登録しておくか。余談だけど、インパルスの 発生位置をずらすと(a[100]=1 のように)、 位相が美しく変化してくよ。数学って面白いな。
3M
スリー・エムって、あの会社の事ではない! 3K(きつい、きたない、危険)とか、5S(って、何処の 会社の標語でしたっけ?)、さしすせそ女房(裁縫、躾、炊事、洗濯、掃除)の類のやつだ。
無理、無駄、むら の頭文字だったかな? こういうのが根源にあってBugが生まれて、事故が起こる。 上で見たように、すでに提供されてる機能を再定義なんてのは、苦労してBugを呼び込むようなものだ。 こういうの、『骨折り損のくたびれ儲け』って言うんでしたっけ?
無駄と言う観点から見ると、ループの中で、ifの分岐が有るのは、イマイチいい気分がしない筆頭に なる。(おいら的には、、、ですが。)たとえば、
def en(fn='zmywb.dat'): # entry into pipe
a = []
with open(fn) as f:
dats = f.readlines()
jp = True if dats[0].find('j') > 0 else False
for n in range(SMPL):
try:
v = complex(dats[n]) if jp else float(dats[n])
a.append(v)
except:
v = 0.0+0.0j if jp else 0.0
a.append(v)
return array(a)
forのループ中で、いちいち、複素数か実数かの場合分けをしている。こういうの、気分悪いな。判定を 外に出しちゃえ。
def ent(fn='zmywb.dat'): # entry into pipe
a = []
with open(fn) as f:
dats = f.readlines()
conv = complex if dats[0].find('j') > 0 else float
zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0
for n in range(SMPL):
try:
a.append(conv(dats[n]))
except IndexError:
a.append(zrov)
return array(a)
ついでに、例外の種類も限定して、except句の中の動作をより明確にした。まあ、コメントを きちんと書けよと言われれば、返す言葉はありません。
とまあ、ここまで書いてふと思ったんだけど、Python自身には、オブジェクトのシリアライズと言うか マーシャリングをやってくれるモジュールが用意されてたね。こちらを使うのも一興かも知れんな。
速度計測
今まで速度計測は特にやってこなかった。対話しながらグラフを眺めていたので、実行スピードは気に ならなかったからだ。上で、3Mとかを始めたからには、多少は気にしておくかな。 workbenchシリーズの初回だったかに、timeitを使って、時間を計測しようをやったけど、これを そのまま使おうとすると、実行する世界が違うため、エラーになってしまう。 そんな訳なので、自前のtimeitもどきを書いてみた。
def meas(f=fft, rpt=3, cnt=1000): # like timeit
import time
res = [0] * rpt
for r in range(rpt):
t1 = time.time()
for n in range(cnt):
f(x) # dut
res[r] = (time.time() - t1)
print(res)
使い方は、次のようになる。
>>> x=rnd() >>> meas(f=lff) [5.694000005722046, 5.616000175476074, 6.052999973297119]
被測定関数名は、measの引数として指定します(省略時は、fftです)。被測定関数は、アリティーが1の 関数(正確には、必須引数が1つの関数)で、引数名は、 x です。xは、meas()の外側で定義されてる必要があります。
被測定関数を1000回連続実行させ、その実行時間を測定。結果は、これを3回繰り返したものとなってます。 1000回繰り返してますから、結果の単位は、ms になります。
lffって、内部では、fft | lf | ifft | getr してるんだけど、どこで一番時間を食っているのかな?
>>> meas() [0.25, 0.23399996757507324, 0.25] >>> meas(f=getr) [2.682999849319458, 2.63700008392334, 2.699000120162964]
上記は、実数入力の関数です。
>>> x = fft(x) >>> meas(f=lf) [1.2170000076293945, 1.247999906539917, 1.2170000076293945] >>> meas(f=ifft) [0.42100000381469727, 0.437000036239624, 0.42100000381469727]
こちらは、複素数入力の関数です。
fftやifftに時間を食われると思っていたら想像が外れました。一番時間がかかっているのがgetrとは以外な 結果です。こうやって調べてみないとだめですね。
そんじゃ、fftする時のサンプル数の影響を調べてみます。
>>> SMPL=256 >>> x=rnd() >>> meas() [0.014999866485595703, 0.016000032424926758, 0.016000032424926758] >>> SMPL=512 >>> x=rnd() >>> meas() [0.031000137329101562, 0.03099989891052246, 0.03099989891052246] >>> SMPL=1024 >>> x=rnd() >>> meas() [0.06200003623962402, 0.04699993133544922, 0.06299996376037598] >>> SMPL=2048 >>> x=rnd() >>> meas() [0.10900020599365234, 0.125, 0.15599989891052246] >>> SMPL=4096 >>> x=rnd() >>> meas() [0.25, 0.23399996757507324, 0.24900007247924805] >>> SMPL=8192 >>> x=rnd() >>> meas() [0.6399998664855957, 0.6550002098083496, 0.623999834060669] >>> SMPL=16384 >>> x=rnd() >>> meas() [1.371999979019165, 1.3259999752044678, 1.310999870300293] >>> SMPL=32768 >>> x=rnd() >>> meas() [4.726000070571899, 4.680000066757202, 4.523999929428101]
ものの本によると、fftの計算時間はサンプル数によって決まりそのオーダーは、(N/2)log(N -1 ) の 乗算と、NlogN の加算が必要との事。上の計測値は、そうなってるかな?
仮に、20KHzで4096ポイントをサンプリングすれば、データが揃うまで、0.2秒かかるから、lffみたいな 豪華なフィルターでも余裕で使えるな。でも、2wayのインターリーブは必須か。Pythonには、A/DとかD/A を扱うモジュールって有るのかな?オーディオモジュールあたりに、ハードの直叩きがこっそり鎮座 してるといいんだけど。
そうそう、このデータを取ったのは、Intelの歴史的遺物、セレロン2.2GHz(だったかな、余り興味無し) Windows7と言う、Wintel機です。そして、meas()を空回ししたデータも載せておきます。
>>> meas(f=(lambda a: True)) [0.0, 0.0, 0.0] >>> meas(f=(lambda a: True), cnt=1000000) [0.28099989891052246, 0.21799993515014648, 0.23400020599365234]
It's SONY or CISCO ?
前回だったか、周波数変調波の位相特性を表示した時、右側の高い周波数領域が表示打ち切りで、どうな っているか確認出来なかった。これは、表示に棒グラフを使っていたため、拡大時等の描画に膨大な 時間がかかってしまう為、苦肉の策でエリアを狭めていたからだ。
棒グラフって、ようするに長方形をいちいち描いて中を塗りつぶしてる訳だから、時間がかかるのは 当たり前だ。見栄えのする棒グラフは、EXCELに任せる事にして、なるべく軽い表示が欲しい。 例をつらつら見てたら、垂直線が使えるようなので、試してみた。関数名は、tapの仲間っつう事で、tp にしといた。
def tp(a): # full range of freq. domain
figure()
if str(a[0]).find('j') > 0:
subplot(211)
f = geta(a)
vlines(arange(SMPL/2), [0], f[:SMPL/2])
subplot(212)
p = gett(nb(a))
vlines(arange(SMPL/2), [0], p[:SMPL/2])
else:
plot(a)
return array(a)
def dsp(a): # put on pipe end
tp(a)
show()
これで表示させて、拡大してくと、どうも何処かで見た雰囲気だなあ。ああ、思い出したよ。 ソニーのCMで、ちらっと出てきた、線が音楽に合せて上下に踊るやつとそっくりだなあ。
他でも、見たぞ。CISCOのロゴに似てるな。このロゴを 再現するWAVEデータでも作ってみるかな。勿論、結果は、FFTしてから得ると言う、暇潰しには もってこいの方法でね。
最後に、今日のワークベンチです。
from pylab import *
from numpy import *
from numpy.fft import *
from math import *
from cmath import phase
SMPL= 4096 # sampling points, must be 2**n
r2d = lambda r: 180 * r / pi # radian to degree
d2r = lambda d: d * pi / 180 # degree to radian
th = lambda z: r2d(phase(z)) # angle
getr = lambda a: array([x.real for x in a]) # real
geti = lambda a: array([x.imag for x in a]) # imag
geta = lambda a: array([abs(x) for x in a]) # power
gett = lambda a: array([th(x) for x in a]) # phase
mix = lambda a,b: array((a + b)) # add array
mul = lambda a,b: array((a * b)) # product array
def fm(fs, fc=300, m=0.5): # freq. modulation
for n in range(SMPL):
fs[n] = cos(2 * pi * (fc + m * fs[n]) * n / SMPL)
return array(fs)
def pm(fs, fc=300, m=0.5): # phase modulation
for n in range(SMPL):
fs[n] = cos(m * fs[n] + 2 * pi * fc * n / SMPL)
return array(fs)
def lf(a, fc=20): # low pass filter
for n in range(SMPL):
a[n] = 0+0j if fc <= n < (SMPL - fc) else a[n]
return array(a)
def lff(a, fc=20): # lf module
v = lf(fft(a), fc)
return array(getr(ifft(v)))
def tap(a): # tapping
figure()
if str(a[0]).find('j') > 0:
#subplots_adjust(hspace=0.001)
subplot(211)
f = geta(a)
bar(arange(SMPL/8), f[:SMPL/8])
subplot(212)
p = gett(nb(a))
bar(arange(SMPL/8), p[:SMPL/8])
else:
plot(a)
return array(a)
def tp(a): # full range of freq. domain
figure()
if str(a[0]).find('j') > 0:
subplot(211)
f = geta(a)
vlines(arange(SMPL/2), [0], f[:SMPL/2])
subplot(212)
p = gett(nb(a))
vlines(arange(SMPL/2), [0], p[:SMPL/2])
else:
plot(a)
return array(a)
def disp(a): # put on pipe end
tap(a)
show()
def dsp(a): # put on pipe end
tp(a)
show()
def nb(a, th=1e-4): # round to 0+0j when less th
for n in range(SMPL):
a[n] = 0+0j if abs(a[n]) < th else a[n]
return a
def dt(a): # display time domain
figure()
plot(a)
show()
def df(a): # display freq domain
figure()
#plot(a[:SMPL/2])
#bar(arange(SMPL/2), a[:SMPL/2])
bar(arange(SMPL/8), a[:SMPL/8])
show()
def rnd(g=1): # random data
import random
wave = []
for n in range(SMPL):
wave.append(g * (random.random() - 0.5))
return array(wave)
def imp(): # implus (for filter test)
a = zeros(SMPL, dtype=float)
a[0] = 1
return a
def st(fs1=3, g=1.0): # single tone
wave = []
for n in range(SMPL):
wave.append(g * cos(2 * pi * fs1 * n / SMPL) )
return array(wave)
def tt(fs1=3, fs2=5, g=0.8): # two tone (for audio)
wave = []
for n in range(SMPL):
wave.append(g * (cos(2 * pi * fs1 * n / SMPL) +
sin(2 * pi * fs2 * n / SMPL)) )
return array(wave)
def vo(fs1=2, fs2=3, fs3=4, fs4=5, g=0.7): # like voice
wave = []
for n in range(SMPL):
wave.append(g * (0.4 * cos(2 * pi * fs1 * n / SMPL) +
0.8 * sin(2 * pi * fs2 * n / SMPL) +
0.6 * cos(2 * pi * fs3 * n / SMPL) +
0.2 * sin(2 * pi * fs4 * n / SMPL)) )
return array(wave)
def dd(msg='2133321331213116', m=0.5, s=-0.5): # dot dash (msg=JPL)
wave = SMPL * [s]
dots = 0
for n in msg:
if n == '1' or n == '3':
dots += (1 + int(n))
elif n == '2' or n == '6':
dots += int(n)
else:
pass
t = int(SMPL / dots)
ref = [m] * 3 * t
st = 0
for n in msg:
if n == '1' or n == '3':
wave[st:st+t*int(n)] = ref[0:t*int(n)]
st += (t * (int(n) + 1))
elif n == '2' or n == '6':
st += (t * int(n))
else:
pass
return array(wave)
def am(fs, fc=300, m=0.5): # amplitude modurator
for n in range(SMPL):
fs[n] = (1 + m * fs[n] ) * cos(2 * pi * fc * n / SMPL)
return fs
def dsb(fs, fc=300): # double side band
for n in range(SMPL):
fs[n] = fs[n] * cos(2 * pi * fc * n / SMPL)
return fs
def saveon(a, fn='zmywb.dat'): # data save
with open(fn, 'w') as f:
for n in range(SMPL):
f.write(str(a[n]))
f.write('\n')
return array(a)
def en(fn='zmywb.dat'): # entry into pipe
a = []
with open(fn) as f:
dats = f.readlines()
conv = complex if dats[0].find('j') > 0 else float
zrov = 0.0+0.0j if dats[0].find('j') > 0 else 0.0
for n in range(SMPL):
try:
a.append(conv(dats[n]))
except IndexError:
a.append(zrov)
return array(a)
def meas(f=fft, rpt=3, cnt=1000): # like timeit
import time
res = [0] * rpt
for r in range(rpt):
t1 = time.time()
for n in range(cnt):
f(x) # dut
res[r] = (time.time() - t1)
print(res)
def run(wire):
try:
exlude = set(['show', 'show()', ])
cmds = wire.split('|')
for n, cmd in enumerate(cmds):
cmd = cmd.strip()
if (n == 0) or cmd in exlude:
do_cmd = (cmd if cmd[-1] == ')' else (cmd + '()' ))
else:
do_cmd = (cmd.replace('(', '(val,', 1) if cmd[-1] == ')'
else (cmd + '(val)' ))
val = eval(do_cmd)
return val
except Exception as e:
print("Error: {0}".format(e))
def wb(): #################### my REPL
while True:
run(raw_input('wb> '))
if __name__ == '__main__': wb()