My favorites | Sign in
Project Home Downloads Wiki Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
//--------------------------------------------------------------------------------------------------
// <author>Rob Gillen</author>
// <authorEmail>gillenre@ornl.gov</authorEmail>
// <remarks>
// This program loops through a supplied directory and uploads the files to an Azure container
// and records the duration of each upload
// </remarks>
// <copyright file="Program.cs" company="Oak Ridge National Laboratory">
// Copyright (c) 2010 Oak Ridge National Laboratory, unless otherwise noted.
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// The license is also available at:
// http://www.gnu.org/copyleft/lgpl.html
//
// </copyright>
//--------------------------------------------------------------------------------------------------

namespace AzureTesting
{
using System;
using System.Diagnostics;
using System.IO;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;
using System.Linq;
using System.Threading.Tasks;
using System.Net;
using System.Text;
using System.Collections.Generic;
using System.Threading;

/// <summary>
/// Uploads every file found in a supplied directory to the "rawdata" Azure blob container,
/// in a randomized order, and records the duration of each upload on the console and in
/// the CSV log written by <see cref="WriteToLog"/>.
/// </summary>
public class Program
{
    /// <summary>
    /// Entry point for application
    /// </summary>
    /// <param name="args">
    /// Collection of command line arguments. <c>args[0]</c> is the source directory (required).
    /// <c>args[1]</c> is an optional maximum block size in bytes; when it is a positive integer
    /// the files are uploaded with the parallel, block-based implementation instead of a
    /// single upload call.
    /// </param>
    [STAThread]
    public static void Main(string[] args)
    {
        if (args.Length < 1)
        {
            Console.WriteLine("You must provide the source directory");
            return;
        }

        string dirPath = args[0];

        if (!Directory.Exists(dirPath))
        {
            Console.WriteLine("The directory provided does not exist");
            return;
        }

        // -1 means "no block size supplied": use the plain, non-parallel upload.
        int maxBlockSize = -1;

        // let's see if they passed a maxBlockSize. A value that does not parse, or that is
        // not positive, is treated as "not supplied": a zero or negative block size would
        // otherwise make every parallel upload fail while computing its block count.
        int parsedBlockSize;
        if (args.Length >= 2 && int.TryParse(args[1], out parsedBlockSize) && parsedBlockSize > 0)
        {
            maxBlockSize = parsedBlockSize;
        }

        // update the TCP/IP connection stack to let us multi-thread more...
        ServicePointManager.DefaultConnectionLimit = 64;

        var storageAccount = new CloudStorageAccount(
            new StorageCredentialsAccountAndKey(
                Properties.Settings.Default.TargetAccountName,
                Properties.Settings.Default.TargetAccountKey),
            false);

        var blobStorage = storageAccount.CreateCloudBlobClient();
        blobStorage.RetryPolicy = RetryPolicies.RetryExponential(5, TimeSpan.FromSeconds(10));
        var container = blobStorage.GetContainerReference("rawdata");

        string[] files = Directory.GetFiles(dirPath);

        // every run uploads under its own "<machine>/<file time>" prefix
        string uniqueKey = string.Format("{0}/{1}", Environment.MachineName, DateTime.Now.ToFileTime());

        // write the CSV file headers
        // REG: 4/14/2010 - changed the second "Time" header to "Duration"
        WriteToLog("Date,Time,File,Bytes,Duration,Milliseconds,URL");

        // Shuffle the order in which the files are transferred so that the measurements
        // are not simply exposing a caching artifact or other order-dependent behavior.
        // Rather than actually re-arranging the array, we create a parallel array of
        // indexes, put it in a random order, and use it to step through the files.
        // This means of shuffling is not the most efficient, but is very straight-forward
        // (it is O(n log n) rather than O(n) as in a Fisher-Yates shuffle).
        // Note: the second argument of Enumerable.Range is a COUNT, not an upper bound, so
        // it must be files.Length for every file to be visited (and to be valid when the
        // directory is empty).
        var shuffledIndexers = Enumerable.Range(0, files.Length).OrderBy(a => Guid.NewGuid());

        foreach (int indexer in shuffledIndexers)
        {
            try
            {
                var file = new FileInfo(files[indexer]);

                var watch = new Stopwatch();
                watch.Start();

                var blob = container.GetBlockBlobReference(string.Format("{0}/{1}", uniqueKey, file.Name));
                blob.Properties.ContentType = "application/octet-stream";

                var requestOptions = new BlobRequestOptions() { Timeout = TimeSpan.FromMinutes(10) };

                // if max block size was set, use the || version
                if (maxBlockSize == -1)
                {
                    blob.UploadFile(file.FullName, requestOptions);
                }
                else
                {
                    blob.ParallelUploadFile(file.FullName, requestOptions, maxBlockSize);
                }

                watch.Stop();

                // sample the clock once so the date and time columns describe the same instant
                DateTime completedAt = DateTime.UtcNow;

                string updateString = string.Format(
                    "{0},{1},{2},{3},{4}",
                    completedAt.ToShortDateString(),
                    completedAt.ToShortTimeString(),
                    file.Name,
                    file.Length,
                    watch.Elapsed);

                Console.WriteLine(updateString);

                // add the total milliseconds and URL to the log string
                updateString = string.Format("{0},{1},{2}", updateString, watch.ElapsedMilliseconds, blob.Uri);

                WriteToLog(updateString);
            }
            catch (Exception ex)
            {
                // best effort: report the failure and carry on with the next file
                Console.WriteLine("{0} UTC {1}", DateTime.UtcNow, ex.Message);
            }
        }
    }

