内容简介:本文主要研究一下flink的log.file配置flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.propertiesflink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java
序
本文主要研究一下flink的log.file配置
log4j.properties
flink-release-1.6.2/flink-dist/src/main/flink-bin/conf/log4j.properties
# This affects logging for both user code and Flink log4j.rootLogger=INFO, file # Uncomment this if you want to _only_ change Flink's logging #log4j.logger.org.apache.flink=INFO # The following lines keep the log level of common libraries/connectors on # log level INFO. The root logger does not override this. You have to manually # change the log levels here. log4j.logger.akka=INFO log4j.logger.org.apache.kafka=INFO log4j.logger.org.apache.hadoop=INFO log4j.logger.org.apache.zookeeper=INFO # Log all infos in the given file log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.file=${log.file} log4j.appender.file.append=false log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Suppress the irrelevant (wrong) warnings from the Netty channel handler log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
- 这里使用log.file这个系统属性配置log4j.appender.file.file
MiniCluster
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/minicluster/MiniCluster.java
/** * Starts the mini cluster, based on the configured properties. * * @throws Exception This method passes on any exception that occurs during the startup of * the mini cluster. */ public void start() throws Exception { synchronized (lock) { checkState(!running, "FlinkMiniCluster is already running"); LOG.info("Starting Flink Mini Cluster"); LOG.debug("Using configuration {}", miniClusterConfiguration); final Configuration configuration = miniClusterConfiguration.getConfiguration(); final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout(); final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers(); final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED; try { initializeIOFormatClasses(configuration); LOG.info("Starting Metrics Registry"); metricRegistry = createMetricRegistry(configuration); this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup( metricRegistry, "localhost"); final RpcService jobManagerRpcService; final RpcService resourceManagerRpcService; final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers]; // bring up all the RPC services LOG.info("Starting RPC Service(s)"); // we always need the 'commonRpcService' for auxiliary calls commonRpcService = createRpcService(configuration, rpcTimeout, false, null); // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem(); metricRegistry.startQueryService(actorSystem, null); if (useSingleRpcService) { for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = commonRpcService; } jobManagerRpcService = commonRpcService; resourceManagerRpcService = commonRpcService; this.resourceManagerRpcService = null; this.jobManagerRpcService = null; this.taskManagerRpcServices = null; } else { // start a new service per component, possibly with custom bind addresses final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress(); final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress(); final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress(); jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress); resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress); for (int i = 0; i < numTaskManagers; i++) { taskManagerRpcServices[i] = createRpcService( configuration, rpcTimeout, true, taskManagerBindAddress); } this.jobManagerRpcService = jobManagerRpcService; this.taskManagerRpcServices = taskManagerRpcServices; this.resourceManagerRpcService = resourceManagerRpcService; } // create the high-availability services LOG.info("Starting high-availability services"); haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( configuration, commonRpcService.getExecutor()); blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); heartbeatServices = HeartbeatServices.fromConfiguration(configuration); // bring up the ResourceManager(s) LOG.info("Starting ResourceManger"); resourceManagerRunner = startResourceManager( configuration, haServices, heartbeatServices, metricRegistry, resourceManagerRpcService, new ClusterInformation("localhost", blobServer.getPort()), jobManagerMetricGroup); blobCacheService = new BlobCacheService( configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()) ); // bring up the TaskManager(s) for the mini cluster LOG.info("Starting {} TaskManger(s)", numTaskManagers); taskManagers = startTaskManagers( configuration, haServices, heartbeatServices, metricRegistry, blobCacheService, numTaskManagers, taskManagerRpcServices); // starting the dispatcher rest endpoint LOG.info("Starting dispatcher rest endpoint."); dispatcherGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds(20L)); final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>( jobManagerRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds(20L)); this.dispatcherRestEndpoint = new DispatcherRestEndpoint( RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, configuration, RestHandlerConfiguration.fromConfiguration(configuration), resourceManagerGatewayRetriever, blobServer.getTransientBlobService(), WebMonitorEndpoint.createExecutorService( configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1), configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY), "DispatcherRestEndpoint"), new AkkaQueryServiceRetriever( actorSystem, Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))), haServices.getWebMonitorLeaderElectionService(), new ShutDownFatalErrorHandler()); dispatcherRestEndpoint.start(); restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl()); // bring up the dispatcher that launches JobManagers when jobs submitted LOG.info("Starting job dispatcher(s) for JobManger"); this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(metricRegistry, "localhost"); final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint); dispatcher = new StandaloneDispatcher( jobManagerRpcService, Dispatcher.DISPATCHER_NAME + UUID.randomUUID(), configuration, haServices, resourceManagerRunner.getResourceManageGateway(), blobServer, heartbeatServices, jobManagerMetricGroup, metricRegistry.getMetricQueryServicePath(), new MemoryArchivedExecutionGraphStore(), Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE, new ShutDownFatalErrorHandler(), dispatcherRestEndpoint.getRestBaseUrl(), historyServerArchivist); dispatcher.start(); resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever(); dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever(); resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever); dispatcherLeaderRetriever.start(dispatcherGatewayRetriever); } catch (Exception e) { // cleanup everything try { close(); } catch (Exception ee) { e.addSuppressed(ee); } throw e; } // create a new termination future terminationFuture = new CompletableFuture<>(); // now officially mark this as running running = true; LOG.info("Flink Mini Cluster started successfully"); } }
- 这里先创建了metricRegistry、commonRpcService、jobManagerRpcService、resourceManagerRpcService、haServices、blobServer、heartbeatServices、resourceManagerRunner、blobCacheService、taskManagers、dispatcherGatewayRetriever、dispatcherRestEndpoint、dispatcher、dispatcherLeaderRetriever
RestServerEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/RestServerEndpoint.java
/** * Starts this REST server endpoint. * * @throws Exception if we cannot start the RestServerEndpoint */ public final void start() throws Exception { synchronized (lock) { Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted."); log.info("Starting rest endpoint."); final Router router = new Router(); final CompletableFuture<String> restAddressFuture = new CompletableFuture<>(); List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture); /* sort the handlers such that they are ordered the following: * /jobs * /jobs/overview * /jobs/:jobid * /jobs/:jobid/config * /:* */ Collections.sort( handlers, RestHandlerUrlComparator.INSTANCE); handlers.forEach(handler -> { log.debug("Register handler {} under {}@{}.", handler.f1, handler.f0.getHttpMethod(), handler.f0.getTargetRestEndpointURL()); registerHandler(router, handler); }); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { RouterHandler handler = new RouterHandler(router, responseHeaders); // SSL should be the first handler in the pipeline if (sslEngineFactory != null) { ch.pipeline().addLast("ssl", new RedirectingSslHandler(restAddress, restAddressFuture, sslEngineFactory)); } ch.pipeline() .addLast(new HttpServerCodec()) .addLast(new FileUploadHandler(uploadDir)) .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders)) .addLast(new ChunkedWriteHandler()) .addLast(handler.getName(), handler) .addLast(new PipelineErrorHandler(log, responseHeaders)); } }; NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss")); NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker")); bootstrap = new ServerBootstrap(); bootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer); log.debug("Binding rest endpoint to {}:{}.", restBindAddress, restBindPort); final ChannelFuture channel; if (restBindAddress == null) { channel = bootstrap.bind(restBindPort); } else { channel = bootstrap.bind(restBindAddress, restBindPort); } serverChannel = channel.syncUninterruptibly().channel(); final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress(); final String advertisedAddress; if (bindAddress.getAddress().isAnyLocalAddress()) { advertisedAddress = this.restAddress; } else { advertisedAddress = bindAddress.getAddress().getHostAddress(); } final int port = bindAddress.getPort(); log.info("Rest endpoint listening at {}:{}", advertisedAddress, port); final String protocol; if (sslEngineFactory != null) { protocol = "https://"; } else { protocol = "http://"; } restBaseUrl = protocol + advertisedAddress + ':' + port; restAddressFuture.complete(restBaseUrl); state = State.RUNNING; startInternal(); } }
- 这里调用了initializeHandlers来获取ChannelInboundHandler,initializeHandlers在子类DispatcherRestEndpoint中有实现
DispatcherRestEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@Override protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture); // Add the Dispatcher specific handlers final Time timeout = restConfiguration.getTimeout(); JobSubmitHandler jobSubmitHandler = new JobSubmitHandler( restAddressFuture, leaderRetriever, timeout, responseHeaders, executor, clusterConfiguration); if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) { try { webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension( leaderRetriever, restAddressFuture, timeout, responseHeaders, uploadDir, executor, clusterConfiguration); // register extension handlers handlers.addAll(webSubmissionExtension.getHandlers()); } catch (FlinkException e) { if (log.isDebugEnabled()) { log.debug("Failed to load web based job submission extension.", e); } else { log.info("Failed to load web based job submission extension. " + "Probable reason: flink-runtime-web is not in the classpath."); } } } else { log.info("Web-based job submission is not enabled."); } handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); return handlers; }
- 这里首先调用了父类的initializeHandlers,这里的父类为WebMonitorEndpoint(
它是RestServerEndpoint的直接子类,而DispatcherRestEndpoint又继承了WebMonitorEndpoint
)
WebMonitorEndpoint
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@Override protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30); final Time timeout = restConfiguration.getTimeout(); //...... // TODO: Remove once the Yarn proxy can forward all REST verbs handlers.add(Tuple2.of(YarnCancelJobTerminationHeaders.getInstance(), jobCancelTerminationHandler)); handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler)); handlers.add(Tuple2.of(shutdownHandler.getMessageHeaders(), shutdownHandler)); //...... // load the log and stdout file handler for the main cluster component final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration); final ChannelInboundHandler logFileHandler = createStaticFileHandler( restAddressFuture, timeout, logFileLocation.logFile); final ChannelInboundHandler stdoutFileHandler = createStaticFileHandler( restAddressFuture, timeout, logFileLocation.stdOutFile); handlers.add(Tuple2.of(LogFileHandlerSpecification.getInstance(), logFileHandler)); handlers.add(Tuple2.of(StdoutFileHandlerSpecification.getInstance(), stdoutFileHandler)); // TaskManager log and stdout file handler final Time cacheEntryDuration = Time.milliseconds(restConfiguration.getRefreshInterval()); final TaskManagerLogFileHandler taskManagerLogFileHandler = new TaskManagerLogFileHandler( restAddressFuture, leaderRetriever, timeout, responseHeaders, TaskManagerLogFileHeaders.getInstance(), resourceManagerRetriever, transientBlobService, cacheEntryDuration); final TaskManagerStdoutFileHandler taskManagerStdoutFileHandler = new TaskManagerStdoutFileHandler( restAddressFuture, leaderRetriever, timeout, responseHeaders, TaskManagerStdoutFileHeaders.getInstance(), resourceManagerRetriever, transientBlobService, cacheEntryDuration); handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler)); handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler)); //...... } @Nonnull private ChannelInboundHandler createStaticFileHandler( CompletableFuture<String> restAddressFuture, Time timeout, File fileToServe) { if (fileToServe == null) { return new ConstantTextHandler("(file unavailable)"); } else { try { return new StaticFileServerHandler<>( leaderRetriever, restAddressFuture, timeout, fileToServe); } catch (IOException e) { log.info("Cannot load log file handler.", e); return new ConstantTextHandler("(log file unavailable)"); } } }
- 它初始化了一系列的ChannelInboundHandler,然后注册到handlers中
- 对于JobManager的FileHandler,它先调用了WebMonitorUtils.LogFileLocation.find(clusterConfiguration),构建了logFileLocation,之后使用logFileLocation.logFile及logFileLocation.stdOutFile分别构造了logFileHandler、stdoutFileHandler,分别用于处理log及stdout文件的下载
- 对于TaskManager的FileHandler,分别构造了TaskManagerLogFileHandler以及TaskManagerStdoutFileHandler来处理log及stdout文件的下载
JobManager FileHandler
WebMonitorUtils.LogFileLocation.find
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
/** * Singleton to hold the log and stdout file. */ public static class LogFileLocation { public final File logFile; public final File stdOutFile; private LogFileLocation(File logFile, File stdOutFile) { this.logFile = logFile; this.stdOutFile = stdOutFile; } /** * Finds the Flink log directory using log.file Java property that is set during startup. */ public static LogFileLocation find(Configuration config) { final String logEnv = "log.file"; String logFilePath = System.getProperty(logEnv); if (logFilePath == null) { LOG.warn("Log file environment variable '{}' is not set.", logEnv); logFilePath = config.getString(WebOptions.LOG_PATH); } // not configured, cannot serve log files if (logFilePath == null || logFilePath.length() < 4) { LOG.warn("JobManager log files are unavailable in the web dashboard. " + "Log file location not found in environment variable '{}' or configuration key '{}'.", logEnv, WebOptions.LOG_PATH); return new LogFileLocation(null, null); } String outFilePath = logFilePath.substring(0, logFilePath.length() - 3).concat("out"); LOG.info("Determined location of main cluster component log file: {}", logFilePath); LOG.info("Determined location of main cluster component stdout file: {}", outFilePath); return new LogFileLocation(resolveFileLocation(logFilePath), resolveFileLocation(outFilePath)); } /** * Verify log file location. * * @param logFilePath Path to log file * @return File or null if not a valid log file */ private static File resolveFileLocation(String logFilePath) { File logFile = new File(logFilePath); return (logFile.exists() && logFile.canRead()) ? logFile : null; } }
- 这里先从系统属性读取log.file属性,没有找到,则打印warning(
Log file environment variable 'log.file' is not set.
) - log.file没有配置的话,则从flink的Configuration读取WebOptions.LOG_PATH(
web.log.path
)配置,如果没有或者logFilePath.length()小于4,则打印warning(JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'.
) - 这里之所以要logFilePath.length()大于等于4,主要是后面要使用logFilePath.substring(0, logFilePath.length() - 3).concat("out")来构建outFilePath;然后通过resolveFileLocation方法校验logFilePath及outFilePath,构建LogFileLocation返回
StaticFileServerHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
/** * Simple file server handler that serves requests to web frontend's static files, such as * HTML, CSS, or JS files. * * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server * example.</p> */ @ChannelHandler.Sharable public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> { /** Timezone in which this server answers its "if-modified" requests. */ private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); /** Date format for HTTP. */ public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; /** Be default, we allow files to be cached for 5 minutes. */ private static final int HTTP_CACHE_SECONDS = 300; // ------------------------------------------------------------------------ /** The path in which the static documents are. */ private final File rootPath; public StaticFileServerHandler( GatewayRetriever<? extends T> retriever, CompletableFuture<String> localJobManagerAddressFuture, Time timeout, File rootPath) throws IOException { super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap()); this.rootPath = checkNotNull(rootPath).getCanonicalFile(); } // ------------------------------------------------------------------------ // Responses to requests // ------------------------------------------------------------------------ @Override protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, RoutedRequest routedRequest, T gateway) throws Exception { final HttpRequest request = routedRequest.getRequest(); final String requestPath; // make sure we request the "index.html" in case there is a directory request if (routedRequest.getPath().endsWith("/")) { requestPath = routedRequest.getPath() + "index.html"; } // in case the files being accessed are logs or stdout files, find appropriate paths. else if (routedRequest.getPath().equals("/jobmanager/log") || routedRequest.getPath().equals("/jobmanager/stdout")) { requestPath = ""; } else { requestPath = routedRequest.getPath(); } respondToRequest(channelHandlerContext, request, requestPath); } //...... @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { if (ctx.channel().isActive()) { logger.error("Caught exception", cause); sendError(ctx, INTERNAL_SERVER_ERROR); } } }
- 对于/jobmanager/log以及/jobmanager/stdout它会重置一下requestPath,之后调用respondToRequest处理,它根据rootPath来传输文件
TaskManager FileHandler
TaskManagerLogFileHandler
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogFileHandler.java
/** * Rest handler which serves the log files from {@link TaskExecutor}. */ public class TaskManagerLogFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { public TaskManagerLogFileHandler( @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever, @Nonnull Time timeout, @Nonnull Map<String, String> responseHeaders, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, @Nonnull TransientBlobService transientBlobService, @Nonnull Time cacheEntryDuration) { super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); } @Override protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) { return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.LOG, timeout); } }
- 它的requestFileUpload是调用了ResourceManager.requestTaskManagerFileUpload,传递的FileType是FileType.LOG类型
TaskManagerStdoutFileHandler.requestFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerStdoutFileHandler.java
/** * Rest handler which serves the stdout file of the {@link TaskExecutor}. */ public class TaskManagerStdoutFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { public TaskManagerStdoutFileHandler( @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever, @Nonnull Time timeout, @Nonnull Map<String, String> responseHeaders, @Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders, @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, @Nonnull TransientBlobService transientBlobService, @Nonnull Time cacheEntryDuration) { super(localAddressFuture, leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); } @Override protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) { return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.STDOUT, timeout); } }
- 它的requestFileUpload是调用了ResourceManager.requestTaskManagerFileUpload,传递的FileType是FileType.STDOUT类型
ResourceManager.requestTaskManagerFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@Override public CompletableFuture<TransientBlobKey> requestTaskManagerFileUpload(ResourceID taskManagerId, FileType fileType, Time timeout) { log.debug("Request file {} upload from TaskExecutor {}.", fileType, taskManagerId); final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId); if (taskExecutor == null) { log.debug("Requested file {} upload from unregistered TaskExecutor {}.", fileType, taskManagerId); return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId)); } else { return taskExecutor.getTaskExecutorGateway().requestFileUpload(fileType, timeout); } }
- ResourceManager的requestTaskManagerFileUpload是通过TaskExecutor.requestFileUpload来实现的
TaskExecutor.requestFileUpload
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@Override public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, Time timeout) { log.debug("Request file {} upload.", fileType); final String filePath; switch (fileType) { case LOG: filePath = taskManagerConfiguration.getTaskManagerLogPath(); break; case STDOUT: filePath = taskManagerConfiguration.getTaskManagerStdoutPath(); break; default: filePath = null; } if (filePath != null && !filePath.isEmpty()) { final File file = new File(filePath); if (file.exists()) { final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService(); final TransientBlobKey transientBlobKey; try (FileInputStream fileInputStream = new FileInputStream(file)) { transientBlobKey = transientBlobService.putTransient(fileInputStream); } catch (IOException e) { log.debug("Could not upload file {}.", fileType, e); return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e)); } return CompletableFuture.completedFuture(transientBlobKey); } else { log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID()); return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor.")); } } else { log.debug("The file {} is unavailable on the TaskExecutor {}.", fileType, getResourceID()); return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " is not available on the TaskExecutor.")); } }
- TaskExecutor的requestFileUpload会根据fileType来获取filePath,如果是LOG类型取的是taskManagerConfiguration.getTaskManagerLogPath();如果是STDOUT类型,取的是taskManagerConfiguration.getTaskManagerStdoutPath(),之后将文件传输过去
TaskManagerRunner.startTaskManager
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
public static TaskExecutor startTaskManager( Configuration configuration, ResourceID resourceID, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, BlobCacheService blobCacheService, boolean localCommunicationOnly, FatalErrorHandler fatalErrorHandler) throws Exception { checkNotNull(configuration); checkNotNull(resourceID); checkNotNull(rpcService); checkNotNull(highAvailabilityServices); LOG.info("Starting TaskManager with ResourceID: {}", resourceID); InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( configuration, remoteAddress, localCommunicationOnly); TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, resourceID, rpcService.getExecutor(), // TODO replace this later with some dedicated executor for io. EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), EnvironmentInformation.getMaxJvmHeapMemory()); TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup( metricRegistry, taskManagerServices.getTaskManagerLocation(), taskManagerServices.getNetworkEnvironment()); TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); return new TaskExecutor( rpcService, taskManagerConfiguration, highAvailabilityServices, taskManagerServices, heartbeatServices, taskManagerMetricGroup, blobCacheService, fatalErrorHandler); }
- TaskManagerRunner.startTaskManager通过TaskManagerConfiguration.fromConfiguration(configuration)构造了taskManagerConfiguration
TaskManagerConfiguration.fromConfiguration
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
public static TaskManagerConfiguration fromConfiguration(Configuration configuration) { int numberSlots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1); if (numberSlots == -1) { numberSlots = 1; } //...... final String taskManagerLogPath = configuration.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file")); final String taskManagerStdoutPath; if (taskManagerLogPath != null) { final int extension = taskManagerLogPath.lastIndexOf('.'); if (extension > 0) { taskManagerStdoutPath = taskManagerLogPath.substring(0, extension) + ".out"; } else { taskManagerStdoutPath = null; } } else { taskManagerStdoutPath = null; } return new TaskManagerConfiguration( numberSlots, tmpDirPaths, timeout, finiteRegistrationDuration, initialRegistrationPause, maxRegistrationPause, refusedRegistrationPause, configuration, exitOnOom, FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder), alwaysParentFirstLoaderPatterns, taskManagerLogPath, taskManagerStdoutPath); }
- TaskManagerConfiguration.fromConfiguration里头首先根据ConfigConstants.TASK_MANAGER_LOG_PATH_KEY(
taskmanager.log.path
)从flink的Configuration读取taskManagerLogPath,如果读取不到,则取系统属性log.file;如果读取到taskManagerLogPath不为null,则换个后缀构建taskManagerStdoutPath
小结
web.log.path taskmanager.log.path
doc
以上所述就是小编给大家介绍的《聊聊flink的log.file配置 原 荐》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- 聊聊flink的logback配置
- 聊聊 Nacos 配置隔离和分类的使用
- 聊聊flink的slot.request.timeout配置
- 聊聊flink的slot.idle.timeout配置
- 聊聊动态规划(2) -- 特征
- 聊聊动态规划(1) -- 概念
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Coding the Matrix
Philip N. Klein / Newtonian Press / 2013-7-26 / $35.00
An engaging introduction to vectors and matrices and the algorithms that operate on them, intended for the student who knows how to program. Mathematical concepts and computational problems are motiva......一起来看看 《Coding the Matrix》 这本书的介绍吧!