My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package com.appspot.ajnweb.tools;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

import org.apache.commons.httpclient.HttpException;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slim3.datastore.Datastore;

import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.User;
import appengine.test.util.AppEngineTestUtil;
import appengine.util.MakeSyncCallServletDelegate;

import com.appspot.ajnweb.meta.TweetMeta;
import com.appspot.ajnweb.model.Tweet;
import com.google.appengine.api.memcache.MemcacheServiceFactory;

/**
* {@link Tweet}に{@code screenName}属性を追加したので、古いデータをリモートバッチ処理で追随させる。
* <p>古いデータは{@literal schemaVersion == 1}の条件で抽出できる</p>
* <ol>
* <li>{@literal schemaVersion == 1}の条件で移行対象のエンティティを取得する。</li>
* <li>念のため、それらのエンティティのバックアップを保存しておく。</li>
* <li>{@link Tweet#getUserId()}(Twitterのユーザ)で集約し、Twitterのユーザに対応する{@code screenName}を取得して該当するエンティティを更新する<ul>
* <li>取得した{@code screenName}を設定</li><li>スキーマバージョンを{@literal 2}に設定</li></ul></li>
* <li>更新したエンティティをデプロイ環境へ保存する。</li>
* </ol>
* @author shin1ogawa
*/
public class MigrationTweet1To2 {

static final Logger logger = Logger.getLogger(MigrationTweet1To2.class.getName());

static final String SERVER = "http://2009-11-03c.latest.ajn-web.appspot.com/";

static final String SERVLET = "sys/makesynccall";

static String email;

static String password;


/**
* リモート環境へ直結して{@link Tweet}エンティティの移行処理を行う。
* @param args
* @throws IOException
* @throws TwitterException
*/
public static void main(String[] args) throws IOException, TwitterException {
setUpBeforeClass(); // ローカルのAppEngine環境を開始する
try {
// デプロイ環境のMakeSyncCallServletに接続するためのアカウント情報を入力させる
getAccountInfo();
// 移行対象のエンティティを取得する
final List<Tweet> tweets = getOldEntities();
// ローカルのデータストアにバックアップを作成する
backupToLocalDatastore(tweets);
// データ移行済みのエンティティを作成する
final List<Tweet> updated = createUpdatedEntity(tweets, 50);
// デプロイ環境側のデータストアに、データ移行済みのエンティティを保存する
MakeSyncCallServletDelegate.runInDelegateWithAuth(new Runnable() {

@Override
public void run() {
executeBatch(updated);
}
}, email, password, SERVER, SERVLET);
} finally {
tearDownAfterClass(); // ローカルのAppEngine環境を終了する
}
}

static void executeBatch(final List<Tweet> updated) {
logger.info("更新処理を開始");
// 100件ずつまとめて、batch putする。
List<Tweet> buffer = new ArrayList<Tweet>(100);
int bufferCount = 0;
for (Tweet tweet : updated) {
logger.info(ToStringBuilder.reflectionToString(tweet));
buffer.add(tweet);
if (++bufferCount == 100) {
Datastore.put(buffer);
buffer.clear();
bufferCount = 0;
}
}
if (buffer.isEmpty() == false) {
Datastore.put(buffer);
}
logger.info("更新処理が終了");
MemcacheServiceFactory.getMemcacheService().clearAll();
logger.info("Memcache#clearAll()しました");
}

static List<Tweet> createUpdatedEntity(final List<Tweet> tweets, int limit) {
logger.info("更新済みエンティティの作成を開始");
final Set<Integer> users = new HashSet<Integer>();
final Map<Integer, List<Tweet>> tweetMap = new HashMap<Integer, List<Tweet>>();
final List<Tweet> updated = new ArrayList<Tweet>();
for (Tweet tweet : tweets) {
users.add(tweet.getUserId());
List<Tweet> list = tweetMap.get(tweet.getUserId());
if (list == null) {
list = new ArrayList<Tweet>();
}
list.add(tweet);
tweetMap.put(tweet.getUserId(), list);
}
int userCount = 0;
for (Integer userId : users) {
Twitter twitter = new Twitter();
User user;
try {
user = twitter.showUser(String.valueOf(userId));
} catch (TwitterException e) {
logger.warning("Twitter#showUser()でエラーが返ってきたのでユーザ情報の取得を中断します");
break;
}
logger.info("Twitter#showUser: userName=" + user.getName() + ", screenName="
+ user.getScreenName());
List<Tweet> list = tweetMap.get(userId);
for (Tweet tweet : list) {
tweet.setScreenName(user.getScreenName());
tweet.setSchemaVersion(2);
updated.add(tweet);
}
if (++userCount >= limit) {
break;
}
}
logger.info("更新済みエンティティの作成が完了");
return updated;
}

private static void backupToLocalDatastore(List<Tweet> tweets) {
logger.info("backup開始");
for (Tweet tweet : tweets) {
Datastore.put(tweet);
}
logger.info("backup完了");
}

/**
* 移行対象のエンティティを取得する。
* <p>Twitter側の制限にひっかかるので、一度に100件ずつくらいを処理対象とする。</p>
* @return 移行対象のTweetのリスト
* @throws HttpException
* @throws IOException
*/
private static List<Tweet> getOldEntities() throws HttpException, IOException {
logger.info("更新対象のエンティティの取得を開始");
final List<Tweet> tweets = new ArrayList<Tweet>();
MakeSyncCallServletDelegate.runInDelegateWithAuth(new Runnable() {

@Override
public void run() {
TweetMeta meta = new TweetMeta();
tweets.addAll(Datastore.query(meta).filter(meta.schemaVersion.equal(1)).sort(
meta.schemaVersion.desc).sort(meta.createdAt.desc).limit(1000).asList());
}
}, email, password, SERVER, SERVLET);
logger.info("更新対象のエンティティの取得が終了: 件数=" + tweets.size());
return tweets;
}

private static void getAccountInfo() throws IOException {
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
System.out.println("google email:");
email = input.readLine();
System.out.println("google password:");
password = input.readLine();
}

static void setUpBeforeClass() throws IOException {
AppEngineTestUtil.setUpAppEngine(new LocalEnvironment(), "target/MigrationTweet1To2",
"war", false);
}

static void tearDownAfterClass() {
AppEngineTestUtil.tearDownAppEngine();
}
}
Show details Hide details

Change log

r18 by shin1ogawa on Nov 04, 2009   Diff
デプロイ環境へbatch
putする際のバッファの件数のカウンタをクリアし忘れていたので修正した。

もう使わないプログラムだけど、ブログからリンクを貼ってしまったので修正しておく。
Go to: 
Project members, sign in to write a code review

Older revisions

r17 by shin1ogawa on Nov 03, 2009   Diff
説明用のコメントを追加した。
r15 by shin1ogawa on Nov 03, 2009   Diff
ローカルからリモートへ直結して実行するバッチ処理を作成した。

TweetしたユーザのScreenNameが取得できていないエンティティを更
新するためのバッチ。
All revisions of this file

File info

Size: 6823 bytes, 192 lines

File properties

svn:mime-type
text/plain
Hosted by Google Code