Make NDC not block forever and handle restart.

Still possible for netd to get wedged but system won't die because of it.
Leads the way to having forking netd - then only individual commands would
wedge, promoting stability.

bug:5864209
bug:6019952
Change-Id: I43e4c5072863b8b812d4fe24d30d92eb1a11651a
This commit is contained in:
Robert Greenwalt
2012-02-07 11:36:55 -08:00
parent 30aab0989d
commit 7f03e44443
2 changed files with 216 additions and 60 deletions

View File

@@ -34,8 +34,8 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.LinkedBlockingQueue; import java.util.LinkedList;
/** /**
* Generic connector class for interfacing with a native daemon which uses the * Generic connector class for interfacing with a native daemon which uses the
@@ -50,11 +50,15 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
private OutputStream mOutputStream; private OutputStream mOutputStream;
private LocalLog mLocalLog; private LocalLog mLocalLog;
private final BlockingQueue<NativeDaemonEvent> mResponseQueue; private final ResponseQueue mResponseQueue;
private INativeDaemonConnectorCallbacks mCallbacks; private INativeDaemonConnectorCallbacks mCallbacks;
private Handler mCallbackHandler; private Handler mCallbackHandler;
private AtomicInteger mSequenceNumber;
private static final int DEFAULT_TIMEOUT = 1 * 60 * 1000; /* 1 minute */
/** Lock held whenever communicating with native daemon. */ /** Lock held whenever communicating with native daemon. */
private final Object mDaemonLock = new Object(); private final Object mDaemonLock = new Object();
@@ -64,7 +68,8 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
int responseQueueSize, String logTag, int maxLogSize) { int responseQueueSize, String logTag, int maxLogSize) {
mCallbacks = callbacks; mCallbacks = callbacks;
mSocket = socket; mSocket = socket;
mResponseQueue = new LinkedBlockingQueue<NativeDaemonEvent>(responseQueueSize); mResponseQueue = new ResponseQueue(responseQueueSize);
mSequenceNumber = new AtomicInteger(0);
TAG = logTag != null ? logTag : "NativeDaemonConnector"; TAG = logTag != null ? logTag : "NativeDaemonConnector";
mLocalLog = new LocalLog(maxLogSize); mLocalLog = new LocalLog(maxLogSize);
} }
@@ -79,7 +84,7 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
try { try {
listenToSocket(); listenToSocket();
} catch (Exception e) { } catch (Exception e) {
Slog.e(TAG, "Error in NativeDaemonConnector", e); loge("Error in NativeDaemonConnector: " + e);
SystemClock.sleep(5000); SystemClock.sleep(5000);
} }
} }
@@ -90,12 +95,10 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
String event = (String) msg.obj; String event = (String) msg.obj;
try { try {
if (!mCallbacks.onEvent(msg.what, event, event.split(" "))) { if (!mCallbacks.onEvent(msg.what, event, event.split(" "))) {
Slog.w(TAG, String.format( log(String.format("Unhandled event '%s'", event));
"Unhandled event '%s'", event));
} }
} catch (Exception e) { } catch (Exception e) {
Slog.e(TAG, String.format( loge("Error handling '" + event + "': " + e);
"Error handling '%s'", event), e);
} }
return true; return true;
} }
@@ -111,7 +114,9 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
socket.connect(address); socket.connect(address);
InputStream inputStream = socket.getInputStream(); InputStream inputStream = socket.getInputStream();
mOutputStream = socket.getOutputStream(); synchronized (mDaemonLock) {
mOutputStream = socket.getOutputStream();
}
mCallbacks.onDaemonConnected(); mCallbacks.onDaemonConnected();
@@ -120,7 +125,10 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
while (true) { while (true) {
int count = inputStream.read(buffer, start, BUFFER_SIZE - start); int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
if (count < 0) break; if (count < 0) {
loge("got " + count + " reading with start = " + start);
break;
}
// Add our starting point to the count and reset the start. // Add our starting point to the count and reset the start.
count += start; count += start;
@@ -140,14 +148,10 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
mCallbackHandler.sendMessage(mCallbackHandler.obtainMessage( mCallbackHandler.sendMessage(mCallbackHandler.obtainMessage(
event.getCode(), event.getRawEvent())); event.getCode(), event.getRawEvent()));
} else { } else {
try { mResponseQueue.add(event.getCmdNumber(), event);
mResponseQueue.put(event);
} catch (InterruptedException ex) {
Slog.e(TAG, "Failed to put response onto queue: " + ex);
}
} }
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
Slog.w(TAG, "Problem parsing message: " + rawEvent, e); log("Problem parsing message: " + rawEvent + " - " + e);
} }
start = i + 1; start = i + 1;
@@ -169,15 +173,16 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
} }
} }
} catch (IOException ex) { } catch (IOException ex) {
Slog.e(TAG, "Communications error", ex); loge("Communications error: " + ex);
throw ex; throw ex;
} finally { } finally {
synchronized (mDaemonLock) { synchronized (mDaemonLock) {
if (mOutputStream != null) { if (mOutputStream != null) {
try { try {
loge("closing stream for " + mSocket);
mOutputStream.close(); mOutputStream.close();
} catch (IOException e) { } catch (IOException e) {
Slog.w(TAG, "Failed closing output stream", e); loge("Failed closing output stream: " + e);
} }
mOutputStream = null; mOutputStream = null;
} }
@@ -188,17 +193,17 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
socket.close(); socket.close();
} }
} catch (IOException ex) { } catch (IOException ex) {
Slog.w(TAG, "Failed closing socket", ex); loge("Failed closing socket: " + ex);
} }
} }
} }
/** /**
* Send command to daemon, escaping arguments as needed. * Make command for daemon, escaping arguments as needed.
* *
* @return the final command issued. * @return the final command.
*/ */
private String sendCommandLocked(String cmd, Object... args) private StringBuilder makeCommand(String cmd, Object... args)
throws NativeDaemonConnectorException { throws NativeDaemonConnectorException {
// TODO: eventually enforce that cmd doesn't contain arguments // TODO: eventually enforce that cmd doesn't contain arguments
if (cmd.indexOf('\0') >= 0) { if (cmd.indexOf('\0') >= 0) {
@@ -216,22 +221,33 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
appendEscaped(builder, argString); appendEscaped(builder, argString);
} }
final String unterminated = builder.toString(); return builder;
log("SND -> {" + unterminated + "}"); }
private int sendCommand(StringBuilder builder)
throws NativeDaemonConnectorException {
int sequenceNumber = mSequenceNumber.incrementAndGet();
builder.insert(0, Integer.toString(sequenceNumber) + " ");
log("SND -> {" + builder.toString() + "}");
builder.append('\0'); builder.append('\0');
if (mOutputStream == null) { synchronized (mDaemonLock) {
throw new NativeDaemonConnectorException("missing output stream"); if (mOutputStream == null) {
} else { throw new NativeDaemonConnectorException("missing output stream");
try { } else {
mOutputStream.write(builder.toString().getBytes(Charsets.UTF_8)); try {
} catch (IOException e) { mOutputStream.write(builder.toString().getBytes(Charsets.UTF_8));
throw new NativeDaemonConnectorException("problem sending command", e); } catch (IOException e) {
throw new NativeDaemonConnectorException("problem sending command", e);
}
} }
} }
return unterminated; return sequenceNumber;
} }
/** /**
@@ -292,39 +308,42 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
*/ */
public NativeDaemonEvent[] executeForList(String cmd, Object... args) public NativeDaemonEvent[] executeForList(String cmd, Object... args)
throws NativeDaemonConnectorException { throws NativeDaemonConnectorException {
synchronized (mDaemonLock) { return execute(DEFAULT_TIMEOUT, cmd, args);
return executeLocked(cmd, args);
}
} }
private NativeDaemonEvent[] executeLocked(String cmd, Object... args) /**
* Issue the given command to the native daemon and return any
* {@linke NativeDaemonEvent@isClassContinue()} responses, including the
* final terminal response. Note that the timeout does not count time in
* deep sleep.
*
* @throws NativeDaemonConnectorException when problem communicating with
* native daemon, or if the response matches
* {@link NativeDaemonEvent#isClassClientError()} or
* {@link NativeDaemonEvent#isClassServerError()}.
*/
public NativeDaemonEvent[] execute(int timeout, String cmd, Object... args)
throws NativeDaemonConnectorException { throws NativeDaemonConnectorException {
final ArrayList<NativeDaemonEvent> events = Lists.newArrayList(); final ArrayList<NativeDaemonEvent> events = Lists.newArrayList();
final StringBuilder sentCommand = makeCommand(cmd, args);
while (mResponseQueue.size() > 0) { final int cmdNumber = sendCommand(sentCommand);
try {
log("ignoring {" + mResponseQueue.take() + "}");
} catch (Exception e) {}
}
final String sentCommand = sendCommandLocked(cmd, args);
NativeDaemonEvent event = null; NativeDaemonEvent event = null;
cmd = sentCommand.toString();
do { do {
try { event = mResponseQueue.remove(cmdNumber, timeout, cmd);
event = mResponseQueue.take(); if (event == null) {
} catch (InterruptedException e) { loge("timed-out waiting for response to " + cmdNumber + " " + cmd);
Slog.w(TAG, "interrupted waiting for event line"); throw new NativeDaemonFailureException(cmd, event);
continue;
} }
events.add(event); events.add(event);
} while (event.isClassContinue()); } while (event.isClassContinue());
if (event.isClassClientError()) { if (event.isClassClientError()) {
throw new NativeDaemonArgumentException(sentCommand, event); throw new NativeDaemonArgumentException(cmd, event);
} }
if (event.isClassServerError()) { if (event.isClassServerError()) {
throw new NativeDaemonFailureException(sentCommand, event); throw new NativeDaemonFailureException(cmd, event);
} }
return events.toArray(new NativeDaemonEvent[events.size()]); return events.toArray(new NativeDaemonEvent[events.size()]);
@@ -448,10 +467,120 @@ final class NativeDaemonConnector implements Runnable, Handler.Callback, Watchdo
public void dump(FileDescriptor fd, PrintWriter pw, String[] args) { public void dump(FileDescriptor fd, PrintWriter pw, String[] args) {
mLocalLog.dump(fd, pw, args); mLocalLog.dump(fd, pw, args);
pw.println();
mResponseQueue.dump(fd, pw, args);
} }
private void log(String logstring) { private void log(String logstring) {
if (LOGD) Slog.d(TAG, logstring); if (LOGD) Slog.d(TAG, logstring);
mLocalLog.log(logstring); mLocalLog.log(logstring);
} }
private void loge(String logstring) {
Slog.e(TAG, logstring);
mLocalLog.log(logstring);
}
private static class ResponseQueue {
private static class Response {
public int cmdNum;
public LinkedList<NativeDaemonEvent> responses = new LinkedList<NativeDaemonEvent>();
public String request;
public Response(int c, String r) {cmdNum = c; request = r;}
}
private final LinkedList<Response> mResponses;
private int mMaxCount;
ResponseQueue(int maxCount) {
mResponses = new LinkedList<Response>();
mMaxCount = maxCount;
}
public void add(int cmdNum, NativeDaemonEvent response) {
Response found = null;
synchronized (mResponses) {
for (Response r : mResponses) {
if (r.cmdNum == cmdNum) {
found = r;
break;
}
}
if (found == null) {
// didn't find it - make sure our queue isn't too big before adding
// another..
while (mResponses.size() >= mMaxCount) {
Slog.e("NativeDaemonConnector.ResponseQueue",
"more buffered than allowed: " + mResponses.size() +
" >= " + mMaxCount);
// let any waiter timeout waiting for this
Response r = mResponses.remove();
Slog.e("NativeDaemonConnector.ResponseQueue",
"Removing request: " + r.request + " (" + r.cmdNum + ")");
}
found = new Response(cmdNum, null);
mResponses.add(found);
}
found.responses.add(response);
}
synchronized (found) {
found.notify();
}
}
// note that the timeout does not count time in deep sleep. If you don't want
// the device to sleep, hold a wakelock
public NativeDaemonEvent remove(int cmdNum, int timeoutMs, String origCmd) {
long endTime = SystemClock.uptimeMillis() + timeoutMs;
long nowTime;
Response found = null;
while (true) {
synchronized (mResponses) {
for (Response response : mResponses) {
if (response.cmdNum == cmdNum) {
found = response;
// how many response fragments are left
switch (response.responses.size()) {
case 0: // haven't got any - must wait
break;
case 1: // last one - remove this from the master list
mResponses.remove(response); // fall through
default: // take one and move on
response.request = origCmd;
return response.responses.remove();
}
}
}
nowTime = SystemClock.uptimeMillis();
if (endTime <= nowTime) {
Slog.e("NativeDaemonConnector.ResponseQueue",
"Timeout waiting for response");
return null;
}
/* pre-allocate so we have something unique to wait on */
if (found == null) {
found = new Response(cmdNum, origCmd);
mResponses.add(found);
}
}
try {
synchronized (found) {
found.wait(endTime - nowTime);
}
} catch (InterruptedException e) {
// loop around to check if we're done or if it's time to stop waiting
}
}
}
public void dump(FileDescriptor fd, PrintWriter pw, String[] args) {
pw.println("Pending requests:");
synchronized (mResponses) {
for (Response response : mResponses) {
pw.println(" Cmd " + response.cmdNum + " - " + response.request);
}
}
}
}
} }

