Preface
This article looks at how Storm's submitTopology works, based on the storm-core 1.1.0 sources.
Sample log from submitting a topology
2018-10-08 17:32:55.738 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318
2018-10-08 17:32:55.893 INFO 2870 --- [ main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.059 INFO 2870 --- [ main] o.apache.storm.security.auth.AuthUtils : Got AutoCreds []
2018-10-08 17:32:56.073 INFO 2870 --- [ main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.123 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading dependencies - jars...
2018-10-08 17:32:56.125 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading dependencies - artifacts...
2018-10-08 17:32:56.125 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Dependency Blob keys - jars : [] / artifacts : []
2018-10-08 17:32:56.149 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Uploading topology jar /tmp/storm-demo/target/storm-demo-0.0.1-SNAPSHOT.jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.105 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Successfully uploaded topology jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.106 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Submitting topology DemoTopology in distributed mode with conf {"nimbus.seeds":["192.168.99.100"],"storm.zookeeper.topology.auth.scheme":"digest","topology.workers":1,"storm.zookeeper.port":2181,"nimbus.thrift.port":6627,"storm.zookeeper.topology.auth.payload":"-8659577410336375158:-6351873438041855318","storm.zookeeper.servers":["192.168.99.100"]}
2018-10-08 17:32:58.008 INFO 2870 --- [ main] org.apache.storm.StormSubmitter : Finished submitting topology: DemoTopology
- The log shows that the topology jar was uploaded to nimbus at /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
StormSubmitter
submitTopology
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
public static void submitTopology(String name, Map stormConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    submitTopology(name, stormConf, topology, null, null);
}

public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
        ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
}

public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    stormConf.putAll(prepareZookeeperAuthentication(conf));

    validateConfs(conf, topology);

    Map<String,String> passedCreds = new HashMap<>();
    if (opts != null) {
        Credentials tmpCreds = opts.get_creds();
        if (tmpCreds != null) {
            passedCreds = tmpCreds.get_creds();
        }
    }
    Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
    if (!fullCreds.isEmpty()) {
        if (opts == null) {
            opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
        }
        opts.set_creds(new Credentials(fullCreds));
    }
    try {
        if (localNimbus!=null) {
            LOG.info("Submitting topology " + name + " in local mode");
            if (opts!=null) {
                localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
            } else {
                // this is for backwards compatibility
                localNimbus.submitTopology(name, stormConf, topology);
            }
            LOG.info("Finished submitting topology: " + name);
        } else {
            String serConf = JSONValue.toJSONString(stormConf);
            try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
                if (topologyNameExists(name, client)) {
                    throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                }

                // Dependency uploading only makes sense for distributed mode
                List<String> jarsBlobKeys = Collections.emptyList();
                List<String> artifactsBlobKeys;

                DependencyUploader uploader = new DependencyUploader();
                try {
                    uploader.init();
                    jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);
                    artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
                } catch (Throwable e) {
                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                    uploader.deleteBlobs(jarsBlobKeys);
                    uploader.shutdown();
                    throw e;
                }

                try {
                    setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                    submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    // remove uploaded jars blobs, not artifacts since they're shared across the cluster
                    // Note that we don't handle TException to delete jars blobs
                    // because it's safer to leave some blobs instead of topology not running
                    uploader.deleteBlobs(jarsBlobKeys);
                    throw e;
                } finally {
                    uploader.shutdown();
                }
            }
        }
    } catch(TException e) {
        throw new RuntimeException(e);
    }
    invokeSubmitterHook(name, asUser, conf, topology);
}

private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
                                                   ProgressListener progressListener, String asUser, Map conf,
                                                   String serConf, NimbusClient client) throws TException {
    try {
        String jar = submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
        LOG.info("Submitting topology {} in distributed mode with conf {}", name, serConf);

        if (opts != null) {
            client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
        } else {
            // this is for backwards compatibility
            client.getClient().submitTopology(name, jar, serConf, topology);
        }
        LOG.info("Finished submitting topology: {}", name);
    } catch (InvalidTopologyException e) {
        LOG.warn("Topology submission exception: {}", e.get_msg());
        throw e;
    } catch (AlreadyAliveException e) {
        LOG.warn("Topology already alive exception", e);
        throw e;
    }
}

public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
    if (localJar == null) {
        throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
    }

    try {
        String uploadLocation = client.getClient().beginFileUpload();
        LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
        BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);

        long totalSize = new File(localJar).length();
        if (listener != null) {
            listener.onStart(localJar, uploadLocation, totalSize);
        }

        long bytesUploaded = 0;
        while(true) {
            byte[] toSubmit = is.read();
            bytesUploaded += toSubmit.length;
            if (listener != null) {
                listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
            }

            if(toSubmit.length==0) break;
            client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
        }
        client.getClient().finishFileUpload(uploadLocation);

        if (listener != null) {
            listener.onCompleted(localJar, uploadLocation, totalSize);
        }

        LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
        return uploadLocation;
    } catch(Exception e) {
        throw new RuntimeException(e);
    }
}
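- Topologies are submitted mainly through the submitTopologyAs method
- In distributed mode, submitTopologyAs first uploads the dependencies through DependencyUploader and then calls submitTopologyInDistributeMode, which uploads the topology jar via submitJarAs
- As the earlier log shows, the jar ends up on nimbus at /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
- client.getClient().submitTopology then submits the topology information itself: the name, the uploaded jar location, the JSON-serialized conf and the StormTopology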
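The begin / chunk / finish sequence that submitJarAs follows can be sketched without a cluster. The following is only an illustration in plain Java: `UploadTarget`, `RecordingTarget` and the returned location string are hypothetical stand-ins for the Nimbus client calls, not Storm classes, and the loop reads from a standard `InputStream` instead of Storm's `BufferFileInputStream`.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkedUploadSketch {

    /** Hypothetical stand-in for the three Nimbus calls that submitJarAs uses. */
    interface UploadTarget {
        String beginFileUpload();
        void uploadChunk(String location, byte[] chunk);
        void finishFileUpload(String location);
    }

    /** In-memory target that records what it receives (illustration only). */
    static final class RecordingTarget implements UploadTarget {
        final List<byte[]> chunks = new ArrayList<>();
        boolean finished;

        // The location below is made up; a real nimbus assigns its own path.
        @Override public String beginFileUpload() { return "inbox/stormjar-example.jar"; }
        @Override public void uploadChunk(String location, byte[] chunk) { chunks.add(chunk); }
        @Override public void finishFileUpload(String location) { finished = true; }
    }

    /** Sends the stream in pieces of at most chunkSize bytes and returns the assigned location. */
    static String upload(InputStream in, int chunkSize, UploadTarget target) {
        String location = target.beginFileUpload();
        byte[] buffer = new byte[chunkSize];
        try {
            int n;
            while ((n = in.read(buffer)) > 0) {
                target.uploadChunk(location, Arrays.copyOf(buffer, n));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        target.finishFileUpload(location);
        return location;
    }

    /** Demo helper: uploads totalBytes zero bytes and returns the chunk sizes the target saw. */
    static List<Integer> chunkSizes(int totalBytes, int chunkSize) {
        RecordingTarget target = new RecordingTarget();
        upload(new ByteArrayInputStream(new byte[totalBytes]), chunkSize, target);
        List<Integer> sizes = new ArrayList<>();
        for (byte[] chunk : target.chunks) {
            sizes.add(chunk.length);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(10, 4));
    }
}
```

With 10 bytes and a chunk size of 4, the target receives chunks of 4, 4 and 2 bytes before the upload is finished.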
uploadDependencyJarsToBlobStore
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
    LOG.info("Uploading dependencies - jars...");

    DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

    String depJarsProp = System.getProperty("storm.dependency.jars", "");
    List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);

    try {
        return uploader.uploadFiles(depJars, true);
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
uploadDependencyArtifactsToBlobStore
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
    LOG.info("Uploading dependencies - artifacts...");

    DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

    String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
    Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);

    try {
        return uploader.uploadArtifacts(depArtifacts);
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }
}
DependencyUploader
storm-core-1.1.0-sources.jar!/org/apache/storm/dependency/DependencyUploader.java
public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
    checkFilesExist(dependencies);

    List<String> keys = new ArrayList<>(dependencies.size());
    try {
        for (File dependency : dependencies) {
            String fileName = dependency.getName();
            String key = BlobStoreUtils.generateDependencyBlobKey(BlobStoreUtils.applyUUIDToFileName(fileName));

            try {
                uploadDependencyToBlobStore(key, dependency);
            } catch (KeyAlreadyExistsException e) {
                // it should never happened since we apply UUID
                throw new RuntimeException(e);
            }

            keys.add(key);
        }
    } catch (Throwable e) {
        if (getBlobStore() != null && cleanupIfFails) {
            deleteBlobs(keys);
        }
        throw new RuntimeException(e);
    }

    return keys;
}

public List<String> uploadArtifacts(Map<String, File> artifacts) {
    checkFilesExist(artifacts.values());

    List<String> keys = new ArrayList<>(artifacts.size());
    try {
        for (Map.Entry<String, File> artifactToFile : artifacts.entrySet()) {
            String artifact = artifactToFile.getKey();
            File dependency = artifactToFile.getValue();

            String key = BlobStoreUtils.generateDependencyBlobKey(convertArtifactToJarFileName(artifact));
            try {
                uploadDependencyToBlobStore(key, dependency);
            } catch (KeyAlreadyExistsException e) {
                // we lose the race, but it doesn't matter
            }

            keys.add(key);
        }
    } catch (Throwable e) {
        throw new RuntimeException(e);
    }

    return keys;
}

private boolean uploadDependencyToBlobStore(String key, File dependency)
        throws KeyAlreadyExistsException, AuthorizationException, IOException {

    boolean uploadNew = false;
    try {
        // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
        // as a workaround, we call getBlobMeta() for all keys
        getBlobStore().getBlobMeta(key);
    } catch (KeyNotFoundException e) {
        // TODO: do we want to add ACL here?
        AtomicOutputStream blob = getBlobStore()
                .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
        Files.copy(dependency.toPath(), blob);
        blob.close();

        uploadNew = true;
    }

    return uploadNew;
}
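- Both uploadFiles and uploadArtifacts end up calling uploadDependencyToBlobStore
- uploadDependencyToBlobStore first calls getBlobMeta for the key; only when that throws KeyNotFoundException does it create the blob and copy the file into the returned AtomicOutputStream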
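The difference between the two key schemes can be illustrated with a small in-memory sketch. This is not Storm code: `BlobUploadSketch`, the `dep-` key prefix and the key formats are assumptions made up for the example, and the atomic `putIfAbsent` is a simplification of the getBlobMeta-then-createBlob check in the source.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

final class BlobUploadSketch {

    // Hypothetical in-memory replacement for the blob store.
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    /** Returns true only when the key was absent and the blob was newly stored. */
    boolean uploadIfAbsent(String key, byte[] data) {
        return store.putIfAbsent(key, data) == null;
    }

    /** Dependency jars: a random UUID in the key makes every upload distinct. */
    String uploadJar(String fileName, byte[] data) {
        String key = "dep-" + UUID.randomUUID() + "-" + fileName; // made-up key format
        uploadIfAbsent(key, data);
        return key;
    }

    /** Artifacts: the key is derived from the coordinate, so repeated uploads share one blob. */
    String uploadArtifact(String coordinate, byte[] data) {
        String key = "dep-" + coordinate.replace(':', '-') + ".jar"; // made-up key format
        uploadIfAbsent(key, data);
        return key;
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        BlobUploadSketch sketch = new BlobUploadSketch();
        sketch.uploadArtifact("org.example:lib:1.0", new byte[] {1});
        sketch.uploadArtifact("org.example:lib:1.0", new byte[] {1});
        sketch.uploadJar("a.jar", new byte[] {2});
        sketch.uploadJar("a.jar", new byte[] {2});
        System.out.println("blobs stored: " + sketch.size());
    }
}
```

Uploading the same artifact twice leaves one shared blob, while uploading the same jar file twice produces two blobs. That matches why the source deletes only the jar blobs when a submission fails.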
NimbusUploadAtomicOutputStream
storm-core-1.1.0-sources.jar!/org/apache/storm/blobstore/NimbusBlobStore.java
public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
    private String session;
    private int maxChunkSize = 4096;
    private String key;

    public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
        this.session = session;
        this.maxChunkSize = bufferSize;
        this.key = key;
    }

    @Override
    public void cancel() throws IOException {
        try {
            synchronized(client) {
                client.getClient().cancelBlobUpload(session);
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void write(int b) throws IOException {
        try {
            synchronized(client) {
                client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void write(byte []b) throws IOException {
        write(b, 0, b.length);
    }

    @Override
    public void write(byte []b, int offset, int len) throws IOException {
        try {
            int end = offset + len;
            for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
                int realLen = Math.min(end - realOffset, maxChunkSize);
                LOG.debug("Writing {} bytes of {} remaining",realLen,(end-realOffset));
                synchronized(client) {
                    client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
                }
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            synchronized(client) {
                client.getClient().finishBlobUpload(session);
                client.getClient().createStateInZookeeper(key);
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }
}
- The write methods of NimbusUploadAtomicOutputStream upload the data through client.getClient().uploadBlobChunk, splitting a byte range into pieces of at most maxChunkSize bytes
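The range splitting done by write(byte[], int, int) can be checked in isolation. The sketch below reuses the same loop arithmetic but only records the (offset, length) pairs instead of calling uploadBlobChunk; `ChunkSplitSketch` and its helper names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkSplitSketch {

    /** Returns {offset, length} pairs covering [offset, offset + len) in pieces of at most maxChunkSize. */
    static List<int[]> split(int offset, int len, int maxChunkSize) {
        if (maxChunkSize <= 0) {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<int[]> chunks = new ArrayList<>();
        int end = offset + len;
        for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
            int realLen = Math.min(end - realOffset, maxChunkSize);
            chunks.add(new int[] {realOffset, realLen});
        }
        return chunks;
    }

    /** Formats the pairs as "offset+length" tokens separated by spaces. */
    static String describe(List<int[]> chunks) {
        StringBuilder sb = new StringBuilder();
        for (int[] chunk : chunks) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(chunk[0]).append('+').append(chunk[1]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // 10 bytes starting at offset 2, at most 4 bytes per chunk
        System.out.println(describe(split(2, 10, 4)));
    }
}
```

Each pair corresponds to one uploadBlobChunk call in the source. The single-byte write(int) overload, by contrast, issues one call per byte.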
send&recv
storm-core-1.1.0-sources.jar!/org/apache/storm/generated/Nimbus.java
public String beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
{
    send_beginFileUpload();
    return recv_beginFileUpload();
}

public void send_beginFileUpload() throws org.apache.thrift.TException
{
    beginFileUpload_args args = new beginFileUpload_args();
    sendBase("beginFileUpload", args);
}

public String recv_beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
{
    beginFileUpload_result result = new beginFileUpload_result();
    receiveBase(result, "beginFileUpload");
    if (result.is_set_success()) {
        return result.success;
    }
    if (result.aze != null) {
        throw result.aze;
    }
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "beginFileUpload failed: unknown result");
}

public void send_finishFileUpload(String location) throws org.apache.thrift.TException
{
    finishFileUpload_args args = new finishFileUpload_args();
    args.set_location(location);
    sendBase("finishFileUpload", args);
}

public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
{
    send_uploadChunk(location, chunk);
    recv_uploadChunk();
}

public void send_uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift.TException
{
    uploadChunk_args args = new uploadChunk_args();
    args.set_location(location);
    args.set_chunk(chunk);
    sendBase("uploadChunk", args);
}

public void recv_uploadChunk() throws AuthorizationException, org.apache.thrift.TException
{
    uploadChunk_result result = new uploadChunk_result();
    receiveBase(result, "uploadChunk");
    if (result.aze != null) {
        throw result.aze;
    }
    return;
}

public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
{
    send_submitTopology(name, uploadedJarLocation, jsonConf, topology);
    recv_submitTopology();
}

public void send_submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift.TException
{
    submitTopology_args args = new submitTopology_args();
    args.set_name(name);
    args.set_uploadedJarLocation(uploadedJarLocation);
    args.set_jsonConf(jsonConf);
    args.set_topology(topology);
    sendBase("submitTopology", args);
}

public void recv_submitTopology() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
{
    submitTopology_result result = new submitTopology_result();
    receiveBase(result, "submitTopology");
    if (result.e != null) {
        throw result.e;
    }
    if (result.ite != null) {
        throw result.ite;
    }
    if (result.aze != null) {
        throw result.aze;
    }
    return;
}

public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
{
    send_uploadBlobChunk(session, chunk);
    recv_uploadBlobChunk();
}

public void send_uploadBlobChunk(String session, ByteBuffer chunk) throws org.apache.thrift.TException
{
    uploadBlobChunk_args args = new uploadBlobChunk_args();
    args.set_session(session);
    args.set_chunk(chunk);
    sendBase("uploadBlobChunk", args);
}

public void recv_uploadBlobChunk() throws AuthorizationException, org.apache.thrift.TException
{
    uploadBlobChunk_result result = new uploadBlobChunk_result();
    receiveBase(result, "uploadBlobChunk");
    if (result.aze != null) {
        throw result.aze;
    }
    return;
}
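- Each remote call is a send_xxx / recv_xxx pair: sendBase writes the request arguments to nimbus, and receiveBase reads the result, which either carries the return value or one of the declared exceptions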
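The way a recv_xxx method interprets its result (return value first, then a declared exception, otherwise "unknown result") can be mimicked with a toy client. Everything here is hypothetical and runs in-process: `Result`, `Client` and the canned handler stand in for the generated Thrift structs and the real transport.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

final class SendRecvSketch {

    /** Hypothetical result holder; a generated *_result struct plays this role in the source. */
    static final class Result {
        final String success;
        final RuntimeException error;

        Result(String success, RuntimeException error) {
            this.success = success;
            this.error = error;
        }
    }

    /** Toy client: "send" hands the call to an in-process handler, "recv" reads the queued reply. */
    static final class Client {
        private final Function<String, Result> server;
        private final Deque<Result> replies = new ArrayDeque<>();

        Client(Function<String, Result> server) {
            this.server = server;
        }

        String beginFileUpload() {
            sendBeginFileUpload();
            return recvBeginFileUpload();
        }

        void sendBeginFileUpload() {
            replies.add(server.apply("beginFileUpload"));
        }

        String recvBeginFileUpload() {
            Result result = replies.remove();
            if (result.success != null) {
                return result.success;
            }
            if (result.error != null) {
                throw result.error;
            }
            throw new IllegalStateException("beginFileUpload failed: unknown result");
        }
    }

    /** Runs one call against a canned reply and reports what the caller observes. */
    static String outcome(Result canned) {
        try {
            return "ok:" + new Client(method -> canned).beginFileUpload();
        } catch (RuntimeException e) {
            return "error:" + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(new Result("inbox/example.jar", null)));
        System.out.println(outcome(new Result(null, new SecurityException("denied"))));
        System.out.println(outcome(new Result(null, null)));
    }
}
```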
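Summary
In distributed mode, Storm's submitTopology first uploads the dependency jars listed in storm.dependency.jars, then the dependencies listed in storm.dependency.artifacts, and finally the topology jar given by storm.jar, before submitting the topology itself. All of these steps are remote calls on the nimbus Thrift client, which sends the request with sendBase and reads the response with receiveBase.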
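The ordering and the cleanup rule can be summarized in a small simulation. It only records step names in a list; `SubmitOrderSketch` and the step strings are invented for illustration, and unlike the source, which rethrows after cleanup, the sketch swallows the simulated failure so that the recorded steps can be returned.

```java
import java.util.ArrayList;
import java.util.List;

final class SubmitOrderSketch {

    /** Records the steps of one submission; fails at the submit step when requested. */
    static List<String> run(boolean failOnSubmit) {
        List<String> steps = new ArrayList<>();
        steps.add("upload dependency jars");
        steps.add("upload dependency artifacts");
        try {
            steps.add("upload topology jar");
            if (failOnSubmit) {
                throw new IllegalStateException("simulated submission failure");
            }
            steps.add("submit topology");
        } catch (IllegalStateException e) {
            // Only the per-submission jar blobs are removed; artifacts are shared across the cluster.
            steps.add("delete dependency jar blobs");
        } finally {
            steps.add("shutdown uploader");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```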