Overview: a look at queryTimeout and the next() method in the PostgreSQL JDBC driver (pgjdbc)
@Test
public void testReadTimeout() throws SQLException {
    Connection connection = dataSource.getConnection();
    // https://jdbc.postgresql.org/documentation/head/query.html
    connection.setAutoCommit(false); // NOTE: autoCommit must be false for fetchSize to take effect
    String sql = "select * from demo_table";
    PreparedStatement pstmt;
    try {
        pstmt = connection.prepareStatement(sql);
        // NOTE: timeout (in seconds) for the statement to finish executing;
        // it only takes effect if the socket timeout is larger than this value
        pstmt.setQueryTimeout(1);
        pstmt.setFetchSize(5000); // NOTE: chosen to provoke a query timeout
        System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
        System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
        System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
        System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());
        // NOTE: the query timeout applies to this call (again, only if the socket timeout is larger).
        // NOTE: once this returns, the statement has finished executing; by default it returns fetchSize rows
        ResultSet rs = pstmt.executeQuery();
        int col = rs.getMetaData().getColumnCount();
        System.out.println("============================");
        // NOTE: the timeout of next() is governed by the socket timeout
        // (e.g. oracle.jdbc.ReadTimeout=60000 for Oracle; pgjdbc uses the socketTimeout connection property)
        while (rs.next()) {
            for (int i = 1; i <= col; i++) {
                System.out.print(rs.getObject(i));
            }
            System.out.println("");
        }
        System.out.println("============================");
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        // close resources
    }
}
PgStatement
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgStatement.java
executeInternal()
private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
        throws SQLException {
    closeForNextExecution();

    // Enable cursor-based resultset if possible.
    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
            && !wantsHoldableResultSet()) {
        flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
    }

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
        flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

        // If the no results flag is set (from executeUpdate)
        // clear it so we get the generated keys results.
        //
        if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
            flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
        }
    }

    if (isOneShotQuery(cachedQuery)) {
        flags |= QueryExecutor.QUERY_ONESHOT;
    }
    // Only use named statements after we hit the threshold. Note that only
    // named statements can be transferred in binary format.

    if (connection.getAutoCommit()) {
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    // updateable result sets do not yet support binary updates
    if (concurrency != ResultSet.CONCUR_READ_ONLY) {
        flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
    }

    Query queryToExecute = cachedQuery.query;

    if (queryToExecute.isEmpty()) {
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
            && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
        // Simple 'Q' execution does not need to know parameter types
        // When binaryTransfer is forced, then we need to know resulting parameter and column types,
        // thus sending a describe request.
        int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
        StatementResultHandler handler2 = new StatementResultHandler();
        connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
                flags2);
        ResultWrapper result2 = handler2.getResults();
        if (result2 != null) {
            result2.getResultSet().close();
        }
    }

    StatementResultHandler handler = new StatementResultHandler();
    result = null;
    try {
        startTimer();
        connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
                fetchSize, flags);
    } finally {
        killTimerTask();
    }
    result = firstUnclosedResult = handler.getResults();

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
        generatedKeys = result;
        result = result.getNext();

        if (wantsGeneratedKeysOnce) {
            wantsGeneratedKeysOnce = false;
        }
    }
}
Note that startTimer() is called immediately before the query is executed, and killTimerTask() immediately after it, in a finally block.
startTimer()
private void startTimer() {
    /*
     * there shouldn't be any previous timer active, but better safe than sorry.
     */
    cleanupTimer();

    STATE_UPDATER.set(this, StatementCancelState.IN_QUERY);

    if (timeout == 0) {
        return;
    }

    TimerTask cancelTask = new TimerTask() {
        public void run() {
            try {
                if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) {
                    // Nothing to do here, statement has already finished and cleared
                    // cancelTimerTask reference
                    return;
                }
                PgStatement.this.cancel();
            } catch (SQLException e) {
            }
        }
    };

    CANCEL_TIMER_UPDATER.set(this, cancelTask);
    connection.addTimerTask(cancelTask, timeout);
}
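- startTimer() first calls cleanupTimer(), then sets the statement state to IN_QUERY; if timeout is 0 it returns without scheduling anything
- When it fires, cancelTask calls PgStatement.this.cancel()
- Finally, connection.addTimerTask() is called to schedule the task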
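
The arm/disarm pattern above can be tried in isolation. The following is a minimal sketch, not the driver's code: TimedOperation, its fields, and the simulated "query" (a sleep) are all hypothetical, and an AtomicReference stands in for the driver's CANCEL_TIMER_UPDATER. It only illustrates how a compare-and-set decides whether the timer task or the finishing thread wins.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a statement that arms a cancel timer around a query.
final class TimedOperation {
    // One shared daemon timer, similar in spirit to a driver-wide shared timer.
    private static final Timer SHARED_TIMER = new Timer("cancel-timer", true);

    private final AtomicReference<TimerTask> cancelTask = new AtomicReference<>();
    private final long timeoutMillis;
    private volatile boolean cancelRequested;

    TimedOperation(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    private void startTimer() {
        if (timeoutMillis == 0) {
            return; // no timeout configured, nothing to schedule
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                // Only act if the reference still points at this task;
                // otherwise the operation already finished and cleared it.
                if (cancelTask.compareAndSet(this, null)) {
                    cancelRequested = true; // a real driver would send a cancel here
                }
            }
        };
        cancelTask.set(task);
        SHARED_TIMER.schedule(task, timeoutMillis);
    }

    /** Returns true if the timer can no longer fire for this operation. */
    private boolean cleanupTimer() {
        TimerTask task = cancelTask.get();
        if (task == null) {
            return timeoutMillis == 0; // never armed, or the task has already fired
        }
        if (!cancelTask.compareAndSet(task, null)) {
            return false; // the timer fired between get() and compareAndSet()
        }
        task.cancel();
        return true;
    }

    /** Simulates a query taking workMillis; returns true if the timer requested a cancel. */
    boolean run(long workMillis) {
        startTimer();
        try {
            Thread.sleep(workMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            cleanupTimer();
        }
        return cancelRequested;
    }

    public static void main(String[] args) {
        System.out.println("slow query cancelled: " + new TimedOperation(50).run(300));
        System.out.println("fast query cancelled: " + new TimedOperation(5_000).run(10));
    }
}
```

Because both sides go through the same compare-and-set, exactly one of them clears the reference, so a task that loses the race never issues a late cancel.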
cleanupTimer()
/**
 * Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would
 * never invoke {@link #cancel()}.
 */
private boolean cleanupTimer() {
    TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this);
    if (timerTask == null) {
        // If timeout is zero, then timer task did not exist, so we safely report "all clear"
        return timeout == 0;
    }
    if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) {
        // Failed to update reference -> timer has just fired, so we must wait for the query state to
        // become "cancelling".
        return false;
    }
    timerTask.cancel();
    connection.purgeTimerTasks();
    // All clear
    return true;
}
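Note that after clearing the statement's cancelTimerTask reference with a compare-and-set, this method calls the task's cancel() and then connection.purgeTimerTasks().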
cancel()
public void cancel() throws SQLException {
    if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY,
            StatementCancelState.CANCELING)) {
        // Not in query, there's nothing to cancel
        return;
    }
    try {
        // Synchronize on connection to avoid spinning in killTimerTask
        synchronized (connection) {
            connection.cancelQuery();
        }
    } finally {
        STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
        synchronized (connection) {
            connection.notifyAll(); // wake-up killTimerTask
        }
    }
}
If executeQuery times out, the timer task invokes cancel(), which in turn calls connection.cancelQuery():
public void cancelQuery() throws SQLException {
    checkClosed();
    queryExecutor.sendQueryCancel();
}
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/QueryExecutorBase.java
public void sendQueryCancel() throws SQLException {
    if (cancelPid <= 0) {
        return;
    }

    PGStream cancelStream = null;

    // Now we need to construct and send a cancel packet
    try {
        if (logger.logDebug()) {
            logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");
        }

        cancelStream =
                new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout);
        if (cancelSignalTimeout > 0) {
            cancelStream.getSocket().setSoTimeout(cancelSignalTimeout);
        }
        cancelStream.sendInteger4(16);
        cancelStream.sendInteger2(1234);
        cancelStream.sendInteger2(5678);
        cancelStream.sendInteger4(cancelPid);
        cancelStream.sendInteger4(cancelKey);
        cancelStream.flush();
        cancelStream.receiveEOF();
    } catch (IOException e) {
        // Safe to ignore.
        if (logger.logDebug()) {
            logger.debug("Ignoring exception on cancel request:", e);
        }
    } finally {
        if (cancelStream != null) {
            try {
                cancelStream.close();
            } catch (IOException e) {
                // Ignored.
            }
        }
    }
}
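This sends a cancel request to the database server over a separate, short-lived connection (a new PGStream), not over the connection that is running the query.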
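
To make the byte layout of that packet concrete, here is a small sketch that builds the same 16 bytes with java.nio.ByteBuffer. The class name CancelRequestPacket and the sample pid/key values are hypothetical; only the field order and sizes are taken from the sendInteger4/sendInteger2 calls above.

```java
import java.nio.ByteBuffer;

// Illustrative helper: assembles the 16-byte cancel packet written by sendQueryCancel().
final class CancelRequestPacket {
    static byte[] build(int cancelPid, int cancelKey) {
        ByteBuffer buf = ByteBuffer.allocate(16); // ByteBuffer is big-endian by default
        buf.putInt(16);              // total message length, including this field
        buf.putShort((short) 1234);  // high 16 bits of the cancel request code
        buf.putShort((short) 5678);  // low 16 bits of the cancel request code
        buf.putInt(cancelPid);       // backend process id of the target session
        buf.putInt(cancelKey);       // secret key received at connection start
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] packet = build(4242, 987654321); // sample values, not real credentials
        int code = ByteBuffer.wrap(packet).getInt(4);
        System.out.println("length=" + packet.length + " code=" + code);
    }
}
```

Read as a single 32-bit integer, the two shorts 1234 and 5678 form the value 80877102 (1234 * 65536 + 5678).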
killTimerTask()
private void killTimerTask() {
    boolean timerTaskIsClear = cleanupTimer();
    // The order is important here: in case we need to wait for the cancel task, the state must be
    // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.
    // It is believed that this case is very rare, so "additional cancel and wait below" would not
    // harm it.
    if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY,
            StatementCancelState.IDLE)) {
        return;
    }

    // Being here means someone managed to call .cancel() and our connection did not receive
    // "timeout error"
    // We wait till state becomes "cancelled"
    boolean interrupted = false;
    while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED,
            StatementCancelState.IDLE)) {
        synchronized (connection) {
            try {
                // Note: wait timeout here is irrelevant since synchronized(connection) would block until
                // .cancel finishes
                connection.wait(10);
            } catch (InterruptedException e) { // NOSONAR
                // Either re-interrupt this method or rethrow the "InterruptedException"
                interrupted = true;
            }
        }
    }
    if (interrupted) {
        Thread.currentThread().interrupt();
    }
}
This calls cleanupTimer() first and then updates the statement's state.
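
The state transitions used by startTimer(), cancel(), and killTimerTask() can be summarized in a small single-class sketch. This is an illustration under assumptions, not driver code: CancelStateSketch and its method names are hypothetical, an AtomicReference replaces STATE_UPDATER, and the wait on the connection monitor is reduced to a yield loop.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the IDLE -> IN_QUERY -> CANCELING -> CANCELLED -> IDLE cycle.
final class CancelStateSketch {
    enum State { IDLE, IN_QUERY, CANCELING, CANCELLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

    /** Corresponds to the state update in startTimer(). */
    void beginQuery() {
        state.set(State.IN_QUERY);
    }

    /** Returns true if a running query was cancelled, false if there was nothing to cancel. */
    boolean cancel() {
        if (!state.compareAndSet(State.IN_QUERY, State.CANCELING)) {
            return false; // not in a query
        }
        try {
            // A real driver would send the cancel request to the server here.
        } finally {
            state.set(State.CANCELLED);
        }
        return true;
    }

    /** Corresponds to killTimerTask(); must only be called after beginQuery(). */
    void endQuery() {
        if (state.compareAndSet(State.IN_QUERY, State.IDLE)) {
            return; // normal completion, nobody cancelled
        }
        // A cancel is in flight or done: wait until it reaches CANCELLED, then reset.
        while (!state.compareAndSet(State.CANCELLED, State.IDLE)) {
            Thread.yield();
        }
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        CancelStateSketch s = new CancelStateSketch();
        s.beginQuery();
        s.cancel();
        s.endQuery();
        System.out.println(s.current());
    }
}
```

Keeping the state at IN_QUERY until the cancel task has had its chance is what the comment in killTimerTask() means by "the order is important here".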
PgConnection
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
getTimer()
private synchronized Timer getTimer() {
    if (cancelTimer == null) {
        cancelTimer = Driver.getSharedTimer().getTimer();
    }
    return cancelTimer;
}
This lazily obtains a timer from Driver.getSharedTimer() on first use and returns the cached one afterwards.
addTimerTask()
public void addTimerTask(TimerTask timerTask, long milliSeconds) {
    Timer timer = getTimer();
    timer.schedule(timerTask, milliSeconds);
}
Adding a timer task simply schedules it on that timer.
purgeTimerTasks()
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgConnection.java
public void purgeTimerTasks() {
    Timer timer = cancelTimer;
    if (timer != null) {
        timer.purge();
    }
}
This is called from cleanupTimer() to purge timer tasks that have already been cancelled.
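
The effect of purging can be seen with plain java.util.Timer. In the sketch below (PurgeDemo and its method are hypothetical names), tasks are scheduled far in the future, cancelled, and then purged; Timer.purge() returns how many cancelled tasks it removed from the timer's queue.

```java
import java.util.Timer;
import java.util.TimerTask;

// Illustrative only: shows that cancelled tasks stay queued until purge() removes them.
final class PurgeDemo {
    static int scheduleCancelAndPurge(int count) {
        Timer timer = new Timer("purge-demo", true); // daemon thread
        try {
            TimerTask[] tasks = new TimerTask[count];
            for (int i = 0; i < count; i++) {
                tasks[i] = new TimerTask() {
                    @Override
                    public void run() {
                        // never reached in this demo: the task is cancelled long before it is due
                    }
                };
                timer.schedule(tasks[i], 60_000L); // due in one minute
            }
            for (TimerTask task : tasks) {
                task.cancel(); // marks the task cancelled but leaves it in the queue
            }
            return timer.purge(); // removes cancelled tasks and reports how many
        } finally {
            timer.cancel(); // stop the demo timer thread
        }
    }

    public static void main(String[] args) {
        System.out.println("purged: " + scheduleCancelAndPurge(3));
    }
}
```

Without the purge, a cancelled task would remain referenced by the timer until its scheduled time arrives, which matters when the timeout is long and statements are executed frequently.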
PgResultSet
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/jdbc/PgResultSet.java
next()
public boolean next() throws SQLException {
    checkClosed();

    if (onInsertRow) {
        throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
                PSQLState.INVALID_CURSOR_STATE);
    }

    if (current_row + 1 >= rows.size()) {
        if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
            current_row = rows.size();
            this_row = null;
            rowBuffer = null;
            return false; // End of the resultset.
        }

        // Ask for some more data.
        row_offset += rows.size(); // We are discarding some data.

        int fetchRows = fetchSize;
        if (maxRows != 0) {
            if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
                // Fetch would exceed maxRows, limit it.
                fetchRows = maxRows - row_offset;
            }
        }

        // Execute the fetch and update this resultset.
        connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

        current_row = 0;

        // Test the new rows array.
        if (rows.isEmpty()) {
            this_row = null;
            rowBuffer = null;
            return false;
        }
    } else {
        current_row++;
    }

    initRowBuffer();
    return true;
}
Unlike executeQuery, the fetch here is not guarded by a timer.
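
The batch-size arithmetic inside next() is easy to check on its own. The sketch below lifts that calculation into a hypothetical static helper (FetchWindow.nextFetchRows is not a driver method); the parameter names mirror the fields used in next().

```java
// Illustrative extraction of the fetchRows calculation from next().
final class FetchWindow {
    /**
     * @param fetchSize configured fetch size (0 means "no limit per fetch")
     * @param maxRows   configured row cap (0 means "no cap")
     * @param rowOffset number of rows already consumed from the cursor
     * @return how many rows the next fetch should request
     */
    static int nextFetchRows(int fetchSize, int maxRows, int rowOffset) {
        int fetchRows = fetchSize;
        if (maxRows != 0) {
            if (fetchRows == 0 || rowOffset + fetchRows > maxRows) {
                // A full batch would exceed maxRows, so request only the remainder.
                fetchRows = maxRows - rowOffset;
            }
        }
        return fetchRows;
    }

    public static void main(String[] args) {
        System.out.println(nextFetchRows(5000, 0, 10000));     // no cap: full batch
        System.out.println(nextFetchRows(5000, 12000, 10000)); // cap reached mid-batch
    }
}
```

With fetchSize 5000, maxRows 12000, and 10000 rows already read, the next fetch asks for the remaining 2000 rows instead of a full batch.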
postgresql-9.4.1212.jre7-sources.jar!/org/postgresql/core/v3/QueryExecutorImpl.java
public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize)
        throws SQLException {
    waitOnLock();
    final Portal portal = (Portal) cursor;

    // Insert a ResultHandler that turns bare command statuses into empty datasets
    // (if the fetch returns no rows, we see just a CommandStatus..)
    final ResultHandler delegateHandler = handler;
    handler = new ResultHandlerDelegate(delegateHandler) {
        public void handleCommandStatus(String status, int updateCount, long insertOID) {
            handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
        }
    };

    // Now actually run it.

    try {
        processDeadParsedQueries();
        processDeadPortals();

        sendExecute(portal.getQuery(), portal, fetchSize);
        sendSync();

        processResults(handler, 0);
        estimatedReceiveBufferBytes = 0;
    } catch (IOException e) {
        abort();
        handler.handleError(
                new PSQLException(GT.tr("An I/O error occurred while sending to the backend."),
                        PSQLState.CONNECTION_FAILURE, e));
    }

    handler.handleCompletion();
}
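Summary
- queryTimeout is enforced by scheduling a timer task per execution, so a large number of concurrent requests can leave many timer tasks pending. The timeout should therefore not be set too long. That said, when the SQL finishes normally, killTimerTask() is called; it first runs cleanupTimer() to cancel the timerTask and then calls purgeTimerTasks() to remove the cancelled task, which keeps a long timeout from piling up tasks until memory runs out.
- When the timeout expires, the timer task sends a cancel query request to the database server.
- Once the cancel request has been sent, the query on the client side is expected to throw a SQLException. The exact mechanism needs further study; possibly the server returns a timeout error.
- When cursor-based fetching is enabled (fetchSize > 0 and autoCommit off), executeQuery pulls fetchSize rows by default and returns.
- next() fetches more rows on demand. This fetch has no timer limiting its duration, but at the lowest level it should still be bounded by socketTimeout.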
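
Since the fetch inside next() is only bounded by the socket read timeout, it may be worth setting that explicitly. The sketch below assumes pgjdbc's socketTimeout connection property, which is given in seconds; the class name PgTimeoutConfig, the credentials, and the URL in the comment are hypothetical placeholders.

```java
import java.util.Properties;

// Illustrative helper: builds connection properties with an explicit socket read timeout.
final class PgTimeoutConfig {
    static Properties connectionProperties(String user, String password, int socketTimeoutSeconds) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        // Assumed pgjdbc property: read timeout in seconds, 0 disables it.
        props.setProperty("socketTimeout", Integer.toString(socketTimeoutSeconds));
        return props;
    }

    public static void main(String[] args) {
        Properties props = connectionProperties("demo_user", "demo_password", 60);
        // With a driver on the classpath, these would be passed along with a JDBC URL, e.g.
        // DriverManager.getConnection("jdbc:postgresql://localhost:5432/demo_db", props);
        System.out.println(props.getProperty("socketTimeout"));
    }
}
```

Following the note in the test at the top, the socket timeout should be larger than the value passed to setQueryTimeout(), otherwise the socket read fails before the query timeout can trigger a cancel.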