    /// <summary>
    /// Appends the supplied value, as one line, to the "log.csv" file
    /// </summary>
    /// <param name="logData">String to be written to the log file</param>
    public static void WriteToLog(string logData)
    {
        // the using block closes the file even when the write throws
        using (TextWriter tw = new StreamWriter("log.csv", true))
        {
            tw.WriteLine(logData);
        }
    }
}

/// <summary>
/// Extension methods for <see cref="CloudBlockBlob"/>
/// </summary>
public static class CloudBlockBlobExtension
{
    /// <summary>
    /// Uploads a file from the file system to a blob. Parallel implementation of
    /// UploadFile(): the file is split into blocks of at most
    /// <paramref name="maxBlockSize"/> bytes, the blocks are uploaded concurrently, and the
    /// block list is committed once every block has been sent.
    /// </summary>
    /// <param name="blob">Blob object that is extended by this method</param>
    /// <param name="fileName">The file to be uploaded.</param>
    /// <param name="options">A Microsoft.WindowsAzure.StorageClient.BlobRequestOptions object indicating any additional options to be specified on the request</param>
    /// <param name="maxBlockSize">The maximum size, in bytes, of an individual block transferred. Must be greater than zero.</param>
    /// <exception cref="ArgumentNullException"><paramref name="blob"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxBlockSize"/> is zero or negative.</exception>
    /// <exception cref="AggregateException">
    /// One or more blocks failed; failures raised inside the parallel loop (including an
    /// <see cref="EndOfStreamException"/> when the file is shorter than it was when the
    /// blocks were planned) are surfaced this way by Parallel.For.
    /// </exception>
    public static void ParallelUploadFile(this CloudBlockBlob blob, string fileName, BlobRequestOptions options, int maxBlockSize)
    {
        if (blob == null)
        {
            throw new ArgumentNullException("blob");
        }

        if (maxBlockSize <= 0)
        {
            throw new ArgumentOutOfRangeException(
                "maxBlockSize",
                maxBlockSize,
                "The maximum block size must be greater than zero.");
        }

        var file = new FileInfo(fileName);

        // let's figure out how big the file is here
        long fileSize = file.Length;

        // one block for every maxBlockSize bytes, rounded up so that a partial trailing
        // block is included; an empty file therefore has no blocks at all
        int blockCount = checked((int)((fileSize + maxBlockSize - 1) / maxBlockSize));

        // setup the control array and the ordered list of block ids to commit
        var transferDetails = new BlockTransferDetail[blockCount];
        var blockIds = new List<string>(blockCount);

        // offsets are tracked as long so that files of 2 GB and more do not overflow
        long startPosition = 0;
        long leftToRead = fileSize;

        // populate the control array...
        for (int j = 0; j < transferDetails.Length; j++)
        {
            int toRead = (int)Math.Min(maxBlockSize, leftToRead);

            // the zero-padded index gives every block id the same length
            string blockId = Convert.ToBase64String(
                Encoding.ASCII.GetBytes(
                    string.Format("BlockId{0}", j.ToString("0000000"))));

            transferDetails[j] = new BlockTransferDetail()
            {
                StartPosition = startPosition,
                BytesToRead = toRead,
                BlockId = blockId
            };

            blockIds.Add(blockId);

            // increment the starting position
            startPosition += toRead;
            leftToRead -= toRead;
        }

        // now we do a || upload of the file.
        Parallel.For(0, transferDetails.Length, j =>
        {
            BlockTransferDetail detail = transferDetails[j];
            byte[] buff = new byte[detail.BytesToRead];

            // every iteration opens its own stream so the blocks can be read independently
            using (FileStream fs = new FileStream(file.FullName, FileMode.Open, FileAccess.Read))
            {
                // move the file system reader to the proper position
                fs.Seek(detail.StartPosition, SeekOrigin.Begin);

                // Read may hand back fewer bytes than requested, so keep reading until the
                // block is complete instead of uploading a partly zero-filled buffer
                int filled = 0;
                while (filled < buff.Length)
                {
                    int read = fs.Read(buff, filled, buff.Length - filled);
                    if (read == 0)
                    {
                        throw new EndOfStreamException(
                            string.Format(
                                "Unexpected end of file '{0}' while reading block {1}.",
                                file.FullName,
                                j));
                    }

                    filled += read;
                }
            }

            // calculate the block-level hash
            string blockHash = Helpers.GetMD5HashFromStream(buff);

            using (var blockData = new MemoryStream(buff))
            {
                blob.PutBlock(detail.BlockId, blockData, blockHash, options);
            }
        });

        // commit the file
        blob.PutBlockList(blockIds);
    }
}

/// <summary>
/// Describes one block of a file that is uploaded by
/// <see cref="CloudBlockBlobExtension.ParallelUploadFile"/>
/// </summary>
public class BlockTransferDetail
{
    /// <summary>
    /// Gets or sets the offset, in bytes from the beginning of the file, at which the block
    /// starts. Held as a long so that offsets of 2 GB and beyond can be represented.
    /// </summary>
    public long StartPosition { get; set; }

    /// <summary>
    /// Gets or sets the number of bytes that make up the block
    /// </summary>
    public int BytesToRead { get; set; }

    /// <summary>
    /// Gets or sets the base64-encoded id under which the block is uploaded and committed
    /// </summary>
    public string BlockId { get; set; }
}
}

Change log

r20 by argodev on Apr 26, 2010   Diff
Updating for parallel versioning
Go to: 
Project members, sign in to write a code review

Older revisions

r19 by argodev on Feb 16, 2010   Diff
Updating AzureTesting for general
usage.
All revisions of this file

File info

Size: 10851 bytes, 265 lines
Powered by Google Project Hosting