View File

@@ -28,16 +28,22 @@ public class NativeDaemonEvent {
// TODO: keep class ranges in sync with ResponseCode.h // TODO: keep class ranges in sync with ResponseCode.h
// TODO: swap client and server error ranges to roughly mirror HTTP spec // TODO: swap client and server error ranges to roughly mirror HTTP spec
private final int mCmdNumber;
private final int mCode; private final int mCode;
private final String mMessage; private final String mMessage;
private final String mRawEvent; private final String mRawEvent;
private NativeDaemonEvent(int code, String message, String rawEvent) { private NativeDaemonEvent(int cmdNumber, int code, String message, String rawEvent) {
mCmdNumber = cmdNumber;
mCode = code; mCode = code;
mMessage = message; mMessage = message;
mRawEvent = rawEvent; mRawEvent = rawEvent;
} }
public int getCmdNumber() {
return mCmdNumber;
}
public int getCode() { public int getCode() {
return mCode; return mCode;
} }
@@ -89,7 +95,11 @@ public class NativeDaemonEvent {
* Test if event represents an unsolicited event from native daemon. * Test if event represents an unsolicited event from native daemon.
*/ */
public boolean isClassUnsolicited() { public boolean isClassUnsolicited() {
return mCode >= 600 && mCode < 700; return isClassUnsolicited(mCode);
}
private static boolean isClassUnsolicited(int code) {
return code >= 600 && code < 700;
} }
/** /**
@@ -110,20 +120,37 @@ public class NativeDaemonEvent {
* from native side. * from native side.
*/ */
public static NativeDaemonEvent parseRawEvent(String rawEvent) { public static NativeDaemonEvent parseRawEvent(String rawEvent) {
final int splitIndex = rawEvent.indexOf(' '); final String[] parsed = rawEvent.split(" ");
if (splitIndex == -1) { if (parsed.length < 2) {
throw new IllegalArgumentException("unable to find ' ' separator"); throw new IllegalArgumentException("Insufficient arguments");
} }
int skiplength = 0;
final int code; final int code;
try { try {
code = Integer.parseInt(rawEvent.substring(0, splitIndex)); code = Integer.parseInt(parsed[0]);
skiplength = parsed[0].length() + 1;
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new IllegalArgumentException("problem parsing code", e); throw new IllegalArgumentException("problem parsing code", e);
} }
final String message = rawEvent.substring(splitIndex + 1); int cmdNumber = -1;
return new NativeDaemonEvent(code, message, rawEvent); if (isClassUnsolicited(code) == false) {
if (parsed.length < 3) {
throw new IllegalArgumentException("Insufficient arguemnts");
}
try {
cmdNumber = Integer.parseInt(parsed[1]);
skiplength += parsed[1].length() + 1;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("problem parsing cmdNumber", e);
}
}
final String message = rawEvent.substring(skiplength);
return new NativeDaemonEvent(cmdNumber, code, message, rawEvent);
} }
/** /**